前面我们了解了RDD编程,RDD优点极多,但是不包含schema信息,即列的信息,只能通过反复的迭代处理获取期待的数据,本文将阐述DataFrame的用法,所谓DataFrame就是包含schema信息的RDD。
RDD【弹性分布式数据集】是spark的核心,它是只读的,基于内存的,RDD结合算子会形成一个DAG【有向无环图】,DAG可以推测和延迟执行,效率极高。本文将阐述基于RDD的编程。
1 系统、软件以及前提约束
- CentOS 7 64 工作站 作者的机子ip是192.168.100.200,请读者根据自己实际情况设置
- 已完成RDD编程
https://www.jianshu.com/p/dd250c656c91 - hadoop已经安装
https://www.jianshu.com/p/b7ae3b51e559 - 为去除权限对操作的影响,所有操作都以root进行
- 确保hadoop,spark,hive的metastore已经启动,已经执行spark-shell连接到scala
2 操作
- 1 分析people.json
在scala命令行输入以下命令:
# 导入SparkSession
import org.apache.spark.sql.SparkSession
# 导入RDD隐式转DataFrame的包
import spark.implicits._
# 创建sparkSession【代替SparkSql】
val sparkSession = SparkSession.builder().getOrCreate()
# 加载people.json形成DataFrame
val df = spark.read.json("file:///root/spark-2.2.1-bin-hadoop2.7/examples/src/main/resources/people.json")
# 查询全部
df.show()
# 打印模式信息
df.printSchema()
# 条件过滤
df.filter("name='Andy'").show()
df.filter("age>20").show()
# 选择多列
df.select("name","age").show()
# 排序
df.sort(df("age").desc).show()
# 分组
df.groupBy("age").count().show()
# 列重命名
df.select(df("name").as("用户名称")).show()
# df数据写入本地文件
df.select("name","age").write.format("csv").save("file:///root/people.csv")
# rdd转化为df
val rdd = sc.parallelize(Array("java","python","cpp"))
val df1 = rdd.toDF()
df1.show()
以上就是在sparksql进行以DataFrame为基础的操作。