【Canal源码分析】Canal Instance启动和停止

一、序列图

1.1 启动

instance启动.png

1.2 停止

instance停止.png

二、源码分析

2.1 启动

这部分代码其实在ServerRunningMonitor的start()方法中。针对不同的destination,启动不同的CanalInstance。主要的方法在于initRunning()。

private void initRunning() {
    if (!isStart()) {
        return;
    }

    String path = ZookeeperPathUtils.getDestinationServerRunning(destination);
    // 序列化
    byte[] bytes = JsonUtils.marshalToByte(serverData);
    try {
        mutex.set(false);
        zkClient.create(path, bytes, CreateMode.EPHEMERAL);
        activeData = serverData;
        processActiveEnter();// 触发一下事件
        mutex.set(true);
    } catch (ZkNodeExistsException e) {
        bytes = zkClient.readData(path, true);
        if (bytes == null) {// 如果不存在节点,立即尝试一次
            initRunning();
        } else {
            activeData = JsonUtils.unmarshalFromByte(bytes, ServerRunningData.class);
        }
    } catch (ZkNoNodeException e) {
        zkClient.createPersistent(ZookeeperPathUtils.getDestinationPath(destination), true); // 尝试创建父节点
        initRunning();
    }
}

首先在zk中新增一个临时节点,表示的是正在运行destination的ip和端口,然后触发一下processActiveEnter()。我们主要看下这个方法,在controller启动时定义的。

public void processActiveEnter() {
    try {
        MDC.put(CanalConstants.MDC_DESTINATION, String.valueOf(destination));
        embededCanalServer.start(destination);
    } finally {
        MDC.remove(CanalConstants.MDC_DESTINATION);
    }
}

public void start(final String destination) {
    final CanalInstance canalInstance = canalInstances.get(destination);
    if (!canalInstance.isStart()) {
        try {
            MDC.put("destination", destination);
            canalInstance.start();
            logger.info("start CanalInstances[{}] successfully", destination);
        } finally {
            MDC.remove("destination");
        }
    }
}

主要在embededCanalServer.start中,我们看下这个canalInstance.start(),跟踪到AbstractCanalInstance。

2.1.1 启动metaManager

在默认的instance配置文件中,我们选择的metaManager是PeriodMixedMetaManager,定时(默认1s)刷新数据到zk中,所以我们主要关注这个类的start方法。这个类继承了MemoryMetaManager,首先启动一个MemoryMetaManager,然后再启动一个ZooKeeperMetaManager。

2.1.1.1 获取所有destination和client

destinations = MigrateMap.makeComputingMap(new Function<String, List<ClientIdentity>>() {

    public List<ClientIdentity> apply(String destination) {
        return zooKeeperMetaManager.listAllSubscribeInfo(destination);
    }
});

从/otter/canal/destinations/{destination}获取所有的client信息,返回的内容是List<ClientIdentity>,包括了destination、clientId、filter等等。

2.1.1.2 获取client指针cursor

根据ClientIdentity去zk获取指针,从zk的/otter/canal/destinations/{destination}/{clientId}/cursor下面去获取,返回的内容是个LogPosition。

cursors = MigrateMap.makeComputingMap(new Function<ClientIdentity, Position>() {

    public Position apply(ClientIdentity clientIdentity) {
        Position position = zooKeeperMetaManager.getCursor(clientIdentity);
        if (position == null) {
            return nullCursor; // 返回一个空对象标识,避免出现异常
        } else {
            return position;
        }
    }
});

有可能返回一个空。

2.1.1.3 获取批次batch

创建一个基于内存的MemoryClientIdentityBatch,包含位点的start、end、ack信息。然后从zk节点/otter/canal/destinations/{destination}/{clientId}/mark获取,取出来的数据进行排序,然后从/otter/canal/destinations/{destination}/{clientId}/mark/{batchId}中取出PositionRange这个类,描述的是一个position的范围。

batches = MigrateMap.makeComputingMap(new Function<ClientIdentity, MemoryClientIdentityBatch>() {

    public MemoryClientIdentityBatch apply(ClientIdentity clientIdentity) {
        // 读取一下zookeeper信息,初始化一次
        MemoryClientIdentityBatch batches = MemoryClientIdentityBatch.create(clientIdentity);
        Map<Long, PositionRange> positionRanges = zooKeeperMetaManager.listAllBatchs(clientIdentity);
        for (Map.Entry<Long, PositionRange> entry : positionRanges.entrySet()) {
            batches.addPositionRange(entry.getValue(), entry.getKey()); // 添加记录到指定batchId
        }
        return batches;
    }
});

2.1.1.4 启动定时刷zk任务

// 启动定时工作任务
executor.scheduleAtFixedRate(new Runnable() {

    public void run() {
        List<ClientIdentity> tasks = new ArrayList<ClientIdentity>(updateCursorTasks);
        for (ClientIdentity clientIdentity : tasks) {
            try {
                // 定时将内存中的最新值刷到zookeeper中,多次变更只刷一次
                zooKeeperMetaManager.updateCursor(clientIdentity, getCursor(clientIdentity));
                updateCursorTasks.remove(clientIdentity);
            } catch (Throwable e) {
                // ignore
                logger.error("period update" + clientIdentity.toString() + " curosr failed!", e);
            }
        }
    }
}, period, period, TimeUnit.MILLISECONDS);

定时刷新position到zk后,从任务中删除。刷新的频率为1s。

2.1.2 启动alarmHandler

这块比较简单。

if (!alarmHandler.isStart()) {
    alarmHandler.start();
}

其实默认是LogAlarmHandler,用于发送告警信息的。

2.1.3 启动eventStore

启动EventStore,默认是MemoryEventStoreWithBuffer。start方法也比较简单。

public void start() throws CanalStoreException {
    super.start();
    if (Integer.bitCount(bufferSize) != 1) {
        throw new IllegalArgumentException("bufferSize must be a power of 2");
    }

    indexMask = bufferSize - 1;
    entries = new Event[bufferSize];
}

2.1.4 启动eventSink

这块默认是EntryEventSink。这块也不复杂。

public void start() {
    super.start();
    Assert.notNull(eventStore);

    for (CanalEventDownStreamHandler handler : getHandlers()) {
        if (!handler.isStart()) {
            handler.start();
        }
    }
}

正常的启动,将running状态置为true。

2.1.5 启动eventParser

if (!eventParser.isStart()) {
    beforeStartEventParser(eventParser);
    eventParser.start();
    afterStartEventParser(eventParser);
}

我们分别看下。

2.1.5.1 beforeStartEventParser

protected void beforeStartEventParser(CanalEventParser eventParser) {

    boolean isGroup = (eventParser instanceof GroupEventParser);
    if (isGroup) {
        // 处理group的模式
        List<CanalEventParser> eventParsers = ((GroupEventParser) eventParser).getEventParsers();
        for (CanalEventParser singleEventParser : eventParsers) {// 需要遍历启动
            startEventParserInternal(singleEventParser, true);
        }
    } else {
        startEventParserInternal(eventParser, false);
    }
}

判断是不是集群的parser(用于分库),如果是GroupParser,需要一个个启动CanalEventParser。我们主要看下startEventParserInternal方法。我们只关注MysqlEventParser,因为他支持HA。

if (eventParser instanceof MysqlEventParser) {
    MysqlEventParser mysqlEventParser = (MysqlEventParser) eventParser;
    CanalHAController haController = mysqlEventParser.getHaController();

    if (haController instanceof HeartBeatHAController) {
        ((HeartBeatHAController) haController).setCanalHASwitchable(mysqlEventParser);
    }

    if (!haController.isStart()) {
        haController.start();
    }
}

启动一个HeartBeatHAController。主要作用是用于当parser失败次数超过阈值时,执行mysql的主备切换。

2.1.5.2 eventParser.start()

这里也区分是GroupParser还是单个的MysqlParser,其实最终都是启动Parser,不过前者是启动多个而已。我们看下单个的start方法。具体实现在AbstractMysqlEventParser中

public void start() throws CanalParseException {
    if (enableTsdb) {
        if (tableMetaTSDB == null) {
            // 初始化
            tableMetaTSDB = TableMetaTSDBBuilder.build(destination, tsdbSpringXml);
        }
    }

    super.start();
}

首先如果启用了Tsdb功能(也就是DDL后表结构的回溯),那么需要从xml中初始化表结构源数据,然后调用AbstractEventParser的start方法。

  • 首先初始化缓冲队列transactionBuffer,默认队列长度为1024
  • 初始化BinlogParser,将其running状态置为true
  • 启动工作线程parseThread,开始订阅binlog,这个线程中做的事在下一篇文章中有。

2.1.5.3 afterStartEventParser

protected void afterStartEventParser(CanalEventParser eventParser) {
    // 读取一下历史订阅的filter信息
    List<ClientIdentity> clientIdentitys = metaManager.listAllSubscribeInfo(destination);
    for (ClientIdentity clientIdentity : clientIdentitys) {
        subscribeChange(clientIdentity);
    }
}

这块订阅的主要是filter的变化。

public boolean subscribeChange(ClientIdentity identity) {
    if (StringUtils.isNotEmpty(identity.getFilter())) {
        logger.info("subscribe filter change to " + identity.getFilter());
        AviaterRegexFilter aviaterFilter = new AviaterRegexFilter(identity.getFilter());

        boolean isGroup = (eventParser instanceof GroupEventParser);
        if (isGroup) {
            // 处理group的模式
            List<CanalEventParser> eventParsers = ((GroupEventParser) eventParser).getEventParsers();
            for (CanalEventParser singleEventParser : eventParsers) {// 需要遍历启动
                ((AbstractEventParser) singleEventParser).setEventFilter(aviaterFilter);
            }
        } else {
            ((AbstractEventParser) eventParser).setEventFilter(aviaterFilter);
        }

    }

    // filter的处理规则
    // a. parser处理数据过滤处理
    // b. sink处理数据的路由&分发,一份parse数据经过sink后可以分发为多份,每份的数据可以根据自己的过滤规则不同而有不同的数据
    // 后续内存版的一对多分发,可以考虑
    return true;
}

至此,CanalInstance启动成功。

2.2 停止

同样的,停止的触发也是在ServerRunningMonitor中,停止的代码如下:

public void stop() {
    super.stop();
    logger.info("stop CannalInstance for {}-{} ", new Object[] { canalId, destination });

    if (eventParser.isStart()) {
        beforeStopEventParser(eventParser);
        eventParser.stop();
        afterStopEventParser(eventParser);
    }

    if (eventSink.isStart()) {
        eventSink.stop();
    }

    if (eventStore.isStart()) {
        eventStore.stop();
    }

    if (metaManager.isStart()) {
        metaManager.stop();
    }

    if (alarmHandler.isStart()) {
        alarmHandler.stop();
    }

    logger.info("stop successful....");
}

2.2.1 停止EventParser

和启动一样,在前后也可以做一些事情。

  • 停止前,目前默认什么都不做;
  • 停止时,我们主要看MysqlEventParser
    • 首先断开mysql的连接
    • 清理缓存中表结构源数据tableMetaCache.clearTableMeta()
    • 调用AbstractMysqlEventParser的stop方法,首先从spring上下文中,删除tableMetaTSDB。然后调用AbstractEventParser中的stop方法。
public void stop() {
    super.stop();

    stopHeartBeat(); // 先停止心跳
    parseThread.interrupt(); // 尝试中断
    eventSink.interrupt();
    try {
        parseThread.join();// 等待其结束
    } catch (InterruptedException e) {
        // ignore
    }

    if (binlogParser.isStart()) {
        binlogParser.stop();
    }
    if (transactionBuffer.isStart()) {
        transactionBuffer.stop();
    }
}

首先关闭心跳的定时器,然后中断解析线程,等待当前运行的任务结束后,停止binlogParser,清空transactionBuffer。这里看下怎么清空transactionBuffer的。

public void stop() throws CanalStoreException {
    putSequence.set(INIT_SQEUENCE);
    flushSequence.set(INIT_SQEUENCE);

    entries = null;
    super.stop();
}

将put和flush的序列置为初始序列,也就是不再允许向队列中put数据。

停止parser后,停止位点管理和HAController。其实只是将running置为false。

2.2.2 停止EventSink

类似于启动,停止也不复杂。

public void stop() {
    super.stop();

    for (CanalEventDownStreamHandler handler : getHandlers()) {
        if (handler.isStart()) {
            handler.stop();
        }
    }
}

2.2.3 停止EventStore

主要部分在这边

public void cleanAll() throws CanalStoreException {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        putSequence.set(INIT_SQEUENCE);
        getSequence.set(INIT_SQEUENCE);
        ackSequence.set(INIT_SQEUENCE);

        putMemSize.set(0);
        getMemSize.set(0);
        ackMemSize.set(0);
        entries = null;
        // for (int i = 0; i < entries.length; i++) {
        // entries[i] = null;
        // }
    } finally {
        lock.unlock();
    }
}

其实也是将RingBuffer的指针置为初始值。

2.2.4 停止metaManager

我们看下PeriodMixedMetaManager。主要调用了两块的stop,一个是MemoryMetaManager,另一个是ZooKeeperMetaManager。清理内存中的数据,然后让zk的管理器running状态改为false。

2.2.5 停止alarmHandler

将running状态置为false。

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 213,992评论 6 493
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,212评论 3 388
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 159,535评论 0 349
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,197评论 1 287
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,310评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,383评论 1 292
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,409评论 3 412
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,191评论 0 269
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,621评论 1 306
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,910评论 2 328
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,084评论 1 342
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,763评论 4 337
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,403评论 3 322
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,083评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,318评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,946评论 2 365
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,967评论 2 351

推荐阅读更多精彩内容