1. Spark SQL中,将已存在的RDD转换为DataFrame的两种方式。
- Inferring the Schema Using Reflection: 使用反射机制来推断包含特定对象类型的RDD的字段。
- Programmatically Specifying the Schema:通过编程创造字段结构类型。
2. 两种方法的应用场景
选择用哪种方式取决于开发人员在编写Spark应用程序时是否清楚数据的具体字段类型。
- 如果清楚,那么采用反射的方式,代码会简洁很多。
- 如果不清楚,直到运行时才知道数据的具体类型,采用编程的方式,数据字段的类型可以都先统一设置为String类型。
3. 两种方法的具体实现
数据源people.txt如下:
Justin, 19
Michael, 29
Andy, 30
3.1 Inferring the Schema Using Reflection
import org.apache.spark.sql.SparkSession
import org.apache.spark.SparkConf
/**
* Author: whn
* Date: 2019-12-19 17:06
* Version: 1.0
* Function:
*/
object Rdd2DataFrame {
case class Person(name: String, age: Int) // 为实现rdd转换定义样例类,每个字段定义已经获知的数据类型
def main(args: Array[String]): Unit = {
val conf = new SparkConf()
.set("spark.driver.extraJavaOptions", "-Dfile.encoding=utf-8")
.set("spark.executor.extraJavaOptions", "-Dfile.encoding=UTF-8 -Dsun.jnu.encoding=UTF-8")
val spark = SparkSession.builder()
.appName("Rdd2DataFrame")
.master("local")
.config(conf) // 加载sparkconf配置
.enableHiveSupport() // 支持hive查询
.getOrCreate()
import spark.implicits._ // 导入隐式转换,不然后面不能使用rdd.toDF() .toDS等方法
val peopleDF = spark.sparkContext
.textFile("examples/src/main/resources/people.txt")
.map(_.split(","))
.map((attributes: Array[String]) => Person(attributes(0), attributes(1).trim.toInt)) // 将Array[String]转换为自定义的Person类型
.toDF("name", "age") // rdd转换为DF,并为每个字段命名,字段顺序与case class对应
peopleDF.show()
}
}
3.2 Programmatically Specifying the Schema
import org.apache.spark.sql.SparkSession
import org.apache.spark.SparkConf
/**
* Author: whn
* Date: 2019-12-19 17:06
* Version: 1.0
* Function:
*/
object Rdd2DataFrame {
def main(args: Array[String]): Unit = {
val conf = new SparkConf()
.set("spark.driver.extraJavaOptions", "-Dfile.encoding=utf-8")
.set("spark.executor.extraJavaOptions", "-Dfile.encoding=UTF-8 -Dsun.jnu.encoding=UTF-8")
val spark = SparkSession.builder()
.appName("Rdd2DataFrame")
.master("local")
.config(conf) // 加载sparkconf配置
.enableHiveSupport() // 支持hive查询
.getOrCreate()
// 导入隐式转换,不然后面不能使用rdd.toDF() .toDS等方法
import spark.implicits._
// 获取rdd
val peopleRdd: RDD[String] = spark.sparkContext
.textFile("examples/src/main/resources/people.txt")
// 定义字段名称字符串
val schemaString: String = "name age"
// 创建对应的字段类型StructField,每个字段均定义为String类型,最后得到Array[StructField]
val fields: Array[StructField] = schemaString.split(" ")
.map(fieldName => StructField(fieldName, StringType, nullable = true))
// 创建相应的结构体类型StructType,即得到rdd转换需要的schema
val schema: StructType = StructType(fields)
// 将rdd的每行转换为Row类型
val rowRDD: RDD[Row] = peopleRdd
.map(_.split(","))
.map(attributes => Row(attributes(0), attributes(1).trim))
// 创建DataFrame
val peopleDF = spark.createDataFrame(rowRDD, schema)
peopleDF.show()
}
}