Currently, Flink distinguishes between the following kinds of functions:
- Scalar functions:标量函数将标量值映射到一个新的标量值。
- Table functions:制表函数将标量值映射到新行(类似于列转行)。
- Aggregate functions:聚合函数将多行标量值映射为新标量值。
- Table aggregate functions:属于
Table functions
和Aggregate functions
功能的合并。 - Async table functions:异步表函数是用于执行查找的表源的特殊函数。
自定义 Scalar functions
- 准备一个类
import org.apache.flink.table.functions.ScalarFunction;
* 字符串转换大写
* @author admin
* @date 2021/8/21
public class MyScalarFunctionByUppercase extends ScalarFunction {
- 异常:需要自定义
org.apache.flink.table.api.ValidationException: Function class 'com.admin.flink.demo12.function.MyScalarFunctionByUppercase'
does not implement a method named 'eval'.
public class MyScalarFunctionByUppercase extends ScalarFunction {
public String eval(String words){
return words.toUpperCase();
- 应用
public void test1(){
// 环境准备
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
// 模拟数据
DataStreamSource<String> source = env.fromElements("java", "google", "hello");
// 给字段取名
Table table = tableEnv.fromDataStream(source,$("words"));
// 使用内联的方式
- 查询
| op | words | _c1 |
| +I | java | JAVA |
| +I | google | GOOGLE |
| +I | hello | HELLO |
3 rows in set
- 注册后再使用
// 先注册再使用
- 查询
| op | words | _c1 |
| +I | java | JAVA |
| +I | google | GOOGLE |
| +I | hello | HELLO |
3 rows in set
- 在sql中使用,必须先注册
// 给字段取名
Table table = tableEnv.fromDataStream(source,$("words"));
// 先注册再使用
// 在sql中使用
tableEnv.sqlQuery("select words,toUppercase(words) as upp_words from "+table)
- 查询
| op | words | upp_words |
| +I | java | JAVA |
| +I | google | GOOGLE |
| +I | hello | HELLO |
3 rows in set
自定义 Table functions
- 自定义函数
* 行专列
* 泛型:每行数据有多列
* @FunctionHint 指定返回列的类型
* @author admin
* @date 2021/8/21
@FunctionHint(output = @DataTypeHint("row<w string,len int>"))
public class MyTableFunctionByRowToColumn extends TableFunction<Row> {
public void eval(String phrase){
Arrays.stream(phrase.split(" ")).forEach(s -> {
collect( Row.of(s,s.length()));
| op | phrase | w | len |
| +I | hello world! | hello | 5 |
| +I | hello world! | world! | 6 |
| +I | 明天 你好! | 明天 | 2 |
| +I | 明天 你好! | 你好! | 3 |
| +I | 数码 宝贝 | 数码 | 2 |
| +I | 数码 宝贝 | 宝贝 | 2 |
| +I | 名侦探 柯南! | 名侦探 | 3 |
| +I | 名侦探 柯南! | 柯南! | 3 |
8 rows in set
- 应用 table api 使用(内联)
public void test1(){
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStreamSource<String> source = env.fromElements("hello world!", "明天 你好!", "数码 宝贝", "名侦探 柯南!");
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
Table table = tableEnv.fromDataStream(source,$("phrase"));
// 需求:将元数据炸开,
- 应用 sql 使用
public void test2(){
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStreamSource<String> source = env.fromElements("hello world!", "明天 你好!", "数码 宝贝", "名侦探 柯南!");
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
Table table = tableEnv.fromDataStream(source,$("phrase"));
// 创建一张临时表
// 需求:将元数据炸开,
// 注册
tableEnv.sqlQuery("select phrase , w ,len from t join lateral table (rowToColumn(phrase)) on true")
- 取别名,内部内置函数 T 就是用于取别名
<sql id="tableFunction2">
phrase , w1 ,len1
from #{tableName}
join lateral table (rowToColumn(phrase))
as T(w1,len1)
on true
| op | phrase | w1 | len1 |
| +I | hello world! | hello | 5 |
| +I | hello world! | world! | 6 |
| +I | 明天 你好! | 明天 | 2 |
| +I | 明天 你好! | 你好! | 3 |
| +I | 数码 宝贝 | 数码 | 2 |
| +I | 数码 宝贝 | 宝贝 | 2 |
| +I | 名侦探 柯南! | 名侦探 | 3 |
| +I | 名侦探 柯南! | 柯南! | 3 |