目录
一、什么是UDF?
二、实现一个UDF
三、源码解读 -- 注册UDF 的原理 && 向Apache Calcite添加UDF的多种方式
四、目前kylin中UDF注册存在的问题
五、UDF对apache calcite优化器的影响
一、什么是UDF?
UDF全称为user defined function(用户自定义函数),是查询引擎留给用户的一个口子,用于扩展sql 的能力,用户把自己实现的UDF打包部署后放在指定目录下,便可在查询时使用此函数。
二、实现一个UDF
下面举一个简单的例子,演示如何在Kylin 中实现一个UDF
1. 实现一个java类
- 类名随意,例如ConcatUDF
- 方法名为eval,方法名不能乱取,后面会解释为什么
- 在eval 方法中实现自己的函数逻辑,例如下面实现的这个函数的逻辑就是将传入的两个字符串进行拼接
import org.apache.calcite.linq4j.function.Parameter;
public class ConcatUDF {
public static String eval(@Parameter(name = "str1") String col1, @Parameter(name = "str2") String col2) {
if (col1 == null) {
return null;
}
if (col2 == null) {
return null;
}
return col1 + col2;
}
}
2. 将实现的java类打成jar包,部署在Kylin 目录的 “lib” 目录下
3. 修改配置文件
在kylin.properties中添加 org.apache.kylin.query.udf.ConcatUDF= 类的路径
三、源码解读 注册UDF 的原理 && 向Apache Calcite添加UDF的多种方式
1. 注册UDF 的原理
kylin中的查询引擎使用的是Apache Calcite,因此往kylin 中添加一个UDF 实际上就是往calcite 框架中注册一个UDF。 Apache Calcite 是目前比较通用的大数据查询引擎框架,想了解Apache Calcite的可以看看这篇文章 Apache Calcite:Hadoop 中新型大数据查询引擎。
让我们一起来读一读kylin 的源码,看一看用户实现的UDF 是如何注册进calcite 的。
从kylin的查询引擎入口query(@RequestBody PrepareSqlRequest sqlRequest)开始看
query(@RequestBody PrepareSqlRequest sqlRequest)
doQueryWithCache(SQLRequest sqlRequest, boolean isQueryInspect)
queryWithCache(sqlRequest, startTime, isQueryCacheEnabled);
queryAndUpdateCache(sqlRequest, startTime, isQueryCacheEnabled);
query(sqlRequest);
queryWithSqlMassage(sqlRequest);
private SQLResponse queryWithSqlMassage(SQLRequest sqlRequest) throws Exception {
Connection conn = null;
// 省略
conn = QueryConnection.getConnection(sqlRequest.getProject());
// 省略
return executeRequest(correctedSql, sqlRequest, conn);
// 省略
}
在queryWithSqlMassage中,会获取到calcite的jdbc 连接,在获取连接的同时,会将schema 等信息传递给calcite,schema信息之一便是我们要注册的UDF,包括UDF的类路径和方法名。
public static Connection getConnection(String project) throws SQLException {
if (!isRegister) {
try {
Class<?> aClass = Thread.currentThread().getContextClassLoader()
.loadClass("org.apache.calcite.jdbc.Driver");
Driver o = (Driver) aClass.getDeclaredConstructor().newInstance();
DriverManager.registerDriver(o);
} catch (ClassNotFoundException | InstantiationException | IllegalAccessException | NoSuchMethodException | InvocationTargetException e) {
e.printStackTrace();
}
isRegister = true;
}
File olapTmp = OLAPSchemaFactory.createTempOLAPJson(project, KylinConfig.getInstanceFromEnv());
Properties info = new Properties();
info.putAll(KylinConfig.getInstanceFromEnv().getCalciteExtrasProperties());
// Import calcite props from jdbc client(override the kylin.properties)
info.putAll(BackdoorToggles.getJdbcDriverClientCalciteProps());
info.put("model", olapTmp.getAbsolutePath());
info.put("typeSystem", "org.apache.kylin.query.calcite.KylinRelDataTypeSystem");
return DriverManager.getConnection("jdbc:calcite:", info);
}
那UDF类路径是从哪里获取到的呢,没错,就是来自一开始用户配置的kylin.properties。schema里的函数信息除了UDF以外,还有UDAF的类路径和方法名,注册方式和UDF一样,但是目前kylin并未向用户开放注册UDAF的入口,而有关UDF和UDAF的区别这里不作介绍,后面想单独写一篇关于UDAF的文章。
public Map<String, String> getUDFs() {
Map<String, String> udfMap = Maps.newLinkedHashMap();
udfMap.put("version", "org.apache.kylin.query.udf.VersionUDF");
udfMap.put("concat", "org.apache.kylin.query.udf.ConcatUDF");
udfMap.put("massin", "org.apache.kylin.query.udf.MassInUDF");
Map<String, String> overrideUdfMap = getPropertiesByPrefix("kylin.query.udf.");
udfMap.putAll(overrideUdfMap);
return udfMap;
}
private static void createOLAPSchemaFunctions(Map<String, String> definedUdfs, StringBuilder out)
throws IOException {
Map<String, String> udfs = Maps.newHashMap();
if (definedUdfs != null)
udfs.putAll(definedUdfs);
for (Entry<String, Class<?>> entry : MeasureTypeFactory.getUDAFs().entrySet()) {
udfs.put(entry.getKey(), entry.getValue().getName());
}
int index = 0;
out.append(" \"functions\": [\n");
for (Map.Entry<String, String> udf : udfs.entrySet()) {
String udfName = udf.getKey().trim().toUpperCase(Locale.ROOT);
String udfClassName = udf.getValue().trim();
out.append(" {\n");
out.append(" name: '" + udfName + "',\n");
out.append(" className: '" + udfClassName + "'\n");
if (index < udfs.size() - 1) {
out.append(" },\n");
} else {
out.append(" }\n");
}
index++;
}
out.append(" ]\n");
}
通过kylin拼接好的schema信息如下所示,它会被写入到一个以"olap_model_" 开头的json 文件中,后续由calcite自己读取。
{
"version": "1.0",
"defaultSchema": "DEFAULT",
"schemas": [
{
"type": "custom",
"name": "DEFAULT",
"factory": "io.kyligence.kap.query.schema.KapSchemaFactory",
"operand": {
"project": "bingfeng"
},
"functions":
[{"name":"MASSIN","className":"org.apache.kylin.query.udf.MassInUDF"},
{"name":"CONCAT","className":"org.apache.kylin.query.udf.string.ConcatUDF"},
{"name":"VERSION","className":"org.apache.kylin.query.udf.VersionUDF"},
]
}
实际上,kylin 并没有把全部的字段都填写上,还要两个字段可以使用,methodName和path。
填补后的schema 如下所示。methodName字段的值被设为"*",这是方便为了后续calcite的处理,详细原因请往后看。还有一个字段叫path,这个字段目前已经被calcite废弃,不需要填写。
{
"version": "1.0",
"defaultSchema": "DEFAULT",
"schemas": [
{
"type": "custom",
"name": "DEFAULT",
"factory": "io.kyligence.kap.query.schema.KapSchemaFactory",
"operand": {
"project": "bingfeng"
},
"functions":
[{"name":"MASSIN","className":"org.apache.kylin.query.udf.MassInUDF","methodName":"*","path":null},
{"name":"CONCAT","className":"org.apache.kylin.query.udf.string.ConcatUDF","methodName":"*","path":null},
{"name":"VERSION","className":"org.apache.kylin.query.udf.VersionUDF","methodName":"*","path":null},
]
}
2. 向Apache Calcite添加UDF的多种方式
接下来我们再看看calcite这边是如何处理的,有人猜到是如何把用户写的函数逻辑注册进去的吗?
没错,很简单,就是反射!
calcite 将json 里的UDF 类路径和方法名拿到,通过反射,获取到用户实现的UDF方法,然后将此方法注册进calcite的schema中。在后续的查询中,便可从schema中获取到此UDF方法
/** Creates and validates a {@link ScalarFunctionImpl}, and adds it to a
* schema. If {@code methodName} is "*", may add more than one function.
*
* @param schema Schema to add to
* @param functionName Name of function; null to derived from method name
* @param path Path to look for functions
* @param className Class to inspect for methods that may be user-defined
* functions
* @param methodName Method name;
* null means use the class as a UDF;
* "*" means add all methods
* @param upCase Whether to convert method names to upper case, so that they
* can be called without using quotes
*/
public static void addFunctions(SchemaPlus schema, @Nullable String functionName,
List<String> path, String className, @Nullable String methodName, boolean upCase) {
final Class<?> clazz;
try {
clazz = Class.forName(className);
} catch (ClassNotFoundException e) {
throw new RuntimeException("UDF class '"
+ className + "' not found");
}
String methodNameOrDefault = Util.first(methodName, "eval");
String actualFunctionName;
if (functionName != null) {
actualFunctionName = functionName;
} else {
actualFunctionName = methodNameOrDefault;
}
if (upCase) {
actualFunctionName = actualFunctionName.toUpperCase(Locale.ROOT);
}
final TableFunction tableFunction =
TableFunctionImpl.create(clazz, methodNameOrDefault);
if (tableFunction != null) {
schema.add(Util.first(functionName, methodNameOrDefault),
tableFunction);
return;
}
// Must look for TableMacro before ScalarFunction. Both have an "eval"
// method.
final TableMacro macro = TableMacroImpl.create(clazz);
if (macro != null) {
schema.add(actualFunctionName, macro);
return;
}
if (methodName != null && methodName.equals("*")) {
for (Map.Entry<String, Function> entry
: ScalarFunctionImpl.functions(clazz).entries()) {
String name = entry.getKey();
if (upCase) {
name = name.toUpperCase(Locale.ROOT);
}
schema.add(name, entry.getValue());
}
return;
} else {
final ScalarFunction function =
ScalarFunctionImpl.create(clazz, methodNameOrDefault);
if (function != null) {
schema.add(actualFunctionName, function);
return;
}
}
if (methodName == null) {
final AggregateFunction aggFunction = AggregateFunctionImpl.create(clazz);
if (aggFunction != null) {
schema.add(actualFunctionName, aggFunction);
return;
}
}
throw new RuntimeException("Not a valid function class: " + clazz
+ ". Scalar functions and table macros have an 'eval' method; "
+ "aggregate functions have 'init' and 'add' methods, and optionally "
+ "'initAdd', 'merge' and 'result' methods.");
}
从上面的代码我们可以看出,将UDF被添加到calcite中可以有多种方式,我将所有的情况总结成了如下表格(假设用户实现了一个类名为InStrUDF的UDF),而不同的注册方式,被calcite选中的方法会有不一样。
无论该UDF是否需要重载,都强烈推荐使用方式五,methodName设为"*",将所有UDF类中需要重载的方法都取名为udf名的全大写(例如INSTR)。
四、目前kylin中UDF注册存在的问题
1. 注册方式
目前开源kylin 中使用的是上面那张表格里的方式一,这种方式要求UDF的方法名必须为eval,并且无法实现重载。不够灵活。
2. 性能问题
通过阅读kylin 的源码可以看到,在每一次查询都会去创建一次schema,也就是每次都需要从kylin.properties 里面读取到UDF的类路径和方法名,每次都需要完成不少字符串拼接的工作,并且还要走一遍json,再注册进calcite。
如果把注册进calcite之前的步骤提前做好,进行预计算存放在一个变量中,每次查询只需要拿这个变量去完成注册的工作,那么就可以节省掉查询过程中不少的时间。
或者更极致一点,不调用calcite的addFunctions 进行注册,如果能拿到schema变量,提前把反射的代码也预计算好,直接调用最底层的 schema.add()进行注册,这样可以把查询过程中每个UDF反射的时间也省掉。
经过性能测试,这种改进可以将kylin查询qps 从60提升至70。
五、UDF对apache calcite优化器的影响
优化器很难评估UDF的性能,更新中...