Flink Sql教程(4)

Flink UDF

概述

  • 什么是UDF
    • UDF是User-defined Functions的缩写,即自定义函数。
  • UDF种类
    • UDF分为三种:Scalar Functions、Table Functions、Aggregation Functions
    • Scalar Functions
      • 接收0、1、多个参数,返回一个值
    • Table Functions
      • 和上面的Scalar Functions接收的参数个数一样,不同的是可以返回多行,而不是单个值
    • Aggregation Functions
      • 从名字就可以看出来,这个是搭配GROUP BY一起使用的,将表的一个或多个列的一行或多行数据汇聚到一个值里面,看上去有点拗口,其实可以把它简单理解为SQL中的聚合函数
    • Table Aggregation Functions
      • 相当于Table Functions和Aggregation Functions的结合体,聚合之后,再返回多行多列
  • 为什么要有UDF
    • Flink SQL目前提供了很多的内置UDF,主要是为了大家更方便的编写SQL代码完成自己的业务逻辑,具体内置的UDF可以参考官方文档;同时,Flink 也支持注册自己的UDF,下面正式开始我们今天的UDF探索之旅。

Scalar Functions

    //不墨迹,我们直接贴代码
    package udf;

    import org.apache.flink.table.functions.ScalarFunction;
    
    
    public class TestScalarFunc extends ScalarFunction {
    
        private int factor = 2020;
        //和传入数据进行计算的逻辑,参数个数任意
        public int eval() {
            return factor;
        }
    
        public int eval(int a) {
            return a * factor;
        }
    
        public int eval(int... a) {
            int res = 1;
            for (int i : a) {
                res *= i;
            }
            return res * factor;
        }
}

  • 自定义Scalar Functions,需要继承ScalarFunction,并且有一个publiceval(),方法可以接受任意个数参数,同时也可以在一个类中重载eval()
  • 写完UDF之后需要注册到我们的运行环境中,使用姿势有两种:
    • tEnv.sqlUpdate("CREATE FUNCTION IF NOT EXISTS test AS 'udf.TestScalarFunc'");
    • tEnv.registerFunction("test",new TestScalarFunc());
    • 第一种偏向在纯SQL的环境中使用,比如我们有个Flink SQL的提交平台,只支持纯SQL语句,那我们可以把自己写的UDF打包上传到平台后,通过SQL语句CREATE FUNCTION IF NOT EXISTS test AS 'udf.TestScalarFunc'来创建UDF;同时可以把UDF注册到catalog中,这里先不深入讨论,之后我们说到Flink X Hive的时候再聊吧
    • 第二种注册方式,如果我们的类有构造方法,可以通过new 对象的时候传递变量进去,更为灵活一点

Table Functions

    package udf;

    import org.apache.flink.api.common.typeinfo.TypeInformation;
    import org.apache.flink.api.common.typeinfo.Types;
    import org.apache.flink.calcite.shaded.com.google.common.base.Strings;
    import org.apache.flink.table.functions.TableFunction;
    import org.apache.flink.types.Row;
    
    public class TestTableFunction extends TableFunction {
    
        private String separator = ",";
    
        public TestTableFunction(String separator) {
            this.separator = separator;
        }

        //和传入数据进行计算的逻辑,参数个数任意
        public void eval(String input){
            
            Row row = null;
            
            if (Strings.isNullOrEmpty(input)){
                
                row = new Row(2);
                row.setField(0,null);
                row.setField(1,0);
                collect(row);
                
            }else {
                
                String[] split = input.split(separator);
                
                for (String word : split) {
                    row = new Row(2);
                    row.setField(0,word);
                    row.setField(1,word.length());
                    collect(row);
                }
                
            }
    
        }
    
        @Override
        public TypeInformation getResultType() {
            return Types.ROW(Types.STRING,Types.INT);
        }
    }

  • 自定义Table Functions,需要继承TableFunction,并且有一个publiceval(),方法可以接受任意个数参数,同时也可以在一个类中重载eval()
  • 因为返回的是Row类型,所以需要重写getResultType()
  • 在SQL语句中使用时,有两种写法:
    • select a.age,b.name,b.name_length from t2 a, LATERAL TABLE(test2(a.name_list)) as b(name, name_length)
    • select a.age,b.name,b.name_length from t2 a LEFT JOIN LATERAL TABLE(test2(a.name_list)) as b(name, name_length) ON TRUE
    • 第一种的用法相当于用的是CROSS JOIN
    • 第二种的用法是LEFT JOIN

Aggregation Functions

    package udf;

    import org.apache.flink.table.functions.AggregateFunction;
    
    import java.util.Iterator;
    
    public class TestAggregateFunction extends AggregateFunction<Long, TestAggregateFunction.SumAll> {
        //返回最终结果
        @Override
        public Long getValue(SumAll acc) {
            return acc.sum;
        }
        //构建保存中间结果的对象
        @Override
        public SumAll createAccumulator() {
            return new SumAll();
        }
        //和传入数据进行计算的逻辑
        public void accumulate(SumAll acc, long iValue) {
            acc.sum += iValue;
        }
    
        //减去要撤回的值
        public void retract(SumAll acc, long iValue) {
            acc.sum -= iValue;
        }
        
        //从每个分区把数据取出来然后合并
        public void merge(SumAll acc, Iterable<SumAll> it) {
    
            Iterator<SumAll> iter = it.iterator();
    
            while (iter.hasNext()) {
                SumAll a = iter.next();
                acc.sum += a.sum;
    
            }
        }
        //重置内存中值时调用
        public void resetAccumulator(SumAll acc) {
            acc.sum = 0L;
        }
    
        public static class SumAll {
            public long sum = 0;
        }
    
    }

  • 自定义Aggregation Functions,需要继承AggregateFunction,并且必须要有 以下的方法
    • createAccumulator() 创建一个保留中间结果的数据结构
    • accumulate() 把每个输入行与中间结果进行计算,可以重载
    • getValue() 获取最终结果
  • 根据不同的使用情况,还需要以下的方法
    • retract() 用于bounded OVER窗口,即窗口有结束时间
    • merge()用于多次批量聚合和会话窗口合并
    • resetAccumulator()用于多次批量聚合时,清空中间结果

Table Aggregation Functions

    package udf;

    import org.apache.flink.api.common.typeinfo.TypeInformation;
    import org.apache.flink.api.common.typeinfo.Types;
    import org.apache.flink.table.functions.TableAggregateFunction;
    import org.apache.flink.types.Row;
    import org.apache.flink.util.Collector;
    
    public class TestTableAggregateFunction extends TableAggregateFunction<Row,TestTableAggregateFunction.Top2> {
        //创建保留中间结果的对象
        @Override
        public Top2 createAccumulator() {
            Top2 t = new Top2();
            t.f1 = Integer.MIN_VALUE;
            t.f2 = Integer.MIN_VALUE;
    
            return t;
        }
        //与传入值进行计算的方法
        public void accumulate(Top2 t, Integer v) {
            //如果传入的值比内存中第一个值大,那就用第一个值替换第二个值,传入的值替换第一个值;
            //如果传入的值比第二个值大比第一个小,那么就替换第二个值。
            if (v > t.f1) {
                t.f2 = t.f1;
                t.f1 = v;
            } else if (v > t.f2) {
                t.f2 = v;
            }
        }
        
        //合并分区的值
        public void merge(Top2 t, Iterable<Top2> iterable) {
            for (Top2 otherT : iterable) {
                accumulate(t, otherT.f1);
                accumulate(t, otherT.f2);
            }
        }
    
        //拿到返回结果的方法
        public void emitValue(Top2 t, Collector<Row> out) {
            Row row = null;
            //发射数据
            //如果第一个值不是最小的int值,那就发出去
            //如果第二个值不是最小的int值,那就发出去
            if (t.f1 != Integer.MIN_VALUE) {
                row = new Row(2);
                row.setField(0,t.f1);
                row.setField(1,1);
                out.collect(row);
            }
            if (t.f2 != Integer.MIN_VALUE) {
                row = new Row(2);
                row.setField(0,t.f2);
                row.setField(1,2);
                out.collect(row);
            }
        }
        //撤回流拿结果的方法,会发射撤回数据
        public void emitUpdateWithRetract(Top2 t, RetractableCollector<Row> out) {
            Row row = null;
            //如果新旧值不相等,才需要撤回,不然没必要
            //如果旧值不等于int最小值,说明之前发射过数据,需要撤回
            //然后将新值发射出去
            if (!t.f1.equals(t.oldF1)) {
                if (t.oldF1 != Integer.MIN_VALUE) {
                    row = new Row(2);
                    row.setField(0,t.oldF1);
                    row.setField(1,1);
                    out.retract(row);
                }
                row = new Row(2);
                row.setField(0,t.f1);
                row.setField(1,1);
                out.collect(row);
                t.oldF1 = t.f1;
            }
            //和上面逻辑一样,只是一个发射f1,一个f2
            if (!t.f2.equals(t.oldF2)) {
                // if there is an update, retract old value then emit new value.
                if (t.oldF2 != Integer.MIN_VALUE) {
                    row = new Row(2);
                    row.setField(0,t.oldF2);
                    row.setField(1,2);
                    out.retract(row);
                }
                row = new Row(2);
                row.setField(0,t.f2);
                row.setField(1,2);
                out.collect(row);
                t.oldF2 = t.f2;
            }
        }
        //保留中间结果的类
        public class Top2{
            public Integer f1;
            public Integer f2;
            public Integer oldF1;
            public Integer oldF2;
    
        }
    
        @Override
        public TypeInformation<Row> getResultType() {
            return Types.ROW(Types.INT,Types.INT);
        }
    }

  • 自定义Table Aggregation Functions,需要继承TableAggregateFunction,并且必须要有 以下的方法
    • createAccumulator() 创建一个保留中间结果的数据结构
    • accumulate() 把每个输入行与中间结果进行计算,可以重载
  • 根据不同的使用情况,还需要以下的方法
    • retract() 用于bounded OVER窗口,即窗口有结束时间
    • merge()用于多次批量聚合和会话窗口合并
    • resetAccumulator()用于多次批量聚合时,清空中间结果
    • emitValue() 用于批量和窗口聚合拿到结果
    • emitUpdateWithRetract() 用于流式计算的撤回流
  • 目前Table Aggregation Functions只支持在Table Api中使用

完整代码

    //下面贴出来的是主类的代码,具体每个UDF的类上面已经有了
    package FlinkSql;


    import org.apache.flink.api.common.typeinfo.TypeInformation;
    import org.apache.flink.api.common.typeinfo.Types;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.datastream.DataStreamSource;
    import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
    import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
    import org.apache.flink.table.api.Table;
    import org.apache.flink.types.Row;
    import udf.TestAggregateFunction;
    import udf.TestScalarFunc;
    import udf.TestTableAggregateFunction;
    import udf.TestTableFunction;
    
    import static util.FlinkConstant.env;
    import static util.FlinkConstant.tEnv;
    
    public class FlinkSql04 {
        public static void main(String[] args) throws Exception {
    
    
            DataStream<Row> source = env.addSource(new RichSourceFunction<Row>() {
    
                @Override
                public void run(SourceContext<Row> ctx) throws Exception {
                        Row row = new Row(3);
                        row.setField(0, 2);
                        row.setField(1, 3);
                        row.setField(2, 3);
                        ctx.collect(row);
                }
    
                @Override
                public void cancel() {
    
                }
            }).returns(Types.ROW(Types.INT,Types.INT,Types.INT));
    
            tEnv.createTemporaryView("t",source,"a,b,c");
    
    //        tEnv.sqlUpdate("CREATE FUNCTION IF NOT EXISTS test AS 'udf.TestScalarFunc'");
    
            tEnv.registerFunction("test",new TestScalarFunc());
    
            Table table = tEnv.sqlQuery("select test() as a,test(a) as b, test(a,b,c) as c from t");
    
            DataStream<Row> res = tEnv.toAppendStream(table, Row.class);
    
    //        res.print().name("Scalar Functions Print").setParallelism(1);
    
            DataStream<Row> ds2 = env.addSource(new RichSourceFunction<Row>() {
    
    
                @Override
                public void run(SourceContext<Row> ctx) throws Exception {
                        Row row = new Row(2);
                        row.setField(0, 22);
                        row.setField(1, "aa,b,cdd,dfsfdg,exxxxx");
                        ctx.collect(row);
                }
    
                @Override
                public void cancel() {
    
                }
            }).returns(Types.ROW(Types.INT, Types.STRING));
    
            tEnv.createTemporaryView("t2",ds2,"age,name_list");
    
            tEnv.registerFunction("test2",new TestTableFunction(","));
    
    //        Table table2 = tEnv.sqlQuery("select a.age,b.name,b.name_length from t2 a, LATERAL TABLE(test2(a.name_list)) as b(name, name_length)");
    
            Table table2 = tEnv.sqlQuery("select a.age,b.name,b.name_length from t2 a LEFT JOIN LATERAL TABLE(test2(a.name_list)) as b(name, name_length) ON TRUE");
    
            DataStream<Row> res2 = tEnv.toAppendStream(table2, Row.class);
    
    //        res2.print().name("Table Functions Print").setParallelism(1);
    
            DataStream<Row> ds3 = env.addSource(new RichSourceFunction<Row>() {
                @Override
                public void run(SourceContext<Row> ctx) throws Exception {
                    Row row1 = new Row(2);
                    row1.setField(0,"a");
                    row1.setField(1,1L);
    
                    Row row2 = new Row(2);
                    row2.setField(0,"a");
                    row2.setField(1,2L);
    
                    Row row3 = new Row(2);
                    row3.setField(0,"b");
                    row3.setField(1,100L);
    
                    ctx.collect(row1);
                    ctx.collect(row2);
                    ctx.collect(row3);
    
                }
    
                @Override
                public void cancel() {
    
                }
            }).returns(Types.ROW(Types.STRING, Types.LONG));
    
            tEnv.createTemporaryView("t3",ds3,"name,cnt");
    
            tEnv.registerFunction("test3",new TestAggregateFunction());
    
            Table table3 = tEnv.sqlQuery("select name,test3(cnt) as mySum from t3 group by name");
    
            DataStream<Tuple2<Boolean, Row>> res3 = tEnv.toRetractStream(table3, Row.class);
    
    //        res3.print().name("Aggregate Functions Print").setParallelism(1);
    
            DataStream<Row> ds4 = env.addSource(new RichSourceFunction<Row>() {
                @Override
                public void run(SourceContext<Row> ctx) throws Exception {
                    Row row1 = new Row(2);
                    row1.setField(0,"a");
                    row1.setField(1,1);
    
                    Row row2 = new Row(2);
                    row2.setField(0,"a");
                    row2.setField(1,2);
    
                    Row row3 = new Row(2);
                    row3.setField(0,"a");
                    row3.setField(1,100);
    
                    ctx.collect(row1);
                    ctx.collect(row2);
                    ctx.collect(row3);
                }
    
                @Override
                public void cancel() {
    
                }
            }).returns(Types.ROW(Types.STRING, Types.INT));
    
            tEnv.createTemporaryView("t4",ds4,"name,cnt");
    
            tEnv.registerFunction("test4",new TestTableAggregateFunction());
    
            Table table4 = tEnv.sqlQuery("select * from t4");
    
            Table table5 = table4.groupBy("name")
                    .flatAggregate("test4(cnt) as (v,rank)")
                    .select("name,v,rank");
    
            DataStream<Tuple2<Boolean, Row>> res4 = tEnv.toRetractStream(table5, Row.class);
    
            res4.print().name("Aggregate Functions Print").setParallelism(1);
    
            env.execute("test udf");
    
        }
    }

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 213,099评论 6 492
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,828评论 3 387
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 158,540评论 0 348
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,848评论 1 285
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 65,971评论 6 385
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,132评论 1 291
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,193评论 3 412
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,934评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,376评论 1 303
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,687评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,846评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,537评论 4 335
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,175评论 3 317
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,887评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,134评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,674评论 2 362
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,741评论 2 351