mysql+canal+kafka+elasticsearch构建订单搜索平台(kafka接收binlog变更)

上面几篇文章讲了基础搭建过程:

canal搭建:https://www.jianshu.com/p/917795a7acc1

zookeeper 搭建:https://www.jianshu.com/p/d3c1727ba1c4

kafka搭建:https://www.jianshu.com/p/c6916c4e4cae

下面我们来看如何串联整体流程。

1.首先进行canal配置。改动配置文件canal.properties。改动如下:

serverMode改为kafka

消息队列配置:

监听地址改为本机地址

2.具体实例改动:

instance.propties改动:

topic配置说明:


在这里我们是topic:scheme.table配置。

kafka分片配置是根据表的主键进行配置:

3.在这里有个关键步骤,需要开启消息队列的自动创建topic模式。否则会报错

相关配置在kafka的server.properties

添加如下配置:


4.相关改动完成后重启canal和kafka

5.如何测试kafka接收binlog消息:

下载canal项目:git clone https://github.com/alibaba/canal.git

运行如下代码:

项目代码:

canal/example/src/main/java/com/alibaba/otter/canal/example/kafka/CanalKafkaClientExample.java

改动对应配置为测试环境配置:


运行CanalKafkaClientExample的main方法。然后改动数据库数据。查看效果,目前已经成功接收数据

©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容