paimon sink 源码之 write 再梳理

从上篇中可以看到了 MemorySegmentPool,MemoryPoolFactory, 各种 buffer ,概念比较多再来重新梳理下整个 writer 的构建过程同时也关注下 MemorySegmentPool, buffer, 看看 MemorySegmentPool,MemoryPoolFactory 是算子级别( subtask 的所有 writer 共用 MemorySegmentPool)的 还是 writer 级别(每个 write 独享一个 MemorySegmentPool)的

  1. RowDataStoreWriteOperator 有成员变量 MemorySegmentPool memoryPool 、StoreSinkWrite write
    1.1 memoryPool 在 setup 方法时 根据是否 配置了 sink.use-managed-memory-allocator 会创建 FlinkMemorySegmentPool, 如果没有配置则为空,所以默认是空的, MemorySegmentPool 主要有两个实现一个是基于managed memory 的 FlinkMemorySegmentPool 一个是基于内存的 HeapMemorySegmentPool
    1.2 initializeState 时创建了 StoreSinkWrite 创建 StoreSinkWrite 会把 memoryPool 传入进去。StoreSinkWrite 会根据是否要合并分为 StoreSinkWriteImpl 和 GlobalFullCompactionSinkWrite 两种实现, 同时 GlobalFullCompactionSinkWrite 是继承自 StoreSinkWriteImpl

  2. StoreSinkWriteImpl 有成员变量 MemorySegmentPool memoryPool、MemoryPoolFactory memoryPoolFactory、TableWriteImpl<?> write
    2.1 memoryPool 从 RowDataStoreWriteOperator 传下来的可能为空 对于 配置了 sink.use-managed-memory-allocator 会有值
    2.2 memoryPoolFactory 是空
    2.3 write 是 StoreSinkWriteImpl 构造函数里面创建的最终是通过 FileStoreTable 的 newWrite 方法创建 TableWriteImpl,创建完 TableWriteImpl 后会紧接着通过 withMemoryPool 设置 memoryPool,对于 memoryPool 为空的场景则会初始化一个 HeapMemorySegmentPool, 到此 memoryPool 就都有值了
    2.4 创建完 TableWriteImpl 后会紧接着通过 withMemoryPoolFactory 设置 memoryPoolFactory, 因为 memoryPoolFactory 是空的所以忽略

  3. TableWriteImpl 有属性 FileStoreWrite<T> write
    3.1 他是在构建 TableWriteImpl 时创建的,对于主键表创建的是 KeyValueFileStoreWrite
    3.2 在 TableWriteImpl 调用 withMemoryPool 设置 memoryPool 实际还是调用的 KeyValueFileStoreWrite withMemoryPool 这样 memoryPool 不为空了也传导到了 KeyValueFileStoreWrite
    3.3 KeyValueFileStoreWrite 执行 withMemoryPool 时其实是 new MemoryPoolFactory(memoryPool) 然后调用
    withMemoryPoolFactory 把这个 MemoryPoolFactory 赋值给 KeyValueFileStoreWrite 的 writeBufferPool 属性
    3.4 这样 KeyValueFileStoreWrite 就有 MemoryPoolFactory, MemoryPoolFactory 里面放的是 memoryPool
    此时 MemoryPoolFactory 里面的 memoryPool 叫做 innerPool
    3.5 MemoryPoolFactory 里面一个 owners 其实对应的 KeyValueFileStoreWrite 的 writers,同一个 subTask 针对不同的 分区和 bucket 都会创建一个 write 所以 KeyValueFileStoreWrite 里面有多个 writer
    3.6 所以到这里大概可以猜测出 KeyValueFileStoreWrite 是一个 subTask 一个, MemoryPoolFactory 也是一个,又因为 MemoryPoolFactory 里面只有一个 innerPool 看上去也是 subTask 一个 innerPool

  4. KeyValueFileStoreWrite 有属性 MemoryPoolFactory writeBufferPool; 有一个 createWriter 方法
    4.1 writeBufferPool 来自于 KeyValueFileStoreWrite 新建的 MemoryPoolFactory
    4.2 createWriter 方法针对每个分区和 bucket 创建一个 MergeTreeWriter, 创建完 MergeTreeWriter 之后会给这个 MergeTreeWriter 创建一个 OwnerMemoryPool, OwnerMemoryPool 和 MergeTreeWriter 是 一一 对应的,但是操作的还是 MemoryPoolFactory 的 innerPool。之后还会初始化 MergeTreeWriter 的 writeBuffer
    此时 writeBuffer 是 new SortBufferWriteBuffer , 构造 SortBufferWriteBuffer 会把 OwnerMemoryPool 传进去

  5. MergeTreeWriter 有属性 WriteBuffer writeBuffer
    5.1 在 KeyValueFileStoreWrite 创建完成 MergeTreeWriter 后会创建一个 OwnerMemoryPool, writeBuffer 里面会与这个 OwnerMemoryPool
    5.2 writeBuffer 是 SortBufferWriteBuffer

  6. SortBufferWriteBuffer 有属性 SortBuffer buffer
    6.1 SortBuffer 根据是否 spill 会生成 BinaryExternalSortBuffer 或者 BinaryInMemorySortBuffer
    6.2 SortBuffer 里面会用到 memoryPool 也就是 OwnerMemoryPool

画个图总结如下

image.png
©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容