Canal同步es

CanalAdapter同步ES7

预准备

  1. Mysql5.7
  2. ElasticSearch7.12.1集群或单击(7.x +)
  3. 一个数据库和一个表结构

1.下载

下载Canal1.1.5,支持es7

canal.deployer-1.1.5.tar.gz对应的是canal的server端,负责订阅并解析Mysql-Binlog
canal.adapter-1.1.5.tar.gz对应的是适配器,负责将server的binlog转换并发送给对应的应用
canal.admin-1.1.5.tar.gz一个可视化webui可以不安装
额外需要下载v1.1.5-alpha-2快照版本的canal.adapter-1.1.5.tar.gz(release1.1.5版本的jar包有bug)


分别解压缩后,将v1.1.5-alpha-2解压缩文件夹下plugin文件夹中的 client-adapter.es7x-1.1.5-SNAPSHOT-jar-with-dependencies.jar 替换掉release版本的plugin文件的 client-adapter.es7x-1.1.5-jar-with-dependencies.jar

2. 开启Mysql-Binlog

进入mysql终端执行

show variables like 'log_bin';

如果是ON就代表已经开启。
+---------------+-------+
| Variable_name | Value |
+---------------+-------+
| log_bin | ON |
+---------------+-------+

3. 修改配置

  1. 先修改canal.deployer的配置
    参考官方中文文档: https://github.com/alibaba/canal/wiki/QuickStart
  2. 修改canal.adapter配置 conf/application.yml
    更多请参考官方中文文档:https://github.com/alibaba/canal/tree/master/client-adapter
server:
  port: 8081
spring:
  jackson:
    date-format: yyyy-MM-dd HH:mm:ss
    time-zone: GMT+8
    default-property-inclusion: non_null

canal.conf:
  mode: tcp #tcp kafka rocketMQ rabbitMQ
  flatMessage: true
  zookeeperHosts:
  syncBatchSize: 1000
  retries: 0
  timeout:
  accessKey:
  secretKey:
  consumerProperties:
   # 省略.... 默认即可
  srcDataSources:   # 注意打开注释,然后要和上面对齐否则会报错(但是却又不告诉你是格式问题..)
    defaultDS:
      url: jdbc:mysql://127.0.0.1:3306/test?useUnicode=true
      username: root
      password: 123456
  canalAdapters:
  - instance: example # canal instance Name or mq topic name
    groups:
    - groupId: g1
      outerAdapters:
      - name: es7 # 这里要改为es7 他会去es7下去找配置
        key: exampleKey #这里是全量导入时的taskid
        hosts: es1:9201,es2:9202,es3:9203 # 我这里是集群,如果单机写一个就行ip:port
        properties:
          mode: rest # restful 模式
          cluster.name: es-cluster # 集群名字 GET _cat/health?v 可以看到对应的名字
  1. 修改es7下模板文件 conf/es7/mytest_user.yml
dataSourceKey: defaultDS #和上面canal.conf.srcDataSources.defaultDS要一样
outerAdapterKey: exampleKey #和上面canal.conf.canalAdapters.instance.groups.outerAdapters.key要一样
destination: example #和上面canal.conf.canalAdapters.instance要一样
groupId: g1 #和上面canal.conf.canalAdapters.instance.groups.groupId要一样
esMapping:
  _index: test #索引名称
  _id: _id #documentid
  _type: _doc # type
#  upsert: true
#  pk: id
  sql: "select a.id as _id, a.name_cn, a.name_en, a.email
        from user a"  # 查询的sql返回的结构要使用as别名和es的filed对应
#  objFields:
#    _labels: array:;
  commitBatch: 3000 #批量提交数量

4. 创建对应的索引

PUT test
{
  "settings":{
        "number_of_shards":1,
        "number_of_replicas":2
    },
    "mappings":{
        "properties":{
            "name_cn" : {
          "type" : "keyword"
        },
        "name_en" : {
          "type" : "keyword"
        },
        "email" : {
          "type" : "keyword"
        }
      }
        }
    }

5. 启动

均在对应目录下 /bin/startup.sh

  1. 先启动deployer,查看/logs/canal/canal.log 无异常后再查看/logs/example/example.log均无异常则启动成功
  2. 启动adapter,查看/logs/adapter/adapter.log 无异常则启动成功

6.首次全量导入

curl http://localhost:8081/etl/es7/exampleKey/mytest_user.yml -X POST

 //执行完对应在/logs/adapter/adapter.log会有批量导入成功的日志
 start etl to import data to index: test
 数据全量导入完成, 一共导入 1585 条数据, 耗时: 643
 /**
     * ETL curl http://127.0.0.1:8081/etl/rdb/oracle1/mytest_user.yml -X POST
     *
     * @param type 类型 hbase, es
     * @param key adapter key
     * @param task 任务名对应配置文件名 mytest_user.yml
     * @param params etl where条件参数, 为空全部导入,匹配etlCondition中的参数,使用;分号多个值
     */
    @PostMapping("/etl/{type}/{key}/{task}")
    public EtlResult etl(@PathVariable String type, @PathVariable String key, @PathVariable String task,
                         @RequestParam(name = "params", required = false) String params) 

7.增量添加更新删除自动同步

  1. 如果以上操作在程序运行期间无任何异常则自动就开启了同步,可以手动添加更新删除一个记录试试。
  2. 如果出现提示表异常,可能是因为库太多某些库有问题

修改/deployer/conf/example/instance.properties的匹配策略

#只同步test库的全部表数据
canal.instance.filter.regex=test\\..*

如果解决不了则建议去看官方的issue,https://github.com/alibaba/canal/issues

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 213,558评论 6 492
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,002评论 3 387
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 159,036评论 0 349
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,024评论 1 285
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,144评论 6 385
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,255评论 1 292
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,295评论 3 412
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,068评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,478评论 1 305
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,789评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,965评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,649评论 4 336
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,267评论 3 318
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,982评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,223评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,800评论 2 365
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,847评论 2 351

推荐阅读更多精彩内容