【Canal源码分析】parser工作过程

本文主要分析的部分是instance启动时,parser的一个启动和工作过程。主要关注的是AbstractEventParser的start()方法中的parseThread。

一、序列图

instance工作过程一.png

二、源码分析

parseThread中包含的内容比较清晰,代码不是很长,我们逐步分析下。

2.1 构造数据库连接

erosaConnection = buildErosaConnection();

这里构造的,应该是一个mysql的链接,包括的内容都是从配置文件中过来的一些信息,包括mysql的地址,账号密码等。

2.2 启动心跳线程

startHeartBeat(erosaConnection);

这里的心跳,感觉是个假的心跳,并没有用到connection相关的内容。启动一个定时任务,默认3s发送一个心跳的binlog给sink阶段,表名parser还在工作。在sink阶段,会把心跳的binlog直接过滤,不会走到store过程。

2.3 dump之前准备工作

这一步的代码也不复杂。

preDump(erosaConnection);

我们看看preDump都能够做什么?在MysqlEventParser中,我们可以看到,主要做了几件事:

  • 针对binlog格式进行过滤,也就是我们在配置文件中指定binlog的格式,不过目前我们默认的都是ROW模式。
  • 针对binlog image进行过滤,目前默认是FULL,也就是binlog记录的是变更前后的数据,如果配置为minimal,那么只记录变更后的值,可以减少binlog的文件大小。
  • 构造表结构源数据的缓存TableMetaCache

2.4 获取最后的位置信息

这一步是比较核心的,也是保证binlog不丢失的核心代码。

EntryPosition position = findStartPosition(erosaConnection);
final EntryPosition startPosition = position;
if (startPosition == null) {
    throw new CanalParseException("can't find start position for " + destination);
}

if (!processTableMeta(startPosition)) {
    throw new CanalParseException("can't find init table meta for " + destination
                                  + " with position : " + startPosition);
}

具体的findStartPosition是怎么实现的,请查阅下一篇文章

如果没有找到最后的位置信息,那么直接抛出异常,否则还要进行一次判断,也就是processTableMeta,我们看下这个方法做了什么。

protected boolean processTableMeta(EntryPosition position) {
    if (isGTIDMode()) {
        if (binlogParser instanceof LogEventConvert) {
            // 记录gtid
            ((LogEventConvert) binlogParser).setGtidSet(MysqlGTIDSet.parse(position.getGtid()));
        }
    }

    if (tableMetaTSDB != null) {
        if (position.getTimestamp() == null || position.getTimestamp() <= 0) {
            throw new CanalParseException("use gtid and TableMeta TSDB should be config timestamp > 0");
        }

        return tableMetaTSDB.rollback(position);
    }

    return true;
}

如果开启了GTID模式,那么直接设置GTID集合。如果tableMetaTSDB不为空,那么直接根据位置信息回滚到对应的表结构。这个tableMetaTSDB记录的是一个表结构的时序,使用的是Druid的一个功能,把所有DDL记录在数据库中,一般来说,每24小时生成一份快照插入到数据库中,这样能解决DDL产生的表结构不一致的问题,也就是增加了一个表结构的回溯功能。

这边的rollback主要做的事情为:

  • 根据位置信息position从数据库去查询对应的信息,包括binlog文件名、位点等。然后记录到内存中,使用的Druid的SchemaRepository.console方法。

2.5 开始dump数据

在dump之前,代码中构造了一个sink类,也就是SinkFunction。里面定义了一个sink方法,主要的内容是对哪些数据进行过滤。

try {
    CanalEntry.Entry entry = parseAndProfilingIfNecessary(event, false);

    if (!running) {
        return false;
    }

    if (entry != null) {
        exception = null; // 有正常数据流过,清空exception
        transactionBuffer.add(entry);
        // 记录一下对应的positions
        this.lastPosition = buildLastPosition(entry);
        // 记录一下最后一次有数据的时间
        lastEntryTime = System.currentTimeMillis();
    }
    return running;
} catch (TableIdNotFoundException e) {
    throw e;
} catch (Throwable e) {
    if (e.getCause() instanceof TableIdNotFoundException) {
        throw (TableIdNotFoundException) e.getCause();
    }
    // 记录一下,出错的位点信息
    processSinkError(e,
        this.lastPosition,
        startPosition.getJournalName(),
        startPosition.getPosition());
    throw new CanalParseException(e); // 继续抛出异常,让上层统一感知
}

首先判断parser是否在运行,如果不运行,那么就直接抛弃。运行时,判断entry是否为空,不为空的情况下,直接将entry加入到transactionBuffer中。这里我们说下这个transactionBuffer,其实类似于Disruptor中的一个环形队列(默认长度为1024),维护了几个指针,包括put、get、ack三个指针,里面存储了需要进行传递到下一阶段的数据。

加到环形队列之后,记录一下当前的位置信息和时间。如果这个过程出错了,需要记录下出错的位置信息,这里的processSinkError其实就是打印了一下错误日志,然后抛出了一个CanalException,让上一层感知。

说了这么多,还没到真正开始dump的地方。下面开始吧。

if (isGTIDMode()) {
    erosaConnection.dump(MysqlGTIDSet.parse(startPosition.getGtid()), sinkHandler);
} else {
    if (StringUtils.isEmpty(startPosition.getJournalName()) && startPosition.getTimestamp() != null) {
        erosaConnection.dump(startPosition.getTimestamp(), sinkHandler);
    } else {
        erosaConnection.dump(startPosition.getJournalName(),
                startPosition.getPosition(),
                sinkHandler);
    }
}

在新版本中,增加了GTID的模式,所以这里的dump需要判断怎么dump,发送什么命令给mysql来获取什么样的binlog。

2.5.1 GTID模式

如果开启了GTID模式(在instance.properties开启),那么需要发送COM_BINLOG_DUMP_GTID命令,然后开始接受binlog信息,进行binlog处理。

public void dump(GTIDSet gtidSet, SinkFunction func) throws IOException {
    updateSettings();
    sendBinlogDumpGTID(gtidSet);

    DirectLogFetcher fetcher = new DirectLogFetcher(connector.getReceiveBufferSize());
    fetcher.start(connector.getChannel());
    LogDecoder decoder = new LogDecoder(LogEvent.UNKNOWN_EVENT, LogEvent.ENUM_END_EVENT);
    LogContext context = new LogContext();
    while (fetcher.fetch()) {
        LogEvent event = null;
        event = decoder.decode(fetcher, context);

        if (event == null) {
            throw new CanalParseException("parse failed");
        }

        if (!func.sink(event)) {
            break;
        }
    }
}

调用LogDecoder.decode方法,对二进制进行解析,解析为我们需要的LogEvent,如果解析失败,抛出异常。否则进行sink,如果sink返回的false,那么直接跳过,否则加入到transactionBuffer中。

2.5.2 非GTID模式

这块有个逻辑判断,如果找到的最后的位置信息中包含了时间戳,如果没有binlog文件名,那么在MysqlConnection中直接报错,也就是必须既要有时间戳,又要有binlog文件名,才能进行dump操作。

这里的dump分了两步,第一步就是发送COM_REGISTER_SLAVE命令,伪装自己是一个slave,然后发送COM_BINLOG_DUMP命令接收binlog。

public void dump(String binlogfilename, Long binlogPosition, SinkFunction func) throws IOException {
    updateSettings();
    sendRegisterSlave();
    sendBinlogDump(binlogfilename, binlogPosition);
    DirectLogFetcher fetcher = new DirectLogFetcher(connector.getReceiveBufferSize());
    fetcher.start(connector.getChannel());
    LogDecoder decoder = new LogDecoder(LogEvent.UNKNOWN_EVENT, LogEvent.ENUM_END_EVENT);
    LogContext context = new LogContext();
    while (fetcher.fetch()) {
        LogEvent event = null;
        event = decoder.decode(fetcher, context);

        if (event == null) {
            throw new CanalParseException("parse failed");
        }

        if (!func.sink(event)) {
            break;
        }

        if (event.getSemival() == 1) {
            sendSemiAck(context.getLogPosition().getFileName(), binlogPosition);
        }
    }
}

这里有个mysql半同步的标识,semival。如果semival==1,说明需要进行ack,发送SEMI_SYNC_ACK给master(我们这边more都不开启)。

2.5.3 异常处理

如果整个过程中发生了异常,有以下几种处理方式:

  • 没有找到表,说明起始的position在一个事务中,需要重新找到事务的开始点
  • 其他异常,processDumpError,如果是IO异常,而且message中包含errno = 1236错误,表示从master读取binlog发生致命错误,处理方法如下:http://blog.sina.com.cn/s/blog_a1e9c7910102wv2v.html
  • 如果当前parser不在运行,抛出异常;如果在运行,抛出异常之后,发送一个告警信息。
  • 异常处理完成后,在finally中,首先将当前线程置为interrupt,然后关闭mysql连接。如果关闭连接过程中,抛出异常,需要进行处理。
  • 整个异常处理后,首先暂停sink过程,然后重置缓冲队列TransctionBuffer,重置binlogParser。最后,如果parser还在运行,那么sleep一段时间后重试。
} catch (TableIdNotFoundException e) {
    exception = e;
    // 特殊处理TableIdNotFound异常,出现这样的异常,一种可能就是起始的position是一个事务当中,导致tablemap
    // Event时间没解析过
    needTransactionPosition.compareAndSet(false, true);
    logger.error(String.format("dump address %s has an error, retrying. caused by ",
        runningInfo.getAddress().toString()), e);
} catch (Throwable e) {
    processDumpError(e);
    exception = e;
    if (!running) {
        if (!(e instanceof java.nio.channels.ClosedByInterruptException || e.getCause() instanceof java.nio.channels.ClosedByInterruptException)) {
            throw new CanalParseException(String.format("dump address %s has an error, retrying. ",
                runningInfo.getAddress().toString()), e);
        }
    } else {
        logger.error(String.format("dump address %s has an error, retrying. caused by ",
            runningInfo.getAddress().toString()), e);
        sendAlarm(destination, ExceptionUtils.getFullStackTrace(e));
    }
} finally {
    // 重新置为中断状态
    Thread.interrupted();
    // 关闭一下链接
    afterDump(erosaConnection);
    try {
        if (erosaConnection != null) {
            erosaConnection.disconnect();
        }
    } catch (IOException e1) {
        if (!running) {
            throw new CanalParseException(String.format("disconnect address %s has an error, retrying. ",
                runningInfo.getAddress().toString()),
                e1);
        } else {
            logger.error("disconnect address {} has an error, retrying., caused by ",
                runningInfo.getAddress().toString(),
                e1);
        }
    }
}
// 出异常了,退出sink消费,释放一下状态
eventSink.interrupt();
transactionBuffer.reset();// 重置一下缓冲队列,重新记录数据
binlogParser.reset();// 重新置位

if (running) {
    // sleep一段时间再进行重试
    try {
        Thread.sleep(10000 + RandomUtils.nextInt(10000));
    } catch (InterruptedException e) {
    }
}
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 215,384评论 6 497
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,845评论 3 391
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 161,148评论 0 351
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,640评论 1 290
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,731评论 6 388
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,712评论 1 294
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,703评论 3 415
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,473评论 0 270
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,915评论 1 307
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,227评论 2 331
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,384评论 1 345
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,063评论 5 340
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,706评论 3 324
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,302评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,531评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,321评论 2 368
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,248评论 2 352

推荐阅读更多精彩内容