paimon sink 源码之 RowDataStoreWriteOperator

RowDataStoreWriteOperator

  • 类图在 paimon 的基类是 org.apache.paimon.flink.sink.PrepareCommitOperator


    org.apache.paimon.flink.sink.PrepareCommitOperator
  • RowDataStoreWriteOperator extends TableWriteOperator<InternalRow> extends PrepareCommitOperator<InternalRow, Committable>
  • 从 PrepareCommitOperator 类的泛型来看他处理的输入数据结构是 InternalRow, 输出是 Committable
  • 以方法的维度来看这个算子的逻辑

RowDataStoreWriteOperator#构造方法

  1. 参数1.FileStoreTable table 就是这一篇讲的 PrimaryKeyFileStoreTable 或者 AppendOnlyFileStoreTable
  2. LogSinkFunction logSinkFunction 在这篇里面也有提到 对于开启了 log.system 的 table 就会有,paimon 目前 logSinkFunction 就只有 kafka
  3. String initialCommitUser = UUID.randomUUID().toString();是一个 UUID 他作为 commit_user_state 的默认值,每一个 job 要有一个唯一的 userName. 所以第一次启动就使用 UUID 作为 userName, 而后会把他存在 commit_user_state state 中,任务从 state 恢复就会读出来使用。
  4. StoreSinkWrite.Provider storeSinkWriteProvider 这个是用来对接 fileSystem 进行写入要重点关注,先看看他的初始化

StoreSinkWrite.Provider

  • StoreSinkWrite.Provider 提供一个 provider 方法去创建 StoreSinkWrite 在 org.apache.paimon.flink.sink.FlinkSink#createWriteProvider 的实现中对应不同场景会有两种 StoreSinkWrite 分别是 GlobalFullCompactionSinkWrite 和 StoreSinkWriteImpl。逻辑如下
    1. 先判断 write-only 是否开启; 如果设置为 true,将跳过压缩和快照过期。则是用的 StoreSinkWriteImpl
    2. 如果 write-only 没有开启,则计算 deltaCommits 看看是否有配置在多少 commit 下就要进行合并计算逻辑如下
      a. 获取配置项:full-compaction.delta-commits 表明多少次 commits 触发 Full compaction, 如果配置了这个参数那么 deltaCommits= full-compaction.delta-commits
      b. 获取配置项:changelog-producer.compaction-interval,当 changelog-producer 设置为 FULL_COMPACTION 时此参数用来表示多少间隔后 触发 Full compaction, 如果配置了这个参数那么会用 compaction-interval/checkpoint-interval 来得到 deltaCommits
      c. 如果 a,b 都没有设置则 deltaCommits 为 -1 否则就是 deltaCommits >=0
      d. 如果 deltaCommits >=0 或者 changelog-producer 配置的是 FULL_COMPACTION 则用 GlobalFullCompactionSinkWrite 否则就是 StoreSinkWriteImpl


      StoreSinkWrite 的选择
  1. 总体来说就是会根据是否 writeOnly 和是否需要合并进行选择,如果不需要合并就选择 StoreSinkWriteImpl, 要合并就选择 GlobalFullCompactionSinkWrite
  2. GlobalFullCompactionSinkWrite 是 StoreSinkWriteImpl 的子类


    StoreSinkWrite

构造函数的 4 个参数说完了

  1. FileStoreTable table : PrimaryKeyFileStoreTable 或者 AppendOnlyFileStoreTable
  2. LogSinkFunction logSinkFunction 如果配置了就相当于进行了双写(写 kafka 和写 filesystem)
  3. String initialCommitUser = UUID.randomUUID().toString();是一个 UUID ,保证每个 job userName 的唯一性
  4. StoreSinkWrite.Provider storeSinkWriteProvider 用来构建 StoreSinkWrite,StoreSinkWrite 先可以理解为就是一个 writer 先不看细节
  5. 特别的:对于RowDataStoreWriteOperator 只有 logSinkFunction 是留给自己用的,其他三个参数都传给父类的构造函数说明父类 TableWriteOperator 才是执行 write 的真正入口,接着看 RowDataStoreWriteOperator 的其他方法

RowDataStoreWriteOperator#setup

  1. 调用父类的 PrepareCommitOperator#setup
  • 父类判断是否开启了配置项 sink.use-managed-memory-allocator 默认为 false, 这项参数的解释如下:
    • paimon task 可以基于 TM memory 创建 memory pools,这些内存将会被 flink TM 管理,例如 TM 中的 managed memory。它通过 TM 管理多个 tasks 的 writer buffers 来提高 sink 的稳定性和性能。
    • 如果设置为 true , paimon 的 merge tree 将会使用
      managed memory 进行工作; 否则他会创建一个独立的内存分配器,这意味着每个 Task 会分配和管理自己的堆内内存池,如果一个 TM 中的Task 太多了,可能会有性能问题甚至 OOM
    • 如果设置为 ture 与之配套的还有一个参数 sink.managed.writer-buffer-memory 默认是 256m, 参数用来指定 writer buffer 在托管内存中的权重,Flink会根据权重计算内存大小,对于writer来说,实际使用的内存取决于运行环境。 现在,此属性中定义的内存大小等于运行时分配给写入缓冲区的确切内存。
    • 如果设置为 true 那么在 setup 方法里面会初始化一个基于 managed memory 的内存分配器
      if (options.get(SINK_USE_MANAGED_MEMORY)) {
            MemoryManager memoryManager = containingTask.getEnvironment().getMemoryManager();
            memoryAllocator = new MemorySegmentAllocator(containingTask, memoryManager);
            memoryPool =
                    new FlinkMemorySegmentPool(
                            computeManagedMemory(this),
                            memoryManager.getPageSize(),
                            memoryAllocator);
        }
    
  1. 判断是否有 logSinkFunction 如果有的话 对 logSinkFunction 进行设置 RuntimeContext

setup 方法逻辑总结如下

  1. 父类判断是否需要使用 managed memory 如果要使用会就会构建基于 managed memory 的内存分配器和内存池
  2. 自己判断是否有 logSinkFunction 如果有的话 对 logSinkFunction 进行设置 RuntimeContext, 后续不在关注 logSinkFunction 主要关注写 fileSystem 的逻辑, 然而发现如果忽略 logSinkFunction 的存在那么 RowDataStoreWriteOperator 的大部分逻辑和父类是一样的,大部分复写的方法都是先调 super 然后判断是否有 logSinkFunction 然后调用 logSinkFunction 的相应方法。

RowDataStoreWriteOperator#initializeState

  • 直接看 super org.apache.paimon.flink.sink.TableWriteOperator#initializeState 主要是两个 state 的恢复 和创建 writer
  1. commit_user_state 的恢复
    a. 尝试从 state 中恢复 commitUser, 如果没有就用 构造函数传过来的 UUID, 而后写入 state
    b. commit_user_state 结构如下

    ListState<T> state =
                context.getOperatorStateStore() //是一个 OperatorState
                        .getUnionListState(new ListStateDescriptor<>(stateName, valueClass));
                       // 不是很理解为啥是一个 UnionListState, 而不直接是一个   ListState, 可能是兼容同时写多个
                       // Paimon 表的场景??
    

    c. commitUser 是全局的,当前 operate 所有 subTask 的 commitUser 都是为同一个值。代码中也做了校验,对恢复出来的 commit_user_state 的 listValus 的每个元素会进行校验是否为同一值。
    d. 所以如果状态中有这个 state, 那么 当前 subTask 的 commitUser 就随便从 list 中取一个值就 ok, 都是一样的。之所以这么繁琐是想强调一下与常规的 OperatorState ListState 的不同之处,一般 ListState 每个元素会不一样,比如 kafka partition 的 offset, 而恢复的时候 每个 subTask 只要取属于自己的某一个下标元素。
    比如 如果 kafka 有 4 个 partition, 那么 ListState 会有 4 个元素,如果 kafka source 只有一个并行度那么他就要读取到 ListState 的所有值进行 kafka 消费, 如果 kafka source 有 2 个并行度,那么两个并行度只要分别拿 ListState 的 2 个下标进行消费 kafka。 如果 kafka source 有 4 个并行度,那么每个并行度就只要拿一个下标元素。具体拿哪个和上游的 partition 算法相关, 对于 source 因为没有上游可能就是按顺序拿的。

  2. paimon_store_sink_write_state 的恢复, 他也是 OperatorState ListState, 他就只要取属于自己的那一部分。怎么取取决于上游的 partition 算法。 这个算法有在之前写过关于 FIXED mode 下 partitioner 的梳理

    image.png

    a. 了解到了之前的算法那么再来看他是怎么筛选下标的

 StateValueFilter stateFilter =
                (tableName, partition, bucket) -> { // tableName 不参与过滤逻辑 忽略
                    int task =
                            containLogSystem // 是否包含 logFuntion 因为在 partition 算法中也有这个判断,
                                    ? ChannelComputer.select(bucket, numTasks)  // numTasks 等于 numChannels,
                                    : ChannelComputer.select(partition, bucket, numTasks);
                    // task 的值代表对于在某个 bucket 下和算子的并行度下他应该发往下游的那个 subTask.
                    // 所以有这么个判断只有算出来的 task 和当前 subTask 下标一致的才获取,否则就过滤掉
                    return task == getRuntimeContext().getIndexOfThisSubtask();
                };

b. 从上面的过滤逻辑就可以看出 paimon_store_sink_write_state 里面装的可能是[(t1,p1,0),(t1,p1,1),(t1,p1,2),...,(t1,p2,0),(t1,p2,1),(t1,p2,2),...] 具体数据结构如下

    public StoreSinkWriteState(
            StateInitializationContext context, StateValueFilter stateValueFilter)
            throws Exception {
        this.stateValueFilter = stateValueFilter; // ListState 选择器在上面已经说明
       // paimon_store_sink_write_state 的元素类型, 上面参与过滤的是 3 个元素 实际存储 5 个元素更加丰富
        TupleSerializer<Tuple5<String, String, byte[], Integer, byte[]>> listStateSerializer =
                new TupleSerializer<>(
                        (Class<Tuple5<String, String, byte[], Integer, byte[]>>)
                                (Class<?>) Tuple5.class,
                        new TypeSerializer[] {
                            StringSerializer.INSTANCE,
                            StringSerializer.INSTANCE,
                            BytePrimitiveArraySerializer.INSTANCE,
                            IntSerializer.INSTANCE,
                            BytePrimitiveArraySerializer.INSTANCE
                        });
        listState =
                context.getOperatorStateStore()
                        .getUnionListState(
                                new ListStateDescriptor<>(
                                        "paimon_store_sink_write_state", listStateSerializer));
       //挑选出来放入 Map 中
        map = new HashMap<>();
        for (Tuple5<String, String, byte[], Integer, byte[]> tuple : listState.get()) {
            //tuple.f0 tableName, tuple.f1 未知, tuple.f2 是分区,tuple.f3 数 bucket , tuple.f4 未知
            BinaryRow partition = SerializationUtils.deserializeBinaryRow(tuple.f2);
            if (stateValueFilter.filter(tuple.f0, partition, tuple.f3)) {
                map.computeIfAbsent(tuple.f0, k -> new HashMap<>())
                        .computeIfAbsent(tuple.f1, k -> new ArrayList<>())
                        .add(new StateValue(partition, tuple.f3, tuple.f4));
            }
        }
    }

c. 经过过滤最终在算子中引用的是一个经过包装的 StoreSinkWriteState 对象。恢复出来的元素放在 StoreSinkWriteState 的一个 map 中。
private final Map<String, Map<String, List<StateValue>>> map; 从上一步的逻辑可以猜测出 Map 里面放的大概是 Map<tableName, Map<未知,List<StateValue(分区,bucket, 未知)>>。 对于未知的先不管现在是反着看的只能猜测,在 snaphost 或者其他地方会正着来,会揭秘开来。

  1. 创建 writer
  • writer 的创建是通过 StoreSinkWrite.Provider 的 provider 方法创建的。 关于 StoreSinkWrite.Provider 的初始化和什么场景创建什么 writer 在上文构造函数中说明了。如果没有合并会创建 StoreSinkWriteImpl, 有合并就创建 GlobalFullCompactionSinkWrite, 其中 GlobalFullCompactionSinkWrite 继承 StoreSinkWriteImpl。对于 StoreSinkWrite 这一块上文没有细纠,受于篇幅太长,这里也还是先放一放先把这个算子看完,后面再写 StoreSinkWrite 的详细部分。现在理解他就是用来写数据用的。

到此 initializeState 说完了总结如下

  1. 重点说了两个 state 的恢复 和 state 的选择器的逻辑,都是 OperatorState ListState
  2. commit_user_state 是一个UUID 保证 job commit_user 的唯一性,所有 subtask 都是一个值
  3. paimon_store_sink_write_state 放的是 bucket 分区 等,每个 subTask 只取属于自己的部分
  4. 创建了 writer

RowDataStoreWriteOperator#processElement

  • write.write(element.getValue()); 写数据

PrepareCommitOperator#prepareSnapshotPreBarrier

  • 收到 checkpoint Barrier 说明属于当前 checkpoint 周期的数据都已经到,这方法在在checkpoint snapshot 之前执行
  • prepareSnapshotPreBarrier 第一步是执行抽象方法 prepareCommit(waitCompaction, checkpointId) 获取一个 List<Committable> 然后利用 flink output 发送到下游, prepareCommit 具体看子类实现
   public void prepareSnapshotPreBarrier(long checkpointId) throws Exception {
        if (!endOfInput) {
            emitCommittables(false, checkpointId);
        }
        // no records are expected to emit after endOfInput
    }

  private void emitCommittables(boolean waitCompaction, long checkpointId) throws IOException {
        prepareCommit(waitCompaction, checkpointId)
                .forEach(committable -> output.collect(new StreamRecord<>(committable)));
    }

    protected abstract List<OUT> prepareCommit(boolean waitCompaction, long checkpointId)
            throws IOException;
  • prepareCommit 跟代码发现是通过 write.prepareCommit(waitCompaction, checkpointId) 获得的。先不看 write 细节
  1. 记得 prepareSnapshotPreBarrier 在 iceberg 的写中也是一个重要的一环,曾经也向社区推荐过在 hudi 中利用 prepareSnapshotPreBarrier 去开启事务。我猜测 Paimon 也差不多,因为 Paimon 的 commit 也是按照 checkpoint 来提交的,所以要关注 checkpoint 相关的几个方法(initializeState、prepareSnapshotPreBarrier、snapshotState、notifyCheckpointComplete)
  2. prepareSnapshotPreBarrier 做的事情就是调用 write.prepareCommit 方法返回 List<Committable> 然后发送到下游
  3. 到此关于 write 本身以及 write.write, write.prepareCommit 先暂时都没看细节

RowDataStoreWriteOperator#snapshotState 做两件事情

  1. write.snapshotState(); 对于 StoreSinkWriteImpl 是一个空实现啥也没做
  2. state.snapshotState();
    state 这个变量在 initializeState方法中有讲解, 他是从状态中恢复出来的一个包装结构,snapshotState 方法就把这个包装结构拆回去更新到 state. 上面中 state 的 map 存放的内容还不是很清晰,但是到此为止还是没有看到 state 是如何更新的放的是什么数据。 在 RowDataStoreWriteOperator 算子和父类中都没有发现对 state 的修改,他是放在 StoreSinkWrite 中做的修改的,所以这里也是先调用 write.snapshotState(); 在调用 state.snapshotState(), 其中 write.snapshotState() 可能会变更 state 本身。也后面再看把。通过代码跟踪发现对于 StoreSinkWriteImpl 场景实际是没有操作 state 的可以理解为 StoreSinkWriteImpl 场景下 RowDataStoreWriteOperator 算子的 paimon_store_sink_write_state 是一个空的 state. 没有东西。是给 GlobalFullCompactionSinkWrite 场景用的。

FINAL

本篇讲解了 RowDataStoreWriteOperator 的几个核心方法

  1. 构造函数主要是有一个 StoreSinkWrite.Provider storeSinkWriteProvider 这个 provider 根据不同的配置场景会构造出两种 writer. 不需要合并就是 StoreSinkWriteImpl 需要合并就是 GlobalFullCompactionSinkWrite
  2. setup方法,如果开启了 managed memory 就会构建基于 managed memory 的内存分配器和内存池,这个池是给 writer 使用的
  3. initializeState方法,讲了 commit_user_state 和 paimon_store_sink_write_state 的恢复。恢复出来的 commitUser 和 StoreSinkWriteState 也是给 write 使用的。另外根据 provider 初始化了一个 write.
  4. processElement 就是用 write 写数据
  5. prepareSnapshotPreBarrier 调用 write.prepareCommit List<Committable> 然后利用 flink output 发送到下游
  6. snapshotState 先调用 write.snapshotState 在调用 state.snapshotState, 因为在 write 是 StoreSinkWriteImpl 时他的 snapshotState 毛都没干,其实 state 也是空,所以理解为 StoreSinkWriteImpl 场景下 paimon_store_sink_write_state 是空的
  7. 接下来要看 StoreSinkWriteImpl 的 write 、prepareCommit、snapshotState(已经说了是空实现)方法
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 204,445评论 6 478
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,889评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 151,047评论 0 337
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,760评论 1 276
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,745评论 5 367
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,638评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,011评论 3 398
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,669评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,923评论 1 299
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,655评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,740评论 1 330
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,406评论 4 320
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,995评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,961评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,197评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 45,023评论 2 350
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,483评论 2 342

推荐阅读更多精彩内容