推荐阅读:
文章推荐系统 | 一、推荐流程设计
在推荐系统架构中,推荐系统的数据库和业务系统的数据库是分离的,这样才能有效避免推荐系统的数据读写、计算等对业务系统的影响,所以同步业务数据往往也是推荐系统要做的第一件事情。通常业务数据是存储在关系型数据库中,比如 MySQL,而推荐系统通常需要使用大数据平台,比如 Hadoop,我们可以利用 Sqoop 将 MySQL 中的业务数据同步到 Hive 中,在我们的文章推荐系统中,需要同步的数据包括:
- 用户信息表(user_profile):包括 user_id | gender | birthday | real_name | create_time | update_time | register_media_time | id_number | id_card_front | id_card_back | id_card_handheld | area | company | career 等字段。
- 用户基础信息表(user_basic):包括 user_id | mobile | password | user_name | profile_photo | last_login | is_media | article_count | following_count | fans_count | like_count | read_count | introduction | certificate | is_verified | account | email | status 等字段。
- 文章信息表(news_article_basic):包括 article_id | user_id | channel_id | title | cover | is_advertising | create_time | status | reviewer_id | review_time | delete_time | comment_count | allow_comment | update_time | reject_reason 等字段。
- 文章内容表(news_article_content):包括 article_id | content | update_time 等字段。
- 频道信息表(news_channel):包括 channel_id | channel_name | create_time | update_time | sequence | is_visible | is_default 等字段。
首先,在 Hive 中创建业务数据库 toutiao,并创建相关表
create database if not exists toutiao comment "user,news information of mysql" location '/user/hive/warehouse/toutiao.db/';
-- 用户信息表
create table user_profile
(
user_id BIGINT comment "userid",
gender BOOLEAN comment "gender",
birthday STRING comment "birthday",
real_name STRING comment "real_name",
create_time STRING comment "create_time",
update_time STRING comment "update_time",
register_media_time STRING comment "register_media_time",
id_number STRING comment "id_number",
id_card_front STRING comment "id_card_front",
id_card_back STRING comment "id_card_back",
id_card_handheld STRING comment "id_card_handheld",
area STRING comment "area",
company STRING comment "company",
career STRING comment "career"
)
COMMENT "toutiao user profile"
row format delimited fields terminated by ',' # 指定分隔符
LOCATION '/user/hive/warehouse/toutiao.db/user_profile';
-- 用户基础信息表
create table user_basic
(
user_id BIGINT comment "user_id",
mobile STRING comment "mobile",
password STRING comment "password",
profile_photo STRING comment "profile_photo",
last_login STRING comment "last_login",
is_media BOOLEAN comment "is_media",
article_count BIGINT comment "article_count",
following_count BIGINT comment "following_count",
fans_count BIGINT comment "fans_count",
like_count BIGINT comment "like_count",
read_count BIGINT comment "read_count",
introduction STRING comment "introduction",
certificate STRING comment "certificate",
is_verified BOOLEAN comment "is_verified"
)
COMMENT "toutiao user basic"
row format delimited fields terminated by ',' # 指定分隔符
LOCATION '/user/hive/warehouse/toutiao.db/user_basic';
-- 文章基础信息表
create table news_article_basic
(
article_id BIGINT comment "article_id",
user_id BIGINT comment "user_id",
channel_id BIGINT comment "channel_id",
title STRING comment "title",
status BIGINT comment "status",
update_time STRING comment "update_time"
)
COMMENT "toutiao news_article_basic"
row format delimited fields terminated by ',' # 指定分隔符
LOCATION '/user/hive/warehouse/toutiao.db/news_article_basic';
-- 文章频道表
create table news_channel
(
channel_id BIGINT comment "channel_id",
channel_name STRING comment "channel_name",
create_time STRING comment "create_time",
update_time STRING comment "update_time",
sequence BIGINT comment "sequence",
is_visible BOOLEAN comment "is_visible",
is_default BOOLEAN comment "is_default"
)
COMMENT "toutiao news_channel"
row format delimited fields terminated by ',' # 指定分隔符
LOCATION '/user/hive/warehouse/toutiao.db/news_channel';
-- 文章内容表
create table news_article_content
(
article_id BIGINT comment "article_id",
content STRING comment "content",
update_time STRING comment "update_time"
)
COMMENT "toutiao news_article_content"
row format delimited fields terminated by ',' # 指定分隔符
LOCATION '/user/hive/warehouse/toutiao.db/news_article_content';
查看 Sqoop 连接到 MySQL 的数据库列表
sqoop list-databases --connect jdbc:mysql://192.168.19.137:3306/ --username root -P
mysql
sys
toutiao
Sqoop 可以指定全量导入和增量导入,通常我们可以先执行一次全量导入,将历史数据全部导入进来,后面再通过定时任务执行增量导入,来保持 MySQL 和 Hive 的数据同步,全量导入不需要提前创建 Hive 表,可以自动创建
array=(user_profile user_basic news_article_basic news_channel news_article_content)
for table_name in ${array[@]};
do
sqoop import \
--connect jdbc:mysql://192.168.19.137/toutiao \
--username root \
--password password \
--table $table_name \
--m 5 \ # 线程数
--hive-home /root/bigdata/hive \ # hive路径
--hive-import \ # 导入形式
--create-hive-table \ # 自动创建表
--hive-drop-import-delims \
--warehouse-dir /user/hive/warehouse/toutiao.db \ # 导入地址
--hive-table toutiao.$table_name
done
增量导入,有 append 和 lastmodified 两种模式
- append:通过指定一个递增的列进行更新,只能追加,不能修改
num=0
declare -A check
check=([user_profile]=user_id [user_basic]=user_id [news_article_basic]=article_id [news_channel]=channel_id [news_article_content]=channel_id)
for k in ${!check[@]}
do
sqoop import \
--connect jdbc:mysql://192.168.19.137/toutiao \
--username root \
--password password \
--table $k \
--m 4 \
--hive-home /root/bigdata/hive \ # hive路径
--hive-import \ # 导入到hive
--create-hive-table \ # 自动创建表
--incremental append \ # 按照id更新
--check-column ${check[$k]} \ # 指定id列
--last-value ${num} # 指定最后更新的id
done
- lastmodified:按照最后修改时间更新,支持追加和修改
time=`date +"%Y-%m-%d" -d "-1day"`
declare -A check
check=([user_profile]=update_time [user_basic]=last_login [news_article_basic]=update_time [news_channel]=update_time)
declare -A merge
merge=([user_profile]=user_id [user_basic]=user_id [news_article_basic]=article_id [news_channel]=channel_id)
for k in ${!check[@]}
do
sqoop import \
--connect jdbc:mysql://192.168.19.137/toutiao \
--username root \
--password password \
--table $k \
--m 4 \
--target-dir /user/hive/warehouse/toutiao.db/$k \ # hive路径
--incremental lastmodified \ # 按照最后修改时间更新
--check-column ${check[$k]} \ # 指定时间列
--merge-key ${merge[$k]} \ # 根据指定列合并重复或修改数据
--last-value ${time} # 指定最后修改时间
done
Sqoop 可以直接导入到 Hive,自动创建 Hive 表,但是 lastmodified 模式不支持
Sqoop 也可以先导入到 HDFS,然后再建立 Hive 表关联,为了使用 lastmodified 模式,通常使用这种方法
--target-dir /user/hive/warehouse/toutiao.db/user_profile # 指定导入的HDFS目录
若先导入到 HDFS,需要注意 HDFS 默认分隔符为 ,
而 Hive 默认分隔符为 /u0001
,所以需要在创建 Hive 表时指定分隔符为 HDFS 分隔符 ,
,若不指定分隔符,查询结果将全部为 NULL
row format delimited fields terminated by ',' # 指定分隔符
如果 MySQL 数据中存在特殊字符,如 , \t \n
都会导致 Hive 读取失败(但不影响导入到 HDFS 中),可以利用 --query
参数,在读取时使用 REPLACE() CHAR() CHR()
进行字符替换。如果特殊字符过多,比如 news_article_content 表,可以选择全量导入
--query 'select article_id, user_id, channel_id, REPLACE(REPLACE(REPLACE(title, CHAR(13),""),CHAR(10),""), ",", " ") title, status, update_time from news_article_basic WHERE $CONDITIONS' \
如果 MySQL 数据中存在 tinyint 类型,必须在 --connet
中加入 ?tinyInt1isBit=false
,防止 Hive 将 tinyint 类型默认转化为 boolean 类型
--connect jdbc:mysql://192.168.19.137/toutiao?tinyInt1isBit=false
MySQL 与 Hive 对应类型如下
MySQL | Hive |
---|---|
bigint | bigint |
tinyint | boolean |
int | int |
double | double |
bit | boolean |
varchar | string |
decimal | double |
date / timestamp | string |
我们可以利用 crontab 来创建定时任务,将更新命令写入脚本 import_incremental.sh,输入 crontab -e
打开编辑界面,输入如下内容,即可定时执行数据同步任务
# 每隔半小时增量导入一次
*/30 * * * * /root/toutiao_project/scripts/import_incremental.sh
crontab 命令格式为 * * * * * shell
,其中前五个 *
分别代表分钟 (0-59)、小时(0-59)、月内的某天(1-31)、月(1-12)、周内的某天(0-7,周日为 0 或 7),shell
表示要执行的命令或脚本,使用方法如下
# 每隔5分钟运行一次backupscript脚本
*/5 * * * * /root/backupscript.sh
# 每天的凌晨1点运行backupscript脚本
0 1 * * * /root/backupscript.sh
# 每月的第一个凌晨3:15运行backupscript脚本
15 3 1 * * /root/backupscript.sh
crontab 常用命令如下
crontab -e # 修改 crontab 文件,如果文件不存在会自动创建。
crontab -l # 显示 crontab 文件。
crontab -r # 删除 crontab 文件。
crontab -ir # 删除 crontab 文件前提醒用户。
service crond status # 查看crontab服务状态
service crond start # 启动服务
service crond stop # 关闭服务
service crond restart # 重启服务
service crond reload # 重新载入配置
参考
https://www.bilibili.com/video/av68356229
https://pan.baidu.com/s/1-uvGJ-mEskjhtaial0Xmgw(学习资源已保存至网盘, 提取码:eakp)