SparkCore总结

Driver

Spark驱动器,用于执行Spark应用程序中的main方法,主要负责:
1.将代码转换为job
2.在executor之间调度task
3.跟踪executor的执行情况
4.展示执行结果

Executor

负责运行Spark作业中的具体task。如果有Executor节点发生故障,Spark应用会继续执行,将task转移到其他Executor上。核心功能:
1.运行组成Spark应用的task
2.通过自身的BlockManager,对需要缓存的RDD进行内存式存储。RDD直接缓存在Executor进程中,任务在运行时可以充分利用缓存数据加速。

Spark应用执行通用流程

任务提交后,先启动Driver,Driver将应用提交给集群管理器(ResourceManager或Master)。集群管理器根据应用的配置文件分配Executor并启动。当Driver需要的资源完全满足后,执行main函数。当执行到action算子后,开始划分stage,每个stage对应一个taskset。根据本地化原则,task被发送到对应的Executor执行。task执行期间,会不断向Driver汇报。

RDD的理解

RDD,弹性分布式数据集。处理数据需要将数据读取出来存放到一个数据结构中。RDD是Spark最基本的数据抽象,Spark处理分布式数组将数据以RDD的形式存储在内存中处理。
弹性意味着灵活,可以存储任意数据类型,可以将数据分区,一个分区由一个task处理,反映了并行粒度,可自行设定,默认设置为cpu核心数。若数据来源为HDFS,一个分区对应HDFS的一个数据块。
Spark算子是能并行处理RDD各个分区数据的函数。
加载数据得到最原始的RDD,将转移算子作用于已有的RDD形成新的RDD。转换后的RDD与原始的RDD形成依赖关系,构成lineage。凭借lineage,Spark可以将所有丢失的RDD恢复。但RDD的转换都是惰性的,只有action算子触发,需要将结果返回给Driver,Spark才会创建任务执行RDD的转换。转移算子只会记录操作,遇到action算子由最后一个RDD根据lineage向前回溯开始执行。

DAG
宽窄依赖

RDD之间的依赖关系又可分为宽依赖和窄依赖。如果父RDD的每个分区最多只由一个子RDD分区使用,这种关系便为宽依赖;如果父RDD的每个分区由多个子RDD分区使用,便为窄依赖。在执行DAG前会将DAG划分为多个stage,划分规则就是遇到窄依赖就断开。窄依赖因为子RDD的一个分区依赖父RDD的多个分区,父RDD的分区可能不在同一台机器上,那么在stage边界就会发生shuffle操作,会发生数据传送磁盘IO。一个stage代表一组可以并行执行的任务。

pipeline模式

每个stage在计算过程中数据不会落地,而是放在内存中供下一个函数使用。直到一个stage执行完或遇到action算子才会落磁盘。同一个stage可以进行pipeline计算。一个stage是否被提交,由他的父stage决定。只有当他的父stage被执行后,或者他没有父stage,才能提交执行。

任务调度

Driver初始化sparkcontext的过程中,会创建DAGScheduler和TaskScheduler对象,是在是sparkcontext的构造器中实现的。
sparkcontext是集群的入口。
DAGScheduler负责stage级的调度。当程序执行触发action算子,DAGScheduler会将job划分为多个stage,job由最终的RDD和action算子封装而成。划分策略是由最终的RDD根据lineage向前回溯,判断父依赖是否是宽依赖,即以shuffle为界,划分stage。stage提交时会将stage打包成TaskSet交给TaskScheduler,TaskSet为task的集合。TaskScheduler会一直监控stage的执行状态。
TaskScheduler负责task级的调度。TaskScheduler会将TaskSet放入调度队列中,从队列中取出一个TaskSet发送到Executor上执行。一个分区对应一个task,此task的优先位置就是对应的分区。同时监控每个task的执行状态。若失败会重试,同时记录失败次数,并启动黑名单机制,避免去上一个Executor上执行。

资源调度

Worker启动成功后,会向Master注册,Master便掌握了整个集群的资源情况。TaskScheduler会向Master发送请求,为当前的application申请资源。
Master接受请求后,查看哪个Worker节点的资源充足,向其发送消息,创建Executor进程。
所有的Executor向TaskScheduler注册,TaskScheduler便掌握了所有Executor的请况
DAGScheduler将job划分为多个stage,stage以TaskSet形式发送给TaskScheduler。
TaskScheduler遍历TaskSet对象,将Task发送到最佳的Executor上执行。
Spark是粗粒度的资源调度,在Application执行之前,便将所有执行需要的资源申请完毕,再进行任务调度,直到所有任务执行完毕才释放资源。缺点是无法充分利用集群资源,运行过程中空闲的资源其他Application无法利用。

运行模式

Standalone Spark原生的集群管理器。
Master,管理集群的资源。
Worker,管理本地资源,启动后向Master注册,启动Executor。
Executor,启动分区执行RDD分区的并行计算。

image.png

客户端提交,--deploy-mode client,Driver在客户端启动,适用于测试,当客户端提交任务过多,客户端资源不够用。集群方式提交,随机找一个Worker启动Driver。适用于生产环境,集群方式有助于负载均衡。
yarn 把spark任务提交给yarn集群。上传依赖jar包(包含启动Executor的jar包)到HDFS,启动Driver进程,向RM发送请求,为其建立一个APPMaster,APPMaster建立成功向RM申请资源。RM接收请求,向资源充足的NM发送消息,NM从HDFS下载依赖jar包,启动Executor。Executor启动成功,向Driver反向注册,Driver开始执行main函数。分为在客户端模式和集群模式。客户端模式在客户端启动Driver进程,集群模式ApplicationMaster进程就是Driver进程,任务调度和资源申请都由同一个进程来做。
HA ZooKeeper存放Master上的一些元数据,即每个Worker的资源情况。主从Master切换过程中不能提交新的Application,从Master从ZooKeeper读取元数据信息后,变为Active。运行中的Application在主从切换过程中不会受影响,因为Spark是粗粒度的资源调度。

通信机制

Spark模块间的通信使用的是AKKA

持久化与容错

cache 把RDD持久化到内存
persist 可选择多种持久化方式
都是懒执行算子,需要action触发。需要赋给一个变量,用于一个application有多个job。
lineage,若丢失数据不必重新计算,通过lineage进行部分计算便可得到结果。当业务逻辑很复杂,RDD之间频繁转换,lineage就很长,丢失数据计算需要花费较多时间。checkpoint可以将RDD上传到HDFS,让后面的RDD不再依赖于这个RDD,而是依赖于HDFS上的数据,下次计算会方便的多。
checkpoint执行原理,当job完成后,会从最后一个RDD向前回溯,回溯到标记checkpoint的RDD。spark会自动启动一个新的job,重新计算该RDD,将该RDD持久化到HDFS中。可以将该RDDcache到内存中,这样启动job就可以直接从内存中获取该RDD不用重新计算。

广播变量

当计算task需要使用RDD外部的变量,当把task发送到其他节点执行,需要用到此变量还要发送给每个task,当变量占用内存过大,造成集群资源紧张。声明为广播变量,会将该变量发送到每个节点的Executor中,供该Executor执行的所有任务使用,减少了集群的资源负荷。只能在Driver端定义和修改。

累加器

继承Accumulator,集群规模的全局变量。

shuffle

如果没有设置参数,reduce端的分区数等于map端的分区数,那么reduce端的task数与map端相同。
maptask执行完毕会将产生的磁盘小文件位置封装成对象发送给Driver。reducetask执行前会向Driver请求文件位置。每次拉取数据量不能超过48M,将拉来的数据存储在Executor内存的20%中。
未经优化的HashShuffleManager
shufflewrite,在一个stage执行结束后,将要落磁盘的数据按key划分,以便reducetask方便拉取数据。
相同key的数据写入一个文件中。每个文件只属于下游stage中的一个task。在数据写入磁盘,会将数据写入缓冲区,缓冲区满了才会溢写到磁盘中。下一个stage的task有多少,就要创建多少个文件。
shuffleread一边拉取一边聚合,拉完一批聚合完再拉下一批。
优化后的HashShuffleManager
可设置参数spark.shuffle. consolidateFiles,默认为false。consolidate机制允许不同的task产生的文件合并,这样大大减少了磁盘文件的数量。
SortShuffle
数据会先写入一个内存数据结构中,此时根据不同的shuffle算子,可能选用不同的数据结构。如果是reduceByKey这种聚合类的shuffle算子,那么会选用Map数据结构,一边通过Map进行聚合,一边写入内存;如果是join这种普通的shuffle算子,那么会选用Array数据结构,直接写入内存。接着,每写一条数据进入内存数据结构之后,就会判断一下,是否达到了某个临界阈值。如果达到临界阈值的话,那么就会尝试将内存数据结构中的数据溢写到磁盘,然后清空内存数据结构。在溢写到磁盘文件之前,会先根据key对内存数据结构中已有的数据进行排序。排序过后,会分批将数据写入磁盘文件

彻底搞懂spark的shuffle过程
Spark 内核解析-上

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
【社区内容提示】社区部分内容疑似由AI辅助生成,浏览时请结合常识与多方信息审慎甄别。
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

相关阅读更多精彩内容

  • 原文:https://tech.meituan.com/spark-tuning-pro.html Spark性能...
    code_solve阅读 4,919评论 0 34
  • 1 数据倾斜调优 1.1 调优概述 有的时候,我们可能会遇到大数据计算中一个最棘手的问题——数据倾斜,此时Spar...
    wisfern阅读 7,977评论 0 23
  • 场景 数据倾斜解决方案与shuffle类性能调优 分析 数据倾斜 有的时候,我们可能会遇到大数据计算中一个最棘手的...
    过江小卒阅读 8,820评论 0 9
  • 所谓的“韭菜”,指的是在交易市场中没赚到钱甚至赔钱的势单力薄的散户。 投机者拒绝学习,投资者善于学习。 交易之前,...
    Philly008阅读 1,513评论 0 0
  • 我就是个普通人,我能找个自己能做的事情,把它做好,通过自己的辛勤付出能让自己和家人衣食无忧,就是最好的归宿了。
    一埝阅读 1,166评论 0 1

友情链接更多精彩内容