This post is based on the implementation in the article below. That approach works fine in local mode, but in cluster mode it hits a serialization problem; I asked the author and never got a reply, so I'm sharing a solution here.
https://www.jianshu.com/p/56ff9549d19a
I tried several sites and ran into publishing problems everywhere, so I have split the write-up into three parts. If you genuinely want to solve this problem, please be patient and read all three parts.
Background
In our recommendation business we use SparkSQL to process offline data stored in Hive and build user features, which are then written to Redis. We built a SQL-based system for loading offline features: users extract the features they need with SQL, pick the feature type and storage engine, and the features are loaded automatically.
But the business often needs custom data processing, and the platform cannot pre-register every method a user might need. So we built a UDF/jar management system: after a task starts, it dynamically loads the user's jar to run custom UDF logic. The details follow.
Approach
After some research I settled on Spark's function-registration API spark.udf.register, supplying a function name, a UDF1 (our use cases only need a single input parameter for now, so there is no need to implement UDF2, UDF3, and so on), and the return type returnType.
The signature is register(name: String, f: UDF1[_, _], returnType: DataType); you can look it up yourself.
With the registration API found, the rest is straightforward: dynamically load the UDF from the user's jar and construct the UDF1 via reflection. **Note that the UDF1 implementation must be serializable, otherwise you will get a serialization error.**
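For reference, here is a minimal sketch of that registration API with a hand-written UDF1; the session setup and the function name `plus_one` are illustrative, not part of the system:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.api.java.UDF1
import org.apache.spark.sql.types.IntegerType

val spark = SparkSession.builder().appName("udf-demo").getOrCreate()

// A UDF1 must be serializable so it can be shipped to executors.
class PlusOne extends UDF1[Integer, Integer] with Serializable {
  override def call(v1: Integer): Integer = v1 + 1
}

// register(name: String, f: UDF1[_, _], returnType: DataType)
spark.udf.register("plus_one", new PlusOne(), IntegerType)
spark.sql("SELECT plus_one(1)").show()
```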
In the actual system, the following code loads the jar, identifies the input and output types, finds the target method, and constructs the UDF1:
```scala
val url = new URL("file:xxx")
val urls = Array(url)
var myClassLoader: URLClassLoader =
  new URLClassLoader(urls, scala.reflect.runtime.universe.getClass.getClassLoader)

// infer Spark DataTypes for the method's input and return types
var iType = method.getParameterTypes.map(JavaTypeInference.inferDataType).map(_._1).head
val inputType = method.getParameterTypes.head.getName
var rType = JavaTypeInference.inferDataType(method.getReturnType)._1

// load the user's class and wrap the target method in a serializable UDF1;
// java.lang.reflect.Method is not serializable, so the method is looked up
// lazily inside call() and only the Class object and method name are captured
val zz = myClassLoader.loadClass(className)
case class TempUDF() extends UDF1[Object, Any] with Serializable {
  override def call(v1: Object): Any = {
    val m1 = zz.getDeclaredMethods.filter(_.getName.equals(methodName)).head
    m1.invoke(zz.newInstance(), v1)
  }
}
```
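For context, a user class in the jar only needs a public no-arg constructor and a single-parameter method; a hypothetical example (the class and method names are made up):

```scala
package com.example

// A hypothetical user-provided class packaged in the user's jar.
// The framework instantiates it via newInstance() and invokes the
// chosen method reflectively, so it needs a no-arg constructor.
class MyStringUdf {
  // exactly one input parameter, as required by the UDF1-based registration
  def upper(s: String): String = if (s == null) null else s.toUpperCase
}
```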
The full implementation follows:
```scala
import java.net.{URL, URLClassLoader}
import org.apache.spark.sql.types._
import org.apache.spark.sql.catalyst.{JavaTypeInference, ScalaReflection}
import org.slf4j.{Logger, LoggerFactory}
import scala.collection.JavaConverters._
import scala.collection.JavaConversions._
import scala.collection.mutable
import java.lang.reflect.{Method, ParameterizedType}
import org.apache.spark.sql.api.java.UDF1

/**
 * @author liangwt
 * @create 2020/11/16
 * @since 1.0.0
 * @Description :
 */
case class ClassInfo(clazz: Class[_], instance: Any, function: UDF1[Object, Any]) extends Serializable

object ScalaGenerateFunctions {
  val logger: Logger = LoggerFactory.getLogger(getClass)
  private var myClassLoader: URLClassLoader = null

  def apply(urls: Array[URL]): Unit = {
    //scala.reflect.runtime.universe.getClass.getClassLoader
    //Thread.currentThread().getContextClassLoader()
    myClassLoader = new URLClassLoader(urls, scala.reflect.runtime.universe.getClass.getClassLoader)
  }

  @transient private lazy val classMap = new mutable.HashMap[(String, String, String), ClassInfo]()

  def genetateFunction(methodName: String, className: String, jarName: String) = {
    if (!classMap.contains((className, methodName, jarName))) {
      val zz = myClassLoader.loadClass(className)
      // java.lang.reflect.Method is not serializable, so the target method is
      // looked up lazily inside call(); only the Class and method name are captured
      case class TempUDF() extends UDF1[Object, Any] with Serializable {
        override def call(v1: Object): Any = {
          val m1 = zz.getDeclaredMethods.filter(_.getName.equals(methodName)).head
          m1.invoke(zz.newInstance(), v1)
        }
      }
      logger.info("*************************************************************************")
      logger.info(s"dynamically loaded $jarName $className $methodName successfully")
      logger.info("*************************************************************************")
      classMap.put((className, methodName, jarName), ClassInfo(zz, zz.newInstance(), new TempUDF()))
    }
    val (iType, rType) = getDataType(methodName, className, jarName)
    (classMap((className, methodName, jarName)).function, iType, rType)
  }

  private def getDataType(methodName: String, className: String, jarName: String) = {
    val method = classMap((className, methodName, jarName)).clazz.getMethods.filter(_.getName.equals(methodName)).head
    var iType = method.getParameterTypes.map(JavaTypeInference.inferDataType).map(_._1).head
    val inputType = method.getParameterTypes.head.getName
    iType = inputType match {
      // todo
      // The element type of a Seq parameter cannot always be recovered here; if a
      // better way to identify it turns up, this can be replaced. Only primitive-like
      // element types are supported for now.
      case "scala.collection.Seq" =>
        val inputGenericType = method.getGenericParameterTypes.head
        val contentType = inputGenericType.asInstanceOf[ParameterizedType].getActualTypeArguments.head.getTypeName
        ArrayType(TypeUtil(contentType))
      case "scala.collection.Map" | "scala.collection.immutable.Map" | "scala.collection.mutable.Map" | "scala.Map" | "java.util.Map" =>
        // e.g. "java.util.Map<java.lang.String, java.lang.Long>": the key type sits
        // between '<' and ',', the value type between ',' and '>'
        val inputGenericType = method.getGenericParameterTypes.head.getTypeName
        val keyType = inputGenericType.substring(inputGenericType.indexOf("<") + 1, inputGenericType.indexOf(","))
        val valueType = inputGenericType.substring(inputGenericType.indexOf(",") + 1, inputGenericType.lastIndexOf(">"))
        MapType(TypeUtil(keyType), TypeUtil(valueType))
      case _ => iType
    }
    var rType = JavaTypeInference.inferDataType(method.getReturnType)._1
    rType = method.getReturnType.getName match {
      // Spark does not support a java Map return type out of the box; handle it here
      case "scala.collection.Map" | "scala.collection.immutable.Map" | "scala.collection.mutable.Map" | "scala.Map" | "java.util.Map" =>
        val rGenericType = method.getGenericReturnType.getTypeName
        val keyType = rGenericType.substring(rGenericType.indexOf("<") + 1, rGenericType.indexOf(","))
        val valueType = rGenericType.substring(rGenericType.indexOf(",") + 1, rGenericType.lastIndexOf(">"))
        MapType(TypeUtil(keyType), TypeUtil(valueType))
      case "scala.collection.Seq" | "scala.Seq" =>
        val rGenericType = method.getGenericReturnType.getTypeName
        ArrayType(TypeUtil(rGenericType.substring(rGenericType.indexOf("<") + 1, rGenericType.indexOf(">"))))
      case _ => rType
      // todo
      // More Seq-like types could be supported by following the types ScalaReflection
      // handles, because JavaTypeInference cannot construct a DataType through
      // private val mapType = TypeToken.of(classOf[JMap[_, _]]); this code deserves a
      // closer read and has room for optimization
    }
    (iType, rType)
  }

  private def TypeUtil(str: String): DataType = {
    str match {
      case f if f.contains("String") => StringType
      case f if f.contains("Double") => DoubleType
      case f if f.contains("Float") => FloatType
      case f if f.contains("Integer") || f.contains("Int") => IntegerType
      // Spark SQL has no char type, so treat Char as a string
      case f if f.contains("Char") => StringType
      case f if f.contains("Boolean") => BooleanType
      case f if f.contains("Long") => LongType
      case f if f.contains("Object") => StringType
      case _ => throw new Exception(s"unsupported type $str")
    }
  }
}
```
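To tie things together, here is a minimal, hypothetical driver-side sketch of how the pieces combine; the jar path, the class name com.example.MyStringUdf, the method name, and the SQL are all made up, and in cluster mode the user jar must also be visible to the executors (e.g. shipped via --jars):

```scala
import java.net.URL
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("dynamic-udf").getOrCreate()

// load the user jar on the driver (hypothetical path)
ScalaGenerateFunctions(Array(new URL("file:/tmp/my-udf.jar")))

// reflectively build the serializable UDF1 plus the inferred input/return DataTypes
val (udf, iType, rType) =
  ScalaGenerateFunctions.genetateFunction("upper", "com.example.MyStringUdf", "my-udf.jar")

// register under a SQL-visible name and use it in SparkSQL
spark.udf.register("my_upper", udf, rType)
spark.sql("SELECT my_upper(name) FROM user_features").show()
```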