1 Overview
Spark's memory consumption falls into three areas:
the amount of memory used by your objects (storing the dataset);
the cost of accessing those objects;
the overhead of garbage collection.
By default, Java objects are fast to access, but they often consume 2-5x more memory than the raw data in their fields, for the following reasons:
Each distinct Java object has an "object header";
Java Strings have about 40 bytes of overhead over the raw string data;
Common collection classes not only have a header, but also pointers to their elements;
Collections of primitive types often store them as "boxed" objects.
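The boxing point can be seen directly in Scala: an Array[Int] stores raw 4-byte ints back to back, while a List[Int] stores one pointer per element to a boxed java.lang.Integer object, each carrying its own object header. A minimal sketch (names are illustrative):

```scala
object BoxingDemo {
  // Raw 4-byte ints, stored contiguously with no per-element object header.
  val primitives: Array[Int] = Array(1, 2, 3)

  // Each element is a pointer to a boxed java.lang.Integer object.
  val boxed: List[Int] = List(1, 2, 3)

  // Inspect the runtime class of a list element: it is the wrapper class.
  val boxedElementClassName: String =
    boxed.head.asInstanceOf[AnyRef].getClass.getName

  def main(args: Array[String]): Unit = {
    println(boxedElementClassName)        // java.lang.Integer
    println(primitives.getClass.getName)  // [I  (a primitive int array)
  }
}
```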
2 Memory Model
Tasks running inside an Executor can use both on-heap and off-heap memory.
JVM on-heap memory: its size is set by the "--executor-memory" flag (i.e. spark.executor.memory). All concurrent tasks running in the Executor share this on-heap memory.
JVM off-heap memory: its size is set by the "spark.yarn.executor.memoryOverhead" parameter; it mainly covers the JVM's own overhead, interned strings, NIO buffers, and similar costs.
Off-heap memory is disabled by default; it can be enabled via the spark.memory.offHeap.enabled parameter, and its size is set by spark.memory.offHeap.size. Off-heap memory is divided the same way as on-heap memory, except that it has no "other" region; all concurrently running tasks share its storage and execution memory.
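A spark-submit invocation wiring these settings together might look like the following sketch (all sizes and names are illustrative, not recommendations):

```shell
# Sketch: 4g on-heap per executor, 1 GiB YARN overhead, plus 1g of
# explicitly managed off-heap memory.
spark-submit \
  --executor-memory 4g \
  --conf spark.yarn.executor.memoryOverhead=1024 \
  --conf spark.memory.offHeap.enabled=true \
  --conf spark.memory.offHeap.size=1g \
  --class com.example.MyApp my-app.jar
```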
(On-heap and off-heap memory layout diagrams omitted.)
3 Determining Memory Consumption
Dataset: create an RDD, cache it, and read its size off the Storage page of the web UI.
Individual objects: use SizeEstimator's estimate() method.
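For example, a sketch of the SizeEstimator call (requires spark-core on the classpath, e.g. inside spark-shell; the sample object is made up):

```scala
import org.apache.spark.util.SizeEstimator

// Estimated footprint in bytes, including object headers, pointers, and
// boxing overhead - not just the raw data.
val sample = Array.fill(1000)("some string value")
println(SizeEstimator.estimate(sample))
```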
4 Tuning
4.1 Tuning Data Structures
Prefer arrays of objects and primitive types instead of the standard Java or Scala collection classes;
Avoid nested structures with a lot of small objects and pointers when possible;
Consider using numeric IDs or enumeration objects instead of strings for keys;
If you have less than 32 GiB of RAM, set the JVM flag -XX:+UseCompressedOops to make pointers four bytes instead of eight.
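A small sketch combining the first three tips (all names here are made up): replace a per-record Map[String, Double] with an enumeration of metric IDs and a flat primitive array.

```scala
object DataStructureDemo {
  // Enumeration instead of string keys like "pageviews" / "clicks".
  object Metric extends Enumeration { val PageViews, Clicks = Value }

  // One flat primitive array per record, indexed by metric ID: no boxing,
  // no per-entry hash-map nodes, no string keys.
  val record: Array[Double] = new Array[Double](Metric.maxId)
  record(Metric.PageViews.id) = 12.0
  record(Metric.Clicks.id) = 3.0

  def main(args: Array[String]): Unit =
    println(record.mkString(","))
}
```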
4.2 Serialized RDD Storage
Store RDDs in serialized form, using the serialized StorageLevels in the RDD persistence API, such as MEMORY_ONLY_SER. The only downside of storing data in serialized form is slower access times, since each object has to be deserialized on the fly.
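A sketch of the call (assumes an existing SparkContext sc; the input path is a placeholder):

```scala
import org.apache.spark.storage.StorageLevel

// Cache the RDD as serialized bytes (one large buffer per partition)
// instead of as deserialized Java objects.
val lengths = sc.textFile("/path/to/input").map(_.length)
lengths.persist(StorageLevel.MEMORY_ONLY_SER)
```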
4.3 Garbage Collection Tuning
Before trying other techniques, the first thing to try if GC is a problem is to use serialized caching.
GC can also be a problem due to interference between your tasks' working memory (the amount of space needed to run the task) and the RDDs cached on your nodes.
Measuring the Impact of GC
The first step in GC tuning is to collect statistics on how frequently garbage collection occurs and the amount of time spent on GC. This can be done by adding -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCTimeStamps to the Java options when submitting the job.
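A sketch of attaching these flags to the executor JVMs (the flag names shown are for JDK 8 and earlier):

```shell
# Sketch: enable GC logging on every executor.
spark-submit \
  --conf "spark.executor.extraJavaOptions=-verbose:gc -XX:+PrintGCDetails -XX:+PrintGCTimeStamps" \
  ... # remaining submit arguments unchanged
```

Note that the GC logs appear in each executor's stdout on the worker nodes, not on the driver.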
Advanced GC Tuning
JVM basics
Java heap space is divided into two regions, Young and Old. The Young generation is meant to hold short-lived objects, while the Old generation is intended for objects with longer lifetimes.
The Young generation is further divided into three regions: Eden, Survivor1, and Survivor2. The two Survivor regions are the same size: at any time one holds surviving objects while the other is kept empty, ready to receive copies during the next collection.
A simplified description of the garbage collection procedure: when Eden is full, a minor GC is run on Eden, and objects that are alive in Eden and Survivor1 are copied to Survivor2. The Survivor regions are then swapped. If an object is old enough, or Survivor2 is full, it is moved to Old. Finally, when Old is close to full, a full GC is invoked.
The goal of GC tuning in Spark is to ensure that only long-lived RDDs are stored in the Old generation, and that the Young generation is sufficiently sized to store short-lived objects (temporary objects created during task execution), thereby avoiding full GCs.
Tuning steps
If a full GC is invoked multiple times before a task completes, it means that there isn't enough memory available for executing tasks.
If there are too many minor collections but not many major GCs, allocating more memory for Eden would help.
If the OldGen is close to being full, reduce the amount of memory used for caching by lowering spark.memory.fraction; alternatively, consider decreasing the size of the Young generation.
Try the G1GC garbage collector with -XX:+UseG1GC.
Monitor how the frequency and time taken by garbage collection change with the new settings, and keep iterating on the parameters.
GC tuning flags for executors can be specified by setting spark.executor.defaultJavaOptions or spark.executor.extraJavaOptions in a job’s configuration.
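Several of the steps above are submit-time settings; a sketch (the 0.4 value and the G1 flag are illustrative, not recommendations):

```shell
# Sketch: lower the unified memory fraction (default 0.6) so less heap is
# used for caching, and switch the executors to the G1 collector.
spark-submit \
  --conf spark.memory.fraction=0.4 \
  --conf "spark.executor.defaultJavaOptions=-XX:+UseG1GC" \
  ... # remaining submit arguments unchanged
```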
4.4 Other Considerations
Level of Parallelism
In general, we recommend 2-3 tasks per CPU core in your cluster.
Parallel Listing on Input Paths
Sometimes you may also need to increase directory listing parallelism when the job input has a large number of directories.
Memory Usage of Reduce Tasks
Spark's shuffle operations (sortByKey, groupByKey, reduceByKey, join, etc.) build a hash table within each task to perform the grouping, which can often be large. The simplest fix here is to increase the level of parallelism; you can safely increase it to more than the number of cores in your cluster.
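For instance, a shuffle's partition count can be raised directly; a sketch (assumes an existing pair RDD named pairs, and 400 is purely illustrative):

```scala
// More partitions -> fewer keys per task -> a smaller hash table per task.
val counts = pairs.reduceByKey(_ + _, numPartitions = 400)
```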
Broadcasting Large Variables
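The idea is to ship a large read-only value to each executor once via SparkContext.broadcast, instead of serializing it into every task closure that references it. A sketch (assumes an existing SparkContext sc and an RDD named ids; the table contents are made up):

```scala
// Broadcast the lookup table once per executor rather than once per task.
val lookup = sc.broadcast(Map(1 -> "a", 2 -> "b"))
val names = ids.map(id => lookup.value.getOrElse(id, "unknown"))
```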
Data Locality
In situations where there is no unprocessed data on any idle executor, Spark switches to lower locality levels. There are two options: a) wait until a busy CPU frees up to start a task on data on the same server, or b) immediately start a new task in a farther away place that requires moving data there.
What Spark typically does is wait a bit in the hopes that a busy CPU frees up. Once that timeout expires, it starts moving the data from far away to the free CPU. The wait timeout for fallback between each level can be configured individually or all together in one parameter.
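The wait is controlled by spark.locality.wait (3s by default), with per-level overrides such as spark.locality.wait.node and spark.locality.wait.process. A sketch (6s is illustrative):

```shell
# Wait longer for a data-local slot before falling back to a less-local level.
spark-submit --conf spark.locality.wait=6s \
  ... # remaining submit arguments unchanged
```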
5 Summary
For most programs, switching to Kryo serialization and persisting data in serialized form will solve most common performance issues.
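Enabling Kryo is a small configuration change; a sketch (Record is a placeholder application class):

```scala
import org.apache.spark.SparkConf

case class Record(id: Int, payload: Array[Byte])

// Use Kryo instead of Java serialization, and register application classes
// so Kryo can write compact class identifiers instead of full class names.
val conf = new SparkConf()
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .registerKryoClasses(Array(classOf[Record]))
```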