1. RDD编程
初始化
val conf = new SparkConf().setAppName(appName).setMaster(master)
new SparkContext(conf)
RDD可以由Hadoop文件系统(hdfs://)、本地文件、Amazon S3(s3a://)等创建或者使用sc.parallelize()直接存放内存,例如scala的driver程序的collection中。
val distFile = sc.textFile("data.txt")
SparkContext.textFile可以读入表达式或目录, 但SparkContext.wholeTextFiles读入目录会针对每个文件返回 (filename, content)
val data = Array(1, 2, 3, 4, 5)
val distData = sc.parallelize(data)
常用算子:
Transformation
lazy执行,生成新的RDD
- map
- flatmap
对每个输入元素生成多个输出元素
输出是一个包含各个返回值序列的迭代器可访问的所有元素的 RDD - filter
返回一个由能够符合通过传给 filter() 的函数的条件的元素组成的 RDD - sample(withRe placement, fra ction, [seed])
对 RDD 采样,以及是否替换 - distinct
distinct() 操作的开销很大,因为它需要将所有数据通过网络进行 混洗(shuffle),以确保每个元素都只有一份 - union(other RDD)
- intersection(other RDD)
shuffle 开销大 - subtract(other RDD)
返回一个由只存在于第一个 RDD 中而不存在于第二个 RDD 中的所有元素组成的 RDD,shuffle开销大 - cartesian(other RDD)
笛卡尔积 - join(other RDD) 仅适用于pairRDD
Action
| reduce(func) | 返回一个由func函数的逻辑聚集后形成的 RDD |
| collect() | 返回dataset的元素到driver|
| count() | 返回dataset元素个数 |
| first() | 返回dataset第一个元素 (similar to take(1)). |
| take(n) | 返回dataset前n个元素 |
| saveAsTextFile(path) | 本地或hdfs存储dataset |
| countByKey() | 仅适合pairRDDs of type (K, V) |
| foreach(func) | 对dataset中每个元素执行func,通常用于更新Accumulator 或者和外部存储交互|
持久化
共享变量
broadcast
累加器
2. Spark SQL
支持hive查询
import org.apache.spark.sql.SparkSession
val spark = SparkSession
.builder()
.appName("Spark SQL basic example")
.config("spark.some.config.option", "some-value")
.getOrCreate()
// For implicit conversions like converting RDDs to DataFrames
import spark.implicits._
spark.sql(string sql command)返回dataframe
常见问题
- Hadoop与Spark的区别
- shuffle阶段的过程详细介绍一下
- Spark基于图计算的优势
- Spark作业运行机制以及Spark各组件运行机制
- MapReduce中map和reduce各自负责什么内容,两个阶段如何衔接
- scala中的各种算子有使用过吗?说一下map函数、reduceByKey函数的机制
- 有些过海量数据处理的MapReduce脚本吗?(答:没写过,然后让我想想如何使用MR对大量文本数据进行清洗)