debezium 实时同步 kafka
本文档介绍了如何从 mysql 和 oracle 中实时抽取数据到 kafka。
前期准备
2台 linux 服务器用于部署 debezium 高可用。
java 11环境
搭建 msk 集群(3可用区,3节点),记录 msk 私有终端节点(b-1.xxxx)和 zookeeper(z-1.xxxx) 连接地址
-
mysql 和 oracle 开启 binlog,format 为 row,且 binglog 保留日期不低于2天,如果 binlog 保留时间很短,debezium 中断重启后会找不到断点记录的 binlog 文件而报错
rds mysql 设置 binlog 保留时长,连接到数据库,执行以下 sql 语句
CALL mysql.rds_set_configuration('binlog retention hours', 168); # 设置 binlog 保存时长为168个小时
-
sqlserver 要开启 cdc 才能进行同步,且只有企业版 sqlserver 才能开启 cdc
使用 RDS 提供的函数启用 SQL Server 的 CDC(Change Data Capture,数据变更捕捉)功能
EXEC msdb.dbo.rds_cdc_enable_db <数据库名称>
再启用同步表的 CDC 功能,每一张同步的表都需要此操作
EXEC sys.sp_cdc_enable_table @source_schema = N'<schema 名称>', @source_name = N'<需要同步的表名>', @role_name = NULL
此时,我们可以用下列命令来查看 CDC 情况
EXEC sys.sp_cdc_help_change_data_capture
进入 msk 控制台,创建参数组,并且修改 auto.create.topics.enable=true,记录下 default.replication.factors 的值
修改 msk 集群的参数组为新创建的参数组,等待其自动更新配置完毕
搭建 debezium 环境
登录到用于部署 debezium 的服务器上,2台都需要搭建
-
下载 kafka 安装包:kafka download 复制对应版本的安装包的下载链接
sudo wget https://archive.apache.org/dist/kafka/2.5.1/kafka_2.12-2.5.1.tgz sudo tar -zxvf kafka_2.12-2.5.1.tgz sudo mv kafka_2.12-2.5.1 /opt/kafka
-
下载 debezium connector:debezium connector 复制 mysql, sqlserver, oracle connector 的下载链接
sudo mkdir -p /opt/kafka-connectors cd /opt/kafka-connectors sudo wget https://repo1.maven.org/maven2/io/debezium/debezium-connector-mysql/1.9.2.Final/debezium-connector-mysql-1.9.2.Final-plugin.tar.gz sudo wget https://repo1.maven.org/maven2/io/debezium/debezium-connector-oracle/1.9.2.Final/debezium-connector-oracle-1.9.2.Final-plugin.tar.gz sudo wget https://repo1.maven.org/maven2/io/debezium/debezium-connector-sqlserver/1.9.2.Final/debezium-connector-sqlserver-1.9.2.Final-plugin.tar.gz sudo tar -zxvf debezium-connector-mysql-1.9.2.Final-plugin.tar.gz sudo tar -zxvf debezium-connector-oracle-1.9.2.Final-plugin.tar.gz sudo tar -zxvf debezium-connector-sqlserver-1.9.2.Final-plugin.tar.gz
-
下载 ojdbc8.jar 到 debezium-connector-oracle 中
cd debezium-connector-oracle sudo wget https://download.oracle.com/otn-pub/otn_software/jdbc/215/ojdbc8.jar ll oj* # 查看确保ojdbc8.jar 存在
-
修改 kafka 配置
cd /opt/kafka sudo vim config/connect-distributed.properties # 修改 bootstrap.servers=b-1.xxxxx:9092,b-2.xxxxxx:9092,b-3.xxxxxx:9092 group.id=debezium1 # 相同 group.id 会组成一个 debezium 集群 config.storage.replication.factor=<msk 中 default.replication.factors 的值> status.storage.replication.factor=<msk 中 default.replication.factors 的值> offsets.storage.replication.factor=<msk 中 default.replication.factors 的值> # 新增 plugin.path=/opt/kafka-connectors connector.client.config.override.policy=All # 保存退出
-
创建 /usr/lib/systemd/system/kafka-connector.service 文件,并填入以下内容
[Unit] Description=Kafka Connector After=network.target [Service] User=root Type=simple WorkingDirectory=/opt/kafka Environment="LOG_DIR=/var/log/opt/kafka" ExecStart=/opt/kafka/bin/connect-distributed.sh /opt/kafka/config/connect-distributed.properties RestartSec=1s Restart=on-failure [Install] WantedBy=multi-user.target
-
启动服务
sudo systemctl daemon-reload sudo systemctl enable kafka-connector.service sudo systemctl start kafka-connector.service
-
查看任务状态
sudo systemctl status kafka-connector.service tail -f /var/log/opt/kafka/connect.log
创建同步任务
-
创建配置文件,只需要在其中一台服务器上操作即可。
sudo mkdir -p /opt/kafka/connector sudo vim /opt/kafka/connector/lims-cdc.json
-
修改配置文件内容
{ "name": "lims-cdc", "config": { "connector.class": "io.debezium.connector.mysql.MySqlConnector", "database.hostname": "xxxxxx.rds.cn-northwest-1.amazonaws.com.cn", "database.port": "3306", "database.user": "xxxxxx", "database.password": "xxxxxx", "database.server.id": "54321", "database.server.name": "xxxx", "table.include.list": "lims1_dev.t_lims_order,lims1_dev.t_lims_folder", "snapshot.mode": "initial", "tombstones.on.delete": "false", "database.history.kafka.bootstrap.servers": "b-1.xxxx:9092,b-3.xxxx:9092,b-2.xxxx:9092", "database.history.kafka.topic": "dbhistory", "tombstones.on.delete": "false", "transforms": "Reroute", "transforms.Reroute.type": "io.debezium.transforms.ByLogicalTableRouter", "transforms.Reroute.topic.regex": "(.*)", "transforms.Reroute.topic.replacement": "collect_data_lims", "transforms.Reroute.key.field.name": "shard_id", "include.schema.changes": "false", "key.converter.schemas.enable": "false", "value.converter.schemas.enable": "false", "key.converter": "org.apache.kafka.connect.json.JsonConverter", "value.converter": "org.apache.kafka.connect.json.JsonConverter" } }
部分参数说明:
- name:本 connector 的 name,可自定义
- database.hostname:数据库 host
- database.server.name: 影响 kafka 中生成 topic 的前缀,可以和 name 一致
- table.include.list: 需要同步的表的名称:<database-name>.<table-name>,需带数据库名称,可以多个,以英文逗号分隔
- database.history.kafka.bootstrap.servers:msk 私有终端节点名称
- database.history.kafka.topic:history 的 topic 的名称,修改成 dbhistory-<connector 的 name>
- transforms.Reroute.topic.replacement:该任务对应的 msk 中的 topic 的名称,可自定义,不填则默认每张表一个topic
其余参数保持默认即可
如果需要在快照时指定条件,在 config 列表中加入以下两个参数,修改值为你的表名和 sql 语句。
"snapshot.select.statement.overrides": "customer.orders", "snapshot.select.statement.overrides.customer.orders": "SELECT * FROM [customers].[orders] WHERE delete_flag = 0 ORDER BY id DESC",
-
启动任务 (官方接口文档)
curl -i -X POST -H Accept:application/json -H Content-Type:application/json localhost:8083/connectors/ -d @<配置文件.json>
-
查看任务状态
curl -X GET -H Accept:application/json localhost:8083/connectors/<你的 connector 的 name>/status | jq
-
暂停任务
curl -i -X PUT -H Accept:application/json localhost:8083/connectors/<你的 connector 的 name>/pause
-
恢复任务
curl -i -X PUT -H Accept:application/json localhost:8083/connectors/<你的 connector 的 name>/resume
-
删除任务
curl -X DELETE -H Accept:application/json localhost:8083/connectors/<你的 connector 的 name>
-
在线更新配置
获取当前 connector 的配置
curl -X GET -H "Accept:application/json" localhost:8083/connectors/<你的 connector 的 name>/config | jq
复制结果,保存到 /opt/kafka/connector/<你的 connector 的 name>-config-当前日期.json 文件中,根据需求修改对应参数。
增加同步表,只需修改 table.include.list
{ "connector.class": "io.debezium.connector.mysql.MySqlConnector", "database.hostname": "xxxxxx.rds.cn-northwest-1.amazonaws.com.cn", "database.port": "3306", "database.user": "xxxxxx", "database.password": "xxxxxx", "database.server.id": "54321", "database.server.name": "xxxx", "table.include.list": "lims1_dev.t_lims_order,lims1_dev.t_lims_folder", "snapshot.mode": "initial", "tombstones.on.delete": "false", "database.history.kafka.bootstrap.servers": "b-1.xxxx:9092,b-3.xxxx:9092,b-2.xxxx:9092", "database.history.kafka.topic": "dbhistory", "tombstones.on.delete": "false", "transforms": "Reroute", "transforms.Reroute.type": "io.debezium.transforms.ByLogicalTableRouter", "transforms.Reroute.topic.regex": "(.*)", "transforms.Reroute.topic.replacement": "collect_data_lims", "transforms.Reroute.key.field.name": "shard_id", "include.schema.changes": "false", "key.converter.schemas.enable": "false", "value.converter.schemas.enable": "false", "key.converter": "org.apache.kafka.connect.json.JsonConverter", "value.converter": "org.apache.kafka.connect.json.JsonConverter" }
curl -i -X PUT -H Accept:application/json -H Content-Type:application/json localhost:8083/connectors/<你的 connector 的 name>/config -d @<修改后的json名称>
-
oracle 配置有区别外,其余操作相同。
decimal.handling.mode:默认参数值是 precise,用 java 的 java.math.BigDecimal 来表示数字,可根据需求修改为 double/string
{ "name": "srm-cdc", "config": { "connector.class": "io.debezium.connector.oracle.OracleConnector", "tasks.max": "1", "database.hostname": "xxxxxx.rds.cn-north-1.amazonaws.com.cn", "database.port": "1521", "database.user": "xxxx", "database.password": "xxxx", "database.dbname": "SRMDEV", "database.server.name": "srm", "decimal.handling.mode": "string", "table.include.list":"SRMTEST.base_supplier,SRMTEST.base_supplier_manufacturer", "database.history.kafka.bootstrap.servers": "b-1.xxxx:9092,b-3.xxxx:9092,b-2.xxxx:9092", "database.history.kafka.topic": "dbhistory-oracle", "tombstones.on.delete": "false", "transforms": "Reroute", "transforms.Reroute.type": "io.debezium.transforms.ByLogicalTableRouter", "transforms.Reroute.topic.regex": "(.*)", "transforms.Reroute.topic.replacement": "collect_data_srm", "transforms.Reroute.key.field.name": "shard_id", "include.schema.changes": "false", "key.converter.schemas.enable": "false", "value.converter.schemas.enable": "false", "key.converter": "org.apache.kafka.connect.json.JsonConverter", "value.converter": "org.apache.kafka.connect.json.JsonConverter" } }
- sqlserver 配置有区别外,其余操作相同。
{ "name": "sqlserver-cdc", "config": { "connector.class": "io.debezium.connector.sqlserver.SqlServerConnector", "database.hostname": "xxxxxxx", "database.port": "1433", "database.user": "xxxx", "database.password": "xxxx", "database.server.id": "54321", "database.server.name": "mssql", "database.dbname": "xxxxx", "schema.include.list": "xxxxx", "table.include.list": "xxx.xxxx", "database.history.kafka.bootstrap.servers": "b-1.xxxx:9092,b-2.xxxx:9092", "database.history.kafka.topic": "dbhistory-mssql", "tombstones.on.delete": "false", "transforms": "Reroute", "transforms.Reroute.type": "io.debezium.transforms.ByLogicalTableRouter", "transforms.Reroute.topic.regex": "(.*)", "transforms.Reroute.topic.replacement": "collect_data_mssql", "transforms.Reroute.key.field.name": "shard_id", "include.schema.changes": "false", "key.converter.schemas.enable": "false", "value.converter.schemas.enable": "false", "key.converter": "org.apache.kafka.connect.json.JsonConverter", "value.converter": "org.apache.kafka.connect.json.JsonConverter" } }
- kafka 同步到 mongodb
在 kafka-connectors 中下载 mongo connector
cd /opt/kafka-connectors sudo wget https://repo1.maven.org/maven2/org/mongodb/kafka/mongo-kafka-connect/1.7.0/mongo-kafka-connect-1.7.0-all.jar
在 /opt/kafka/connector 中新增 mongo-sink.json(可自定义) 配置文件,修改内容:
{ "name": "mongo-sink", "config": { "connector.class": "com.mongodb.kafka.connect.MongoSinkConnector", "connection.uri": "mongodb://<username>:<password>@<host>:<port>/?retryWrites=false", "database": "xxxx", "collection": "xxxx", "topics": "mysql.test_flink_cdc.table1", "change.data.capture.handler": "com.mongodb.kafka.connect.sink.cdc.debezium.rdbms.mysql.MysqlHandler", "key.converter.schemas.enable": "false", "value.converter.schemas.enable": "false", "key.converter": "org.apache.kafka.connect.json.JsonConverter", "value.converter": "org.apache.kafka.connect.json.JsonConverter" } }
部分参数说明:
connection.uri:mongo 连接的 uri,根据实际情况填写
-
change.data.capture.handler:根据 topic 中来源的数据库类型,选择对一个的 handler
Change Data Capture Handlers — MongoDB Kafka Connector
- com.mongodb.kafka.connect.sink.cdc.debezium.rdbms.mysql.MysqlHandler
- com.mongodb.kafka.connect.sink.cdc.debezium.rdbms.postgres.PostgresHandler
- com.mongodb.kafka.connect.sink.cdc.debezium.mongodb.MongoDbHandler
使用第3步中的 curl 请求,并指定 mongo-sink.json 启动 connector
查看同步信息
-
查看 kafka topic
# 定义环境变量 zk_addr=z-1.xxxx:2181,z-3.xxxx:2181,z-2.xxxx:2181 bs_addr=b-1.xxxx:9092,b-2.xxxx:9092 bin/kafka-topics.sh --list --zookeeper $zk_addr
topic 中应有以下topic:
- connect-configs
- connect-offsets
- connect-status
- 你定义的 Reroute.topic
- 你定义的 history.kafka.topic
-
查看你的 Reroute.topic,如果数据量很大,删除 --from-beginning 参数
bin/kafka-console-consumer.sh --bootstrap-server $bs_addr --from-beginning --topic <你定义的Reroute.topic>
示例数据:
{ "before":{ "id":4, "class":"4", "teacher":"dddd" }, "after":{ "id":4, "class":"4", "teacher":"eeee" }, "source":{ "version":"1.9.2.Final", "connector":"mysql", "name":"mysql", "ts_ms":1652342716000, "snapshot":"false", "db":"test_flink_cdc", "sequence":null, "table":"table2", "server_id":488905673, "gtid":null, "file":"mysql-bin-changelog.197425", "pos":605, "row":0, "thread":0, "query":null }, "op":"u", "ts_ms":1652342716967, "transaction":null }
字段说明:
- before:原数据,新增则是 null
- after:修改后数据,删除则是 null
- source:配置信息:
- file: binlog 文件
- table: 表名
- db: 数据库名称
- op: 操作类型
- r: 第一次 snapshot 时获取到的行
- u:修改
- c:新增
- d: 删除
- ts_ms:时间戳
修改源端数据库,kafka topic 可以实时刷新出 message
坑
- 时间戳 +8 小时
如果你的数据库里是设置了时区的,debezium 会把date类型的数据转成timestamp,但是它不会拿数据库里的时区设置,而是直接把这个字符串按照utc时间转时间戳,所以在kafka中看到的时间戳比实际的时间就多了8个小时。解决方法就是自己写转换器,打成 jar 包并放到对应的mysql/oracle connector 的目录下,然后在 json 里配置这个 convert。
github 上有一些现成的转换器,但都有些奇奇怪怪的问题,比如cdc的时间是对的,但是快照的时间错误,date 类型是对的,timestamp类型是错的等。我这有改好的 jar 包,懒得找地方上传了,反正也就我自己看。 - oracle redo log设置的空间太小,导致 oracle 归档日志死循环,迅速把磁盘空间撑爆。
debeizum 做 cdc的时候,会产生大量的 redo log,超过 oracle 设置的 redo log 空间时,oracle 就会把多的日志落盘,这个行为又会产生日志,就一直死循环,每小时10G左右的速度消耗磁盘空间,直到磁盘占满,系统挂掉。解决方法就是把 oracle 的 redo log 空间调大。 - debezium 锁表
debezium 打快照的时候默认会锁表,如果快照的数据量大的话,锁几个小时都是可能的,可能会导致其他应用的连接断开,可以通过设置 snapshot.locking.mode:none 参数,让它不锁表。建议生产环境还是在数据库停机的状态下执行快照行为。