CanalAdapter同步ES7
预准备
- Mysql5.7
- ElasticSearch7.12.1集群或单击(7.x +)
- 一个数据库和一个表结构
1.下载
canal.deployer-1.1.5.tar.gz对应的是canal的server端,负责订阅并解析Mysql-Binlog
canal.adapter-1.1.5.tar.gz对应的是适配器,负责将server的binlog转换并发送给对应的应用
canal.admin-1.1.5.tar.gz一个可视化webui可以不安装
额外需要下载v1.1.5-alpha-2快照版本的canal.adapter-1.1.5.tar.gz(release1.1.5版本的jar包有bug)
分别解压缩后,将v1.1.5-alpha-2解压缩文件夹下plugin文件夹中的 client-adapter.es7x-1.1.5-SNAPSHOT-jar-with-dependencies.jar 替换掉release版本的plugin文件的 client-adapter.es7x-1.1.5-jar-with-dependencies.jar
2. 开启Mysql-Binlog
进入mysql终端执行
show variables like 'log_bin';
如果是ON就代表已经开启。
+---------------+-------+
| Variable_name | Value |
+---------------+-------+
| log_bin | ON |
+---------------+-------+
3. 修改配置
- 先修改canal.deployer的配置
参考官方中文文档: https://github.com/alibaba/canal/wiki/QuickStart - 修改canal.adapter配置 conf/application.yml
更多请参考官方中文文档:https://github.com/alibaba/canal/tree/master/client-adapter
server:
port: 8081
spring:
jackson:
date-format: yyyy-MM-dd HH:mm:ss
time-zone: GMT+8
default-property-inclusion: non_null
canal.conf:
mode: tcp #tcp kafka rocketMQ rabbitMQ
flatMessage: true
zookeeperHosts:
syncBatchSize: 1000
retries: 0
timeout:
accessKey:
secretKey:
consumerProperties:
# 省略.... 默认即可
srcDataSources: # 注意打开注释,然后要和上面对齐否则会报错(但是却又不告诉你是格式问题..)
defaultDS:
url: jdbc:mysql://127.0.0.1:3306/test?useUnicode=true
username: root
password: 123456
canalAdapters:
- instance: example # canal instance Name or mq topic name
groups:
- groupId: g1
outerAdapters:
- name: es7 # 这里要改为es7 他会去es7下去找配置
key: exampleKey #这里是全量导入时的taskid
hosts: es1:9201,es2:9202,es3:9203 # 我这里是集群,如果单机写一个就行ip:port
properties:
mode: rest # restful 模式
cluster.name: es-cluster # 集群名字 GET _cat/health?v 可以看到对应的名字
- 修改es7下模板文件 conf/es7/mytest_user.yml
dataSourceKey: defaultDS #和上面canal.conf.srcDataSources.defaultDS要一样
outerAdapterKey: exampleKey #和上面canal.conf.canalAdapters.instance.groups.outerAdapters.key要一样
destination: example #和上面canal.conf.canalAdapters.instance要一样
groupId: g1 #和上面canal.conf.canalAdapters.instance.groups.groupId要一样
esMapping:
_index: test #索引名称
_id: _id #documentid
_type: _doc # type
# upsert: true
# pk: id
sql: "select a.id as _id, a.name_cn, a.name_en, a.email
from user a" # 查询的sql返回的结构要使用as别名和es的filed对应
# objFields:
# _labels: array:;
commitBatch: 3000 #批量提交数量
4. 创建对应的索引
PUT test
{
"settings":{
"number_of_shards":1,
"number_of_replicas":2
},
"mappings":{
"properties":{
"name_cn" : {
"type" : "keyword"
},
"name_en" : {
"type" : "keyword"
},
"email" : {
"type" : "keyword"
}
}
}
}
5. 启动
均在对应目录下 /bin/startup.sh
- 先启动deployer,查看/logs/canal/canal.log 无异常后再查看/logs/example/example.log均无异常则启动成功
- 启动adapter,查看/logs/adapter/adapter.log 无异常则启动成功
6.首次全量导入
curl http://localhost:8081/etl/es7/exampleKey/mytest_user.yml -X POST
//执行完对应在/logs/adapter/adapter.log会有批量导入成功的日志
start etl to import data to index: test
数据全量导入完成, 一共导入 1585 条数据, 耗时: 643
/**
* ETL curl http://127.0.0.1:8081/etl/rdb/oracle1/mytest_user.yml -X POST
*
* @param type 类型 hbase, es
* @param key adapter key
* @param task 任务名对应配置文件名 mytest_user.yml
* @param params etl where条件参数, 为空全部导入,匹配etlCondition中的参数,使用;分号多个值
*/
@PostMapping("/etl/{type}/{key}/{task}")
public EtlResult etl(@PathVariable String type, @PathVariable String key, @PathVariable String task,
@RequestParam(name = "params", required = false) String params)
7.增量添加更新删除自动同步
- 如果以上操作在程序运行期间无任何异常则自动就开启了同步,可以手动添加更新删除一个记录试试。
- 如果出现提示表异常,可能是因为库太多某些库有问题
修改/deployer/conf/example/instance.properties的匹配策略
#只同步test库的全部表数据
canal.instance.filter.regex=test\\..*
如果解决不了则建议去看官方的issue,https://github.com/alibaba/canal/issues。