如果不熟悉spark的运行原理,那么在实际的生产当中,不论是调优还是debug都会显得没有下手之处。因此我们必须对Spark作业的资源使用原理有一个清晰的认识,并知道在Spark作业运行过程中,调优参数对于运行的影响,以及运行过程中的错误如何修改。
使用spark-submit提交一个作业之后,这个作业就会启动一个Driver来执行(cluster模式下Driver在集群中某个节点启动,client模式下在本地启动)。Driver进程会根据我们设置的参数,使用一定数量内存和Core。
Driver(spark-yarn中是Application进行申请)首先会向集群的资源管理器申请资源(Executor),Spark-Yarn中,ResourceManager会根据我们为Spark作业设置的资源参数,在各个NodeManager上分配Container,之后NodeManager就会启动一定数量executor进程,每个executor都有一定的内存和CpuCoue。之后NodeManager会向Driver进行反注册,这样Driver对于全局资源就有了一个基本的了解。
资源申请完毕之后,Driver就可以进行任务分发了。Driver会将Application按照Action切分Job,按照Shuffle切分Stage,每个Stage分配一批taskSet(数量由并行度决定),然后将taskSet分发到各个executor进程中执行。task是最小的计算单元,每一组算子进行一样的计算操作,只是负责的数据不同而已。当一个stage的所有task计算完毕之后,会在各个几点的本地磁盘文件写入计算的中间结果。如此往复,直到所有Job计算完毕,得到最终的计算结果。
Spark进行stage的划分是依据Shuffle进行的。如果代码中包含了某一个shuffle算子,那么将其划分为一个stage的界限,每一个stage刚开始执行的时候,每个task都需要从上一个stage的task所在的节点通过网络进行数据拉取自己需要处理的key,这里也就是宽依赖,对拉取到的所有相同的key使用当前的shuffle算子进行聚合操作。
Executor的内存由称为BlockManager管理(),主要分为三块:
这部分也叫堆内内存
(1)存储shuffle过程中从上一个stage中拉取得到的数据或者是存放等待下一个stage拉取得数据使用,默认20%;
new SparkConf().set("spark.shuffle.memoryFraction","0.3");设置为30%,调节大之后会减少因为内存不够导致的数据磁盘落地,减少磁盘io
(2)让RDD持久化使用,默认60%;
new sparkConf().set("spark.storage.memoryFraction","0.4")设置为40%,调节小之后,就会留给代码计算更大的内存空间
(3)让task执行自己编写的代码时候使用,存放对象等。默认20%;
其大小由1减去上述两个内存占比
task执行的速度和每个Executor进程的资源有直接关系。一个CPUCore同一时间只能执行一个线程,如果CpuCore数量充足,并且分配到的task数量比较合理,就可以比较告诉和高效的执行完task线程。