浅析Apache Kylin UDF

目录

一、什么是UDF?
二、实现一个UDF
三、源码解读 -- 注册UDF 的原理 && 向Apache Calcite添加UDF的多种方式
四、目前kylin中UDF注册存在的问题
五、UDF对apache calcite优化器的影响

一、什么是UDF?

UDF全称为user defined function(用户自定义函数),是查询引擎留给用户的一个口子,用于扩展sql 的能力,用户把自己实现的UDF打包部署后放在指定目录下,便可在查询时使用此函数。

二、实现一个UDF

下面举一个简单的例子,演示如何在Kylin 中实现一个UDF

1. 实现一个java类

  • 类名随意,例如ConcatUDF
  • 方法名为eval,方法名不能乱取,后面会解释为什么
  • 在eval 方法中实现自己的函数逻辑,例如下面实现的这个函数的逻辑就是将传入的两个字符串进行拼接
import org.apache.calcite.linq4j.function.Parameter;

public class ConcatUDF {

    public static String eval(@Parameter(name = "str1") String col1, @Parameter(name = "str2") String col2) {
        if (col1 == null) {
            return null;
        }
        if (col2 == null) {
            return null;
        }
        return col1 + col2;
    }
}

2. 将实现的java类打成jar包,部署在Kylin 目录的 “lib” 目录下

3. 修改配置文件
在kylin.properties中添加 org.apache.kylin.query.udf.ConcatUDF= 类的路径

三、源码解读 注册UDF 的原理 && 向Apache Calcite添加UDF的多种方式

1. 注册UDF 的原理

kylin中的查询引擎使用的是Apache Calcite,因此往kylin 中添加一个UDF 实际上就是往calcite 框架中注册一个UDF。 Apache Calcite 是目前比较通用的大数据查询引擎框架,想了解Apache Calcite的可以看看这篇文章 Apache Calcite:Hadoop 中新型大数据查询引擎

让我们一起来读一读kylin 的源码,看一看用户实现的UDF 是如何注册进calcite 的。

从kylin的查询引擎入口query(@RequestBody PrepareSqlRequest sqlRequest)开始看

query(@RequestBody PrepareSqlRequest sqlRequest)
  doQueryWithCache(SQLRequest sqlRequest, boolean isQueryInspect)
    queryWithCache(sqlRequest, startTime, isQueryCacheEnabled);
      queryAndUpdateCache(sqlRequest, startTime, isQueryCacheEnabled);
        query(sqlRequest);
          queryWithSqlMassage(sqlRequest);
private SQLResponse queryWithSqlMassage(SQLRequest sqlRequest) throws Exception {
        Connection conn = null;
        // 省略
        conn = QueryConnection.getConnection(sqlRequest.getProject());
        // 省略
       return executeRequest(correctedSql, sqlRequest, conn);
        // 省略
}

在queryWithSqlMassage中,会获取到calcite的jdbc 连接,在获取连接的同时,会将schema 等信息传递给calcite,schema信息之一便是我们要注册的UDF,包括UDF的类路径和方法名。

public static Connection getConnection(String project) throws SQLException {
        if (!isRegister) {
            try {
                Class<?> aClass = Thread.currentThread().getContextClassLoader()
                        .loadClass("org.apache.calcite.jdbc.Driver");
                Driver o = (Driver) aClass.getDeclaredConstructor().newInstance();
                DriverManager.registerDriver(o);
            } catch (ClassNotFoundException | InstantiationException | IllegalAccessException | NoSuchMethodException | InvocationTargetException e) {
                e.printStackTrace();
            }
            isRegister = true;
        }
        File olapTmp = OLAPSchemaFactory.createTempOLAPJson(project, KylinConfig.getInstanceFromEnv());
        Properties info = new Properties();
        info.putAll(KylinConfig.getInstanceFromEnv().getCalciteExtrasProperties());
        // Import calcite props from jdbc client(override the kylin.properties)
        info.putAll(BackdoorToggles.getJdbcDriverClientCalciteProps());
        info.put("model", olapTmp.getAbsolutePath());
        info.put("typeSystem", "org.apache.kylin.query.calcite.KylinRelDataTypeSystem");
        return DriverManager.getConnection("jdbc:calcite:", info);
    }

那UDF类路径是从哪里获取到的呢,没错,就是来自一开始用户配置的kylin.properties。schema里的函数信息除了UDF以外,还有UDAF的类路径和方法名,注册方式和UDF一样,但是目前kylin并未向用户开放注册UDAF的入口,而有关UDF和UDAF的区别这里不作介绍,后面想单独写一篇关于UDAF的文章。

public Map<String, String> getUDFs() {
        Map<String, String> udfMap = Maps.newLinkedHashMap();
        udfMap.put("version", "org.apache.kylin.query.udf.VersionUDF");
        udfMap.put("concat", "org.apache.kylin.query.udf.ConcatUDF");
        udfMap.put("massin", "org.apache.kylin.query.udf.MassInUDF");
        Map<String, String> overrideUdfMap = getPropertiesByPrefix("kylin.query.udf.");
        udfMap.putAll(overrideUdfMap);
        return udfMap;
    }
private static void createOLAPSchemaFunctions(Map<String, String> definedUdfs, StringBuilder out)
            throws IOException {
        Map<String, String> udfs = Maps.newHashMap();
        if (definedUdfs != null)
            udfs.putAll(definedUdfs);

        for (Entry<String, Class<?>> entry : MeasureTypeFactory.getUDAFs().entrySet()) {
            udfs.put(entry.getKey(), entry.getValue().getName());
        }

        int index = 0;
        out.append("            \"functions\": [\n");
        for (Map.Entry<String, String> udf : udfs.entrySet()) {
            String udfName = udf.getKey().trim().toUpperCase(Locale.ROOT);
            String udfClassName = udf.getValue().trim();
            out.append("               {\n");
            out.append("                   name: '" + udfName + "',\n");
            out.append("                   className: '" + udfClassName + "'\n");
            if (index < udfs.size() - 1) {
                out.append("               },\n");
            } else {
                out.append("               }\n");
            }
            index++;
        }
        out.append("            ]\n");
    }

通过kylin拼接好的schema信息如下所示,它会被写入到一个以"olap_model_" 开头的json 文件中,后续由calcite自己读取。

{
    "version": "1.0",
    "defaultSchema": "DEFAULT",
    "schemas": [
        {
            "type": "custom",
            "name": "DEFAULT",
            "factory": "io.kyligence.kap.query.schema.KapSchemaFactory",
            "operand": {
                "project": "bingfeng"
            },
            "functions": 
 [{"name":"MASSIN","className":"org.apache.kylin.query.udf.MassInUDF"},
  {"name":"CONCAT","className":"org.apache.kylin.query.udf.string.ConcatUDF"},
  {"name":"VERSION","className":"org.apache.kylin.query.udf.VersionUDF"},
    ]
}

实际上,kylin 并没有把全部的字段都填写上,还要两个字段可以使用,methodName和path。

填补后的schema 如下所示。methodName字段的值被设为"*",这是方便为了后续calcite的处理,详细原因请往后看。还有一个字段叫path,这个字段目前已经被calcite废弃,不需要填写。

{
    "version": "1.0",
    "defaultSchema": "DEFAULT",
    "schemas": [
        {
            "type": "custom",
            "name": "DEFAULT",
            "factory": "io.kyligence.kap.query.schema.KapSchemaFactory",
            "operand": {
                "project": "bingfeng"
            },
            "functions": 
 [{"name":"MASSIN","className":"org.apache.kylin.query.udf.MassInUDF","methodName":"*","path":null},
  {"name":"CONCAT","className":"org.apache.kylin.query.udf.string.ConcatUDF","methodName":"*","path":null},
  {"name":"VERSION","className":"org.apache.kylin.query.udf.VersionUDF","methodName":"*","path":null},
    ]
}

2. 向Apache Calcite添加UDF的多种方式

接下来我们再看看calcite这边是如何处理的,有人猜到是如何把用户写的函数逻辑注册进去的吗?
没错,很简单,就是反射!

calcite 将json 里的UDF 类路径和方法名拿到,通过反射,获取到用户实现的UDF方法,然后将此方法注册进calcite的schema中。在后续的查询中,便可从schema中获取到此UDF方法

/** Creates and validates a {@link ScalarFunctionImpl}, and adds it to a
   * schema. If {@code methodName} is "*", may add more than one function.
   *
   * @param schema Schema to add to
   * @param functionName Name of function; null to derived from method name
   * @param path Path to look for functions
   * @param className Class to inspect for methods that may be user-defined
   *                  functions
   * @param methodName Method name;
   *                  null means use the class as a UDF;
   *                  "*" means add all methods
   * @param upCase Whether to convert method names to upper case, so that they
   *               can be called without using quotes
   */
  public static void addFunctions(SchemaPlus schema, @Nullable String functionName,
      List<String> path, String className, @Nullable String methodName, boolean upCase) {
    final Class<?> clazz;
    try {
      clazz = Class.forName(className);
    } catch (ClassNotFoundException e) {
      throw new RuntimeException("UDF class '"
          + className + "' not found");
    }
    String methodNameOrDefault = Util.first(methodName, "eval");
    String actualFunctionName;
    if (functionName != null) {
      actualFunctionName = functionName;
    } else {
      actualFunctionName = methodNameOrDefault;
    }
    if (upCase) {
      actualFunctionName = actualFunctionName.toUpperCase(Locale.ROOT);
    }
    final TableFunction tableFunction =
        TableFunctionImpl.create(clazz, methodNameOrDefault);
    if (tableFunction != null) {
      schema.add(Util.first(functionName, methodNameOrDefault),
          tableFunction);
      return;
    }
    // Must look for TableMacro before ScalarFunction. Both have an "eval"
    // method.
    final TableMacro macro = TableMacroImpl.create(clazz);
    if (macro != null) {
      schema.add(actualFunctionName, macro);
      return;
    }
    if (methodName != null && methodName.equals("*")) {
      for (Map.Entry<String, Function> entry
          : ScalarFunctionImpl.functions(clazz).entries()) {
        String name = entry.getKey();
        if (upCase) {
          name = name.toUpperCase(Locale.ROOT);
        }
        schema.add(name, entry.getValue());
      }
      return;
    } else {
      final ScalarFunction function =
          ScalarFunctionImpl.create(clazz, methodNameOrDefault);
      if (function != null) {
        schema.add(actualFunctionName, function);
        return;
      }
    }
    if (methodName == null) {
      final AggregateFunction aggFunction = AggregateFunctionImpl.create(clazz);
      if (aggFunction != null) {
        schema.add(actualFunctionName, aggFunction);
        return;
      }
    }
    throw new RuntimeException("Not a valid function class: " + clazz
        + ". Scalar functions and table macros have an 'eval' method; "
        + "aggregate functions have 'init' and 'add' methods, and optionally "
        + "'initAdd', 'merge' and 'result' methods.");
  }

从上面的代码我们可以看出,将UDF被添加到calcite中可以有多种方式,我将所有的情况总结成了如下表格(假设用户实现了一个类名为InStrUDF的UDF),而不同的注册方式,被calcite选中的方法会有不一样。

无论该UDF是否需要重载,都强烈推荐使用方式五,methodName设为"*",将所有UDF类中需要重载的方法都取名为udf名的全大写(例如INSTR)。

屏幕快照 2019-07-28 下午11.40.54.png

四、目前kylin中UDF注册存在的问题

1. 注册方式

目前开源kylin 中使用的是上面那张表格里的方式一,这种方式要求UDF的方法名必须为eval,并且无法实现重载。不够灵活。

2. 性能问题

通过阅读kylin 的源码可以看到,在每一次查询都会去创建一次schema,也就是每次都需要从kylin.properties 里面读取到UDF的类路径和方法名,每次都需要完成不少字符串拼接的工作,并且还要走一遍json,再注册进calcite。

如果把注册进calcite之前的步骤提前做好,进行预计算存放在一个变量中,每次查询只需要拿这个变量去完成注册的工作,那么就可以节省掉查询过程中不少的时间。

或者更极致一点,不调用calcite的addFunctions 进行注册,如果能拿到schema变量,提前把反射的代码也预计算好,直接调用最底层的 schema.add()进行注册,这样可以把查询过程中每个UDF反射的时间也省掉。

经过性能测试,这种改进可以将kylin查询qps 从60提升至70。

五、UDF对apache calcite优化器的影响

优化器很难评估UDF的性能,更新中...

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 212,657评论 6 492
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,662评论 3 385
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 158,143评论 0 348
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,732评论 1 284
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 65,837评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,036评论 1 291
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,126评论 3 410
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,868评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,315评论 1 303
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,641评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,773评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,470评论 4 333
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,126评论 3 317
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,859评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,095评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,584评论 2 362
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,676评论 2 351

推荐阅读更多精彩内容