- 接上篇paimon sink 源码之 StoreSinkWriteImpl
- 回顾 wirte 调用链 StoreSinkWriteImpl#wirte ->TableWriteImpl.writeAndReturn(rowData)->KeyValueFileStoreWrite.write()
- 回顾 prepareCommit 调用链 StoreSinkWriteImpl#prepareCommit->TableWriteImpl.prepareCommit->KeyValueFileStoreWrite.prepareCommit
KeyValueFileStoreWrite#构造
org.apache.paimon.flink.sink.StoreSinkWriteImpl#newTableWrite 构建函数构建中调用 table.newWrite 构建 write
org.apache.paimon.table.PrimaryKeyFileStoreTable#newWrite(String commitUser, ManifestCacheFilter manifestFilter){
... ...
store().newWrite(commitUser, manifestFilter) //先调用 table 的 store 方法构建出 KeyValueFileStore 然在 newWrite 构建出 KeyValueFileStoreWrite
... ...
}
//先看 store 方法再看 newWrite 方法
public KeyValueFileStore store() {
if (lazyStore == null) {
RowType rowType = tableSchema.logicalRowType();
Options conf = Options.fromMap(tableSchema.options());
CoreOptions options = new CoreOptions(conf);
KeyValueFieldsExtractor extractor =
PrimaryKeyTableUtils.PrimaryKeyFieldsExtractor.EXTRACTOR;
MergeFunctionFactory<KeyValue> mfFactory =
PrimaryKeyTableUtils.createMergeFunctionFactory(tableSchema, extractor);
//merge-engine:
// |--deduplicate 去重保留最后一条 默认是 deduplicate
// |--partial-update 局部更新字段 多流拼接场景?
// |--aggregation 聚合 类似 pv? uv? 来一条新数据和底表数据做聚合
// |--first-row 去重保留最新一条
// |--看着就像 hudi 的 payload 。 但是 paimon 表达更简答有着丰富的 function 来个案例
// 详情可以去官网查看 https://paimon.apache.org/docs/master/primary-key-table/merge-engine/#aggregation
// CREATE TABLE t (
// k INT,
// a INT,
// b INT,
// c INT,
// d INT,
// PRIMARY KEY (k) NOT ENFORCED
// ) WITH (
// 'merge-engine'='partial-update', // 'merge-engine'='aggregation'
// 'fields.a.sequence-group' = 'b', // 'fields.a.aggregate-function'='sum'
// 'fields.b.aggregate-function' = 'first_value',
// 'fields.c.sequence-group' = 'd',
// 'fields.d.aggregate-function' = 'sum'
// );
// changelog-producer 配置成 lookup 或者 deletion-vectors.enabled=true 就需要 lookup DV 为啥要 lookUP?? 等到 MergeFunction 调用的时候再揭秘把
if (options.needLookup()) {
mfFactory =
LookupMergeFunction.wrap(
mfFactory, new RowType(extractor.keyFields(tableSchema)), rowType);
}
lazyStore =
new KeyValueFileStore(
fileIO(), //在第一篇 https://www.jianshu.com/p/f8a518d9f6ff 混了个眼熟,先不管
schemaManager(), // 管理 schema 获取最新 schema 等
tableSchema,
tableSchema.crossPartitionUpdate(), //是否跨分区更新, 有分区有主键并且分区键不全是主键就可以跨分区更新
options,
tableSchema.logicalPartitionType(),
PrimaryKeyTableUtils.addKeyNamePrefix(
tableSchema.logicalBucketKeyType()),
new RowType(extractor.keyFields(tableSchema)),
rowType,
extractor,
mfFactory,
name(), // 根据数据路径截取出来的名字 路径的最后一级目录名字
catalogEnvironment);
}
// KeyValueFileStore 构造的这么多属性基本上用来构建 writer、reader 用的。
return lazyStore;
}
// KeyValueFileStore newWrite 构建出 KeyValueFileStoreWrite
public KeyValueFileStoreWrite newWrite(String commitUser, ManifestCacheFilter manifestFilter) {
IndexMaintainer.Factory<KeyValue> indexFactory = null;
if (bucketMode() == BucketMode.DYNAMIC) {
indexFactory = new HashIndexMaintainer.Factory(newIndexFileHandler());
}
DeletionVectorsMaintainer.Factory deletionVectorsMaintainerFactory = null;
if (options.deletionVectorsEnabled()) {
deletionVectorsMaintainerFactory =
new DeletionVectorsMaintainer.Factory(newIndexFileHandler());
}
return new KeyValueFileStoreWrite(
fileIO, // 来自 KeyValueFileStore
schemaManager, // 来自 KeyValueFileStore
schema, // 来自 KeyValueFileStore
commitUser, // 来自 KeyValueFileStore
keyType, // 来自 KeyValueFileStore
valueType, // 来自 KeyValueFileStore
keyComparatorSupplier, // 用来比较 key 的大小
() -> UserDefinedSeqComparator.create(valueType, options), // 用来比较 sequence.field 大小 类似 hudi 的 preCombine filed
valueEqualiserSupplier, // 用来比较 value 是否相等??
mfFactory, // mergeFunction 来自 KeyValueFileStore
pathFactory(), // 文件路径 path,空分区时的分区命名 partition.default-name 默认是 __DEFAULT_PARTITION__ 和 hive是一样的,file.format 默认是 orc
format2PathFactory(), //file.format.per.level 不同层可以设置不同的文件格式,这里所指的层应该是 LSM 的层
snapshotManager(), // snapshot 相关的工具类
newScan(true, DEFAULT_MAIN_BRANCH).withManifestCacheFilter(manifestFilter), // filestore 读取器 带上了一个 manifestFilter 用来构建 WriterContainer
indexFactory, // 如果是 BucketMode.DYNAMIC 就有 用来构建 WriterContainer
deletionVectorsMaintainerFactory, // 如果开启了deletion-vectors.enabled 就有 用来构建 WriterContainer
options,
keyValueFieldsExtractor,
tableName);
}
KeyValueFileStoreWrite#write
WriterContainer 创建
//partition 从 row 里面抽取出来的分区 bucket 是根据 bucket 和
public void write(BinaryRow partition, int bucket, T data) throws Exception {
//创建 WriterContainer 每个分区的每个 bucket 维护一个 write
WriterContainer<T> container = getWriterWrapper(partition, bucket);
//写数据
container.writer.write(data);
if (container.indexMaintainer != null) {
container.indexMaintainer.notifyNewRecord(data);
}
}
// 创建 WriterContainer
public WriterContainer<T> createWriterContainer(
BinaryRow partition, int bucket, boolean ignorePreviousFiles) {
// ... ...
// 从 snapshotManager 获取最近的 snaphot, 从 ../db/table/snapshot 目录获取
// snapshot 有一个 LATEST 文件 和 一些 snaphot-n 文件,可以直接解析 LATEST 获取最近的 snaphot
// 也可以从 snaphot-n 文件名上解析出最大的 n 当做最近的 snaphot
// snaphot 记录了 baseManifestList,deltaManifestList 的文件名,解析这两个文件可以拿到 Manifest
// Manifest 记录一次提交的所有文件集合。
Long latestSnapshotId = snapshotManager.latestSnapshotId();
List<DataFileMeta> restoreFiles = new ArrayList<>();
if (!ignorePreviousFiles && latestSnapshotId != null) {
// 当前是要创建 Writer 这个writer 是要去写某个分区的某个 bucket,
// 所以这里也会把 partition, bucket 传进入进行 Manifest 过滤,得到 相应 partition, bucket 的 DataFile
restoreFiles = scanExistingFileMetas(latestSnapshotId, partition, bucket);
}
// indexFactory 从上面看 如果是 BucketMode.DYNAMIC 时会有 indexFactory, 为啥 GLOBAL_DYNAMIC 没有 indexFactory ???
IndexMaintainer<T> indexMaintainer =
indexFactory == null
? null
: indexFactory.createOrRestore(
ignorePreviousFiles ? null : latestSnapshotId, partition, bucket);
// 开启 DV 才有
DeletionVectorsMaintainer deletionVectorsMaintainer =
deletionVectorsMaintainerFactory == null
? null
: deletionVectorsMaintainerFactory.createOrRestore(
ignorePreviousFiles ? null : latestSnapshotId, partition, bucket);
RecordWriter<T> writer =
createWriter( //下面再看 createWriter
partition.copy(),
bucket,
restoreFiles,
null,
compactExecutor(), // 创建 compact 线程
deletionVectorsMaintainer);
//创建完 write 之后给这个 write 分配 memoryPool
// write-buffer-size 转换为排序的磁盘文件之前要在内存中构建的数据量。 256 mb
// page-size 64k
// 初始化 writer 的 writeBuffer
notifyNewWriter(writer);
return new WriterContainer<>(
writer, indexMaintainer, deletionVectorsMaintainer, latestSnapshotId);
}
WriterContainer#MergeTreeWriter 创建
protected MergeTreeWriter createWriter(
BinaryRow partition, // 当前 write 的分区
int bucket, // 当前 write 的桶
List<DataFileMeta> restoreFiles, //分区和桶下对应的 DataFile
@Nullable CommitIncrement restoreIncrement, // null
ExecutorService compactExecutor, // compact 线程
@Nullable DeletionVectorsMaintainer dvMaintainer) {
KeyValueFileWriterFactory writerFactory =
writerFactoryBuilder.build(partition, bucket, options); // 用来创建 KeyValueDataFileWriter
Comparator<InternalRow> keyComparator = keyComparatorSupplier.get(); // key 排序比较器
// merge tree 的 levels 用来存储每层的 文件 level0 存的是 DataFiles 其他 level 存的是 SortedRun, SortedRun 里面也是存的 DataFiles
// level0 的DataFile 的排序顺序先按 maxSequenceNumber 从大到小,然后再按名字从小到大
Levels levels = new Levels(keyComparator, restoreFiles, options.numLevels());
// 合并参数和策略
UniversalCompaction universalCompaction =
new UniversalCompaction(
// compaction.max-size-amplification-percent 默认 200
options.maxSizeAmplificationPercent(),
// compaction.size-ratio 默认为 1
options.sortedRunSizeRatio(),
// num-sorted-run.compaction-trigger 默认为 5
options.numSortedRunCompactionTrigger(),
// compaction.optimization-interval
options.optimizedCompactionInterval());
CompactStrategy compactStrategy =
// changelog-producer 配置成 lookup 或者 deletion-vectors.enabled=true 就需要 lookup
options.needLookup()
? new ForceUpLevel0Compaction(universalCompaction)
: universalCompaction;
// compactManager 用来创建 CompactRewriter 当达到触发条件时会把合并单元 封装成一个 callable CompactTask 给到 compactExecutor 里面去执行 后面再看
CompactManager compactManager =
createCompactManager(
partition, bucket, compactStrategy, compactExecutor, levels, dvMaintainer);
// 写数据用的 writer
return new MergeTreeWriter(
bufferSpillable(), // 是否配置了 write-buffer-spillable 是否能够溢写
options.writeBufferSpillDiskSize(), //溢写的 大小 write-buffer-spill.max-disk-size 默认 Long.MAX_VALUE
// local-sort.max-num-file-handles 默认 128 外部合并排序的最大扇入。 它限制文件句柄的数量。 如果太小,可能会导致中间合并。 但如果太大,会导致同时打开的文件过多,消耗内存并导致随机读取。
options.localSortMaxNumFileHandles(),
// 溢写压缩算法 spill-compression 默认 lz4. lz4, lzo and zstd are supported
options.spillCompression(),
ioManager,
compactManager,
// 每次提交记录都会递增 SequenceNumber
getMaxSequenceNumber(restoreFiles),
keyComparator,
mfFactory.create(), // MergeFunction
writerFactory,
// commit 之前是否强制 合并 commit.force-compact
options.commitForceCompact(),
// changelog-producer
options.changelogProducer(),
restoreIncrement, // null
// 根据用户指定的 sequence.field 字段构建 seq 的比较器
UserDefinedSeqComparator.create(valueType, options));
}
MergeTreeWriter#write
public void write(KeyValue kv) throws Exception {
// 从 DataFile 里面获得最大的 sequenceNumber 之后每次 write 写一条数据都会加 1
long sequenceNumber = newSequenceNumber();
//执行写入 写入 buffer 重点可以看看 buffer 是什么
boolean success = writeBuffer.put(sequenceNumber, kv.valueKind(), kv.key(), kv.value());
// 如果 写不下了就会先 flush 在写
if (!success) {
flushWriteBuffer(false, false);
success = writeBuffer.put(sequenceNumber, kv.valueKind(), kv.key(), kv.value());
if (!success) {
throw new RuntimeException("Mem table is too small to hold a single element.");
}
}
}
关于 KeyValueFileStoreWrite#write 总结如下
clas KeyValueFileStoreWrite extends MemoryFileStoreWrite<KeyValue> extends AbstractFileStoreWrite<T> implements FileStoreWrite<T>
private MemoryPoolFactory writeBufferPool;
1. writeBufferPool 来自于构建 KeyValueFileStoreWrite 后调用 withMemoryPool 方法,然后 new MemoryPoolFactory
2. MemoryPoolFactory 里面放的是 MemorySegmentPool memoryPool 分为 flink 管理内存和堆内存
3. MemoryPoolFactory 可以理解为对 MemorySegmentPool 的一种管理 ,维护了一个shubTask 中所有 writes 使用的 memory
// 核心方法创建 MergeTreeWriter 以及 MergeTreeWriter 写时用的 writeBufffer
public WriterContainer<T> createWriterContainer{
...
RecordWriter<T> writer = createWriter(...) // 创建 MergeTreeWriter
1. 给这个 write 创建一个单独的 OwnerMemoryPool OwnerMemoryPool 实际是操作 MemoryPoolFactory writeBufferPool 里面的 MemorySegmentPool innerPool
2. 个这个 write 分配 memoryPool 实际就是创建 MergeTreeWriter 的 WriteBuffer writeBuffer
3.这个 writeBufffer 实际是 new SortBufferWriteBuffer ;SortBufferWriteBuffer 还有 MemorySegmentPool 和 SortBuffer; SortBuffer 对于是否开启 spill 分 BinaryInMemorySortBuffer 和 BinaryExternalSortBuffer;MemorySegmentPool 是给 SortBuffer 用的
....
}
// 核心方法写数据,写数据就是往 MergeTreeWriter 的 WriteBuffer 里面 put 和必要的 WriteBuffer flush
public void write(BinaryRow partition, int bucket, T data) throws Exception {
WriterContainer<T> container = getWriterWrapper(partition, bucket); //实际是调用上面的 createWriterContainer 创建出 MergeTreeWriter container 只是包了一层
然后调用 MergeTreeWriter 的 write 方法 往下看就是往 MergeTreeWriter 的 WriteBuffer 里面 put
container.writer.write(data);
if (container.indexMaintainer != null) {
container.indexMaintainer.notifyNewRecord(data);
}
}
- 上面提到了 MemoryPoolFactory、MemorySegmentPool、MergeTreeWriter、WriteBuffer、SortBuffer
MemoryPoolFactory
class MemoryPoolFactory {
private final MemorySegmentPool innerPool; //这个是根据配置是否使用托管内存生成的
private final int totalPages;
private Iterable<MemoryOwner> owners; //MemoryOwner 其实就是 MergeTreeWriter, MergeTreeWriter 实现了 MemoryOwner
private final long totalBufferSize;
private long bufferPreemptCount;
//内部类
private class OwnerMemoryPool implements MemorySegmentPool {
// 操作的是外部类的 innerPool
}
}
MemorySegmentPool
- 上面提到了 MergeTreeWriter 在看看 MergeTreeWriter
MergeTreeWriter#write 总结如下
MergeTreeWriter implements RecordWriter<KeyValue>, MemoryOwner
//又有一层 writeBuffer
private WriteBuffer writeBuffer
public void write(KeyValue kv) throws Exception {
long sequenceNumber = newSequenceNumber();
// 往writeBuffer 写数据 写时可能会 flush
boolean success = writeBuffer.put(sequenceNumber, kv.valueKind(), kv.key(), kv.value());
if (!success) {
flushWriteBuffer(false, false);
success = writeBuffer.put(sequenceNumber, kv.valueKind(), kv.key(), kv.value());
if (!success) {
throw new RuntimeException("Mem table is too small to hold a single element.");
}
}
}
WriteBuffer
SortBufferWriteBuffer implements WriteBuffer
仅追加用于存储键值的写入器缓冲区。当它已满时,它将被刷新到磁盘和
形成数据文件。
MemorySegmentPool memoryPool // MemorySegmentPool 一直传递
private final KeyValueSerializer serializer;
private final SortBuffer buffer; // 又有一层 buffer
SortBuffer
// SortBuffer 的两种实现,一种是基于纯内存的一种是基于可以 spill 的 是否 spill 由配置决定
BinaryInMemorySortBuffer extends BinaryIndexedSortable implements SortBuffer
protected final MemorySegmentPool memorySegmentPool //MemorySegmentPool 最终使用的地方
//spill 有基于内存的 SortBuffer 的引用
BinaryExternalSortBuffer implements SortBuffer
private final BinaryInMemorySortBuffer inMemorySortBuffer
FINAL