Flink(1.13) FlinkSql自定义函数

函数分类

官网介绍
Currently, Flink distinguishes between the following kinds of functions:

  • Scalar functions:标量函数将标量值映射到一个新的标量值。
  • Table functions:制表函数将标量值映射到新行(类似于列转行)。
  • Aggregate functions:聚合函数将多行标量值映射为新标量值。
  • Table aggregate functions:属于Table functionsAggregate functions功能的合并。
  • Async table functions:异步表函数是用于执行查找的表源的特殊函数。

自定义 Scalar functions

  • 准备一个类
import org.apache.flink.table.functions.ScalarFunction;

/**
 * 字符串转换大写
 * @author admin
 * @date 2021/8/21
 */
public class MyScalarFunctionByUppercase extends ScalarFunction {

}
  • 异常:需要自定义eval方法。
org.apache.flink.table.api.ValidationException: Function class 'com.admin.flink.demo12.function.MyScalarFunctionByUppercase' 
does not implement a method named 'eval'.

方法名必须叫eval,参数和返回值随意。

public class MyScalarFunctionByUppercase extends ScalarFunction {
    public String eval(String words){
        return words.toUpperCase();
    }
}
  • 应用
    @Test
    public void test1(){
        // 环境准备
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        // 模拟数据
        DataStreamSource<String> source = env.fromElements("java", "google", "hello");

        // 给字段取名
        Table table = tableEnv.fromDataStream(source,$("words"));

        // 使用内联的方式
        table.select($("words"),call(MyScalarFunctionByUppercase.class,$("words")))
                .execute()
                .print();
    }
  • 查询
+----+--------------------------------+--------------------------------+
| op |                          words |                            _c1 |
+----+--------------------------------+--------------------------------+
| +I |                           java |                           JAVA |
| +I |                         google |                         GOOGLE |
| +I |                          hello |                          HELLO |
+----+--------------------------------+--------------------------------+
3 rows in set
  • 注册后再使用
        // 先注册再使用
        tableEnv.createFunction("toUppercase",MyScalarFunctionByUppercase.class);

        table.select($("words"),call("toUppercase",$("words")))
                .execute()
                .print();
  • 查询
+----+--------------------------------+--------------------------------+
| op |                          words |                            _c1 |
+----+--------------------------------+--------------------------------+
| +I |                           java |                           JAVA |
| +I |                         google |                         GOOGLE |
| +I |                          hello |                          HELLO |
+----+--------------------------------+--------------------------------+
3 rows in set
  • 在sql中使用,必须先注册
        // 给字段取名
        Table table = tableEnv.fromDataStream(source,$("words"));

        // 先注册再使用
        tableEnv.createFunction("toUppercase",MyScalarFunctionByUppercase.class);

        // 在sql中使用
        tableEnv.sqlQuery("select words,toUppercase(words) as upp_words from "+table)
        .execute()
        .print();
  • 查询
+----+--------------------------------+--------------------------------+
| op |                          words |                      upp_words |
+----+--------------------------------+--------------------------------+
| +I |                           java |                           JAVA |
| +I |                         google |                         GOOGLE |
| +I |                          hello |                          HELLO |
+----+--------------------------------+--------------------------------+
3 rows in set

自定义 Table functions

  • 自定义函数
/**
 * 行专列
 * 泛型:每行数据有多列
 * @FunctionHint 指定返回列的类型
 * @author admin
 * @date 2021/8/21
 */
@FunctionHint(output = @DataTypeHint("row<w string,len int>"))
public class MyTableFunctionByRowToColumn extends TableFunction<Row> {

    public void eval(String phrase){

        Arrays.stream(phrase.split(" ")).forEach(s -> {
            collect( Row.of(s,s.length()));
        });
    }

}

查询

+----+--------------------------------+--------------------------------+-------------+
| op |                         phrase |                              w |         len |
+----+--------------------------------+--------------------------------+-------------+
| +I |                   hello world! |                          hello |           5 |
| +I |                   hello world! |                         world! |           6 |
| +I |                     明天 你好! |                           明天 |           2 |
| +I |                     明天 你好! |                          你好! |           3 |
| +I |                      数码 宝贝 |                           数码 |           2 |
| +I |                      数码 宝贝 |                           宝贝 |           2 |
| +I |                   名侦探 柯南! |                         名侦探 |           3 |
| +I |                   名侦探 柯南! |                          柯南! |           3 |
+----+--------------------------------+--------------------------------+-------------+
8 rows in set
  • 应用 table api 使用(内联)
    @Test
    public void test1(){
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStreamSource<String> source = env.fromElements("hello world!", "明天 你好!", "数码 宝贝", "名侦探 柯南!");

        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
        Table table = tableEnv.fromDataStream(source,$("phrase"));

        // 需求:将元数据炸开,
        table.joinLateral(call(MyTableFunctionByRowToColumn.class,$("phrase")))
                .execute().print();

    }
  • 应用 sql 使用
    @Test
    public void test2(){
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStreamSource<String> source = env.fromElements("hello world!", "明天 你好!", "数码 宝贝", "名侦探 柯南!");

        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
        Table table = tableEnv.fromDataStream(source,$("phrase"));

        // 创建一张临时表
        tableEnv.createTemporaryView("t",table);

        // 需求:将元数据炸开,

        // 注册
        tableEnv.createFunction("rowToColumn",MyTableFunctionByRowToColumn.class);

        //查询
        tableEnv.sqlQuery("select phrase , w ,len from t join lateral table (rowToColumn(phrase)) on true")
                .execute()
                .print();

    }
  • 取别名,内部内置函数 T 就是用于取别名
    <!--取别名-->
    <sql id="tableFunction2">
        select
        phrase , w1 ,len1
        from #{tableName}
        join lateral table (rowToColumn(phrase))
        as T(w1,len1)
        on true
    </sql>

查询

+----+--------------------------------+--------------------------------+-------------+
| op |                         phrase |                             w1 |        len1 |
+----+--------------------------------+--------------------------------+-------------+
| +I |                   hello world! |                          hello |           5 |
| +I |                   hello world! |                         world! |           6 |
| +I |                     明天 你好! |                           明天 |           2 |
| +I |                     明天 你好! |                          你好! |           3 |
| +I |                      数码 宝贝 |                           数码 |           2 |
| +I |                      数码 宝贝 |                           宝贝 |           2 |
| +I |                   名侦探 柯南! |                         名侦探 |           3 |
| +I |                   名侦探 柯南! |                          柯南! |           3 |
+----+--------------------------------+--------------------------------+-------------+
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 220,295评论 6 512
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 93,928评论 3 396
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 166,682评论 0 357
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 59,209评论 1 295
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 68,237评论 6 397
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,965评论 1 308
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,586评论 3 420
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 39,487评论 0 276
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 46,016评论 1 319
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 38,136评论 3 340
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 40,271评论 1 352
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,948评论 5 347
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,619评论 3 331
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 32,139评论 0 23
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 33,252评论 1 272
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 48,598评论 3 375
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 45,267评论 2 358

推荐阅读更多精彩内容