The title of this article borrows a sentence from the official *Tuning Spark* guide:
*Often, choosing a serialization type will be the first thing you should tune to optimize a Spark application.*
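Spark serializes data in the following places:

- Tasks distributed to executors
- RDDs that are cached (when they are cached in serialized form)
- Broadcast variables
- Data buffered during a shuffle
- Streaming data received through a receiver and buffered
- External variables referenced inside operator functions

Serializing objects into byte arrays, and deserializing byte arrays back into objects, consumes CPU and lengthens job run time, which in turn degrades Spark's performance.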
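The following is a minimal round trip with plain JDK serialization (`ObjectOutputStream` / `ObjectInputStream`), which Spark's default `JavaSerializer` builds on. It shows the byte-array step described above. `JavaSerdeRoundTrip` and `Avg` are hypothetical names for illustration, and `Avg` only mirrors the fields of the `tmp1` class used later in this article. The sketch also shows that the serialized form is larger than the 8-byte payload, because Java serialization writes class metadata into the stream.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;

public class JavaSerdeRoundTrip {

    // Hypothetical record type with the same fields as tmp1
    public static class Avg implements Serializable {
        private static final long serialVersionUID = 1L;
        public int total_;
        public int num_;
    }

    // Object -> byte[]: the work done before data is shipped or cached in serialized form
    public static byte[] serialize(Object obj) {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bos)) {
            out.writeObject(obj);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bos.toByteArray();
    }

    // byte[] -> Object: the reverse work on the reading side
    public static Object deserialize(byte[] bytes) {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes))) {
            return in.readObject();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        Avg a = new Avg();
        a.total_ = 42;
        a.num_ = 1;
        byte[] bytes = serialize(a);
        Avg copy = (Avg) deserialize(bytes);
        // The payload is two ints (8 bytes); the rest is class name, field names and type descriptors
        System.out.println("serialized size: " + bytes.length + " bytes");
        System.out.println("round trip ok: " + (copy.total_ == 42 && copy.num_ == 1));
    }
}
```

Both directions cost CPU time on every record. This is why the choice of serializer shows up directly in job run time.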
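Spark 2.1.0 serializes tasks with Java serialization, and this cannot be changed. Only the data serializer is configurable through `spark.serializer`; the closure serializer is hard-coded. The relevant excerpt from `SparkEnv.scala`: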
```scala
/**
 * Helper method to create a SparkEnv for a driver or an executor.
 */
private def create(
    conf: SparkConf,
    executorId: String,
    bindAddress: String,
    advertiseAddress: String,
    port: Int,
    isLocal: Boolean,
    numUsableCores: Int,
    ioEncryptionKey: Option[Array[Byte]],
    listenerBus: LiveListenerBus = null,
    mockOutputCommitCoordinator: Option[OutputCommitCoordinator] = None): SparkEnv = {

  val isDriver = executorId == SparkContext.DRIVER_IDENTIFIER

  // ...

  // Data serializer: configurable through spark.serializer, Java serialization by default
  val serializer = instantiateClassFromConf[Serializer](
    "spark.serializer", "org.apache.spark.serializer.JavaSerializer")
  logDebug(s"Using serializer: ${serializer.getClass}")

  val serializerManager = new SerializerManager(serializer, conf, ioEncryptionKey)

  // Task closures are always serialized with the Java serialization library
  val closureSerializer = new JavaSerializer(conf)

  // ...
}
```
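Because `closureSerializer` is hard-wired to `JavaSerializer`, every object a task closure captures must be `java.io.Serializable`. Otherwise the task cannot be shipped, which produces Spark's "Task not serializable" error.

The pure-JDK sketch below (no Spark involved) reproduces that check with `ObjectOutputStream`. `ClosureShipCheck`, `SerializableFn` and `Lookup` are hypothetical names for illustration. `SerializableFn` imitates the way Spark's Java function interfaces, such as `Function`, extend `Serializable`.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;
import java.util.function.IntUnaryOperator;

public class ClosureShipCheck {

    // Hypothetical function type; like Spark's Java function interfaces, it extends Serializable
    public interface SerializableFn extends IntUnaryOperator, Serializable {}

    // Hypothetical helper that does NOT implement Serializable
    public static class Lookup {
        public final int offset;
        public Lookup(int offset) { this.offset = offset; }
    }

    // True if the closure and everything it captures survive Java serialization
    public static boolean canShip(Object closure) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(closure);
            return true;
        } catch (NotSerializableException e) {
            return false; // something captured by the closure is not Serializable
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static SerializableFn capturingInt(int offset) {
        return x -> x + offset;        // captures a boxed Integer: serializable
    }

    public static SerializableFn capturingLookup(Lookup lookup) {
        return x -> x + lookup.offset; // captures a Lookup: not serializable
    }

    public static void main(String[] args) {
        System.out.println(canShip(capturingInt(10)));                // true
        System.out.println(canShip(capturingLookup(new Lookup(10)))); // false
    }
}
```

Switching `spark.serializer` to Kryo does not relax this rule. It changes how data is serialized, not how closures are serialized.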
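In fact, since Spark 2.0.0, Spark already uses Kryo internally when shuffling RDDs of simple types, arrays of simple types, or strings. The relevant configuration properties are: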
| Property Name | Default | Meaning |
|---|---|---|
| `spark.serializer` | `org.apache.spark.serializer.JavaSerializer` | Class to use for serializing objects that will be sent over the network or need to be cached in serialized form. The default of Java serialization works with any Serializable Java object but is quite slow, so we recommend using `org.apache.spark.serializer.KryoSerializer` and configuring Kryo serialization when speed is necessary. |
| `spark.kryoserializer.buffer` | 64k | Initial size of Kryo's serialization buffer. Note that there will be one buffer per core on each worker. This buffer will grow up to `spark.kryoserializer.buffer.max` if needed. |
| `spark.kryoserializer.buffer.max` | 64m | Maximum allowable size of Kryo serialization buffer. This must be larger than any object you attempt to serialize. Increase this if you get a "buffer limit exceeded" exception inside Kryo. |
| `spark.kryo.classesToRegister` | (none) | If you use Kryo serialization, give a comma-separated list of custom class names to register with Kryo. See the tuning guide for more details. |
| `spark.kryo.referenceTracking` | true | Whether to track references to the same object when serializing data with Kryo, which is necessary if your object graphs have loops and useful for efficiency if they contain multiple copies of the same object. Can be disabled to improve performance if you know this is not the case. |
| `spark.kryo.registrationRequired` | false | Whether to require registration with Kryo. If set to 'true', Kryo will throw an exception if an unregistered class is serialized. If set to false (the default), Kryo will write unregistered class names along with each object. Writing class names can cause significant performance overhead, so enabling this option can enforce strictly that a user has not omitted classes from registration. |
| `spark.kryo.registrator` | (none) | If you use Kryo serialization, give a comma-separated list of classes that register your custom classes with Kryo. This property is useful if you need to register your classes in a custom way, e.g. to specify a custom field serializer. Otherwise `spark.kryo.classesToRegister` is simpler. It should be set to classes that extend `KryoRegistrator`. See the tuning guide for more details. |
| `spark.kryo.unsafe` | false | Whether to use unsafe based Kryo serializer. Can be substantially faster by using Unsafe Based IO. |
`spark.kryo.unsafe`: to squeeze out more performance, you can enable the unsafe-based Kryo serializer.
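The `spark.kryoserializer.buffer` and `spark.kryoserializer.buffer.max` rows above describe a buffer that starts small, grows on demand, and fails once a single object needs more than the maximum. The class below is a simplified model of that behaviour, not Kryo's `Output` class.

Two things are assumptions of this sketch: the name `CappedGrowableBuffer`, and the doubling growth policy.

```java
import java.util.Arrays;

// Hypothetical, simplified model of a capped, growable serialization buffer
public class CappedGrowableBuffer {
    private byte[] buf;
    private int position;
    private final int maxCapacity;

    public CappedGrowableBuffer(int initialCapacity, int maxCapacity) {
        if (initialCapacity <= 0 || maxCapacity < initialCapacity) {
            throw new IllegalArgumentException("need 0 < initialCapacity <= maxCapacity");
        }
        this.buf = new byte[initialCapacity];
        this.maxCapacity = maxCapacity;
    }

    public void write(byte[] data) {
        int required = position + data.length;
        if (required > maxCapacity) {
            // Analogous to the "buffer limit exceeded" failure: the fix is a larger max
            throw new IllegalStateException(
                "buffer limit exceeded: need " + required + " bytes, max is " + maxCapacity);
        }
        int capacity = buf.length;
        while (capacity < required) {
            // Assumed policy for this sketch: double, but never past the max
            capacity = (int) Math.min((long) capacity * 2, maxCapacity);
        }
        if (capacity != buf.length) {
            buf = Arrays.copyOf(buf, capacity);
        }
        System.arraycopy(data, 0, buf, position, data.length);
        position += data.length;
    }

    public int capacity() { return buf.length; }

    public int size() { return position; }

    public static void main(String[] args) {
        CappedGrowableBuffer b = new CappedGrowableBuffer(64, 256);
        b.write(new byte[100]);
        System.out.println("capacity after 100 bytes: " + b.capacity()); // 128
    }
}
```

In this model, raising the initial size only saves a few copies. Raising the maximum is the only way to fit a larger object.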
Using Kryo takes three steps:

- Set the serialization library:

  ```java
  conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer"); // use the Kryo serialization library
  ```

- Register the user-defined types with that library:

  ```java
  conf.set("spark.kryo.registrator", toKryoRegistrator.class.getName()); // register the set of custom classes with Kryo
  ```

- In a custom class, implement the `registerClasses` method of the `KryoRegistrator` interface:

  ```java
  public static class toKryoRegistrator implements KryoRegistrator {
      public void registerClasses(Kryo kryo) {
          kryo.register(tmp1.class, new FieldSerializer(kryo, tmp1.class)); // register a custom class with Kryo
          kryo.register(tmp2.class, new FieldSerializer(kryo, tmp2.class)); // register a custom class with Kryo
      }
  }
  ```
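The reason registration pays off is stated in the `spark.kryo.registrationRequired` row of the table: for an unregistered class, Kryo writes the class name along with each object. The sketch below is a simplified model of that difference, not Kryo's wire format.

- One writer prefixes every record with its full class name.
- The other writer prefixes it with a small integer ID taken from a class table that writer and reader share, which is roughly the role `kryo.register(...)` plays.

`RegistrationSizeModel` and its nested `Avg` class are hypothetical names for illustration.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class RegistrationSizeModel {

    // Hypothetical record type with the same fields as tmp1
    public static final class Avg {
        public final int total_;
        public final int num_;
        public Avg(int total, int num) { this.total_ = total; this.num_ = num; }
    }

    // Unregistered: the class name is written in front of every record
    public static byte[] writeWithClassNames(List<Avg> records) {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bos)) {
            for (Avg r : records) {
                out.writeUTF(r.getClass().getName()); // 2-byte length + name bytes
                out.writeInt(r.total_);
                out.writeInt(r.num_);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bos.toByteArray();
    }

    // Registered: only a small ID from the shared class table is written
    public static byte[] writeWithRegisteredIds(List<Avg> records, Map<Class<?>, Integer> registry) {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bos)) {
            for (Avg r : records) {
                Integer id = registry.get(r.getClass());
                if (id == null) {
                    // Comparable in spirit to spark.kryo.registrationRequired=true
                    throw new IllegalArgumentException("unregistered class: " + r.getClass().getName());
                }
                out.writeByte(id); // one byte per record in this model (IDs 0-255)
                out.writeInt(r.total_);
                out.writeInt(r.num_);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bos.toByteArray();
    }

    public static void main(String[] args) {
        Map<Class<?>, Integer> registry = new HashMap<>();
        registry.put(Avg.class, 1); // loosely analogous to kryo.register(tmp1.class, ...)
        List<Avg> records = Collections.nCopies(1000, new Avg(3, 1));
        System.out.println("with class names:    " + writeWithClassNames(records).length + " bytes");
        System.out.println("with registered IDs: " + writeWithRegisteredIds(records, registry).length + " bytes");
    }
}
```

The overhead per record grows with the length of the fully qualified class name. Registration matters most for small records with long class names.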
A complete example:

```java
import java.util.Arrays;
import java.util.Iterator;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.serializer.KryoRegistrator;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.serializers.FieldSerializer;
import org.apache.spark.storage.StorageLevel;
import java.util.regex.Pattern;
import java.io.IOException;
import java.io.InputStream;
import java.io.FileInputStream;
import java.io.BufferedReader;
import java.io.InputStreamReader;
import org.apache.spark.broadcast.Broadcast;

public final class javakryoserializer {
    private static final Pattern SPACE = Pattern.compile(" ");

    // These are the custom classes we configure Kryo to serialize
    static class tmp1 implements java.io.Serializable {
        public int total_;
        public int num_;
    }

    static class tmp2 implements java.io.Serializable {
        public tmp2(String ss) {
            s = ss;
        }
        public String s;
    }

    public static class toKryoRegistrator implements KryoRegistrator {
        public void registerClasses(Kryo kryo) {
            kryo.register(tmp1.class, new FieldSerializer(kryo, tmp1.class)); // register a custom class with Kryo
            kryo.register(tmp2.class, new FieldSerializer(kryo, tmp2.class)); // register a custom class with Kryo
        }
    }

    // Reads a whole text file into buffer; the streams are closed when done
    public static void readToBuffer(StringBuffer buffer, String filePath) throws IOException {
        try (InputStream is = new FileInputStream(filePath);
             BufferedReader reader = new BufferedReader(new InputStreamReader(is))) {
            String line = reader.readLine(); // read the first line
            while (line != null) {           // null means the whole file has been read
                buffer.append(line);         // append the line to the buffer
                buffer.append("\n");         // add a newline
                line = reader.readLine();    // read the next line
            }
        }
    }

    public static void main(String[] args) throws Exception {
        SparkConf conf = new SparkConf().setMaster("local").setAppName("basicavgwithkyro");
        conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer"); // use Kryo; comment this line out to use Java serialization
        conf.set("spark.kryo.registrator", toKryoRegistrator.class.getName());      // register the custom classes with Kryo; comment this line out to use Java serialization
        JavaSparkContext sc = new JavaSparkContext(conf);

        // args[0]: file whose contents are broadcast; args[1]: text input
        StringBuffer sb = new StringBuffer();
        javakryoserializer.readToBuffer(sb, args[0]);
        final Broadcast<tmp2> stringBV = sc.broadcast(new tmp2(sb.toString()));

        JavaRDD<String> rdd1 = sc.textFile(args[1]);
        JavaRDD<String> rdd2 = rdd1.flatMap(new FlatMapFunction<String, String>() {
            public Iterator<String> call(String s) {
                return Arrays.asList(SPACE.split(s)).iterator();
            }
        });
        JavaRDD<Integer> rdd3 = rdd2.map(new Function<String, Integer>() {
            public Integer call(String s) {
                String length = stringBV.value().s; // only exercises the broadcast variable stringBV; no real purpose
                String tmp = length;                // only exercises the broadcast variable stringBV; no real purpose
                return s.length();
            }
        });
        JavaRDD<tmp1> rdd4 = rdd3.map(new Function<Integer, tmp1>() {
            public tmp1 call(Integer x) {
                tmp1 a = new tmp1(); // only turns the elements of rdd4 into tmp1 objects; no real purpose
                a.total_ += x;
                a.num_ += 1;
                return a;
            }
        });

        // Cache rdd4 in memory in serialized form. Its elements are tmp1 objects (not simple types),
        // so they are cached with the configured serializer, Kryo here
        rdd4.persist(StorageLevel.MEMORY_ONLY_SER());
        System.out.println("the count is " + rdd4.count());

        while (true) {} // debugging only: keeps the driver alive so rdd4's storage can be inspected in the web UI on port 4040
    }
}
```
Memory used by the cached rdd4:

- With the default Java serialization library: 137.7 MB
- With the Kryo serialization library: 38.5 MB
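Expressed as ratios, the two measurements above give:

$$
\frac{38.5\,\text{MB}}{137.7\,\text{MB}} \approx 0.280, \qquad 1 - 0.280 = 0.720, \qquad \frac{137.7\,\text{MB}}{38.5\,\text{MB}} \approx 3.58
$$

In this test, Kryo stored the same cached rdd4 in about 28% of the memory Java serialization needed. That is roughly 72% less, or about 3.6 times smaller.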
Serialized data can also be compressed. The compression-related properties are:

| Property Name | Default | Meaning |
|---|---|---|
| `spark.io.compression.codec` | lz4 | The codec used to compress internal data such as RDD partitions, broadcast variables and shuffle outputs. By default, Spark provides three codecs: lz4, lzf, and snappy. |
| `spark.broadcast.compress` | true | Whether to compress broadcast variables before sending them. Generally a good idea. |
| `spark.shuffle.compress` | true | Whether to compress map output files. Generally a good idea. |
| `spark.shuffle.spill.compress` | true | Whether to compress data spilled during shuffles. |
| `spark.rdd.compress` | false | Whether to compress serialized RDD partitions (e.g. for `StorageLevel.MEMORY_ONLY_SER` in Java and Scala or `StorageLevel.MEMORY_ONLY` in Python). Can save substantial space at the cost of some extra CPU time. |
To compress the serialized RDD partitions as well, add the following setting to the SparkConf:

```java
conf.set("spark.rdd.compress", "true");
```
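The `spark.rdd.compress` row describes a trade-off: less space in exchange for extra CPU time. The sketch below illustrates it on a byte array that stands in for a serialized partition.

Spark's default codec (lz4) is not part of the JDK, so this sketch uses GZIP from `java.util.zip` as a substitute. The name `CompressSerializedBlock` and the sample data are hypothetical.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

public class CompressSerializedBlock {

    // Compress an already-serialized block (stand-in for a serialized RDD partition)
    public static byte[] compress(byte[] raw) {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (GZIPOutputStream gz = new GZIPOutputStream(bos)) {
            gz.write(raw);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bos.toByteArray();
    }

    // Decompress on read: this is the extra CPU work paid each time the block is used
    public static byte[] decompress(byte[] compressed) {
        try (InputStream in = new GZIPInputStream(new ByteArrayInputStream(compressed));
             ByteArrayOutputStream bos = new ByteArrayOutputStream()) {
            byte[] chunk = new byte[8192];
            int n;
            while ((n = in.read(chunk)) != -1) {
                bos.write(chunk, 0, n);
            }
            return bos.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        // Hypothetical repetitive records, similar in spirit to many small tmp1-style objects
        StringBuilder sb = new StringBuilder();
        for (int i = 0; i < 10_000; i++) {
            sb.append("total=").append(i % 10).append(",num=1;");
        }
        byte[] raw = sb.toString().getBytes(StandardCharsets.UTF_8);
        byte[] packed = compress(raw);
        System.out.println(raw.length + " bytes -> " + packed.length + " bytes");
    }
}
```

How much is saved depends on how repetitive the serialized bytes are. Measure with your own data before enabling `spark.rdd.compress` for a CPU-bound job.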
