paimon sink 源码之 dataStream 的拓扑梳理

  • paimon sink 源码 之 DynamicTableSink
  • 上篇得知 sink 的拓扑是在 DynamicTableSink#getSinkRuntimeProvider 里面定义的
  • Paimon 对于 DynamicTableSink 的实现就是 org.apache.paimon.flink.sink.FlinkTableSink
  • getSinkRuntimeProvider 在父类 org.apache.paimon.flink.sink.FlinkTableSinkBase#getSinkRuntimeProvider 中定义
  • 这里只探讨主键表在流模式下写入场景

拓扑梳理

  1. 判断是否有 logStoreTableFactory 在创建 DynamicTableSink 是根据建表语句是否有配置 log.system 来决定是否有 logStoreTableFactory
    1.1 log.system 默认值是 none,可以配置成 kafka,配置成 kafka 数据不仅仅会写入 fileSystem 也会写入 kafka 相当于是双写,流读消费 paimon 表数据的时候 streaming-read-mode 可以配置成 log 从 kafka 进行消费

    1.2 log.system 设置为 kafka 举例

    CREATE TABLE T (i INT, j INT) WITH (
       'log.system'='kafka', 
       'log.system.partitions'='2', 
       'kafka.bootstrap.servers'='%s', 
       'kafka.topic'='Tt',
      'connector='paimon'
      ...
    )
    

    1.3 LogSinkFunction 的创建,对于 log.system 是 kafka 实际上就是利用 kafka connector 的 FlinkKafkaProducer 进行数据发送


    LogSinkFunction
  2. sink.parallelism 设置 sink 的并行度

  3. local-merge-buffer-size 如作业存在主键数据偏斜可以设置“local-merge-buffer-size”,在数据进行 shuffle 之前进行 buffer 和 merge, 当同一主键在快照之间频繁更新时,这特别有用。建议从“64 mb”开始调整缓冲区大小。不适用于 CDC ingestion
    3.1 如果设置了 local-merge-buffer-size 就会加入一个 LocalMergeOperator 算子
    3.2 LocalMergeOperator 算子并行度和上游保持一致,shuffle 方式是 forward

  4. 对 DataStream<RowData> 进行一次 map 将 org.apache.flink.table.data.RowData 转化成 org.apache.paimon.data.InternalRow
    4.1 算子并行度和上游一致
    4.2 paimon InternalRow 只是对 flink RowData 的包装实际还是操作的 flink RowData

  5. 判断 BucketMode ,在这篇里有讲述 Paimon Table BuketMode 的逻辑

  • 对于主键表会有 FIXED、DYNAMIC、GLOBAL_DYNAMIC 三种 mode
  • 对于 append only 表有 FIXED、UNAWARE 两种 mode
  • 不同的 BucketMode 会有不同的拓扑,他们的不同主要体现在 partition 的逻辑不一样和数据写入的逻辑不一样,之后的一些算子就是一样的,这次我们先梳理主线不去看算子细节,主线梳理完成之后在写每个算子的细节
  • 这次面向主键表,从 FIXED mode 开始

    FIXED

    1. 添加 FlinkStreamPartitioner 分区器对数据进行 shuffle
      • 并行度为 sink.parallelism
      • 分区逻辑为
        a. 先抽取 bucket key 的值,然后将值的 hash 和 bucket 桶数取模 Math.abs(key_hashcode % numBuckets 得到一个 bucket 序号
        b. 如果有 logFuntion 则直接按照 bucket 序号和算子并行度取模 recordChannel = bucket % numChannels
        c. 如果没有 logFuntion 则先算出 分区属于那一个 channel, 然后再用 分区的 channel + bucket 序号再和并行度取模 这个算法和我在 hudi 提的分区算法是一样的
        int startChannel = Math.abs(partition.hashCode()) % numChannels;
        return (startChannel + bucket) % numChannels;
        
    2. 分区后添加数据写入 RowDataStoreWriteOperator 算子
      • 并行度为上游并行度
      • 这里有点花原本我以为 logFuntion 会直接加一个 KafkaSinkFunction 算子,没想到他是直接集成到了 RowDataStoreWriteOperator 里面。可能是我 out 了。上代码
           public class RowDataStoreWriteOperator extends TableWriteOperator<InternalRow> {
              private final LogSinkFunction logSinkFunction;
              @Override
              public void setup() {
                   super.setup(containingTask, config, output);
                   if (logSinkFunction != null) {
                      // 调用 flink core 强加 function
                       FunctionUtils.setFunctionRuntimeContext(logSinkFunction, getRuntimeContext());
                   }
                }
               public void open() throws Exception {
                   super.open();
                   this.sinkContext = new SimpleContext(getProcessingTimeService());
                   if (logSinkFunction != null) {
                       // to stay compatible with Flink 1.18-
                       if (logSinkFunction instanceof RichFunction) {
                           RichFunction richFunction = (RichFunction) logSinkFunction;
                           //不仅仅当前算子要 open  logFuntion 也手动 open 
                           richFunction.open(new Configuration());
                       }
                   }
               }
        
               public void processElement(StreamRecord<InternalRow> element) {
                   SinkRecord record;
                   try {
                       record = write.write(element.getValue()); //写 fileSystem
                   } catch (Exception e) {
                       throw new IOException(e);
                   }
        
                  if (record != null && logSinkFunction != null) {
                       // write to log store, need to preserve original pk (which includes partition fields)
                       SinkRecord logRecord = write.toLogRecord(record);
                       logSinkFunction.invoke(logRecord, sinkContext); // 来吧接着干 log
                   }
               }
        
               @Override
               public void snapshotState(StateSnapshotContext context) throws Exception {
                   super.snapshotState(context);
                    if (logSinkFunction != null) {
                         // log 也 snapshotState
                       StreamingFunctionUtils.snapshotFunctionState(
                               context, getOperatorStateBackend(), logSinkFunction);
                   }
               }
           }
        

    DYNAMIC

    1. Dynamic bucket 不适用于 log.system 场景校验会直接报错不支持, logSinkFunction 必须为 null
    2. 获取 dynamic-bucket.assigner-parallelism: 用来定义 assigner 算子的并行度如果没有定义就是 sink.parallelism。这个配置的设置和桶的初始化个数相关,太小会导致 assigner 算子处理速度不够
    3. 获取 dynamic-bucket.initial-buckets: 控制初始化bucket的数量。
    4. 添加 FlinkStreamPartitioner 分区器对数据进行 shuffle
      • 并行度为 assigner-parallelism
      • 分区逻辑为
        a. 他的逻辑和 FIXED 在没有 logFunction 的逻辑一样,不同点是 FIXED 的 buketSize 是固定的,而 DYNAMIC 模式下 buketSize 是取的 算子并行度和 initial-buckets 的最小值 bucket=MathUtils.min(numAssigners, numChannels)
        b. 然后就是 FIXED 逻辑一致了
           int start = Math.abs(partitionHash % numChannels);
           int id = Math.abs(keyHash % buketSize);
           return (start + id)  % numChannels;
        
    5. 添加 HashBucketAssignerOperator 算子
      - 并行度为上游的并行度
    6. 再添加一个 FlinkStreamPartitioner 对数据再进行一次 shuffle
      - 并行度为 sink.parallelism
      - 分区逻辑为
      a. 上游 HashBucketAssignerOperator 会给每条记录打上一个 编号(编号产生的逻辑暂时不看)
      b. 然后就是和 FIX 一样
       int startChannel = Math.abs(partition.hashCode()) % numChannels;
       // 这个 bucket 是上游 HashBucketAssignerOperator 打上的一个编号
       return (startChannel + bucket) % numChannels;
      
    7. 分区后添加数据写入 DynamicBucketRowWriteOperator 算子
      • 并行度为上游并行度

    GLOBAL_DYNAMIC

    1. Dynamic bucket 不适用于 log.system 场景校验会直接报错不支持, logSinkFunction 必须为 null
    2. 离线(批模式)compactSink 不能再 GLOBAL_DYNAMIC 下使用处于 TODO 转态(此条可以忽略)
    3. 添加一个 IndexBootstrapOperator 算子构建索引
      • 并行度为上游并行度
    4. 同上获取 dynamic-bucket.assigner-parallelism
    5. 同上获取 dynamic-bucket.initial-buckets
    6. 添加 FlinkStreamPartitioner 分区器对数据进行 shuffle
      • 并行度为 Max(assigner-parallelism,initial-buckets) 如果为空则是 sink.parallelism
      • 分区逻辑为 主键 函数和并行度取模 Math.abs(主键.hashCode() % numChannels)
    7. 添加 GlobalIndexAssignerOperator 算子
    - 并行度为上游并行度
    
    1. 再添加一个 FlinkStreamPartitioner 对数据再进行一次 shuffle 和 DYNAMIC 第 6 步一样的
    2. 分区后添加数据写入 DynamicBucketRowWriteOperator 算子 和 DYNAMIC 第 7步一样的

到此在不同 bucketMode 下对应的算子梳理完毕了, 在这些处理完之后还有一些通用的 doCommit 逻辑

  1. 判断 sink.savepoint.auto-tag 是否为 true 默认为 false , 参数表示是否自动创建 tag 如果开了则添加 AutoTagForSavepointCommitterOperator 算子 并且这个算子里面是包含 CommitterOperator 的
    • 算子并行度 为 1
  2. 如果没有开启则直接是 CommitterOperator 算子 并行度也是 1
  3. 最后添加一个 空的 sink 节点 DiscardingSink 并行度为 1

FINAL

  • 梳理了一个 DataStream 在 Paimon sink 时的整个 DataStream 转化拓扑。并且对其中的分区器进行了分析整理入下图


    Paimon stream sink 拓扑
  • 接下来看具体算子的逻辑
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 219,928评论 6 509
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 93,748评论 3 396
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 166,282评论 0 357
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 59,065评论 1 295
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 68,101评论 6 395
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,855评论 1 308
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,521评论 3 420
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 39,414评论 0 276
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,931评论 1 319
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 38,053评论 3 340
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 40,191评论 1 352
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,873评论 5 347
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,529评论 3 331
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 32,074评论 0 23
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 33,188评论 1 272
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 48,491评论 3 375
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 45,173评论 2 357

推荐阅读更多精彩内容