paimon sink 源码之 StoreSinkWriteImpl

StoreSinkWriteImpl#构造方法

  1. FileStoreTable table : PrimaryKeyFileStoreTable 或者 AppendOnlyFileStoreTable, paimon sink 源码 之 paimon table 创建
  2. String commitUser: 是 UUID
  3. StoreSinkWriteState state: paimon sink 源码之 RowDataStoreWriteOperator
    这篇有讲他是从状态中恢复出来的 listState 的一个包装里面核心 是一个 map 数据结构,map 里面装的啥在 StoreSinkWriteImpl 里面也没有提到,因为 StoreSinkWriteImpl 的 这个 listState 是空的。map 里面存的啥 要看 StoreSinkWriteImpl 的子类
  4. IOManager ioManager: 来自于 flink runtime, paimon 中利用 flink IOManager 获取 spill 路径用来构建 paimon 自己的 IOManager ,paimon 自己的 IOManager 先不看
  5. boolean ignorePreviousFiles:是 overwritePartition 场景才使用所以可以认为是 false
  6. boolean waitCompaction 逻辑如下:非 writeOnly 情况下 && (开启了 DV || 在 changlogProducer 是 lookup 时开启了 changelog-producer.lookup-wait) waitCompaction 就为 true 否则就是 false


    waitCompaction
  7. MemorySegmentPool memoryPool : 开了 sink.use-managed-memory-allocator 就会有基于 managed-memory 的内存池
  8. 根据第一步的 FileStoreTable table 创建 TableWriteImpl write。 所以从现在开始 write 里面开始套 write 了。琢磨下来总共是 套了 7 层 write 如下图


    image.png

StoreSinkWriteImpl#write(org.apache.paimon.data.InternalRow)

  1. StoreSinkWriteImpl#write --> TableWriteImpl.writeAndReturn(rowData)
public SinkRecord writeAndReturn(InternalRow row) throws Exception {
       //如果配置了 ignore-delete 则舍弃撤回流, 
        if (ignoreDelete && row.getRowKind().isRetract()) {
            return null;
        }
       //进行一次包装 SinkRecord 包含 主键、分区、bucket、和原本的 InternalRow 把一些重要属性先抽取出来方便使用
        SinkRecord record = toSinkRecord(row);
        write.write(record.partition(), record.bucket(), recordExtractor.extract(record));
        return record;
    }

StoreSinkWriteImpl#prepareCommit

CommitMessage committable = write.prepareCommit(this.waitCompaction || waitCompaction, checkpointId)

StoreSinkWriteImpl 有 write 、prepareCommit 的方法都是调用 里面 write 变量的 write 和 prepareCommit 方法
那直接看 TableWriteImpl 的 write 、prepareCommit 方法
在 StoreSinkWriteImpl#构造方法 第 8 步是构造 TableWriteImpl 的地方,所以先看下他时如何构建的然后再看他的 的 write 、prepareCommit 方法

TableWriteImpl#构造方法

  • 在里面 StoreSinkWriteImpl 构造函数的第 8 步会通过 newTableWrite 创建 TableWriteImpl
// StoreSinkWriteImpl 初始化 TableWriteImpl
private TableWriteImpl<?> newTableWrite(FileStoreTable table) {
TableWriteImpl<?> tableWrite =
                table.newWrite( //不同的  table 创建不同的 write 等下在看
                                commitUser,
                                (part, bucket) -> // 这里竟然用了 state 过滤器,[在上篇](https://www.jianshu.com/p/8153c43a4170) 有详细讲这个过滤器的可能用处,这里只是一个函数,要看函数的调用放过才知道具体的  (part, bucket) 是怎么来的
                                        state.stateValueFilter().filter(table.name(), part, bucket)
                        )
                        .withIOManager(paimonIOManager) // 设置其他属性
                        .withIgnorePreviousFiles(ignorePreviousFiles) // false
                        .withExecutionMode(isStreamingMode) // true
                        .withBucketMode(table.bucketMode()); 

        if (metricGroup != null) {
            tableWrite.withMetricRegistry(new FlinkMetricRegistry(metricGroup));
        }

        if (memoryPoolFactory != null) {
            return tableWrite.withMemoryPoolFactory(memoryPoolFactory); // mamaged memory
        } else {
            return tableWrite.withMemoryPool( // 堆内存
                    memoryPool != null
                            ? memoryPool
                            : new HeapMemorySegmentPool(
                                    table.coreOptions().writeBufferSize(),
                                    table.coreOptions().pageSize()));
        }
}

// 从上面函数来看首先是通过 table 创建出了 TableWriteImpl 然后设置了 各种属性, 
// 接下来看 table 是怎么 newWrite 的。主键表为例 如下

 public TableWriteImpl<KeyValue> newWrite(
            String commitUser, ManifestCacheFilter manifestFilter) {
        TableSchema schema = schema();
        CoreOptions options = store().options();
       //rowKindGenerator 用来解析 '+I', '-U', '+U' or '-D' , 
       //如果有指定 rowkind.field 那就从数据中抽取这个字段来获取 rowKind 
       //否则直接获取 row 的 rowkind 这个是 flink 层面加上的,而 rowkind.field 是数据层面的。
       //暂时没有 get 到数据里面带 rowkind.field 的场景。
        RowKindGenerator rowKindGenerator = RowKindGenerator.create(schema, options);
       //主键表写的数据格式是 KeyValue, append 表写的格式直接是 InternalRow
        KeyValue kv = new KeyValue();
      //调用 TableWriteImpl 构造函数
        return new TableWriteImpl<>(
                 // TableWriteImpl 里面又嵌套了一个 write 对于主键表是 KeyValueFileStoreWrite 
                 // append 表是 AppendOnlyFileStoreWrite 
                 // manifestFilter 是那个 state 过滤器。 ok 这里命名了 manifestFilter,  
                 // manifest 应该是 Paiom 的元数据,文件列表信息,一个 Paimon 表会有很多文件列表信息,
                 // 可能对某个 task 只需要和自己 partition bucket 相关的 manifest
                 // (以上个人猜测,边看边猜,后面到 KeyValueFileStoreWrite 再细看)
                store().newWrite(commitUser, manifestFilter), 
                createRowKeyExtractor(), //用来读取数据的主键的
                record -> { //又是一个函数用来定义如何把 输入的 InternalRow 转化成 KeyValue, 然后后面用 KeyValue 结构去写
                    InternalRow row = record.row(); // 原始 row
                    RowKind rowKind =  // 抽取 rowkind
                            rowKindGenerator == null
                                    ? row.getRowKind()
                                    : rowKindGenerator.generate(row);
                     // 构建 KeyValue 放了主键,sequence, rowKind , row 本身
                    return kv.replace(record.primaryKey(), KeyValue.UNKNOWN_SEQUENCE, rowKind, row);
                },
               //获取是否有配置 ignore-delete ,如果配置了则会对 rowKind 为撤回类型的过滤掉 这个在上面 StoreSinkWriteImpl#write 里面也有提到
                CoreOptions.fromMap(tableSchema.options()).ignoreDelete());
    }

//到这里我们看了 TableWriteImpl 构造的地方了再看 TableWriteImpl 的构造函数

// TableWriteImpl 自身构造函数
public TableWriteImpl(
            FileStoreWrite<T> write, // 来自于 table.store().newWrite() 主键表是 KeyValueFileStoreWrite
            KeyAndBucketExtractor<InternalRow> keyAndBucketExtractor, //主键和bucket 的抽取器给一个InternalRow 之后用来方便对 key bucket 等的抽取
            RecordExtractor<T> recordExtractor, //数据转化器 write 的不同 数据转化也不同,KeyValueFileStoreWrite 需要把 InternalRow 转化成 KeyValue, append 表 不需要转化
            boolean ignoreDelete) { //是否忽略撤回数据
        this.write = write;
        this.keyAndBucketExtractor = keyAndBucketExtractor;
        this.recordExtractor = recordExtractor;
        this.ignoreDelete = ignoreDelete;
    }

FINAL

  • 此篇讲述了 StoreSinkWriteImpl 的构造和 write,prepareCommit 方法实际 write,prepareCommit 并么有做什么他是调用里面的 write 的write 和 prepareCommit 方法,write 里面又嵌套了 wirte, 对此又往下看了两层
  • 从 RowDataStoreWriteOperator 来看 StoreSinkWriteImpl 是第一层 write
  • StoreSinkWriteImpl 中的 TableWriteImpl 是第二层 write
    • 接着讲述了 StoreSinkWriteImpl 的构建过程 他的 write,prepareCommit 又是调用第 3 层的 write
  • StoreSinkWriteImpl 中的 FileStoreWrite 是第三成的 write
    • FileStoreWrite 对于主键表对应的是 KeyValueFileStoreWrite, 对于append 表是 AppendOnlyFileStoreWrite
  • KeyValueFileStoreWrite 里面还有 write. 后面再看, AppendOnlyFileStoreWrite 就先不看了,下文将接着讲述 KeyValueFileStoreWrite
  • 回顾 wirte 调用链 StoreSinkWriteImpl#wirte ->TableWriteImpl.writeAndReturn(rowData)->KeyValueFileStoreWrite.write()
  • 回顾 prepareCommit 调用链 StoreSinkWriteImpl#prepareCommit->TableWriteImpl.prepareCommit->KeyValueFileStoreWrite.prepareCommit
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 204,445评论 6 478
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,889评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 151,047评论 0 337
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,760评论 1 276
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,745评论 5 367
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,638评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,011评论 3 398
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,669评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,923评论 1 299
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,655评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,740评论 1 330
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,406评论 4 320
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,995评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,961评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,197评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 45,023评论 2 350
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,483评论 2 342

推荐阅读更多精彩内容