Flink1.9 UDF使用教程

UDF是什么?

UDF是用户自定义函数(User Define Function)的缩写,从定义可以看出UDF是一个函数,UDF在SQL开发中是一个非常重要的特性,它可以帮助我们扩展SQL的表意能力。

Flink SQL默认给我们提供了一些内置函数如COALESCE、IF、TO_DATE等函数,但是可能无法满足我们的需求,比如我们希望实现时间戳转换、从JSON字符串中获取对应值、维度关联等复杂功能需要借助UDF来完成。

Flink中UDF总共有几类?

  • UDF(Scalar Function)
  • UDTF
  • UDAF
  • UDTAF

UDF如何使用

​ UDF需要先注册后使用,在TableEnvironment中调用registerFunction()方法完成UDF方法注册。当完成UDF的注册后该函数就会注册到TablEnvironment中,后续在SQL中使用该函数Table API和SQL parser就能够识并解析该函数。

Scalar Functions(标量函数)

​ 标量函数用于对传递给它的一个或者多个参数值进行处理和计算,并返回一个单一的值。

​ 定义标量函数需要继承包org.apache.flink.table.functions下的类ScalarFunction并且实现一到多个求值函数(evaluation method)。标量函数的行为是取决于求值函数。一个求值函数必须要定义个访问权限为publiceval函数。求值函数可以自定义参数类型和返回值类型。求值函数还支持重载(可以实现多个eval函数)。

public class HashCode extends ScalarFunction {
  private int factor = 12;
  
  public HashCode(int factor) {
      this.factor = factor;
  }
  
  public int eval(String s) {
      return s.hashCode() * factor;
  }
}

BatchTableEnvironment tableEnv = BatchTableEnvironment.create(env);

// register the function
tableEnv.registerFunction("hashCode", new HashCode(10));

// use the function in Java Table API
myTable.select("string, string.hashCode(), hashCode(string)");

// use the function in SQL API
tableEnv.sqlQuery("SELECT string, hashCode(string) FROM MyTable");

​ 求值函数的返回值默认是由Flink的类型提取器获取的。这对于基本类型及POJO是够用的但是对于复杂类型如自定义或组合类型会提取错误。在这些情况下可以通过手动覆盖ScalarFunction#getResultType()函数来定义结果类型的TypeInformation

​ 接下来的例子展示一个高级的例子,内部使用timestamp表示当返回时内部timestamp表示成long值进行返回。通过覆盖ScalarFunction#getResultType()我们定义这个返回的long值将被代码生成器解释成Types.TIMESTAMP类型。

public static class TimestampModifier extends ScalarFunction {
  public long eval(long t) {
    return t % 1000;
  }

  public TypeInformation<?> getResultType(Class<?>[] signature) {
    return Types.SQL_TIMESTAMP;
  }
}

Table Functions

Table Function接受任意数量的输入值,输出一张表(一或多行同时每行有一或多列)。

​ 类似用户自定义的标量函数,用户自定义的table function使用0、1或者多个标量值作为输入参数。但是与标量函数不同的是,它可以返回任意数量的行而不是一个单一的值。返回的行可以包含一个或多个列。

​ 为了定义Tabe Function需要继承包org.apache.flink.table.functions下的TableFunction基类然后实现(一个或多个)求值函数。Table Function的行为是由它的求值函数决定的。一个求值函数必须声明为public且命名为eval。Table Function可以通过实现多个eval函数来实现重载。求值函数的参数类型决定了Table Function所有可用的合法参数。求值函数还可以支持不定长参数,比如eval(String... strs)。返回的表类型是由TableFunction定义的泛型类型决定的。求值函数发出输出row使用protected collect(T)方法。

​ 在Table API中Table Function 主要用于.joinLateral or .leftOuterJoinLateraljoinLateral算子(cross join) 连接外表(算子左边的表)的每一行及Table Function的值(算子右边的函数)产生的所有行。leftOuterJoinLateral算子连接外表的每一行(算子左边的表)及TableFunction的值(算子右边的函数)产生所有行同时如果Table Function返回空表的话则保留外部表的行。在SQL中使用LATERAL TABLE(<TableFunction>)需要配合CROSS JOINLEFT JOINON TRUE的join条件组合。

​ 下面的例子展示了TableFunction如何定义、在TableEnvironment中注册及在查询中调用。注意你可以在注册TableFunction之前通过它的构造函数来进行配置。

// The generic type "Tuple2<String, Integer>" determines the schema of the returned table as (String, Integer).
public class Split extends TableFunction<Tuple2<String, Integer>> {
    private String separator = " ";
    
    public Split(String separator) {
        this.separator = separator;
    }
    
    public void eval(String str) {
        for (String s : str.split(separator)) {
            // use collect(...) to emit a row
            collect(new Tuple2<String, Integer>(s, s.length()));
        }
    }
}

BatchTableEnvironment tableEnv = BatchTableEnvironment.create(env);
Table myTable = ...         // table schema: [a: String]

// Register the function.
tableEnv.registerFunction("split", new Split("#"));

// Use the table function in the Java Table API. "as" specifies the field names of the table.
myTable.joinLateral("split(a) as (word, length)")
    .select("a, word, length");
myTable.leftOuterJoinLateral("split(a) as (word, length)")
    .select("a, word, length");

// Use the table function in SQL with LATERAL and TABLE keywords.
// CROSS JOIN a table function (equivalent to "join" in Table API).
tableEnv.sqlQuery("SELECT a, word, length FROM MyTable, LATERAL TABLE(split(a)) as T(word, length)");
// LEFT JOIN a table function (equivalent to "leftOuterJoin" in Table API).
tableEnv.sqlQuery("SELECT a, word, length FROM MyTable LEFT JOIN LATERAL TABLE(split(a)) as T(word, length) ON TRUE");

​ 请注意POJO类型无法决定字段的顺序。所以你不能通过对Table Function使用AS来实现对POJO字段的重命名。

​ Table Function的结果类型默认是由Flink的类型提取工具确定的。这对于基本类型及简单的POJO有效,但是对于更复杂如自定义或者复合类型可能会出错。在这种情况,结果的类型能通过手动重写TableFunction#getResultType()返回指定的TypeInformation来确定类型信息。

​ 下面的例子展示了TableFunction需要返回Row类型所以需要显示指定类型信息的例子。我们通过重写TableFunction#getResultType()来定义返回的表类型为RowTypeInfo(String, Integer)

public class CustomTypeSplit extends TableFunction<Row> {
    public void eval(String str) {
        for (String s : str.split(" ")) {
            Row row = new Row(2);
            row.setField(0, s);
            row.setField(1, s.length());
            collect(row);
        }
    }

    @Override
    public TypeInformation<Row> getResultType() {
        return Types.ROW(Types.STRING(), Types.INT());
    }
}

Aggregation Functions

​ User-Defined Aggregate Functions(UDAF)是输入一张表(一或多行同时每行有一或多列),输出一个标量值,它是将一张表聚合成一个值类型与SQL中Group操作类似。

image.png

​ 上面的图展示了聚合的例子。假设你有一张包含各种饮料的数据的表。这个表格里包含3列属性(id、名称和价格)且一共有5行。现在你需要找到表格里所有饮料中最高价格。比如执行了聚合函数max()。你需要检查这5行最后输出一个数字值作为结果。

​ 用户自定义聚合函数式通过继承AggregationFunction类来实现的。一个AggregationFunction的工作内容如下。首先它需要一个累加器(accumulator),累加器是一个能保留聚合的中间结果的数据结构。一个空的累加器通过调用AggregationFunctioncreateAccmulator()方法来创建的。随后对于每个输入行都调用函数的accumulate()方法来更新累加器。一旦所有的行都已经处理完成,函数的getValue()方法会被调用用来计算并返回最终结果。

对于AggregationFunction下面的方法是强制要求设置的:

  • createAccumulator()
  • accumulate()
  • getValue()

​ Flink的类型提取器对于识别复杂数据类型会失败,比如如果他们不是基本类型或者简单的POJO。因此和ScalarFunctionTableFunction类似,AggregateFunction提供方法用于指定结果类型的TypeInformation(通过AggregateFunction#getResultType())及累加器的类型(通过`AggregateFunction#getAccumulatorType())。

​ 除了上面的方法,这还有一些简化的方法可以有选择的来实现。当中有一些方法允许系统更高效的执行查询,其他的在某些情况下是需要必须实现的。比如在聚合函数需要在session group window(当以行察觉到需要”连接“它们时,两个session窗口的累加器需要关联)中使用context时需要强制实现merge()方法。

下面的AggregationFunction的方法需要取决于使用情况来强制实现:

  • retract() 是在聚合进行有界数据聚合时需要的
  • merge()是在许多批聚合及session窗口聚合时需要的
  • resetAccumulator()是在许多批聚合是需要的

AggregateFunction所有的方法必须声明为public,not static且命名必须和上面提到的保持一致。方法createAccumulatorgetValuegetResultTypegetAccumulatorType都是在抽象类AggregateFunction中定义的,其他的方法都是需要手动定义的。为了定义一个聚合函数,需啊集成基类org.apache.flink.table.functions.AggregateFunction并且实现一或多个accumulate方法。这个accmulate方法能够通过不同类型的参数及支持变长参数来实现重载。

AggregationFunction的所有方法都在下方文档。

/**
  * Base class for user-defined aggregates and table aggregates.
  *
  * @param <T>   the type of the aggregation result.
  * @param <ACC> the type of the aggregation accumulator. The accumulator is used to keep the
  *             aggregated values which are needed to compute an aggregation result.
  */
public abstract class UserDefinedAggregateFunction<T, ACC> extends UserDefinedFunction {

  /**
    * Creates and init the Accumulator for this (table)aggregate function.
    *
    * @return the accumulator with the initial value
    */
  public ACC createAccumulator(); // MANDATORY

  /**
    * Returns the TypeInformation of the (table)aggregate function's result.
    *
    * @return The TypeInformation of the (table)aggregate function's result or null if the result
    *         type should be automatically inferred.
    */
  public TypeInformation<T> getResultType = null; // PRE-DEFINED

  /**
    * Returns the TypeInformation of the (table)aggregate function's accumulator.
    *
    * @return The TypeInformation of the (table)aggregate function's accumulator or null if the
    *         accumulator type should be automatically inferred.
    */
  public TypeInformation<ACC> getAccumulatorType = null; // PRE-DEFINED
}

/**
  * Base class for aggregation functions. 
  *
  * @param <T>   the type of the aggregation result
  * @param <ACC> the type of the aggregation accumulator. The accumulator is used to keep the
  *             aggregated values which are needed to compute an aggregation result.
  *             AggregateFunction represents its state using accumulator, thereby the state of the
  *             AggregateFunction must be put into the accumulator.
  */
public abstract class AggregateFunction<T, ACC> extends UserDefinedAggregateFunction<T, ACC> {

  /** Processes the input values and update the provided accumulator instance. The method
    * accumulate can be overloaded with different custom types and arguments. An AggregateFunction
    * requires at least one accumulate() method.
    *
    * @param accumulator           the accumulator which contains the current aggregated results
    * @param [user defined inputs] the input value (usually obtained from a new arrived data).
    */
  public void accumulate(ACC accumulator, [user defined inputs]); // MANDATORY

  /**
    * Retracts the input values from the accumulator instance. The current design assumes the
    * inputs are the values that have been previously accumulated. The method retract can be
    * overloaded with different custom types and arguments. This function must be implemented for
    * datastream bounded over aggregate.
    *
    * @param accumulator           the accumulator which contains the current aggregated results
    * @param [user defined inputs] the input value (usually obtained from a new arrived data).
    */
  public void retract(ACC accumulator, [user defined inputs]); // OPTIONAL

  /**
    * Merges a group of accumulator instances into one accumulator instance. This function must be
    * implemented for datastream session window grouping aggregate and dataset grouping aggregate.
    *
    * @param accumulator  the accumulator which will keep the merged aggregate results. It should
    *                     be noted that the accumulator may contain the previous aggregated
    *                     results. Therefore user should not replace or clean this instance in the
    *                     custom merge method.
    * @param its          an {@link java.lang.Iterable} pointed to a group of accumulators that will be
    *                     merged.
    */
  public void merge(ACC accumulator, java.lang.Iterable<ACC> its); // OPTIONAL

  /**
    * Called every time when an aggregation result should be materialized.
    * The returned value could be either an early and incomplete result
    * (periodically emitted as data arrive) or the final result of the
    * aggregation.
    *
    * @param accumulator the accumulator which contains the current
    *                    aggregated results
    * @return the aggregation result
    */
  public T getValue(ACC accumulator); // MANDATORY

  /**
    * Resets the accumulator for this [[AggregateFunction]]. This function must be implemented for
    * dataset grouping aggregate.
    *
    * @param accumulator  the accumulator which needs to be reset
    */
  public void resetAccumulator(ACC accumulator); // OPTIONAL

  /**
    * Returns true if this AggregateFunction can only be applied in an OVER window.
    *
    * @return true if the AggregateFunction requires an OVER window, false otherwise.
    */
  public Boolean requiresOver = false; // PRE-DEFINED
}

​ 下面的例子展示了定义一个AggregateFunction计算给定列的加权平均值、在TableEnvironment中注册及在查询中使用这个函数。

​ 为了计算加权平均值,累加器需要存权值的总和以及所有的数据出现次数的累加值。在我们的例子里我们定义了一个类WeightedAvgAccum用做累加器。累加器能够自动使用Flink的checkpoint机制并且当出错时能够自动回复从而保证精准消费一次的语义。

​ 我们的聚合函数WeightedAvgaccumulate()方法有三个输入。第一个是WeightedAvgAccum累加器,其他两个是用户自定义的输入:输入值ivalue和权重输入iweight。尽管retract()merge()resetAccmulator()方法在大多数聚合类型中不需要强制设置,但我们还是提供了一个例子。请注意我们使用Java的基本类型同时在Scala例子中定义了getResultType()getAccumulatorType()方法因为Flink类型提取对于Scala类型执行的并不是很好。

/**
 * Accumulator for WeightedAvg.
 */
public static class WeightedAvgAccum {
    public long sum = 0;
    public int count = 0;
}

/**
 * Weighted Average user-defined aggregate function.
 */
public static class WeightedAvg extends AggregateFunction<Long, WeightedAvgAccum> {

    @Override
    public WeightedAvgAccum createAccumulator() {
        return new WeightedAvgAccum();
    }

    @Override
    public Long getValue(WeightedAvgAccum acc) {
        if (acc.count == 0) {
            return null;
        } else {
            return acc.sum / acc.count;
        }
    }

    public void accumulate(WeightedAvgAccum acc, long iValue, int iWeight) {
        acc.sum += iValue * iWeight;
        acc.count += iWeight;
    }

    public void retract(WeightedAvgAccum acc, long iValue, int iWeight) {
        acc.sum -= iValue * iWeight;
        acc.count -= iWeight;
    }
    
    public void merge(WeightedAvgAccum acc, Iterable<WeightedAvgAccum> it) {
        Iterator<WeightedAvgAccum> iter = it.iterator();
        while (iter.hasNext()) {
            WeightedAvgAccum a = iter.next();
            acc.count += a.count;
            acc.sum += a.sum;
        }
    }
    
    public void resetAccumulator(WeightedAvgAccum acc) {
        acc.count = 0;
        acc.sum = 0L;
    }
}

// register function
StreamTableEnvironment tEnv = ...
tEnv.registerFunction("wAvg", new WeightedAvg());

// use function
tEnv.sqlQuery("SELECT user, wAvg(points, level) AS avgPoints FROM userScores GROUP BY user");

Table Aggregation Functions

​ User-Defined Table Aggregate Functions(UDTAGGs) 是输入一张表(一或多行且每行有一或多个属性),输出一个张结果表。

image.png

​ 上面的图展示了聚UDAG的例子。假设你有一张包含饮料数据的表格。这个表包含id、名称、价格3列总共有5行。现在你要在表格中所有饮料找到最高的2个价格。比如实现一个top2()表聚合的功能。你需要依次检查5行最后输出结果是包含最高2个值的一张表。

​ UDAGF是通过实现继承TableAggregateFunction类来实现的。一个TableAggregateFunction实现需要以下步骤。首先,需要实现一个数据结构accumulator,它是用于保留聚合的中间结果。一个空的累加器是由调用TableAggregateFunction的方法createAccumulator()方法来创建的。随后每一行输入进来都会调用accumulate()方法来更新累加器。一旦所有的行都处理完成,emitValue()方法会被调用来计算和返回最终结果。

**下面的方法是每一个TableAggregateFunction都需要实现的:

  • createAccumulator()
  • accumulate()

​ Flink的类型提取器对于复杂数据类型识别会失败,比如对于非基本类型或非简单的POJO。类似ScalarFunctionTableFunctionTableAggregateFunction提供了方法用来识别结果的TypeInformation(通过TableAggregateFunction#getResultType())及累加器的类型(通过TableAggregateFunction#getAccumulatorType()`)。

​ 除了以上方法,还有一些未在父类定义的方法可以选择性的实现。有些方法可以允许系统查询更高效,还有一些在某些情况下要求强制实现。比如,merge()方法在聚合函数需要使用session group window的上下文时必须要实现(两个session window的累加器当行发现需要做连接时进行关联操作)。

下面就是TableAggregateFunction需要强制实现方法的情况:

  • retract()在有界窗口上做聚合需要实现
  • merge()在需要批聚合及session window聚合是需要实现
  • resetAccumulator()在许多批聚合需要实现
  • emitValue()在批和窗口聚合需要实现

下面是TableAggregateFunction需要改善流式任务的性能的方法:

  • emitUpdateWithRetract()用于在回撤模式下更新已经发送的数据

​ 对于emitValue方法,它会将累加器中所有的数据都发送出来。对于TopN的例子来说,emitValue每次都会将top n的数据都发送出来。这对于流式任务来说会产生应能问题。为了改善性能,用户可以通过实现emitUpdateWithRetract方法来改善性能。这个方法在回车模式下是增量输出数据的,比如一旦发生了更新,我们会在发送新更新后的数据前先回车旧的记录。如果这两个方法都定义在table aggregate function中会优先使用emitUpdateWithRetract方法,因为该方法是增量输出结果较emitValue更加高效。

​ 所有的TableAggregateFunction必须要声明成public,非static而且命名需要和上面提及的保持一致。方法createAccumulatorgetResultTypegetAccumulatorType都是定义在抽象父类TableAggregateFunction中,其他的方法未在父类定义。为了定义table aggregation function需要继承父类org.apache.flink.table.functions.TableAggregateFunction且实现一或多个accumulate方法。accumulate可以使用不同的参数及支持变长参数来实现重载。

​ 详细关于TableAggregateFunction所有的方法在下面列出来。

/**
  * Base class for user-defined aggregates and table aggregates.
  *
  * @param <T>   the type of the aggregation result.
  * @param <ACC> the type of the aggregation accumulator. The accumulator is used to keep the
  *             aggregated values which are needed to compute an aggregation result.
  */
public abstract class UserDefinedAggregateFunction<T, ACC> extends UserDefinedFunction {

  /**
    * Creates and init the Accumulator for this (table)aggregate function.
    *
    * @return the accumulator with the initial value
    */
  public ACC createAccumulator(); // MANDATORY

  /**
    * Returns the TypeInformation of the (table)aggregate function's result.
    *
    * @return The TypeInformation of the (table)aggregate function's result or null if the result
    *         type should be automatically inferred.
    */
  public TypeInformation<T> getResultType = null; // PRE-DEFINED

  /**
    * Returns the TypeInformation of the (table)aggregate function's accumulator.
    *
    * @return The TypeInformation of the (table)aggregate function's accumulator or null if the
    *         accumulator type should be automatically inferred.
    */
  public TypeInformation<ACC> getAccumulatorType = null; // PRE-DEFINED
}

/**
  * Base class for table aggregation functions. 
  *
  * @param <T>   the type of the aggregation result
  * @param <ACC> the type of the aggregation accumulator. The accumulator is used to keep the
  *             aggregated values which are needed to compute a table aggregation result.
  *             TableAggregateFunction represents its state using accumulator, thereby the state of
  *             the TableAggregateFunction must be put into the accumulator.
  */
public abstract class TableAggregateFunction<T, ACC> extends UserDefinedAggregateFunction<T, ACC> {

  /** Processes the input values and update the provided accumulator instance. The method
    * accumulate can be overloaded with different custom types and arguments. A TableAggregateFunction
    * requires at least one accumulate() method.
    *
    * @param accumulator           the accumulator which contains the current aggregated results
    * @param [user defined inputs] the input value (usually obtained from a new arrived data).
    */
  public void accumulate(ACC accumulator, [user defined inputs]); // MANDATORY

  /**
    * Retracts the input values from the accumulator instance. The current design assumes the
    * inputs are the values that have been previously accumulated. The method retract can be
    * overloaded with different custom types and arguments. This function must be implemented for
    * datastream bounded over aggregate.
    *
    * @param accumulator           the accumulator which contains the current aggregated results
    * @param [user defined inputs] the input value (usually obtained from a new arrived data).
    */
  public void retract(ACC accumulator, [user defined inputs]); // OPTIONAL

  /**
    * Merges a group of accumulator instances into one accumulator instance. This function must be
    * implemented for datastream session window grouping aggregate and dataset grouping aggregate.
    *
    * @param accumulator  the accumulator which will keep the merged aggregate results. It should
    *                     be noted that the accumulator may contain the previous aggregated
    *                     results. Therefore user should not replace or clean this instance in the
    *                     custom merge method.
    * @param its          an {@link java.lang.Iterable} pointed to a group of accumulators that will be
    *                     merged.
    */
  public void merge(ACC accumulator, java.lang.Iterable<ACC> its); // OPTIONAL

  /**
    * Called every time when an aggregation result should be materialized. The returned value
    * could be either an early and incomplete result  (periodically emitted as data arrive) or
    * the final result of the  aggregation.
    *
    * @param accumulator the accumulator which contains the current
    *                    aggregated results
    * @param out         the collector used to output data
    */
  public void emitValue(ACC accumulator, Collector<T> out); // OPTIONAL
  
  /**
    * Called every time when an aggregation result should be materialized. The returned value
    * could be either an early and incomplete result (periodically emitted as data arrive) or
    * the final result of the aggregation.
    *
    * Different from emitValue, emitUpdateWithRetract is used to emit values that have been updated.
    * This method outputs data incrementally in retract mode, i.e., once there is an update, we
    * have to retract old records before sending new updated ones. The emitUpdateWithRetract
    * method will be used in preference to the emitValue method if both methods are defined in the
    * table aggregate function, because the method is treated to be more efficient than emitValue
    * as it can outputvalues incrementally.
    *
    * @param accumulator the accumulator which contains the current
    *                    aggregated results
    * @param out         the retractable collector used to output data. Use collect method
    *                    to output(add) records and use retract method to retract(delete)
    *                    records.
    */
  public void emitUpdateWithRetract(ACC accumulator, RetractableCollector<T> out); // OPTIONAL
  
  /**
    * Collects a record and forwards it. The collector can output retract messages with the retract
    * method. Note: only use it in {@code emitRetractValueIncrementally}.
    */
  public interface RetractableCollector<T> extends Collector<T> {

      /**
        * Retract a record.
        *
        * @param record The record to retract.
        */
      void retract(T record);
  }
}

下面的例子的展示了如何定义一个通过给定的列计算出top 2值的TableAggregateFuncgtion、在TableEnvironment中注册及在Table API中使用该函数查询(Flink 1.9中TableAggregateFunction只支持在在Table API中使用)。

​ 为了计算top 2值,累加器需要存储已经累加后的最大的两个值。在我们的例子中我们定义Top2Accum类作为我们累加器。累加器自动支持使用Flink的checkpoint机制在程序出错时能存储数据保证精准一次的语义。

​ 我们的Top2 UTAF的accumulate()方法有两个输入。第一个是Top2Accum累加器,另一个是用户自定义数据:输入值v。尽管merge()方法对于大多数聚合类型不是强制要求的,我们在下面还是提供了样例。请注意我们使用Java基本类型在Scala样例中定义getResultType()getAccumulatorType()方法因为Flink类型提取对于Scala类型计算的不是很好。

/**
 * Accumulator for Top2.
 */
public class Top2Accum {
    public Integer first;
    public Integer second;
}

/**
 * The top2 user-defined table aggregate function.
 */
public static class Top2 extends TableAggregateFunction<Tuple2<Integer, Integer>, Top2Accum> {

    @Override
    public Top2Accum createAccumulator() {
        Top2Accum acc = new Top2Accum();
        acc.first = Integer.MIN_VALUE;
        acc.second = Integer.MIN_VALUE;
        return acc;
    }


    public void accumulate(Top2Accum acc, Integer v) {
        if (v > acc.first) {
            acc.second = acc.first;
            acc.first = v;
        } else if (v > acc.second) {
            acc.second = v;
        }
    }

    public void merge(Top2Accum acc, java.lang.Iterable<Top2Accum> iterable) {
        for (Top2Accum otherAcc : iterable) {
            accumulate(acc, otherAcc.first);
            accumulate(acc, otherAcc.second);
        }
    }

    public void emitValue(Top2Accum acc, Collector<Tuple2<Integer, Integer>> out) {
        // emit the value and rank
        if (acc.first != Integer.MIN_VALUE) {
            out.collect(Tuple2.of(acc.first, 1));
        }
        if (acc.second != Integer.MIN_VALUE) {
            out.collect(Tuple2.of(acc.second, 2));
        }
    }
}

// register function
StreamTableEnvironment tEnv = ...
tEnv.registerFunction("top2", new Top2());

// init table
Table tab = ...;

// use function
tab.groupBy("key")
    .flatAggregate("top2(a) as (v, rank)")
    .select("key, v, rank");

​ 下面的例子展示了如何使用emitUpdateWithRetract方法来发送只有更新的数据。为了发送更新,在我们的例子中累加器保留了旧的和新的top 2值。注意:如果topN的N是很大的话,对于保留旧的和新的值不是特别高效的。一种做法是在accumulate方法中存储将输入数据的值存储到累加器中然后在emitUpdateWithRetract方法中执行计算。

/**
 * Accumulator for Top2.
 */
public class Top2Accum {
    public Integer first;
    public Integer second;
    public Integer oldFirst;
    public Integer oldSecond;
}

/**
 * The top2 user-defined table aggregate function.
 */
public static class Top2 extends TableAggregateFunction<Tuple2<Integer, Integer>, Top2Accum> {

    @Override
    public Top2Accum createAccumulator() {
        Top2Accum acc = new Top2Accum();
        acc.first = Integer.MIN_VALUE;
        acc.second = Integer.MIN_VALUE;
        acc.oldFirst = Integer.MIN_VALUE;
        acc.oldSecond = Integer.MIN_VALUE;
        return acc;
    }

    public void accumulate(Top2Accum acc, Integer v) {
        if (v > acc.first) {
            acc.second = acc.first;
            acc.first = v;
        } else if (v > acc.second) {
            acc.second = v;
        }
    }

    public void emitUpdateWithRetract(Top2Accum acc, RetractableCollector<Tuple2<Integer, Integer>> out) {
        if (!acc.first.equals(acc.oldFirst)) {
            // if there is an update, retract old value then emit new value.
            if (acc.oldFirst != Integer.MIN_VALUE) {
                out.retract(Tuple2.of(acc.oldFirst, 1));
            }
            out.collect(Tuple2.of(acc.first, 1));
            acc.oldFirst = acc.first;
        }

        if (!acc.second.equals(acc.oldSecond)) {
            // if there is an update, retract old value then emit new value.
            if (acc.oldSecond != Integer.MIN_VALUE) {
                out.retract(Tuple2.of(acc.oldSecond, 2));
            }
            out.collect(Tuple2.of(acc.second, 2));
            acc.oldSecond = acc.second;
        }
    }
}

// register function
StreamTableEnvironment tEnv = ...
tEnv.registerFunction("top2", new Top2());

// init table
Table tab = ...;

// use function
tab.groupBy("key")
    .flatAggregate("top2(a) as (v, rank)")
    .select("key, v, rank");

实现UDF的最佳实践

​ Table API和SQL内部代码生成尝试尽可能多的使用基本类型。一个UDF如果使用过多的对象创建、转换、封箱(拆箱)会引入很多开销。因此强烈建议声明参数和结果类型时使用基本类型而不是它们包装类。Types.DATETypes.TIME能被表示成int。Types.TIMESTAMP能被表示成long

将UDF和Runtime进行整合

​ 有时对于UDF有必要获取全局runtime参数信息来在实际运行前做一些准备(setup)和(清理)工作。UDF提供重写openclose()方法来实现与DataSet或DataStream API中的RichFunction类似的功能。

​ 在求值函数执行前open方法会被调用一次。close()方法会在最后一次求值函数调用后执行。

open()方法提供了FunctionContext用于记录UDF执行中的上下文信息,比如指标组、分布式缓存文件或者全局作业参数。

​ 下面的信息能够通过调用FunctionContext的方法来获得:

Method Description
getMetricGroup() 并行子任务的指标组
getCachedFile(name) 分布式缓存文件的本地临时拷贝文件
getJobParameter(name, defaultValue) 根据指定键获取全局作业参数

下面的代码例子展示了如何在标量函数中使用FunctionContext获取全局作业参数:

public class HashCode extends ScalarFunction {

    private int factor = 0;

    @Override
    public void open(FunctionContext context) throws Exception {
        // access "hashcode_factor" parameter
        // "12" would be the default value if parameter does not exist
        factor = Integer.valueOf(context.getJobParameter("hashcode_factor", "12")); 
    }

    public int eval(String s) {
        return s.hashCode() * factor;
    }
}

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
BatchTableEnvironment tableEnv = BatchTableEnvironment.create(env);

// set job parameter
Configuration conf = new Configuration();
conf.setString("hashcode_factor", "31");
env.getConfig().setGlobalJobParameters(conf);

// register the function
tableEnv.registerFunction("hashCode", new HashCode());

// use the function in Java Table API
myTable.select("string, string.hashCode(), hashCode(string)");

// use the function in SQL
tableEnv.sqlQuery("SELECT string, HASHCODE(string) FROM MyTable");
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 218,204评论 6 506
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 93,091评论 3 395
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 164,548评论 0 354
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,657评论 1 293
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,689评论 6 392
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,554评论 1 305
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,302评论 3 418
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 39,216评论 0 276
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,661评论 1 314
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,851评论 3 336
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,977评论 1 348
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,697评论 5 347
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,306评论 3 330
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,898评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 33,019评论 1 270
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 48,138评论 3 370
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,927评论 2 355