Apache IoTDB UDF 查询执行源码阅读

作者目前是清华大学软件学院 IoTDB 组在读学生,参与过 Apache IoTDB UDF 模块的代码维护和功能拓展,本文是作者在阅读 Apache IoTDB UDF 模块代码时的一点总结。


UDF(User Defined Functions) 是数据库查询引擎里较为重要的一个模块,其为数据的高级分析提供了更多可能。

UDF 的使用说明可以参考作者的另一篇文章:


Apache IoTDB 的 UDF 功能实现总体可以分为三大部分:

  • 向用户提供的编程接口,相关代码在包 org.apache.iotdb.udf.api
  • 查询框架相关代码,包括 SQL 解析、逻辑计划生成、物理计划生成等
  • UDF 查询计算执行时相关逻辑

本文主要对 UDF 查询计算执行时的相关逻辑做概要介绍,主要针对特定接口/抽象类做说明,并分析典型实现帮助理解,希望本文可以帮助读者更轻松地 Debug 阅读 UDF 计算流程的源码。

UDF 查询计算重要接口和工具类

在我看来,要理解 IoTDB 中 UDF 计算的流程,最关键的是理解以下几个接口/工具类的作用:

  • IntermediateLayer
  • LayerPointReader / LayerRowReader / LayerRowWindowReader
  • Transformer
  • ElasticSerializableTVList

理解上述接口/抽象类的作用之后再进行 Debug 阅读源码会事半功倍。



UDF 计算流程大致可以分成 InputLayer -> IntermediateLayer -> OutputLayer 三层,IntermediateLayer 封装了计算中间层的逻辑。


  • 查询树节点可能存在公共部分,中间层缓存计算结果可以避免重复计算。
// function 的输入是 a + b,而 a + b 本身也是查询的一列
// 可以直接使用这一列作为输入,没有必要重复计算 a + b
select function(a + b), a + b from root.sg
  • 不同列消费数据的位置和速度可能不一致,使用中间层可以使用同一份数据,但是对外屏蔽这种差异

IntermediateLayer 缓存数据,可以通过其构造的 LayerPointReader / LayerRowReader / LayerRowWindowReader 访问 IntermediateLayer 缓存的数据。不同的 reader 类型对应不同的数据访问策略,即按点,按行,按窗口,窗口也可以通过多种形式进行划分。

这里贴出 IntermediateLayer 抽象类的源码:

 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *     http://www.apache.org/licenses/LICENSE-2.0
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.

package org.apache.iotdb.db.mpp.transformation.dag.intermediate;

import org.apache.iotdb.db.exception.query.QueryProcessException;
import org.apache.iotdb.db.mpp.plan.expression.Expression;
import org.apache.iotdb.db.mpp.transformation.api.LayerPointReader;
import org.apache.iotdb.db.mpp.transformation.api.LayerRowReader;
import org.apache.iotdb.db.mpp.transformation.api.LayerRowWindowReader;
import org.apache.iotdb.udf.api.customizer.strategy.AccessStrategy;
import org.apache.iotdb.udf.api.customizer.strategy.SessionTimeWindowAccessStrategy;
import org.apache.iotdb.udf.api.customizer.strategy.SlidingSizeWindowAccessStrategy;
import org.apache.iotdb.udf.api.customizer.strategy.SlidingTimeWindowAccessStrategy;
import org.apache.iotdb.udf.api.customizer.strategy.StateWindowAccessStrategy;

import java.io.IOException;

public abstract class IntermediateLayer {

  protected static final int CACHE_BLOCK_SIZE = 2;

  // for debug
  protected final Expression expression;

  protected final long queryId;
  protected final float memoryBudgetInMB;

  protected IntermediateLayer(Expression expression, long queryId, float memoryBudgetInMB) {
    this.expression = expression;
    this.queryId = queryId;
    this.memoryBudgetInMB = memoryBudgetInMB;

  public abstract LayerPointReader constructPointReader();

  public abstract LayerRowReader constructRowReader();

  public final LayerRowWindowReader constructRowWindowReader(
      AccessStrategy strategy, float memoryBudgetInMB) throws QueryProcessException, IOException {
    switch (strategy.getAccessStrategyType()) {
        return constructRowSlidingTimeWindowReader(
            (SlidingTimeWindowAccessStrategy) strategy, memoryBudgetInMB);
        return constructRowSlidingSizeWindowReader(
            (SlidingSizeWindowAccessStrategy) strategy, memoryBudgetInMB);
        return constructRowSessionTimeWindowReader(
            (SessionTimeWindowAccessStrategy) strategy, memoryBudgetInMB);
      case STATE_WINDOW:
        return constructRowStateWindowReader(
            (StateWindowAccessStrategy) strategy, memoryBudgetInMB);
        throw new IllegalStateException(
            "Unexpected access strategy: " + strategy.getAccessStrategyType());

  protected abstract LayerRowWindowReader constructRowSlidingSizeWindowReader(
      SlidingSizeWindowAccessStrategy strategy, float memoryBudgetInMB)
      throws QueryProcessException;

  protected abstract LayerRowWindowReader constructRowSlidingTimeWindowReader(
      SlidingTimeWindowAccessStrategy strategy, float memoryBudgetInMB)
      throws QueryProcessException, IOException;

  protected abstract LayerRowWindowReader constructRowSessionTimeWindowReader(
      SessionTimeWindowAccessStrategy strategy, float memoryBudgetInMB)
      throws QueryProcessException, IOException;

  protected abstract LayerRowWindowReader constructRowStateWindowReader(
      StateWindowAccessStrategy strategy, float memoryBudgetInMB)
      throws QueryProcessException, IOException;

  public String toString() {
    return expression.toString();


下面以 IntermediateLayer 的实现类 SingleInputMultiReferenceIntermediateLayer 作为例子来具体说明 IntermediateLayer 的作用。


// 中间层的输入,要获取新的未缓存过的数据点时,从该 reader 里读取数据
private final LayerPointReader parentLayerPointReader;

// 缓存数据的数据结构,内置 LRUCache,通过将数据溢出至磁盘保证内存不超限
private final ElasticSerializableTVList tvList;

// 配合 tvList,维持一个安全水线,index < 安全水线的数据不会再被使用
// 此时可以安全地抛弃这些不会再使用的数据,减小内存占用
private final SafetyLine safetyLine;


实现 IntermediateLayer 的所有 constructXXXReader 的抽象方法,通过 reader 提供访问中间层数据的方式,各 reader 的逻辑脉络相似。

下面以 constructPointReader() 举例说明,其它 reader 可以借助理解

该方法返回一个 LayerPointReader。该 reader 维护了一个 currentPointIndex,实际的数据来自于 SingleInputMultiReferenceIntermediateLayer#tvList,所有通过该 SingleInputMultiReferenceIntermediateLayer 构造出的 LayerPointReader 实际上都是在读取 tvList 里的数据,只是其 currentPointIndex 可能不同,这样就做到了一份数据提供多个游标来满足多个数据访问者的需要。

private final SafetyPile safetyPile = safetyLine.addSafetyPile();

private boolean hasCached = false;
private int currentPointIndex = -1;

构造的 LayerPointReader 的 next()实现逻辑:

// 如果当前点的 index 已经到了缓存的最大 index
// 那么就要尝试通过数据的源头,即 parentLayerPointReader 读取数据
// 否则直接递增 index
public boolean next() throws QueryProcessException, IOException {
  if (!hasCached
      && (currentPointIndex < tvList.size() - 1
          || LayerCacheUtils.cachePoint(
              parentLayerPointReaderDataType, parentLayerPointReader, tvList))) {
    hasCached = true;

  return hasCached;

构造的 LayerPointReader 的 readyForNext()实现逻辑:

public void readyForNext() {
  hasCached = false;

  // 所有构造出的 LayerPointReader 都维护安全水线,即会被用到的数据 index 最小值
  // index 小于该值的数据不会再被使用,可以被安全地放弃
  // SingleInputMultiReference IntermediateLayer 的安全水线就是所有 LayerPointReader
  // 安全水线的最小值
  safetyPile.moveForwardTo(currentPointIndex + 1);
  // evictionUpperBound 需结合 SerializableList 的逻辑来理解

LayerPointReader / LayerRowReader / LayerRowWindowReader

IntermediateLayer 向外提供构造这三种 reader 的方法,这三种 reader 接口里封装了按点,按行,按窗口读取数据的逻辑。

在 Apache IoTDB 1.0 查询引擎演进为 MPP 架构时,为了适配 MPP,有了 YeildableReader 接口,该接口的 yield 方法逻辑语义应当与 LayerPointReader / LayerRowReader / LayerRowWindowReader 原有的 next() 方法一致,只是为了适配 MPP 框架而存在。

由于 yield() 和 next() 两套方法的存在,可能导致读者在阅读这块代码的时候感到困惑,为什么需要两种接口?实际是因为在 1.0 版本的 UDF 计算里 next() 方法大部分时间已经不会再被调用了(还有很小一部分场景在使用),由于历史包袱,还没删掉 next() 方法相关的逻辑。读者只需要理解 yield() 和 next() 其中一套的逻辑,就能理解另一套的逻辑,本文主要通过 next() 进行说明。




package org.apache.iotdb.db.mpp.transformation.api;

import org.apache.iotdb.db.exception.query.QueryProcessException;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.utils.Binary;

import java.io.IOException;

public interface LayerPointReader extends YieldableReader {

  boolean isConstantPointReader();

  boolean next() throws QueryProcessException, IOException;

  void readyForNext();

  TSDataType getDataType();

  long currentTime() throws IOException;

  int currentInt() throws IOException;

  long currentLong() throws IOException;

  float currentFloat() throws IOException;

  double currentDouble() throws IOException;

  boolean currentBoolean() throws IOException;

  boolean isCurrentNull() throws IOException;

  Binary currentBinary() throws IOException;

next() 方法的返回值为布尔类型,实际上这个方法可以看作下述两个方法的结合:

boolean hasNext();
Object next();

即每次调用 next(),都会尝试移动迭代器的游标,如果可以往下移动(还有数据)则返回 true,否则返回 false。

成功调用一次 next() 后需要调用 readyForNext()。

实际返回数据是在调用 next() 且返回 true 之后,可以通过具体的数据类型去访问具体的 currentXXX()。


与 LayerPointReader 接口相似,只是访问数据时按照行访问。

public interface LayerRowReader extends YieldableReader {

  boolean next() throws IOException, QueryProcessException;

  void readyForNext();

  TSDataType[] getDataTypes();

  long currentTime() throws IOException;

  Row currentRow();

  /** whether current row fields are all null */
  boolean isCurrentNull() throws IOException;


与 LayerPointReader 接口相似,只是访问数据时按照窗口访问。

public interface LayerRowWindowReader extends YieldableReader {

  boolean next() throws IOException, QueryProcessException;

  void readyForNext() throws IOException, QueryProcessException;

  TSDataType[] getDataTypes();

  RowWindow currentWindow();


实现 LayerPointReader 的一个抽象类,封装了 UDF 和 表达式的计算逻辑。


  • Transformer 读取 IntermediateLayer 的数据作为输入(通过 IntermediateLayer 的 constructXXXReader 获取相应 reader)
  • Transformer 完成数据的计算,包括一元/二元/三元/UDF 计算
  • 为 Transformer 封装一个 IntermediateLayer,此时 Transformer 又可以成为该 IntermediateLayer 的数据源,这样就可以形成一颗计算树,从下往上逐层计算。

构造 IntermediateLayer 和 Transformer 的逻辑通过访问者模式被封装在 IntermediateLayerVisitor 类中。

Transformer 的源码如下:

public abstract class Transformer implements LayerPointReader {

  protected boolean hasCachedValue;

  protected long cachedTime;

  protected int cachedInt;
  protected long cachedLong;
  protected float cachedFloat;
  protected double cachedDouble;
  protected boolean cachedBoolean;
  protected Binary cachedBinary;
  protected boolean currentNull;

  protected Transformer() {
    hasCachedValue = false;

  public final boolean next() throws QueryProcessException, IOException {
    if (!hasCachedValue) {
      hasCachedValue = cacheValue();
    return hasCachedValue;

  /** if this method returns true, at least one of the cached field should be set */
  protected abstract boolean cacheValue() throws QueryProcessException, IOException;

  public final YieldableState yield() throws IOException, QueryProcessException {
    if (hasCachedValue) {
      return YieldableState.YIELDABLE;

    final YieldableState yieldableState = yieldValue();
    if (YieldableState.YIELDABLE == yieldableState) {
      hasCachedValue = true;
    return yieldableState;

   * if this method returns YieldableState.YIELDABLE, at least one of the cached field should be set
  protected abstract YieldableState yieldValue() throws QueryProcessException, IOException;

  public final void readyForNext() {
    hasCachedValue = false;
    currentNull = false;

  public final long currentTime() {
    return cachedTime;

  public final int currentInt() {
    return cachedInt;

  public final long currentLong() {
    return cachedLong;

  public final float currentFloat() {
    return cachedFloat;

  public final double currentDouble() {
    return cachedDouble;

  public final boolean currentBoolean() {
    return cachedBoolean;

  public final Binary currentBinary() {
    return cachedBinary;

  public final boolean isCurrentNull() {
    return currentNull;

其子类只需要实现 cacheValue() 方法,定义自己的计算逻辑即可。


EasticSerializableTVList 位于 org.apache.iotdb.db.mpp.transformation.datastructure,该包定义了 UDF 计算时用到的数据结构。

该类是一个数据点的容器,可以往里面 put 数据,也可以按照 index 读取数据。

为了避免占用内存超限,内置了 LRUCache 和数据溢出至磁盘的逻辑,实现中可以看到这两种思路,主要借鉴了操作系统分页机制。

EasticSerializableTVList 类图如下:

EasticSerializableTVList 类图
  • SerializableList:接口,可以将元素序列化到文件以及从文件中读回
  • BatchData:提供了 put 和 get 方法
  • LRUCache:基于 LinkedHashMap 实现的 LRUCache


// 可以理解为 EasticSerializableTVList 将数据分成块
// 每一块就是一个 SerializableTVList
protected List<SerializableTVList> tvLists;

// 用于存储 tvLists 的 index,只有 index 在 cache 中的 SerializableTVList 是位于内存中的
protected LRUCache cache;

// 每一个 SerializableTVList 块的容量
protected int internalTVListCapacity;

// cache 的大小,由于 cache 的每个元素都代表一个 SerializableTVList 数据块
// cacheSize 可以理解为存放在内存中的 SerializableTVList 块的数量
protected int cacheSize;

// 与 tvLists 的元素一一对应
protected List<BitMap> bitMaps;

// EasticSerializableTVList 的逻辑容量
protected int size;

// tvLists 中,index < evictionUpperBound / internalTVListCapacity 的元素不会再被使用
protected int evictionUpperBound;



protected ElasticSerializableTVList(
    TSDataType dataType, long queryId, float memoryLimitInMB, int cacheSize) {
  this.dataType = dataType;
  this.queryId = queryId;
  this.memoryLimitInMB = memoryLimitInMB;
  int allocatableCapacity = SerializableTVList.calculateCapacity(dataType, memoryLimitInMB);
  internalTVListCapacity = allocatableCapacity / cacheSize;
  if (internalTVListCapacity == 0) {
    cacheSize = 1;
    internalTVListCapacity = allocatableCapacity;
  this.cacheSize = cacheSize;

  cache = new LRUCache(cacheSize);
  bitMaps = new ArrayList<>();
  tvLists = new ArrayList<>();
  size = 0;
  evictionUpperBound = 0;
  • 构造方法中首先计算 internalTVListCapacity,即每个块的容量是多少
  • 初始化 cache,cache 的容量由 cacheSize 决定,cacheSize 决定了内存中存放多少块

按照 index 读取数据,以 getInt 为例:

public int getInt(int index) throws IOException {
  // index / internalTVListCapacity 计算出数据位于哪个块
  // index % internalTVListCapacity 计算出数据位于块的哪一行
  return cache.get(index / internalTVListCapacity).getIntByIndex(index % internalTVListCapacity);

将数据放入容器,以 putInt 为例:

public void putInt(long timestamp, int value) throws IOException {
  // 首先检查要不要分配新的块
  // 找到 index 对应的数据块,将数据放入即可 
  cache.get(size / internalTVListCapacity).putInt(timestamp, value);

private void checkExpansion() {
  if (size % internalTVListCapacity == 0) {
    tvLists.add(SerializableTVList.newSerializableTVList(dataType, queryId));
    bitMaps.add(new BitMap(internalTVListCapacity));

LRUCache 实现:

private class LRUCache extends Cache {

  LRUCache(int capacity) {
  // 获取 tvLists 中对应 index 的 SerializableList并更新 cache
  BatchData get(int targetIndex) throws IOException {
    if (!containsKey(targetIndex)) {
      // cache 中没有目标 index,且 cache 已满
      // 此时可能需要将元素溢出到磁盘
      if (cacheCapacity <= size()) {
        int lastIndex = getLast();
        // 如果数据不会再被用到则直接设为 null
        if (lastIndex < evictionUpperBound / internalTVListCapacity) {
          tvLists.set(lastIndex, null);
          bitMaps.set(lastIndex, null);
        } else {
          // 将数据溢出到磁盘
      // 目标数据不在 cache 里面,则肯定不在内存里面,需要读回内存
    // LRU 策略,更新 key
    // 将元素移出 cache 的操作由 LinkedHashMap 自动完成
    return tvLists.get(targetIndex);

public abstract class Cache extends LinkedHashMap<Integer, Integer> {

  protected final int cacheCapacity;

  protected Cache(int cacheCapacity) {
    super(cacheCapacity, 0.75F, true);
    this.cacheCapacity = cacheCapacity;

  protected boolean removeEldestEntry(Map.Entry<Integer, Integer> eldest) {
    return size() > cacheCapacity;

  // get the eldest key
  public int getLast() {
    return this.entrySet().iterator().next().getKey();

  protected Integer putKey(Integer index) {
    return put(index, index);


本文对 UDF 查询计算过程的重要接口和工具类做了简要说明。如果读者希望完整了解 UDF 计算流程,还需要阅读接口的各个实现类代码,以及前文提到的:

  • 向用户提供的编程接口,相关代码在包 org.apache.iotdb.udf.api
  • 查询框架相关代码,包括 SQL 解析、逻辑计划生成、物理计划生成等


