目录
Spark-相关概念
MapReduce和Spark对比
Spark-rdd
Spark-shuffle分析
Spark-闭包/线程安全
Spark-sql
Spark-相关概念
RDD:
是一个抽象数据集,RDD中不保存要计算的数据,保存的是元数据(描述信息),即数据的描述信息和运算逻辑,比如数据要从哪里读取,怎么运算等
RDD可以认为是一个代理,你对RDD进行操作,相当于在Driver端先是记录下计算的描述信息,然后生成Task,将Task调度到Executor端才执行真正的计算逻辑
DAG:
是对多个RDD转换过程和依赖关系的描述
触发Action就会形成一个完整的DAG,一个DAG就是一个Job
一个Job中有一到多个Stage,一个Stage对应一个TaskSet,一个TaskSet中有一到多个Task
Job:
Driver向Executor提交的作业
触发一次Acition形成一个完整的DAG
一个DAG对应一个Job
Stage:
Stage执行是有先后顺序的,先执行前的,在执行后面的
一个Stage对应一个TaskSet
一个TaskSet中的Task的数量取决于Stage中最后一个RDD分区的数量
MapReduce和Spark对比
关于MapReduce和spark的对比,直接对比执行流程就可显而易见
Spark
1.集流批处理、交互式查询、机器学习及图计算等于一体
2.基于内存迭代式计算,适合低延迟、迭代运算类型作业
3.可以通过缓存共享rdd、DataFrame,提升效率【尤其是SparkSQL可以将数据以列式的形式存储于内存中】
4.中间结果支持checkpoint,遇错可快速恢复
5.支持DAG、map之间以pipeline方式运行,无需刷磁盘
6.多线程模型,每个worker节点运行一个或多个executor服务,每个task作为线程运行在executor中,task间可共享资源
7.Spark编程模型更灵活,支持多种语言如java、scala、python、R,并支持丰富的transformation和action的算子
MapReduce
1.适合离线数据处理,不适合迭代计算、交互式处理、流式处理
2.中间结果需要落地,需要大量的磁盘IO和网络IO影响性能
3.虽然MapReduce中间结果可以存储于HDFS,利用HDFS缓存功能,但相对Spark缓存功能较低效
4.多进程模型,任务调度(频繁申请、释放资源)和启动开销大,不适合低延迟类型作业
5.MR编程不够灵活,仅支持map和reduce两种操作。当一个计算逻辑复杂的时候,需要写多个MR任务运行【并且这些MR任务生成的结果在下一个MR任务使用时需要将数据持久化到磁盘才行,这就不可避免的进行遭遇大量磁盘IO影响效率】
spark执行流程
spark-submit执行脚本在client模式下,脚本通过反射调用程序的业务逻辑
sparksubmit将任务的信息发送给master
master向worker通信,需要的资源信息,application,driver信息发送给worker
worker启动executor
executor跟driver反向注册
driver创建RDD Action算子会调用runjob触发执行
根据最后一个RDD往前推,根据依赖关系stage,RDD递归类似于类似于栈的结构,递归的出口是没有父RDD
先提交前面的stage,再提交后面的stage,一个stage对应一个stageset,一个stageset有多个task(shuffleMaptask Resulttask),然后把taskset传递给tasksecheduler
asksecheduler将taskset中的task进行序列化,然后根据executor的资源情况,将序列化的task发送给executor
将taskdesccription反序列化用taskrunner包装放在线程池中
调用task的run方法,传入到taskcontext中,然后根据具体的task类型,如果是shufflemaptask,就调用其runtask
将数据先应用分区器返回ID,然后写入到APPendonlymap的内存中,默认达到5M溢写到磁盘,生成两个文件一个索引文件和数据文件
mappartitionRDD向shuffledRDD要数据,shuffleRDD获取shuffledReader,从上游拉取属于自己分区的数据,然后进行全局的聚合,最后将聚合的结果写入到hdfs中
MapReduce执行流程
read阶段 默认testinputformat一行一行读取数据
map阶段 返回相应(k,v),并写入到context(k,v)
collect阶段 向环形缓冲区写入(k,v)数据,默认100m 80%反向 分区(可以重写分区方法)排序(按分区规则来排序 所以重写这个排序没有意义)
溢出阶段 溢出到文件
combine阶段 merge并归排序(此处有接口combiner(减少了io传输),特定业务)
reducetask工作机制
copy阶段:所以maptask结束.reducetask端主动拉取数据,下载到reduce本地磁盘(存不下溢出到磁盘,存的下就在内存中)
merge阶段 sout 阶段 合并文件并归排序(groupcomparater方法)
reduce阶段 相同keycopy到一个迭代器中
shuffle机制 (map之前 reduce之后)
Spark-rdd和高级功能
Transformation √
Action
Cache/persist
Checkpoint
广播变量
自定义分区
自定义排序
分组TOP_N
序列化问题
累加器
Spark-闭包/线程安全
1.使用object,就不需要new 一个实例(会出现线程安全的问题)
2.new 一个实例(每个task持有一个实例)
3.不在driver端初始化,就没有加载类,相当于工具类,在extutor端初始化
4.在算子内new 每来一条数据就new 一个实例效率低下
5.使用mapPartitions是一个迭代器new 一个实例,效率较高且不需要考虑序列化的问题
(重点学习1和2的适用场所:1适用于只读2适用于存在改的场景)
(使用1每个excutor共用一个地址,driver一个地址,如果是class的场景,每个task共用一个地址值)
(多线程问题与线程不安全类的解决简单解决方案)
方法一:加锁(读写锁)
方法二:(一个extutor一个task)
SparkSQL
Spark SQL运行架构
Spark SQL对SQL语句的处理和关系型数据库类似,即词法/语法解析、绑定、优化、执行。
Spark SQL会先将SQL语句解析成一棵树,然后使用规则(Rule)对Tree进行绑定、优化等处理过程。
Spark SQL由Core、Catalyst、Hive、Hive-ThriftServer四部分构成:
• core: 负责处理数据的输入和输出,如获取数据,查询结果输出成DataFrame等
• catalyst: 负责处理整个查询过程,包括解析、绑定、优化等
• hive: 负责对Hive数据进行处理
• hive-ThriftServer: 主要用于对hive的访问
Spark-SQL运行原理
- 使用SessionCatalog保存元数据
在解析SQL语句之前,会创建SparkSession,或者如果是2.0之前的版本初始化SQLContext,SparkSession只是封装了SparkContext和SQLContext的创建而已。会把元数据保存在SessionCatalog中,涉及到表名,字段名称和字段类型。创建临时表或者视图,其实就会往SessionCatalog注册 - 解析SQL,使用ANTLR生成未绑定的逻辑计划
当调用SparkSession的sql或者SQLContext的sql方法,我们以2.0为准,就会使用SparkSqlParser进行解析SQL. 使用的ANTLR进行词法解析和语法解析。它分为2个步骤来生成Unresolved LogicalPlan:
词法分析:Lexical Analysis,负责将token分组成符号类
构建一个分析树或者语法树AST - 使用分析器Analyzer绑定逻辑计划
在该阶段,Analyzer会使用Analyzer Rules,并结合SessionCatalog,对未绑定的逻辑计划进行解析,生成已绑定的逻辑计划。 - 使用优化器Optimizer优化逻辑计划
优化器也是会定义一套Rules,利用这些Rule对逻辑计划和Exepression进行迭代处理,从而使得树的节点进行和并和优化 - 使用SparkPlanner生成物理计划
SparkSpanner使用Planning Strategies,对优化后的逻辑计划进行转换,生成可以执行的物理计划SparkPlan. - 使用QueryExecution执行物理计划
此时调用SparkPlan的execute方法,底层其实已经再触发JOB了,然后返回RDD
SQL案例
例如上:要求使用rdd,sql,dsl风格分别实现上面的需求
//分组转化案例
object GroupedIntoRdd {
def main(args: Array[String]): Unit = {
val sc = new SparkContext(new SparkConf().setAppName(this.getClass.getSimpleName).setMaster("local[*]"))
val rdd = sc.textFile("src/main/resources/data.csv")
val maprdd = rdd.map(line => {
val data = line.split(",")
(data(0).toInt, (data(1), data(2), data(3).toInt))
})
//按id来分组处理
val etlrdd = maprdd.groupByKey().flatMapValues(data => {
//下一条的begin_time - 上一条的end_time > 10min再分一组
//按照begin_time来排序
val sort = data.toSet.toList.sorted
val sdf = new SimpleDateFormat("yyyy/MM/dd HH:mm")
var flag = 0 // 0或者1 用来判断是否小于十分钟
var sum = 0 //划分分组
var temp_time = 0L //接收上一个的end_time
sort.map(da => {
val begin_time = sdf.parse(da._1).getTime
val end_time = sdf.parse(da._2).getTime
if (temp_time != 0L) {
if ((begin_time - temp_time) / (1000 * 60) < 10) {
flag = 0
}
else {
flag = 1
}
}
sum += flag
temp_time = end_time
(begin_time, end_time, da._3, sum)
})
}).map({ case (x, (y, z, q, p)) => ((x, p), (y, z, q)) })
val value = etlrdd.reduceByKey((x, y) => {
val min = Math.min(x._2, y._2)
val max = Math.max(x._2, y._2)
var sum = x._3 + y._3
(min, max, sum)
}
).mapPartitions(iter => {
val sdf = new SimpleDateFormat("yyyy/MM/dd HH:mm")
iter.map(data => {
val start = sdf.format(data._2._1)
val last = sdf.format(data._2._2)
((data._1._1, start), (last, data._2._3))
})
}).sortByKey()
value.collect().foreach(println)
sc.stop()
}
}
-- begin_time - 上一个的end_time 大于6分钟就重新分一组
object GroupedIntoSql {
def main(args: Array[String]): Unit = {
val spark = SparkSession.builder().appName(this.getClass.getSimpleName).master("local[*]").getOrCreate()
val schema = new StructType(Array(
StructField("id", DataTypes.StringType),
StructField("begin_time", DataTypes.StringType),
StructField("end_time", DataTypes.StringType),
StructField("down_flow", DataTypes.DoubleType),
))
val frame = spark.read.format("csv").schema(schema).load("C:\\Users\\hp\\IdeaProjects\\Spark_Maven\\src\\main\\resources\\data.csv")
frame.createTempView("GroupedInto")
spark.sql(
"""
|select
|id,
|min(begin_time) as begin_time,
|max(end_time) as end_time,
|sum(down_flow) as down_flow
|from
|(
|select
|id,begin_time,end_time,down_flow,
|sum(flag) over(partition by id order by begin_time
|rows between unbounded preceding and current row ) as sum_flag
|from
|(
|select
|id,begin_time,end_time,down_flow,
|if((to_unix_timestamp(begin_time,'yyyy/M/dd HH:mm') -
|to_unix_timestamp(rn_time,'yyyy/M/dd HH:mm') )> 600, 1, 0) as flag
|from
|(
|select
|id,begin_time,end_time,down_flow,
|lag(end_time,1,begin_time) over(partition by id order by begin_time ) as rn_time
|from
|GroupedInto
|) t1
|) t2
|) t3
|group by id , sum_flag
|""".stripMargin).show(100)
}
}
object GroupedIntoSqlDsl {
def main(args: Array[String]): Unit = {
val spark = SparkSession.builder().appName(this.getClass.getSimpleName).master("local[*]").getOrCreate()
val dataFrame = spark.read.csv("src/main/resources/data.csv").toDF("id", "begin_time", "end_time", "down_flow")
import spark.implicits._
import org.apache.spark.sql.functions._
dataFrame.selectExpr(
"id"
,"begin_time"
,"end_time"
,"down_flow"
,"lag(end_time,1,begin_time) over(partition by id order by begin_time) as lag_time " )
.select(
'id
,'begin_time
,'end_time
,'down_flow
//if (unix_timestamp('begin_time,"yyyy/M/dd HH:mm")-unix_timestamp(col("lag_time"),"yyyy/M/dd HH:mm") > 600 ,1,0)
,expr("if((to_unix_timestamp(begin_time,'yyyy/M/dd HH:mm')-to_unix_timestamp(lag_time ,'yyyy/M/dd HH:mm'))>600,1,0)") as "flag"
)
.select(
col("id")
,col("begin_time")
,col("end_time")
,col("down_flow")
,sum(col("flag")).over( Window.partitionBy(col("id")).orderBy(col("begin_time")).rowsBetween(Window.unboundedPreceding,Window.currentRow)) as "rn"
)
.groupBy(col("id"),col("rn"))
.agg(
min(col("begin_time")) as "begin_time"
,max((col("end_time"))) as "end_time"
,sum(col("down_flow")) as "down_flow"
)
//要是后面有select 则不需要使用drop来过滤字段了
.drop(col("rn"))
.orderBy(col("id"))
.select("id","begin_time","end_time","down_flow")
.show(100,false)
}
}
SQL优化
- 业务逻辑
- SQL的写法
- 参数的优化
关于SQL的优化主要还是经验为主,这才是吃饭的家伙