1 Data Pipeline
2 Maxwell Configuration
By default, Maxwell synchronizes the change records of all tables in the binlog. According to the plan, 11 tables in total, including cart_info and order_info, require incremental synchronization, so in principle Maxwell should be configured to synchronize only those 11 tables. To stay consistent with the real-time data warehouse architecture, however, no such filter is configured here; Maxwell synchronizes the change records of every table in the binlog and sends all of the data to the topic_db topic.
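For reference only, if one did want to restrict Maxwell to those 11 tables, its filter option could be set roughly as follows (a sketch that is not applied in this project; the edu2077 database name is taken from the sample data below):
filter=exclude: *.*, include: edu2077.cart_info, include: edu2077.comment_info, include: edu2077.favor_info, include: edu2077.order_detail, include: edu2077.order_info, include: edu2077.payment_info, include: edu2077.review_info, include: edu2077.test_exam, include: edu2077.test_exam_question, include: edu2077.user_info, include: edu2077.vip_change_detail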
The final Maxwell configuration is as follows:
1) Modify the Maxwell configuration file config.properties
[yobhel@hadoop101 maxwell]$ vim /opt/module/maxwell/config.properties
2) The complete configuration parameters are as follows
log_level=info
producer=kafka
kafka.bootstrap.servers=hadoop101:9092,hadoop102:9092,hadoop103:9092
# Kafka topic configuration: the target topic that business data is sent to
kafka_topic=topic_db
# mysql login info
host=hadoop101
user=maxwell
password=maxwell
jdbc_options=useSSL=false&serverTimezone=Asia/Shanghai
3) Restart Maxwell
[yobhel@hadoop101 bin]$ mxw.sh restart
4) Pipeline test
(1) Start the ZooKeeper and Kafka clusters
(2) Start a Kafka console consumer to consume data from the topic_db topic
[yobhel@hadoop101 kafka]$ kafka-console-consumer.sh --bootstrap-server hadoop101:9092 --topic topic_db
(3) Generate mock data
[yobhel@hadoop101 bin]$ cd /opt/module/data_mocker/
[yobhel@hadoop101 data_mocker]$ java -jar edu2021-mock-2022-06-18.jar
(4) Check whether the Kafka consumer receives data
{"database":"edu2077","table":"order_info","type":"update","ts":1645425636,"xid":37606,"commit":true,"data":{"id":23899,"user_id":16,"origin_amount":800.00,"coupon_reduce":0.00,"final_amount":800.00,"order_status":"1002","out_trade_no":"211814417714292","trade_body":"大数据技术之Zookeeper(2021最新版)等4件商品","session_id":"3a96bddb-7f94-4a0f-9a5b-1aa6fadd718c","province_id":30,"create_time":"2022-02-21 15:15:14","expire_time":"2022-02-21 15:30:14","update_time":"2022-02-21 15:15:42"},"old":{"order_status":"1001","update_time":null}}
{"database":"edu2077","table":"order_info","type":"update","ts":1645425636,"xid":37589,"commit":true,"data":{"id":23900,"user_id":473,"origin_amount":200.00,"coupon_reduce":0.00,"final_amount":200.00,"order_status":"1003","out_trade_no":"462573352988853","trade_body":"尚硅谷大数据技术之Azkaban等1件商品","session_id":"d78dd675-5a38-4e33-b431-b1ef68a89089","province_id":29,"create_time":"2022-02-21 11:26:30","expire_time":"2022-02-21 11:41:30","update_time":"2022-02-21 11:41:47"},"old":{"order_status":"1001","update_time":null}}
{"database":"edu2077","table":"order_info","type":"update","ts":1645425636,"xid":37694,"commit":true,"data":{"id":23901,"user_id":70,"origin_amount":400.00,"coupon_reduce":0.00,"final_amount":400.00,"order_status":"1002","out_trade_no":"677577676596486","trade_body":"尚硅谷大数据技术之Shell等2件商品","session_id":"9b842bcc-3288-49da-8ec2-0e00d743b783","province_id":33,"create_time":"2022-02-21 19:45:13","expire_time":"2022-02-21 20:00:13","update_time":"2022-02-21 19:45:33"},"old":{"order_status":"1001","update_time":null}}
3 Flume Configuration
1) Flume configuration overview
Flume needs to transfer the data in Kafka to HDFS, so it uses a Kafka Source and an HDFS Sink, with a File Channel in between.
Note that the Kafka Source subscribes to the topic_db topic, while the HDFS Sink has to write the data of different tables to different paths, and each path should contain a date level to separate each day's data. The key settings appear in the full configuration file below.
2) Flume configuration in practice
(1) Create the Flume configuration file
On the hadoop103 node, create kafka_to_hdfs_db.conf in Flume's job directory
[yobhel@hadoop103 job]$ vim kafka_to_hdfs_db.conf
(2) The contents of the configuration file are as follows
a1.sources = r1
a1.channels = c1
a1.sinks = k1
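## source1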
a1.sources.r1.type = org.apache.flume.source.kafka.KafkaSource
a1.sources.r1.batchSize = 5000
a1.sources.r1.batchDurationMillis = 2000
a1.sources.r1.kafka.bootstrap.servers = hadoop101:9092,hadoop102:9092
a1.sources.r1.kafka.topics = topic_db
a1.sources.r1.kafka.consumer.group.id = flume
a1.sources.r1.setTopicHeader = true
a1.sources.r1.topicHeader = topic
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = com.yobhel.flume.interceptors.TimestampAndTableNameInterceptor$Builder
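## channel1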
a1.channels.c1.type = file
a1.channels.c1.checkpointDir = /opt/data/flume/checkpoint/behavior2
a1.channels.c1.dataDirs = /opt/data/flume/data/behavior2
a1.channels.c1.maxFileSize = 2146435071
a1.channels.c1.capacity = 1000000
a1.channels.c1.keep-alive = 6
## sink1
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = /origin_data/edu/db/%{tableName}_inc/%Y-%m-%d
a1.sinks.k1.hdfs.filePrefix = db
a1.sinks.k1.hdfs.round = false
a1.sinks.k1.hdfs.rollInterval = 10
a1.sinks.k1.hdfs.rollSize = 134217728
a1.sinks.k1.hdfs.rollCount = 0
a1.sinks.k1.hdfs.fileType = CompressedStream
a1.sinks.k1.hdfs.codeC = gzip
## Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
(3) Write the Flume interceptor
Code: https://github.com/Yobhel121/edu-flume-interceptor
Put the built jar into the /opt/module/flume/lib directory on hadoop103
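The actual implementation lives in the repository above; the following is only a minimal sketch of what a TimestampAndTableNameInterceptor of this kind typically looks like, assuming the fastjson library is on the classpath and that every event body is a single Maxwell JSON record. It copies the table field into a tableName header (consumed by %{tableName} in the HDFS sink path) and converts the second-granularity ts field into the millisecond timestamp header the HDFS sink uses to resolve %Y-%m-%d.
package com.yobhel.flume.interceptors;

import com.alibaba.fastjson.JSONObject;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;

import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Map;

public class TimestampAndTableNameInterceptor implements Interceptor {

    @Override
    public void initialize() {
    }

    @Override
    public Event intercept(Event event) {
        // Parse the Maxwell JSON record carried in the event body
        String log = new String(event.getBody(), StandardCharsets.UTF_8);
        JSONObject jsonObject = JSONObject.parseObject(log);

        Long ts = jsonObject.getLong("ts");            // Maxwell's ts is in seconds
        String tableName = jsonObject.getString("table");

        Map<String, String> headers = event.getHeaders();
        if (ts != null) {
            // The HDFS sink reads the timestamp header (milliseconds) for %Y-%m-%d
            headers.put("timestamp", String.valueOf(ts * 1000));
        }
        if (tableName != null) {
            // Used by %{tableName} in the HDFS sink path
            headers.put("tableName", tableName);
        }
        return event;
    }

    @Override
    public List<Event> intercept(List<Event> events) {
        for (Event event : events) {
            intercept(event);
        }
        return events;
    }

    @Override
    public void close() {
    }

    public static class Builder implements Interceptor.Builder {
        @Override
        public Interceptor build() {
            return new TimestampAndTableNameInterceptor();
        }

        @Override
        public void configure(Context context) {
        }
    }
}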
3) Pipeline test
(1) Start the ZooKeeper and Kafka clusters
(2) Start Flume on hadoop103
[yobhel@hadoop103 flume]$ bin/flume-ng agent -n a1 -c conf/ -f job/kafka_to_hdfs_db.conf -Dflume.root.logger=INFO,console
(3) Uncomment the Maxwell-related content in mock.sh, then run the script to generate mock data
#!/bin/bash
DATA_HOME=/opt/module/data_mocker
MAXWELL_HOME=/opt/module/maxwell
# Generate mock data; if a date is given, patch it into application.yml first
function mock_data() {
if [ $1 ]
then
sed -i "/mock.date/s/.*/mock.date: \"$1\"/" $DATA_HOME/application.yml
echo "Generating data for $1"
fi
cd $DATA_HOME
nohup java -jar "edu2021-mock-2022-03-14.jar" >/dev/null 2>&1
}
case $1 in
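# init: on the first run clear historical business/user data, then generate 6 consecutive days of data ending at do_date (default 2022-02-21)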
"init")
[ $2 ] && do_date=$2 || do_date='2022-02-21'
sed -i "/mock.clear.busi/s/.*/mock.clear.busi: 1/" $DATA_HOME/application.yml
sed -i "/mock.clear.user/s/.*/mock.clear.user: 1/" $DATA_HOME/application.yml
mock_data $(date -d "$do_date -5 days" +%F)
sed -i "/mock.clear.busi/s/.*/mock.clear.busi: 0/" $DATA_HOME/application.yml
sed -i "/mock.clear.user/s/.*/mock.clear.user: 0/" $DATA_HOME/application.yml
for ((i=4;i>=0;i--));
do
mock_data $(date -d "$do_date -$i days" +%F)
done
;;
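# A specific date: write it into Maxwell's mock_date parameter, restart Maxwell, then generate that day's data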
[0-9][0-9][0-9][0-9]-[0-1][0-9]-[0-3][0-9])
sed -i "/mock_date/s/.*/mock_date=$1/" $MAXWELL_HOME/config.properties
mxw.sh restart
sleep 1
mock_data $1
;;
esac
Run the script to generate data
[yobhel@hadoop101 bin]$ mock.sh 2022-02-22
(4) Check whether data appears under the target path on HDFS
If incremental-table data has appeared under the target path on HDFS, the data pipeline is confirmed to be working.
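For example, the incremental-table directories can be listed with a command like the one below (the path is taken from the HDFS sink configuration above):
[yobhel@hadoop101 ~]$ hadoop fs -ls /origin_data/edu/db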
(5) Notes on the date in the target data path
On closer inspection, you will notice that the date in the target path is not the business date of the mock data but the current date. This is because the ts field in the JSON string output by Maxwell records the time the data was actually changed, whereas in a real scenario the business date and the change date of the data would be the same.
To simulate a real environment, the Maxwell source code has been modified here to add a mock_date parameter, which specifies the date used for the ts timestamp in the JSON strings Maxwell outputs.
The mock.sh script modifies this Maxwell configuration when it generates data (see the sed line on mock_date in the script above).
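Concretely, after running mock.sh 2022-02-22, that sed command leaves a line like the following in Maxwell's config.properties (the mock_date parameter exists only in this modified Maxwell build):
mock_date=2022-02-22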
4) Write a Flume start/stop script
For ease of use, write a start/stop script for Flume here
(1) Create the script f3.sh in the /home/yobhel/bin directory on the hadoop101 node
[yobhel@hadoop101 bin]$ vim f3.sh
Fill the script with the following content
#!/bin/bash
case $1 in
"start")
echo " --------启动 hadoop103 业务数据flume-------"
ssh hadoop103 "nohup /opt/module/flume/bin/flume-ng agent -n a1 -c /opt/module/flume/conf -f /opt/module/flume/job/kafka_to_hdfs_db.conf >/dev/null 2>&1 &"
;;
"stop")
echo " --------停止 hadoop103 业务数据flume-------"
ssh hadoop103 "ps -ef | grep kafka_to_hdfs_db | grep -v grep |awk '{print \$2}' | xargs -n1 kill"
;;
esac
(2) Make the script executable
[yobhel@hadoop101 bin]$ chmod +x f3.sh
(3) Start f3
[yobhel@hadoop101 module]$ f3.sh start
(4) Stop f3
[yobhel@hadoop101 module]$ f3.sh stop
4 First-Day Full Synchronization of Incremental Tables
Normally, incremental tables need one full synchronization on the first day and incremental synchronization on each following day. The first-day full sync can be done with Maxwell's bootstrap feature; for convenience, a first-day full sync script for the incremental tables is written below.
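For a single table, a bootstrap run looks like the following (this is exactly what the import_data function in the script below wraps):
[yobhel@hadoop101 ~]$ /opt/module/maxwell/bin/maxwell-bootstrap --database edu2077 --table cart_info --config /opt/module/maxwell/config.properties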
1) Create mysql_to_kafka_inc_init.sh in the ~/bin directory
[yobhel@hadoop101 bin]$ vim mysql_to_kafka_inc_init.sh
The script content is as follows
#!/bin/bash
# This script initializes all incremental tables; it only needs to be run once
MAXWELL_HOME=/opt/module/maxwell
import_data() {
$MAXWELL_HOME/bin/maxwell-bootstrap --database edu2077 --table $1 --config $MAXWELL_HOME/config.properties
}
case $1 in
cart_info | comment_info | favor_info | order_detail | order_info | payment_info | review_info | test_exam | test_exam_question | user_info | vip_change_detail)
import_data $1
;;
"all")
for tmp in cart_info comment_info favor_info order_detail order_info payment_info review_info test_exam test_exam_question user_info vip_change_detail
do
import_data $tmp
done
;;
esac
2) Grant execute permission to mysql_to_kafka_inc_init.sh
[yobhel@hadoop101 bin]$ chmod +x ~/bin/mysql_to_kafka_inc_init.sh
3) Test the sync script
(1) Clean up historical data
To make the result easier to verify, delete the incremental-table data previously synced to HDFS
[yobhel@hadoop101 ~]$ hadoop fs -ls /origin_data/edu/db | grep _inc | awk '{print $8}' | xargs hadoop fs -rm -r -f
(2) Run the sync script
[yobhel@hadoop101 bin]$ mysql_to_kafka_inc_init.sh all
4) Check the sync result
Check whether the incremental-table data reappears on HDFS.
5 Incremental Table Sync Summary
Incremental tables require one full synchronization on the first day; only afterwards does daily incremental synchronization take over. For the first-day full sync, first start the data pipeline, including Maxwell, Kafka and Flume, then run the first-day sync script mysql_to_kafka_inc_init.sh. On subsequent days, simply keep the collection pipeline running and Maxwell will send changed data to Kafka in real time.
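Putting the pieces of this section together, a typical first-day run with the scripts above looks roughly like this (ZooKeeper and Kafka are assumed to have been started already; their start scripts are not shown in this section):
[yobhel@hadoop101 ~]$ mxw.sh restart
[yobhel@hadoop101 ~]$ f3.sh start
[yobhel@hadoop101 ~]$ mysql_to_kafka_inc_init.sh all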