SparkSql现在有两个版本,方式如下:
方式一:使用sql版本
//提交的这个程序可以连接到spark集群中
val conf =new SparkConf().setAppName("SaparkDemo1").setMaster("local[*]")
//创建sparksql的连接(程序执行的入口)
val sc=new SparkContext(conf)
//sparkContext不能创建特殊的RDD
//将sparkContext包装增强
val sqlcontext =new SQLContext(sc)
//创建特殊的RDD(DataFrame),就是有schema的RDD
//先有一个普通的RDD,然后在关联schema,进而转成DataFrame
//在集群如有如下数据 1,laoduan,35,29
val lines =sc.textFile("hdfs://pro01:9000/person")
val boyRDd:RDD[Boy] = lines.map(line => {
val fields = line.split(",")
val id = fields(0).toLong
val name = fields(1)
val age = fields(2).toInt
val fv = fields(3).toDouble
Boy(id,name,age,fv)
})
//该RDD装的是Boy类型的数据,有个shcma信息,还是一个RDD
//将RDD转换成DataFrame
//导入隐式转换
import sqlcontext.implicits._
val bdf = boyRDd.toDF
//变成DF后可以使用两种API进行编程
//把DataFrame注册临时表
bdf.registerTempTable("t_boy")
//书写sql
val result=sqlcontext.sql("SELECT * FROM t_boy ORDER BY fv desc ,age asc")
//查看结果(触发Action)
result.show()
sc.stop()
}
}
case class Boy (id:Long,name:String,age:Int,fv:Double)
方式一的扩展:
//提交的这个程序可以连接到spark集群中
val conf =new SparkConf().setAppName("SaparkDemo1").setMaster("local[*]")
//创建sparksql的连接(程序执行的入口)
val sc=new SparkContext(conf)
//sparkContext不能创建特殊的RDD
//将sparkContext包装增强
val sqlcontext =new SQLContext(sc)
//创建特殊的RDD(DataFrame),就是有schema的RDD
//先有一个普通的RDD,然后在关联schema,进而转成DataFrame
//在集群如有如下数据 1,laoduan,35,29
val lines =sc.textFile("hdfs://pro01:9000/person")
val rowRDD:RDD[Row] = lines.map(line => {
val fields = line.split(",")
val id = fields(0).toLong
val name = fields(1)
val age = fields(2).toInt
val fv = fields(3).toDouble
Row(id,name,age ,fv)
})
val sch =StructType(List(
StructField("id", LongType,true),
StructField("name", StringType,true),
StructField("age", IntegerType,true),
StructField("fv", DoubleType,true)
))
//结果类型,表头,用于描述DataFram
val bdf = sqlcontext.createDataFrame(rowRDD,sch)
//该RDD装的是Boy类型的数据,有个shcma信息,还是一个RDD
//将RDD转换成DataFrame
//变成DF后可以使用两种API进行编程
//把DataFrame注册临时表
bdf.registerTempTable("t_boy")
//书写sql
val result=sqlcontext.sql("SELECT * FROM t_boy ORDER BY fv desc ,age asc")
//查看结果(触发Action)
result.show()
sc.stop()
另一种写法:将写sql的方式使用方法来调用
//提交的这个程序可以连接到spark集群中
val conf =new SparkConf().setAppName("SaparkDemo1").setMaster("local[*]")
//创建sparksql的连接(程序执行的入口)
val sc=new SparkContext(conf)
//sparkContext不能创建特殊的RDD
//将sparkContext包装增强
val sqlcontext =new SQLContext(sc)
//创建特殊的RDD(DataFrame),就是有schema的RDD
//先有一个普通的RDD,然后在关联schema,进而转成DataFrame
//在集群如有如下数据 1,laoduan,35,29
val lines =sc.textFile("hdfs://pro01:9000/person")
val rowRDD:RDD[Row] = lines.map(line => {
val fields = line.split(",")
val id = fields(0).toLong
val name = fields(1)
val age = fields(2).toInt
val fv = fields(3).toDouble
Row(id,name,age,fv)
})
val sch =StructType(List(
StructField("id", LongType,true),
StructField("name", StringType,true),
StructField("age", IntegerType,true),
StructField("fv", DoubleType,true)
))
//结果类型,表头,用于描述DataFram
val bdf = sqlcontext.createDataFrame(rowRDD,sch)
//不使用SQl的方式不用注册临时表
import sqlcontext.implicits._
val df1 = bdf.select("name","age","fv")
val df2 = df1.orderBy($"fv" desc,$"age" asc )
sc.stop()
方式二:
//Spark2.x
val session = SparkSession.builder()
.appName("SqlText1")
.master("local[*]")
.getOrCreate()
//创建RDD
val lines = session.sparkContext.textFile("hdfs://pro01:9000/person")
val rowRDD: RDD[Row] = lines.map(line => {
val fields = line.split(",")
val id = fields(0).toLong
val name = fields(1)
val age = fields(2).toInt
val fv = fields(3).toDouble
Row(id, name,age, fv)
})
val sch =StructType(List(
StructField("id", LongType,true),
StructField("name", StringType,true),
StructField("age", IntegerType,true),
StructField("fv", DoubleType,true)
))
//创建DataFrame
val df = session.createDataFrame(rowRDD, sch)
import session.implicits._
val df2 = df.where($"fv" >98).orderBy($"fv" desc,$"age" asc)
session.stop()
另一种写法:创建视图
//创建SparkSession
val sparke = SparkSession.builder()
.appName("SqlWordcount")
.master("local[*]")
.getOrCreate()
//指定读数据
//Dataset分布式数据集,是对RDD的进一步分装,更加智能
//Dateset默认只有一列,是value
val lines: Dataset[String] = sparke.read.textFile("hdfs://pro01:9000/person")
//整理数据
import sparke.implicits._
val words: Dataset[String] = lines.flatMap(_.split(" "))
//注册试图
words.createGlobalTempView("v_wc")
val result:DataFrame =sparke.sql("SELECT value,COUNT(*) counts FROM v_wc GROUP BY value ORDER BY counts DESC")
result.show()
另一种写法:将sql使用方法来调用
//创建SparkSession
val sparke = SparkSession.builder()
.appName("DataSerWordcount")
.master("local[*]")
.getOrCreate()
//指定读数据
//Dataset分布式数据集,是对RDD的进一步分装,更加智能
//Dateset默认只有一列,是value
val lines: Dataset[String] = sparke.read.textFile("hdfs://pro01:9000/person")
//整理数据
import sparke.implicits._
val words: Dataset[String] = lines.flatMap(_.split(" "))
//使用DataSet的API(DSL)
//val cou = words.groupBy($"value" as "word" ).count().sort("").sort($"count" desc)
//导入聚合函数
import org.apache.spark.sql.functions._
val counts = words.groupBy($"value" as"word").agg(count("*") as"counts").orderBy($"counts " desc)
counts.show()
sparke.stop()