spark 动态UDF加载实现(2)

 def LoadUserDefineUDF(user: String, spark: SparkSession): Unit = {
    val brainUrl: String = PropertiesLoader.getProperties("database.properties").getProperty("brain.url")
    val brainPrefix = brainUrl.substring(0, brainUrl.indexOf("feature-panel/online-feature") - 1)
    val udfURL = s"$brainPrefix/udfWarehouse/findInfoByUserId?userId=$user"
    val simpleHttp = new SimpleHttp
    val result = simpleHttp.fetchResponseText(udfURL)
    logger.info("***********************************************************")
    logger.info(s"result:$result")
    try {
      val resultJson = JSON.parseObject(result)
      val flag = resultJson.getInteger("code").toInt
      flag match {
        case 0 => LoadUDF(resultJson, spark)
        case _ => logger.error(s"加载用户自定义离线特征处理udf失败!原因:${resultJson.getString("msg")}")
      }
    } catch {
      case e: Exception =>
        logger.error(s"加载用户自定义离线特征处理udf失败!原因:服务器异常!" + e.getMessage, e)
    }
  }

  def LoadUDF(jsonObj: JSONObject, spark: SparkSession): Unit = {
    val udfArray = jsonObj.getJSONObject("data").getJSONArray("data")
    var array = mutable.ArrayBuilder.make[URL]()
    logger.info("************************************************************")
    val methodMap = new mutable.HashMap[String, (String, String, String)]()
    for (i <- 0 to udfArray.length - 1) {
      val udfJson = udfArray.getJSONObject(i)
      val udfName = udfJson.getString("udfName")
      val downLoadJarUrl = udfJson.getString("downLoadJarUrl")
      val entryClass = udfJson.getString("entryClass")
      val jarName = udfJson.getString("jarName")+".jar"
      val functionName = udfJson.getString("functionName")
      try {
        downLoadJar(downLoadJarUrl, jarName)
        spark.sparkContext.addJar(HdfsPrefix+jarName)
        val url2 = new URL(s"file:./${jarName}")
        logger.info(s"*********加载udf $udfName 成功**********")
        methodMap.put(udfName, (functionName, entryClass, jarName))
        array += url2
      } catch {
        case e: Exception =>
          logger.error(s"$jarName $functionName $entryClass Exception!!!", e.getMessage)
      }
    }
    ScalaGenerateFunctions(array.result())
    methodMap.foreach {
      map =>
        try {
          val (fun, inputType, returnType) = ScalaGenerateFunctions.genetateFunction(map._2._1, map._2._2, map._2._3)
          val inputTypes = Try(List(inputType)).toOption
          spark.udf.register(map._1, fun, returnType)
          logger.info(s"*********spark 注册udf ${map._1} 成功**********")
        } catch {
          case e: Exception =>
            logger.error(s"*********spark 注册udf ${map._1} 失败!!", e.getMessage)
        }
    }
  }

  def downLoadJar(url: String, jarName: String): Unit = {
    logger.info("*******************************************")
    logger.info(s"****************url:$url**********************")
    //val path = "E:\\temp\\"
    val path = "./"
    val file = new File(path)
    //val jars = Array("test.jar", "test2.jar")
    if (!file.exists()) {
      //如果文件夹不存在,则创建新的的文件夹
      file.mkdirs()
    }
    var fileOut: FileOutputStream = null
    var conn: HttpURLConnection = null
    var inputStream: InputStream = null
    try {
      val httpUrl = new URL(url)
      conn = httpUrl.openConnection().asInstanceOf[HttpURLConnection]
      conn.setRequestMethod("GET")
      conn.setDoInput(true)
      conn.setDoOutput(true)
      // post方式不能使用缓存
      conn.setUseCaches(false)
      //连接指定的资源
      conn.connect()
      //获取网络输入流
      inputStream = conn.getInputStream();
      val bis = new BufferedInputStream(inputStream)
      fileOut = new FileOutputStream(path + jarName)
      val bos = new BufferedOutputStream(fileOut)
      val buf = new Array[Byte](4096)
      var length = bis.read(buf);
      //保存文件
      while (length != -1) {
        bos.write(buf, 0, length);
        length = bis.read(buf);
      }
      //关闭流
      bos.close();
      bis.close();
      conn.disconnect();
    } catch {
      case e: Exception =>
        logger.error(s"下载jar:$jarName 出错" + e.getMessage, e)
    }
  }

下面是一个单元测试

@Test
  def testStr2VecJson(): Unit = {
    System.setProperty("hadoop.home.dir", "D:\\winutils")
    val conf = new SparkConf().setAppName("test").setMaster("local[2]")//.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    //    val sc = new SparkContext(conf)
    val spark = SparkSession.builder().config(conf).getOrCreate()
    import spark.implicits._
    val data = Array("1", "2")
    val rdd = spark.sparkContext.parallelize(data)
    val df = rdd.toDF("str")
    //这里套用工具类 E:\adworkSpace\autotask\target
    val url = new URL("file:F:/ad_codes/data_flow_test/target/data_flow_test-1.0-SNAPSHOT.jar")
    val urls = Array(url)
    ScalaGenerateFunctions(urls)


    val className = "com.vivo.ai.temp.Method"
    val methodArray = Array("str2VecJson")

    methodArray.foreach {
      methodName =>
        val (fun, inputType, returnType) = ScalaGenerateFunctions.genetateFunction(methodName, className,"autotask-2.0-SNAPSHOT.jar")
        val inputTypes = Try(List(inputType)).toOption

        //def builder(e: Seq[Expression]) = ScalaUDF(fun, returnType, e, inputTypes.getOrElse(Nil), Some(methodName))
        spark.udf.register(methodName, fun, returnType)
      //        def builder(e: Seq[Expression]) = ScalaUDF(function = fun, dataType = returnType, children = e, Seq(true), inputTypes = inputTypes.getOrElse(Nil), udfName = Some(methodName))
      //
      //        spark.sessionState.functionRegistry.registerFunction(new FunctionIdentifier(methodName), builder)
    }
    df.createTempView("strDF")
    df.show()
    spark.sql("select str2VecJson(str) from strDF").show()
  }
  
其中 com.vivo.ai.temp.Method定义如下

import com.alibaba.fastjson.JSON
import com.vivo.ai.encode.ContinuousEncoder
import com.vivo.ai.encode.util.EncodeEnv
import com.vivo.vector.Vector
import com.alibaba.fastjson.serializer.SerializerFeature
import scala.collection.mutable
import scala.collection.JavaConversions._
import scala.collection.JavaConverters._
import scala.collection.mutable.ArrayBuffer
/**
  * @author liangwt
  * @create 2020/11/16
  * @since 1.0.0
  * @Description :
  */
class Method {
  def method(value:String):String={
    //用户自定义处理方法
    value
  }
  def method2(value:Int):String={
    (value+100).toString
  }
  def testMap(value:Int):scala.collection.Map[String,String]={
    scala.collection.Map("1"->"1")
  }
  def testJMap(value:Int):java.util.Map[String,String]={
    scala.collection.Map("1"->"1").asJava
  }

  def testMap2(value:Int):Map[String,String]={
    Map("1"->"1")
  }
  def testSet(value:Int)={
    val set=mutable.Set("1")
    set.asJava
  }
  def testSeq(value:Int)={
   Seq("1")
  }
  def inputSeq(seq:Seq[Int]): String ={
    "1"
  }
  def inputMap(map:Map[String,Integer]):String={
    "1"
  }
  def str2VecJson(str:String):String={
    var userNewsRTFeatureVec: Vector = Vector.builder(24).build()
    var userArrayBuffer = new ArrayBuffer[(Int,Float)]()
//    str.split(",").foreach{
//      line=>
//
//    }
//    Tools.str2Map(str).map {
//      case (k, v) =>
//        //val index = ContinuousEncoder.encode("news_category_v3", k, EncodeEnv.PRD)
//        userArrayBuffer += (index -> v.toString.toFloat)
//    }
    userArrayBuffer +=(1->0.1f)
    userArrayBuffer.sortWith((x,y) => x._1 < y._1)
    userNewsRTFeatureVec.setIndices(userArrayBuffer.map(_._1).toArray)
    userNewsRTFeatureVec.setValues(userArrayBuffer.map(_._2).toArray)
    JSON.toJSONString(userNewsRTFeatureVec,SerializerFeature.IgnoreNonFieldGetter)
  }

}

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 219,928评论 6 509
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 93,748评论 3 396
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 166,282评论 0 357
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 59,065评论 1 295
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 68,101评论 6 395
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,855评论 1 308
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,521评论 3 420
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 39,414评论 0 276
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,931评论 1 319
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 38,053评论 3 340
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 40,191评论 1 352
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,873评论 5 347
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,529评论 3 331
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 32,074评论 0 23
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 33,188评论 1 272
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 48,491评论 3 375
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 45,173评论 2 357

推荐阅读更多精彩内容