- 接上篇 paimon sink 源码之 RowDataStoreWriteOperator
对于 StoreSinkWriteImpl 有 write 、prepareCommit 2 个方法待分析,StoreSinkWriteImpl 自身也待分析先看构造方法
StoreSinkWriteImpl#构造方法
- FileStoreTable table : PrimaryKeyFileStoreTable 或者 AppendOnlyFileStoreTable, paimon sink 源码 之 paimon table 创建
- String commitUser: 是 UUID
- StoreSinkWriteState state: paimon sink 源码之 RowDataStoreWriteOperator
这篇有讲他是从状态中恢复出来的 listState 的一个包装里面核心 是一个 map 数据结构,map 里面装的啥在 StoreSinkWriteImpl 里面也没有提到,因为 StoreSinkWriteImpl 的 这个 listState 是空的。map 里面存的啥 要看 StoreSinkWriteImpl 的子类 - IOManager ioManager: 来自于 flink runtime, paimon 中利用 flink IOManager 获取 spill 路径用来构建 paimon 自己的 IOManager ,paimon 自己的 IOManager 先不看
- boolean ignorePreviousFiles:是 overwritePartition 场景才使用所以可以认为是 false
-
boolean waitCompaction 逻辑如下:非 writeOnly 情况下 && (开启了 DV || 在 changlogProducer 是 lookup 时开启了 changelog-producer.lookup-wait) waitCompaction 就为 true 否则就是 false
- MemorySegmentPool memoryPool : 开了 sink.use-managed-memory-allocator 就会有基于 managed-memory 的内存池
-
根据第一步的 FileStoreTable table 创建 TableWriteImpl write。 所以从现在开始 write 里面开始套 write 了。琢磨下来总共是 套了 7 层 write 如下图
StoreSinkWriteImpl#write(org.apache.paimon.data.InternalRow)
- StoreSinkWriteImpl#write --> TableWriteImpl.writeAndReturn(rowData)
public SinkRecord writeAndReturn(InternalRow row) throws Exception {
//如果配置了 ignore-delete 则舍弃撤回流,
if (ignoreDelete && row.getRowKind().isRetract()) {
return null;
}
//进行一次包装 SinkRecord 包含 主键、分区、bucket、和原本的 InternalRow 把一些重要属性先抽取出来方便使用
SinkRecord record = toSinkRecord(row);
write.write(record.partition(), record.bucket(), recordExtractor.extract(record));
return record;
}
StoreSinkWriteImpl#prepareCommit
CommitMessage committable = write.prepareCommit(this.waitCompaction || waitCompaction, checkpointId)
StoreSinkWriteImpl 有 write 、prepareCommit 的方法都是调用 里面 write 变量的 write 和 prepareCommit 方法
那直接看 TableWriteImpl 的 write 、prepareCommit 方法
在 StoreSinkWriteImpl#构造方法 第 8 步是构造 TableWriteImpl 的地方,所以先看下他时如何构建的然后再看他的 的 write 、prepareCommit 方法
TableWriteImpl#构造方法
- 在里面 StoreSinkWriteImpl 构造函数的第 8 步会通过 newTableWrite 创建 TableWriteImpl
// StoreSinkWriteImpl 初始化 TableWriteImpl
private TableWriteImpl<?> newTableWrite(FileStoreTable table) {
TableWriteImpl<?> tableWrite =
table.newWrite( //不同的 table 创建不同的 write 等下在看
commitUser,
(part, bucket) -> // 这里竟然用了 state 过滤器,[在上篇](https://www.jianshu.com/p/8153c43a4170) 有详细讲这个过滤器的可能用处,这里只是一个函数,要看函数的调用放过才知道具体的 (part, bucket) 是怎么来的
state.stateValueFilter().filter(table.name(), part, bucket)
)
.withIOManager(paimonIOManager) // 设置其他属性
.withIgnorePreviousFiles(ignorePreviousFiles) // false
.withExecutionMode(isStreamingMode) // true
.withBucketMode(table.bucketMode());
if (metricGroup != null) {
tableWrite.withMetricRegistry(new FlinkMetricRegistry(metricGroup));
}
if (memoryPoolFactory != null) {
return tableWrite.withMemoryPoolFactory(memoryPoolFactory); // mamaged memory
} else {
return tableWrite.withMemoryPool( // 堆内存
memoryPool != null
? memoryPool
: new HeapMemorySegmentPool(
table.coreOptions().writeBufferSize(),
table.coreOptions().pageSize()));
}
}
// 从上面函数来看首先是通过 table 创建出了 TableWriteImpl 然后设置了 各种属性,
// 接下来看 table 是怎么 newWrite 的。主键表为例 如下
public TableWriteImpl<KeyValue> newWrite(
String commitUser, ManifestCacheFilter manifestFilter) {
TableSchema schema = schema();
CoreOptions options = store().options();
//rowKindGenerator 用来解析 '+I', '-U', '+U' or '-D' ,
//如果有指定 rowkind.field 那就从数据中抽取这个字段来获取 rowKind
//否则直接获取 row 的 rowkind 这个是 flink 层面加上的,而 rowkind.field 是数据层面的。
//暂时没有 get 到数据里面带 rowkind.field 的场景。
RowKindGenerator rowKindGenerator = RowKindGenerator.create(schema, options);
//主键表写的数据格式是 KeyValue, append 表写的格式直接是 InternalRow
KeyValue kv = new KeyValue();
//调用 TableWriteImpl 构造函数
return new TableWriteImpl<>(
// TableWriteImpl 里面又嵌套了一个 write 对于主键表是 KeyValueFileStoreWrite
// append 表是 AppendOnlyFileStoreWrite
// manifestFilter 是那个 state 过滤器。 ok 这里命名了 manifestFilter,
// manifest 应该是 Paiom 的元数据,文件列表信息,一个 Paimon 表会有很多文件列表信息,
// 可能对某个 task 只需要和自己 partition bucket 相关的 manifest
// (以上个人猜测,边看边猜,后面到 KeyValueFileStoreWrite 再细看)
store().newWrite(commitUser, manifestFilter),
createRowKeyExtractor(), //用来读取数据的主键的
record -> { //又是一个函数用来定义如何把 输入的 InternalRow 转化成 KeyValue, 然后后面用 KeyValue 结构去写
InternalRow row = record.row(); // 原始 row
RowKind rowKind = // 抽取 rowkind
rowKindGenerator == null
? row.getRowKind()
: rowKindGenerator.generate(row);
// 构建 KeyValue 放了主键,sequence, rowKind , row 本身
return kv.replace(record.primaryKey(), KeyValue.UNKNOWN_SEQUENCE, rowKind, row);
},
//获取是否有配置 ignore-delete ,如果配置了则会对 rowKind 为撤回类型的过滤掉 这个在上面 StoreSinkWriteImpl#write 里面也有提到
CoreOptions.fromMap(tableSchema.options()).ignoreDelete());
}
//到这里我们看了 TableWriteImpl 构造的地方了再看 TableWriteImpl 的构造函数
// TableWriteImpl 自身构造函数
public TableWriteImpl(
FileStoreWrite<T> write, // 来自于 table.store().newWrite() 主键表是 KeyValueFileStoreWrite
KeyAndBucketExtractor<InternalRow> keyAndBucketExtractor, //主键和bucket 的抽取器给一个InternalRow 之后用来方便对 key bucket 等的抽取
RecordExtractor<T> recordExtractor, //数据转化器 write 的不同 数据转化也不同,KeyValueFileStoreWrite 需要把 InternalRow 转化成 KeyValue, append 表 不需要转化
boolean ignoreDelete) { //是否忽略撤回数据
this.write = write;
this.keyAndBucketExtractor = keyAndBucketExtractor;
this.recordExtractor = recordExtractor;
this.ignoreDelete = ignoreDelete;
}
FINAL
- 此篇讲述了 StoreSinkWriteImpl 的构造和 write,prepareCommit 方法实际 write,prepareCommit 并么有做什么他是调用里面的 write 的write 和 prepareCommit 方法,write 里面又嵌套了 wirte, 对此又往下看了两层
- 从 RowDataStoreWriteOperator 来看 StoreSinkWriteImpl 是第一层 write
- StoreSinkWriteImpl 中的 TableWriteImpl 是第二层 write
- 接着讲述了 StoreSinkWriteImpl 的构建过程 他的 write,prepareCommit 又是调用第 3 层的 write
- StoreSinkWriteImpl 中的 FileStoreWrite 是第三成的 write
- FileStoreWrite 对于主键表对应的是 KeyValueFileStoreWrite, 对于append 表是 AppendOnlyFileStoreWrite
- KeyValueFileStoreWrite 里面还有 write. 后面再看, AppendOnlyFileStoreWrite 就先不看了,下文将接着讲述 KeyValueFileStoreWrite
- 回顾 wirte 调用链 StoreSinkWriteImpl#wirte ->TableWriteImpl.writeAndReturn(rowData)->KeyValueFileStoreWrite.write()
- 回顾 prepareCommit 调用链 StoreSinkWriteImpl#prepareCommit->TableWriteImpl.prepareCommit->KeyValueFileStoreWrite.prepareCommit