上面几篇文章讲了基础搭建过程:
canal搭建:https://www.jianshu.com/p/917795a7acc1
zookeeper 搭建:https://www.jianshu.com/p/d3c1727ba1c4
kafka搭建:https://www.jianshu.com/p/c6916c4e4cae
下面我们来看如何串联整体流程。
1.首先进行canal配置。改动配置文件canal.properties。改动如下:
serverMode改为kafka
消息队列配置:
监听地址改为本机地址
2.具体实例改动:
instance.propties改动:
topic配置说明:
在这里我们是topic:scheme.table配置。
kafka分片配置是根据表的主键进行配置:
3.在这里有个关键步骤,需要开启消息队列的自动创建topic模式。否则会报错
相关配置在kafka的server.properties
添加如下配置:
4.相关改动完成后重启canal和kafka
5.如何测试kafka接收binlog消息:
下载canal项目:git clone https://github.com/alibaba/canal.git
运行如下代码:
项目代码:
canal/example/src/main/java/com/alibaba/otter/canal/example/kafka/CanalKafkaClientExample.java
改动对应配置为测试环境配置:
运行CanalKafkaClientExample的main方法。然后改动数据库数据。查看效果,目前已经成功接收数据