1、数据序列化
在任何分布式系统中,序列化都是扮演着一个非常重要的角色。如果使用的序列化技术,在执行序列化操作的时候很慢,或者是序列化后的数据还是很大,那么会让分布式应用程序的性能下降很多。所以,进行Spark性能优化的第一步,就是进行序列化的性能优化。
Spark自身默认就会在一些地方对数据进行序列化,比如Shuffle;还有,如果我们的算子函数使用到了外部的数据(比如java内置类型,或者自定义类型),那么也需要让其可序列化。
Spark自身对于序列化的便捷性和性能进行了一个取舍和平衡。默认,Spark倾向于序列化的便捷性,使用了Java自身提供的序列化机制——基于ObjectInputStream和ObjectOutputStream的序列化机制。因为这种方式是Java原生提供的,很方便使用。
但是,Java序列化机制的性能并不高,序列化的速度相对较慢,而且序列化以后的数据,相对来说,还是比较大,比较占用内存空间;因此,如果你的Spark应用程序对内存很敏感,那么实际上默认的Java序列化机制并不是最好的选择。
2、Spark提供的两种序列化机制
Spark提供了两种序列化机制,默认使用第一种;
1、Java序列化机制:默认情况下,Spark使用java自身的ObjectInputStream和ObjectOutputStream机制进行对象的序列化。只要实现Serializable接口,那么都是可以序列化的;而且Java序列化机制是提供了自定义序列化支持的,只要实现Externalizable接口,即可实现自己的更高性能的序列化算法;但是Java序列化机制的速度比较慢,而且序列化后的数据占用的内存空间比较大
2、Kryo序列化机制:Spark也支持使用Kryo类库来进行序列化。Kryo序列化机制比Java序列化机制更快,序列化后的数据占用的内存空间更小,通常要比Java序列化的数据占用的空间要小10倍左右。Kryo性能这么好,但是却不是spark默认序列化机制的原因是,有些类虽然实现了Serializable接口,但是它也不一定能够进行序列化;同时,如果要得到最佳的性能,Kryo好要求在Spark应用程序中,对所有需要序列化的类都进行注册。
3、Kryo序列化
-
使用方法
如果要使用Kryo序列化机制,首先要用SparkConf设置一个参数,new SparkConf().set("spark.serializer", "org.apache.spark.serializer.KryoSerializer"),将Spark的序列化机制设置为KryoSerializer;这样,Spark在内部的一些操作,比如Shuffle,进行序列化时,就会使用Kryo类库进行高性能、快速、更低内存占用的序列化了。使用Kryo时,要求是需要序列化的类,是要预先进行注册的,以获得最佳性能——如果不注册的话,那么Kryo必须时刻保存类的全限定名,反而占用不少内存;Spark默认是对Scala中常用的类自动注册了Kryo的,都在ALLScalaRegistry类中。
但是,比如自己的算子中,使用了外部的自定义类的对象,那么还是需要将其进行注册。如果要注册自定义的类型,那么就使用如下的代码(Counter为自定义类):
//Scala版本:
val conf = new SparkConf().setMaster(...).setAppName(...)
conf.registerKryoClasses(Array(classOf[Counter] ))
val sc = new SparkContext(conf)
//Java版本:
SparkConf conf = new SparkConf().setMaster(...).setAppName(...)
conf.registerKryoClasses(Counter.class)
JavaSparkContext sc = new JavaSparkContext(conf)
-
Kryo类库使用优化
优化Kryo缓存大小,如果注册的要序列化的自定义的类,本身特别大,比如包含了超过100个field,那么就会导致要序列化的对象过大,此时就需要对Kryo本身进行优化;因为Kryo内部的缓存可能不够存放那么大的class对象;此时,就需要调用SparkConf.set()方法,设置spark.kryoserializer.buffer.mb参数的值,将其调大。
默认情况下,这个是2,表示最大能缓存2M的对象,然后进行序列化,可以在必要时将其调大,比如设置为10。预先注册自定义类。 (上面已经写过)其实,不注册自定义类,Kryo类库也能正常工作,但是那样的话,对于它要序列化的每个对象,都会保存一份它的全限定类名,此时反而会消耗大量内存。因此,通常都建议预先注册好序列化的自定义类
-
使用场景
Kryo序列化机制启动—SparkConf().set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")—以后,会生效的地方:1、当我们Spark应用中算子函数用到自定义的类,这个类包含100M的数据,此时,默认情况下,Spark会用java序列化机制来序列化这种外部的大对象,那么就会导致,序列化速度缓慢,并且序列化以后的数据还是比较大,比较占用内存空间。
因此,在这种情况下,比较适合,切换到Kryo序列化类库,来对外部的大对象进行序列化操作。一是,序列化速度会变快;二是,会减少序列化后的数据占用的内存空间。2、持久化RDD时进行序列化,StorageLevel.MEMORY_ONLY_SER:
优化内存的占用和消耗,持久化RDD占用的内存越少,task执行的时候,创建的对象就不至于频繁的占满内存,导致频繁的GC3、shuffle:优化网络传输性能
4、优化数据结构
要减少内存的消耗,除了使用高效的序列化类库以外,还有一个很重要的事情,就是优化数据结构,从而避免Java语法特性所导致的额外内存的开销,比如基于指针的java数据结构,以及包装类型。
-
优化什么数据结构
主要是优化算子函数内部使用到的局部数据,或者是算子函数外部的数据,都可以进行数据结构的优化,优化之后,都会减少其对内存的消耗和占用;1、优先使用数组以及字符串,而不是集合类。也就是说,优先使用array,而不是ArrayList、LinkedList、hashMap等集合。
比如,有个 List<Integer> list = new ArrayList<Integer>(),可以将其替换为int[] arr = new int[],这样的话,array既比List少了额外信息的存储开销,还能使用原始数据类型(int)来存储数据,比List中用integer这种包装类型存储数据要节省内存的多;
还比如,通常企业级应用中的做法是,对于HashMap List这种数据,统一用String拼接成特殊格式的字符串,比如,Map<Integer, Person> persons = new HashMap<Integer, Person>(),可以优化为,特殊的字符串格式:id:name,address|id:name,address...2、避免使用多层嵌套的对象结构。
比如说,public class Teacher { private List<Student> students = new ArrayList<Student>() } 就是非常不好的例子,因为Teacher类的内部又嵌套了大量的小Student对象,对于这种,完全可以使用特殊的字符串来进行数据的存储,其实,用json字符串来存储数据就是一个很好的选择;3、对于有些能够避免的场景,尽量使用int替代String。因为String虽然比ArrayList、HashMap等数据结构高效多了,占用内存量也少多了,但是还是有额外信息的消耗。
注意:在Spark应用中,id就不要用常用的uuid了,因为无法转成int,就用自增的int类型的id即可。
5、使用高性能的库fastutil
fastutil是扩展了Java标准集合框架(Map、List、Set;HashMap、ArrayList、HashSet)的类库,提供了特殊类型的map、set、list和queue;
fastutil能够提供更小的内存占用,更快的存取速度;我们使用fastutil提供的集合类,来替代自己平时使用的JDK的原生的Map、List、Set,好处在于,fastutil集合类,可以减小内存的占用,并且在进行集合的遍历、根据索引(或者key)获取元素的值和设置元素的值的时候,提供更快的存取速度;
fastutil也提供了64位的array、set和list,以及高性能快速的,以及实用的IO类,来处理二进制和文本类型的文件;fastutil最新版本要求Java 7以及以上版本;
fastutil的每一种集合类型,都实现了对应的Java中的标准接口(比如fastutil的map,实现了Java的Map接口),因此可以直接放入已有系统的任何代码中。
fastutil还提供了一些JDK标准类库中没有的额外功能(比如双向迭代器)。
fastutil除了对象和原始类型为元素的集合,fastutil也提供引用类型的支持,但是对引用类型是使用等于号(=)进行比较的,而不是equals()方法。fastutil尽量提供了在任何场景下都是速度最快的集合类库。
-
Spark中应用fastutil的场景:
1、如果算子函数使用了外部变量;那么第一,你可以使用Broadcast广播变量优化;第二,可以使用Kryo序列化类库,提升序列化性能和效率;第三,如果外部变量是某种比较大的集合,那么可以考虑使用fastutil改写外部变量,首先从源头上就减少内存的占用,通过广播变量进一步减少内存占用,再通过Kryo序列化类库进一步减少内存占用。2、在你的算子函数里,也就是task要执行的计算逻辑里面,如果有逻辑中出现,要创建比较大的Map、List等集合,可能会占用较大的内存空间,而且可能涉及到消耗性能的遍历、存取等集合操作;那么此时,可以考虑将这些集合类型使用fastutil类库重写,使用了fastutil集合类以后,就可以在一定程度上,减少task创建出来的集合类型的内存占用。避免executor内存频繁占满,频繁唤起GC,导致性能下降。
fastutil使用
<dependency>
<groupId>fastutil</groupId>
<artifactId>fastutil</artifactId>
<version>5.0.9</version>
</dependency>