无锁并发快照在Flink MySQL CDC 2.0中的实现

前言

本文是两个月前参加黑马比赛时写了一半的,为了避免烂尾,今天补全发出来。

Flink CDC经过长时间的发展,目前无疑是实现数据源端Pipeline的最简单直接的方式。而在1.0时代,因为存在一些主要的缺点,它还不能称得上是production-ready,这些缺点有:

  • Debezium底层需要通过全局锁来保证全量和增量数据的一致性,对线上业务影响大;
  • 全量快照阶段只能单线程读取,大表同步非常耗时;
  • 全量快照阶段不支持Checkpoint,一旦作业失败只能重新同步,稳定性不佳。

但是随着2.0版本的发布,上述问题都得到了解决,解决方案的核心就是Netflix通过DBLog CDC框架提出的无锁快照算法,相关论文见<<DBLog: A Watermark Based Change-Data-Capture Framework>>。本文简单讲述一下论文的思路,并对照看一下Flink CDC的实现方法。

Watermark-based Chunk Selection

为了使得全量快照、增量读取都实现无锁,DBLog发明了一种名为Watermark-based Chunk Selection的机制。名词解释:

  • Chunk是将源表按主键(必须有物理主键)切分成的小块。也就是说,如果主键的上下界是maxmin,一个Chunk有size行,那么最终会有(max - min) / size个Chunks。
  • Watermark的含义与Flink类似,即在Binlog Stream中插入的标记,将其分割为包含有限量Binlog的有限集。在DBLog中,Watermark需要单独维护在源库中一个单行单列的表内。

下面通过引用论文中的图示介绍一下Watermark-based Chunk Selection的流程。

(1 ~ 2) 暂停Binlog处理,并更新一个低水位Watermark,记为L

(3) 获取下一个Chunk并存储在内存中,我们可以认为该Chunk就是执行查询当时的部分快照,当前Chunk的元信息额外借助ZK存储。图示的Chunk包含k1k6这些主键;

(4) 更新一个高水位Watermark,记为H。由于Watermark也是一张表,所以它们会自动进入Binlog Stream,圈出一个与Chunk时间对应的Window [L, H]

(5) 恢复binlog,开始处理Chunk与Window;

(6) 从L开始读取Window数据,并将两者中key重合的数据从Chunk移除(因为通过重放流就包含了同时间的快照中对应key的数据)。示例中k1k3都是重合的,所以Chunk最终只会剩下k2, k4, k5, k6

(7) 读到H后,将Chunk中剩余的数据插入H之前,Window数据之后。这样就将[L, H]区间内的流式变化和批式快照整合了起来,论文中将此操作称为“交错”(interleave)。

重复上述流程可以发现,随着Chunk逐渐被读取,同时产生的Binlog也会被增量地的处理,形成一个既包含历史数据,也包含增量数据的统一Changelog,并且能够保证相同主键的变化数据不重、不漏、不乱序,工程性极佳,且非常精妙。

Flink CDC 2.0基于FLIP-27的实现

我们知道,出于统一数据源规范、统一线程模型、支持流批一体等等考量,Flink社区早已启用了FLIP-27 Source API作为数据源的标准实现。FLIP-27 Source的原理可以用下图表示,三要素分别为:SplitEnumerator、Split、SourceReader,具体可以参考设计文档。

结合上文所述可以发现,Split正好能够与算法中的Chunk、Window概念对齐。以MySQL CDC为例,MySqlSplit分为两种,分别是MySqlSnapshotSplit(代表Chunk)和MySqlBinlogSplit(代表Binlog Window以及纯增量阶段的Binlog Stream)。

无锁快照

MySqlSourceEnumerator通过调用ChunkSplitter来分割源表,产生MySqlSplit并通过MySqlSplitAssigner分配给各个MySqlSourceReader。这部分逻辑不算难,在此不再赘述。

而负责读取Chunk的MySqlSnapshotSplitReadTask任务中,可以明显看出上述步骤1~4:

    @Override
    protected SnapshotResult<MySqlOffsetContext> doExecute(
            ChangeEventSourceContext context,
            MySqlOffsetContext previousOffset,
            SnapshotContext<MySqlOffsetContext> snapshotContext,
            SnapshottingTask snapshottingTask)
            throws Exception {
        // ...

        final BinlogOffset lowWatermark = currentBinlogOffset(jdbcConnection);
        LOG.info(
                "Snapshot step 1 - Determining low watermark {} for split {}",
                lowWatermark,
                snapshotSplit);
        ((SnapshotSplitReader.SnapshotSplitChangeEventSourceContextImpl) (context))
                .setLowWatermark(lowWatermark);
        signalEventDispatcher.dispatchWatermarkEvent(
                snapshotSplit, lowWatermark, SignalEventDispatcher.WatermarkKind.LOW);

        LOG.info("Snapshot step 2 - Snapshotting data");
        createDataEvents(ctx, snapshotSplit.getTableId());

        final BinlogOffset highWatermark = currentBinlogOffset(jdbcConnection);
        LOG.info(
                "Snapshot step 3 - Determining high watermark {} for split {}",
                highWatermark,
                snapshotSplit);
        signalEventDispatcher.dispatchWatermarkEvent(
                snapshotSplit, highWatermark, SignalEventDispatcher.WatermarkKind.HIGH);
        ((SnapshotSplitReader.SnapshotSplitChangeEventSourceContextImpl) (context))
                .setHighWatermark(highWatermark);

        return SnapshotResult.completed(ctx.offset);
    }

可见这里采用的低水位与高水位值就是当时的Binlog位点值(通过执行SHOW MASTER STATUS语句获得),避免了单独建表保存Watermark。

负责读取Binlog的MySqlBinlogSplitReadTask中,会先处理Binlog事件,再检查是否达到了高水位(仅在全量同步阶段会检查,增量阶段会一直同步下去)。如果已经达到,则停止继续读入Binlog,并向下游发射一个BINLOG_END标记。

// 关于具体处理Binlog Event的逻辑,可参见父类`MySqlStreamingChangeEventSource#handleEvent()`
    @Override
    protected void handleEvent(MySqlOffsetContext offsetContext, Event event) {
        super.handleEvent(offsetContext, event);
        // check do we need to stop for read binlog for snapshot split.
        if (isBoundedRead()) {
            final BinlogOffset currentBinlogOffset = getBinlogPosition(offsetContext.getOffset());
            // reach the high watermark, the binlog reader should finished
            if (currentBinlogOffset.isAtOrAfter(binlogSplit.getEndingOffset())) {
                // send binlog end event
                try {
                    signalEventDispatcher.dispatchWatermarkEvent(
                            binlogSplit,
                            currentBinlogOffset,
                            SignalEventDispatcher.WatermarkKind.BINLOG_END);
                } catch (InterruptedException e) {
                    LOG.error("Send signal event error.", e);
                    errorHandler.setProducerThrowable(
                            new DebeziumException("Error processing binlog signal event", e));
                }
                // tell reader the binlog task finished
                ((SnapshotBinlogSplitChangeEventSourceContextImpl) context).finished();
            }
        }
    }

此时形成的数据布局是:

Low Watermark | Snapshot Events | High Watermark | Binlog Events | Binlog End

接下来的整理操作(即第6、7两步)则位于SourceRecordUtils#normalizedSplitRecords()方法中。

// 来自JdbcSourceScanFetcher#pollSplitRecords()
    public static List<SourceRecord> normalizedSplitRecords(
            SnapshotSplit snapshotSplit,
            List<SourceRecord> sourceRecords,
            SchemaNameAdjuster nameAdjuster) {
        List<SourceRecord> normalizedRecords = new ArrayList<>();
        Map<Struct, SourceRecord> snapshotRecords = new HashMap<>();
        List<SourceRecord> binlogRecords = new ArrayList<>();
        if (!sourceRecords.isEmpty()) {

            SourceRecord lowWatermark = sourceRecords.get(0);
            checkState(
                    isLowWatermarkEvent(lowWatermark),
                    String.format(
                            "The first record should be low watermark signal event, but is %s",
                            lowWatermark));
            SourceRecord highWatermark = null;
            int i = 1;
            for (; i < sourceRecords.size(); i++) {
                SourceRecord sourceRecord = sourceRecords.get(i);
                if (!isHighWatermarkEvent(sourceRecord)) {
                    snapshotRecords.put((Struct) sourceRecord.key(), sourceRecord);
                } else {
                    highWatermark = sourceRecord;
                    i++;
                    break;
                }
            }

            if (i < sourceRecords.size() - 1) {
                List<SourceRecord> allBinlogRecords =
                        sourceRecords.subList(i, sourceRecords.size() - 1);
                for (SourceRecord binlog : allBinlogRecords) {
                    if (isDataChangeRecord(binlog)) {
                        Object[] key =
                                getSplitKey(snapshotSplit.getSplitKeyType(), binlog, nameAdjuster);
                        if (splitKeyRangeContains(
                                key, snapshotSplit.getSplitStart(), snapshotSplit.getSplitEnd())) {
                            binlogRecords.add(binlog);
                        }
                    }
                }
            }
            checkState(
                    isHighWatermarkEvent(highWatermark),
                    String.format(
                            "The last record should be high watermark signal event, but is %s",
                            highWatermark));
            normalizedRecords =
                    upsertBinlog(lowWatermark, highWatermark, snapshotRecords, binlogRecords);
        }
        return normalizedRecords;
    }

该方法的操作步骤是:

  1. 用Watermark将所有数据分为快照(snapshotRecords)和Binlog(binlogRecords)两部分,注意处理Binlog时需要做过滤,只保留属于自己Split的那部分日志;
  2. 调用upsertBinlog()方法将binlogRecords以UPSERT和DELETE语义覆盖到snapshotRecords中,返回的就是interleave好的Changelog数据了,布局如下。
Low Watermark | Normalized Events | High Watermark

并发一致性

原生的无锁快照算法仍然是单线程执行的,不过在FLIP-27的模型下,多个SourceReader就可以自然并行地获取Chunk及读取Binlog。由于Chunk的key之间没有交集,所以即使它们处理的Binlog Window之间有交集(这是必然会发生的),也仍然可以保证数据一致性。那么,如何在全量阶段并发完成并转向增量同步时也保证数据绝对正确呢?

在一个Snapshot Split处理完成时,MysqlSourceReader会将其放入finishedUnackedSplits集合,并触发完成事件汇报给MySqlSourceEnumerator。该完成事件中包含Split的ID以及对应的高水位值。

// 由MySqlSourceReader#onSplitFinished()调用
    private void reportFinishedSnapshotSplitsIfNeed() {
        if (!finishedUnackedSplits.isEmpty()) {
            final Map<String, BinlogOffset> finishedOffsets = new HashMap<>();
            for (MySqlSnapshotSplit split : finishedUnackedSplits.values()) {
                finishedOffsets.put(split.splitId(), split.getHighWatermark());
            }
            FinishedSnapshotSplitsReportEvent reportEvent =
                    new FinishedSnapshotSplitsReportEvent(finishedOffsets);
            context.sendSourceEventToCoordinator(reportEvent);
            LOG.debug(
                    "The subtask {} reports offsets of finished snapshot splits {}.",
                    subtaskId,
                    finishedOffsets);
        }
    }

而产生增量阶段所需的long-running Binlog Split之前,则会从所有高水位值中选择最小的那一个作为起始的Binlog Offset。只要该位点对应的Binlog没有被purge掉,就一定不会漏掉任何数据了。

// MySqlHybridSplitAssigner#createBinlogSplit()
    private MySqlBinlogSplit createBinlogSplit() {
        // ...
        BinlogOffset minBinlogOffset = null;
        for (MySqlSnapshotSplit split : assignedSnapshotSplit) {
            // find the min binlog offset
            BinlogOffset binlogOffset = splitFinishedOffsets.get(split.splitId());
            if (minBinlogOffset == null || binlogOffset.isBefore(minBinlogOffset)) {
                minBinlogOffset = binlogOffset;
            }
            finishedSnapshotSplitInfos.add(
                    new FinishedSnapshotSplitInfo(
                            split.getTableId(),
                            split.splitId(),
                            split.getSplitStart(),
                            split.getSplitEnd(),
                            binlogOffset));
        }
        // ...
    }

全量阶段的Checkpoint支持

容易推断,在FLIP-27的体系下,如果我们对Split的分配状态和Split本身的状态做快照,那么即使CDC的全量阶段失败,也可以从上述状态恢复现场,进行断点续传。例如,对于等待分配的Snapshot Split集合,在Checkpoint时需要记录如下信息:

    public SnapshotPendingSplitsState(
            List<TableId> alreadyProcessedTables,
            List<MySqlSnapshotSplit> remainingSplits,
            Map<String, MySqlSnapshotSplit> assignedSplits,
            Map<String, BinlogOffset> splitFinishedOffsets,
            AssignerStatus assignerStatus,
            List<TableId> remainingTables,
            boolean isTableIdCaseSensitive,
            boolean isRemainingTablesCheckpointed) {
        // ...
}

又例如对于一个Binlog Split,在Checkpoint时需要记录如下信息:

    @Nullable private BinlogOffset startingOffset;
    @Nullable private BinlogOffset endingOffset;
    private final Map<TableId, TableChange> tableSchemas;

    public MySqlBinlogSplitState(MySqlBinlogSplit split) {
        super(split);
        this.startingOffset = split.getStartingOffset();
        this.endingOffset = split.getEndingOffset();
        this.tableSchemas = split.getTableSchemas();
    }

具体源码就不再分析了,看官可自行参考。

The End

晚安晚安~

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 213,992评论 6 493
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,212评论 3 388
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 159,535评论 0 349
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,197评论 1 287
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,310评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,383评论 1 292
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,409评论 3 412
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,191评论 0 269
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,621评论 1 306
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,910评论 2 328
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,084评论 1 342
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,763评论 4 337
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,403评论 3 322
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,083评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,318评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,946评论 2 365
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,967评论 2 351

推荐阅读更多精彩内容