flinksql - operator - udf

概念
  1. 自定义标量函数,接收一个或多个列,输出一个列,行与行是一一对应的
  2. 构造函数在jobmanager上创建udf时就执行
  3. open方法在所有并行子任务上都执行一次,且在调用该udf时才会执行open方法
  4. 通过DataTypeHint注解和FunctionHint注解可以自定义udf参数和返回类型
  5. 通过重写getTypeInference方法动态指定udf返回类型
  6. deterministic为true时,若eval方法无参或传入常量参数,则eval方法仅会执行一次,所有行的调用结果都采用此次执行的eval返回值。若deterministic为false时,则无论何种情况均每行执行一次eval方法获得对应的返回值。deterministic默认为true,可通过重写isDeterministic方法指定其值。
定义

定义udf类,继承ScalarFunction,并实现eval方法,参数自定义

// 实现nvl函数,接收任意类型的参数,若第一个参数为null则返回第二个参数的值,否则返回第一个参数,且返回值类型恒等于第一个参数类型
import org.apache.flink.table.functions.ScalarFunction;

private Class<?> valueConvertClass;
    private Class<?> defaultValueConvertClass;
    Constructor<?> convertConstructor = null;
    Method staticConvertMethod = null;
    Map<Class<?>, Class<?>> typeMap;

    public Nvl() {
        // 保存引用类型与基本类型的对应关系, 因为所有valueOf转换方法都要求传入基本类型, 而defaultValueConvertClass获取到的有可能是其引用类型
        typeMap = new HashMap<>();
        typeMap.put(Integer.class, int.class);
        typeMap.put(Long.class, long.class);
        typeMap.put(Double.class, double.class);
        typeMap.put(Character.class, char.class);
        typeMap.put(Byte.class, byte.class);
        typeMap.put(Short.class, short.class);
        typeMap.put(Float.class, float.class);
    }
    
    @Override
    public void open(FunctionContext context) throws Exception {
        if (valueConvertClass != defaultValueConvertClass) {
            if (valueConvertClass.equals(BigDecimal.class)) {
                // 对应FlinkSQL的DECIMAL类型
                // 使用BigDecimal的构造函数把目标对象转为BigDecimal对象, 源类型BigDecimal,int,long,char[],string,double,BigInteger
                convertConstructor = BigDecimal.class.getConstructor(typeMap.getOrDefault(defaultValueConvertClass, defaultValueConvertClass));
            } else if (valueConvertClass.equals(String.class)) {
                // 对应flinkSQL的STRING、VARCHAR、CHAR类型
                // 使用String.valueOf方法把目标对象转为String对象, 原类型支持所有基本数据类型
                staticConvertMethod = String.class.getMethod("valueOf", typeMap.getOrDefault(defaultValueConvertClass, defaultValueConvertClass));
            }  else if (valueConvertClass.equals(Integer.class)) {
                // 对应FlinkSQL的INT类型
                // 使用Integer.valueOf方法把目标对象转为Integer对象, 源类型仅支持int类型和String类型
                staticConvertMethod = Integer.class.getMethod("valueOf", typeMap.getOrDefault(defaultValueConvertClass, defaultValueConvertClass));
            } else if (valueConvertClass.equals(Boolean.class)) {
                // 对应flinkSQL的BOOLEAN类型
                // 使用Boolean.valueOf方法把目标对象转为Boolean对象, 源类型仅支持boolean类型和String类型
                staticConvertMethod = Boolean.class.getMethod("valueOf", typeMap.getOrDefault(defaultValueConvertClass, defaultValueConvertClass));
            } else if (valueConvertClass.equals(Byte.class)) {
                // 对应FlinkSQL的TINYINT类型
                // 使用Byte.valueOf方法把目标对象转为Byte对象, 源类型仅支持byte类型和String类型
                staticConvertMethod = Byte.class.getMethod("valueOf", typeMap.getOrDefault(defaultValueConvertClass, defaultValueConvertClass));
            } else if (valueConvertClass.equals(Short.class)) {
                // 对应FlinkSQL的SMALLINT类型
                // 使用Short.valueOf方法把目标对象转为Short对象, 源类型仅支持short类型和String类型
                staticConvertMethod = Short.class.getMethod("valueOf", typeMap.getOrDefault(defaultValueConvertClass, defaultValueConvertClass));
            }else if (valueConvertClass.equals(Long.class)) {
                // 对应FlinkSQL的BIGINT类型
                // 使用Long.valueOf方法把目标对象转为Long对象, 源类型仅支持long类型和String类型
                staticConvertMethod = Long.class.getMethod("valueOf", typeMap.getOrDefault(defaultValueConvertClass, defaultValueConvertClass));
            } else if (valueConvertClass.equals(Float.class)) {
                // 对应FlinkSQL的FLOAT类型
                // 使用Float.valueOf方法把目标对象转为Float对象, 源类型仅支持float类型和String类型
                staticConvertMethod = Float.class.getMethod("valueOf", typeMap.getOrDefault(defaultValueConvertClass, defaultValueConvertClass));
            } else if (valueConvertClass.equals(Double.class)) {
                // 对应FlinkSQL的DOUBLE类型
                // 使用Double.valueOf方法把目标对象转为Double对象, 源类型仅支持double类型和String类型
                staticConvertMethod = Double.class.getMethod("valueOf", typeMap.getOrDefault(defaultValueConvertClass, defaultValueConvertClass));
            } else if (valueConvertClass.equals(Date.class)) {
                // 对应FlinkSQL的DATE类型
                // 使用Date.valueOf方法把目标对象转为Date对象, 源类型仅支持LocalDate类型和String类型
                staticConvertMethod = Date.class.getMethod("valueOf", defaultValueConvertClass);
            } else if (valueConvertClass.equals(LocalDate.class)) {
                // 对应FlinkSQL的DATE类型
                // 使用LocalDate.parse方法把目标对象转为LocalDate对象, 源类型仅支持CharSequence类型
                staticConvertMethod = LocalDate.class.getMethod("parse", defaultValueConvertClass);
            } else if (valueConvertClass.equals(Time.class)) {
                // 对应FlinkSQL的TIME(0)类型
                // 使用Time.valueOf方法把目标对象转为Time对象, 源类型仅支持LocalTime类型和String类型
                staticConvertMethod = Time.class.getMethod("valueOf", defaultValueConvertClass);
            } else {
                throw new RuntimeException("unsupported datatype: " + defaultValueConvertClass.getName());
            }
        }
    }

    // eval方法,实现udf返回
    // 重写getTypeInference方法,以及声明eval方法返回类型为Object,实现动态返回类型
    // 使用DataTypeHin注解自定义udf参数类型,inputGroup = InputGroup.ANY时表示接收任意类型的参数,搭配Object类型的参数类型,实现对任意类型参数的接收处理
    public Object eval(@DataTypeHint(inputGroup = InputGroup.ANY) Object value,
                       @DataTypeHint(inputGroup = InputGroup.ANY) Object defaultValue) throws InvocationTargetException, InstantiationException, IllegalAccessException {
        if (value != null) {
            return value;
        } else if (staticConvertMethod != null) {
            return staticConvertMethod.invoke(null, defaultValue);
        } else if (convertConstructor != null) {
            return convertConstructor.newInstance(defaultValue);
        } else {
            return defaultValue;
        }
    }

    // 获取第一个参数的类型, 此类型通过其字段类型得到,并将其作为udf返回类型
    @Override
    public TypeInference getTypeInference(DataTypeFactory typeFactory) {
        return TypeInference.newBuilder()
                .outputTypeStrategy(callContext -> {
                    // getConversionClass为引用类型
                    valueConvertClass = callContext.getArgumentDataTypes().get(0).getConversionClass();
                    defaultValueConvertClass = callContext.getArgumentDataTypes().get(1).getConversionClass();
                    return Optional.of(callContext.getArgumentDataTypes().get(0));
                })
                .build();
    }
}
使用
// table api
table.select($"nvl",call(new Nvl(),$"col1",$"col2"));
// flink sql
tableEnv.createTemporaryFunction("nvl",Nvl.class);
tableEnv.createTemporaryFunction("nvl",new Nvl());

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 217,406评论 6 503
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,732评论 3 393
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 163,711评论 0 353
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,380评论 1 293
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,432评论 6 392
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,301评论 1 301
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,145评论 3 418
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 39,008评论 0 276
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,443评论 1 314
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,649评论 3 334
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,795评论 1 347
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,501评论 5 345
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,119评论 3 328
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,731评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,865评论 1 269
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,899评论 2 370
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,724评论 2 354

推荐阅读更多精彩内容

  • UDF是什么? UDF是用户自定义函数(User Define Function)的缩写,从定义可以看出UDF是一...
    troublemak_19be阅读 3,206评论 0 0
  • 记录一下曾经走过的一些坑,一定要注意operator状态之前尽量不要用keyby Flink提供了Exactly ...
    大酱游说大数据阅读 3,926评论 0 3
  • [TOC] 一、UDF 介绍 UDF(User-Defined Functions)即是用户自定义的hive函数。...
    w1992wishes阅读 3,345评论 0 0
  • Flink学习总结 flink是什么:为分布式、高性能、随时可用以及准确的流处理应用程序打造的开源流处理框架 流处...
    倔强青铜弟中弟阅读 566评论 0 0
  • 2020-01-03 时间属性 Flink支持不同的时间语义,核心是 Processing Time 和 Even...
    Ary_zz阅读 408评论 0 0