SparkSQL中可以创建自定义函数UDF对dataframe进行操作。UDF是一对一的映射关系,适用于给dataframe新增一列数据的场景:每次传入一行数据(该行数据可以是一列,也可以是多列),经过处理后,最终只输出该新增列的一个值。
UDF函数有两种注册方式:
- spark.udf.register() // spark是SparkSession对象
- udf() // 需要import org.apache.spark.sql.functions._
详细见示例代码如下:
import com.longi.bigdata.spark.utils.SparkSessionCreate
import org.apache.spark.sql.functions._
/**
* Author: whn
* Date: 2019-12-19 18:12
* Version: 1.0
* Function:
*/
object TwoWaysOfUsingUDF {

  // One record per line of people.txt: "<name>, <age>".
  case class Person(name: String, age: Int)

  def main(args: Array[String]): Unit = {
    val spark = SparkSessionCreate.getSparkSession("TwoWaysOfUsingUDF", "local")
    import spark.implicits._

    // people.txt content:
    //   Michael, 10
    //   Andy, 60
    //   Justin, 40
    // NOTE(review): "file:\\E:\\..." produces the URI "file:\E:\...", which is not a
    // well-formed file URI; the canonical Windows form is
    // "file:///E:/ideaProjects/sparketl/people.txt" — confirm before changing.
    val inputDF = spark.sparkContext
      .textFile("file:\\E:\\ideaProjects\\sparketl\\people.txt")
      .map(_.split(","))
      // Array[String] -> Person; trim needed because ", " leaves a leading space on age.
      .map((attributes: Array[String]) => Person(attributes(0), attributes(1).trim.toInt))
      .toDF("name", "age") // RDD -> DataFrame; column order follows the case class fields

    // TODO Way 1: spark.udf.register makes the UDF resolvable by name in SQL syntax,
    // both via selectExpr and via spark.sql on a temp view.
    spark.udf.register("age_judge", ageJudge _)
    // selectExpr is a DataFrame (DSL) method, but its arguments are parsed as SQL expressions.
    val res1 = inputDF.selectExpr("name", "age", "age_judge(age) AS age_divide")
    res1.show()

    inputDF.createOrReplaceTempView("temp")
    val res2 = spark.sql(
      """
        |SELECT name, age, age_judge(age) AS age_divide
        |FROM temp
      """.stripMargin)
    res2.show()

    // TODO Way 2: functions.udf wraps the Scala function into a Column-level UDF
    // usable with DSL methods such as withColumn.
    val ageJudgeUDF = udf(ageJudge _)
    val res3 = inputDF.withColumn("age_divide", ageJudgeUDF(inputDF("age")))
    res3.show()

    spark.stop()
  }

  /**
   * Maps an age in years to a Chinese life-stage label.
   *
   * BUG FIX: the original implementation matched `case ages =>` first, which binds
   * every value, so its trailing `case _ => "数据异常"` was unreachable dead code
   * (scalac warns about this). Guarded patterns express the ranges directly and
   * keep the fallback reachable. Observable behavior is unchanged: out-of-range
   * ages still yield "数据异常".
   *
   * @param age age in years
   * @return life-stage label, or "数据异常" for values outside [1, 130]
   */
  def ageJudge(age: Int): String = age match {
    case a if a >= 1 && a <= 3    => "婴儿"
    case a if a >= 4 && a <= 18   => "少年"
    case a if a >= 19 && a <= 45  => "青年"
    case a if a >= 46 && a <= 60  => "中年"
    case a if a >= 61 && a <= 130 => "老年"
    case _                        => "数据异常"
  }
}