paimon sink 源码 之 paimon table 创建

在学习 paimon sink 的过程中本来只想快速梳理下 paimon 的 sink 时对 DataStream 操作的拓扑, 但是过程中发现 paimon 会有很多概念,并且这些概念都做了很好的抽象,一口吃不了大胖子,慢慢的边啃边理解吧。这篇记录在学习过程中的 paimon table,包含 paimon table 的创建和 table 本身。

paimon table 创建

在 sql 中 table 的创建少不了 catalog, catalog 的创建又离不开 flink 的 CatalogFactory。

Paimon 对于 flink CatalogFactory 的实现

org.apache.flink.table.factories.CatalogFactory

Paimon 对于 flink CatalogFactory 实现类的不同点

  1. IDENTIFIER 不同
  • org.apache.paimon.flink.FlinkGenericCatalogFactory:IDENTIFIER = "paimon"
    CREATE CATALOG catalogName WITH ( 'type'='paimon', ....)
    
  • org.apache.paimon.flink.FlinkCatalogFactory: IDENTIFIER = "paimon-generic";
    CREATE CATALOG catalogName WITH ( 'type'='paimon-generic', ....)
    
  1. 核心方法 CatalogFactory#createCatalog 的实现不一样
  • FlinkGenericCatalogFactory#createCatalog
    1. 创建 flink hive connector 的 HiveCatalogFactory
    2. 然后创建 flink hive connector 的 HiveCatalog
    3. new paimon 的 FlinkCatalog
    4. 返回 new FlinkGenericCatalog (paimonCatalog, hiveCatalog)
  • FlinkCatalogFactory#createCatalog 直接 new paimon 的 FlinkCatalog
  • 所以 FlinkGenericCatalogFactory 创建出来的是 FlinkGenericCatalog
    FlinkCatalogFactory 创建出来的是 FlinkCatalog
    FlinkGenericCatalog 和 FlinkCatalog 都实现了 flink Catalog, 那么他们的区别是什么呢?

Paimon 对于 Flink Catalog 的实现

org.apache.flink.table.catalog.Catalog

从刚刚 createCatalog 方法中可以看到他们区别是 FlinkGenericCatalog 不仅仅含有 paimon 的 FlinkCatalog 还包含 flink hive connector 的 HiveCatalog,从 FlinkGenericCatalog 的实现来看,很多操作都会同时操作两个 catalog, 其中 HiveCatalog 是对 hive HMS 进行请求操作,FlinkCatalog 是对 paimon 进行操作,方法例举如下。

public CatalogBaseTable getTable(ObjectPath tablePath) {
     try {
         return paimon.getTable(tablePath); //  paimon 的 FlinkCatalog
     } catch (TableNotExistException e) {
         return flink.getTable(tablePath);  // hive connector 的 HiveCatalog
     }
}

public void createTable(ObjectPath tablePath, CatalogBaseTable table, boolean ignoreIfExists) {
     String connector = table.getOptions().get(CONNECTOR.key());
     if (FlinkCatalogFactory.IDENTIFIER.equals(connector)) { //是 paimon 表
         paimon.createTable(tablePath, table, ignoreIfExists); // 就用 paimon 的 FlinkCatalog 进行操作
     } else {
         flink.createTable(tablePath, table, ignoreIfExists); // 否则就用  hive connector 的 HiveCatalog
     }
}

public List<String> listTables(String databaseName) {
     // flink list tables contains all paimon tables // 都包含?那他是怎么把 paimon 表同步到 HMS 的
     return flink.listTables(databaseName); // 为什么这里只用  hive connector 的 HiveCatalog 就行?
}

所以看起来 FlinkGenericCatalog 有如下特点

  1. 是可以自动识别是否为 paimon 表,优先用 paimon catalog 去尝试。
  2. 这个 catalog 可以兼容普通的 hive 表和 paimon 表

对于 listTables 等一些操作为什么只要用 HiveCatalog 就行?
既然都包含,那么 paimon 的 FlinkCatalog 是怎么把表同步到 HMS 的?

Paimon 的 FlinkCatalog

上面得知 FlinkCatalog 是 flink Catalog 的一个实现

FlinkCatalog extends org.apache.flink.table.catalog.AbstractCatalog {
  private final org.apache.paimon.catalog.Catalog catalog;
  public List<String> listTables(String databaseName) {
      return catalog.listTables(databaseName);
    } 
 ... ...
}
  • 从上得知 FlinkCatalog 其实只是一个包装 真正进行 Catalog 操作的还是 org.apache.paimon.catalog.Catalog 需要注意它的包名不是 flink 的 org.apache.flink.table.catalog.Catalog
    文中我有意的补充包路径或者强调是 paimon 的什么什么或者是 flink 的什么什么就是想要做区分,在 Paimon 中好多类名看起来很容易误以为是 flink 的类,其实不是。。。。
  • 所以要看 Paimon 的 org.apache.paimon.flink.FlinkCatalog 还得看 paimon 的 org.apache.paimon.catalog.Catalog

Paimon 的 Catalog

org.apache.paimon.catalog.Catalog

Paimon 的 Catalog 是如何创建的

  • 直接说结论吧还是通过 flink SPI 机制根据配置、IDENTIFIER 和 org.apache.paimon.catalog.CatalogFactory 实现类去先找到 factory 然后在 factory 进行创建 paimon org.apache.paimon.catalog.Catalog 的具体实现
  • 看一眼 org.apache.paimon.catalog.CatalogFactory 类图


    org.apache.paimon.catalog.CatalogFactory
  • Paimon Catalog 创建示例
    // 创建 org.apache.paimon.catalog.FileSystemCatalog
    CREATE CATALOG my_catalog WITH (
        'type' = 'paimon',
        'warehouse' = 'hdfs:///path/to/warehouse'
    );
    
    // 创建 org.apache.paimon.hive.HiveCatalog
    CREATE CATALOG my_hive WITH (
    'type' = 'paimon',
    'metastore' = 'hive',
    -- 'uri' = 'thrift://<hive-metastore-host-name>:<port>', default use 'hive.metastore.uris' in HiveConf
    -- 'hive-conf-dir' = '...', this is recommended in the kerberos environment
    -- 'hadoop-conf-dir' = '...', this is recommended in the kerberos environment
    -- 'warehouse' = 'hdfs:///path/to/warehouse', default use 'hive.metastore.warehouse.dir' in HiveConf
    );
    
    // 创建 org.apache.paimon.jdbc.JdbcCatalog
    CREATE CATALOG my_jdbc WITH (
    'type' = 'paimon',
    'metastore' = 'jdbc',
    'uri' = 'jdbc:mysql://<host>:<port>/<databaseName>',
    'jdbc.user' = '...', 
    'jdbc.password' = '...', 
    'catalog-key'='jdbc',
    'warehouse' = 'hdfs:///path/to/warehouse'
    );
    
  • 可能大部分场景用的是 org.apache.paimon.hive.HiveCatalog 所以来看看这个的实现,刚好也可以解答 paimon 的 FlinkGenericCatalog 是怎么把表同步到 HMS 的,在 创建 FlinkGenericCatalog 时会创建 Paimon 的 org.apache.paimon.catalog.Catalog 会注入核心参数 'metastore'='hive', 使得 FlinkGenericCatalog 中的 Paimon Catalog 最终是 org.apache.paimon.hive.HiveCatalog
    options.set(CatalogOptions.METASTORE, "hive");
    org.apache.paimon.catalog.CatalogFactory.createCatalog(
                                CatalogContext.create(options, new FlinkFileIOLoader()), cl),
    

Paimon 的 org.apache.paimon.hive.HiveCatalog

  • HiveCatalog 维护了一个 hive client 会对表的变更进行同步
HiveCatalog extends AbstractCatalog implements Catalog{
      private final IMetaStoreClient client;  // hive client
      protected void alterTableImpl(Identifier identifier, List<SchemaChange> changes) {
        final SchemaManager schemaManager = schemaManager(identifier);
        // first commit changes to underlying files
        TableSchema schema = schemaManager.commitChanges(changes);
        try {
            // sync to hive hms 表变更同步到 hive
            Table table = client.getTable(identifier.getDatabaseName(), identifier.getObjectName());
            updateHmsTablePars(table, schema);
            updateHmsTable(table, identifier, schema);
            client.alter_table(
                    identifier.getDatabaseName(), identifier.getObjectName(), table, true);
        } catch (Exception te) {
            schemaManager.deleteSchema(schema.id());
            throw new RuntimeException(te);
        }
    }
}

到这里就说明了 paimon table 的创建前揍, 从 Flink 的 CatalogFactory 到 Flink 的 Catalog, 再到 Paimon 基于 Flink 的 Catalog, Paimon 基于 Flink 的 Catalog 是一个壳子实际是用的 Paimon 的 Catalog, Paimon 的 Catalog 又是通过 Paimon 的 CatalogFactory 创建而来。接下来看看主角 Paimon 的 Catalog 创建的 Paimon Table, Table 的创建是 org.apache.paimon.catalog.AbstractCatalog 实现的

org.apache.paimon.catalog.AbstractCatalog implements Catalog  {
  public Table getTable(Identifier identifier) throws TableNotExistException {
        if (isSystemDatabase(identifier.getDatabaseName())) { // 先忽略
        } else if (isSpecifiedSystemTable(identifier)) { //先忽略
        } else {
            return getDataTable(identifier);
        }
    }
}

private FileStoreTable getDataTable(Identifier identifier) throws TableNotExistException {
        TableSchema tableSchema = getDataTableSchema(identifier);
        return FileStoreTableFactory.create(
                fileIO,
                getDataTableLocation(identifier),
                tableSchema,
                new CatalogEnvironment(
                        Lock.factory(
                                lockFactory().orElse(null), lockContext().orElse(null), identifier),
                        metastoreClientFactory(identifier).orElse(null),
                        lineageMetaFactory));
    }

然后在 FileStoreTableFactory.create 方法中根据是否有主键会创建 AppendOnlyFileStoreTable 或者 PrimaryKeyFileStoreTable

    public static FileStoreTable create(
         FileIO fileIO, // 这又是个啥玩意?
         Path tablePath,
         TableSchema tableSchema,
         Options dynamicOptions,
         CatalogEnvironment catalogEnvironment) {
     FileStoreTable table =
             tableSchema.primaryKeys().isEmpty()
                     ? new AppendOnlyFileStoreTable(
                             fileIO, tablePath, tableSchema, catalogEnvironment)
                     : new PrimaryKeyFileStoreTable(
                             fileIO, tablePath, tableSchema, catalogEnvironment);
     return table.copy(dynamicOptions.toMap());
 }

到这里 Paimon Table 就已经创建完成了,Table 提供了表的读取写入和表操作的一些抽象,涉及面较多
简单看看 Table 和写入相关的一些 方法 混个眼熟,后面了解更多再补充

Paimon Table 之 FileStoreTable

org.apache.paimon.table.Table
  • 看到这个类图一开始是崩溃了没想到一个 Table 有这么多花样
  • 庆幸的是在 paimon sink 中只需要关注 FileStoreTable 的3个实现类 AbstractFileStoreTable、 AppendOnlyFileStoreTable 和 PrimaryKeyFileStoreTable, 其中 AbstractFileStoreTable 是 其他两个的父类


    org.apache.paimon.table.FileStoreTable
abstract class  AbstractFileStoreTable AbstractFileStoreTable implements FileStoreTable {
    protected final FileIO fileIO;
    @Override 
    public BucketMode bucketMode() { // 分桶模式很重要
        return store().bucketMode(); // store() 方法在子类实现
    }
  ... ...
}
  • 在创建 AbstractFileStoreTable 时需要传入一个 FileIO FileIO 是个啥先混个眼熟,看类图是和 数据存储层交互的一个接口对不同的存储有不同实现


    org.apache.paimon.fs.FileIO
  • store() 的实现

    • AppendOnlyFileStoreTable FileStore 为 AppendOnlyFileStore<InternalRow>
    • PrimaryKeyFileStoreTable FileStore 为 KeyValueFileStore<KeyValue>
    • FileStore 是数据读写的接口


      org.apache.paimon.FileStore
  • bucketMode 的实现

    • BucketMode 来自 FileStoreTable 的 FileStore,所以 PrimaryKeyFileStoreTable 的 BucketMode 是在 KeyValueFileStore 中定义的逻辑为:
      1. 先计算 crossPartitionUpdate
           public boolean crossPartitionUpdate() {
             if (primaryKeys.isEmpty() || partitionKeys.isEmpty()) {
                return false; //如果 primaryKeys 为空 或者  partitionKeys 返回  false
              }
             //如果 primaryKeys 包含所有的 partitionKeys 返回 false ; 如果 主键是 ABC, 分区字段是 A 则不支持分区变更,
             // 这个很好理解,因为如果分区变更了 那说明主键都变了变成新的记录了
             // 那如果主键是 ABC 分区字段是 AD 呢?A 变了如何支持分区变更??
             return !primaryKeys.containsAll(partitionKeys);
          }
        
        • crossPartitionUpdate = true 的场景就是要有主键要有分区并且分区键不全是主键
        • 如果主键是 ABC 分区字段是 AD 呢?A 变了如何支持分区变更??
      2. 如果 bucket =-1 默认为 -1 则看 crossPartitionUpdate 如果可以 crossPartitionUpdate 则为 GLOBAL_DYNAMIC 否则为 DYNAMIC
        如果 bucket !=-1 则为 FIXED 并且此时 crossPartitionUpdate 一点要为 false 会校验。 这意味着劲量不要去设置 bucket 数目?? 因为设置了 如果 crossPartitionUpdate 为 true 就会报错,为啥要这样设计??
           public BucketMode bucketMode() {
                if (options.bucket() == -1) { // 默认为 -1
                  return crossPartitionUpdate ? BucketMode.GLOBAL_DYNAMIC : BucketMode.DYNAMIC;
               } else {
                  checkArgument(!crossPartitionUpdate);
                 return BucketMode.FIXED;
             }
        }
        
  • 对于 AppendOnlyFileStoreTable 的 BucketMode 是在 AppendOnlyFileStore 中定义的逻辑为:options.bucket() == -1 ? BucketMode.UNAWARE : BucketMode.FIXED

  • 所以 AppendOnlyFileStoreTable 的 BucketMode 为 FIXED 或者 UNAWARE

  • 所以 PrimaryKeyFileStoreTable 的 BucketMode 为 DYNAMIC 或者 GLOBAL_DYNAMIC 或者 FIX

BucketMode 翻译自官网

  • Bucket 是读写的最小存储单元,每个Bucket目录中包含一棵LSM树

Fixed Bucket 指的是 BucketMode.FIXED

  • 配置一个大于0的bucket,采用Fixed Bucket模式,根据Math.abs(key_hashcode % numBuckets)记录计算bucket。重新缩放存储桶只能通过离线流程完成,请参阅重新缩放存储桶。桶数过多会导致小文件过多,桶数过少会导致写入性能较差
  • 根据 bucket 键将数据分发到相应的 Bucket 中(默认为主键),对于带有分桶键的查询可以进行桶的 data skipping

Dynamic Bucket 有两种

  • 配置'bucket' = '-1'。以前写入过的 key 会落入旧的 bucket,新的 key 会落入新的 bucket,bucket 和 key 的分布取决于数据到达的顺序。 Paimon 维护一个索引来确定哪个键对应哪个桶。
  • 分桶键的查询不支持桶的 data skipping
  • Paimon会自动扩大桶的数量。
    • Option1: 'dynamic-bucket.target-row-num':控制一个桶的目标行数。
    • Option2: 'dynamic-bucket.initial-buckets': 控制初始化bucket的数量。

动态Bucket仅支持单个写入作业。请不要启动多个作业来写入同一分区(这可能会导致重复数据)。即使您启用'write-only'并启动专用的压缩作业,它也不会起作用。
Dynamic bucket mode can not work with log system

Normal Dynamic Bucket Mode 指的是 BucketMode.DYNAMIC

  • 当您的更新不跨分区(没有分区,或者主键包含所有分区字段)时,BucketMode.DYNAMIC 使用 HASH 索引来维护从键到桶的映射,它比固定桶模式需要更多的内存。
    性能:
  1. 一般来说,没有性能损失,但会有一些额外的内存消耗,一个分区中的1 亿个 条目多占用1 GB内存,不再活动的分区不占用内存。
  2. 对于更新率较低的表,建议使用此模式,以显着提高性能。

Normal Dynamic Bucket Mode支持sort-compact以加快查询速度。请参阅紧凑排序

Cross Partitions Upsert Dynamic Bucket Mode 指的是 BucketMode.GLOBAL_DYNAMIC

  • 当需要跨分区upsert(主键不包含所有分区字段)时,Dynamic Bucket 模式直接维护键到分区和桶的映射,使用本地磁盘,并在启动流写作业时通过读取表中所有现有键来初始化索引。不同的合并引擎有不同的行为:
    • Deduplicate:删除旧分区中的数据,并将新数据插入到新分区中。保证数据的唯一性
    • PartialUpdate & Aggregation:将新数据插入旧分区。
    • FirstRow:如果有旧值,则忽略新数据。
      性能:
    1. 对于数据量较大的表,性能会有明显的损失。而且,初始化需要很长时间。
    2. 如果你的upsert不依赖太旧的数据,可以考虑配置索引TTL来减少索引和初始化时间:
      2.1 'cross-partition-upsert.index-ttl':rocksdb索引和初始化中的TTL,这样可以避免维护太多索引而导致性能越来越差。但请注意,这也可能会导致数据重复。

FINAL

  • 讲述了 Paimon Table 的创建过程。


    Paimon Table 的创建
  • Paimon Table 功能待补充。
  • 介绍了 BucketMode。 这个很重要不同的数据场景选择合适的 mode, 不同的 mode 数据入 paimon 的 拓扑也不一样
  • FileStore 功能待补充。
  • Paimon sink 拓扑 是在 DynamicTableSink 中定义的
  • 下篇会介绍 Paimon 的 DynamicTableSink
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 204,445评论 6 478
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,889评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 151,047评论 0 337
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,760评论 1 276
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,745评论 5 367
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,638评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,011评论 3 398
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,669评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,923评论 1 299
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,655评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,740评论 1 330
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,406评论 4 320
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,995评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,961评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,197评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 45,023评论 2 350
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,483评论 2 342

推荐阅读更多精彩内容