1. 使用 canal 将数据从 mysql 同步到 mysql
1.1同步场景架构
canal-同步架构图.png
1.2 canal 的详细介绍请看:
https://www.jianshu.com/p/3df39e34f4bf
1.3:给mysql 添加账号:
1.4. mysql 安装省略
1.5 为canal -admin 创建用户
CREATE USER canaladmin IDENTIFIED BY 'canaladmin%123';
GRANT ALL ON canal_manager.* TO 'canaladmin'@'%';
FLUSH PRIVILEGES;
1.6 为canal 创建用户:
CREATE USER canal IDENTIFIED BY 'canal%123';
GRANT ALL PRIVILEGES ON *.* TO 'canal'@'%' ;
FLUSH PRIVILEGES;
1.7 mysql 的配置:
打开mysql 的配置文件:
vi /etc/my.cnf
server-id=1
log-bin=mysql-bin
binlog-format=ROW
binlog-ignore-db=information_schema
binlog-ignore-db=mysql
binlog-ignore-db=performance_schema
binlog-ignore-db=sys
log-bin用于指定binlog日志文件名前缀,默认存储在/var/lib/mysql 目录下。
server-id用于标识唯一的数据库,不能和别的服务器重复,建议使用ip的最后一段;也不能使用默认值。
binlog-ignore-db:表示同步的时候忽略的数据库。
binlog-do-db:指定需要同步的数据库(如果没有此项,表示同步所有的库)。
2.下载地址:
下载地址:
https://github.com/alibaba/canal/releases
-rw-r--r-- 1 root root 165805905 3月 12 15:25 canal.adapter-1.1.5-SNAPSHOT.tar.gz
-rw-r--r-- 1 root root 38060846 3月 12 15:25 canal.admin-1.1.5-SNAPSHOT.tar.gz
-rw-r--r-- 1 root root 58810588 3月 12 15:25 canal.deployer-1.1.5-SNAPSHOT.tar.gz
[root@slave01 canal]#
2.1 配置环境变量
export CANAL_ADMIN_HOME=/opt/operation/canal/canaladmin
export PATH=${CANAL_ADMIN_HOME}/bin:$PATH
export CANAL_SERVER_HOME=/opt/operation/canal/canal-service
export PATH=${CANAL_SERVER_HOME}/bin:$PATH
2.3 安装canal-admin
下载解压:
mkdir -p /opt/operation/canal/canaladmin;
tar -zxvf canal.admin-1.1.5-SNAPSHOT.tar.gz -C /opt/operation/canal/canaladmin;
2.4 修改配置:application.yml
路径:/opt/operation/canal/canaladmin/conf/application.yml 内容如下:
主要修改数据库的账号:账号在mysql 里面新建
[root@slave01 conf]# pwd
/opt/operation/canal/canaladmin/conf
[root@slave01 conf]# vim application.yml
server:
port: 8089
spring:
jackson:
date-format: yyyy-MM-dd HH:mm:ss
time-zone: GMT+8
spring.datasource:
address: slave01.pxx.com:3306
database: canal_manager
username: canaladmin
password: canaladmin%123
driver-class-name: com.mysql.jdbc.Driver
url: jdbc:mysql://${spring.datasource.address}/${spring.datasource.database}?useUnicode=true&characterEncoding=UTF-8&useSSL=false
hikari:
maximum-pool-size: 30
minimum-idle: 1
canal:
adminUser: admin
adminPasswd: admin
2.5 添加驱动包:
ln -s /usr/share/java/mysql-connector-java-8.0.18.jar /opt/operation/canal/canaladmin/lib/mysql-connector-java-8.0.18.jar
2.6 初始化元数据
1.mysql -uroot -p
source /opt/operation/canal/canaladmin/conf/canal_manager.sql
2.7 启动 canal-admin 命令
/opt/operation/canal/canaladmin/bin/startup.sh
2.8 日志位置在 logs 目录下(/opt/operation/canal/canaladmin/logs)
2.9 访问地址:
http://slave01.com:8089
canal-serviceui.png
3. 安装 canal-service
3.1 解压安装:
mkdir -p /opt/operation/canal/canal-service;
tar -zxvf canal.deployer-1.1.5-SNAPSHOT.tar.gz -C /opt/operation/canal/canal-service
3.2 配置canal service
cd /opt/operation/canal/canal-service/conf/;
mv canal.properties canal.properties.bak;
mv canal_local.properties canal.properties;
vi /opt/operation/canal/canal-service/conf/canal.properties;
## admin 的地址
canal.admin.manager = slave01.com:8089
canal.admin.port = 11110
canal.admin.user = admin
canal.admin.passwd = 4ACFE3202A5FF5CF467898FC58AAB1D615029441
# admin auto register
canal.admin.register.auto = true
canal.admin.register.cluster =
~
3.3 添加驱动包:
ln -s /usr/share/java/mysql-connector-java-8.0.18.jar /opt/operation/canal/canal-service/lib/mysql-connector-java-8.0.18.jar
3.4启动canal-service
/opt/operation/canal/canal-service/bin/startup.sh
/opt/operation/canal/canal-service/bin/stop.sh
/opt/operation/canal/canal-service/bin/restart.sh
3.5 页面操作:
canal-instance.png
#################################################
## mysql serverId , v1.0.26+ will autoGen
# canal.instance.mysql.slaveId=0
# enable gtid use true/false
canal.instance.gtidon=false
# position info
canal.instance.master.address=slave01.com:3306
canal.instance.master.journal.name=
canal.instance.master.position=
canal.instance.master.timestamp=
canal.instance.master.gtid=
# rds oss binlog
canal.instance.rds.accesskey=
canal.instance.rds.secretkey=
canal.instance.rds.instanceId=
# table meta tsdb info
canal.instance.tsdb.enable=true
#canal.instance.tsdb.url=jdbc:mysql://127.0.0.1:3306/canal_tsdb
#canal.instance.tsdb.dbUsername=canal
#canal.instance.tsdb.dbPassword=canal
#canal.instance.standby.address =
#canal.instance.standby.journal.name =
#canal.instance.standby.position =
#canal.instance.standby.timestamp =
#canal.instance.standby.gtid=
# username/password
canal.instance.dbUsername=canal
canal.instance.dbPassword=canal%123
canal.instance.connectionCharset = UTF-8
# enable druid Decrypt database password
canal.instance.enableDruid=false
#canal.instance.pwdPublicKey=MFwwDQYJKoZIhvcNAQEBBQADSwAwSAJBALK4BUxdDltRRE5/zXpVEVPUgunvscYFtEip3pmLlhrWpacX7y7GCMo2/JM6LeHmiiNdH1FWgGCpUfircSwlWKUCAwEAAQ==
# table regex
canal.instance.filter.regex=.*\\..*
# table black regex
canal.instance.filter.black.regex=
# table field filter(format: schema1.tableName1:field1/field2,schema2.tableName2:field1/field2)
#canal.instance.filter.field=test1.t_product:id/subject/keywords,test2.t_company:id/name/contact/ch
# table field black filter(format: schema1.tableName1:field1/field2,schema2.tableName2:field1/field2)
#canal.instance.filter.black.field=test1.t_product:subject/product_image,test2.t_company:id/name/contact/ch
# mq config
canal.mq.topic=example
# dynamic topic route by schema or table regex
#canal.mq.dynamicTopic=mytest1.user,mytest2\\..*,.*\\..*
canal.mq.partition=0
# hash partition config
#canal.mq.partitionsNum=3
#canal.mq.partitionHash=test.table:id^name,.*\\..*
#################################################
4.配置canal 适配器:
/opt/operation/canal/canaladapter
4.1 解压安装:
mkdir -p /opt/operation/canal/canaladapter
tar -zxvf canal.adapter-1.1.5-SNAPSHOT.tar.gz -C /opt/operation/canal/canaladapter/
4.2 修改配置: application.yml
[root@slave01 conf]# pwd
/opt/operation/canal/canaladapter/conf
[root@slave01 conf]# ll
总用量 12
-rwxrwxrwx 1 root root 2905 3月 12 19:10 application.yml
-rwxrwxrwx 1 root root 183 3月 12 17:54 bootstrap.yml
drwxr-xr-x 2 root root 86 3月 12 15:23 es6
drwxr-xr-x 2 root root 86 3月 12 15:23 es7
drwxr-xr-x 2 root root 40 3月 12 15:23 hbase
drwxr-xr-x 2 root root 31 3月 12 15:23 kudu
-rwxrwxrwx 1 root root 2172 3月 12 18:53 logback.xml
drwxrwxrwx 2 root root 30 3月 12 15:23 META-INF
drwxrwxrwx 2 root root 29 3月 12 19:30 rdb
[root@slave01 conf]# vi application.yml
server:
port: 8081
spring:
jackson:
date-format: yyyy-MM-dd HH:mm:ss
time-zone: GMT+8
default-property-inclusion: non_null
canal.conf:
mode: tcp #tcp kafka rocketMQ rabbitMQ
flatMessage: true
zookeeperHosts:
syncBatchSize: 1000
retries: 0
timeout:
accessKey:
secretKey:
consumerProperties:
# canal tcp consumer
canal.tcp.server.host: slave01.com:11111
# canal.tcp.zookeeper.hosts:
canal.tcp.batch.size: 500
canal.tcp.username:
canal.tcp.password:
# kafka consumer
kafka.bootstrap.servers: 127.0.0.1:9092
kafka.enable.auto.commit: false
kafka.auto.commit.interval.ms: 1000
kafka.auto.offset.reset: latest
kafka.request.timeout.ms: 40000
kafka.session.timeout.ms: 30000
kafka.isolation.level: read_committed
kafka.max.poll.records: 1000
# rocketMQ consumer
rocketmq.namespace:
rocketmq.namesrv.addr: 127.0.0.1:9876
rocketmq.batch.size: 1000
rocketmq.enable.message.trace: false
rocketmq.customized.trace.topic:
rocketmq.access.channel:
rocketmq.subscribe.filter:
# rabbitMQ consumer
rabbitmq.host:
rabbitmq.virtual.host:
rabbitmq.username:
rabbitmq.password:
rabbitmq.resource.ownerId:
srcDataSources:
defaultDS:
url: jdbc:mysql://slave01.com:3306/test?useUnicode=true
username: canal
password: canal%123
canalAdapters:
- instance: slave01 # canal instance Name or mq topic name
groups:
- groupId: g1
outerAdapters:
- name: logger
- name: rdb
key: mysql1
properties:
jdbc.driverClassName: com.mysql.jdbc.Driver
jdbc.url: jdbc:mysql://192.168.1.154:3306/mytest2?useUnicode=true
jdbc.username: root
jdbc.password: Mysql@2021
# - name: rdb
# key: oracle1
# properties:
# jdbc.driverClassName: oracle.jdbc.OracleDriver
# jdbc.url: jdbc:oracle:thin:@localhost:49161:XE
# jdbc.username: mytest
# jdbc.password: m121212
# - name: rdb
# key: postgres1
# properties:
# jdbc.driverClassName: org.postgresql.Driver
# jdbc.url: jdbc:postgresql://localhost:5432/postgres
# jdbc.username: postgres
# jdbc.password: 121212
# threads: 1
# commitSize: 3000
# - name: hbase
# properties:
# hbase.zookeeper.quorum: 127.0.0.1
# hbase.zookeeper.property.clientPort: 2181
# zookeeper.znode.parent: /hbase
# - name: es
# hosts: 127.0.0.1:9300 # 127.0.0.1:9200 for rest mode
# properties:
# mode: transport # or rest
# # security.auth: test:123456 # only used for rest mode
# cluster.name: elasticsearch
# - name: kudu
# key: kudu
# properties:
# kudu.master.address: 127.0.0.1 # ',' split multi address
4.3 配置对应的映射文件:
[root@slave01 conf]# pwd
/opt/operation/canal/canaladapter/conf
[root@slave01 conf]# ll
总用量 12
-rwxrwxrwx 1 root root 2905 3月 12 19:10 application.yml
-rwxrwxrwx 1 root root 183 3月 12 17:54 bootstrap.yml
drwxr-xr-x 2 root root 86 3月 12 15:23 es6
drwxr-xr-x 2 root root 86 3月 12 15:23 es7
drwxr-xr-x 2 root root 40 3月 12 15:23 hbase
drwxr-xr-x 2 root root 31 3月 12 15:23 kudu
-rwxrwxrwx 1 root root 2172 3月 12 18:53 logback.xml
drwxrwxrwx 2 root root 30 3月 12 15:23 META-INF
drwxrwxrwx 2 root root 29 3月 15 16:31 rdb
[root@slave01 conf]# cd rdb/
[root@slave01 rdb]# ll
总用量 4
-rwxr-xr-x 1 root root 513 3月 12 19:17 mytest_user.yml
[root@slave01 rdb]#
*********************************************************************************
[root@slave01 rdb]# vi mytest_user.yml
dataSourceKey: defaultDS
destination: slave01
groupId: g1
outerAdapterKey: mysql1
concurrent: true
dbMapping:
database: test
table: test02
targetTable: mytest2.test
targetPk:
id: id
# mapAll: true
targetColumns:
id:
name:
etlCondition: "where c_time>={}"
commitBatch: 3 # 批量提交的大小
## Mirror schema synchronize config
#dataSourceKey: defaultDS
#destination: example
#groupId: g1
#outerAdapterKey: mysql1
#concurrent: true
#dbMapping:
# mirrorDb: true
# database: mytest
日志:
2021-03-15 16:47:56.525 [pool-2-thread-1] TRACE c.a.otter.canal.client.adapter.rdb.support.BatchExecutor - Batch executor commit 1 rows
2021-03-15 16:48:08.611 [pool-7-thread-1] INFO c.a.o.canal.client.adapter.logger.LoggerAdapterExample - DML: {"data":[{"id":1,"name":"flink"}],"database":"test","destination":"slave01","es":1615798088000,"groupId":"g1","isDdl":false,"old":null,"pkNames":[],"sql":"","table":"test02","ts":1615798088610,"type":"INSERT"}
2021-03-15 16:48:08.625 [pool-2-thread-1] TRACE c.a.o.canal.client.adapter.rdb.service.RdbSyncService - Insert into target table, sql: INSERT INTO mytest2.test (`id`,`name`) VALUES (?,?)
2021-03-15 16:48:08.627 [pool-2-thread-1] DEBUG c.a.o.canal.client.adapter.rdb.service.RdbSyncService - DML: {"data":{"id":1,"name":"flink"},"database":"test","destination":"slave01","old":null,"table":"test02","type":"INSERT"}
2021-03-15 16:48:08.630 [pool-2-thread-1] TRACE c.a.otter.canal.client.adapter.rdb.support.BatchExecutor - Batch executor commit 1 rows
2021-03-15 16:48:09.136 [pool-7-thread-1] INFO c.a.o.canal.client.adapter.logger.LoggerAdapterExample - DML: {"data":[{"id":1,"name":"flink","create_time":null,"age":null}],"database":"db_test","destination":"slave01","es":1615798088000,"groupId":"g1","isDdl":false,"old":null,"pkNames":[],"sql":"","table":"test02","ts":1615798089136,"type":"INSERT"}
2021-03-15 16:48:57.948 [pool-7-thread-1] INFO c.a.o.canal.client.adapter.logger.LoggerAdapterExample - DML: {"data":null,"database":"test","destination":"slave01","es":1615798137000,"groupId":"g1","isDdl":false,"old":null,"pkNames":[],"sql":"DROP TABLE `test0002` /* generated by server */","table":"test0002","ts":1615798137947,"type":"ERASE"}
4.4 启动项目:
/opt/operation/canal/canaladapter/bin/startup.sh
查看同步情况:
canal-结果.png