Background
Previously, in 写给大忙人看的Flink 消费 Kafka, we looked at how Flink Streaming consumes Kafka. Today we'll look at how FlinkSQL's Kafka connector ties into the Flink Streaming Kafka consumer.
Main content
Creating the Kafka source
CREATE TABLE orders
(
    status      int,
    courier_id  bigint,
    id          bigint,
    finish_time BIGINT,
    place_time  BIGINT,
    PRIMARY KEY (id) NOT ENFORCED
)
WITH (
    'connector' = 'kafka',
    'topic' = 'test',
    'properties.bootstrap.servers' = 'xxx',
    'properties.group.id' = 'testGroup',
    'format' = 'ss-canal-json',
    'ss-canal-json.table.include' = 'orders',
    'scan.startup.mode' = 'latest-offset'
);
After a series of transformations by Apache Calcite (the details of that process will be covered in a later post), we eventually arrive at the CatalogSourceTable class. It extends FlinkPreparingTableBase and is responsible for converting Calcite's RelOptTable into Flink's TableSourceTable.
@Override
// entry point: invoked from SqlToRelConverter#toRel
public RelNode toRel(ToRelContext toRelContext) {
    final RelOptCluster cluster = toRelContext.getCluster();
    final List<RelHint> hints = toRelContext.getTableHints(); // SQL hints
    final FlinkContext context = ShortcutUtils.unwrapContext(cluster);
    final FlinkTypeFactory typeFactory = ShortcutUtils.unwrapTypeFactory(cluster);
    final FlinkRelBuilder relBuilder = FlinkRelBuilder.of(cluster, relOptSchema);

    // 0. finalize catalog table
    final Map<String, String> hintedOptions = FlinkHints.getHintedOptions(hints);
    final CatalogTable catalogTable = createFinalCatalogTable(context, hintedOptions);

    // 1. create and prepare table source
    final DynamicTableSource tableSource = createDynamicTableSource(context, catalogTable);
    prepareDynamicSource(
            schemaTable.getTableIdentifier(),
            catalogTable,
            tableSource,
            schemaTable.isStreamingMode(),
            context.getTableConfig());

    // 2. push table scan
    pushTableScan(relBuilder, cluster, catalogTable, tableSource, typeFactory, hints);

    // 3. push project for non-physical columns
    final TableSchema schema = catalogTable.getSchema();
    if (!TableSchemaUtils.containsPhysicalColumnsOnly(schema)) {
        pushMetadataProjection(relBuilder, typeFactory, schema);
        pushGeneratedProjection(context, relBuilder, schema);
    }

    // 4. push watermark assigner
    if (schemaTable.isStreamingMode() && !schema.getWatermarkSpecs().isEmpty()) {
        pushWatermarkAssigner(context, relBuilder, schema);
    }

    return relBuilder.build();
}
Steps 0-4 complete the conversion. This post is mainly concerned with step 1, so let's follow createDynamicTableSource into the FactoryUtil.createTableSource method:
public static DynamicTableSource createTableSource(
        @Nullable Catalog catalog, // GenericInMemoryCatalog
        ObjectIdentifier objectIdentifier, // `default_catalog`.`default_database`.`orders`
        CatalogTable catalogTable, // CatalogTableImpl
        ReadableConfig configuration,
        ClassLoader classLoader,
        boolean isTemporary) {
    final DefaultDynamicTableContext context =
            new DefaultDynamicTableContext(
                    objectIdentifier, catalogTable, configuration, classLoader, isTemporary);
    try {
        final DynamicTableSourceFactory factory = // locates KafkaDynamicTableFactory
                getDynamicTableFactory(DynamicTableSourceFactory.class, catalog, context);
        return factory.createDynamicTableSource(context);
    } catch (Throwable t) {
        throw new ValidationException(
                String.format(
                        "Unable to create a source for reading table '%s'.\n\n"
                                + "Table options are:\n\n"
                                + "%s",
                        objectIdentifier.asSummaryString(),
                        catalogTable.getOptions().entrySet().stream()
                                .map(e -> stringifyOption(e.getKey(), e.getValue()))
                                .sorted()
                                .collect(Collectors.joining("\n"))),
                t);
    }
}
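How does getDynamicTableFactory land on KafkaDynamicTableFactory? Flink discovers table factories through Java SPI: every connector jar lists its factories in META-INF/services/org.apache.flink.table.factories.Factory, and the factory whose factoryIdentifier() equals the 'connector' option ('kafka' here) is picked. The sketch below illustrates the idea; the class and method names are made up for illustration, and it simplifies FactoryUtil's real discovery logic (the catalog-provided factory path and most error handling are omitted).

import java.util.ArrayList;
import java.util.List;
import java.util.ServiceLoader;

import org.apache.flink.table.api.ValidationException;
import org.apache.flink.table.factories.Factory;

public final class FactoryDiscoverySketch {

    // simplified stand-in for FactoryUtil's SPI-based factory discovery
    public static <T extends Factory> T discoverFactory(
            ClassLoader classLoader, Class<T> factoryClass, String identifier) {
        final List<T> matches = new ArrayList<>();
        // SPI: iterate every Factory registered under META-INF/services
        for (Factory factory : ServiceLoader.load(Factory.class, classLoader)) {
            if (factoryClass.isInstance(factory)
                    && factory.factoryIdentifier().equals(identifier)) { // e.g. "kafka"
                matches.add(factoryClass.cast(factory));
            }
        }
        if (matches.size() != 1) {
            throw new ValidationException(
                    "Expected exactly one factory for identifier '" + identifier + "'.");
        }
        return matches.get(0);
    }
}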
This brings us to KafkaDynamicTableFactory's createDynamicTableSource method:
@Override
public DynamicTableSource createDynamicTableSource(Context context) {
    final TableFactoryHelper helper = FactoryUtil.createTableFactoryHelper(this, context);
    final ReadableConfig tableOptions = helper.getOptions(); // options from the WITH clause

    // resolve the key/value formats via the 'format' option (SPI)
    final Optional<DecodingFormat<DeserializationSchema<RowData>>> keyDecodingFormat =
            getKeyDecodingFormat(helper);
    final DecodingFormat<DeserializationSchema<RowData>> valueDecodingFormat = // SSCanalJsonFormatFactory
            getValueDecodingFormat(helper);

    // validation
    helper.validateExcept(PROPERTIES_PREFIX);
    validateTableSourceOptions(tableOptions);
    validatePKConstraints(
            context.getObjectIdentifier(), context.getCatalogTable(), valueDecodingFormat);

    final StartupOptions startupOptions = getStartupOptions(tableOptions);
    // Kafka client properties: bootstrap.servers, group.id, etc.
    final Properties properties = getKafkaProperties(context.getCatalogTable().getOptions());
    // add topic-partition discovery
    properties.setProperty(
            FlinkKafkaConsumerBase.KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS,
            String.valueOf(
                    tableOptions
                            .getOptional(SCAN_TOPIC_PARTITION_DISCOVERY)
                            .map(Duration::toMillis)
                            .orElse(FlinkKafkaConsumerBase.PARTITION_DISCOVERY_DISABLED)));

    final DataType physicalDataType = // ROW<`status` INT, `courier_id` BIGINT, `id` BIGINT, `finish_time` BIGINT> NOT NULL
            context.getCatalogTable().getSchema().toPhysicalRowDataType();
    final int[] keyProjection = createKeyFormatProjection(tableOptions, physicalDataType);
    final int[] valueProjection = createValueFormatProjection(tableOptions, physicalDataType);
    final String keyPrefix = tableOptions.getOptional(KEY_FIELDS_PREFIX).orElse(null);

    return createKafkaTableSource(
            physicalDataType,
            keyDecodingFormat.orElse(null),
            valueDecodingFormat,
            keyProjection,
            valueProjection,
            keyPrefix,
            KafkaOptions.getSourceTopics(tableOptions),
            KafkaOptions.getSourceTopicPattern(tableOptions),
            properties,
            startupOptions.startupMode,
            startupOptions.specificOffsets,
            startupOptions.startupTimestampMillis);
}
It first performs some validation and then passes the resolved options on to create the table source, as follows:
protected KafkaDynamicSource createKafkaTableSource(
        DataType physicalDataType, // physical fields to read: ROW<`status` INT, `courier_id` BIGINT, `id` BIGINT, `finish_time` BIGINT> NOT NULL
        @Nullable DecodingFormat<DeserializationSchema<RowData>> keyDecodingFormat,
        DecodingFormat<DeserializationSchema<RowData>> valueDecodingFormat, // SSCanalJsonFormatFactory
        int[] keyProjection,
        int[] valueProjection,
        @Nullable String keyPrefix,
        @Nullable List<String> topics, // topics
        @Nullable Pattern topicPattern, // topic pattern
        Properties properties, // Kafka client properties: bootstrap.servers, group.id, etc.
        StartupMode startupMode,
        Map<KafkaTopicPartition, Long> specificStartupOffsets,
        long startupTimestampMillis) {
    return new KafkaDynamicSource(
            physicalDataType,
            keyDecodingFormat,
            valueDecodingFormat,
            keyProjection,
            valueProjection,
            keyPrefix,
            topics,
            topicPattern,
            properties,
            startupMode,
            specificStartupOffsets,
            startupTimestampMillis,
            false);
}
Back in CatalogSourceTable.toRel, execution continues with
prepareDynamicSource(
        schemaTable.getTableIdentifier(),
        catalogTable,
        tableSource,
        schemaTable.isStreamingMode(),
        context.getTableConfig());
which in turn calls KafkaDynamicSource.getScanRuntimeProvider, and that is where the FlinkKafkaConsumer gets created.
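For reference, this is roughly what that method does (abridged from the Flink 1.12-era KafkaDynamicSource, so details may differ between versions): it turns the key/value DecodingFormats into DeserializationSchemas, builds the FlinkKafkaConsumer, and hands it to the runtime as a SourceFunctionProvider.

@Override
public ScanRuntimeProvider getScanRuntimeProvider(ScanContext context) {
    // build key/value deserializers from the decoding formats resolved earlier
    final DeserializationSchema<RowData> keyDeserialization =
            createDeserialization(context, keyDecodingFormat, keyProjection, keyPrefix);
    final DeserializationSchema<RowData> valueDeserialization =
            createDeserialization(context, valueDecodingFormat, valueProjection, null);
    final TypeInformation<RowData> producedTypeInfo =
            context.createTypeInformation(producedDataType);
    // the same FlinkKafkaConsumer a DataStream job would construct by hand
    final FlinkKafkaConsumer<RowData> kafkaConsumer =
            createKafkaConsumer(keyDeserialization, valueDeserialization, producedTypeInfo);
    return SourceFunctionProvider.of(kafkaConsumer, false);
}

From here on we are back in plain Flink Streaming territory: the SQL connector ultimately hands the runtime the same FlinkKafkaConsumer that the earlier post on consuming Kafka with Flink described.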
Other notes
For more details on 'format' = 'ss-canal-json', see the FlinkSQL 平台 post.
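To connect the dots, here is a generic skeleton (not the actual ss-canal-json implementation, which that post covers): a custom format is exposed to the planner as a DeserializationFormatFactory registered through the same META-INF/services/org.apache.flink.table.factories.Factory SPI file, and its factoryIdentifier() must return exactly the string used in the 'format' option.

import java.util.Collections;
import java.util.Set;

import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.table.connector.format.DecodingFormat;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.factories.DeserializationFormatFactory;
import org.apache.flink.table.factories.DynamicTableFactory;

// hypothetical skeleton of a custom format factory
public class SsCanalJsonFormatFactorySkeleton implements DeserializationFormatFactory {

    // must match 'format' = 'ss-canal-json' in the DDL
    @Override
    public String factoryIdentifier() {
        return "ss-canal-json";
    }

    @Override
    public DecodingFormat<DeserializationSchema<RowData>> createDecodingFormat(
            DynamicTableFactory.Context context, ReadableConfig formatOptions) {
        // the real factory builds a canal-json style DecodingFormat here;
        // omitted because the implementation is not part of this post
        throw new UnsupportedOperationException("not shown in this post");
    }

    @Override
    public Set<ConfigOption<?>> requiredOptions() {
        return Collections.emptySet();
    }

    @Override
    public Set<ConfigOption<?>> optionalOptions() {
        // format-specific options (e.g. a table-include filter) would be declared here
        return Collections.emptySet();
    }
}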