如何使用 Apache IoTDB UDF

本文将概述用户使用 UDF 的大致流程,UDF 的详细使用说明请参考官网用户手册:https://iotdb.apache.org/zh/UserGuide/Master/Operators-Functions/User-Defined-Function.html

编写 UDF

IoTDB 为用户提供编写 UDF 的 JAVA API,用户可以自主实现 UDTF 类,IoTDB 将通过类加载机制装载用户编写的类。

Maven 依赖

如果您使用 Maven,可以从 Maven 库中搜索下面示例中的依赖。请注意选择和目标 IoTDB 服务器版本相同的依赖版本,本文中使用 1.0.0 版本的依赖。


<dependency>

  <groupId>org.apache.iotdb</groupId>

  <artifactId>udf-api</artifactId>

  <version>1.0.0</version>

  <scope>provided</scope>

</dependency>

实现接口

UDTF 目前有如下接口

| 接口定义 | 描述 | 是否必须 |

| ------------------------------------------------------------ | ------------------------------------------------------------ | ------------------ |

| void validate(UDFParameterValidator validator) throws Exception | 在初始化方法beforeStart调用前执行,用于检测UDFParameters中用户输入的参数是否合法。 | 否 |

| void beforeStart(UDFParameters parameters, UDTFConfigurations configurations) throws Exception | 初始化方法,在 UDTF 处理输入数据前,调用用户自定义的初始化行为。用户每执行一次 UDTF 查询,框架就会构造一个新的 UDF 类实例,该方法在每个 UDF 类实例被初始化时调用一次。在每一个 UDF 类实例的生命周期内,该方法只会被调用一次。 | |

| void transform(Row row, PointCollector collector) throws Exception | 这个方法由框架调用。当您在beforeStart中选择以RowByRowAccessStrategy的策略消费原始数据时,这个数据处理方法就会被调用。输入参数以Row的形式传入,输出结果通过PointCollector输出。您需要在该方法内自行调用collector提供的数据收集方法,以决定最终的输出数据。 | 与下面的方法二选一 |

| void transform(RowWindow rowWindow, PointCollector collector) throws Exception | 这个方法由框架调用。当您在beforeStart中选择以XXXWindowAccessStrategySliding的策略消费原始数据时,这个数据处理方法就会被调用。输入参数以RowWindow的形式传入,输出结果通过PointCollector输出。您需要在该方法内自行调用collector提供的数据收集方法,以决定最终的输出数据。 | 与上面的方法二选一 |

| void terminate(PointCollector collector) throws Exception | 这个方法由框架调用。该方法会在所有的transform调用执行完成后,在beforeDestory方法执行前被调用。在一个 UDF 查询过程中,该方法会且只会调用一次。您需要在该方法内自行调用collector提供的数据收集方法,以决定最终的输出数据。 | 否 |

| void beforeDestroy() | UDTF 的结束方法。此方法由框架调用,并且只会被调用一次,即在处理完最后一条记录之后被调用。 | 否 |

在一个完整的 UDTF 实例生命周期中,各个方法的调用顺序如下:

  1. void validate(UDFParameterValidator validator) throws Exception

  2. void beforeStart(UDFParameters parameters, UDTFConfigurations configurations) throws Exception

  3. void transform(Row row, PointCollector collector) throws Exception或者void transform(RowWindow rowWindow, PointCollector collector) throws Exception

  4. void terminate(PointCollector collector) throws Exception

  5. void beforeDestroy()

注意,框架每执行一次 UDTF 查询,都会构造一个全新的 UDF 类实例,查询结束时,对应的 UDF 类实例即被销毁,因此不同 UDTF 查询(即使是在同一个 SQL 语句中)UDF 类实例内部的数据都是隔离的。您可以放心地在 UDTF 中维护一些状态数据,无需考虑并发对 UDF 类实例内部状态数据的影响。

UDTF 有较为完善的 Javadoc 说明,在编码实现 UDTF 类前,阅读说明可以帮助您更好的使用 UDTF。

注意事项

下面列出在实现 UDTF 的一些接口时需要注意的地方。

validate 接口


/**

* This method is mainly used to validate {@link UDFParameters} and it is executed before {@link

* UDTF#beforeStart(UDFParameters, UDTFConfigurations)} is called.

*

* @param validator the validator used to validate {@link UDFParameters}

* @throws Exception if any parameter is not valid

*/

@SuppressWarnings("squid:S112")

default void validate(UDFParameterValidator validator) throws Exception {}

该接口在初始化方法beforeStart调用前执行,用于检测UDFParameters中用户输入的参数是否合法。

同时,可以检测作为输入的时间序列的数据类型是否符合预期,比如以如下方式实现该接口:


@Override

public void validate(UDFParameterValidator validator) throws Exception {

  validator

      // this udf only accepts 1 time series

      .validateInputSeriesNumber(1)

      // the data type of the first input time series should be INT32

      .validateInputSeriesDataType(0, Type.INT32)

      // this udf doesn't accept any extra parameters

      // the validation rule is not required because extra parameters will be ignored

      .validate(

          attributes -> ((Map) attributes).isEmpty(),

          "extra udf parameters are not allowed",

          validator.getParameters().getAttributes());

}

那么该 UDF 将只能接受 INT32 类型的时间序列作为输入,其它类型的序列作为输入将报错:

img

INT32 类型的序列正常执行查询:

img

beforeStart 接口


void beforeStart(UDFParameters parameters, UDTFConfigurations configurations) throws Exception

该接口是 UDTF 类必须实现的接口,可以指定 UDF 访问原始数据时采取的策略和输出结果序列的类型,最简单的实现方式如下:


@Override

public void beforeStart(UDFParameters parameters, UDTFConfigurations configurations) {

  configurations.setAccessStrategy(new RowByRowAccessStrategy()).setOutputDataType(Type.INT32);

}

上述实现表明该 UDF 将逐行访问数据并调用相应的 transform 方法,同时,该 UDF 的 transform 方法的 PointCollector 将只能接收 INT32 类型的数据,如果接收其它类型的数据可能会出现运行时错误,具体可以参考 UDTF Javadoc。可以参考如下的简单实现方式:


@Override

public void transform(Row row, PointCollector collector) throws IOException {

  if (!row.isNull(0)) {

    // 由于 beforeStart 方法中设置了 outPutDataType 为 INT32

    // 这里如果调用 collector.putFloat(row.getTime(), -row.getInt(0)) 可能会出现运行时错误

    collector.putInt(row.getTime(), -row.getInt(0));

  }

}

这样实现之后,查询效果可以理解成将每一行数据取负:

img

transform 接口

transform 接口有两种参数列表,具体调用哪一种由实现的 UDF beforeStart接口中设置的 AccessStrategy 类型决定。如果 AccessStrategy类型为 XXXWindowStrategy,则会调用下述 transfrom:


/**

* When the user specifies {@link SlidingSizeWindowAccessStrategy} or {@link

* SlidingTimeWindowAccessStrategy} to access the original data in {@link UDTFConfigurations},

* this method will be called to process the transformation. In a single UDF query, this method

* may be called multiple times.

*

* @param rowWindow original input data window (rows inside the window are aligned by time)

* @param collector used to collect output data points

* @throws Exception the user can throw errors if necessary

* @see SlidingSizeWindowAccessStrategy

* @see SlidingTimeWindowAccessStrategy

*/

@SuppressWarnings("squid:S112")

default void transform(RowWindow rowWindow, PointCollector collector) throws Exception {}

由于入参 RowWindow 时按照原始数据划分的窗口,在访问窗口数据时有几点需要注意:

  • 窗口可能为空,此时访问窗口内具体某一行可能报越界异常,所以建议在访问具体数据前检查 if(rowWindow.windowSize() > 0)

  • 目前 PointCollector 只支持将特定时间戳放入一次,如果同一时间戳被多次放入,则可能非预期地终止计算。 而在进行窗口的运算的时候,需要选取窗口中某一行的时间戳作为这个窗口结果的时间戳,在特定数据场景下,这种使用可能会由于窗口重叠造成同一时间戳被多次放入 PointCollector 而导致非预期的结果。如果要避免这种情况,可以选择使用 RowWindow 提供的 windowStartTime() 或者 windowEndTime() 作为窗口结果的时间戳。

可以参考下述实现:


public void transform(RowWindow rowWindow, PointCollector collector) throws Exception {

  long result = 0L;

  for (int i = 0; i < rowWindow.windowSize(); ++i) {

    if (!rowWindow.getRow(i).isNull(0)) {

      result += rowWindow.getRow(i).getLong(0);

    }

  }

  // 这里使用 rowWindow.windowStartTime() 而非 rowWindow.getRow(0).getTime()

  collector.putLong(rowWindow.windowStartTime(), result);

}

注册 UDF

注册一个 UDF 可以按如下流程进行:

  1. 实现一个完整的 UDF 类,假定这个类的全类名为org.apache.iotdb.udf.UDTFExample

  2. 将项目打成 JAR 包

  3. 进行注册前的准备工作,根据注册方式的不同需要做不同的准备,具体可参考示例

  4. 使用以下 SQL 语法注册 UDF


CREATE FUNCTION <UDF-NAME> AS <UDF-CLASS-FULL-PATHNAME> (USING URI URI-STRING)?

完成注册后即可以像使用内置函数一样使用注册的 UDF 了。

注册方式示例

注册名为example的 UDF,以下两种注册方式任选其一即可

不指定 URI

准备工作: 使用该种方式注册时,需要提前将 JAR 包放置到目录 iotdb-server-1.0.0-all-bin/ext/udf(该目录可配置) 下。 注意,如果使用的是集群,那么需要将 JAR 包放置到所有 DataNode 的该目录下

放置完成后使用注册语句:


CREATE FUNCTION example AS 'org.apache.iotdb.udf.UDTFExample'

指定URI

准备工作: 使用该种方式注册时,您需要提前将 JAR 包上传到服务器上并确保执行注册语句的 IoTDB 实例能够访问该服务器。 指定 URI 后无需手动放置 JAR 包到指定目录,IoTDB 会下载 JAR 包并正确同步到整个集群

注册语句:


CREATE FUNCTION example AS 'org.apache.iotdb.udf.UDTFExample' USING URI 'http://jar/example.jar'

注意事项

  • 1.0.0 版本的 UDF API 包路径与之前版本的 IoTDB 不同,因此 1.0.0 版本的 IoTDB 实例并不能注册 0.13 及之前版本已经构建好的 UDF jar包。可以通过更新 UDF 依赖版本,重新 import 正确路径的 UDF API,再构建 jar 包的方式更新 UDF 实现至 1.0.0 及以上版本。

  • 由于 IoTDB 的 UDF 是通过反射技术动态装载的,因此在装载过程中无需启停服务器。

  • UDF 函数名称是大小写不敏感的。

  • 请不要给 UDF 函数注册一个内置函数的名字。使用内置函数的名字给 UDF 注册会失败。

  • 不同的 JAR 包中最好不要有全类名相同但实现功能逻辑不一样的类。例如 UDF(UDAF/UDTF):udf1udf2分别对应资源udf1.jarudf2.jar。如果两个 JAR 包里都包含一个org.apache.iotdb.udf.UDTFExample类,当同一个 SQL 中同时使用到这两个 UDF 时,系统会随机加载其中一个类,导致 UDF 执行行为不一致。

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 218,525评论 6 507
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 93,203评论 3 395
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 164,862评论 0 354
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,728评论 1 294
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,743评论 6 392
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,590评论 1 305
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,330评论 3 418
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 39,244评论 0 276
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,693评论 1 314
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,885评论 3 336
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 40,001评论 1 348
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,723评论 5 346
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,343评论 3 330
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,919评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 33,042评论 1 270
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 48,191评论 3 370
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,955评论 2 355

推荐阅读更多精彩内容