一、首先需要安装canal-server, 这里使用docker来运行canal-server
- 下载最新的 release 版本 https://github.com/alibaba/canal/releases ,解压后拷贝 `conf` 目录到新的目录 /Users/huaan9527/Documents/docker/canal-server/conf
- 根据 canal-server 的文档修改 `canal.properties`:
# 可选值: tcp, kafka, RocketMQ
canal.serverMode = kafka
# instance的名字,多个使用逗号分隔
canal.destinations = highso
# kafka集群的地址
canal.mq.servers = 192.168.16.xxx:9092
修改instance.properties
# 需要监控的mysql数据库地址
canal.instance.master.address=192.168.16.xxx:3306
canal.instance.dbUsername=xxx
canal.instance.dbPassword=xxx
canal.instance.connectionCharset = UTF-8
canal.instance.defaultDatabaseName =highso
# 监控的数据库表
canal.instance.filter.regex=highso\\.crmtest,highso\\.test
# kafka 中topic的名字
canal.mq.topic=common_mq_highso
- 启动canal-server容器的脚本
#!/bin/bash
#######################################
# Print the command-line help for run.sh and terminate the script.
# Arguments: none
# Outputs:   the usage text on stdout
# Returns:   does not return; exits with the status of the final print
#######################################
usage() {
  # Each element is one output line, kept literal (single-quoted) so the
  # trailing backslashes and the regex example are printed verbatim.
  local -a help_lines=(
    'Usage:'
    ' run.sh [CONFIG]'
    'example:'
    ' run.sh -e canal.instance.master.address=127.0.0.1:3306 \'
    ' -e canal.instance.dbUsername=canal \'
    ' -e canal.instance.dbPassword=canal \'
    ' -e canal.instance.connectionCharset=UTF-8 \'
    ' -e canal.instance.tsdb.enable=true \'
    ' -e canal.instance.gtidon=false \'
    ' -e canal.instance.filter.regex=.*\\\..* '
  )
  printf '%s\n' "${help_lines[@]}"
  exit
}
#######################################
# Probe whether something is listening on a local TCP port.
# Uses telnet when it is available, otherwise nc; if neither tool is
# available the port is reported as free.
# Arguments: $1 - TCP port number to probe on 127.0.0.1
# Outputs:   "0" on stdout when no listener was detected (or no probing
#            tool exists), a non-zero count otherwise
#######################################
function check_port() {
  local port=$1
  local hits

  # `command -v` is used instead of `[ -f $(which telnet) ]`: with telnet
  # missing the old test collapsed to `[ -f ]`, which is always true, so
  # the nc fallback below could never be reached.
  if command -v telnet >/dev/null 2>&1; then
    hits=$(echo quit | telnet 127.0.0.1 "$port" | grep -ic connected)
    echo "${hits:-0}"
    return
  fi

  if command -v nc >/dev/null 2>&1; then
    # Rely on nc's exit status: without -v, `nc -z` typically prints no
    # "succeeded" line on stdout, so grepping its output always gave 0.
    if nc -z -w 1 127.0.0.1 "$port" >/dev/null 2>&1; then
      echo "1"
    else
      echo "0"
    fi
    return
  fi

  echo "0"
  return
}
#######################################
# Print the IPv4 address of this host's primary network interface.
# Globals:   myip (written)
# Arguments: none
# Outputs:   the detected address on stdout (empty line if none found)
#######################################
function getMyIp() {
  case "$(uname)" in
    Darwin)
      # Ask scutil for the primary interface, then read its IPv4 address
      # from ifconfig.
      myip=$(echo "show State:/Network/Global/IPv4" | scutil | grep PrimaryInterface | awk '{print $3}' | xargs ifconfig | grep inet | grep -v inet6 | awk '{print $2}')
      ;;
    *)
      # Take the token that follows "src" rather than the last field:
      # current iproute2 appends "uid <n>" to the route line, so $NF would
      # yield the uid instead of the source address.
      myip=$(ip route get 1 | awk '{ for (i = 1; i < NF; i++) if ($i == "src") { print $(i + 1); exit } }')
      ;;
  esac
  # Intentionally unquoted: collapses any newlines into single spaces so
  # the result stays on one line.
  echo $myip
}
# Resolve the directory containing this script (BASE) and choose the docker
# network mode. Host networking is enabled on every platform except macOS.
# All expansions are quoted so a script path containing whitespace still
# resolves correctly.
NET_MODE=""
case "$(uname)" in
  Darwin)
    bin_abs_path=$(cd "$(dirname "$0")"; pwd)
    ;;
  Linux)
    bin_abs_path=$(readlink -f "$(dirname "$0")")
    NET_MODE="--net=host"
    ;;
  *)
    NET_MODE="--net=host"
    bin_abs_path=$(cd "$(dirname "$0")"; pwd)
    ;;
esac
BASE=${bin_abs_path}
# Show the help text (and exit) when the first argument is "-h" or "help".
case "$1" in
  -h | help)
    usage
    ;;
esac
# DATA="$BASE/data"
# mkdir -p $DATA
# Bind-mount the conf and logs directories that sit next to this script
# into the container.
VOLUMNS="-v $BASE/conf:/home/admin/canal-server/conf"
VOLUMNS="$VOLUMNS -v $BASE/logs:/home/admin/canal-server/logs"

# Build one "-p host:container" mapping per port in PORTLIST.
PORTLIST="8000 2222 11111 11112"
PORTS=""
for PORT in $PORTLIST; do
  # The port probe is disabled; every port is treated as free.
  #exist=`check_port $PORT`
  exist="0"
  if [ "$exist" != "0" ]; then
    echo "port $PORT is used , pls check"
    exit 1
  fi
  PORTS="$PORTS -p $PORT:$PORT"
done
# Assemble and launch the canal-server container.
MEMORY="-m 2048m"
LOCALHOST=$(getMyIp)

# Only pass -h when an address was detected. With an empty value, a bare
# "-h" would consume the following "--name=canal-server" token as the
# hostname and the container would be started without its name.
HOSTNAME_OPT=""
if [ -n "$LOCALHOST" ]; then
  HOSTNAME_OPT="-h $LOCALHOST"
fi

cmd="docker run -d -it $HOSTNAME_OPT --name=canal-server $VOLUMNS $NET_MODE $PORTS $MEMORY canal/canal-server:v1.1.2"
# Print the command before running it; unquoted on purpose so repeated
# blanks left by empty options are collapsed in the displayed command.
echo $cmd
# NOTE(review): the command is a flat string re-parsed by eval, so a BASE
# path containing whitespace would still be split here.
eval "$cmd"
二、配置canal.adapter 实现 mysql数据库3306 同步 到 mysql数据库3308
- 下载canal.adapter 解压 https://github.com/alibaba/canal/releases
- 编辑 `application.yml`:
server:
  port: 8081
logging:
  level:
    org.springframework: INFO
    com.alibaba.otter.canal.client.adapter.hbase: DEBUG
    com.alibaba.otter.canal.client.adapter.es: DEBUG
    com.alibaba.otter.canal.client.adapter.rdb: DEBUG
spring:
  jackson:
    date-format: yyyy-MM-dd HH:mm:ss
    time-zone: GMT+8
    default-property-inclusion: non_null
canal.conf:
  mqServers: 192.168.16.xxx:9092 #or rocketmq
  flatMessage: true
  batchSize: 500
  syncBatchSize: 1000
  retries: 0
  timeout:
  accessKey:
  secretKey:
  mode: kafka # tcp kafka rocketMQ
  srcDataSources:
    demoSrc:
      url: jdbc:mysql://192.168.16.xxx:3306/demo?useUnicode=true
      username: xxx
      password: xxx
  canalAdapters:
  - instance: common_mq_highso # canal instance Name or mq topic name
    groups:
    - groupId: g1
      outerAdapters:
      - name: logger
      - name: rdb
        key: demoDes
        properties:
          jdbc.driverClassName: com.mysql.jdbc.Driver
          jdbc.url: jdbc:mysql://192.168.16.xxx:3308/demo?useUnicode=true
          jdbc.username: xxx
          jdbc.password: xxx
- 编辑rdb目录下面表的映射文件,数据库demo, 表 test
dataSourceKey: demoSrc
destination: common_mq_highso
outerAdapterKey: demoDes
concurrent: true
dbMapping:
  database: demo
  table: test
  targetTable: demo.test
  targetPk:
    id: id
  mapAll: true
- 启动canal.adapter :
./startup.sh
三、测试
- 在3306数据库中插入一条数据,检查是否在3308中同步
- 更新
- 删除
- 批量插入
- 批量更新
- 批量删除