文章推荐系统 | 二、同步业务数据

推荐阅读:
文章推荐系统 | 一、推荐流程设计

在推荐系统架构中,推荐系统的数据库和业务系统的数据库是分离的,这样才能有效避免推荐系统的数据读写、计算等对业务系统的影响,所以同步业务数据往往也是推荐系统要做的第一件事情。通常业务数据是存储在关系型数据库中,比如 MySQL,而推荐系统通常需要使用大数据平台,比如 Hadoop,我们可以利用 Sqoop 将 MySQL 中的业务数据同步到 Hive 中,在我们的文章推荐系统中,需要同步的数据包括:

  • 用户信息表(user_profile):包括 user_id | gender | birthday | real_name | create_time | update_time | register_media_time | id_number | id_card_front | id_card_back | id_card_handheld | area | company | career 等字段。
  • 用户基础信息表(user_basic):包括 user_id | mobile | password | user_name | profile_photo | last_login | is_media | article_count | following_count | fans_count | like_count | read_count | introduction | certificate | is_verified | account | email | status 等字段。
  • 文章信息表(news_article_basic):包括 article_id | user_id | channel_id | title | cover | is_advertising | create_time | status | reviewer_id | review_time | delete_time | comment_count | allow_comment | update_time | reject_reason 等字段。
  • 文章内容表(news_article_content):包括 article_id | content | update_time 等字段。
  • 频道信息表(news_channel):包括 channel_id | channel_name | create_time | update_time | sequence | is_visible | is_default 等字段。

首先,在 Hive 中创建业务数据库 toutiao,并创建相关表

create database if not exists toutiao comment "user,news information of mysql" location '/user/hive/warehouse/toutiao.db/';
-- 用户信息表
create table user_profile
(
    user_id             BIGINT comment "userid",
    gender              BOOLEAN comment "gender",
    birthday            STRING comment "birthday",
    real_name           STRING comment "real_name",
    create_time         STRING comment "create_time",
    update_time         STRING comment "update_time",
    register_media_time STRING comment "register_media_time",
    id_number           STRING comment "id_number",
    id_card_front       STRING comment "id_card_front",
    id_card_back        STRING comment "id_card_back",
    id_card_handheld    STRING comment "id_card_handheld",
    area                STRING comment "area",
    company             STRING comment "company",
    career              STRING comment "career"
)
    COMMENT "toutiao user profile"
    row format delimited fields terminated by ',' # 指定分隔符
    LOCATION '/user/hive/warehouse/toutiao.db/user_profile';
-- 用户基础信息表
create table user_basic
(
    user_id         BIGINT comment "user_id",
    mobile          STRING comment "mobile",
    password        STRING comment "password",
    profile_photo   STRING comment "profile_photo",
    last_login      STRING comment "last_login",
    is_media        BOOLEAN comment "is_media",
    article_count   BIGINT comment "article_count",
    following_count BIGINT comment "following_count",
    fans_count      BIGINT comment "fans_count",
    like_count      BIGINT comment "like_count",
    read_count      BIGINT comment "read_count",
    introduction    STRING comment "introduction",
    certificate     STRING comment "certificate",
    is_verified     BOOLEAN comment "is_verified"
)
    COMMENT "toutiao user basic"
    row format delimited fields terminated by ',' # 指定分隔符
    LOCATION '/user/hive/warehouse/toutiao.db/user_basic';
-- 文章基础信息表
create table news_article_basic
(
    article_id  BIGINT comment "article_id",
    user_id     BIGINT comment "user_id",
    channel_id  BIGINT comment "channel_id",
    title       STRING comment "title",
    status      BIGINT comment "status",
    update_time STRING comment "update_time"
)
    COMMENT "toutiao news_article_basic"
    row format delimited fields terminated by ',' # 指定分隔符
    LOCATION '/user/hive/warehouse/toutiao.db/news_article_basic';
-- 文章频道表
create table news_channel
(
    channel_id   BIGINT comment "channel_id",
    channel_name STRING comment "channel_name",
    create_time  STRING comment "create_time",
    update_time  STRING comment "update_time",
    sequence     BIGINT comment "sequence",
    is_visible   BOOLEAN comment "is_visible",
    is_default   BOOLEAN comment "is_default"
)
    COMMENT "toutiao news_channel"
    row format delimited fields terminated by ',' # 指定分隔符
    LOCATION '/user/hive/warehouse/toutiao.db/news_channel';
-- 文章内容表
create table news_article_content
(
    article_id BIGINT comment "article_id",
    content    STRING comment "content",
    update_time STRING comment "update_time"
)
    COMMENT "toutiao news_article_content"
    row format delimited fields terminated by ',' # 指定分隔符
    LOCATION '/user/hive/warehouse/toutiao.db/news_article_content';

查看 Sqoop 连接到 MySQL 的数据库列表

sqoop list-databases --connect jdbc:mysql://192.168.19.137:3306/ --username root -P

mysql
sys
toutiao

Sqoop 可以指定全量导入和增量导入,通常我们可以先执行一次全量导入,将历史数据全部导入进来,后面再通过定时任务执行增量导入,来保持 MySQL 和 Hive 的数据同步,全量导入不需要提前创建 Hive 表,可以自动创建

array=(user_profile user_basic news_article_basic news_channel news_article_content)

for table_name in ${array[@]};
do
    sqoop import \
        --connect jdbc:mysql://192.168.19.137/toutiao \
        --username root \
        --password password \
        --table $table_name \
        --m 5 \ # 线程数
        --hive-home /root/bigdata/hive \ # hive路径
        --hive-import \ # 导入形式
        --create-hive-table  \ # 自动创建表
        --hive-drop-import-delims \
        --warehouse-dir /user/hive/warehouse/toutiao.db \ # 导入地址
        --hive-table toutiao.$table_name 
done

增量导入,有 append 和 lastmodified 两种模式

  • append:通过指定一个递增的列进行更新,只能追加,不能修改
num=0
declare -A check
check=([user_profile]=user_id [user_basic]=user_id [news_article_basic]=article_id [news_channel]=channel_id [news_article_content]=channel_id)

for k in ${!check[@]}
do
    sqoop import \
        --connect jdbc:mysql://192.168.19.137/toutiao \
        --username root \
        --password password \
        --table $k \
        --m 4 \
        --hive-home /root/bigdata/hive \ # hive路径
        --hive-import \ # 导入到hive
        --create-hive-table  \ # 自动创建表
        --incremental append \ # 按照id更新
        --check-column ${check[$k]} \ # 指定id列
        --last-value ${num} # 指定最后更新的id
done
  • lastmodified:按照最后修改时间更新,支持追加和修改
time=`date +"%Y-%m-%d" -d "-1day"`
declare -A check
check=([user_profile]=update_time [user_basic]=last_login [news_article_basic]=update_time [news_channel]=update_time)
declare -A merge
merge=([user_profile]=user_id [user_basic]=user_id [news_article_basic]=article_id [news_channel]=channel_id)

for k in ${!check[@]}
do
    sqoop import \
        --connect jdbc:mysql://192.168.19.137/toutiao \
        --username root \
        --password password \
        --table $k \
        --m 4 \
        --target-dir /user/hive/warehouse/toutiao.db/$k \ # hive路径
        --incremental lastmodified \ # 按照最后修改时间更新
        --check-column ${check[$k]} \ # 指定时间列
        --merge-key ${merge[$k]} \ # 根据指定列合并重复或修改数据
        --last-value ${time} # 指定最后修改时间
done

Sqoop 可以直接导入到 Hive,自动创建 Hive 表,但是 lastmodified 模式不支持
Sqoop 也可以先导入到 HDFS,然后再建立 Hive 表关联,为了使用 lastmodified 模式,通常使用这种方法

--target-dir /user/hive/warehouse/toutiao.db/user_profile # 指定导入的HDFS目录

若先导入到 HDFS,需要注意 HDFS 默认分隔符为 , 而 Hive 默认分隔符为 /u0001,所以需要在创建 Hive 表时指定分隔符为 HDFS 分隔符 ,,若不指定分隔符,查询结果将全部为 NULL

row format delimited fields terminated by ',' # 指定分隔符

如果 MySQL 数据中存在特殊字符,如 , \t \n 都会导致 Hive 读取失败(但不影响导入到 HDFS 中),可以利用 --query 参数,在读取时使用 REPLACE() CHAR() CHR() 进行字符替换。如果特殊字符过多,比如 news_article_content 表,可以选择全量导入

--query 'select article_id, user_id, channel_id, REPLACE(REPLACE(REPLACE(title, CHAR(13),""),CHAR(10),""), ",", " ") title, status, update_time from news_article_basic WHERE $CONDITIONS' \

如果 MySQL 数据中存在 tinyint 类型,必须在 --connet 中加入 ?tinyInt1isBit=false,防止 Hive 将 tinyint 类型默认转化为 boolean 类型

--connect jdbc:mysql://192.168.19.137/toutiao?tinyInt1isBit=false

MySQL 与 Hive 对应类型如下

MySQL Hive
bigint bigint
tinyint boolean
int int
double double
bit boolean
varchar string
decimal double
date / timestamp string

我们可以利用 crontab 来创建定时任务,将更新命令写入脚本 import_incremental.sh,输入 crontab -e 打开编辑界面,输入如下内容,即可定时执行数据同步任务

# 每隔半小时增量导入一次
*/30 * * * * /root/toutiao_project/scripts/import_incremental.sh

crontab 命令格式为 * * * * * shell,其中前五个 * 分别代表分钟 (0-59)、小时(0-59)、月内的某天(1-31)、月(1-12)、周内的某天(0-7,周日为 0 或 7),shell 表示要执行的命令或脚本,使用方法如下

# 每隔5分钟运行一次backupscript脚本
*/5 * * * * /root/backupscript.sh
# 每天的凌晨1点运行backupscript脚本
0 1 * * * /root/backupscript.sh
# 每月的第一个凌晨3:15运行backupscript脚本
15 3 1 * * /root/backupscript.sh

crontab 常用命令如下

crontab -e      # 修改 crontab 文件,如果文件不存在会自动创建。 
crontab -l      # 显示 crontab 文件。 
crontab -r      # 删除 crontab 文件。
crontab -ir     # 删除 crontab 文件前提醒用户。

service crond status      # 查看crontab服务状态
service crond start       # 启动服务 
service crond stop        # 关闭服务 
service crond restart     # 重启服务 
service crond reload      # 重新载入配置

参考

https://www.bilibili.com/video/av68356229
https://pan.baidu.com/s/1-uvGJ-mEskjhtaial0Xmgw(学习资源已保存至网盘, 提取码:eakp)

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。