从上篇中可以看到了 MemorySegmentPool,MemoryPoolFactory, 各种 buffer ,概念比较多再来重新梳理下整个 writer 的构建过程同时也关注下 MemorySegmentPool, buffer, 看看 MemorySegmentPool,MemoryPoolFactory 是算子级别( subtask 的所有 writer 共用 MemorySegmentPool)的 还是 writer 级别(每个 write 独享一个 MemorySegmentPool)的
RowDataStoreWriteOperator 有成员变量 MemorySegmentPool memoryPool 、StoreSinkWrite write
1.1 memoryPool 在 setup 方法时 根据是否 配置了 sink.use-managed-memory-allocator 会创建 FlinkMemorySegmentPool, 如果没有配置则为空,所以默认是空的, MemorySegmentPool 主要有两个实现一个是基于managed memory 的 FlinkMemorySegmentPool 一个是基于内存的 HeapMemorySegmentPool
1.2 initializeState 时创建了 StoreSinkWrite 创建 StoreSinkWrite 会把 memoryPool 传入进去。StoreSinkWrite 会根据是否要合并分为 StoreSinkWriteImpl 和 GlobalFullCompactionSinkWrite 两种实现, 同时 GlobalFullCompactionSinkWrite 是继承自 StoreSinkWriteImplStoreSinkWriteImpl 有成员变量 MemorySegmentPool memoryPool、MemoryPoolFactory memoryPoolFactory、TableWriteImpl<?> write
2.1 memoryPool 从 RowDataStoreWriteOperator 传下来的可能为空 对于 配置了 sink.use-managed-memory-allocator 会有值
2.2 memoryPoolFactory 是空
2.3 write 是 StoreSinkWriteImpl 构造函数里面创建的最终是通过 FileStoreTable 的 newWrite 方法创建 TableWriteImpl,创建完 TableWriteImpl 后会紧接着通过 withMemoryPool 设置 memoryPool,对于 memoryPool 为空的场景则会初始化一个 HeapMemorySegmentPool, 到此 memoryPool 就都有值了
2.4 创建完 TableWriteImpl 后会紧接着通过 withMemoryPoolFactory 设置 memoryPoolFactory, 因为 memoryPoolFactory 是空的所以忽略TableWriteImpl 有属性 FileStoreWrite<T> write
3.1 他是在构建 TableWriteImpl 时创建的,对于主键表创建的是 KeyValueFileStoreWrite
3.2 在 TableWriteImpl 调用 withMemoryPool 设置 memoryPool 实际还是调用的 KeyValueFileStoreWrite withMemoryPool 这样 memoryPool 不为空了也传导到了 KeyValueFileStoreWrite
3.3 KeyValueFileStoreWrite 执行 withMemoryPool 时其实是 new MemoryPoolFactory(memoryPool) 然后调用
withMemoryPoolFactory 把这个 MemoryPoolFactory 赋值给 KeyValueFileStoreWrite 的 writeBufferPool 属性
3.4 这样 KeyValueFileStoreWrite 就有 MemoryPoolFactory, MemoryPoolFactory 里面放的是 memoryPool
此时 MemoryPoolFactory 里面的 memoryPool 叫做 innerPool
3.5 MemoryPoolFactory 里面一个 owners 其实对应的 KeyValueFileStoreWrite 的 writers,同一个 subTask 针对不同的 分区和 bucket 都会创建一个 write 所以 KeyValueFileStoreWrite 里面有多个 writer
3.6 所以到这里大概可以猜测出 KeyValueFileStoreWrite 是一个 subTask 一个, MemoryPoolFactory 也是一个,又因为 MemoryPoolFactory 里面只有一个 innerPool 看上去也是 subTask 一个 innerPoolKeyValueFileStoreWrite 有属性 MemoryPoolFactory writeBufferPool; 有一个 createWriter 方法
4.1 writeBufferPool 来自于 KeyValueFileStoreWrite 新建的 MemoryPoolFactory
4.2 createWriter 方法针对每个分区和 bucket 创建一个 MergeTreeWriter, 创建完 MergeTreeWriter 之后会给这个 MergeTreeWriter 创建一个 OwnerMemoryPool, OwnerMemoryPool 和 MergeTreeWriter 是 一一 对应的,但是操作的还是 MemoryPoolFactory 的 innerPool。之后还会初始化 MergeTreeWriter 的 writeBuffer
此时 writeBuffer 是 new SortBufferWriteBuffer , 构造 SortBufferWriteBuffer 会把 OwnerMemoryPool 传进去MergeTreeWriter 有属性 WriteBuffer writeBuffer
5.1 在 KeyValueFileStoreWrite 创建完成 MergeTreeWriter 后会创建一个 OwnerMemoryPool, writeBuffer 里面会与这个 OwnerMemoryPool
5.2 writeBuffer 是 SortBufferWriteBufferSortBufferWriteBuffer 有属性 SortBuffer buffer
6.1 SortBuffer 根据是否 spill 会生成 BinaryExternalSortBuffer 或者 BinaryInMemorySortBuffer
6.2 SortBuffer 里面会用到 memoryPool 也就是 OwnerMemoryPool