准备6台机器
假设ip是192.168.1.181-192.168.1.186
181-183(搭建zk集群)
安装java(跳过)
wget http://apache.claz.org/zookeeper/zookeeper-3.5.5/apache-zookeeper-3.5.5-bin.tar.gz
tar zxf apache-zookeeper-3.5.5-bin.tar.gz
cd apache-zookeeper-3.5.5-bin
cp conf/zoo_sample.cfg conf/zoo.cfg
修改dataDir为比如/data/zookeeper(确保已经创建并且有权限)
添加
server.1=192.168.1.181:2888:3888
server.2=192.168.1.182:2888:3888
server.3=192.168.1.183:2888:3888
echo 1 > /data/zookeeper/myid(三台机器依次为1,2,3)
启动./bin/zkServer.sh start
184(mysql)
安装mysql(跳过)
/etc/mysql/mysql.conf.d/mysqld.cnf的[mysqld]中添加
log-bin=mysql-bin
binlog-format=ROW
server_id=1
修改bind-address为0.0.0.0(修改后重启mysql使配置生效)
mysql -uroot -pyourpassword
CREATE USER canal IDENTIFIED BY 'canal';
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
FLUSH PRIVILEGES;
create database mydb;
use mydb;
create table mytable(id bigint not null auto_increment primary key,name varchar(100) not null default '');
185-186(canal-server集群)
```
安装java(跳过)
wget https://github.com/alibaba/canal/releases/download/canal-1.1.3/canal.deployer-1.1.3.tar.gz
tar zxf canal.deployer-1.1.3.tar.gz
修改conf/canal.properties
canal.id两台不一样
canal.ip 比如192.168.1.185
canal.zkServers =192.168.1.181:2181,192.168.1.182:2181,192.168.1.183:2181
修改conf/example/instance.properties
canal.instance.master.address=192.168.1.184:3306
```
测试(代码取自canal-go)
package main
import (
"github.com/CanalClient/canal-go/client"
"time"
"fmt"
"log"
"os"
protocol "github.com/CanalClient/canal-go/protocol"
"github.com/golang/protobuf/proto"
)
// main connects to the canal cluster, subscribes to every schema and table,
// and prints each batch of row changes it receives.
//
// The loop ends once more than maxEmptyPolls empty batches have been seen.
// The counter is never reset, so it limits the total number of empty polls
// rather than the number of consecutive ones.
func main() {
	const (
		batchSize     = 100                     // entries requested per Get call
		idleWait      = 3000 * time.Millisecond // pause after an empty batch or a failed Get
		maxEmptyPolls = 100                     // empty batches tolerated before exiting
	)

	fmt.Println("cluster main")

	conn, err := createConn()
	if err != nil {
		log.Fatalf("create canal connection: %v", err)
	}

	// The filter is a regular expression matched against "schema.table".
	// In Go source a literal dot is written `\\.`; the former `\\\\.` asked
	// for a literal backslash and therefore matched no table at all.
	if err := conn.Subscribe(".*\\..*"); err != nil {
		log.Fatalf("subscribe: %v", err)
	}

	emptyPolls := 0
	for {
		message, err := conn.Get(batchSize, nil, nil)
		if err != nil {
			log.Println(err)
			// Back off before retrying so a persistent failure does not
			// turn into a busy loop that floods the log.
			time.Sleep(idleWait)
			continue
		}

		if message.Id == -1 || len(message.Entries) <= 0 {
			time.Sleep(idleWait)
			fmt.Println("===没有数据了===")
			emptyPolls++
			if emptyPolls > maxEmptyPolls {
				break
			}
			continue
		}

		printEntry(message.Entries)
	}

	conn.DisConnection()
}
// createConn builds a cluster node for the "example" destination from the
// ZooKeeper ensemble and opens a cluster connector on top of it.
//
// It returns (nil, err) when the cluster node cannot be created. When Connect
// fails, the connector is still returned together with the error, so callers
// that previously ignored the error keep receiving a non-nil connector.
func createConn() (conn *client.ClusterCanalConnector, err error) {
	zkServers := []string{"192.168.1.181:2181", "192.168.1.182:2181", "192.168.1.183:2181"}

	cn, err := client.NewCanalClusterNode("example", zkServers, time.Second*10)
	fmt.Printf("err=%v,cn=%+v\n\n", err, cn)
	if err != nil {
		// cn must not be used after a failed construction.
		return nil, fmt.Errorf("create canal cluster node: %v", err)
	}

	// Informational only: the address is printed, while the connector below is
	// built from cn itself. A lookup error is therefore reported, not fatal,
	// and is kept in its own variable so it cannot mask the Connect result.
	addr, port, nodeErr := cn.GetNode()
	fmt.Printf("addr=%s, port=%d, err=%v\n", addr, port, nodeErr)

	conn = client.NewClusterCanalConnector(cn, "", "", "example", 60000, 60*60*1000)
	err = conn.Connect()
	fmt.Printf("err=%v,cluCanalConn=%+v\n", err, conn)
	if err != nil {
		return conn, fmt.Errorf("connect to canal server: %v", err)
	}
	return conn, nil
}
// printEntry prints the row changes carried by entrys to stdout.
//
// Transaction begin/end entries are skipped. For every other entry the store
// value is decoded as a protocol.RowChange; a decoding failure is passed to
// checkError, which terminates the process. For each row, deletes print the
// "before" columns, inserts print the "after" columns, and every other event
// type prints both.
func printEntry(entrys []protocol.Entry) {
	for i := range entrys {
		// Work through a pointer so the protobuf message is not copied on
		// every iteration.
		entry := &entrys[i]

		switch entry.GetEntryType() {
		case protocol.EntryType_TRANSACTIONBEGIN, protocol.EntryType_TRANSACTIONEND:
			continue
		}

		rowChange := new(protocol.RowChange)
		err := proto.Unmarshal(entry.GetStoreValue(), rowChange)
		checkError(err)

		eventType := rowChange.GetEventType()
		header := entry.GetHeader()
		summary := fmt.Sprintf("================> binlog[%s : %d],name[%s,%s], eventType: %s", header.GetLogfileName(), header.GetLogfileOffset(), header.GetSchemaName(), header.GetTableName(), header.GetEventType())
		fmt.Println(summary)

		for _, rowData := range rowChange.GetRowDatas() {
			switch eventType {
			case protocol.EventType_DELETE:
				printColumn(rowData.GetBeforeColumns())
			case protocol.EventType_INSERT:
				printColumn(rowData.GetAfterColumns())
			default:
				fmt.Println("-------> before")
				printColumn(rowData.GetBeforeColumns())
				fmt.Println("-------> after")
				printColumn(rowData.GetAfterColumns())
			}
		}
	}
}
// printColumn writes one line per column to stdout, showing the column's
// name, its value and its "updated" flag.
func printColumn(columns []*protocol.Column) {
	for i := 0; i < len(columns); i++ {
		c := columns[i]
		name, value, updated := c.GetName(), c.GetValue(), c.GetUpdated()
		line := fmt.Sprintf("%s : %s update= %t", name, value, updated)
		fmt.Println(line)
	}
}
// checkError aborts the program when err is non-nil: it writes the error text
// to stderr and exits with status 1. A nil err is a no-op.
func checkError(err error) {
	if err == nil {
		return
	}
	msg := err.Error()
	fmt.Fprintf(os.Stderr, "Fatal error: %s", msg)
	os.Exit(1)
}