mysql通过canal同步数据到elasticsearch

开发中,我们会发现关系型数据库,对于数据分析来说,会有很大的不方便性,当数据量过大时,查询效率更加的慢。

首先我们能需要了解一下 主从数据库,请移步到 docker mysql8.0主从库

canal-server呢, 其实我认为canal就是类似从库的概念,监听主库的变化,

canal-adapter: 基于canal server订阅Mysql binglog日志增量同步数据的一款工具

闲话不说了,开始吧!

安装mysql数据库

  1. 新建mysql.cnf文件,存储到 D:\docker\mysql\master\conf文件夹下
[mysqld]
server-id=100
log-bin=mysql-bin
binlog_cache_size=1M
# binlog日志格式
binlog_format=row
  1. 启动
docker run -itd -p 3307:3306 --name mysql-master -v D:\docker\mysql\master\conf:/etc/mysql/conf.d -v D:\docker\mysql\master\data:/var/lib/mysql -e MYSQL_ROOT_PASSWORD=123456 mysql:8.0
  1. 修改mysql.cnf文件权限
docker exec -it mysql-master chmod 644 /etc/mysql/conf.d/mysql.cnf
  1. 重启

安装 canal-server

  1. 下载
docker pull canal/canal-server:v1.1.5
  1. 启动临时容器
docker run --rm --name canal canal/canal-server:v1.1.5
  1. 将容器中的instance.properties 复制到本地 D:\docker\canal\server文件夹下
docker cp canal:/home/admin/canal-server/conf/example/instance.properties .
  1. 编辑instance.properties, 其他保持不变
## server_id 与mysql数据库不要设置一样
canal.instance.mysql.slaveId=300

# mysql数据库ip与端口
canal.instance.master.address=192.168.1.8:3307

# 访问主库的用户名密码
canal.instance.dbUsername=canal
canal.instance.dbPassword=canal
  1. 启动canal-server
docker run -p 11111:11111 --name canal -v D:\docker\canel\server\conf\instance.properties:/home/admin/canal-server/conf/example/instance.properties  -v D:\docker\canel\server\logs:/home/admin/canal-server/logs -itd canal/canal-server:v1.1.5
  1. 验证启动成功
docker logs -f canal
...
==> INIT DONE
==> RUN /home/admin/app.sh
==> START ...
start canal ...
start canal successful
==> START SUCCESSFUL ...

在mysql应用中创建一个数据库

CREATE DATABASE `canaltest` CHARACTER SET 'utf8mb4' COLLATE 'utf8mb4_general_ci';

验证canal是否正常连接

PS D:\docker\canel> docker exec -it canal tail -1f canal-server/logs/example/example.log
2023-09-08 21:36:08.627 [MultiStageCoprocessor-other-example-0] 
WARN  c.a.o.canal.parse.inbound.mysql.tsdb.DatabaseTableMeta - 
dup apply for sql : 
CREATE DATABASE `canaltest` CHARACTER SET 'utf8' COLLATE 'utf8_general_ci'

安装canal-adapter

  1. 下载镜像
docker pull slpcat/canal-adapter:v1.1.5
  1. 临时容器
docker run --rm --name adapter-tmp slpcat/canal-adapter:v1.1.5
  1. 复制application.yml到本地 D:\docker\canel\adapter\conf
docker cp adapter-tmp:/opt/canal-adapter/conf/application.yml .
  1. 修改文件内容
server:
  port: 8081
spring:
  jackson:
    date-format: yyyy-MM-dd HH:mm:ss
    time-zone: GMT+8
    default-property-inclusion: non_null

canal.conf:
  mode: tcp #tcp kafka rocketMQ rabbitMQ
  flatMessage: true
  zookeeperHosts:
  syncBatchSize: 1000
  retries: 0
  timeout:
  accessKey:
  secretKey:
  consumerProperties:
    # canal-server的ip:端口
    canal.tcp.server.host: 192.168.1.8:11111
    canal.tcp.zookeeper.hosts:
    canal.tcp.batch.size: 500
    canal.tcp.username:
    canal.tcp.password:
  srcDataSources:
    defaultDS:
      # 数据库来源,用户名、密码 canaltest
      url: jdbc:mysql://192.168.1.8:3307/canaltest?useUnicode=true
      username: canal
      password: canal
  canalAdapters:
  - instance: example # canal-server 实例名称,这个跟canal-server的instance.properties的文件夹名称一致就好
    groups:
    - groupId: g1
      outerAdapters:
      - name: logger
      - name: es7  #这个是容器内conf目录es7文件夹名称
        hosts: 192.168.1.8:9210 # es对外可访问地址
        properties:
          mode: rest
          # security.auth: test:123456 #  only used for rest mode
          cluster.name: docker-cluster  # 集群的名称,docker启动单例es时,默认为docker-cluster
  1. 启动adapter
docker run -itd --name canal-adapter -p 8081:8081 -v D:\docker\canel\adapter\conf\application.yml:/opt/canal-adapter/conf/application.yml slpcat/canal-adapter:v1.1.5
  1. 新增一个表映射索引文件
docker exec -it canal-adapter /bin/bash

cd /opt/canal-adapter/conf/es7/
touch my_test.yml

文件内容如下

dataSourceKey: defaultDS
destination: example
groupId: g1
esMapping:
  # es中索引
  _index: mytest_user
  _id: _id
  _type: _doc
  upsert: true
  sql: "
      SELECT
              id as _id,
              id,
              name,
              role_id,
              c_time
      FROM
              user
      "
  commitBatch: 3000

Mysql 创建表,追加数据

-- 创建数据库, 因前面配置canal-adapter时,配置的数据库为canaltest
CREATE DATABASE `canaltest` CHARACTER SET 'utf8mb4' COLLATE 'utf8mb4_general_ci';

-- 创建表
CREATE TABLE `user` (
  `id` int NOT NULL,
  `name` varchar(255) COLLATE utf8mb4_general_ci DEFAULT NULL,
  `role_id` int DEFAULT NULL,
  `c_time` datetime DEFAULT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_general_ci;

-- 新增数据
INSERT INTO `canaltest`.`user` (`id`, `name`, `role_id`, `c_time`) VALUES (2, 'haha', 1, '2023-09-05 19:53:31');

ES中创建索引

PUT localhost:9210/mytest_user
{
    "mappings": {
        "properties": {
            "c_time": {
                "type": "date",
                "format": "date_optional_time||epoch_millis"
            },
            "name": {
                "type": "text"
            },
            "role_id": {
                "type": "long"
            },
            "id": {
                "type": "long"
            }
        }
    }
}

查询ES中是否存在数据

GET localhost:9210/mytest_user/_search
{
    "took": 12,
    "timed_out": false,
    "_shards": {
        "total": 1,
        "successful": 1,
        "skipped": 0,
        "failed": 0
    },
    "hits": {
        "total": {
            "value": 1,
            "relation": "eq"
        },
        "max_score": 1.0,
        "hits": [
            {
                "_index": "mytest_user",
                "_id": "1",
                "_score": 1.0,
                "_source": {
                    "id": 1,
                    "name": "haha",
                    "role_id": 1,
                    "c_time": "2023-09-05T19:53:31+08:00"
                }
            }
        ]
    }
}

至此, msyql中数据变化,canal都会将变化数据同步到es中

©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容