apache iceberg v1写记录

一、概述
基于Iceberg-Flink- V1相关api来完成的数据写入


写入过程

二、实现

关键源码---FlinkSink.chainIcebergOperators

private <T> DataStreamSink<T> chainIcebergOperators() {
  // 省略部分代码
  // 加载iceberg table
  if (table == null) {
    tableLoader.open();  // open TableLoader
    try (TableLoader loader = tableLoader) {
      this.table = loader.loadTable();
    } catch (IOException e) {
      throw new UncheckedIOException("Failed to load iceberg table from table loader: " + tableLoader, e);
    }
  }

  // 需将icebeg table schema转为Flink Row类型
  // Convert the requested flink table schema to flink row type.
  RowType flinkRowType = toFlinkRowType(table.schema(), tableSchema);

  // 根据指定partition field并且对应的分发模式是hash,通过keyBy(equality fields)来将输入进行分发
  // 若是分发模式是None不需要对输入流做任何处理直接返回
  // 若是分发模式是Range 当前版本还不支持
  // Distribute the records from input data stream based on the write.distribution-mode.
  DataStream<RowData> distributeStream = distributeDataStream(
      rowDataInput, table.properties(), table.spec(), table.schema(), flinkRowType);

  // 此处通过并行writer来完成datafile写入
  // 不过需要注意的写入是否开启upsert,一旦开启则不能对datafile进行overwrite只能使用append
  // Add parallel writers that append rows to files
  SingleOutputStreamOperator<WriteResult> writerStream = appendWriter(distributeStream, flinkRowType);

  // 在checkpoint完成时或输入流结束,则通过单并行度完成data file的提交
  // Add single-parallelism committer that commits files
  // after successful checkpoint or end of input
  SingleOutputStreamOperator<Void> committerStream = appendCommitter(writerStream);
  // 若是本次checkpoint成功,但commit文件失败,通过指定虚拟discard接收器在下次checkpoint成功在进行提交
  // Add dummy discard sink
  return appendDummySink(committerStream);
}

操作代码拆解

- distributeDataStream:输入流内容分组

private DataStream<RowData> distributeDataStream(DataStream<RowData> input,
                                                   Map<String, String> properties,
                                                   PartitionSpec partitionSpec,
                                                   Schema iSchema,
                                                   RowType flinkRowType) {
    // 首先获取指定的write.distribution-mode                                                   
    DistributionMode writeMode;
    if (distributionMode == null) { // 默认使用NONE
      // Fallback to use distribution mode parsed from table properties if don't specify in job level.
      String modeName = PropertyUtil.propertyAsString(properties,
          WRITE_DISTRIBUTION_MODE,
          WRITE_DISTRIBUTION_MODE_DEFAULT);

      writeMode = DistributionMode.fromName(modeName);
    } else {   // HASH和RANGE
      writeMode = distributionMode;
    }

    switch (writeMode) {
      case NONE:  // 不需要做任何处理 直接返回
        return input;

      case HASH:  // 目前只要时按照partition field来进行分组的
        if (partitionSpec.isUnpartitioned()) { // 未分区
          return input;
        } else { // 若是分区,则通过partition field 进行keyBy
          return input.keyBy(new PartitionKeySelector(partitionSpec, iSchema, flinkRowType));
        }

      case RANGE: // 目前不支持,直接返回
        LOG.warn("Fallback to use 'none' distribution mode, because {}={} is not supported in flink now",
            WRITE_DISTRIBUTION_MODE, DistributionMode.RANGE.modeName());
        return input;

      default:
        throw new RuntimeException("Unrecognized write.distribution-mode: " + writeMode);
    }
  }
}

- appendWriter: 并行写入datafile

private SingleOutputStreamOperator<WriteResult> appendWriter(DataStream<RowData> input, RowType flinkRowType) {
   // 1、获取指定EquailityFields 
  // Find out the equality field id list based on the user-provided equality field column names.
  List<Integer> equalityFieldIds = Lists.newArrayList();
  if (equalityFieldColumns != null && equalityFieldColumns.size() > 0) {
    for (String column : equalityFieldColumns) {
      org.apache.iceberg.types.Types.NestedField field = table.schema().findField(column);
      Preconditions.checkNotNull(field, "Missing required equality field column '%s' in table schema %s",
          column, table.schema());
      equalityFieldIds.add(field.fieldId());
    }
  }
  // 2、判断当前操作是否时upsert(可以通过job级别和table级别进行设置)
  // Fallback to use upsert mode parsed from table properties if don't specify in job level.
  boolean upsertMode = upsert || PropertyUtil.propertyAsBoolean(table.properties(),
      UPSERT_MODE_ENABLE, UPSERT_MODE_ENABLE_DEFAULT);

  // Validate the equality fields and partition fields if we enable the upsert mode.
  // 若是当前操作upsert 则通过需要进行EquailityFields检查(若是写入时分区表,则对应的partition field也需要存在其中),
  // 并且当前针对datafile操作是不能进行overwrite
  if (upsertMode) {
    // upsert操作下 不能进行overwrite  
    Preconditions.checkState(!overwrite,
        "OVERWRITE mode shouldn't be enable when configuring to use UPSERT data stream.");
    // upsert操作下 对应的EquailityFields是不允许为空     
    Preconditions.checkState(!equalityFieldIds.isEmpty(),
        "Equality field columns shouldn't be empty when configuring to use UPSERT data stream.");
    // 是否分区表    
    if (!table.spec().isUnpartitioned()) {
      for (PartitionField partitionField : table.spec().fields()) { // 分区表对应的分区字段必须存在EquailityFields集合中
        Preconditions.checkState(equalityFieldIds.contains(partitionField.sourceId()),
            "In UPSERT mode, partition field '%s' should be included in equality fields: '%s'",
            partitionField, equalityFieldColumns);
      }
    }
  }

  // 3、构建StreamWriter
  IcebergStreamWriter<RowData> streamWriter = createStreamWriter(table, flinkRowType, equalityFieldIds, upsertMode);
  // 3.1 将输入流写入到datafile是可以并发进行的
  int parallelism = writeParallelism == null ? input.getParallelism() : writeParallelism;
  SingleOutputStreamOperator<WriteResult> writerStream = input
      .transform(operatorName(ICEBERG_STREAM_WRITER_NAME), TypeInformation.of(WriteResult.class), streamWriter)
      .setParallelism(parallelism);
  if (uidPrefix != null) {
    writerStream = writerStream.uid(uidPrefix + "-writer");
  }
  return writerStream;
}

// 补充代码: 创建StreamWriter
static IcebergStreamWriter<RowData> createStreamWriter(Table table,
                                                       RowType flinkRowType,
                                                       List<Integer> equalityFieldIds,
                                                       boolean upsert) {
  // 1、获取指定的table 配置信息                                                         
  Preconditions.checkArgument(table != null, "Iceberg table should't be null");
  Map<String, String> props = table.properties();
  // 1.1 目标文件大小
  long targetFileSize = getTargetFileSizeBytes(props);
      // 1.2 file format(orc/parquet/avro)
  FileFormat fileFormat = getFileFormat(props);
  // 2、构建一个read-only且可序列化的表
  Table serializableTable = SerializableTable.copyOf(table);
  // 3、构建TaskWriter
  TaskWriterFactory<RowData> taskWriterFactory = new RowDataTaskWriterFactory(
      serializableTable, flinkRowType, targetFileSize,
      fileFormat, equalityFieldIds, upsert);
  // 4、创建StreamWriter
  return new IcebergStreamWriter<>(table.name(), taskWriterFactory);
}

- appendCommitter: 当checkpoint成功或输入流结束,通过文件提交器完成对应的文件commit

private SingleOutputStreamOperator<Void> appendCommitter(SingleOutputStreamOperator<WriteResult> writerStream) {
  // 1、构建FilesCommitter  
  IcebergFilesCommitter filesCommitter = new IcebergFilesCommitter(tableLoader, overwrite);
  // 2、单并行度文件提交器
  SingleOutputStreamOperator<Void> committerStream = writerStream
      .transform(operatorName(ICEBERG_FILES_COMMITTER_NAME), Types.VOID, filesCommitter)
      .setParallelism(1)
      .setMaxParallelism(1);
  if (uidPrefix != null) {
    committerStream = committerStream.uid(uidPrefix + "-committer");
  }
  return committerStream;
}

三、实例:基于HadoopCatalog进行iceberg数据写入

// 省略部分代码 
// flink datastream api并开启checkpoint
final Configuration config = new Configuration();
Map<String, String> properties =
        new ImmutableMap.Builder<String, String>()
          .put(CatalogProperties.WAREHOUSE_LOCATION, "file:///Users/XXX/tests/iceberg_namespace")
          .put(TableProperties.DEFAULT_FILE_FORMAT, "parquet")
          .build();

final Catalog catalog = CatalogUtil.loadCatalog("org.apache.iceberg.hadoop.HadoopCatalog", "hadoop", properties, config);
// final HadoopCatalog catalog = new HadoopCatalog();
// catalog.setConf(config);
// catalog.initialize("hadoop", properties);
//  schema
final Schema schema = new Schema(
        required(1, "data", Types.StringType.get()),
        required(2, "nested", Types.StructType.of(
                Types.NestedField.required(3, "f1", Types.StringType.get()),
                Types.NestedField.required(4, "f2", Types.StringType.get()),
                Types.NestedField.required(5, "f3", Types.LongType.get()))),
        required(6, "id", Types.LongType.get()));
// TableIdentifier
final TableIdentifier tableIdentifier = TableIdentifier.of("iceberg_db", "iceberg_table");
Table table = null;
// table
if (catalog.tableExists(tableIdentifier)) {
  table = catalog.loadTable(tableIdentifier);
} else {
  table = catalog.createTable(tableIdentifier, schema);
}
// TableLoader
final TableLoader tableLoader = TableLoader.fromCatalog(CatalogLoader.hadoop("hadoop", config, properties), tableIdentifier);
// 省略部分代码 datastream转为DataStream<RowData>
// FlinkSink
FlinkSink.forRowData(DataStream<RowData>)
        .table(table)
        .tableLoader(tableLoader)
        .writeParallelism(parallelism)
        .append();
// =========  省略部分代码  =========== // 
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 214,951评论 6 497
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,606评论 3 389
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 160,601评论 0 350
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,478评论 1 288
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,565评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,587评论 1 293
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,590评论 3 414
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,337评论 0 270
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,785评论 1 307
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,096评论 2 330
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,273评论 1 344
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,935评论 5 339
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,578评论 3 322
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,199评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,440评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,163评论 2 366
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,133评论 2 352