2022-06-23-Flink-51(三. SQL手册)

1. 函数类型

函数 | Apache Flink
Flink 中的函数有两个划分标准

  1. 一个划分标准是:系统(内置)函数和 Catalog 函数。系统函数没有名称空间,只能通过其名称来进行引用。 Catalog 函数属于 Catalog 和数据库,因此它们拥有 Catalog 和数据库命名空间。 用户可以通过全/部分限定名(catalog.db.func 或 db.func)或者函数名 来对 Catalog 函数进行引用。
  2. 另一个划分标准是:临时函数和持久化函数。 临时函数始终由用户创建,它容易改变并且仅在会话的生命周期内有效。 持久化函数不是由系统提供,就是存储在 Catalog 中,它在会话的整个生命周期内都有效

看一下函数如何引用和函数解析优先级??

2. 系统内置函数

系统(内置)函数 | Apache Flink

3. 自定义函数

当前 Flink 有如下几种函数:

  1. 标量函数 将标量值转换成一个新标量值;
  2. 表值函数 将标量值转换成新的行数据;
  3. 聚合函数 将多行数据里的标量值转换成一个新标量值;
  4. 表值聚合函数 将多行数据里的标量值转换成新的行数据;
  5. 异步表值函数 是异步查询外部数据系统的特殊函数。
public class tableDemo5 {

    public static void main(String[] args) throws Exception {
        LocalStreamEnvironment env= StreamExecutionEnvironment.createLocalEnvironment();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
        //输入表
        tableEnv.executeSql("CREATE TABLE input_table( tag STRING ,uid INT, money INT ) " +
                " WITH( 'connector' = 'datagen',  " +
                " 'rows-per-second'='1',  " +
                " 'fields.uid.kind'='sequence', " +
                " 'fields.uid.start'='1'," +
                " 'fields.uid.end'='1000'," +
                " 'fields.tag.length'='1'," +
                " 'fields.money.min'='1'," +
                " 'fields.money.max'='1000')");

        //输出表
        tableEnv.executeSql("CREATE TABLE out_Table (tag STRING, money_2 BIGINT, money BIGINT ) WITH ( 'connector' = 'print' )");
        //注册自定义标量函数
        tableEnv.createTemporarySystemFunction("myDouble",DoubleFunction.class);
        tableEnv.executeSql("INSERT INTO out_Table SELECT tag ,myDouble(money),money FROM input_table");
        env.execute();



    }

    public static class DoubleFunction extends ScalarFunction {
        public Integer eval(Integer money) {
            return money * 2;
        }
    }
}

如果你的函数在初始化时,是有入参的,那么需要你的入参是 Serializable 的。即 Java 中需要继承 Serializable 接口

public class tableDemo6 {

    public static void main(String[] args) throws Exception {
               EnvironmentSettings setting = EnvironmentSettings.newInstance()
                .inStreamingMode()
                .useBlinkPlanner()
                .build();
        TableEnvironment tableEnv = TableEnvironment.create(setting);
        //输入表
        tableEnv.executeSql("CREATE TABLE input_table( tag STRING ,uid INT, money INT ) " +
                " WITH( 'connector' = 'datagen',  " +
                " 'rows-per-second'='1',  " +
                " 'fields.uid.kind'='sequence', " +
                " 'fields.uid.start'='1'," +
                " 'fields.uid.end'='1000'," +
                " 'fields.tag.length'='1'," +
                " 'fields.money.min'='1'," +
                " 'fields.money.max'='1000')");

        //输出表
        tableEnv.executeSql("CREATE TABLE out_Table (tag STRING, money_2 BIGINT, money BIGINT ) WITH ( 'connector' = 'print' )");
        //注册自定义标量函数
        tableEnv.createTemporarySystemFunction("myDouble",new DoubleFunction(false));
        tableEnv.executeSql("INSERT INTO out_Table SELECT tag ,myDouble(money),money FROM input_table");



    }

    public static class DoubleFunction extends ScalarFunction {
        private boolean endInclusive;

        public DoubleFunction(boolean endInclusive) {
            this.endInclusive = endInclusive;
        }

        public Integer eval(Integer money) {
            if (endInclusive) {
                 return money * 2;
            }
            return  money;
        }
    }
}

4. 开发UDF的需知事项

  1. 首先需要继承 Flink SQL UDF 体系提供的基类,每种 UDF 实现都有不同的基类
  2. 实现 UDF 执行逻辑函数,不同类型的 UDF 需要实现不同的执行逻辑函数
  3. 注意 UDF 入参、出参类型推导,Flink 在一些基础类型上的是可以直接推导出类型信息的,但是一些复杂类型就无能为力了,这里需要用户主动介入
  4. 明确 UDF 输出结果是否是定值,如果是定值则 Flink 会在生成计划时就执行一遍,得出结果,然后使用这个定值的结果作为后续的执行逻辑的参数,这样可以做到不用在 Flink SQL 任务运行时每次都执行一次,会有性能优化
  5. 巧妙运用运行时上下文,可以在任务运行前加载到一些外部资源、上下文配置信息,扩展 UDF 能力
注意 UDF 入参、出参类型推导

Data Types | Apache Flink

5. SQL 标量函数(Scalar Function)

// 有多个重载求值方法的函数
public static class OverloadedFunction extends ScalarFunction {

  // 不需要任何声明,可以直接推导出类型信息,即入参和出参对应到 SQL 中的 bigint 类型
  public Long eval(long a, long b) {
    return a + b;
  }

  // 使用 @DataTypeHint("DECIMAL(12, 3)") 定义 decimal 的精度和小数位
  public @DataTypeHint("DECIMAL(12, 3)") BigDecimal eval(double a, double b) {
    return BigDecimal.valueOf(a + b);
  }

  // 使用注解定义嵌套数据类型
  @DataTypeHint("ROW<s STRING, t TIMESTAMP_LTZ(3)>")
  public Row eval(int i) {
    return Row.of(String.valueOf(i), Instant.ofEpochSecond(i));
  }

  // 允许任意类型的输入,并输出序列化定制后的值
  @DataTypeHint(value = "RAW", bridgedTo = ByteBuffer.class)
  public ByteBuffer eval(@DataTypeHint(inputGroup = InputGroup.ANY) Object o) {
    return MyUtils.serializeToByteBuffer(o);
  }
}

6. SQL 表值函数(Table Function)

表值函数即 UDTF,常用于进一条数据,出多条数据的场景

public class tableDemo6 {
    public static void main(String[] args) throws Exception {
        EnvironmentSettings setting = EnvironmentSettings.newInstance()
                .inStreamingMode()
                .useBlinkPlanner()
                .build();
        TableEnvironment tableEnv = TableEnvironment.create(setting);
        tableEnv.executeSql("CREATE TABLE input_table (id INT ,tag STRING) WITH ( 'connector' = 'filesystem', 'path' = 'src/main/resources/e.txt', 'format' = 'csv')");
        tableEnv.executeSql("CREATE TABLE out_Table (id INT, tag_1 STRING, tag_2 INT ) WITH ( 'connector' = 'print' )");
        tableEnv.createTemporarySystemFunction("MySplit", MyFunction.class);
        tableEnv.executeSql("INSERT INTO out_Table SELECT  id,tag_1, tag_2 FROM input_table , LATERAL TABLE(MySplit(tag)) ");
    }

    //自定义实现ScalarFunction
    @FunctionHint(output = @DataTypeHint("ROW<tag_1 STRING, tag_2 INT>"))
    public static class MyFunction extends TableFunction<Row> {

        public void eval(String str) {
            String[] split = str.split("\\|");
            for (String s : split) {
                collect(Row.of(s, s.length()));
            }
        }
    }
}

BUG:Recovery is suppressed by NoRestartBackoffTimeStrategy
Row导错包了...

如果你是使用 Scala 实现函数,不要使用 Scala 中 object 实现 UDF,Scala object 是单例的,有可能会导致并发问题

7. SQL 聚合函数(Aggregate Function)

聚合函数即 UDAF,常用于进多条数据,出一条数据的场景

  1. 实现 AggregateFunction 接口,其中所有的方法必须是 public 的、非 static 的,传一个是最终的输出类型和中间状态类型
  2. Acc聚合中间结果 createAccumulator():为当前 Key 初始化一个空的 accumulator
  3. accumulate(Acc accumulator, Input输入参数):对于每一行数据,都会调用 accumulate() 方法来更新 accumulator,这个方法就是用于处理每一条输入数据;
  4. Output输出参数 getValue(Acc accumulator):通过调用 getValue 方法来计算和返回最终的结果
  5. retract(Acc accumulator, Input输入参数):在回撤流的场景下必须要实现
  6. merge(Acc accumulator, Iterable<Acc> it):在许多批式聚合以及流式聚合中的 Session、Hop 窗口聚合场景下都是必须要实现的。除此之外,这个方法对于优化也很多帮助。例如,如果你打开了两阶段聚合优化,就需要 AggregateFunction 实现 merge 方法,从而可以做到在数据进行 shuffle 前先进行一次聚合计算。
  7. resetAccumulator():在批式聚合中是必须实现的。

public class tableDemo7 {
    public static void main(String[] args) throws Exception {
        LocalStreamEnvironment env= StreamExecutionEnvironment.createLocalEnvironment();
        env.setParallelism(1);
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
        tableEnv.executeSql("CREATE TABLE input_table (id INT ,money INT ,cnt INT) WITH ( 'connector' = 'filesystem', 'path' = 'src/main/resources/f.txt', 'format' = 'csv')");
        //加权平均值
        tableEnv.createTemporarySystemFunction("myAvg",myAvg.class);
        Table table = tableEnv.sqlQuery("SELECT id,myAvg(money,cnt)  FROM input_table GROUP BY id");
        tableEnv.toChangelogStream(table).print();
        env.execute();

    }

    //最终输出类型和中间状态类型
    public static class myAvg extends AggregateFunction<Long, avgAccumulator> {

        // 获取返回结果
        @Override
        public Long getValue(avgAccumulator acc) {
            if (acc.count == 0) {
                return null;
            } else {
                return acc.sum / acc.count;

            }
        }
        //初始化
        @Override
        public avgAccumulator createAccumulator() {
            return new avgAccumulator();
        }

        //中间状态的计算
        public void accumulate(avgAccumulator acc, Long iMoney, Integer iCnt) {
            acc.sum += iMoney * iCnt;
            acc.count += iCnt;
        }


        // Session window 可以使用这个方法将几个单独窗口的结果合并
        public void merge(avgAccumulator acc, Iterable<avgAccumulator> it) {
            for (avgAccumulator a : it) {
                acc.count += a.count;
                acc.sum += a.sum;
            }
        }

        public void resetAccumulator(avgAccumulator acc) {
            acc.count = 0;
            acc.sum = 0L;
        }
    }

    public static class avgAccumulator {
        public long sum = 0;
        public int count = 0;
    }
}

突然发现案例都是官网的,哈哈哈哈
User-defined Functions | Apache Flink

8. SQL 表值聚合函数(Table Aggregate Function)

  1. 实现 TableAggregateFunction 接口,其中所有的方法必须是 public 的、非 static 的
  2. Acc聚合中间结果 createAccumulator():为当前 Key 初始化一个空的 accumulator,其存储了聚合的中间结果
  3. accumulate(Acc accumulator, Input输入参数):对于每一行数据,都会调用 accumulate() 方法来更新 accumulator
  4. emitValue(Acc accumulator, Collector<OutPut> collector) 或者 emitUpdateWithRetract(Acc accumulator, RetractableCollector<OutPut> collector):当遍历所有的数据,当所有的数据都处理完了之后,通过调用 emit 方法来计算和输出最终的结果
  5. retract(Acc accumulator, Input输入参数):在回撤流的场景下必须要实现
  6. merge(Acc accumulator, Iterable<Acc> it):在许多批式聚合以及流式聚合中的 Session、Hop 窗口聚合场景下都是必须要实现的。

public class tableDemo8 {

    public static void main(String[] args) throws Exception {
        LocalStreamEnvironment env= StreamExecutionEnvironment.createLocalEnvironment();
        env.setParallelism(1);
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
        tableEnv.executeSql("CREATE TABLE input_table (id INT ,money INT ,cnt INT) WITH ( 'connector' = 'filesystem', 'path' = 'src/main/resources/f.txt', 'format' = 'csv')");
        //加权平均值
        tableEnv.createTemporarySystemFunction("top2", Top2.class);
        Table table = tableEnv.from("input_table").groupBy($("id")).flatAggregate(Expressions.call("top2", $("money")).as("value", "row")).select($("id"), $("value"), $("row"));
        tableEnv.toChangelogStream(table).print();
        env.execute();
    }


    public static class Top2Accum {
        public Integer first;
        public Integer second;
        public Integer oldFirst;
        public Integer oldSecond;
    }

    public static class Top2 extends TableAggregateFunction<Tuple2<Integer, Integer>, Top2Accum> {

        //初始化
        @Override
        public Top2Accum createAccumulator() {
            Top2Accum acc = new Top2Accum();
            acc.first = Integer.MIN_VALUE;
            acc.second = Integer.MIN_VALUE;
            acc.oldFirst = Integer.MIN_VALUE;
            acc.oldSecond = Integer.MIN_VALUE;
            return acc;
        }


        public void accumulate(Top2Accum acc, Integer v) {
            if (v > acc.first) {
                acc.second = acc.first;
                acc.first = v;
            } else if (v > acc.second) {
                acc.second = v;
            }
        }

       public void emitValue(Top2Accum acc, Collector<Tuple2<Integer, Integer>> out) {
            // emit the value and rank
            if (acc.first != Integer.MIN_VALUE) {
                out.collect(Tuple2.of(acc.first, 1));
            }
            if (acc.second != Integer.MIN_VALUE) {
                out.collect(Tuple2.of(acc.second, 2));
            }
        }

   public void emitUpdateWithRetract(Top2Accum acc, TableAggregateFunction.RetractableCollector<Tuple2<Integer, Integer>> out) {
        if (!acc.first.equals(acc.oldFirst)) {
            // if there is an update, retract old value then emit new value.
            if (acc.oldFirst != Integer.MIN_VALUE) {
                out.retract(Tuple2.of(acc.oldFirst, 1));
            }
            out.collect(Tuple2.of(acc.first, 1));
            acc.oldFirst = acc.first;
        }

        if (!acc.second.equals(acc.oldSecond)) {
            // if there is an update, retract old value then emit new value.
            if (acc.oldSecond != Integer.MIN_VALUE) {
                out.retract(Tuple2.of(acc.oldSecond, 2));
            }
            out.collect(Tuple2.of(acc.second, 2));
            acc.oldSecond = acc.second;
        }
    }

        public void merge(Top2Accum acc, java.lang.Iterable<Top2Accum> iterable) {
            for (Top2Accum otherAcc : iterable) {
                accumulate(acc, otherAcc.first);
                accumulate(acc, otherAcc.second);
            }
        }
    }
}

???emitUpdateWithRetract怎么用不了

9. FlinkSQL-UDF Module

目前 Flink 包含了以下三种 Module:

  1. CoreModule:CoreModule 是 Flink 内置的 Module,其包含了目前 Flink 内置的所有 UDF,Flink 默认开启的 Module 就是 CoreModule,我们可以直接使用其中的 UDF
  2. HiveModule:HiveModule 可以将 Hive 内置函数作为 Flink 的系统函数提供给 SQL\Table API 用户进行使用,比如 get_json_object 这类 Hive 内置函数(Flink 默认的 CoreModule 是没有的)
  3. 用户自定义 Module:用户可以实现 Module 接口实现自己的 UDF 扩展 Module
Flink SQL 支持 Hive UDF
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-hive_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
</dependency>

10. FlinkSQL Catalog

Flink SQL 中是由 Catalog 提供了元数据信息,例如数据库、表、分区、视图以及数据库或其他外部系统中存储的函数和信息。对标 Hive 去理解就是 Hive 的 MetaStore,都是用于存储计算引擎涉及到的元数据信息。

目前 Flink 包含了以下四种 Catalog:

  1. GenericInMemoryCatalog:GenericInMemoryCatalog 是基于内存实现的 Catalog,所有元数据只在 session 的生命周期(即一个 Flink 任务一次运行生命周期内)内可用。
  2. JdbcCatalog:JdbcCatalog 使得用户可以将 Flink 通过 JDBC 协议连接到关系数据库。PostgresCatalog 是当前实现的唯一一种 JDBC Catalog,即可以将 Flink SQL 的预案数据存储在 Postgres 中。
  3. HiveCatalog:HiveCatalog 有两个用途,作为 Flink 元数据的持久化存储,以及作为读写现有 Hive 元数据的接口。注意:Hive MetaStore 以小写形式存储所有元数据对象名称。而 GenericInMemoryCatalog 会区分大小写。
  4. 用户自定义 Catalog:用户可以实现 Catalog 接口实现自定义 Catalog

11. FlinkSQL 任务参数配置

Configuration | Apache Flink

具体参数分为以下 3 类:

  1. 运行时参数:优化 Flink SQL 任务在执行时的任务性能
  2. 优化器参数:Flink SQL 任务在生成执行计划时,经过优化器优化生成更优的执行计划
  3. 表参数:用于调整 Flink SQL table 的执行行为
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 213,099评论 6 492
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,828评论 3 387
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 158,540评论 0 348
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,848评论 1 285
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 65,971评论 6 385
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,132评论 1 291
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,193评论 3 412
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,934评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,376评论 1 303
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,687评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,846评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,537评论 4 335
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,175评论 3 317
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,887评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,134评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,674评论 2 362
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,741评论 2 351

推荐阅读更多精彩内容