maxwell、canal分别与GPKAFKA连接的异同
粗浅的看了下两者的区别以及简单应用了下,如果有错误的地方请大家指正
一.筛选比对
1.maxwell的filter:
官网说明:https://maxwells-daemon.io/filtering/
--filter = 'exclude: *.*, include: db1.*'
请务必记住前面要加入
exclude: *.*
采用带启动配置文件的修改对应的config.properties文件
/app/maxwell-1.35.5/bin/maxwell --config config.properties
格式大同小异,如下:
filter= exclude: *.*, include: test.home
2.canal的渠道模式
官网说明:https://github.com/alibaba/canal/wiki/Canal-Kafka-RocketMQ-QuickStart
参考配置及说明:
vi /app/canal/conf/example/instance.properties
canal.mq.topic=mysql
canal.mq.dynamicTopic=test.home
如果说对应的binlog中包括了test.home的表,他自动将对应的binlog解析转到test_home这个topic下,其他的统统转到mysql这个topic下。
3.二者区别或者局限性相,对于GPKAFKA而言
首先申明,可能是我对GPSS的理解上可能存在不足,我没有找到更好的解决方案,因此我在配置GPSS的时候存在比较多的问题,理论上应该有,但是不知道怎么设置。因此先把这个假设前提建立在我的认识上,即GPKAFKA可能没有更好的筛选方案,GPKAFKA对kafka的消费数据的渠道分流能力不够好。
假设一个项目有10张表需要同步到greenplum
maxwell方案:由于GPKAFKA无法精细分析出该topic下binlog日志对应的每个表的分别操作或者相对较弱,或者资源消耗相对过大,因此需要对应kafka开启10个topic,并且开启10个maxwell,分别筛选至对应的topic;然后开启10个GPSS,分别对应不同的topic建立对应的yaml进行同步;
canal方案:由于canal本身支持管道功能,因此在第一步就可以只在kafka下开启10个topic,然后只用1个canal就可以写入分别对应的topic;同时由于分别写入的topic对应的数据量也不会太高;
假设GPKAFKA有较好的筛选功能,但是由于maxwell的局限性,他不能将一个binlog分别解析到不同的topic,也就是说GPKAFKA消费的数据量会从10张表的数据源去筛选出自己对应这个进程所需要的内容,这样其实也会造成资源的浪费;
总而言之maxwell方案不适合给GPKAFKA做多表的实时同步!至少我看到的是这样的。
二.GPKAFKA的实现比对:
1.json的格式不一样:
- maxwell:
{"data":[{"id":"13","tab":"123","values":"1231231"}],"database":"test","es":1642058435000,"id":10,"isDdl":false,"mysqlType":{"id":"int(14)","tab":"varchar(20)","values":"varchar(60)"},"old":null,"pkNames":["id"],"sql":"","sqlType":{"id":4,"tab":12,"values":12},"table":"home","ts":1642058440150,"type":"DELETE"}
{"data":[{"id":"32","tab":"12","values":"213"}],"database":"test","es":1642058451000,"id":11,"isDdl":false,"mysqlType":{"id":"int(14)","tab":"varchar(20)","values":"varchar(60)"},"old":null,"pkNames":["id"],"sql":"","sqlType":{"id":4,"tab":12,"values":12},"table":"home","ts":1642058455906,"type":"INSERT"}
- canal:
{"data":[{"id":"444","tab":"444","values":"4444","rrr":null}],"database":"test","es":1642062004000,"id":5,"isDdl":false,"mysqlType":{"id":"int(14)","tab":"varchar(21)","values":"varchar(60)","rrr":"varchar(50)"},"old":null,"pkNames":["id"],"sql":"","sqlType":{"id":4,"tab":12,"values":12,"rrr":12},"table":"home","ts":1642062008970,"type":"INSERT"}
2.yaml样式
- maxwell
DATABASE: test
USER: gpadmin
HOST: 192.168.10.227
PORT: 5432
VERSION: 2
KAFKA:
INPUT:
SOURCE:
BROKERS: 192.168.10.225:9092,192.168.10.226:9092,192.168.10.228:9092
TOPIC: mysql
VALUE:
COLUMNS:
- NAME: c1
TYPE: json
FORMAT: json
ERROR_LIMIT: 100
OUTPUT:
SCHEMA: test
MODE: MERGE
MATCH_COLUMNS:
- id
UPDATE_COLUMNS:
- tab
- values
ORDER_COLUMNS:
- ts
- xid
- del_mark
- ddl_type
TABLE: home
MAPPING:
- NAME: id
EXPRESSION: (c1->'data'->>'id')::decimal
- NAME: tab
EXPRESSION: (c1->'data'->>'tab')::text
- NAME: values
EXPRESSION: (c1->'data'->>'values')::text
- NAME: ts
EXPRESSION: (c1->>'es')::decimal
- NAME: xid
EXPRESSION: (c1->>'id')::decimal
- NAME: ddl_type
EXPRESSION: (c1->>'type')::text
- NAME: del_mark
EXPRESSION: CASE WHEN ((c1->>'type')::text= 'DELETE') then true else false end
COMMIT:
MINIMAL_INTERVAL: 2000
- canal
DATABASE: test
USER: gpadmin
HOST: 192.168.10.227
PORT: 5432
VERSION: 2
KAFKA:
INPUT:
SOURCE:
BROKERS: 192.168.10.225:9092,192.168.10.226:9092,192.168.10.228:9092
TOPIC: test_home
VALUE:
COLUMNS:
- NAME: c1
TYPE: json
FORMAT: json
ERROR_LIMIT: 100
OUTPUT:
SCHEMA: test
MODE: MERGE
MATCH_COLUMNS:
- id
UPDATE_COLUMNS:
- tab
- values
ORDER_COLUMNS:
- ts
- xid
- del_mark
- ddl_type
TABLE: home
MAPPING:
- NAME: id
EXPRESSION: ((c1#>>'{data,0}')::json->>'id')::decimal
- NAME: tab
EXPRESSION: ((c1#>>'{data,0}')::json->>'tab')::text
- NAME: values
EXPRESSION: ((c1#>>'{data,0}')::json->>'values')::text
- NAME: ts
EXPRESSION: (c1->>'es')::decimal
- NAME: xid
EXPRESSION: (c1->>'id')::decimal
- NAME: ddl_type
EXPRESSION: (c1->>'type')::text
- NAME: del_mark
EXPRESSION: CASE WHEN ((c1->>'type')::text= 'DELETE') then true else false end
COMMIT:
MINIMAL_INTERVAL: 2000
-
两者区别:
json格式的设置这个需要注意设置。
当对应的binlog有其他表在进行的时候,MAXWELL对应的GPKAFKA任务表会不停的插入空数据行,可能是我设置的问题,由于canal是采取分流方式获得的topic,因此GPKAKA在获取数据的时候仅有一个表的数据,不会有空行的出现。
综合推荐:
如果采用GPKAFKA作为数据实时同步的解决方案,建议还是考虑用canal,当然这个可能是因为我太菜,maxwell+GPKAFKA会摩擦出更多火花也不一定。