前言
本文是两个月前参加黑马比赛时写了一半的,为了避免烂尾,今天补全发出来。
Flink CDC经过长时间的发展,目前无疑是实现数据源端Pipeline的最简单直接的方式。而在1.0时代,因为存在一些主要的缺点,它还不能称得上是production-ready,这些缺点有:
- Debezium底层需要通过全局锁来保证全量和增量数据的一致性,对线上业务影响大;
- 全量快照阶段只能单线程读取,大表同步非常耗时;
- 全量快照阶段不支持Checkpoint,一旦作业失败只能重新同步,稳定性不佳。
但是随着2.0版本的发布,上述问题都得到了解决,解决方案的核心就是Netflix通过DBLog CDC框架提出的无锁快照算法,相关论文见<<DBLog: A Watermark Based Change-Data-Capture Framework>>。本文简单讲述一下论文的思路,并对照看一下Flink CDC的实现方法。
Watermark-based Chunk Selection
为了使得全量快照、增量读取都实现无锁,DBLog发明了一种名为Watermark-based Chunk Selection的机制。名词解释:
- Chunk是将源表按主键(必须有物理主键)切分成的小块。也就是说,如果主键的上下界是
max
、min
,一个Chunk有size
行,那么最终会有(max - min) / size
个Chunks。
- Watermark的含义与Flink类似,即在Binlog Stream中插入的标记,将其分割为包含有限量Binlog的有限集。在DBLog中,Watermark需要单独维护在源库中一个单行单列的表内。
下面通过引用论文中的图示介绍一下Watermark-based Chunk Selection的流程。
(1 ~ 2) 暂停Binlog处理,并更新一个低水位Watermark,记为L
;
(3) 获取下一个Chunk并存储在内存中,我们可以认为该Chunk就是执行查询当时的部分快照,当前Chunk的元信息额外借助ZK存储。图示的Chunk包含k1
到k6
这些主键;
(4) 更新一个高水位Watermark,记为H
。由于Watermark也是一张表,所以它们会自动进入Binlog Stream,圈出一个与Chunk时间对应的Window [L, H]
;
(5) 恢复binlog,开始处理Chunk与Window;
(6) 从L
开始读取Window数据,并将两者中key重合的数据从Chunk移除(因为通过重放流就包含了同时间的快照中对应key的数据)。示例中k1
、k3
都是重合的,所以Chunk最终只会剩下k2, k4, k5, k6
;
(7) 读到H
后,将Chunk中剩余的数据插入H
之前,Window数据之后。这样就将[L, H]
区间内的流式变化和批式快照整合了起来,论文中将此操作称为“交错”(interleave)。
重复上述流程可以发现,随着Chunk逐渐被读取,同时产生的Binlog也会被增量地的处理,形成一个既包含历史数据,也包含增量数据的统一Changelog,并且能够保证相同主键的变化数据不重、不漏、不乱序,工程性极佳,且非常精妙。
Flink CDC 2.0基于FLIP-27的实现
我们知道,出于统一数据源规范、统一线程模型、支持流批一体等等考量,Flink社区早已启用了FLIP-27 Source
API作为数据源的标准实现。FLIP-27 Source的原理可以用下图表示,三要素分别为:SplitEnumerator、Split、SourceReader,具体可以参考设计文档。
结合上文所述可以发现,Split正好能够与算法中的Chunk、Window概念对齐。以MySQL CDC为例,MySqlSplit
分为两种,分别是MySqlSnapshotSplit
(代表Chunk)和MySqlBinlogSplit
(代表Binlog Window以及纯增量阶段的Binlog Stream)。
无锁快照
MySqlSourceEnumerator
通过调用ChunkSplitter
来分割源表,产生MySqlSplit
并通过MySqlSplitAssigner
分配给各个MySqlSourceReader
。这部分逻辑不算难,在此不再赘述。
而负责读取Chunk的MySqlSnapshotSplitReadTask
任务中,可以明显看出上述步骤1~4:
@Override
protected SnapshotResult<MySqlOffsetContext> doExecute(
ChangeEventSourceContext context,
MySqlOffsetContext previousOffset,
SnapshotContext<MySqlOffsetContext> snapshotContext,
SnapshottingTask snapshottingTask)
throws Exception {
// ...
final BinlogOffset lowWatermark = currentBinlogOffset(jdbcConnection);
LOG.info(
"Snapshot step 1 - Determining low watermark {} for split {}",
lowWatermark,
snapshotSplit);
((SnapshotSplitReader.SnapshotSplitChangeEventSourceContextImpl) (context))
.setLowWatermark(lowWatermark);
signalEventDispatcher.dispatchWatermarkEvent(
snapshotSplit, lowWatermark, SignalEventDispatcher.WatermarkKind.LOW);
LOG.info("Snapshot step 2 - Snapshotting data");
createDataEvents(ctx, snapshotSplit.getTableId());
final BinlogOffset highWatermark = currentBinlogOffset(jdbcConnection);
LOG.info(
"Snapshot step 3 - Determining high watermark {} for split {}",
highWatermark,
snapshotSplit);
signalEventDispatcher.dispatchWatermarkEvent(
snapshotSplit, highWatermark, SignalEventDispatcher.WatermarkKind.HIGH);
((SnapshotSplitReader.SnapshotSplitChangeEventSourceContextImpl) (context))
.setHighWatermark(highWatermark);
return SnapshotResult.completed(ctx.offset);
}
可见这里采用的低水位与高水位值就是当时的Binlog位点值(通过执行SHOW MASTER STATUS
语句获得),避免了单独建表保存Watermark。
负责读取Binlog的MySqlBinlogSplitReadTask
中,会先处理Binlog事件,再检查是否达到了高水位(仅在全量同步阶段会检查,增量阶段会一直同步下去)。如果已经达到,则停止继续读入Binlog,并向下游发射一个BINLOG_END
标记。
// 关于具体处理Binlog Event的逻辑,可参见父类`MySqlStreamingChangeEventSource#handleEvent()`
@Override
protected void handleEvent(MySqlOffsetContext offsetContext, Event event) {
super.handleEvent(offsetContext, event);
// check do we need to stop for read binlog for snapshot split.
if (isBoundedRead()) {
final BinlogOffset currentBinlogOffset = getBinlogPosition(offsetContext.getOffset());
// reach the high watermark, the binlog reader should finished
if (currentBinlogOffset.isAtOrAfter(binlogSplit.getEndingOffset())) {
// send binlog end event
try {
signalEventDispatcher.dispatchWatermarkEvent(
binlogSplit,
currentBinlogOffset,
SignalEventDispatcher.WatermarkKind.BINLOG_END);
} catch (InterruptedException e) {
LOG.error("Send signal event error.", e);
errorHandler.setProducerThrowable(
new DebeziumException("Error processing binlog signal event", e));
}
// tell reader the binlog task finished
((SnapshotBinlogSplitChangeEventSourceContextImpl) context).finished();
}
}
}
此时形成的数据布局是:
Low Watermark | Snapshot Events | High Watermark | Binlog Events | Binlog End
接下来的整理操作(即第6、7两步)则位于SourceRecordUtils#normalizedSplitRecords()
方法中。
// 来自JdbcSourceScanFetcher#pollSplitRecords()
public static List<SourceRecord> normalizedSplitRecords(
SnapshotSplit snapshotSplit,
List<SourceRecord> sourceRecords,
SchemaNameAdjuster nameAdjuster) {
List<SourceRecord> normalizedRecords = new ArrayList<>();
Map<Struct, SourceRecord> snapshotRecords = new HashMap<>();
List<SourceRecord> binlogRecords = new ArrayList<>();
if (!sourceRecords.isEmpty()) {
SourceRecord lowWatermark = sourceRecords.get(0);
checkState(
isLowWatermarkEvent(lowWatermark),
String.format(
"The first record should be low watermark signal event, but is %s",
lowWatermark));
SourceRecord highWatermark = null;
int i = 1;
for (; i < sourceRecords.size(); i++) {
SourceRecord sourceRecord = sourceRecords.get(i);
if (!isHighWatermarkEvent(sourceRecord)) {
snapshotRecords.put((Struct) sourceRecord.key(), sourceRecord);
} else {
highWatermark = sourceRecord;
i++;
break;
}
}
if (i < sourceRecords.size() - 1) {
List<SourceRecord> allBinlogRecords =
sourceRecords.subList(i, sourceRecords.size() - 1);
for (SourceRecord binlog : allBinlogRecords) {
if (isDataChangeRecord(binlog)) {
Object[] key =
getSplitKey(snapshotSplit.getSplitKeyType(), binlog, nameAdjuster);
if (splitKeyRangeContains(
key, snapshotSplit.getSplitStart(), snapshotSplit.getSplitEnd())) {
binlogRecords.add(binlog);
}
}
}
}
checkState(
isHighWatermarkEvent(highWatermark),
String.format(
"The last record should be high watermark signal event, but is %s",
highWatermark));
normalizedRecords =
upsertBinlog(lowWatermark, highWatermark, snapshotRecords, binlogRecords);
}
return normalizedRecords;
}
该方法的操作步骤是:
- 用Watermark将所有数据分为快照(
snapshotRecords
)和Binlog(binlogRecords
)两部分,注意处理Binlog时需要做过滤,只保留属于自己Split的那部分日志; - 调用
upsertBinlog()
方法将binlogRecords
以UPSERT和DELETE语义覆盖到snapshotRecords
中,返回的就是interleave好的Changelog数据了,布局如下。
Low Watermark | Normalized Events | High Watermark
并发一致性
原生的无锁快照算法仍然是单线程执行的,不过在FLIP-27的模型下,多个SourceReader就可以自然并行地获取Chunk及读取Binlog。由于Chunk的key之间没有交集,所以即使它们处理的Binlog Window之间有交集(这是必然会发生的),也仍然可以保证数据一致性。那么,如何在全量阶段并发完成并转向增量同步时也保证数据绝对正确呢?
在一个Snapshot Split处理完成时,MysqlSourceReader
会将其放入finishedUnackedSplits
集合,并触发完成事件汇报给MySqlSourceEnumerator
。该完成事件中包含Split的ID以及对应的高水位值。
// 由MySqlSourceReader#onSplitFinished()调用
private void reportFinishedSnapshotSplitsIfNeed() {
if (!finishedUnackedSplits.isEmpty()) {
final Map<String, BinlogOffset> finishedOffsets = new HashMap<>();
for (MySqlSnapshotSplit split : finishedUnackedSplits.values()) {
finishedOffsets.put(split.splitId(), split.getHighWatermark());
}
FinishedSnapshotSplitsReportEvent reportEvent =
new FinishedSnapshotSplitsReportEvent(finishedOffsets);
context.sendSourceEventToCoordinator(reportEvent);
LOG.debug(
"The subtask {} reports offsets of finished snapshot splits {}.",
subtaskId,
finishedOffsets);
}
}
而产生增量阶段所需的long-running Binlog Split之前,则会从所有高水位值中选择最小的那一个作为起始的Binlog Offset。只要该位点对应的Binlog没有被purge掉,就一定不会漏掉任何数据了。
// MySqlHybridSplitAssigner#createBinlogSplit()
private MySqlBinlogSplit createBinlogSplit() {
// ...
BinlogOffset minBinlogOffset = null;
for (MySqlSnapshotSplit split : assignedSnapshotSplit) {
// find the min binlog offset
BinlogOffset binlogOffset = splitFinishedOffsets.get(split.splitId());
if (minBinlogOffset == null || binlogOffset.isBefore(minBinlogOffset)) {
minBinlogOffset = binlogOffset;
}
finishedSnapshotSplitInfos.add(
new FinishedSnapshotSplitInfo(
split.getTableId(),
split.splitId(),
split.getSplitStart(),
split.getSplitEnd(),
binlogOffset));
}
// ...
}
全量阶段的Checkpoint支持
容易推断,在FLIP-27的体系下,如果我们对Split的分配状态和Split本身的状态做快照,那么即使CDC的全量阶段失败,也可以从上述状态恢复现场,进行断点续传。例如,对于等待分配的Snapshot Split集合,在Checkpoint时需要记录如下信息:
public SnapshotPendingSplitsState(
List<TableId> alreadyProcessedTables,
List<MySqlSnapshotSplit> remainingSplits,
Map<String, MySqlSnapshotSplit> assignedSplits,
Map<String, BinlogOffset> splitFinishedOffsets,
AssignerStatus assignerStatus,
List<TableId> remainingTables,
boolean isTableIdCaseSensitive,
boolean isRemainingTablesCheckpointed) {
// ...
}
又例如对于一个Binlog Split,在Checkpoint时需要记录如下信息:
@Nullable private BinlogOffset startingOffset;
@Nullable private BinlogOffset endingOffset;
private final Map<TableId, TableChange> tableSchemas;
public MySqlBinlogSplitState(MySqlBinlogSplit split) {
super(split);
this.startingOffset = split.getStartingOffset();
this.endingOffset = split.getEndingOffset();
this.tableSchemas = split.getTableSchemas();
}
具体源码就不再分析了,看官可自行参考。
The End
晚安晚安~