Converting a Spark RDD to a DataFrame: the reflection (case-class) approach
package com.gofun.sparkSql
import org.apache.log4j.{Level, Logger}
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext
/**
 * Created with IntelliJ IDEA.
 * Author gofun
 * 2017/10/10 20:18
 */
object RDD2DataFrameReflection {
  Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
  Logger.getLogger("org.apache.eclipse.jetty.server").setLevel(Level.OFF)

  def main(args: Array[String]): Unit = {
    rdd2DataFrame()
  }

  // Convert an RDD to a DataFrame
  // In Scala, the reflection-based conversion from RDD to DataFrame requires a manual import (see the sketch after this example)
  def rdd2DataFrame(): Unit = {
    val conf = new SparkConf().setAppName("rdd2DataFrame").setMaster("local[2]")
    // Not required for this example; kept from the original HBase setup
    conf.set("hbase.zookeeper.quorum", "192.168.157.200:2181")
    val sc = new SparkContext(conf)
    val sqlContext = new SQLContext(sc)
    // System.setProperty("hadoop.home.dir", "E:\\hadoop-2.5.0-cdh5.3.6")
    val lines = sc.textFile("hdfs://192.xxx.xxx.200:8020/test/student.txt")
    // Each line is expected to be tab-separated: id<TAB>name
    val students = lines.map(_.split("\t"))
      .map { line =>
        Student(line(0).trim().toInt, line(1).trim())
      }
    // createDataFrame infers the schema from the Student case class via reflection
    val studentDF = sqlContext.createDataFrame(students)
    studentDF.registerTempTable("studentTable")
    val df = sqlContext.sql("select * from studentTable")
    df.rdd.collect().foreach(println)
    sc.stop()
  }
}
// The case class is defined at top level so Spark SQL can infer its schema via reflection
case class Student(id: Int, name: String)
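The manual import mentioned in the comment above refers to sqlContext.implicits._, which enables the toDF() helper on an RDD of case classes. A minimal sketch of that variant, assumed to sit inside rdd2DataFrame() after the SQLContext is created (studentDF2 and the reuse of lines are illustrative, not part of the original code):

// Hypothetical variant of the reflection approach using the implicits import
import sqlContext.implicits._
val studentDF2 = lines.map(_.split("\t"))
  .map(line => Student(line(0).trim.toInt, line(1).trim))
  .toDF()
studentDF2.show()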
Converting a Spark RDD to a DataFrame: the explicit metadata (schema) approach
package com.gofun.sparkSql
import org.apache.log4j.{Level, Logger}
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.{Row, RowFactory, SQLContext}
import org.apache.spark.sql.types._
/**
 * Created with IntelliJ IDEA.
 * Author gofun
 * 2017/10/10 20:19
 */
object RDD2DataFrameProgrammatically {
  Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
  Logger.getLogger("org.apache.eclipse.jetty.server").setLevel(Level.OFF)

  def main(args: Array[String]): Unit = {
    rdd2DataFrame()
  }

  // Build the metadata (schema) explicitly and use it to convert the RDD to a DataFrame
  def rdd2DataFrame(): Unit = {
    val conf = new SparkConf().setAppName("RDD2DataFrameProgrammatically").setMaster("local[2]")
    val sc = new SparkContext(conf)
    val sqlContext = new SQLContext(sc)
    val lines = sc.textFile("hdfs://192.xxx.xx.200:8020/test/student.txt")
    // Each line is expected to be space-separated: id name age
    // RowFactory.create is the Java-friendly way to build a Row; Row(...) in rdd2DataFrame2 below is the Scala equivalent
    val studentRdd = lines.map(_.split(" "))
      .map(line => RowFactory.create(Integer.valueOf(line(0)), String.valueOf(line(1)), Integer.valueOf(line(2))))
    // Describe the schema explicitly: one StructField per column
    val fields = new scala.collection.mutable.ArrayBuffer[StructField]()
    fields += DataTypes.createStructField("id", DataTypes.IntegerType, true)
    fields += DataTypes.createStructField("name", DataTypes.StringType, true)
    fields += DataTypes.createStructField("age", DataTypes.IntegerType, true)
    val structType = DataTypes.createStructType(fields.toArray)
    // Pair the RDD[Row] with the schema to obtain a DataFrame
    val studentDF = sqlContext.createDataFrame(studentRdd, structType)
    studentDF.registerTempTable("student")
    val df = sqlContext.sql("select name from student")
    df.rdd.collect().foreach(println)
    sc.stop()
  }
  def rdd2DataFrame2(): Unit = {
    val conf = new SparkConf().setAppName("RDD2DataFrameProgrammatically").setMaster("local[2]")
    val sc = new SparkContext(conf)
    val sqlContext = new SQLContext(sc)
    val lines = sc.textFile("hdfs://192.168.64.200:8020/test/student.txt", 1)
    // Same idea as above, but the Row and StructType are built directly with the Scala API
    val studentRDD = lines.map(line => Row(line.split(" ")(0).toInt, line.split(" ")(1), line.split(" ")(2).toInt))
    val structType = StructType(Array(
      StructField("id", IntegerType, true),
      StructField("name", StringType, true),
      StructField("age", IntegerType, true)))
    val studentDF = sqlContext.createDataFrame(studentRDD, structType)
    studentDF.registerTempTable("student")
    val df = sqlContext.sql("select name,age from student")
    // foreach(println) prints on the executors; in local mode the output appears in the same console
    df.rdd.foreach(println)
    sc.stop()
  }
}
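For reference, on Spark 2.x the same programmatic conversion is normally written against SparkSession instead of SQLContext, and registerTempTable is replaced by createOrReplaceTempView. A minimal sketch under those assumptions (the object name and the reuse of the same HDFS path are illustrative, not part of the original code):

// Hypothetical Spark 2.x equivalent of the programmatic approach above
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.types._

object RDD2DataFrameSpark2 {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("RDD2DataFrameSpark2").master("local[2]").getOrCreate()
    val lines = spark.sparkContext.textFile("hdfs://192.168.64.200:8020/test/student.txt")
    // Same space-separated layout as above: id name age
    val rowRDD = lines.map(_.split(" ")).map(f => Row(f(0).toInt, f(1), f(2).toInt))
    val schema = StructType(Seq(
      StructField("id", IntegerType, nullable = true),
      StructField("name", StringType, nullable = true),
      StructField("age", IntegerType, nullable = true)))
    val studentDF = spark.createDataFrame(rowRDD, schema)
    // createOrReplaceTempView replaces the deprecated registerTempTable
    studentDF.createOrReplaceTempView("student")
    spark.sql("select name, age from student").show()
    spark.stop()
  }
}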