Maxwell同步mysql增量表数据到kafka,并消费到hdfs对应的目录

1 数据通道

image.png

2 Maxwell配置
默认情况下,Maxwell会同步binlog中的所有表的数据变更记录,按照规划,有cart_info、order_info等共计11张表需进行增量同步,按理我们应对Maxwell进行配置,令其只同步这特定的11张表,但为了与实时数仓架构保持一致,此处不做相应配置,而令 Maxwell 对 binlog 中所有表的数据变更记录进行同步,并将数据全部发往 topic_db 主题。
Maxwell最终配置如下:
1)修改Maxwell配置文件config.properties

[yobhel@hadoop101 maxwell]$ vim /opt/module/maxwell/config.properties

2)全部配置参数如下

log_level=info

producer=kafka
kafka.bootstrap.servers=hadoop101:9092,hadoop101:9092,hadoop101:9092

#kafka topic配置,业务数据发往的目标主题
kafka_topic=topic_db
# mysql login info
host=hadoop101
user=maxwell
password=maxwell
jdbc_options=useSSL=false&serverTimezone=Asia/Shanghai

3)重新启动Maxwell

[yobhel@hadoop101 bin]$ mxw.sh restart

4)通道测试
(1)启动Zookeeper以及Kafka集群
(2)启动一个Kafka Console Consumer,消费 topic_db 主题的数据

[yobhel@hadoop101 kafka]$ kafka-console-consumer.sh --bootstrap-server hadoop101:9092 --topic topic_db

(3)生成模拟数据

[yobhel@hadoop101 bin]$ cd /opt/module/data_mocker/
[yobhel@hadoop101 data_mocker]$ java -jar edu2021-mock-2022-06-18.jar

(4)观察Kafka消费者是否能消费到数据

{"database":"edu2077","table":"order_info","type":"update","ts":1645425636,"xid":37606,"commit":true,"data":{"id":23899,"user_id":16,"origin_amount":800.00,"coupon_reduce":0.00,"final_amount":800.00,"order_status":"1002","out_trade_no":"211814417714292","trade_body":"大数据技术之Zookeeper(2021最新版)等4件商品","session_id":"3a96bddb-7f94-4a0f-9a5b-1aa6fadd718c","province_id":30,"create_time":"2022-02-21 15:15:14","expire_time":"2022-02-21 15:30:14","update_time":"2022-02-21 15:15:42"},"old":{"order_status":"1001","update_time":null}}
{"database":"edu2077","table":"order_info","type":"update","ts":1645425636,"xid":37589,"commit":true,"data":{"id":23900,"user_id":473,"origin_amount":200.00,"coupon_reduce":0.00,"final_amount":200.00,"order_status":"1003","out_trade_no":"462573352988853","trade_body":"尚硅谷大数据技术之Azkaban等1件商品","session_id":"d78dd675-5a38-4e33-b431-b1ef68a89089","province_id":29,"create_time":"2022-02-21 11:26:30","expire_time":"2022-02-21 11:41:30","update_time":"2022-02-21 11:41:47"},"old":{"order_status":"1001","update_time":null}}
{"database":"edu2077","table":"order_info","type":"update","ts":1645425636,"xid":37694,"commit":true,"data":{"id":23901,"user_id":70,"origin_amount":400.00,"coupon_reduce":0.00,"final_amount":400.00,"order_status":"1002","out_trade_no":"677577676596486","trade_body":"尚硅谷大数据技术之Shell等2件商品","session_id":"9b842bcc-3288-49da-8ec2-0e00d743b783","province_id":33,"create_time":"2022-02-21 19:45:13","expire_time":"2022-02-21 20:00:13","update_time":"2022-02-21 19:45:33"},"old":{"order_status":"1001","update_time":null}}

3 Flume配置

1)Flume配置概述
Flume需要将Kafka中各topic的数据传输到HDFS,故其需选用KafkaSource以及HDFSSink,Channe选用FileChanne。
需要注意的是,KafkaSource需订阅Kafka中的11个topic,HDFSSink需要将不同topic的数据写到不同的路径,并且路径中应当包含一层日期,用于区分每天的数据。关键配置如下:


image.png

具体数据示例如下:


image.png

2)Flume配置实操
(1)创建Flume配置文件
在hadoop103节点的Flume的job目录下创建kafka_to_hdfs_db.conf
[yobhel@hadoop103 job]$ vim kafka_to_hdfs_db.conf 

(2)配置文件内容如下

a1.sources = r1
a1.channels = c1
a1.sinks = k1

a1.sources.r1.type = org.apache.flume.source.kafka.KafkaSource
a1.sources.r1.batchSize = 5000
a1.sources.r1.batchDurationMillis = 2000
a1.sources.r1.kafka.bootstrap.servers = hadoop101:9092,hadoop102:9092
a1.sources.r1.kafka.topics = topic_db
a1.sources.r1.kafka.consumer.group.id = flume
a1.sources.r1.setTopicHeader = true
a1.sources.r1.topicHeader = topic
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = com.yobhel.flume.interceptors.TimestampAndTableNameInterceptor$Builder


a1.channels.c1.type = file
a1.channels.c1.checkpointDir = /opt/data/flume/checkpoint/behavior2
a1.channels.c1.dataDirs = /opt/data/flume/data/behavior2
a1.channels.c1.maxFileSize = 2146435071
a1.channels.c1.capacity = 1000000
a1.channels.c1.keep-alive = 6

## sink1
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = /origin_data/edu/db/%{tableName}_inc/%Y-%m-%d
a1.sinks.k1.hdfs.filePrefix = db
a1.sinks.k1.hdfs.round = false


a1.sinks.k1.hdfs.rollInterval = 10
a1.sinks.k1.hdfs.rollSize = 134217728
a1.sinks.k1.hdfs.rollCount = 0


a1.sinks.k1.hdfs.fileType = CompressedStream
a1.sinks.k1.hdfs.codeC = gzip

## 拼装
a1.sources.r1.channels = c1
a1.sinks.k1.channel= c1

(3)编写Flume拦截器
代码:https://github.com/Yobhel121/edu-flume-interceptor

将打好的包放入到hadoop103的/opt/module/flume/lib文件夹下
3)通道测试
(1)启动Zookeeper、Kafka集群
(2)启动hadoop103的Flume

[yobhel@hadoop103 flume]$ bin/flume-ng agent -n a1 -c conf/ -f job/kafka_to_hdfs_db.conf -Dflume.root.logger=INFO,console

(3)反注释mock.sh中关于Maxwell的内容, 执行脚本生成模拟数据

#!/bin/bash
DATA_HOME=/opt/module/data_mocker
MAXWELL_HOME=/opt/module/maxwell

function mock_data() {
  if [ $1 ]
  then
    sed -i "/mock.date/s/.*/mock.date: \"$1\"/" $DATA_HOME/application.yml
    echo "正在生成 $1 当日的数据"
  fi
  cd $DATA_HOME
      nohup java -jar "edu2021-mock-2022-03-14.jar" >/dev/null 2>&1  
}

case $1 in
"init")
  [ $2 ] && do_date=$2 || do_date='2022-02-21'
  sed -i "/mock.clear.busi/s/.*/mock.clear.busi: 1/" $DATA_HOME/application.yml
  sed -i "/mock.clear.user/s/.*/mock.clear.user: 1/" $DATA_HOME/application.yml
  mock_data $(date -d "$do_date -5 days" +%F)
  sed -i "/mock.clear.busi/s/.*/mock.clear.busi: 0/" $DATA_HOME/application.yml
  sed -i "/mock.clear.user/s/.*/mock.clear.user: 0/" $DATA_HOME/application.yml
  for ((i=4;i>=0;i--));
  do
    mock_data $(date -d "$do_date -$i days" +%F)
  done
  ;;
[0-9][0-9][0-9][0-9]-[0-1][0-9]-[0-3][0-9])

    sed -i "/mock_date/s/.*/mock_date=$1/" $MAXWELL_HOME/config.properties
    mxw.sh restart
    sleep 1  
    mock_data $1
    ;;
esac

执行脚本生成数据

[yobhel@hadoop101 bin]$ mock.sh 2022-02-22

(4)观察HDFS上的目标路径是否有数据出现
若HDFS上的目标路径已有增量表的数据出现了,就证明数据通道已经打通。

(5)数据目标路径的日期说明
仔细观察,会发现目标路径中的日期,并非模拟数据的业务日期,而是当前日期。这是由于Maxwell输出的JSON字符串中的ts字段的值,是数据的变动日期。而真实场景下,数据的业务日期与变动日期应当是一致的。

此处为了模拟真实环境,对Maxwell源码进行了改动,增加了一个参数mock_date,该参数的作用就是指定Maxwell输出JSON字符串的ts时间戳的日期,接下来进行测试。

mock.sh脚本在生成数据时会修改Maxwell的配置信息

4)编写Flume启停脚本
为方便使用,此处编写一个Flume的启停脚本
(1)在hadoop101节点的/home/yobhel/bin目录下创建脚本f3.sh

[yobhel@hadoop101 bin]$ vim f3.sh

在脚本中填写如下内容

#!/bin/bash

case $1 in
"start")
        echo " --------启动 hadoop103 业务数据flume-------"
        ssh hadoop103 "nohup /opt/module/flume/bin/flume-ng agent -n a1 -c /opt/module/flume/conf -f /opt/module/flume/job/kafka_to_hdfs_db.conf >/dev/null 2>&1 &"
;;
"stop")

        echo " --------停止 hadoop103 业务数据flume-------"
        ssh hadoop103 "ps -ef | grep kafka_to_hdfs_db | grep -v grep |awk '{print \$2}' | xargs -n1 kill"
;;
esac

(2)增加脚本执行权限

[yobhel@hadoop101 bin]$ chmod +x f3.sh

(3)f3启动

[yobhel@hadoop101 module]$ f3.sh start

(4)f3停止

[yobhel@hadoop101 module]$ f3.sh stop

4 增量表首日全量同步

通常情况下,增量表需要在首日进行一次全量同步,后续每日再进行增量同步,首日全量同步可以使用Maxwell的bootstrap功能,方便起见,下面编写一个增量表首日全量同步脚本。
1)在~/bin目录创建mysql_to_kafka_inc_init.sh

[yobhel@hadoop101 bin]$ vim mysql_to_kafka_inc_init.sh

脚本内容如下

#!/bin/bash

# 该脚本的作用是初始化所有的增量表,只需执行一次

MAXWELL_HOME=/opt/module/maxwell

import_data() {
    $MAXWELL_HOME/bin/maxwell-bootstrap --database edu2077 --table $1 --config $MAXWELL_HOME/config.properties
}

case $1 in
cart_info | comment_info | favor_info | order_detail | order_info | payment_info | review_info | test_exam | test_exam_question | user_info | vip_change_detail)
  import_data $1
  ;;
"all")
  for tmp in cart_info comment_info favor_info order_detail order_info payment_info review_info test_exam test_exam_question user_info vip_change_detail
  do
    import_data $tmp
  done
  ;;
esac

2)为mysql_to_kafka_inc_init.sh all增加执行权限

[yobhel@hadoop101 bin]$ chmod +x ~/bin/mysql_to_kafka_inc_init.sh

3)测试同步脚本
(1)清理历史数据
为方便查看结果,现将HDFS上之前同步的增量表数据删除

[yobhel@hadoop101 ~]$ hadoop fs -ls /origin_data/edu/db | grep _inc | awk '{print $8}' | xargs hadoop fs -rm -r -f

(2)执行同步脚本

[yobhel@hadoop101 bin]$ mysql_to_kafka_inc_init.sh all 

4)检查同步结果
观察HDFS上是否重新出现增量表数据。

5 增量表同步总结

增量表同步,需要在首日进行一次全量同步,后续每日才是增量同步。首日进行全量同步时,需先启动数据通道,包括Maxwell、Kafka、Flume,然后执行增量表首日同步脚本mysql_to_kafka_inc_init.sh进行同步。后续每日只需保证采集通道正常运行即可,Maxwell便会实时将变动数据发往Kafka。

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 216,001评论 6 498
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,210评论 3 392
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 161,874评论 0 351
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,001评论 1 291
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,022评论 6 388
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,005评论 1 295
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,929评论 3 416
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,742评论 0 271
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,193评论 1 309
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,427评论 2 331
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,583评论 1 346
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,305评论 5 342
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,911评论 3 325
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,564评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,731评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,581评论 2 368
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,478评论 2 352

推荐阅读更多精彩内容