canal学习-1

准备6台机器机
假设ip是192.168.1.181-192.168.1.185

181-183(搭建zk集群)

安装java(跳过)
wget http://apache.claz.org/zookeeper/zookeeper-3.5.5/apache-zookeeper-3.5.5-bin.tar.gz
tar zxf apache-zookeeper-3.5.5-bin.tar.gz
cd apache-zookeeper-3.5.5-bin
cp conf/zoo_sample.cfg conf/zoo.cfg
修改dataDir为比如/data/zookeeper(确保已经创建并且有权限)
添加
server.1=192.168.1.181:2888:3888
server.2=192.168.1.182:2888:3888
server.3=192.168.1.183:2888:3888
echo  1 > /data/zookeeper/myid(三台机器依次为1,2,3)
启动./bin/zkServer.sh start

184(mysql)

安装mysql(跳过)

/etc/mysql/mysql.conf.d/mysqld.cnf的[mysqld]中添加
log-bin=mysql-bin
binlog-format=ROW
server_id=1 
修改bind-address 0.0.0.0
mysql -uroot -pyourpassword
CREATE USER canal IDENTIFIED BY 'canal';  
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
FLUSH PRIVILEGES;
create databse mydb;
 create table mytable(id bigint not null auto_increment primary key,name varchar(100) not null default '');

185-186(canal-server集群)
···
安装java(跳过)
wget https://github.com/alibaba/canal/releases/download/canal-1.1.3/canal.deployer-1.1.3.tar.gz
tar zxf canal.deployer-1.1.3.tar.gz
修改conf/canal.properties
canal.id两台不一样
canal.ip 比如192.168.1.185
canal.zkServers =192.168.1.181:2181,192.168.1.182:2181,192.168.1.183:2181

修改conf/example/instance.properties
canal.instance.master.address=192.168.1.189:3306
···

测试(代码取自canal-go)

package main

import (
    "github.com/CanalClient/canal-go/client"
    "time"
    "fmt"
    "log"
    "os"
    protocol "github.com/CanalClient/canal-go/protocol"
    "github.com/golang/protobuf/proto"


)
func main() {

    fmt.Println("cluster main")
    conn, _ := createConn()
    conn.Subscribe(".*\\\\..*")
    n := 0
    for {
        message, err := conn.Get(100, nil, nil)
        if err != nil {
            log.Println(err)
            continue
        }
        batchId := message.Id
        if batchId == -1 || len(message.Entries) <= 0 {
            time.Sleep(3000 * time.Millisecond)
            fmt.Println("===没有数据了===")
            n++
            if n > 100 {
                break
            }
            continue
        }
        printEntry(message.Entries)
    }

    conn.DisConnection()
}

func createConn() (conn *client.ClusterCanalConnector, err error) {

    cn, err := client.NewCanalClusterNode("example", []string{"192.168.1.181:2181","192.168.1.182:2181","192.168.1.183:2181"}, time.Second*10)
    fmt.Printf("err=%v,cn=%+v\n\n", err, cn)

    addr, port, err := cn.GetNode()
    fmt.Printf("addr=%s, port=%d, err=%v", addr, port, err)

    conn = client.NewClusterCanalConnector(cn, "", "", "example", 60000, 60*60*1000)
    err = conn.Connect()
    fmt.Printf("err=%v,cluCanalConn=%+vn\n", err, conn)
    return
}


func printEntry(entrys []protocol.Entry) {

    for _, entry := range entrys {
        if entry.GetEntryType() == protocol.EntryType_TRANSACTIONBEGIN || entry.GetEntryType() == protocol.EntryType_TRANSACTIONEND {
            continue
        }
        rowChange := new(protocol.RowChange)

        err := proto.Unmarshal(entry.GetStoreValue(), rowChange)
        checkError(err)
        if rowChange != nil {
            eventType := rowChange.GetEventType()
            header := entry.GetHeader()
            fmt.Println(fmt.Sprintf("================> binlog[%s : %d],name[%s,%s], eventType: %s", header.GetLogfileName(), header.GetLogfileOffset(), header.GetSchemaName(), header.GetTableName(), header.GetEventType()))

            for _, rowData := range rowChange.GetRowDatas() {
                if eventType == protocol.EventType_DELETE {
                    printColumn(rowData.GetBeforeColumns())
                } else if eventType == protocol.EventType_INSERT {
                    printColumn(rowData.GetAfterColumns())
                } else {
                    fmt.Println("-------> before")
                    printColumn(rowData.GetBeforeColumns())
                    fmt.Println("-------> after")
                    printColumn(rowData.GetAfterColumns())
                }
            }
        }
    }
}

func printColumn(columns []*protocol.Column) {
    for _, col := range columns {
        fmt.Println(fmt.Sprintf("%s : %s  update= %t", col.GetName(), col.GetValue(), col.GetUpdated()))
    }
}

func checkError(err error) {
    if err != nil {
        fmt.Fprintf(os.Stderr, "Fatal error: %s", err.Error())
        os.Exit(1)
    }
}

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 215,723评论 6 498
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,003评论 3 391
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 161,512评论 0 351
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,825评论 1 290
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,874评论 6 388
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,841评论 1 295
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,812评论 3 416
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,582评论 0 271
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,033评论 1 308
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,309评论 2 331
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,450评论 1 345
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,158评论 5 341
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,789评论 3 325
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,409评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,609评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,440评论 2 368
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,357评论 2 352

推荐阅读更多精彩内容