1. Overview
A Flink dynamic table is a logical concept and the core abstraction of the Table & SQL API; it provides a unified way to process both streaming and batch data.
The general architecture of a table connector is as follows:
- Metadata
Executing a CREATE TABLE statement declares a table. This operation does not modify any physical data in the external system; the metadata is represented as an instance of CatalogTable.
- Planning
During table resolution and optimization, the planner translates the CatalogTable into a DynamicTableSource (for SELECT statements) or a DynamicTableSink (for INSERT statements).
The main work in this step is implementing the DynamicTableSourceFactory and DynamicTableSinkFactory logic: validating the options in the WITH clause and converting the CatalogTable metadata into a DynamicTableSource or DynamicTableSink.
The factory class must provide a valid factory identifier (the value of connector = 'xxxxxx') so that Java's SPI mechanism can discover and register it as a service. For background on the SPI mechanism, see: https://www.cnblogs.com/lovesqcc/p/5229353.html
- Runtime
Once logical planning is complete, the planner obtains the runtime implementation classes from the table connector.
2. Implementing a Custom Source Connector
Below we implement a simple custom source connector; the main goal is to walk through the development workflow once.
The connector's logic is: for each configured (string-typed) column, randomly return one of the tokens in that column's field expression.
-
First, implement the DynamicTableSourceFactory:
```java
package com.github.knaufk.flink.test;

import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.api.ValidationException;
import org.apache.flink.table.catalog.CatalogTable;
import org.apache.flink.table.connector.source.DynamicTableSource;
import org.apache.flink.table.factories.DynamicTableSourceFactory;
import org.apache.flink.table.factories.FactoryUtil;
import org.apache.flink.table.utils.TableSchemaUtils;

import java.util.HashSet;
import java.util.Set;

import static org.apache.flink.configuration.ConfigOptions.key;

public class XiaolongTableSourceFactory implements DynamicTableSourceFactory {

    private static final String IDENTIFIER = "xiaolong";
    private static final String FIELDS = "fields";
    private static final String EXPRESSION = "expression";

    private static final ConfigOption<Long> ROWS_PER_SECOND =
            key("rows-per-second")
                    .longType()
                    .defaultValue(1000L)
                    .withDescription("Rows per second to emit.");

    private static final ConfigOption<String> SPLIT_STR =
            key("split-str")
                    .stringType()
                    .defaultValue(",")
                    .withDescription("The delimiter of the field expression.");

    @Override
    public DynamicTableSource createDynamicTableSource(Context context) {
        CatalogTable catalogTable = context.getCatalogTable();

        // Read the options from the WITH clause.
        Configuration options = new Configuration();
        catalogTable.getOptions().forEach(options::setString);

        // Read the physical schema of the table.
        TableSchema schema = TableSchemaUtils.getPhysicalSchema(catalogTable.getSchema());

        String[] fieldExpressions = new String[schema.getFieldCount()];
        Set<ConfigOption<?>> fieldOptions = new HashSet<>();
        for (int i = 0; i < fieldExpressions.length; i++) {
            String fieldName = schema.getFieldName(i).get();
            // Validate that every column has a 'fields.<name>.expression' option.
            ConfigOption<String> field =
                    key(FIELDS + "." + fieldName + "." + EXPRESSION)
                            .stringType()
                            .noDefaultValue();
            fieldExpressions[i] = validateFieldExpression(field, options);
            fieldOptions.add(field);
        }

        // Validate the configured options.
        FactoryUtil.validateFactoryOptions(requiredOptions(), fieldOptions, options);

        return new XiaolongTableSource(
                fieldExpressions, options.get(ROWS_PER_SECOND), options.get(SPLIT_STR));
    }

    /** @return the connector identifier, i.e. the value of connector = '...'. */
    @Override
    public String factoryIdentifier() {
        return IDENTIFIER;
    }

    /** Options that must be configured; an empty set by default. */
    @Override
    public Set<ConfigOption<?>> requiredOptions() {
        return new HashSet<>();
    }

    /** Additional optional options; an empty set by default. */
    @Override
    public Set<ConfigOption<?>> optionalOptions() {
        Set<ConfigOption<?>> options = new HashSet<>();
        options.add(ROWS_PER_SECOND);
        options.add(SPLIT_STR);
        return options;
    }

    private String validateFieldExpression(ConfigOption<String> field, Configuration options) {
        String fieldExpression = options.get(field);
        if (fieldExpression == null) {
            throw new ValidationException(
                    "Every column needs a corresponding expression. No expression found for "
                            + field.key() + ".");
        }
        return fieldExpression;
    }
}
```
-
Next, implement a DynamicTableSource subclass:
```java
package com.github.knaufk.flink.test;

import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.table.connector.source.DynamicTableSource;
import org.apache.flink.table.connector.source.ScanTableSource;
import org.apache.flink.table.connector.source.SourceFunctionProvider;

public class XiaolongTableSource implements ScanTableSource {

    private final String[] fieldExpression;
    private final long numsPerSec;
    private final String split;

    public XiaolongTableSource(String[] fieldExpression, long numsPerSec, String split) {
        this.fieldExpression = fieldExpression;
        this.numsPerSec = numsPerSec;
        this.split = split;
    }

    @Override
    public ChangelogMode getChangelogMode() {
        // This source only produces INSERT rows.
        return ChangelogMode.insertOnly();
    }

    @Override
    public ScanRuntimeProvider getScanRuntimeProvider(ScanContext runtimeProviderContext) {
        // 'false' marks the source as unbounded (a streaming source).
        return SourceFunctionProvider.of(
                new XiaolongSource(fieldExpression, numsPerSec, split), false);
    }

    @Override
    public DynamicTableSource copy() {
        return new XiaolongTableSource(fieldExpression, numsPerSec, split);
    }

    @Override
    public String asSummaryString() {
        return "XiaolongTableSource";
    }
}
```
-
Implement the SourceFunction
For exactly-once semantics you could write a custom SourceFunction that also implements CheckpointedFunction; here we keep it simple and only extend RichSourceFunction.
```java
package com.github.knaufk.flink.test;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.shaded.guava18.com.google.common.collect.Lists;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.StringData;

import java.util.Arrays;
import java.util.List;
import java.util.Random;

public class XiaolongSource extends RichSourceFunction<RowData> {

    private final String[] fieldExpressions;
    private final long numsPerSec;
    private final String split;
    private volatile boolean cancelled;

    public XiaolongSource(String[] fieldExpressions, long numsPerSec, String split) {
        this.fieldExpressions = fieldExpressions;
        this.numsPerSec = numsPerSec;
        this.split = split;
    }

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
    }

    @Override
    public void run(SourceContext<RowData> sourceContext) throws Exception {
        // Emit 'numsPerSec' rows, then sleep for one second, until cancelled.
        while (!cancelled) {
            for (long i = 0; i < numsPerSec; i++) {
                sourceContext.collect(generateRow());
            }
            Thread.sleep(1000);
        }
    }

    @Override
    public void cancel() {
        cancelled = true;
    }

    private RowData generateRow() {
        GenericRowData row = new GenericRowData(fieldExpressions.length);
        Random random = new Random();
        for (int i = 0; i < fieldExpressions.length; i++) {
            // Split the expression by the delimiter and pick a random token.
            List<String> result =
                    Lists.newArrayList(Arrays.asList(fieldExpressions[i].split(split)));
            String value = result.get(random.nextInt(result.size()));
            row.setField(i, StringData.fromString(value));
        }
        return row;
    }
}
```
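The token-picking logic inside generateRow can be sanity-checked outside of Flink. Below is a minimal standalone sketch (the class and method names are made up for illustration); it mirrors the loop body of generateRow:

```java
import java.util.Arrays;
import java.util.List;
import java.util.Random;

public class ExpressionPickDemo {

    // Split an expression by the delimiter (a regex, as in String#split)
    // and return one random token.
    static String pick(String expression, String delimiter, Random random) {
        List<String> tokens = Arrays.asList(expression.split(delimiter));
        return tokens.get(random.nextInt(tokens.size()));
    }

    public static void main(String[] args) {
        String value = pick("a,b,c", ",", new Random());
        // The result is always one of the configured tokens.
        System.out.println(Arrays.asList("a", "b", "c").contains(value)); // prints "true"
    }
}
```

Note that String#split treats the delimiter as a regular expression, so a delimiter such as "|" would need escaping.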
-
Configuration file
Finally, register the custom source connector in META-INF/services/org.apache.flink.table.factories.Factory:
com.github.knaufk.flink.test.XiaolongTableSourceFactory
Package the project into a jar and copy it into Flink's lib directory.
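The packaging and deployment step could look roughly like this (the artifact name is illustrative and assumes a Maven project; adjust paths to your FLINK_HOME):

```shell
# Build the connector jar.
mvn clean package -DskipTests

# Copy the jar into Flink's lib directory so the class and its
# META-INF/services entry are on the cluster's classpath.
cp target/flink-xiaolong-connector-1.0.jar "$FLINK_HOME/lib/"

# Restart the cluster so the new jar is picked up.
"$FLINK_HOME/bin/stop-cluster.sh"
"$FLINK_HOME/bin/start-cluster.sh"
```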
3. Environment Setup
Zeppelin is used as the test environment here. For the setup process, see: https://www.jianshu.com/p/091d2490a969
4. Testing
1. Declare the table
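For example, a table backed by this connector could be declared as follows (the table and column names are made up; the options follow the factory implemented above):

```sql
CREATE TABLE xiaolong_demo (
  `color`  STRING,
  `animal` STRING
) WITH (
  'connector' = 'xiaolong',
  'rows-per-second' = '5',
  'split-str' = ',',
  'fields.color.expression'  = 'red,green,blue',
  'fields.animal.expression' = 'cat,dog,fish'
);
```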
2. Run a query; streaming data is produced continuously until the query is cancelled.
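Assuming a table named xiaolong_demo has been declared with connector = 'xiaolong' (the name is illustrative), a simple query keeps emitting random rows:

```sql
SELECT color, animal FROM xiaolong_demo;
```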