这篇文章,是我在尝试理解Spark的一些概念时,找到的一篇文章。这篇文章主要介绍的是Spark中的内存模型。由于译者水平有限,翻译的过程中难免有些错误,请读者怀着质疑的心理来阅读这篇文章。
我们先来看 http://spark.apache.org/docs/1.3.0/cluster-overview.html中给的一副图片:
正如你所看到的那样,其中包含了太多术语。我个人不是很喜欢这种风格,因为它并不能突出一些重点概念。
我们从头开始说。在你的集群中,任何Spark进程都是都是一个JVM进程。所以,你可以通过配置JVM的内存的那些参数来配置它。下面是一幅关于Spark如何分配JVM堆内存的图片:
默认情况下,Spark会启动一个有512MB大小堆内存的JVM。为了保证安全,避免OOM这种错误,Spark只允许你使用90%的堆内存,当然,你可以通过spark.storage.safetyFraaction参数来进行调节。
你可能听说过这样一种说法,作为一个内存工具,Spark允许你将数据保存到内存中。如果你看过我的https://0x0fff.com/spark-misconceptions/这篇文章,你应该明白,Spark并不是一个真正的内存工具,它只是按照LRU算法来使用的它的缓存。
所以,Spark会划分一部分内存,用于缓存你在处理过程中产生的数据,这一部分内存的大小,通常是安全堆(堆的90%)的60%,你也可以通过spark.storage.memoryFraction参数来调节。所以,如果你想要知道,在你的集群中,Spark能够缓存多少数据,你可以通过下面的公式计算:集群中全部的堆的大小safetyFraction * storage.memoryFraction,在默认的情况下,允许Spark使用90%60%,也就是54%的集群中全部的堆内存。
现在我们来介绍一下shuffle memory。它可以通过下面的公式计算:堆大小 * spark.shuffle.safetyFraction * spark.shuffle.memoryFraction。其中,默认情况下,spark.shuffle.memoryFraction是80%,spark.shuffle.memoryFraction是20%。所以,你可以将16%的堆大小用于shuffle。
那么,shuffle memory是干嘛的呢?顾名思义,一般情况下,就是用于shuffle这个操作。当执行shuffle操作时,有的时候,需要对数据进行排序,这时候,我们就需要一个buffer,而shuffle memory就可以作为这个buffer来使用。那么,如果我们的数据量太大,shuffle memory不够用怎么办?我们有大量的外排序算法可以使用。
还有一块区域我们没有介绍到,就是unroll memory。这块内存的大小可以通过下面的公式计算:堆大小 * spark.storage.unrollFraction * spark.storage.memoryFraction * spark.storage.safetyFraction,默认情况下,就是20% * 60% * 90%,即10.8%。
那么,unroll memory又是干嘛的呢?这块内存,是我们需要数据"展开"的时候使用的。比如,被序列化之后的数据并不能被直接使用,所以,我们需要将它"展开"。这块区域是和Storage这块区域共享的,所以,这也就意味着,如果你需要一些内存来展开数据,那么,可能会丢失一些存储在Spark LRU Cache中的Partition.
现在你理解了Spark如何使用内存了。
那我们来看一下,当你启动一个Spark集群时,它看起来是什么样子?下面的例子都是针对YARN来进行的。
在一个YARN集群中,有一个Resource Manager,用于控制集群中的资源,以及好多个Node Managers,用来控制每个节点上资源的使用率。当你向YARN Resource Manager请求资源的时候,它会告诉你,你应该去联系哪个Node Manager,来获取execution container。每个execution container都是一个拥有你请求的资源的JVM。execution container的位置是由Resource Manager决定的,你无法控制它。比如,如果一个节点有64GB的内存,然后你请求10个executors,每个executors都需要4GB的内存,那么,即使你有一个包含很多个节点的集群,这些executors还是可能仅仅运行在一个节点上。
你的Job,会被拆分成不同的Stage,然后,每个Stage又会被切分成tasks。每个task都会被分开调度。你可以把每个executor看成一个task池,每个executor都可以让你运行spark.executor.cores / spark.task.cpus个tasks。
例如,如果我们有一个12个节点的集群,每个节点都有64GB的内存,32个核。所以,你可以在每个节点上启动两个用于26GB内存以及12个核的executor。所以你的集群总共能处理12 machines * 2 executors per machine * 12 cores per executor / 1 core for each task = 288个task。也就是说,你的集群能够同时运行288个task。这个集群中,你可以用来缓存你的数据的内存为0.9 spark.storage.safetyFraction * 0.6 spark.storage.memoryFraction * 12 machines * 2 executors per machine * 26GB per executor=336.96GB。