canal+kafka+go处理Mysql数据库增量信息

注意:canal使用java写的,需要安装jdk环境

canal介绍安装

说明

安装说明

  • 采用的是单节点的方式,集群可以参考官网。环境为centos7, 阿里云服务器
  • 将下载好的压缩包解压到指定的目录中,会有4个文件夹
    image.png
  • bin是执行的二进制文件,conf为配置文件,logs为日志文件
  • 打开conf/example/instance.properties文件配置数据库和kafka信息
    image.png
    image.png
    image.png
  • 注意:过滤表数据的配置 canal.instance.filter.regex 和 按照表名定义kafka主题的canal.instance.filter.regex配置是支持表达式的。例如:.*\..*是表示所有库的所有表,test\..*是test库的所有表,test.table1表示test库的table1表
  • 配置canal信息:打开conf目录下的canal.properties文件
    image.png
    image.png
    image.png
  • canal是支持连接池和kafka集群的,可以参考官网
  • 进入到bin目录下,执行./startup.sh bin目录下生成canal.pid,查看端口有11110-11112说明启动成功,前提是先启动kafka
  • 关闭canal是./stop.sh ,canal.pid文件将被删除

kafka安装

  • 从官网下载压缩文件,我安装的版本是kafka_2.13-2.6.0.tgz
  • 解压到指定的目录
  • kafka是依赖zookeeper的,可以使用文件中已经配置好的zookeeper,也可以单独安装
  • kafka的执行文件在bin目录下,提供一下几个命令
### 开启本地zookeeper指令(使用kafka已经配置好的)
./zookeeper-server-start.sh config/zookeeper.properties

### 开启kafka指令
./kafka-server-start.sh config/server.properties

### 查看指定topic信息, 例如查看topic为 example
./kafka-topics.sh --bootstrap-server 127.0.0.1:9092 --describe --topic example

### 监听指定topic消息客户端
./kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --from-beginning --topic example

### 发送消息到指定topic客户端
./kafka-console-producer.sh --topic=test --broker-list 127.0.0.1:9092

### 删除指定topic
./kafka-topics.sh --bootstrap-server 127.0.0.1:9092  --delete --topic example

  • 说明9092是kafka的端口,2181是zookeeper的端口。可以通过指令 netstat -tunlp查看
  • kafka遇到的坑:本人的是在阿里云上不是的单节点kafka,如果在其他的地方通过公网去访问kafka是不行的,即便开启了端口映射,也不可以。参照网上的做法是:为该机器的ip申请一个域名,通过配置文件配置域名就可以在其他的通过域名加9092端口就可以访问了,配置文件说明,打开config/server.properties
    image.png
  • 如果要关闭kafka必须先关闭kafka服务再关闭zookeeper服务,关闭指令再bin目录下

操作的go代码

consumer.go

package consumer

import (
    "context"
    "github.com/Shopify/sarama"
)

type ConsumeTopic struct {
    //一个消费者组里包含几个消费者
    ConsumeNum int
    //消费者组监听的主题
    Topics []string
    //回调的Handler, 需要调用者自己实现
    Callback sarama.ConsumerGroupHandler
}

type consumer struct {
    //kafka地址集合: 例如[]string{域名:9092, ip:9093, ...}
    //addressSet []string
    //消费者组绑定主题: key为group主题id
    consumerGroupTopic map[string]ConsumeTopic
    //consumer配置
    consumerConfig *sarama.Config
    client         sarama.Client

    consumerGroup []sarama.ConsumerGroup
}

//创建消费者对象
//addressSet: kafka地址集合
//consumerGroupTopic: 消费者组信息
//consumerConfig: 消费者配置信息,如果为空就采用默认的配置
func CreateNewConsumer(addressSet []string, consumerGroupTopic map[string]ConsumeTopic, consumerConfig *sarama.Config) *consumer {
    consumer := consumer{
        //addressSet:         addressSet,
        consumerGroupTopic: consumerGroupTopic,
        consumerConfig:     consumerConfig,
    }
    //启用默认配置
    if consumer.consumerConfig == nil {
        consumer.consumerConfig = sarama.NewConfig()
        consumer.consumerConfig.Consumer.Return.Errors = false
        consumer.consumerConfig.Version = sarama.V2_6_0_0
        consumer.consumerConfig.Consumer.Offsets.Initial = sarama.OffsetOldest
    }

    client, err := sarama.NewClient(addressSet, consumer.consumerConfig)
    if err != nil {
        panic(err)
    }
    consumer.client = client

    //开始
    consumer.init()
    return &consumer
}

//初始化
func (c *consumer) init() {
    for groupId, v := range c.consumerGroupTopic {
        consumerGroup, err := sarama.NewConsumerGroupFromClient(groupId, c.client)
        if err != nil {
            panic(err)
        }

        for i := 0; i < v.ConsumeNum; i++ {
            go c.consume(&consumerGroup, c.consumerGroupTopic[groupId].Topics, c.consumerGroupTopic[groupId].Callback)
        }

        c.consumerGroup = append(c.consumerGroup, consumerGroup)
    }
}

func (c *consumer) consume(group *sarama.ConsumerGroup, topics []string, consumerGroupHandler sarama.ConsumerGroupHandler) {
    ctx := context.Background()
    for {
        err := (*group).Consume(ctx, topics, consumerGroupHandler)
        if err != nil {
            panic(err)
        }
    }
}

//关闭
func (c *consumer) Close() {
    for _, v := range c.consumerGroup {
        v.Close()
    }
    c.client.Close()
}

consumer_test.go

package consumer

import (
    "fmt"
    "github.com/Shopify/sarama"
    "os"
    "os/signal"
    "testing"
)

type consumerGroupHandler struct {
    name string
}

func (h consumerGroupHandler) Setup(_ sarama.ConsumerGroupSession) error   { return nil }
func (h consumerGroupHandler) Cleanup(_ sarama.ConsumerGroupSession) error { return nil }
func (h consumerGroupHandler) ConsumeClaim(sess sarama.ConsumerGroupSession,
    claim sarama.ConsumerGroupClaim) error {
    for msg := range claim.Messages() {
        fmt.Printf("%s Message topic:%q partition:%d offset:%d  value:%s\n",
            h.name, msg.Topic, msg.Partition, msg.Offset, string(msg.Value))
        // 手动确认消息
        sess.MarkMessage(msg, "")
    }
    return nil
}

func TestCreateNewConsumer(t *testing.T) {
    cgt := map[string]ConsumeTopic{}
    cgt["id01"] = ConsumeTopic{
        ConsumeNum: 2,
        Topics:     []string{"read_book.wode"},
        Callback:   consumerGroupHandler{},
    }
    cClient := CreateNewConsumer([]string{"你的域名.com:9092"}, cgt, nil)

    signals := make(chan os.Signal, 1)
    signal.Notify(signals, os.Interrupt)
    select {
    case <-signals:
    }
    cClient.Close()
}

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 203,456评论 5 477
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,370评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 150,337评论 0 337
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,583评论 1 273
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,596评论 5 365
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,572评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,936评论 3 395
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,595评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,850评论 1 297
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,601评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,685评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,371评论 4 318
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,951评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,934评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,167评论 1 259
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 43,636评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,411评论 2 342