paimon sink 源码之 PrimaryKeyFileStoreTable 的 KeyValueFileStoreWrite write 方法

  • 接上篇paimon sink 源码之 StoreSinkWriteImpl
  • 回顾 wirte 调用链 StoreSinkWriteImpl#wirte ->TableWriteImpl.writeAndReturn(rowData)->KeyValueFileStoreWrite.write()
  • 回顾 prepareCommit 调用链 StoreSinkWriteImpl#prepareCommit->TableWriteImpl.prepareCommit->KeyValueFileStoreWrite.prepareCommit

KeyValueFileStoreWrite#构造

org.apache.paimon.flink.sink.StoreSinkWriteImpl#newTableWrite 构建函数构建中调用 table.newWrite 构建 write
org.apache.paimon.table.PrimaryKeyFileStoreTable#newWrite(String commitUser, ManifestCacheFilter manifestFilter){
   ... ...
   store().newWrite(commitUser, manifestFilter) //先调用 table 的 store 方法构建出 KeyValueFileStore 然在 newWrite 构建出 KeyValueFileStoreWrite
  ... ... 
}

//先看 store 方法再看 newWrite 方法
public KeyValueFileStore store() {
        if (lazyStore == null) {
            RowType rowType = tableSchema.logicalRowType();
            Options conf = Options.fromMap(tableSchema.options());
            CoreOptions options = new CoreOptions(conf);
            KeyValueFieldsExtractor extractor =
                    PrimaryKeyTableUtils.PrimaryKeyFieldsExtractor.EXTRACTOR;

            MergeFunctionFactory<KeyValue> mfFactory =
                    PrimaryKeyTableUtils.createMergeFunctionFactory(tableSchema, extractor);
           //merge-engine:
           //   |--deduplicate 去重保留最后一条 默认是 deduplicate
           //   |--partial-update 局部更新字段 多流拼接场景?
           //   |--aggregation 聚合 类似 pv? uv? 来一条新数据和底表数据做聚合
           //   |--first-row  去重保留最新一条
           //   |--看着就像 hudi 的 payload 。 但是 paimon 表达更简答有着丰富的 function 来个案例 
          // 详情可以去官网查看 https://paimon.apache.org/docs/master/primary-key-table/merge-engine/#aggregation
           //   CREATE TABLE t (
                     //   k INT,
                    //    a INT,
                    //    b INT,
                    //    c INT,
                    //    d INT,
                    //   PRIMARY KEY (k) NOT ENFORCED
           //   ) WITH (
                //   'merge-engine'='partial-update',                          // 'merge-engine'='aggregation'
                //   'fields.a.sequence-group' = 'b',                          // 'fields.a.aggregate-function'='sum'
                //   'fields.b.aggregate-function' = 'first_value',
                //   'fields.c.sequence-group' = 'd',
                //   'fields.d.aggregate-function' = 'sum'
                // );
            // changelog-producer 配置成 lookup 或者 deletion-vectors.enabled=true 就需要 lookup  DV 为啥要 lookUP?? 等到 MergeFunction 调用的时候再揭秘把
            if (options.needLookup()) {
                mfFactory =
                        LookupMergeFunction.wrap(
                                mfFactory, new RowType(extractor.keyFields(tableSchema)), rowType);
            }

            lazyStore =
                    new KeyValueFileStore(
                            fileIO(), //在第一篇 https://www.jianshu.com/p/f8a518d9f6ff 混了个眼熟,先不管
                            schemaManager(), // 管理 schema 获取最新 schema 等
                            tableSchema,
                            tableSchema.crossPartitionUpdate(), //是否跨分区更新, 有分区有主键并且分区键不全是主键就可以跨分区更新
                            options,
                            tableSchema.logicalPartitionType(),
                            PrimaryKeyTableUtils.addKeyNamePrefix(
                                    tableSchema.logicalBucketKeyType()),
                            new RowType(extractor.keyFields(tableSchema)),
                            rowType,
                            extractor,
                            mfFactory,
                            name(), // 根据数据路径截取出来的名字 路径的最后一级目录名字
                            catalogEnvironment);
        }
        // KeyValueFileStore 构造的这么多属性基本上用来构建 writer、reader 用的。
        return lazyStore;
    }

//  KeyValueFileStore  newWrite 构建出 KeyValueFileStoreWrite

public KeyValueFileStoreWrite newWrite(String commitUser, ManifestCacheFilter manifestFilter) {
        IndexMaintainer.Factory<KeyValue> indexFactory = null;
        if (bucketMode() == BucketMode.DYNAMIC) {
            indexFactory = new HashIndexMaintainer.Factory(newIndexFileHandler());
        }
        DeletionVectorsMaintainer.Factory deletionVectorsMaintainerFactory = null;
        if (options.deletionVectorsEnabled()) {
            deletionVectorsMaintainerFactory =
                    new DeletionVectorsMaintainer.Factory(newIndexFileHandler());
        }
        return new KeyValueFileStoreWrite(
                fileIO,                       // 来自 KeyValueFileStore
                schemaManager,    // 来自 KeyValueFileStore
                schema,                  // 来自 KeyValueFileStore
                commitUser,            // 来自 KeyValueFileStore
                keyType,                 // 来自 KeyValueFileStore
                valueType,               // 来自 KeyValueFileStore
                keyComparatorSupplier,  // 用来比较 key 的大小
                () -> UserDefinedSeqComparator.create(valueType, options), // 用来比较 sequence.field 大小 类似 hudi 的 preCombine filed
                valueEqualiserSupplier, // 用来比较 value 是否相等??
                mfFactory,           // mergeFunction  来自 KeyValueFileStore
                pathFactory(),     // 文件路径  path,空分区时的分区命名 partition.default-name 默认是 __DEFAULT_PARTITION__ 和 hive是一样的,file.format 默认是 orc
                format2PathFactory(), //file.format.per.level 不同层可以设置不同的文件格式,这里所指的层应该是  LSM 的层
                snapshotManager(),  // snapshot 相关的工具类
                newScan(true, DEFAULT_MAIN_BRANCH).withManifestCacheFilter(manifestFilter), // filestore 读取器 带上了一个 manifestFilter  用来构建 WriterContainer
                indexFactory, //  如果是 BucketMode.DYNAMIC 就有 用来构建 WriterContainer
                deletionVectorsMaintainerFactory, // 如果开启了deletion-vectors.enabled 就有 用来构建 WriterContainer
                options,
                keyValueFieldsExtractor,
                tableName);
    }

KeyValueFileStoreWrite#write

WriterContainer 创建

//partition 从 row 里面抽取出来的分区 bucket 是根据 bucket 和  
public void write(BinaryRow partition, int bucket, T data) throws Exception {
         //创建  WriterContainer 每个分区的每个 bucket 维护一个 write
        WriterContainer<T> container = getWriterWrapper(partition, bucket);
        //写数据
        container.writer.write(data);
        if (container.indexMaintainer != null) {
            container.indexMaintainer.notifyNewRecord(data);
        }
    }

// 创建 WriterContainer
public WriterContainer<T> createWriterContainer(
            BinaryRow partition, int bucket, boolean ignorePreviousFiles) {
        // ... ...
       // 从 snapshotManager 获取最近的 snaphot, 从 ../db/table/snapshot 目录获取
       // snapshot 有一个 LATEST 文件 和 一些 snaphot-n 文件,可以直接解析  LATEST 获取最近的  snaphot
       // 也可以从 snaphot-n 文件名上解析出最大的 n 当做最近的 snaphot
       //  snaphot 记录了 baseManifestList,deltaManifestList 的文件名,解析这两个文件可以拿到  Manifest
      // Manifest 记录一次提交的所有文件集合。
        Long latestSnapshotId = snapshotManager.latestSnapshotId();
        List<DataFileMeta> restoreFiles = new ArrayList<>();
        if (!ignorePreviousFiles && latestSnapshotId != null) {
             // 当前是要创建 Writer 这个writer 是要去写某个分区的某个 bucket, 
           // 所以这里也会把 partition, bucket 传进入进行 Manifest 过滤,得到 相应 partition, bucket 的 DataFile
            restoreFiles = scanExistingFileMetas(latestSnapshotId, partition, bucket);
        }
        // indexFactory 从上面看 如果是 BucketMode.DYNAMIC 时会有 indexFactory, 为啥  GLOBAL_DYNAMIC 没有 indexFactory ???
        IndexMaintainer<T> indexMaintainer =
                indexFactory == null
                        ? null
                        : indexFactory.createOrRestore(
                                ignorePreviousFiles ? null : latestSnapshotId, partition, bucket);
       // 开启 DV 才有
        DeletionVectorsMaintainer deletionVectorsMaintainer =
                deletionVectorsMaintainerFactory == null
                        ? null
                        : deletionVectorsMaintainerFactory.createOrRestore(
                                ignorePreviousFiles ? null : latestSnapshotId, partition, bucket);
        RecordWriter<T> writer =
                createWriter(   //下面再看 createWriter
                        partition.copy(),
                        bucket,
                        restoreFiles,
                        null,
                        compactExecutor(), // 创建 compact 线程
                        deletionVectorsMaintainer);
        //创建完 write 之后给这个 write 分配 memoryPool
       // write-buffer-size 转换为排序的磁盘文件之前要在内存中构建的数据量。 256 mb
       // page-size 64k
       // 初始化 writer 的 writeBuffer
        notifyNewWriter(writer);
        return new WriterContainer<>(
                writer, indexMaintainer, deletionVectorsMaintainer, latestSnapshotId);
    }

WriterContainer#MergeTreeWriter 创建

protected MergeTreeWriter createWriter(
            BinaryRow partition,  // 当前 write 的分区
            int bucket,                 // 当前 write 的桶
            List<DataFileMeta> restoreFiles,  //分区和桶下对应的 DataFile
            @Nullable CommitIncrement restoreIncrement, // null
            ExecutorService compactExecutor,   // compact 线程
            @Nullable DeletionVectorsMaintainer dvMaintainer) {
 
        KeyValueFileWriterFactory writerFactory =
                writerFactoryBuilder.build(partition, bucket, options); // 用来创建 KeyValueDataFileWriter
        Comparator<InternalRow> keyComparator = keyComparatorSupplier.get(); // key 排序比较器
       // merge tree 的 levels 用来存储每层的  文件 level0 存的是 DataFiles  其他 level 存的是 SortedRun, SortedRun 里面也是存的 DataFiles
       // level0 的DataFile 的排序顺序先按 maxSequenceNumber 从大到小,然后再按名字从小到大
        Levels levels = new Levels(keyComparator, restoreFiles, options.numLevels());
       // 合并参数和策略
        UniversalCompaction universalCompaction =
                new UniversalCompaction(
                          // compaction.max-size-amplification-percent 默认 200 
                        options.maxSizeAmplificationPercent(),
                       // compaction.size-ratio 默认为 1
                        options.sortedRunSizeRatio(),
                        // num-sorted-run.compaction-trigger 默认为 5
                        options.numSortedRunCompactionTrigger(),
                         // compaction.optimization-interval
                        options.optimizedCompactionInterval());
        CompactStrategy compactStrategy =
                // changelog-producer 配置成 lookup 或者 deletion-vectors.enabled=true 就需要 lookup
                options.needLookup()
                        ? new ForceUpLevel0Compaction(universalCompaction)
                        : universalCompaction;
        // compactManager 用来创建 CompactRewriter 当达到触发条件时会把合并单元 封装成一个 callable CompactTask 给到 compactExecutor 里面去执行 后面再看
        CompactManager compactManager =
                createCompactManager(
                        partition, bucket, compactStrategy, compactExecutor, levels, dvMaintainer);
         // 写数据用的 writer 
        return new MergeTreeWriter(
                bufferSpillable(), // 是否配置了 write-buffer-spillable 是否能够溢写
                options.writeBufferSpillDiskSize(),  //溢写的 大小 write-buffer-spill.max-disk-size 默认 Long.MAX_VALUE
               // local-sort.max-num-file-handles 默认 128 外部合并排序的最大扇入。 它限制文件句柄的数量。 如果太小,可能会导致中间合并。 但如果太大,会导致同时打开的文件过多,消耗内存并导致随机读取。
                options.localSortMaxNumFileHandles(),
               // 溢写压缩算法 spill-compression 默认 lz4.  lz4, lzo and zstd are supported
                options.spillCompression(),
                ioManager,
                compactManager,
               // 每次提交记录都会递增 SequenceNumber
                getMaxSequenceNumber(restoreFiles),
                keyComparator,
                mfFactory.create(), // MergeFunction
                writerFactory,
               // commit 之前是否强制 合并  commit.force-compact
                options.commitForceCompact(),
                // changelog-producer
                options.changelogProducer(),
                restoreIncrement, // null
                 // 根据用户指定的  sequence.field 字段构建 seq 的比较器
                UserDefinedSeqComparator.create(valueType, options));
    }

MergeTreeWriter#write

 public void write(KeyValue kv) throws Exception {
        // 从 DataFile 里面获得最大的 sequenceNumber 之后每次 write 写一条数据都会加 1
        long sequenceNumber = newSequenceNumber(); 
       //执行写入 写入 buffer 重点可以看看 buffer 是什么
        boolean success = writeBuffer.put(sequenceNumber, kv.valueKind(), kv.key(), kv.value());
       // 如果 写不下了就会先 flush 在写
        if (!success) {
            flushWriteBuffer(false, false);
            success = writeBuffer.put(sequenceNumber, kv.valueKind(), kv.key(), kv.value());
            if (!success) {
                throw new RuntimeException("Mem table is too small to hold a single element.");
            }
        }
    }

关于 KeyValueFileStoreWrite#write 总结如下

clas  KeyValueFileStoreWrite extends MemoryFileStoreWrite<KeyValue> extends AbstractFileStoreWrite<T>  implements FileStoreWrite<T>
private MemoryPoolFactory writeBufferPool;
1. writeBufferPool 来自于构建 KeyValueFileStoreWrite 后调用 withMemoryPool 方法,然后 new MemoryPoolFactory
2. MemoryPoolFactory 里面放的是 MemorySegmentPool memoryPool 分为 flink 管理内存和堆内存
3. MemoryPoolFactory 可以理解为对 MemorySegmentPool 的一种管理 ,维护了一个shubTask 中所有 writes 使用的 memory 

// 核心方法创建 MergeTreeWriter 以及 MergeTreeWriter 写时用的 writeBufffer
public WriterContainer<T> createWriterContainer{

...

RecordWriter<T> writer = createWriter(...) // 创建 MergeTreeWriter 

1. 给这个 write 创建一个单独的 OwnerMemoryPool  OwnerMemoryPool 实际是操作 MemoryPoolFactory writeBufferPool 里面的 MemorySegmentPool innerPool

2. 个这个 write 分配 memoryPool 实际就是创建 MergeTreeWriter 的 WriteBuffer writeBuffer
3.这个 writeBufffer 实际是 new SortBufferWriteBuffer ;SortBufferWriteBuffer 还有 MemorySegmentPool 和  SortBuffer; SortBuffer  对于是否开启 spill 分 BinaryInMemorySortBuffer 和 BinaryExternalSortBuffer;MemorySegmentPool 是给 SortBuffer 用的
....

}



// 核心方法写数据,写数据就是往 MergeTreeWriter 的 WriteBuffer 里面 put 和必要的 WriteBuffer flush

 public void write(BinaryRow partition, int bucket, T data) throws Exception {

        WriterContainer<T> container = getWriterWrapper(partition, bucket);  //实际是调用上面的  createWriterContainer 创建出 MergeTreeWriter  container 只是包了一层

        然后调用 MergeTreeWriter 的 write 方法 往下看就是往 MergeTreeWriter 的 WriteBuffer 里面 put

        container.writer.write(data);

        if (container.indexMaintainer != null) {

            container.indexMaintainer.notifyNewRecord(data);

        }

    }
  • 上面提到了 MemoryPoolFactory、MemorySegmentPool、MergeTreeWriter、WriteBuffer、SortBuffer

MemoryPoolFactory

class MemoryPoolFactory {
private final MemorySegmentPool innerPool; //这个是根据配置是否使用托管内存生成的
private final int totalPages;
private Iterable<MemoryOwner> owners; //MemoryOwner 其实就是 MergeTreeWriter, MergeTreeWriter 实现了 MemoryOwner
private final long totalBufferSize;
private long bufferPreemptCount;

//内部类
private class OwnerMemoryPool implements MemorySegmentPool {
   // 操作的是外部类的 innerPool
}
}

MemorySegmentPool

MemorySegmentPool
  • 上面提到了 MergeTreeWriter 在看看 MergeTreeWriter

MergeTreeWriter#write 总结如下

MergeTreeWriter implements RecordWriter<KeyValue>, MemoryOwner
//又有一层 writeBuffer
private WriteBuffer writeBuffer
public void write(KeyValue kv) throws Exception {
        long sequenceNumber = newSequenceNumber();
        // 往writeBuffer 写数据 写时可能会 flush
        boolean success = writeBuffer.put(sequenceNumber, kv.valueKind(), kv.key(), kv.value());
        if (!success) {
            flushWriteBuffer(false, false);
            success = writeBuffer.put(sequenceNumber, kv.valueKind(), kv.key(), kv.value());
            if (!success) {
                throw new RuntimeException("Mem table is too small to hold a single element.");
            }
        }
    }

WriteBuffer

SortBufferWriteBuffer implements WriteBuffer
仅追加用于存储键值的写入器缓冲区。当它已满时,它将被刷新到磁盘和
形成数据文件。
MemorySegmentPool memoryPool // MemorySegmentPool 一直传递
private final KeyValueSerializer serializer;
private final SortBuffer buffer;   // 又有一层 buffer

SortBuffer

// SortBuffer 的两种实现,一种是基于纯内存的一种是基于可以 spill 的 是否 spill 由配置决定
BinaryInMemorySortBuffer extends BinaryIndexedSortable implements SortBuffer
protected final MemorySegmentPool memorySegmentPool //MemorySegmentPool 最终使用的地方

//spill 有基于内存的 SortBuffer 的引用
BinaryExternalSortBuffer implements SortBuffer
 private final BinaryInMemorySortBuffer inMemorySortBuffer

FINAL

  • 此篇有点混乱,后面搞得更熟悉的时候再来整理。
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 204,445评论 6 478
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,889评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 151,047评论 0 337
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,760评论 1 276
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,745评论 5 367
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,638评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,011评论 3 398
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,669评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,923评论 1 299
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,655评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,740评论 1 330
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,406评论 4 320
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,995评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,961评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,197评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 45,023评论 2 350
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,483评论 2 342

推荐阅读更多精彩内容