一、高性能序列化类库
- 数据序列化概述
1)在任何分布式系统中,序列化都是扮演着一个重要的角色的。如果使用的序列化技术,在执行序列化操作的时候很慢,或者是序列化后的数据还是很大,那么会让分布式应用程序的性能下降很多。所以,进行Spark性能优化的第一步,就是进行序列化的性能优化。
2)Spark自身默认就会在一些地方对数据进行序列化,比如Shuffle。还有就是,如果我们的算子函数使用到了外部的数据(比如Java内置类型,或者自定义类型),那么也需要让其可序列化。
3)而Spark自身对于序列化的便捷性和性能进行了一个取舍和权衡。默认,Spark倾向于序列化的便捷性,使用了Java自身提供的序列化机制——基于ObjectInputStream和ObjectOutputStream的序列化机制。因为这种方式是Java原生提供的,很方便使用。
4)但是问题是,Java序列化机制的性能并不高。序列化的速度相对较慢,而且序列化以后的数据,还是相对来说比较大,还是比较占用内存空间。因此,如果你的Spark应用程序对内存很敏感,那么,实际上默认的Java序列化机制并不是最好的选择。
二、Spark提供的两种序列化机制
- java序列化机制
默认情况下、Spark使用自身的ObjectInputStream和ObjectOutputStream机制进行对象的序列化。只要你实现了Serializable接口,那么都是可以序列化的,而且Java序列化机制是提供了自定义序列化支持的,只要你实现Externalizable接口即可实习自己的更高性能的序列化算法,java序列化机制的速度比较慢,而且序列化后的数据占有的内存空间比较大。
- Kryo序列化机制
Kryo序列化机制比java序列化机制更快,而且序列化机制的数据占用的空间更小,通常比java序列化的数据占用空间要小10倍。Kryo序列化机制之所以不是默认序列化机制的原因是,有些类型虽然实现了Seriralizable接口,但是也不一定能够进行序列化;此外,如果你要得到序列化最佳的性能,Kryo还要要求你在Spark应用程序中,对所有你需要序列化的类型都进行注册。
- 如何使用Kryo序列化机制
(1) 如果使用Kryo序列化机制,首先要用SparkConf设置一个参数,使用new SparkConf().set("spark.serializer","org.apache.spark.serializer.KryoSerializer")即可,即将spark的序列化器设置为KryoSerializer。这样,Spark在内部的一些操作,比如Shuffle,进行序列化时,就会使用Kryo类库进行高性能、快速、更低内存占用量的序列化了。
使用Kryo时,它要求需要序列化的类,是要预先进行注册的,以获得最佳性能。如果不注册的话,那么Kryo必须时刻保存类型的全限定类名,反而占用不少内存。Spark默认是对scala中常用的类型自动注册Kryo的,都在AllScalaRegistry类中。
(2) 如果要使用自定义的类型
Scala版本:
val conf = new SparkConf().setMaster(...).setAppName(...)
conf.registerKryoClasses(Array(classOf[Counter] ))
val sc = new SparkContext(conf)
Java版本:
SparkConf conf = new SparkConf().setMaster(...).setAppName(...)
conf.registerKryoClasses(Counter.class)
JavaSparkContext sc = new JavaSparkContext(conf)
三、优化Kryo类库的使用
- 优化缓存大小
如果注册的要序列化的自定义的类型,本身特别大,比如包含超过100个field。那么就会导致要序列化的对象过大。此时就需要对Kryo本身进行优化。因此Kryo内存的缓存可能不够存放那么大的class对象。此时就需要调用SparkConf.set()方法,设置spark.kryoserializer.buffer.mb参数的值,将其调大。默认情况是2M
- 预先注册自定义类型
虽然不注册自定义类型,Kryo类库也能正常工作,但是那样的话,对于它要序列化的每个对象,都会保存一份它的全限定类名。此时反而会耗费大量内存。因此通常都建议预先注册号要序列化的自定义的类。
四、Kryo使用场景
- 自定义了一个外部的大对象