相当于mysql-cdc的大动作(后面我会讲),我读源码之后发现, 这个mongoDb-cdc的实现(2.2.1)代码不是很复杂,现在简单记录一下,方便自己后续查阅。
如何开始读源码?
我建议从怎么使用它入手,我们看到官网教我们这么用:
SourceFunction<String> sourceFunction = MongoDBSource.<String>builder()
.hosts("localhost:27017")
.username("flink")
.password("flinkpw")
.databaseList("inventory") // set captured database, support regex
.collectionList("inventory.products", "inventory.orders") //set captured collections, support regex
.deserializer(new JsonDebeziumDeserializationSchema())
.build();
很明显,我们就从这个MongoDBSource.<String>builder来入手就好了。
MongoDBSource
我们打开这个类 ,看到上面全是一些变量的定义,初始化啥的,就不看了,直接跳到346行:build()方法,画风直接变了,这个就是我们想要的呀!!
然鹅,我们再看这个方法,最有用的就是最后一句:
return new DebeziumSourceFunction<>(
deserializer, props, null, Validator.getDefaultValidator());
继续进入这个 DebeziumSourceFunction 类,注释就非常棒,给了我们很多干货!!我总结了一下,有以下几点:
1 运行时有两个worker。
一名worker定期从数据库中提取记录并将记录推送到 Handover 。
另一个 worker 消费来自 Handover的记录,并将记录转换为 Flink 样式的数据。
不使用一个worker 是因为 debezium 在快照阶段和流阶段有不同的行为。
2 使用 Handover 作为缓冲区,将数据从生产者提交给消费者。
因为两个线程不直接相互通信,所以报错也依赖于 Handover。
当引擎出现错误时,引擎使用 {@linkDebeziumEngine.CompletionCallback} 向
Handover 报告错误并唤醒消费者检查错误。
但是,如果错误来自 Flink 端,source function 只是关闭引擎并唤醒生产者。
3 如果执行被取消或完成(仅快照阶段),退出逻辑与错误报告中的逻辑相同。
4 source function参与检查点并保证在故障期间不会丢失任何数据,保证了“exactly once”
5 目前, source function不能在多个并行实例中运行
再往下读,看到这个类名部分:
@PublicEvolving
public class DebeziumSourceFunction<T> extends RichSourceFunction<T>
implements CheckpointedFunction, CheckpointListener, ResultTypeQueryable<T>
RichSourceFunction --- 这个是flink的自定义source固定用法
CheckpointedFunction,CheckpointListener --- cp相关的接口,里面有初始化状态,保存状态的方法,以及cp成功之后的动作
再往下,都是一些变量,其中值得我们注意的有几个:
/** Data for pending but uncommitted offsets. */
private final LinkedMap pendingOffsetsToCommit = new LinkedMap();
这个变量就是个普通的linkedMap,用来保存没有提交的offset
//The offsets to restore to, if the consumer restores state from a checkpoint.
private transient volatile String restoredOffsetState;
这个说的是,如果是从某个cp恢复的,就保存该offset,之所以用string,是为了用 JSON BYTE来编码
/** Accessor for state in the operator state backend. */
private transient ListState<byte[]> offsetState;
这个就是整个灵魂,offset的值,看到用的是 ListState,
顺藤摸瓜找下去,发现了这个状态值的出处:
public byte[] snapshotCurrentState() throws Exception {
// this method assumes that the checkpoint lock is held
assert Thread.holdsLock(checkpointLock);
if (debeziumOffset.sourceOffset == null || debeziumOffset.sourcePartition == null) {
return null;
}
return stateSerializer.serialize(debeziumOffset);
}
而 debeziumOffset.sourceOffset 和 debeziumOffset.sourcePartition 都是hashMap.
可以推断出这个状态,会保存debezuim消费的分区信息,偏移量信息,也符合常识。
其实他这里只是用到了1个Byte[]而已,用ListState是出于什么考虑呢?
这个问题,我猜测是为了将来做扩展吧。 有了解详情的,也请留言告知下,多谢了!
往下走,有几个变量,更为关键
private transient DebeziumEngine<?> engine;
//这个就是debezuim引擎,是flink-cdc的根本所在,能读数据全部仰仗它了。我们这篇文章先不深入介绍他了,我们知道数据最后靠它去获取的即可。后续开一个文章专门写他。
/** Consume the events from the engine and commit the offset to the engine. */
private transient DebeziumChangeConsumer changeConsumer;
consumer会消费engine从远程数据库拿到的数据,然后把数据放入 Handover 中(持有锁时),他放数据到handover前会检查 list有没有数据,有数据就先不放了,让出锁。
/** The consumer to fetch records from {@link Handover}. */
private transient DebeziumChangeFetcher<T> debeziumChangeFetcher;
fetcher是用来消费 Handover 中的数据,读到数据之后,就会执行sourceContext.collect(record)方法,把数据输出到flink的数据pipeline中了。他和上面的consumer会竞争handover的锁,抢的话就消费数据,消费完了就清空List, 再释放锁。
/** Buffer the events from the source and record the errors from the debezium. */
private transient Handover handover;
handover用一个list来保存数据。然后再设置了一个锁给上述的 changeConsumer 和 debeziumChangeFetcher 来抢。
看完了变量,接下来就是重头戏了,先是open方法:
public void open(Configuration parameters) throws Exception {
validator.validate();
super.open(parameters);
ThreadFactory threadFactory =
new ThreadFactoryBuilder().setNameFormat("debezium-engine").build();
this.executor = Executors.newSingleThreadExecutor(threadFactory);
this.handover = new Handover();
this.changeConsumer = new DebeziumChangeConsumer(handover);
}
这个的意思是,先校验,然后创建一个线程执行器,创建一个handover(非常重要的家伙),创建一个changeConsumer。
然后我们看到 run方法里
这里关键的是以下这些代码
this.engine =
DebeziumEngine.create(Connect.class)
.using(properties)
.notifying(changeConsumer)
.using(OffsetCommitPolicy.always())
.using(
(success, message, error) -> {
if (success) {
// Close the handover and prepare to exit.
handover.close();
} else {
handover.reportError(error);
}
})
.build();
.notifying(changeConsumer) ---这一句指的是 engine拿到的数据,将由 changeConsumer来处理。
// run the engine asynchronously
executor.execute(engine); --用一个线程来启动 engine。顺便把changeConsumer也启动了。
debeziumStarted = true;
// start the real debezium consumer
debeziumChangeFetcher.runFetchLoop();
//另外一个线程启动 debeziumChangeFetcher
总结一下:
flink-cdc-原理.JPG
该图很清晰的看见,一共有4个重要的组件:
debezuim引擎负责数据抓取
changeConsumer负责消费引擎出来的数据以及将数据发给handover
handover类似中间件,做短暂的存储(因为一共就两个线程抢占锁,一旦数据过来,会很快被消费)
fetcher负责消费handover的数据,然后输出到flink的pipeline中。
最后还有一点:
flink-cdc 的mongodb的实现,前面的mongoDbsource仅仅虚晃一枪,绝大部分逻辑是在 DebeziumSourceFunction中完成的,而其他的四种db(目前flink-cdc就支持5种), oracle,pg,mysql(旧版本), sqlserver,也是调用了该类来实现的。