Kafka Connect 实现MySQL增量同步

前言

本文将使用Kafka Connect 实现MySQL增量同步,设计三种模式,分别为incrementing timestamp timestamp+incrementing
理论续自上文
当然你也可以使用除了MySQL其他DB,参考官网放置对应的驱动文件即可。

以下实验请在能正常Kafka生产消费的基础之上进行。

1、incrementing 自增模式

准备工作

创建 A数据库源表person

CREATE TABLE `person` (
  `pid` int(11) NOT NULL AUTO_INCREMENT,
  `firstname` varchar(255) CHARACTER SET utf8 DEFAULT NULL,
  `age` int(11) DEFAULT NULL,
  PRIMARY KEY (`pid`)
) ENGINE=InnoDB DEFAULT CHARSET=latin1;

创建 B数据库目标表kafkaperson

CREATE TABLE `kafkaperson` (
  `pid` int(11) NOT NULL AUTO_INCREMENT,
  `firstname` varchar(255) CHARACTER SET utf8 DEFAULT NULL,
  `age` int(11) DEFAULT NULL,
  PRIMARY KEY (`pid`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8 COLLATE=utf8_bin;

切换目录 D:\com\kafka_2.11-2.0.1\config
quickstart-mysql.properties(source)

name=mysql-a-source-person
connector.class=io.confluent.connect.jdbc.JdbcSourceConnector
tasks.max=1
connection.url=jdbc:mysql://localhost:3306/A?user=***&password=***
# incrementing  自增
mode=incrementing
# 自增字段  pid
incrementing.column.name=pid
# 白名单表  person
table.whitelist=person
# topic前缀   mysql-kafka-
topic.prefix=mysql-kafka-

quickstart-mysql-sink.properties(sink)

name=mysql-a-sink-person
connector.class=io.confluent.connect.jdbc.JdbcSinkConnector
tasks.max=1
#kafka的topic名称
topics=mysql-kafka-person
# 配置JDBC链接
connection.url=jdbc:mysql://localhost:3306/B?user=***&password=***
# 不自动创建表,如果为true,会自动创建表,表名为topic名称
auto.create=false
# upsert model更新和插入
insert.mode=upsert
# 下面两个参数配置了以pid为主键更新
pk.mode = record_value
pk.fields = pid
#表名为kafkatable
table.name.format=kafkaperson

实验一

创建 Kafka Topic: mysql-kafka-person

D:\com\kafka_2.11-2.0.1\bin\windows>kafka-run-class.bat kafka.admin.TopicCommand --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic mysql-kafka-person

启动 Kafka Connect

D:\com\kafka_2.11-2.0.1\bin\windows>connect-standalone.bat D:/com/kafka_2.11-2.0.1/config/connect-standalone.properties D:/com/kafka_2.11-2.0.1/config/quickstart-mysql.properties D:/com/kafka_2.11-2.0.1/config/quickstart-mysql-sink.properties

A库person表插入三条数据


创建测试数据

稍后发现B库kafkaperson表中进入了这三条数据


image.png

consumer控制台 观察到三条数据
Topic console

此时插入1条新数据

INSERT INTO person(pid, firstname, age) VALUES(4, 'zl', 20);

发现B库表中也多了一条pid为4的数据


添加新的数据

实验一结论

JDBC Sink Connector 官网中指出insert.mode有且仅有两个值
insert.mode=insert只接收标准的INSERT SQL新增语句
insert.mode=upsert接收新增和更新,当对主键修改时也可以洞察并且输出。而insert是无法满足此要求的,因此根据实际业务使用的场景选择insert.mode

INSERT INTO person (pid, firstname, age) VALUES (2, 'ls', 15) ON DUPLICATE KEY UPDATE firstname="world"; 

然而我在实验过程中并没有成功,因此没看出来insert和upsert的区别,希望成功的人可以在留言中指正下!

2、timestamp 时间戳模式

准备工作

创建 A数据库源表comments

CREATE TABLE `comments` (
  `id` int(11) NOT NULL AUTO_INCREMENT,
  `content` varchar(255) DEFAULT NULL,
  `commenttime` timestamp NOT NULL DEFAULT '0000-00-00 00:00:00' ON UPDATE CURRENT_TIMESTAMP,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=5 DEFAULT CHARSET=latin1;

注意timestamp必须指定not null,否则会报错!(无法使用时间戳列进行增量查询因为时间戳字段是可为空的...)
创建 B数据库源表kafkacomments

CREATE TABLE `kafkacomments` (
  `id` int(11) NOT NULL AUTO_INCREMENT,
  `content` varchar(255) DEFAULT NULL,
  `commenttime` timestamp NULL DEFAULT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=5 DEFAULT CHARSET=latin1;

切换目录 D:\com\kafka_2.11-2.0.1\config
timestamp-mysql-source.properties(source)

name=mysql-b-source-comments
connector.class=io.confluent.connect.jdbc.JdbcSourceConnector
tasks.max=1
connection.url=jdbc:mysql://localhost:3306/A?user=***&password=***
table.whitelist=comments
mode=timestamp
timestamp.column.name=commenttime
topic.prefix=mysql-kafka-

timestamp-mysql-sink.properties(sink)

name=mysql-b-sink-comments
connector.class=io.confluent.connect.jdbc.JdbcSinkConnector
tasks.max=1
#kafka的topic名称
topics=mysql-kafka-comments
# 配置JDBC链接
connection.url=jdbc:mysql://localhost:3306/B?user=***&password=***
# 不自动创建表,如果为true,会自动创建表,表名为topic名称
auto.create=false
# upsert model更新和插入
insert.mode=upsert
# 下面两个参数配置了以id为主键更新
pk.mode = record_value
pk.fields = id
#表名为kafkatable
table.name.format=kafkacomments

实验二

创建 Kafka Topic: mysql-kafka-comments

D:\com\kafka_2.11-2.0.1\bin\windows>kafka-run-class.bat kafka.admin.TopicCommand --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic mysql-kafka-comments

启动 Kafka Connect

D:\com\kafka_2.11-2.0.1\bin\windows>connect-standalone.bat D:/com/kafka_2.11-2.0.1/config/connect-standalone.properties D:/com/kafka_2.11-2.0.1/config/timestamp-mysql-source.properties D:/com/kafka_2.11-2.0.1/config/timestamp-mysql-sink.properties

A库comments表插入四条数据


A.comments表

此时发现控制台和目标表中有了四条数据


image.png

B.kafkacomments表

此时修改id为2和4的内容content,并修改评论时间commenttime

update comments set content = "show test data" ,commenttime="2018-12-20 15:55:10" where id in(2,4)

发现源表和目标表中的内容都发生了变化!


image.png

image.png

image.png

注意:

1、如果修改的时间戳早于latest time ,则不会洞察到更新。例如MySQL中的now()获取当前时间就是很好的能被获取到的例子。
2、源表向目标表传输数据,假设有两条(或以上)的数据行拥有同样的时间戳,如果在传输第二条的过程中崩溃,恢复过后第二条将会被丢失,因为latest time已经被记录过了,他只会去找更新的下一次时间。这种方式虽然能获取到update更新,但是不够稳健。而如果使用自增字段加时间戳字段混合的方式,即使崩溃也能记录到更新的最大ID,恢复之后可以被找到不会丢失。因此我们更推荐第三种方式!timestamp+incrementing

3、timestamp+incrementing 时间戳自增混合模式

实验过程同方法2不做赘述,唯一变动的是source的config文件

name=mysql-b-source-comments
connector.class=io.confluent.connect.jdbc.JdbcSourceConnector
tasks.max=1
connection.url=jdbc:mysql://localhost:3306/A?user=***&password=***
table.whitelist=comments
mode=timestamp+incrementing
timestamp.column.name=commenttime
incrementing.column.name=id
topic.prefix=mysql-kafka-

至此完成使用Kafka Connect 三种模式实现MySQL增量同步!

谢谢阅读,有帮助的点个❤!

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 216,193评论 6 498
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,306评论 3 392
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 162,130评论 0 353
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,110评论 1 292
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,118评论 6 388
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,085评论 1 295
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,007评论 3 417
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,844评论 0 273
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,283评论 1 310
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,508评论 2 332
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,667评论 1 348
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,395评论 5 343
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,985评论 3 325
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,630评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,797评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,653评论 2 368
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,553评论 2 352

推荐阅读更多精彩内容

  • 前言: 最近需要调研Kafka Connect可连接哪些数据源,特此学习官网并翻译了下文档。Confluent J...
    CNSTT阅读 13,906评论 4 6
  • MYSQL 基础知识 1 MySQL数据库概要 2 简单MySQL环境 3 数据的存储和获取 4 MySQL基本操...
    Kingtester阅读 7,809评论 5 116
  • 关于Mongodb的全面总结 MongoDB的内部构造《MongoDB The Definitive Guide》...
    中v中阅读 31,928评论 2 89
  • 有一种落差是,你配不上自己的野心,也辜负了自己的期望。 现在的我成了当时最讨厌的那类人,却身处迷惘,我们都希望来生...
    麒实在等你阅读 598评论 0 1
  • 不管昨夜你经历了怎样的泣不成声,早晨醒来 这个城市依旧车水马龙,人 总要学着自己长大。
    小翼a阅读 121评论 0 0