Debezium SQL server 2016同步到kafka

写这篇文章的主要目的是因为在自己使用Debezium的时候遇见的问题,网上的案例又比较少,自己决定写一篇文章,希望可以帮助到其他人,因为是第一次写博客,有什么不足的地方希望见谅或提出宝贵意见。

一、需求和软件的版本

(1)需求:监控sellInfo表的部分列,当这些列新增或更新或删除的时候,kafka生成一条变更信息到消费端,业务代码拿到这些信息来更新Elasticsearch

(2)软件版本:sqlServer版本,Microsoft SQL Server 2016 (SP1) ;

                          kafka版本,kafka2.12-2.5.0;

                          Debezium,JAR版本,debezium-debezium-connector-sqlserver-1.2.1

二、开启CDC

(1)为数据库开启CDC(需要sysadmin权限)

EXEC sys.sp_cdc_enable_db

(2)为数据库表开启CDC

EXEC sys.sp_cdc_enable_table       

@source_schema = 'dbo',      --数据库名

@source_name = 'SellInfo',        --数据库表名

@capture_instance = default,     --默认实例   

@role_name = NULL,        --角色权限,null表示不设置权限

@captured_column_list = 'ArticleId,HtmlID,Title'  --为哪些列开启监控

sellInfo开启cdc成功的标志,数据库出现cdc库,如图所示,这是发生update操作时,cdc会做记录。

二、安装kafka和开启kafka-connect

(1)下载kafka,并解压,重命名

当前路径  /root

tar zxvf kafka_2.12-2.5.0.tgz -C ./

mv kafka_2.12-2.5.0 kafka

  (2)修改kafka/config/server.properties

listeners = PLAINTEXT://192.168.16.2:9092

advertised.listeners=PLAINTEXT://192.168.16.2:9092

(3)开启zookeeper,开启kafka

当前路径  /root/kafka

bin/zookeeper-server-start.sh -daemon config/zookeeper.properties

bin/kafka-server-start.sh -daemon config/server.properties

   (4)下载Debezium JAR包

     JAR下载地址,下载jar,并解压到 /kafka/kafka_connect_plugins(自己创建一个文件夹)

 (5)修改kafka/config/connect-distribute.properties

bootstrap.servers=192.168.16.2:9092

plugin.path= /root/kafka/kafka_connect_plugins

   (6)启动kafka-connect

当前路径 /root/kafka

bin/connect-distributed.sh -daemon config/connect-distributed.properties

开启成功的标志,postman可以使用访问端口(默认的端口号,可以在connect-distributed.properties 更改)


(7)编辑connect的属性

当前路径 /root/kafka

创建一个json文件,vi sellInfo.json

{

    "name": "sqlserver-cdc-dbo-sellInfo",

    "config": {

        "connector.class": "io.debezium.connector.sqlserver.SqlServerConnector",

        "database.hostname": "xx.xx.xx.xx",     ---数据库ip

        "database.port": "1433",                        ----端口号

        "database.user": "xxxx",                         ----数据库用户名

        "database.password": "xxxx",                 ----数据库密码

        "database.dbname": "idea_test",            -----数据库名

        "database.server.name": "fullfillment",

        "table.whitelist": "dbo.SellInfo",               -----数据库表白名单

        "snapshot.mode":"schema_only",           ----快照方式,inital--表数据全量更新,schema_only--表数据开启cdc后的更新

        "column.blacklist":"dbo.SellInfo.xxx,dbo.SellInfo.xxx"   ----不需要监控的表的字段(下面有我用的时候遇见的问题)

        "database.history.kafka.bootstrap.servers": "192.168.16.2:9092", 

        "database.history.kafka.topic": "dbhistory.fullfillment",

        "value.converter.schemas.enable":"false",

        "value.converter":"org.apache.kafka.connect.json.JsonConverter",

        "event.processing.failure.handing.mode":"warn"

    }

}

更多数据设置可以参考Debezium的官方文档:

用于SQL Server的Debezium连接器(里面还有一些其他的连接器,mysql等)

   (8)执行连接器

curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" http://192.168.16.2:8083/connectors/ -d @sellInfo.json

开启成功的标志:

更多postman关于连接器的操作可以参考这篇文章,帮助我很多

sqlserver增量订阅&消费实时同步kafka,最新解决方案,看完不会你打我!

三、验证成果

1、update [dbo].SellInfo set Audit = 2 where ArticleId = 1 (可以是新增,可以是删除)

2、查看当前的有哪些topic

此时路径 /root/kafka

bin/kafka-topics.sh --zookeeper localhost --list

__consumer_offsets  ----默认创建

connect-configs         ----默认创建

connect-offsets          ----默认创建

connect-status           ----默认创建

dbhistory.fullfillment   ----记录表的结构信息

fullfillment                  ----记录表的结构信息

fullfillment.dbo.SellInfo  -----SellInfo表改变的记录

查看fullfillment.dbo.SellInfo的信息:

当前路径  /root/kafka

bin/kafka-console-consumer.sh --bootstrap-server 192.168.16.2:9092 --topic fullfillment.dbo.SellInfo --from-beginning

四、总结

(1)第一次搞这种冷门的东西,刚开始确实有点头疼,后来明白,一定要学会参考别人的东西,查看官方文档,制定流程分阶段进行等,这也是我为什么要写这篇文章的原因

(2)可能出现的问题,CDC的开启,如果在执行命令的时候出现错误、按照给出的错误信息去baidu;kafka的安装与运行 --daemon是可以让程序离开shell界面后台运行,但是不会显示开启时的日志;kafka-connect的配置文件sellInfo.json中的'column.blacklist'字段出现问题,比如一张表a,id,name,age,phone,我CDC监控id,age ,’column.blacklist‘剩下的字段,运行connect同步数据时会出错,我看了源码感觉应该是一个bug,当然很有可能是有些配置不知道怎么使用导致的,最后我通过分析源码将表的结构变成id,age,name,phone避免了这种异常,错误代码我贴出来,希望有人可以解答这个问题。

08-03 12:41:56,160] ERROR Error requesting a row value, row: 3, requested index: 15 at position 3 (io.debezium.relational.TableSchemaBuilder:221) [2020-08-03 12:41:56,160] ERROR Producer failure (io.debezium.pipeline.ErrorHandler:31) org.apache.kafka.connect.errors.ConnectException: Error while processing event at offset {transaction_id=null, event_serial_no=2, commit_lsn=000003cb:000ec557:0003, change_lsn=000003cb:000ec557:0002} at io.debezium.pipeline.EventDispatcher.dispatchDataChangeEvent(EventDispatcher.java:220) at io.debezium.connector.sqlserver.SqlServerStreamingChangeEventSource.lambda$execute$1(SqlServerStreamingChangeEventSource.java:247) at io.debezium.jdbc.JdbcConnection.prepareQuery(JdbcConnection.java:524) at io.debezium.connector.sqlserver.SqlServerConnection.getChangesForTables(SqlServerConnection.java:190) at io.debezium.connector.sqlserver.SqlServerStreamingChangeEventSource.execute(SqlServerStreamingChangeEventSource.java:162) at io.debezium.pipeline.ChangeEventSourceCoordinator.lambda$start$0(ChangeEventSourceCoordinator.java:108) at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) at java.util.concurrent.FutureTask.run(FutureTask.java:266) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) at java.lang.Thread.run(Thread.java:745) Caused by: org.apache.kafka.connect.errors.ConnectException: Data row is smaller than a column index, internal schema representation is probably out of sync with real database schema at io.debezium.relational.TableSchemaBuilder.validateIncomingRowToInternalMetadata(TableSchemaBuilder.java:222) at io.debezium.relational.TableSchemaBuilder.lambda$createValueGenerator$5(TableSchemaBuilder.java:251) at io.debezium.relational.TableSchema.valueFromColumnData(TableSchema.java:143) at io.debezium.relational.RelationalChangeRecordEmitter.emitUpdateRecord(RelationalChangeRecordEmitter.java:97) at io.debezium.relational.RelationalChangeRecordEmitter.emitChangeRecords(RelationalChangeRecordEmitter.java:51) at io.debezium.pipeline.EventDispatcher.dispatchDataChangeEvent(EventDispatcher.java:193) ... 10 more

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 214,444评论 6 496
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,421评论 3 389
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 160,036评论 0 349
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,363评论 1 288
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,460评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,502评论 1 292
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,511评论 3 412
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,280评论 0 270
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,736评论 1 307
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,014评论 2 328
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,190评论 1 342
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,848评论 5 338
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,531评论 3 322
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,159评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,411评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,067评论 2 365
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,078评论 2 352