一、概述
基于 Iceberg-Flink V1 相关 API(FlinkSink)完成数据写入。
二、实现
关键源码---FlinkSink.chainIcebergOperators
/**
 * Assembles the sink pipeline (excerpt; part of the original method is omitted):
 * load the table, distribute the input, add the parallel writers, add the
 * single-parallelism committer, and finally append a dummy sink.
 */
private <T> DataStreamSink<T> chainIcebergOperators() {
// (part of the code omitted)
// Load the Iceberg table through the TableLoader when no table was supplied.
if (table == null) {
tableLoader.open(); // open TableLoader
// try-with-resources closes the loader after the table has been loaded.
try (TableLoader loader = tableLoader) {
this.table = loader.loadTable();
} catch (IOException e) {
throw new UncheckedIOException("Failed to load iceberg table from table loader: " + tableLoader, e);
}
}
// Convert the requested flink table schema to flink row type
// (derived from the Iceberg table schema together with the requested tableSchema).
RowType flinkRowType = toFlinkRowType(table.schema(), tableSchema);
// Distribute the records from input data stream based on the write.distribution-mode:
//   - hash on a partitioned table: keyBy on the partition key;
//   - none: the input stream is returned untouched;
//   - range: not supported in this version, handled like none.
DataStream<RowData> distributeStream = distributeDataStream(
rowDataInput, table.properties(), table.spec(), table.schema(), flinkRowType);
// Add parallel writers that append rows to files.
// Note: when upsert is enabled the writer rejects overwrite mode (see appendWriter).
SingleOutputStreamOperator<WriteResult> writerStream = appendWriter(distributeStream, flinkRowType);
// Add single-parallelism committer that commits files
// after successful checkpoint or end of input
SingleOutputStreamOperator<Void> committerStream = appendCommitter(writerStream);
// Add dummy discard sink
// NOTE(review): the original note claimed this sink lets a failed commit be retried after the
// next successful checkpoint; appendDummySink is not shown in this excerpt, so that is unconfirmed.
// Presumably it only terminates the stream with a discarding sink; verify against the source.
return appendDummySink(committerStream);
}
操作代码拆解
- distributeDataStream:输入流内容分组
private DataStream<RowData> distributeDataStream(DataStream<RowData> input,
Map<String, String> properties,
PartitionSpec partitionSpec,
Schema iSchema,
RowType flinkRowType) {
// 首先获取指定的write.distribution-mode
DistributionMode writeMode;
if (distributionMode == null) { // 默认使用NONE
// Fallback to use distribution mode parsed from table properties if don't specify in job level.
String modeName = PropertyUtil.propertyAsString(properties,
WRITE_DISTRIBUTION_MODE,
WRITE_DISTRIBUTION_MODE_DEFAULT);
writeMode = DistributionMode.fromName(modeName);
} else { // HASH和RANGE
writeMode = distributionMode;
}
switch (writeMode) {
case NONE: // 不需要做任何处理 直接返回
return input;
case HASH: // 目前只要时按照partition field来进行分组的
if (partitionSpec.isUnpartitioned()) { // 未分区
return input;
} else { // 若是分区,则通过partition field 进行keyBy
return input.keyBy(new PartitionKeySelector(partitionSpec, iSchema, flinkRowType));
}
case RANGE: // 目前不支持,直接返回
LOG.warn("Fallback to use 'none' distribution mode, because {}={} is not supported in flink now",
WRITE_DISTRIBUTION_MODE, DistributionMode.RANGE.modeName());
return input;
default:
throw new RuntimeException("Unrecognized write.distribution-mode: " + writeMode);
}
}
}
- appendWriter: 并行写入datafile
/**
 * Adds the parallel writer operator that turns incoming rows into {@code WriteResult}s.
 *
 * <p>Steps: resolve the equality field ids from the configured column names, decide whether
 * upsert mode is on (job level first, then table property), validate the upsert preconditions,
 * then attach an {@code IcebergStreamWriter} with the configured (or inherited) parallelism.
 *
 * @param input        the (possibly keyed) stream of rows to write
 * @param flinkRowType Flink row type of the records
 * @return the writer operator's output stream
 */
private SingleOutputStreamOperator<WriteResult> appendWriter(DataStream<RowData> input, RowType flinkRowType) {
  // Step 1: find out the equality field id list based on the user-provided equality field column names.
  List<Integer> equalityIds = Lists.newArrayList();
  boolean hasEqualityColumns = equalityFieldColumns != null && !equalityFieldColumns.isEmpty();
  if (hasEqualityColumns) {
    for (String columnName : equalityFieldColumns) {
      org.apache.iceberg.types.Types.NestedField matched = table.schema().findField(columnName);
      Preconditions.checkNotNull(matched, "Missing required equality field column '%s' in table schema %s",
          columnName, table.schema());
      equalityIds.add(matched.fieldId());
    }
  }

  // Step 2: upsert can be switched on at job level; otherwise fall back to the table property.
  boolean upsertEnabled = upsert || PropertyUtil.propertyAsBoolean(table.properties(),
      UPSERT_MODE_ENABLE, UPSERT_MODE_ENABLE_DEFAULT);

  // Validate the equality fields and partition fields if we enable the upsert mode.
  if (upsertEnabled) {
    // Upsert and overwrite are mutually exclusive.
    Preconditions.checkState(!overwrite,
        "OVERWRITE mode shouldn't be enable when configuring to use UPSERT data stream.");
    // Upsert needs at least one equality field.
    Preconditions.checkState(!equalityIds.isEmpty(),
        "Equality field columns shouldn't be empty when configuring to use UPSERT data stream.");

    boolean partitioned = !table.spec().isUnpartitioned();
    if (partitioned) {
      // Every partition source field must also be an equality field.
      for (PartitionField partitionField : table.spec().fields()) {
        Preconditions.checkState(equalityIds.contains(partitionField.sourceId()),
            "In UPSERT mode, partition field '%s' should be included in equality fields: '%s'",
            partitionField, equalityFieldColumns);
      }
    }
  }

  // Step 3: build the stream writer operator.
  IcebergStreamWriter<RowData> writerOperator = createStreamWriter(table, flinkRowType, equalityIds, upsertEnabled);

  // Writers run in parallel: use the explicit write parallelism, else inherit the input's.
  int writerParallelism;
  if (writeParallelism != null) {
    writerParallelism = writeParallelism;
  } else {
    writerParallelism = input.getParallelism();
  }

  SingleOutputStreamOperator<WriteResult> written = input
      .transform(operatorName(ICEBERG_STREAM_WRITER_NAME), TypeInformation.of(WriteResult.class), writerOperator)
      .setParallelism(writerParallelism);

  // Assign an operator uid only when a uid prefix was configured.
  return uidPrefix == null ? written : written.uid(uidPrefix + "-writer");
}
// 补充代码: 创建StreamWriter
/**
 * Builds the {@code IcebergStreamWriter} used by the writer operator.
 *
 * @param table            the target table; must not be null
 * @param flinkRowType     Flink row type of the records
 * @param equalityFieldIds ids of the equality fields
 * @param upsert           whether upsert mode is enabled
 * @return a stream writer named after the table and backed by a {@code RowDataTaskWriterFactory}
 */
static IcebergStreamWriter<RowData> createStreamWriter(Table table,
                                                       RowType flinkRowType,
                                                       List<Integer> equalityFieldIds,
                                                       boolean upsert) {
  Preconditions.checkArgument(table != null, "Iceberg table should't be null");

  // Read the writer settings from the table properties: target file size and file format.
  Map<String, String> tableProps = table.properties();
  long targetBytes = getTargetFileSizeBytes(tableProps);
  FileFormat dataFormat = getFileFormat(tableProps);

  // The task writer factory receives a serializable copy of the table.
  Table tableCopy = SerializableTable.copyOf(table);
  TaskWriterFactory<RowData> writerFactory = new RowDataTaskWriterFactory(
      tableCopy, flinkRowType, targetBytes,
      dataFormat, equalityFieldIds, upsert);

  return new IcebergStreamWriter<>(table.name(), writerFactory);
}
- appendCommitter: 当checkpoint成功或输入流结束,通过文件提交器完成对应的文件commit
/**
 * Adds the files committer operator behind the writers.
 *
 * <p>The committer is pinned to parallelism 1 (and max parallelism 1).
 *
 * @param writerStream output of the parallel writers
 * @return the committer operator's output stream
 */
private SingleOutputStreamOperator<Void> appendCommitter(SingleOutputStreamOperator<WriteResult> writerStream) {
  IcebergFilesCommitter committerOperator = new IcebergFilesCommitter(tableLoader, overwrite);
  String committerName = operatorName(ICEBERG_FILES_COMMITTER_NAME);

  // Single-parallelism committer.
  SingleOutputStreamOperator<Void> committed = writerStream
      .transform(committerName, Types.VOID, committerOperator)
      .setParallelism(1)
      .setMaxParallelism(1);

  // Assign an operator uid only when a uid prefix was configured.
  return uidPrefix == null ? committed : committed.uid(uidPrefix + "-committer");
}
三、实例:基于HadoopCatalog进行iceberg数据写入
// Example: writing to an Iceberg table through a HadoopCatalog (excerpt).
// (part of the code omitted)
// (omitted: set up the Flink DataStream environment and enable checkpointing;
//  the committer commits files after a successful checkpoint or at end of input)
final Configuration config = new Configuration();
// Catalog properties: warehouse location plus the default file format.
// NOTE(review): TableProperties.DEFAULT_FILE_FORMAT is a table-property key placed in the
// catalog property map; confirm it actually takes effect when passed this way.
Map<String, String> properties =
new ImmutableMap.Builder<String, String>()
.put(CatalogProperties.WAREHOUSE_LOCATION, "file:///Users/XXX/tests/iceberg_namespace")
.put(TableProperties.DEFAULT_FILE_FORMAT, "parquet")
.build();
// Load the catalog by implementation class name.
final Catalog catalog = CatalogUtil.loadCatalog("org.apache.iceberg.hadoop.HadoopCatalog", "hadoop", properties, config);
// Alternative: construct and initialize the HadoopCatalog directly.
// final HadoopCatalog catalog = new HadoopCatalog();
// catalog.setConf(config);
// catalog.initialize("hadoop", properties);
// Table schema: a string column, a nested struct (f1, f2, f3) and a long id.
final Schema schema = new Schema(
required(1, "data", Types.StringType.get()),
required(2, "nested", Types.StructType.of(
Types.NestedField.required(3, "f1", Types.StringType.get()),
Types.NestedField.required(4, "f2", Types.StringType.get()),
Types.NestedField.required(5, "f3", Types.LongType.get()))),
required(6, "id", Types.LongType.get()));
// TableIdentifier
final TableIdentifier tableIdentifier = TableIdentifier.of("iceberg_db", "iceberg_table");
Table table = null;
// Load the table when it already exists, otherwise create it with the schema above.
if (catalog.tableExists(tableIdentifier)) {
table = catalog.loadTable(tableIdentifier);
} else {
table = catalog.createTable(tableIdentifier, schema);
}
// TableLoader built from the same catalog settings and table identifier.
final TableLoader tableLoader = TableLoader.fromCatalog(CatalogLoader.hadoop("hadoop", config, properties), tableIdentifier);
// (part of the code omitted: convert the source stream into a DataStream<RowData>)
// FlinkSink
// NOTE(review): the argument below is a placeholder type, not an expression;
// pass the actual DataStream<RowData> variable here.
FlinkSink.forRowData(DataStream<RowData>)
.table(table)
.tableLoader(tableLoader)
.writeParallelism(parallelism)
.append();
// ========= (part of the code omitted) =========== //