- 接上篇paimon sink 之 dataStream 的拓扑梳理 在 BucketMode 在 FIX 模式下最终写入是 RowDataStoreWriteOperator 算子完成的
RowDataStoreWriteOperator
-
类图在 paimon 的基类是 org.apache.paimon.flink.sink.PrepareCommitOperator
- RowDataStoreWriteOperator
extends
TableWriteOperator<InternalRow>extends
PrepareCommitOperator<InternalRow, Committable> - 从 PrepareCommitOperator 类的泛型来看他处理的输入数据结构是 InternalRow, 输出是 Committable
- 以方法的维度来看这个算子的逻辑
RowDataStoreWriteOperator#构造方法
- 参数1.FileStoreTable table 就是这一篇讲的 PrimaryKeyFileStoreTable 或者 AppendOnlyFileStoreTable
- LogSinkFunction logSinkFunction 在这篇里面也有提到 对于开启了 log.system 的 table 就会有,paimon 目前 logSinkFunction 就只有 kafka
- String initialCommitUser = UUID.randomUUID().toString();是一个 UUID 他作为 commit_user_state 的默认值,每一个 job 要有一个唯一的 userName. 所以第一次启动就使用 UUID 作为 userName, 而后会把他存在 commit_user_state state 中,任务从 state 恢复就会读出来使用。
- StoreSinkWrite.Provider storeSinkWriteProvider 这个是用来对接 fileSystem 进行写入要重点关注,先看看他的初始化
StoreSinkWrite.Provider
- StoreSinkWrite.Provider 提供一个 provider 方法去创建 StoreSinkWrite 在 org.apache.paimon.flink.sink.FlinkSink#createWriteProvider 的实现中对应不同场景会有两种 StoreSinkWrite 分别是 GlobalFullCompactionSinkWrite 和 StoreSinkWriteImpl。逻辑如下
- 先判断 write-only 是否开启; 如果设置为 true,将跳过压缩和快照过期。则是用的 StoreSinkWriteImpl
-
如果 write-only 没有开启,则计算 deltaCommits 看看是否有配置在多少 commit 下就要进行合并计算逻辑如下
a. 获取配置项:full-compaction.delta-commits 表明多少次 commits 触发 Full compaction, 如果配置了这个参数那么 deltaCommits= full-compaction.delta-commits
b. 获取配置项:changelog-producer.compaction-interval,当 changelog-producer 设置为 FULL_COMPACTION 时此参数用来表示多少间隔后 触发 Full compaction, 如果配置了这个参数那么会用 compaction-interval/checkpoint-interval 来得到 deltaCommits
c. 如果 a,b 都没有设置则 deltaCommits 为 -1 否则就是 deltaCommits >=0
d. 如果 deltaCommits >=0 或者 changelog-producer 配置的是 FULL_COMPACTION 则用 GlobalFullCompactionSinkWrite 否则就是 StoreSinkWriteImpl
- 总体来说就是会根据是否 writeOnly 和是否需要合并进行选择,如果不需要合并就选择 StoreSinkWriteImpl, 要合并就选择 GlobalFullCompactionSinkWrite
-
GlobalFullCompactionSinkWrite 是 StoreSinkWriteImpl 的子类
构造函数的 4 个参数说完了
- FileStoreTable table : PrimaryKeyFileStoreTable 或者 AppendOnlyFileStoreTable
- LogSinkFunction logSinkFunction 如果配置了就相当于进行了双写(写 kafka 和写 filesystem)
- String initialCommitUser = UUID.randomUUID().toString();是一个 UUID ,保证每个 job userName 的唯一性
- StoreSinkWrite.Provider storeSinkWriteProvider 用来构建 StoreSinkWrite,StoreSinkWrite 先可以理解为就是一个 writer 先不看细节
- 特别的:对于RowDataStoreWriteOperator 只有 logSinkFunction 是留给自己用的,其他三个参数都传给父类的构造函数说明父类 TableWriteOperator 才是执行 write 的真正入口,接着看 RowDataStoreWriteOperator 的其他方法
RowDataStoreWriteOperator#setup
- 调用父类的 PrepareCommitOperator#setup
- 父类判断是否开启了配置项
sink.use-managed-memory-allocator
默认为 false, 这项参数的解释如下:- paimon task 可以基于 TM memory 创建 memory pools,这些内存将会被 flink TM 管理,例如 TM 中的 managed memory。它通过 TM 管理多个 tasks 的 writer buffers 来提高 sink 的稳定性和性能。
- 如果设置为 true , paimon 的 merge tree 将会使用
managed memory 进行工作; 否则他会创建一个独立的内存分配器,这意味着每个 Task 会分配和管理自己的堆内内存池,如果一个 TM 中的Task 太多了,可能会有性能问题甚至 OOM - 如果设置为 ture 与之配套的还有一个参数 sink.managed.writer-buffer-memory 默认是 256m, 参数用来指定 writer buffer 在托管内存中的权重,Flink会根据权重计算内存大小,对于writer来说,实际使用的内存取决于运行环境。 现在,此属性中定义的内存大小等于运行时分配给写入缓冲区的确切内存。
- 如果设置为 true 那么在 setup 方法里面会初始化一个基于 managed memory 的内存分配器
if (options.get(SINK_USE_MANAGED_MEMORY)) { MemoryManager memoryManager = containingTask.getEnvironment().getMemoryManager(); memoryAllocator = new MemorySegmentAllocator(containingTask, memoryManager); memoryPool = new FlinkMemorySegmentPool( computeManagedMemory(this), memoryManager.getPageSize(), memoryAllocator); }
- 判断是否有 logSinkFunction 如果有的话 对 logSinkFunction 进行设置 RuntimeContext
setup 方法逻辑总结如下
- 父类判断是否需要使用 managed memory 如果要使用会就会构建基于 managed memory 的内存分配器和内存池
- 自己判断是否有 logSinkFunction 如果有的话 对 logSinkFunction 进行设置 RuntimeContext, 后续不在关注 logSinkFunction 主要关注写 fileSystem 的逻辑, 然而发现如果忽略 logSinkFunction 的存在那么 RowDataStoreWriteOperator 的大部分逻辑和父类是一样的,大部分复写的方法都是先调 super 然后判断是否有 logSinkFunction 然后调用 logSinkFunction 的相应方法。
RowDataStoreWriteOperator#initializeState
- 直接看 super org.apache.paimon.flink.sink.TableWriteOperator#initializeState 主要是两个 state 的恢复 和创建 writer
-
commit_user_state 的恢复
a. 尝试从 state 中恢复 commitUser, 如果没有就用 构造函数传过来的 UUID, 而后写入 state
b. commit_user_state 结构如下ListState<T> state = context.getOperatorStateStore() //是一个 OperatorState .getUnionListState(new ListStateDescriptor<>(stateName, valueClass)); // 不是很理解为啥是一个 UnionListState, 而不直接是一个 ListState, 可能是兼容同时写多个 // Paimon 表的场景??
c. commitUser 是全局的,当前 operate 所有 subTask 的 commitUser 都是为同一个值。代码中也做了校验,对恢复出来的 commit_user_state 的 listValus 的每个元素会进行校验是否为同一值。
d. 所以如果状态中有这个 state, 那么 当前 subTask 的 commitUser 就随便从 list 中取一个值就 ok, 都是一样的。之所以这么繁琐是想强调一下与常规的 OperatorState ListState 的不同之处,一般 ListState 每个元素会不一样,比如 kafka partition 的 offset, 而恢复的时候 每个 subTask 只要取属于自己的某一个下标元素。
比如 如果 kafka 有 4 个 partition, 那么 ListState 会有 4 个元素,如果 kafka source 只有一个并行度那么他就要读取到 ListState 的所有值进行 kafka 消费, 如果 kafka source 有 2 个并行度,那么两个并行度只要分别拿 ListState 的 2 个下标进行消费 kafka。 如果 kafka source 有 4 个并行度,那么每个并行度就只要拿一个下标元素。具体拿哪个和上游的 partition 算法相关, 对于 source 因为没有上游可能就是按顺序拿的。 -
paimon_store_sink_write_state 的恢复, 他也是 OperatorState ListState, 他就只要取属于自己的那一部分。怎么取取决于上游的 partition 算法。 这个算法有在之前写过关于 FIXED mode 下 partitioner 的梳理
a. 了解到了之前的算法那么再来看他是怎么筛选下标的
StateValueFilter stateFilter =
(tableName, partition, bucket) -> { // tableName 不参与过滤逻辑 忽略
int task =
containLogSystem // 是否包含 logFuntion 因为在 partition 算法中也有这个判断,
? ChannelComputer.select(bucket, numTasks) // numTasks 等于 numChannels,
: ChannelComputer.select(partition, bucket, numTasks);
// task 的值代表对于在某个 bucket 下和算子的并行度下他应该发往下游的那个 subTask.
// 所以有这么个判断只有算出来的 task 和当前 subTask 下标一致的才获取,否则就过滤掉
return task == getRuntimeContext().getIndexOfThisSubtask();
};
b. 从上面的过滤逻辑就可以看出 paimon_store_sink_write_state 里面装的可能是[(t1,p1,0),(t1,p1,1),(t1,p1,2),...,(t1,p2,0),(t1,p2,1),(t1,p2,2),...] 具体数据结构如下
public StoreSinkWriteState(
StateInitializationContext context, StateValueFilter stateValueFilter)
throws Exception {
this.stateValueFilter = stateValueFilter; // ListState 选择器在上面已经说明
// paimon_store_sink_write_state 的元素类型, 上面参与过滤的是 3 个元素 实际存储 5 个元素更加丰富
TupleSerializer<Tuple5<String, String, byte[], Integer, byte[]>> listStateSerializer =
new TupleSerializer<>(
(Class<Tuple5<String, String, byte[], Integer, byte[]>>)
(Class<?>) Tuple5.class,
new TypeSerializer[] {
StringSerializer.INSTANCE,
StringSerializer.INSTANCE,
BytePrimitiveArraySerializer.INSTANCE,
IntSerializer.INSTANCE,
BytePrimitiveArraySerializer.INSTANCE
});
listState =
context.getOperatorStateStore()
.getUnionListState(
new ListStateDescriptor<>(
"paimon_store_sink_write_state", listStateSerializer));
//挑选出来放入 Map 中
map = new HashMap<>();
for (Tuple5<String, String, byte[], Integer, byte[]> tuple : listState.get()) {
//tuple.f0 tableName, tuple.f1 未知, tuple.f2 是分区,tuple.f3 数 bucket , tuple.f4 未知
BinaryRow partition = SerializationUtils.deserializeBinaryRow(tuple.f2);
if (stateValueFilter.filter(tuple.f0, partition, tuple.f3)) {
map.computeIfAbsent(tuple.f0, k -> new HashMap<>())
.computeIfAbsent(tuple.f1, k -> new ArrayList<>())
.add(new StateValue(partition, tuple.f3, tuple.f4));
}
}
}
c. 经过过滤最终在算子中引用的是一个经过包装的 StoreSinkWriteState 对象。恢复出来的元素放在 StoreSinkWriteState 的一个 map 中。
private final Map<String, Map<String, List<StateValue>>> map; 从上一步的逻辑可以猜测出 Map 里面放的大概是 Map<tableName, Map<未知,List<StateValue(分区,bucket, 未知)>>。 对于未知的先不管现在是反着看的只能猜测,在 snaphost 或者其他地方会正着来,会揭秘开来。
- 创建 writer
- writer 的创建是通过 StoreSinkWrite.Provider 的 provider 方法创建的。 关于 StoreSinkWrite.Provider 的初始化和什么场景创建什么 writer 在上文构造函数中说明了。如果没有合并会创建 StoreSinkWriteImpl, 有合并就创建 GlobalFullCompactionSinkWrite, 其中 GlobalFullCompactionSinkWrite 继承 StoreSinkWriteImpl。对于 StoreSinkWrite 这一块上文没有细纠,受于篇幅太长,这里也还是先放一放先把这个算子看完,后面再写 StoreSinkWrite 的详细部分。现在理解他就是用来写数据用的。
到此 initializeState 说完了总结如下
- 重点说了两个 state 的恢复 和 state 的选择器的逻辑,都是 OperatorState ListState
- commit_user_state 是一个UUID 保证 job commit_user 的唯一性,所有 subtask 都是一个值
- paimon_store_sink_write_state 放的是 bucket 分区 等,每个 subTask 只取属于自己的部分
- 创建了 writer
RowDataStoreWriteOperator#processElement
- write.write(element.getValue()); 写数据
PrepareCommitOperator#prepareSnapshotPreBarrier
- 收到 checkpoint Barrier 说明属于当前 checkpoint 周期的数据都已经到,这方法在在checkpoint snapshot 之前执行
- prepareSnapshotPreBarrier 第一步是执行抽象方法 prepareCommit(waitCompaction, checkpointId) 获取一个 List<Committable> 然后利用 flink output 发送到下游, prepareCommit 具体看子类实现
public void prepareSnapshotPreBarrier(long checkpointId) throws Exception {
if (!endOfInput) {
emitCommittables(false, checkpointId);
}
// no records are expected to emit after endOfInput
}
private void emitCommittables(boolean waitCompaction, long checkpointId) throws IOException {
prepareCommit(waitCompaction, checkpointId)
.forEach(committable -> output.collect(new StreamRecord<>(committable)));
}
protected abstract List<OUT> prepareCommit(boolean waitCompaction, long checkpointId)
throws IOException;
- prepareCommit 跟代码发现是通过 write.prepareCommit(waitCompaction, checkpointId) 获得的。先不看 write 细节
- 记得 prepareSnapshotPreBarrier 在 iceberg 的写中也是一个重要的一环,曾经也向社区推荐过在 hudi 中利用 prepareSnapshotPreBarrier 去开启事务。我猜测 Paimon 也差不多,因为 Paimon 的 commit 也是按照 checkpoint 来提交的,所以要关注 checkpoint 相关的几个方法(initializeState、prepareSnapshotPreBarrier、snapshotState、notifyCheckpointComplete)
- prepareSnapshotPreBarrier 做的事情就是调用 write.prepareCommit 方法返回 List<Committable> 然后发送到下游
- 到此关于 write 本身以及 write.write, write.prepareCommit 先暂时都没看细节
RowDataStoreWriteOperator#snapshotState 做两件事情
- write.snapshotState(); 对于 StoreSinkWriteImpl 是一个空实现啥也没做
- state.snapshotState();
state 这个变量在 initializeState方法中有讲解, 他是从状态中恢复出来的一个包装结构,snapshotState 方法就把这个包装结构拆回去更新到 state. 上面中 state 的 map 存放的内容还不是很清晰,但是到此为止还是没有看到 state 是如何更新的放的是什么数据。 在 RowDataStoreWriteOperator 算子和父类中都没有发现对 state 的修改,他是放在 StoreSinkWrite 中做的修改的,所以这里也是先调用 write.snapshotState(); 在调用 state.snapshotState(), 其中 write.snapshotState() 可能会变更 state 本身。也后面再看把。通过代码跟踪发现对于 StoreSinkWriteImpl 场景实际是没有操作 state 的可以理解为 StoreSinkWriteImpl 场景下 RowDataStoreWriteOperator 算子的 paimon_store_sink_write_state 是一个空的 state. 没有东西。是给 GlobalFullCompactionSinkWrite 场景用的。
FINAL
本篇讲解了 RowDataStoreWriteOperator 的几个核心方法
- 构造函数主要是有一个 StoreSinkWrite.Provider storeSinkWriteProvider 这个 provider 根据不同的配置场景会构造出两种 writer. 不需要合并就是 StoreSinkWriteImpl 需要合并就是 GlobalFullCompactionSinkWrite
- setup方法,如果开启了 managed memory 就会构建基于 managed memory 的内存分配器和内存池,这个池是给 writer 使用的
- initializeState方法,讲了 commit_user_state 和 paimon_store_sink_write_state 的恢复。恢复出来的 commitUser 和 StoreSinkWriteState 也是给 write 使用的。另外根据 provider 初始化了一个 write.
- processElement 就是用 write 写数据
- prepareSnapshotPreBarrier 调用 write.prepareCommit List<Committable> 然后利用 flink output 发送到下游
- snapshotState 先调用 write.snapshotState 在调用 state.snapshotState, 因为在 write 是 StoreSinkWriteImpl 时他的 snapshotState 毛都没干,其实 state 也是空,所以理解为 StoreSinkWriteImpl 场景下 paimon_store_sink_write_state 是空的
- 接下来要看 StoreSinkWriteImpl 的 write 、prepareCommit、snapshotState(已经说了是空实现)方法