1. 函数类型
函数 | Apache Flink
Flink 中的函数有两个划分标准
- 一个划分标准是:系统(内置)函数和 Catalog 函数。系统函数没有名称空间,只能通过其名称来进行引用。 Catalog 函数属于 Catalog 和数据库,因此它们拥有 Catalog 和数据库命名空间。 用户可以通过全/部分限定名(catalog.db.func 或 db.func)或者函数名 来对 Catalog 函数进行引用。
- 另一个划分标准是:临时函数和持久化函数。 临时函数始终由用户创建,它容易改变并且仅在会话的生命周期内有效。 持久化函数不是由系统提供,就是存储在 Catalog 中,它在会话的整个生命周期内都有效
看一下函数如何引用和函数解析优先级??
2. 系统内置函数
3. 自定义函数
当前 Flink 有如下几种函数:
- 标量函数 将标量值转换成一个新标量值;
- 表值函数 将标量值转换成新的行数据;
- 聚合函数 将多行数据里的标量值转换成一个新标量值;
- 表值聚合函数 将多行数据里的标量值转换成新的行数据;
- 异步表值函数 是异步查询外部数据系统的特殊函数。
public class tableDemo5 {
public static void main(String[] args) throws Exception {
LocalStreamEnvironment env= StreamExecutionEnvironment.createLocalEnvironment();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
//输入表
tableEnv.executeSql("CREATE TABLE input_table( tag STRING ,uid INT, money INT ) " +
" WITH( 'connector' = 'datagen', " +
" 'rows-per-second'='1', " +
" 'fields.uid.kind'='sequence', " +
" 'fields.uid.start'='1'," +
" 'fields.uid.end'='1000'," +
" 'fields.tag.length'='1'," +
" 'fields.money.min'='1'," +
" 'fields.money.max'='1000')");
//输出表
tableEnv.executeSql("CREATE TABLE out_Table (tag STRING, money_2 BIGINT, money BIGINT ) WITH ( 'connector' = 'print' )");
//注册自定义标量函数
tableEnv.createTemporarySystemFunction("myDouble",DoubleFunction.class);
tableEnv.executeSql("INSERT INTO out_Table SELECT tag ,myDouble(money),money FROM input_table");
env.execute();
}
public static class DoubleFunction extends ScalarFunction {
public Integer eval(Integer money) {
return money * 2;
}
}
}
如果你的函数在初始化时,是有入参的,那么需要你的入参是 Serializable
的。即 Java 中需要继承 Serializable 接口
public class tableDemo6 {
public static void main(String[] args) throws Exception {
EnvironmentSettings setting = EnvironmentSettings.newInstance()
.inStreamingMode()
.useBlinkPlanner()
.build();
TableEnvironment tableEnv = TableEnvironment.create(setting);
//输入表
tableEnv.executeSql("CREATE TABLE input_table( tag STRING ,uid INT, money INT ) " +
" WITH( 'connector' = 'datagen', " +
" 'rows-per-second'='1', " +
" 'fields.uid.kind'='sequence', " +
" 'fields.uid.start'='1'," +
" 'fields.uid.end'='1000'," +
" 'fields.tag.length'='1'," +
" 'fields.money.min'='1'," +
" 'fields.money.max'='1000')");
//输出表
tableEnv.executeSql("CREATE TABLE out_Table (tag STRING, money_2 BIGINT, money BIGINT ) WITH ( 'connector' = 'print' )");
//注册自定义标量函数
tableEnv.createTemporarySystemFunction("myDouble",new DoubleFunction(false));
tableEnv.executeSql("INSERT INTO out_Table SELECT tag ,myDouble(money),money FROM input_table");
}
public static class DoubleFunction extends ScalarFunction {
private boolean endInclusive;
public DoubleFunction(boolean endInclusive) {
this.endInclusive = endInclusive;
}
public Integer eval(Integer money) {
if (endInclusive) {
return money * 2;
}
return money;
}
}
}
4. 开发UDF的需知事项
- 首先需要继承 Flink SQL UDF 体系提供的基类,每种 UDF 实现都有不同的基类
- 实现 UDF 执行逻辑函数,不同类型的 UDF 需要实现不同的执行逻辑函数
- 注意 UDF 入参、出参类型推导,Flink 在一些基础类型上的是可以直接推导出类型信息的,但是一些复杂类型就无能为力了,这里需要用户主动介入
- 明确 UDF 输出结果是否是定值,如果是定值则 Flink 会在生成计划时就执行一遍,得出结果,然后使用这个定值的结果作为后续的执行逻辑的参数,这样可以做到不用在 Flink SQL 任务运行时每次都执行一次,会有性能优化
- 巧妙运用运行时上下文,可以在任务运行前加载到一些外部资源、上下文配置信息,扩展 UDF 能力
注意 UDF 入参、出参类型推导
5. SQL 标量函数(Scalar Function)
// 有多个重载求值方法的函数
public static class OverloadedFunction extends ScalarFunction {
// 不需要任何声明,可以直接推导出类型信息,即入参和出参对应到 SQL 中的 bigint 类型
public Long eval(long a, long b) {
return a + b;
}
// 使用 @DataTypeHint("DECIMAL(12, 3)") 定义 decimal 的精度和小数位
public @DataTypeHint("DECIMAL(12, 3)") BigDecimal eval(double a, double b) {
return BigDecimal.valueOf(a + b);
}
// 使用注解定义嵌套数据类型
@DataTypeHint("ROW<s STRING, t TIMESTAMP_LTZ(3)>")
public Row eval(int i) {
return Row.of(String.valueOf(i), Instant.ofEpochSecond(i));
}
// 允许任意类型的输入,并输出序列化定制后的值
@DataTypeHint(value = "RAW", bridgedTo = ByteBuffer.class)
public ByteBuffer eval(@DataTypeHint(inputGroup = InputGroup.ANY) Object o) {
return MyUtils.serializeToByteBuffer(o);
}
}
6. SQL 表值函数(Table Function)
表值函数即 UDTF,常用于进一条数据,出多条数据的场景
public class tableDemo6 {
public static void main(String[] args) throws Exception {
EnvironmentSettings setting = EnvironmentSettings.newInstance()
.inStreamingMode()
.useBlinkPlanner()
.build();
TableEnvironment tableEnv = TableEnvironment.create(setting);
tableEnv.executeSql("CREATE TABLE input_table (id INT ,tag STRING) WITH ( 'connector' = 'filesystem', 'path' = 'src/main/resources/e.txt', 'format' = 'csv')");
tableEnv.executeSql("CREATE TABLE out_Table (id INT, tag_1 STRING, tag_2 INT ) WITH ( 'connector' = 'print' )");
tableEnv.createTemporarySystemFunction("MySplit", MyFunction.class);
tableEnv.executeSql("INSERT INTO out_Table SELECT id,tag_1, tag_2 FROM input_table , LATERAL TABLE(MySplit(tag)) ");
}
//自定义实现ScalarFunction
@FunctionHint(output = @DataTypeHint("ROW<tag_1 STRING, tag_2 INT>"))
public static class MyFunction extends TableFunction<Row> {
public void eval(String str) {
String[] split = str.split("\\|");
for (String s : split) {
collect(Row.of(s, s.length()));
}
}
}
}
BUG:Recovery is suppressed by NoRestartBackoffTimeStrategy
Row导错包了...
如果你是使用 Scala 实现函数,不要使用 Scala 中 object 实现 UDF,Scala object 是单例的,有可能会导致并发问题
7. SQL 聚合函数(Aggregate Function)
聚合函数即 UDAF,常用于进多条数据,出一条数据的场景
- 实现 AggregateFunction 接口,其中所有的方法必须是 public 的、非 static 的,传一个是最终的输出类型和中间状态类型
- Acc聚合中间结果 createAccumulator():为当前 Key 初始化一个空的 accumulator
- accumulate(Acc accumulator, Input输入参数):对于每一行数据,都会调用 accumulate() 方法来更新 accumulator,这个方法就是用于处理每一条输入数据;
- Output输出参数 getValue(Acc accumulator):通过调用 getValue 方法来计算和返回最终的结果
- retract(Acc accumulator, Input输入参数):在回撤流的场景下必须要实现
- merge(Acc accumulator, Iterable<Acc> it):在许多批式聚合以及流式聚合中的 Session、Hop 窗口聚合场景下都是必须要实现的。除此之外,这个方法对于优化也很多帮助。例如,如果你打开了两阶段聚合优化,就需要 AggregateFunction 实现 merge 方法,从而可以做到在数据进行 shuffle 前先进行一次聚合计算。
- resetAccumulator():在批式聚合中是必须实现的。
public class tableDemo7 {
public static void main(String[] args) throws Exception {
LocalStreamEnvironment env= StreamExecutionEnvironment.createLocalEnvironment();
env.setParallelism(1);
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
tableEnv.executeSql("CREATE TABLE input_table (id INT ,money INT ,cnt INT) WITH ( 'connector' = 'filesystem', 'path' = 'src/main/resources/f.txt', 'format' = 'csv')");
//加权平均值
tableEnv.createTemporarySystemFunction("myAvg",myAvg.class);
Table table = tableEnv.sqlQuery("SELECT id,myAvg(money,cnt) FROM input_table GROUP BY id");
tableEnv.toChangelogStream(table).print();
env.execute();
}
//最终输出类型和中间状态类型
public static class myAvg extends AggregateFunction<Long, avgAccumulator> {
// 获取返回结果
@Override
public Long getValue(avgAccumulator acc) {
if (acc.count == 0) {
return null;
} else {
return acc.sum / acc.count;
}
}
//初始化
@Override
public avgAccumulator createAccumulator() {
return new avgAccumulator();
}
//中间状态的计算
public void accumulate(avgAccumulator acc, Long iMoney, Integer iCnt) {
acc.sum += iMoney * iCnt;
acc.count += iCnt;
}
// Session window 可以使用这个方法将几个单独窗口的结果合并
public void merge(avgAccumulator acc, Iterable<avgAccumulator> it) {
for (avgAccumulator a : it) {
acc.count += a.count;
acc.sum += a.sum;
}
}
public void resetAccumulator(avgAccumulator acc) {
acc.count = 0;
acc.sum = 0L;
}
}
public static class avgAccumulator {
public long sum = 0;
public int count = 0;
}
}
突然发现案例都是官网的,哈哈哈哈
User-defined Functions | Apache Flink
8. SQL 表值聚合函数(Table Aggregate Function)
- 实现 TableAggregateFunction 接口,其中所有的方法必须是 public 的、非 static 的
- Acc聚合中间结果 createAccumulator():为当前 Key 初始化一个空的 accumulator,其存储了聚合的中间结果
- accumulate(Acc accumulator, Input输入参数):对于每一行数据,都会调用 accumulate() 方法来更新 accumulator
- emitValue(Acc accumulator, Collector<OutPut> collector) 或者 emitUpdateWithRetract(Acc accumulator, RetractableCollector<OutPut> collector):当遍历所有的数据,当所有的数据都处理完了之后,通过调用 emit 方法来计算和输出最终的结果
- retract(Acc accumulator, Input输入参数):在回撤流的场景下必须要实现
- merge(Acc accumulator, Iterable<Acc> it):在许多批式聚合以及流式聚合中的 Session、Hop 窗口聚合场景下都是必须要实现的。
public class tableDemo8 {
public static void main(String[] args) throws Exception {
LocalStreamEnvironment env= StreamExecutionEnvironment.createLocalEnvironment();
env.setParallelism(1);
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
tableEnv.executeSql("CREATE TABLE input_table (id INT ,money INT ,cnt INT) WITH ( 'connector' = 'filesystem', 'path' = 'src/main/resources/f.txt', 'format' = 'csv')");
//加权平均值
tableEnv.createTemporarySystemFunction("top2", Top2.class);
Table table = tableEnv.from("input_table").groupBy($("id")).flatAggregate(Expressions.call("top2", $("money")).as("value", "row")).select($("id"), $("value"), $("row"));
tableEnv.toChangelogStream(table).print();
env.execute();
}
public static class Top2Accum {
public Integer first;
public Integer second;
public Integer oldFirst;
public Integer oldSecond;
}
public static class Top2 extends TableAggregateFunction<Tuple2<Integer, Integer>, Top2Accum> {
//初始化
@Override
public Top2Accum createAccumulator() {
Top2Accum acc = new Top2Accum();
acc.first = Integer.MIN_VALUE;
acc.second = Integer.MIN_VALUE;
acc.oldFirst = Integer.MIN_VALUE;
acc.oldSecond = Integer.MIN_VALUE;
return acc;
}
public void accumulate(Top2Accum acc, Integer v) {
if (v > acc.first) {
acc.second = acc.first;
acc.first = v;
} else if (v > acc.second) {
acc.second = v;
}
}
public void emitValue(Top2Accum acc, Collector<Tuple2<Integer, Integer>> out) {
// emit the value and rank
if (acc.first != Integer.MIN_VALUE) {
out.collect(Tuple2.of(acc.first, 1));
}
if (acc.second != Integer.MIN_VALUE) {
out.collect(Tuple2.of(acc.second, 2));
}
}
public void emitUpdateWithRetract(Top2Accum acc, TableAggregateFunction.RetractableCollector<Tuple2<Integer, Integer>> out) {
if (!acc.first.equals(acc.oldFirst)) {
// if there is an update, retract old value then emit new value.
if (acc.oldFirst != Integer.MIN_VALUE) {
out.retract(Tuple2.of(acc.oldFirst, 1));
}
out.collect(Tuple2.of(acc.first, 1));
acc.oldFirst = acc.first;
}
if (!acc.second.equals(acc.oldSecond)) {
// if there is an update, retract old value then emit new value.
if (acc.oldSecond != Integer.MIN_VALUE) {
out.retract(Tuple2.of(acc.oldSecond, 2));
}
out.collect(Tuple2.of(acc.second, 2));
acc.oldSecond = acc.second;
}
}
public void merge(Top2Accum acc, java.lang.Iterable<Top2Accum> iterable) {
for (Top2Accum otherAcc : iterable) {
accumulate(acc, otherAcc.first);
accumulate(acc, otherAcc.second);
}
}
}
}
???emitUpdateWithRetract怎么用不了
9. FlinkSQL-UDF Module
目前 Flink 包含了以下三种 Module:
- CoreModule:CoreModule 是 Flink 内置的 Module,其包含了目前 Flink 内置的所有 UDF,Flink 默认开启的 Module 就是 CoreModule,我们可以直接使用其中的 UDF
- HiveModule:HiveModule 可以将 Hive 内置函数作为 Flink 的系统函数提供给 SQL\Table API 用户进行使用,比如 get_json_object 这类 Hive 内置函数(Flink 默认的 CoreModule 是没有的)
- 用户自定义 Module:用户可以实现 Module 接口实现自己的 UDF 扩展 Module
Flink SQL 支持 Hive UDF
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-hive_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
10. FlinkSQL Catalog
Flink SQL 中是由 Catalog 提供了元数据信息,例如数据库、表、分区、视图以及数据库或其他外部系统中存储的函数和信息。对标 Hive 去理解就是 Hive 的 MetaStore,都是用于存储计算引擎涉及到的元数据信息。
目前 Flink 包含了以下四种 Catalog:
- GenericInMemoryCatalog:GenericInMemoryCatalog 是基于内存实现的 Catalog,所有元数据只在 session 的生命周期(即一个 Flink 任务一次运行生命周期内)内可用。
- JdbcCatalog:JdbcCatalog 使得用户可以将 Flink 通过 JDBC 协议连接到关系数据库。PostgresCatalog 是当前实现的唯一一种 JDBC Catalog,即可以将 Flink SQL 的预案数据存储在 Postgres 中。
- HiveCatalog:HiveCatalog 有两个用途,作为 Flink 元数据的持久化存储,以及作为读写现有 Hive 元数据的接口。注意:Hive MetaStore 以小写形式存储所有元数据对象名称。而 GenericInMemoryCatalog 会区分大小写。
- 用户自定义 Catalog:用户可以实现 Catalog 接口实现自定义 Catalog
11. FlinkSQL 任务参数配置
具体参数分为以下 3 类:
- 运行时参数:优化 Flink SQL 任务在执行时的任务性能
- 优化器参数:Flink SQL 任务在生成执行计划时,经过优化器优化生成更优的执行计划
- 表参数:用于调整 Flink SQL table 的执行行为