Flink源码分析系列文档目录
请点击:Flink 源码分析系列文档目录
前言
Flink SQL可以将多种数据源或数据落地端映射为table,使用起来非常方便。本篇以Flink自带的datagen类型表数据源和print类型表落地端为例,为大家分析TableSource
和TableSink
的代码实现逻辑。
TableSource
DynamicTableSourceFactory
Flink使用SPI机制加载Factory
(DynamicTableSourceFactory
和DynamicTableSinkFactory
同属Factory
)。在flink-table-api-java-bridge
项目的resources/META-INF/services
目录我们可以找到org.apache.flink.table.factories.Factory
文件,内容为:
org.apache.flink.table.factories.DataGenTableSourceFactory
org.apache.flink.table.factories.BlackHoleTableSinkFactory
org.apache.flink.table.factories.PrintTableSinkFactory
Flink启动的时候会根据这个文件,去加载这3个实现类。可以看出我们接下来要分析的datagen和print就在其中。
接下来我们看下TableSource
需要实现的接口DynamicTableSourceFactory
,它有几个重要的方法:
指定TableSource
配置属性的方法:
- requiredOptions方法:返回table必须的属性配置
- optionalOptions方法:返回table可选的属性配置
factoryIdentifier方法:指定该tableSource的类型是什么
只看方法名称大家可能没体会到它们的作用。这里结合create table SQL语句来说明。
CREATE TABLE datagen (
...
) WITH (
'connector' = 'datagen',
-- optional options --
'rows-per-second'='5',
...
)
可以看出我们使用SQL创建表,需要指定连接器connector,connector是和我们自定义的TableSource
或TableSink
的factoryIdentifier
方法的返回值关联的。
下面的'rows-per-second'='5'
为table的一个option(选项或属性),用于为TableSource
或TableSink
传递参数。其中必须指定的参数通过requiredOptions
方法指定,可选的参数通过optionalOptions
方法指定。
table的option为ConfigOption<T>
类型,使用如下方式构造(使用专门的builder):
public static final ConfigOption<Long> NUMBER_OF_ROWS =
key("number-of-rows")
.longType()
.defaultValue(UNLIMITED_ROWS)
.withDescription("Total number of rows to emit. By default, the source is unbounded.");
最后还有一个createDynamicTableSource
方法,负责返回创建的自定义TableSource
。
接下来我们以DataGenTableSourceFactory
为例,分析DynamicTableSourceFactory
的使用。
官网创建datagen类型table的方式如下:
CREATE TABLE datagen (
f_sequence INT,
f_random INT,
f_random_str STRING,
ts AS localtimestamp,
WATERMARK FOR ts AS ts
) WITH (
'connector' = 'datagen',
-- optional options --
'rows-per-second'='5',
'fields.f_sequence.kind'='sequence',
'fields.f_sequence.start'='1',
'fields.f_sequence.end'='1000',
'fields.f_random.min'='1',
'fields.f_random.max'='1000',
'fields.f_random_str.length'='10'
)
连接器参数如下表:
参数 | 是否必选 | 默认值 | 数据类型 | 描述 |
---|---|---|---|---|
connector | 必须 | (none) | String | 指定要使用的连接器,这里是 'datagen'。 |
rows-per-second | 可选 | 10000 | Long | 每秒生成的行数,用以控制数据发出速率。 |
fields.#.kind | 可选 | random | String | 指定 '#' 字段的生成器。可以是 'sequence' 或 'random'。 |
fields.#.min | 可选 | (Minimum value of type) | (Type of field) | 随机生成器的最小值,适用于数字类型。 |
fields.#.max | 可选 | (Maximum value of type) | (Type of field) | 随机生成器的最大值,适用于数字类型。 |
fields.#.length | 可选 | 100 | Integer | 随机生成器生成字符的长度,适用于 char、varchar、string。 |
fields.#.start | 可选 | (none) | (Type of field) | 序列生成器的起始值。 |
fields.#.end | 可选 | (none) | (Type of field) | 序列生成器的结束值。 |
具体使用方式参见Flink官网:https://ci.apache.org/projects/flink/flink-docs-release-1.13/zh/docs/connectors/table/datagen/
接下来是代码分析过程。DataGenTableSourceFactory
源代码如下:
// 定义connector类型为datagen
public static final String IDENTIFIER = "datagen";
public static final Long ROWS_PER_SECOND_DEFAULT_VALUE = 10000L;
// 定义属性,含义为每秒生成多少行数据
public static final ConfigOption<Long> ROWS_PER_SECOND =
key("rows-per-second")
.longType()
.defaultValue(ROWS_PER_SECOND_DEFAULT_VALUE)
.withDescription("Rows per second to control the emit rate.");
// 总共生成多少行数据,默认为无限生成
public static final ConfigOption<Long> NUMBER_OF_ROWS =
key("number-of-rows")
.longType()
.noDefaultValue()
.withDescription(
"Total number of rows to emit. By default, the source is unbounded.");
public static final String FIELDS = "fields";
public static final String KIND = "kind";
public static final String START = "start";
public static final String END = "end";
public static final String MIN = "min";
public static final String MAX = "max";
public static final String LENGTH = "length";
public static final String SEQUENCE = "sequence";
public static final String RANDOM = "random";
@Override
public String factoryIdentifier() {
// 返回TableSource对应的connector类型
return IDENTIFIER;
}
@Override
public Set<ConfigOption<?>> requiredOptions() {
// 必选属性,datagen没有必选属性
return new HashSet<>();
}
@Override
public Set<ConfigOption<?>> optionalOptions() {
// 可选属性
Set<ConfigOption<?>> options = new HashSet<>();
options.add(ROWS_PER_SECOND);
options.add(NUMBER_OF_ROWS);
return options;
}
@Override
public DynamicTableSource createDynamicTableSource(Context context) {
// 创建一个空白的Configuration
Configuration options = new Configuration();
// 获取table的所有配置选项填入Configuration
context.getCatalogTable().getOptions().forEach(options::setString);
// 获取表的物理字段,计算字段和元数据字段会被过滤掉
TableSchema schema =
TableSchemaUtils.getPhysicalSchema(context.getCatalogTable().getSchema());
// 创建DataGenerator数组,每一字段对应一个DataGenerator
DataGenerator<?>[] fieldGenerators = new DataGenerator[schema.getFieldCount()];
// 所有的可选选项
Set<ConfigOption<?>> optionalOptions = new HashSet<>();
// 遍历所有字段
for (int i = 0; i < fieldGenerators.length; i++) {
// 获取字段名称
String name = schema.getFieldNames()[i];
// 获取字段的数据类型
DataType type = schema.getFieldDataTypes()[i];
// 创建kind配置选项,默认值RANDOM
ConfigOption<String> kind =
key(FIELDS + "." + name + "." + KIND).stringType().defaultValue(RANDOM);
// 创建各个字段对应的生成器,RANDOM或者Sequence
DataGeneratorContainer container =
createContainer(name, type, options.get(kind), options);
fieldGenerators[i] = container.getGenerator();
// 添加属性到可选选项中
optionalOptions.add(kind);
optionalOptions.addAll(container.getOptions());
}
// 校验所有的配置值是否合法
FactoryUtil.validateFactoryOptions(requiredOptions(), optionalOptions, options);
// 创建consumedOptionKeys,获取所有已填写的选项
Set<String> consumedOptionKeys = new HashSet<>();
consumedOptionKeys.add(CONNECTOR.key());
consumedOptionKeys.add(ROWS_PER_SECOND.key());
consumedOptionKeys.add(NUMBER_OF_ROWS.key());
optionalOptions.stream().map(ConfigOption::key).forEach(consumedOptionKeys::add);
// 校验是否有未使用的配置项
FactoryUtil.validateUnconsumedKeys(
factoryIdentifier(), options.keySet(), consumedOptionKeys);
// 获取table名称
String name = context.getObjectIdentifier().toString();
// 创建一个DataGenTableSource
return new DataGenTableSource(
fieldGenerators,
name,
schema,
options.get(ROWS_PER_SECOND),
options.get(NUMBER_OF_ROWS));
}
// 根据字段名,字段类型等创建出字段内容生成器
private DataGeneratorContainer createContainer(
String name, DataType type, String kind, ReadableConfig options) {
switch (kind) {
case RANDOM:
return type.getLogicalType().accept(new RandomGeneratorVisitor(name, options));
case SEQUENCE:
return type.getLogicalType().accept(new SequenceGeneratorVisitor(name, options));
default:
throw new ValidationException("Unsupported generator kind: " + kind);
}
}
DynamicTableSource
DynamicTableSource
负责从外部系统创建出一个动态表。该接口包含有如下两个子接口:
- ScanTableSource
- LookupTableSource
这两个接口的方法基本相同,主要区别为使用时需要全量扫描数据源还是根据key查询数据源。他们具有如下主要方法:
- copy方法:在生成执行计划阶段需要创建
DynamicTableSink
的一个部分,该方法包含创建副本的逻辑,要求为深拷贝。 - asSummaryString方法:用于在日志或者是控制台中,打印该TableSink的文字描述。
- getChangelogMode:返回Sink支持的变更模式,planner可提供建议但是最终决定在于Sink,如果planner和sink支持的模式冲突,则抛出异常。支持的模式有INSERT_ONLY,UPSERT和ALL。
还有一个方法是创建DynamicTableSource
。这个方法在上面两个接口中的名字不同。其中:
-
ScanTableSource
接口对应getScanRuntimeProvider
方法。系统运行时需要扫描外部系统中所有的数据行。 -
LookupTableSource
接口对应getLookupRuntimeProvider
方法。系统运行时需要根据一个或多个key查找外部系统中的数据行。
上面例子中的datagen使用的是ScanTableSource
类型。
接着datagen这个例子。DataGenTableSource
代码和分析如下:
@Internal
public class DataGenTableSource implements ScanTableSource {
private final DataGenerator<?>[] fieldGenerators;
private final String tableName;
private final TableSchema schema;
private final long rowsPerSecond;
private final Long numberOfRows;
public DataGenTableSource(
DataGenerator<?>[] fieldGenerators,
String tableName,
TableSchema schema,
long rowsPerSecond,
Long numberOfRows) {
this.fieldGenerators = fieldGenerators;
this.tableName = tableName;
this.schema = schema;
this.rowsPerSecond = rowsPerSecond;
this.numberOfRows = numberOfRows;
}
@Override
public ScanRuntimeProvider getScanRuntimeProvider(ScanContext context) {
// 返回tableSource
boolean isBounded = numberOfRows != null;
return SourceFunctionProvider.of(createSource(), isBounded);
}
@VisibleForTesting
public DataGeneratorSource<RowData> createSource() {
// 构建tableSource的逻辑在此
// 创建一个RowData生成器,包含每个字段的名称和数据类型,以及配置参数
return new DataGeneratorSource<>(
new RowDataGenerator(fieldGenerators, schema.getFieldNames()),
rowsPerSecond,
numberOfRows);
}
@Override
public DynamicTableSource copy() {
return new DataGenTableSource(
fieldGenerators, tableName, schema, rowsPerSecond, numberOfRows);
}
@Override
public String asSummaryString() {
return "DataGenTableSource";
}
@Override
public ChangelogMode getChangelogMode() {
return ChangelogMode.insertOnly();
}
}
最后我们分析DataGeneratorSource
的逻辑,它其实是一个RichParallelSourceFunction
。和RichSourceFunction
不同的是,RichParallelSourceFunction
会同时运行多个实例,数量和配置的并行度一致。
下面分析它的open
方法和run
方法:
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
// 如果配置了行数限制
if (numberOfRows != null) {
// 获取任务并行度
final int stepSize = getRuntimeContext().getNumberOfParallelSubtasks();
// 获取任务ID
final int taskIdx = getRuntimeContext().getIndexOfThisSubtask();
// 计算每个DataGenTableSource实例生成的数据行数
final int baseSize = (int) (numberOfRows / stepSize);
toOutput = (numberOfRows % stepSize > taskIdx) ? baseSize + 1 : baseSize;
}
}
@Override
public void run(SourceContext<T> ctx) throws Exception {
// 计算每秒钟需要产生的数据行数
double taskRowsPerSecond =
(double) rowsPerSecond / getRuntimeContext().getNumberOfParallelSubtasks();
long nextReadTime = System.currentTimeMillis();
// 死循环
while (isRunning) {
// 每次批量生成taskRowsPerSecond条数据
for (int i = 0; i < taskRowsPerSecond; i++) {
if (isRunning
&& generator.hasNext()
&& (numberOfRows == null || outputSoFar < toOutput)) {
synchronized (ctx.getCheckpointLock()) {
outputSoFar++;
// 调用生成器生成数据
ctx.collect(this.generator.next());
}
} else {
return;
}
}
// 下批数据生成1秒后进行
nextReadTime += 1000;
// 由于生成数据存在耗时,这里计算生成完数据后,还需要等多久够1秒钟
long toWaitMs = nextReadTime - System.currentTimeMillis();
// 线程睡眠toWaitMs毫秒
while (toWaitMs > 0) {
Thread.sleep(toWaitMs);
toWaitMs = nextReadTime - System.currentTimeMillis();
}
}
}
到这里TableSource
就介绍完了。
TableSink
DynamicTableSinkFactory
DynamicTableSinkFactory
的主要方法和DynamicTableSourceFactory
几乎完全一致。我们直接从实例分析。
以print为例,它的使用非常简单,SQL如下所示:
CREATE TABLE print_table (
f0 INT,
f1 INT,
f2 STRING,
f3 DOUBLE
) WITH (
'connector' = 'print'
)
配置参数如下:
参数 | 是否必选 | 默认值 | 数据类型 | 描述 |
---|---|---|---|---|
connector | 必选 | (none) | String | 指定要使用的连接器,此处应为 'print' |
print-identifier | 可选 | (none) | String | 配置一个标识符作为输出数据的前缀。 |
standard-error | 可选 | false | Boolean | 如果 format 需要打印为标准错误而不是标准输出,则为 True 。 |
sink.parallelism | 可选 | (none) | Integer | 为 Print sink operator 定义并行度。默认情况下,并行度由框架决定,和链在一起的上游 operator 一致。 |
print类型table sink的作用为把insert到这个table的数据直接print到控制台。详细内容参见:https://ci.apache.org/projects/flink/flink-docs-release-1.13/zh/docs/connectors/table/print/
下面开始源代码分析。PrintTableSinkFactory
的代码如下所示:
public static final String IDENTIFIER = "print";
// 创建配置选项
public static final ConfigOption<String> PRINT_IDENTIFIER =
key("print-identifier")
.stringType()
.noDefaultValue()
.withDescription(
"Message that identify print and is prefixed to the output of the value.");
public static final ConfigOption<Boolean> STANDARD_ERROR =
key("standard-error")
.booleanType()
.defaultValue(false)
.withDescription(
"True, if the format should print to standard error instead of standard out.");
@Override
public String factoryIdentifier() {
// 返回print,当设置connector为print时,使用这个DynamicTableSourceFactory
return IDENTIFIER;
}
@Override
public Set<ConfigOption<?>> requiredOptions() {
// 必须的属性,返回空集合表示没有必须的属性
return new HashSet<>();
}
@Override
public Set<ConfigOption<?>> optionalOptions() {
// 这个方法返回可选的属性
Set<ConfigOption<?>> options = new HashSet<>();
options.add(PRINT_IDENTIFIER);
options.add(STANDARD_ERROR);
options.add(FactoryUtil.SINK_PARALLELISM);
return options;
}
@Override
public DynamicTableSink createDynamicTableSink(Context context) {
// 该方法返回DynamicTableSink实例
// 主要逻辑为校验属性值,然后读取这些属性,构建出PrintSink
FactoryUtil.TableFactoryHelper helper = FactoryUtil.createTableFactoryHelper(this, context);
helper.validate();
ReadableConfig options = helper.getOptions();
return new PrintSink(
context.getCatalogTable().getSchema().toPhysicalRowDataType(),
options.get(PRINT_IDENTIFIER),
options.get(STANDARD_ERROR),
options.getOptional(FactoryUtil.SINK_PARALLELISM).orElse(null));
}
DynamicTableSink
它具有的方法和DynamicTableSource
基本一致,只有一个方法不同:getSinkRuntimeProvider
方法。这个方法是sink的关键,返回一个SinkRuntimeProvider
。这个类包含如何将表中数据落地的逻辑。
和上面例子相同,我们贴出PrintSink
的源代码展开分析。
private static class PrintSink implements DynamicTableSink {
// 这里保存了table中各个column的数据类型
private final DataType type;
private final String printIdentifier;
private final boolean stdErr;
private final @Nullable Integer parallelism;
private PrintSink(
DataType type, String printIdentifier, boolean stdErr, Integer parallelism) {
this.type = type;
this.printIdentifier = printIdentifier;
this.stdErr = stdErr;
this.parallelism = parallelism;
}
@Override
public ChangelogMode getChangelogMode(ChangelogMode requestedMode) {
return requestedMode;
}
@Override
public SinkRuntimeProvider getSinkRuntimeProvider(DynamicTableSink.Context context) {
// 类型转换器,负责转换Flink内部数据类型到POJO
DataStructureConverter converter = context.createDataStructureConverter(type);
return SinkFunctionProvider.of(
new RowDataPrintFunction(converter, printIdentifier, stdErr), parallelism);
}
@Override
public DynamicTableSink copy() {
return new PrintSink(type, printIdentifier, stdErr, parallelism);
}
@Override
public String asSummaryString() {
return "Print to " + (stdErr ? "System.err" : "System.out");
}
}
最后我们看下RowDataPrintFunction
,分析它是如何将数据打印到控制台的。它实际上是一个RichSinkFunction
。
private static class RowDataPrintFunction extends RichSinkFunction<RowData> {
private static final long serialVersionUID = 1L;
private final DataStructureConverter converter;
private final PrintSinkOutputWriter<String> writer;
private RowDataPrintFunction(
DataStructureConverter converter, String printIdentifier, boolean stdErr) {
this.converter = converter;
// 创建出PrintSinkOutputWriter,输出到标准输出或者是标准错误
this.writer = new PrintSinkOutputWriter<>(printIdentifier, stdErr);
}
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
StreamingRuntimeContext context = (StreamingRuntimeContext) getRuntimeContext();
writer.open(context.getIndexOfThisSubtask(), context.getNumberOfParallelSubtasks());
}
@Override
public void invoke(RowData value, Context context) {
// 将table一行数据转换为Object(POJO)类型
Object data = converter.toExternal(value);
assert data != null;
// 输出数据到PrintSinkOutputWriter
writer.write(data.toString());
}
}
到这里print类型connector源代码已分析完毕。
本博客为作者原创,欢迎大家参与讨论和批评指正。如需转载请注明出处。