SerializerManager--SparkEnv

SparkEnv holds two serialization components: SerializerManager and closureSerializer.

SerializerManager is an all-in-one manager that integrates serialization, compression, and encryption; closureSerializer is the closure serializer, backed by JavaSerializer.

The Serializers in SparkEnv

The code in SparkEnv that creates them looks like this:

// Create an instance of the class with the given name, possibly initializing it with our conf
// Instantiate an object of the given class; this could be extracted into Utils
def instantiateClass[T](className: String): T = {
  // Class.forName(className)
  val cls = Utils.classForName(className)
  // Look for a constructor taking a SparkConf and a boolean isDriver, then one taking just
  // SparkConf, then one taking no arguments
  try {
    // Construct the object with (SparkConf, Boolean)
    cls.getConstructor(classOf[SparkConf], java.lang.Boolean.TYPE)
      .newInstance(conf, new java.lang.Boolean(isDriver))
      .asInstanceOf[T]
  } catch {
    case _: NoSuchMethodException =>
      try {
        // If that fails, construct the object with just the SparkConf
        cls.getConstructor(classOf[SparkConf]).newInstance(conf).asInstanceOf[T]
      } catch {
        case _: NoSuchMethodException =>
          // Fall back to the default no-arg constructor
          cls.getConstructor().newInstance().asInstanceOf[T]
      }
  }
}

// Create an instance of the class named by the given SparkConf property, or defaultClassName
// if the property is not set, possibly initializing it with our conf
// Look up the class name under the given conf key; fall back to defaultClassName if the key is not set
def instantiateClassFromConf[T](propertyName: String, defaultClassName: String): T = {
  instantiateClass[T](conf.get(propertyName, defaultClassName))
}

// key: "spark.serializer",可以在conf中配置其他序列化实现: KryoSerializer
// default class: "org.apache.spark.serializer.JavaSerializer"
val serializer = instantiateClassFromConf[Serializer](
    "spark.serializer", "org.apache.spark.serializer.JavaSerializer")
logDebug(s"Using serializer: ${serializer.getClass}")

// Build the SerializerManager
val serializerManager = new SerializerManager(serializer, conf, ioEncryptionKey)
// The closure serializer is always JavaSerializer
val closureSerializer = new JavaSerializer(conf)

// "spark.io.encryption.enabled",默认false
val ioEncryptionKey = if (conf.get(IO_ENCRYPTION_ENABLED)) {
  // "spark.io.encryption.keySizeBits": 密钥长度,有128、192、256三种长度
  // "spark.io.encryption.keygen.algorithm": 加密算法,默认为HmacSHA1
  Some(CryptoStreamUtils.createKey(conf))
} else {
  None
}

Users can configure a different serializer via "spark.serializer". Spark currently ships three Serializer implementations: JavaSerializer, KryoSerializer, and UnsafeRowSerializer.

The closure serializer is hard-wired to JavaSerializer and cannot be configured.
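
As a quick illustration, here is a minimal sketch of switching the data serializer to Kryo and enabling I/O encryption through SparkConf; the configuration keys are the ones shown above, the application name is made up:

import org.apache.spark.SparkConf

val conf = new SparkConf()
  .setAppName("serializer-demo")                        // hypothetical app name
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .set("spark.io.encryption.enabled", "true")           // default is false
  .set("spark.io.encryption.keySizeBits", "256")        // 128 / 192 / 256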

Analyzing SerializerManager

The SerializerManager class:

/**
 * Component which configures serialization, compression and encryption for various Spark
 * components, including automatic selection of which [[Serializer]] to use for shuffles.
 */
private[spark] class SerializerManager(
    defaultSerializer: Serializer,
    conf: SparkConf,
    encryptionKey: Option[Array[Byte]]) {

  def this(defaultSerializer: Serializer, conf: SparkConf) = this(defaultSerializer, conf, None)
  // Create a KryoSerializer instance
  private[this] val kryoSerializer = new KryoSerializer(conf)

  // Set the serializer's ClassLoader. It is mainly used in KryoSerializer's newKryo() method:
  // Class.forName(classname, true, classloader)
  // https://issues.apache.org/jira/browse/SPARK-21928
  // When a user registers custom classes via a KryoRegistrator, Netty's MessageToMessageDecoder may throw
  // ClassNotFoundException, so newKryo() has to switch the ClassLoader.
  // Whenever user-defined class files are loaded, remember to switch the ClassLoader.
  def setDefaultClassLoader(classLoader: ClassLoader): Unit = {
    kryoSerializer.setDefaultClassLoader(classLoader)
  }

  private[this] val stringClassTag: ClassTag[String] = implicitly[ClassTag[String]]
  
  // Primitive types and arrays of primitive types: Boolean, Array[Boolean], Int, Array[Int], ...
  private[this] val primitiveAndPrimitiveArrayClassTags: Set[ClassTag[_]] = {
    val primitiveClassTags = Set[ClassTag[_]](
      ClassTag.Boolean,
      ClassTag.Byte,
      ClassTag.Char,
      ClassTag.Double,
      ClassTag.Float,
      ClassTag.Int,
      ClassTag.Long,
      ClassTag.Null,
      ClassTag.Short
    )
    val arrayClassTags = primitiveClassTags.map(_.wrap)
    primitiveClassTags ++ arrayClassTags
  }

  // Compression settings for broadcast variables, shuffle output, serialized RDD partitions, and shuffle data spilled to disk
  // Whether to compress broadcast variables that are stored
  private[this] val compressBroadcast = conf.getBoolean("spark.broadcast.compress", true)
  // Whether to compress shuffle output that are stored
  private[this] val compressShuffle = conf.getBoolean("spark.shuffle.compress", true)
  // Whether to compress RDD partitions that are stored serialized
  private[this] val compressRdds = conf.getBoolean("spark.rdd.compress", false)
  // Whether to compress shuffle output temporarily spilled to disk
  private[this] val compressShuffleSpill = conf.getBoolean("spark.shuffle.spill.compress", true)

  /* The compression codec to use. Note that the "lazy" val is necessary because we want to delay
   * the initialization of the compression codec until it is first used. The reason is that a Spark
   * program could be using a user-defined codec in a third party jar, which is loaded in
   * Executor.updateDependencies. When the BlockManager is initialized, user level jars hasn't been
   * loaded yet. */
  // Four compression codecs are available; the default is <"spark.io.compression.codec", "lz4">
  // "lz4"    -> LZ4CompressionCodec    -> lz4-java
  // "lzf"    -> LZFCompressionCodec    -> compress-lzf
  // "snappy" -> SnappyCompressionCodec -> snappy-java
  // "zstd"   -> ZStdCompressionCodec   -> zstd-jni
  // The class configured under "spark.io.compression.codec" is instantiated via reflection
  private lazy val compressionCodec: CompressionCodec = CompressionCodec.createCodec(conf)
  
  // Whether encryption is enabled
  def encryptionEnabled: Boolean = encryptionKey.isDefined
  
  // Decide from the class tag whether Kryo can be used: Boolean, Double, Int, Long, String, primitive arrays, etc.
  def canUseKryo(ct: ClassTag[_]): Boolean = {
    primitiveAndPrimitiveArrayClassTags.contains(ct) || ct == stringClassTag
  }

  // SPARK-18617: As feature in SPARK-13990 can not be applied to Spark Streaming now. The worst
  // result is streaming job based on `Receiver` mode can not run on Spark 2.x properly. It may be
  // a rational choice to close `kryo auto pick` feature for streaming in the first step.
  // autoPick: !blockId.isInstanceOf[StreamBlockId]
  // For Receiver-based streaming jobs it is advisable to disable Kryo auto-pick and use JavaSerializer
  def getSerializer(ct: ClassTag[_], autoPick: Boolean): Serializer = {
    if (autoPick && canUseKryo(ct)) {
      kryoSerializer
    } else {
      defaultSerializer
    }
  }

  /**
   * Pick the best serializer for shuffling an RDD of key-value pairs.
   */
  // Check whether both keyClass and valueClass can use Kryo
  def getSerializer(keyClassTag: ClassTag[_], valueClassTag: ClassTag[_]): Serializer = {
    if (canUseKryo(keyClassTag) && canUseKryo(valueClassTag)) {
      kryoSerializer
    } else {
      defaultSerializer
    }
  }

  // Pattern-match on the block type and return the corresponding compression setting
  private def shouldCompress(blockId: BlockId): Boolean = {
    blockId match {
      case _: ShuffleBlockId => compressShuffle
      case _: BroadcastBlockId => compressBroadcast
      case _: RDDBlockId => compressRdds
      case _: TempLocalBlockId => compressShuffleSpill
      case _: TempShuffleBlockId => compressShuffle
      case _ => false
    }
  }

  /**
   * Wrap an input stream for encryption and compression
   */
  // Decorate the InputStream with wrapForEncryption and wrapForCompression
  def wrapStream(blockId: BlockId, s: InputStream): InputStream = {
    wrapForCompression(blockId, wrapForEncryption(s))
  }

  /**
   * Wrap an output stream for encryption and compression
   */
  // Decorate the OutputStream with wrapForEncryption and wrapForCompression
  def wrapStream(blockId: BlockId, s: OutputStream): OutputStream = {
    wrapForCompression(blockId, wrapForEncryption(s))
  }

  /**
   * Wrap an input stream for encryption if shuffle encryption is enabled
   */
  // AES encryption is implemented with Apache commons-crypto. Note how the conf is built in the CryptoParams class:
  // CryptoUtils.toCryptoConf translates Spark config keys into the corresponding commons-crypto keys
  def wrapForEncryption(s: InputStream): InputStream = {
    encryptionKey
      .map { key => CryptoStreamUtils.createCryptoInputStream(s, conf, key) }
      .getOrElse(s)
  }

  /**
   * Wrap an output stream for encryption if shuffle encryption is enabled
   */
  def wrapForEncryption(s: OutputStream): OutputStream = {
    encryptionKey
      .map { key => CryptoStreamUtils.createCryptoOutputStream(s, conf, key) }
      .getOrElse(s)
  }

  /**
   * Wrap an output stream for compression if block compression is enabled for its block type
   */
  // The compression capability is added to InputStream / OutputStream via the decorator pattern
  def wrapForCompression(blockId: BlockId, s: OutputStream): OutputStream = {
    if (shouldCompress(blockId)) compressionCodec.compressedOutputStream(s) else s
  }

  /**
   * Wrap an input stream for compression if block compression is enabled for its block type
   */
  def wrapForCompression(blockId: BlockId, s: InputStream): InputStream = {
    if (shouldCompress(blockId)) compressionCodec.compressedInputStream(s) else s
  }

  /** Serializes into a stream. */
  // Serialize the values iterator into the OutputStream
  def dataSerializeStream[T: ClassTag](
      blockId: BlockId,
      outputStream: OutputStream,
      values: Iterator[T]): Unit = {
    // Wrap with BufferedOutputStream for buffering
    val byteStream = new BufferedOutputStream(outputStream)
    val autoPick = !blockId.isInstanceOf[StreamBlockId]
    // First obtain the serializer (e.g. KryoSerializer), then a SerializerInstance from it
    val ser = getSerializer(implicitly[ClassTag[T]], autoPick).newInstance()
    // The instance opens a SerializationStream (e.g. KryoSerializationStream) and writes all values to the output
    ser.serializeStream(wrapForCompression(blockId, byteStream)).writeAll(values).close()
  }

  /** Serializes into a chunked byte buffer. */
  def dataSerialize[T: ClassTag](
      blockId: BlockId,
      values: Iterator[T]): ChunkedByteBuffer = {
    dataSerializeWithExplicitClassTag(blockId, values, implicitly[ClassTag[T]])
  }

  /** Serializes into a chunked byte buffer. */
  // Serialize the values into a chunked byte buffer (ArrayBuffer[ByteBuffer])
  def dataSerializeWithExplicitClassTag(
      blockId: BlockId,
      values: Iterator[_],
      classTag: ClassTag[_]): ChunkedByteBuffer = {
    // i.e. ByteBuffer.allocate(4 * 1024 * 1024); the size of each chunk
    val bbos = new ChunkedByteBufferOutputStream(1024 * 1024 * 4, ByteBuffer.allocate)
    val byteStream = new BufferedOutputStream(bbos)
    val autoPick = !blockId.isInstanceOf[StreamBlockId]
    val ser = getSerializer(classTag, autoPick).newInstance()
    ser.serializeStream(wrapForCompression(blockId, byteStream)).writeAll(values).close()
    // ArrayBuffer[ByteBuffer] -> Array[ByteBuffer]
    bbos.toChunkedByteBuffer
  }

  /**
   * Deserializes an InputStream into an iterator of values and disposes of it when the end of
   * the iterator is reached.
   */
  // Deserialize the InputStream and return an Iterator[T]
  def dataDeserializeStream[T](
      blockId: BlockId,
      inputStream: InputStream)
      (classTag: ClassTag[T]): Iterator[T] = {
    val stream = new BufferedInputStream(inputStream)
    val autoPick = !blockId.isInstanceOf[StreamBlockId]
    getSerializer(classTag, autoPick)
      .newInstance()
      .deserializeStream(wrapForCompression(blockId, stream))
      .asIterator.asInstanceOf[Iterator[T]]
  }
}

All of the data*Serialize / data*Deserialize methods in SerializerManager first wrap the InputStream / OutputStream in a buffered stream, and then apply wrapForCompression(blockId, stream) to add compression.
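
A minimal sketch of that wrapping order, using lz4-java directly instead of Spark's private ChunkedByteBufferOutputStream and SerializationStream; the file path is made up:

import java.io.{BufferedOutputStream, FileOutputStream}
import net.jpountz.lz4.LZ4BlockOutputStream

val raw = new FileOutputStream("/tmp/block.bin")                // hypothetical sink
val buffered = new BufferedOutputStream(raw)                    // buffering first
val compressed = new LZ4BlockOutputStream(buffered, 32 * 1024)  // then compression
// a SerializationStream would finally be layered on top of `compressed`
compressed.close()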

Analyzing the CompressionCodec class

Here are the important parts of CompressionCodec, picked out for analysis.

First, the trait that abstracts compression of an InputStream / OutputStream:

trait CompressionCodec {

  def compressedOutputStream(s: OutputStream): OutputStream

  def compressedInputStream(s: InputStream): InputStream
}

The lz4, lzf, snappy and zstd algorithms are then implemented on top of lz4-java, compress-lzf, snappy-java and zstd-jni respectively:

// lz4
class LZ4CompressionCodec(conf: SparkConf) extends CompressionCodec {
  override def compressedOutputStream(s: OutputStream): OutputStream = {
    val blockSize = conf.getSizeAsBytes("spark.io.compression.lz4.blockSize", "32k").toInt
    new LZ4BlockOutputStream(s, blockSize)
  }

  override def compressedInputStream(s: InputStream): InputStream = {
    val disableConcatenationOfByteStream = false
    new LZ4BlockInputStream(s, disableConcatenationOfByteStream)
  }
}

// lzf
class LZFCompressionCodec(conf: SparkConf) extends CompressionCodec {
  override def compressedOutputStream(s: OutputStream): OutputStream = {
    new LZFOutputStream(s).setFinishBlockOnFlush(true)
  }

  override def compressedInputStream(s: InputStream): InputStream = new LZFInputStream(s)
}

// snappy
class SnappyCompressionCodec(conf: SparkConf) extends CompressionCodec {
  // Snappy.getNativeLibraryVersion
  val version = SnappyCompressionCodec.version

  override def compressedOutputStream(s: OutputStream): OutputStream = {
    val blockSize = conf.getSizeAsBytes("spark.io.compression.snappy.blockSize", "32k").toInt
    // Wrapped in the custom SnappyOutputStreamWrapper because SnappyOutputStream.close() was not idempotent:
    // when two SnappyOutputStream objects shared the same buffer, closing one broke the other.
    // Fixed in snappy-java 1.1.2, after which SnappyOutputStream could be returned directly.
    new SnappyOutputStreamWrapper(new SnappyOutputStream(s, blockSize))
  }

  override def compressedInputStream(s: InputStream): InputStream = new SnappyInputStream(s)
}

// zstd. Buffered streams are used to cut down the number of JNI calls and reduce overhead
class ZStdCompressionCodec(conf: SparkConf) extends CompressionCodec {
  private val bufferSize = conf.getSizeAsBytes("spark.io.compression.zstd.bufferSize", "32k").toInt
  // Default compression level for zstd compression to 1 because it is
  // fastest of all with reasonably high compression ratio.
  private val level = conf.getInt("spark.io.compression.zstd.level", 1)

  override def compressedOutputStream(s: OutputStream): OutputStream = {
    // Wrap the zstd output stream in a buffered output stream, so that we can
    // avoid overhead excessive of JNI call while trying to compress small amount of data.
    new BufferedOutputStream(new ZstdOutputStream(s, level), bufferSize)
  }

  override def compressedInputStream(s: InputStream): InputStream = {
    // Wrap the zstd input stream in a buffered input stream so that we can
    // avoid overhead excessive of JNI call while trying to uncompress small amount of data.
    new BufferedInputStream(new ZstdInputStream(s), bufferSize)
  }
}
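
Which codec is used is purely a configuration choice. A minimal sketch, assuming zstd is wanted:

import org.apache.spark.SparkConf

val conf = new SparkConf()
  .set("spark.io.compression.codec", "zstd")            // lz4 (default) / lzf / snappy / zstd
  .set("spark.io.compression.zstd.level", "3")          // default is 1 (fastest)
  .set("spark.io.compression.zstd.bufferSize", "64k")   // default is 32k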

The Serializer classes

abstract class SerializerInstance {
  def serialize[T: ClassTag](t: T): ByteBuffer

  def deserialize[T: ClassTag](bytes: ByteBuffer): T

  def deserialize[T: ClassTag](bytes: ByteBuffer, loader: ClassLoader): T

  def serializeStream(s: OutputStream): SerializationStream

  def deserializeStream(s: InputStream): DeserializationStream
}

SerializerInstance defines the serialize / deserialize interface; Serializer.newInstance() produces an instance of it.
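
A minimal round-trip sketch using the public JavaSerializer; the payload is arbitrary:

import org.apache.spark.SparkConf
import org.apache.spark.serializer.JavaSerializer

val ser = new JavaSerializer(new SparkConf()).newInstance()
val bytes = ser.serialize(Seq(1, 2, 3))           // returns a ByteBuffer
val restored = ser.deserialize[Seq[Int]](bytes)   // Seq(1, 2, 3)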

Analyzing the KryoSerializer class

Spark's KryoSerializer implementation is fairly involved: it uses Twitter's open-source chill library (a Scala wrapper around Kryo), which provides a KryoPool and registers common classes. Below is a Kryo utility class of the kind used in day-to-day development:

public class KryoSerDe {

    private static KryoPool pool = new KryoPool.Builder(() -> {
        Kryo kryo = new Kryo();

        // Disable reference tracking to save space; beware that objects with circular references may then cause a StackOverflowError
        kryo.setReferences(false);
        kryo.setRegistrationRequired(false);
        kryo.setInstantiatorStrategy(new Kryo.DefaultInstantiatorStrategy(new StdInstantiatorStrategy()));
        return kryo;
    }).softReferences().build();

    /**
     * writeObject serializes only the object, without class information; deserialization uses readObject plus the target Class.
     * writeClassAndObject serializes both the object and its class information; deserialization uses readClassAndObject and needs no Class.
     *
     * @param t
     * @param <T>
     * @return
     */
    public static <T> byte[] serialize(T t) {
        Validate.notNull(t);

        // org.apache.commons.io.output.ByteArrayOutputStream (Apache commons-io), not java.io
        ByteArrayOutputStream stream = new ByteArrayOutputStream();
        Output output = new Output(stream);

        pool.run(kryo -> {
            kryo.writeClassAndObject(output, t);
            return output;
        });
        output.close();

        return stream.toByteArray();
    }

    public static <T> T deserialize(byte[] bytes) {
        if (bytes == null) {
            return null;
        }

        Input input = new Input(new ByteArrayInputStream(bytes));
        T t = (T) pool.run(kryo -> kryo.readClassAndObject(input));
        input.close();

        return t;
    }
}

The official Kryo library implements its KryoPool object pool with a ConcurrentLinkedQueue and a SoftReferenceQueue, which is worth borrowing from: the KryoPool interface defines borrow and release; KryoFactory defines create() for building Kryo objects; KryoCallback wraps the business code and hides the borrow/release calls, similar in spirit to JedisTemplate; KryoPoolQueueImpl is the concrete implementation.

Classes can also be registered ahead of time, so that Kryo writes a small integer ID instead of the fully qualified class name and saves space; alternatively, extend Serializer to customize how a particular class is serialized and deserialized, as sketched below.
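
A minimal sketch of registering application classes with Spark's Kryo support; UserEvent is a made-up class:

import org.apache.spark.SparkConf

case class UserEvent(userId: Long, action: String)   // hypothetical application class

val conf = new SparkConf()
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .set("spark.kryo.registrationRequired", "true")     // fail fast on unregistered classes
  .registerKryoClasses(Array(classOf[UserEvent], classOf[Array[UserEvent]]))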

Spark's KryoSerializer registers a custom serializer for RoaringBitmap:

classOf[RoaringBitmap] -> new KryoClassSerializer[RoaringBitmap]() {
  override def write(kryo: Kryo, output: KryoOutput, bitmap: RoaringBitmap): Unit = {
    bitmap.serialize(new KryoOutputObjectOutputBridge(kryo, output))
  }
  override def read(kryo: Kryo, input: KryoInput, cls: Class[RoaringBitmap]): RoaringBitmap = {
    val ret = new RoaringBitmap
    ret.deserialize(new KryoInputObjectInputBridge(kryo, input))
    ret
  }
}

RoaringBitmap's deserialize method takes a DataInput, while Kryo's Input extends InputStream, so the KryoInputObjectInputBridge class is defined, extending FilterInputStream and implementing ObjectInput:

private[spark] class KryoInputObjectInputBridge(
    kryo: Kryo, input: KryoInput) extends FilterInputStream(input) with ObjectInput {
  override def readLong(): Long = input.readLong()
  override def readChar(): Char = input.readChar()
  override def readFloat(): Float = input.readFloat()
  override def readByte(): Byte = input.readByte()
  override def readShort(): Short = input.readShort()
  override def readUTF(): String = input.readString() // readString in kryo does utf8
  override def readInt(): Int = input.readInt()
  override def readUnsignedShort(): Int = input.readShortUnsigned()
  override def skipBytes(n: Int): Int = {
    input.skip(n)
    n
  }
  override def readFully(b: Array[Byte]): Unit = input.read(b)
  override def readFully(b: Array[Byte], off: Int, len: Int): Unit = input.read(b, off, len)
  override def readLine(): String = throw new UnsupportedOperationException("readLine")
  override def readBoolean(): Boolean = input.readBoolean()
  override def readUnsignedByte(): Int = input.readByteUnsigned()
  override def readDouble(): Double = input.readDouble()
  override def readObject(): AnyRef = kryo.readClassAndObject(input)
}

Extending FilterInputStream lets the class wrap another stream and add extra behavior on top of it; ObjectInput is the interface for reading objects from a stream.

In this way KryoInputObjectInputBridge can be used both as an InputStream and as an ObjectInput, rather than merely converting one into the other; KryoOutputObjectOutputBridge works the same way on the output side.

Analyzing the JavaSerializer class

JavaSerializer implements the writeExternal() and readExternal() methods of the Externalizable interface to control exactly which fields are serialized: counterReset and extraDebugInfo.

Externalizable extends Serializable. A class that implements Serializable has all of its fields serialized by default; marking fields transient on a Serializable class serializes only the remaining ones; implementing Externalizable lets the class take over serialization entirely.
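
Before looking at Spark's JavaSerializer, here is a minimal Scala sketch of the two approaches; the class names and fields are made up:

import java.io.{Externalizable, ObjectInput, ObjectOutput}

// Serializable + @transient: `cache` is skipped, every other field is written automatically
class Settings(var threshold: Int, @transient var cache: Map[String, String]) extends Serializable

// Externalizable: the class decides exactly what is written and read back
class ExternalSettings(var threshold: Int) extends Externalizable {
  def this() = this(0)  // Externalizable requires a public no-arg constructor
  override def writeExternal(out: ObjectOutput): Unit = out.writeInt(threshold)
  override def readExternal(in: ObjectInput): Unit = { threshold = in.readInt() }
}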

class JavaSerializer(conf: SparkConf) extends Serializer with Externalizable {
  private var counterReset = conf.getInt("spark.serializer.objectStreamReset", 100)
  private var extraDebugInfo = conf.getBoolean("spark.serializer.extraDebugInfo", true)

  protected def this() = this(new SparkConf())  // For deserialization only

  override def newInstance(): SerializerInstance = {
    val classLoader = defaultClassLoader.getOrElse(Thread.currentThread.getContextClassLoader)
    new JavaSerializerInstance(counterReset, extraDebugInfo, classLoader)
  }

  override def writeExternal(out: ObjectOutput): Unit = Utils.tryOrIOException {
    out.writeInt(counterReset)
    out.writeBoolean(extraDebugInfo)
  }

  override def readExternal(in: ObjectInput): Unit = Utils.tryOrIOException {
    counterReset = in.readInt()
    extraDebugInfo = in.readBoolean()
  }
}

A utility class for standard Java serialization / deserialization, as used in everyday development:

public class JavaSerDe {

    public static <T extends Serializable> byte[] serialize(T t) {
        Objects.requireNonNull(t);

        try {
            ByteArrayOutputStream bos = new ByteArrayOutputStream();
            ObjectOutputStream objOut = new ObjectOutputStream(bos);

            objOut.writeObject(t);
            objOut.close();
            return bos.toByteArray();
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    public static <T extends Serializable> T deserialize(byte[] bytes) {
        if (bytes == null) {
            return null;
        }

        try {
            ByteArrayInputStream bis = new ByteArrayInputStream(bytes);
            ObjectInputStream objIn = new ObjectInputStream(bis);

            // read the object before closing the stream
            T t = (T) objIn.readObject();
            objIn.close();
            return t;
        } catch (IOException | ClassNotFoundException e) {
            throw new RuntimeException(e);
        }
    }
}

When deserialization needs a specific ClassLoader, override ObjectInputStream's resolveClass method and pass the ClassLoader in:

ObjectInputStream objIn = new ObjectInputStream(bis) {
    @Override
    protected Class<?> resolveClass(ObjectStreamClass desc) throws IOException, ClassNotFoundException {
        String name = desc.getName();
        try {
            // the loader here can be passed in from the outside
            return Class.forName(name, false, loader);
        } catch (ClassNotFoundException ex) {
            // Java primitive type names are resolved directly from the map below
            Class<?> cl = primClasses.get(name);
            if (cl != null) {
                return cl;
            } else {
                throw ex;
            }
        }
    }
};

// The code below is taken from the ObjectInputStream source (ObjectInputStream:223); Spark's JavaDeserializationStream:60 has similar code
private static final HashMap<String, Class<?>> primClasses
        = new HashMap<>(8, 1.0F);
static {
    primClasses.put("boolean", boolean.class);
    primClasses.put("byte", byte.class);
    primClasses.put("char", char.class);
    primClasses.put("short", short.class);
    primClasses.put("int", int.class);
    primClasses.put("long", long.class);
    primClasses.put("float", float.class);
    primClasses.put("double", double.class);
    primClasses.put("void", void.class);
}

Dissecting the memory leak ObjectOutputStream can cause

See also: ObjectOutputStream memory overflow and heap analysis with JVisualVM

Each call to ObjectOutputStream.writeObject keeps a reference to the object being serialized, so calling writeObject repeatedly on the same ObjectOutputStream prevents those objects from ever being garbage collected, i.e. a memory leak.

There are two ways to avoid this:

  1. Do not keep the ObjectOutputStream as a long-lived field; create a new one for every use and close it when done.
  2. If the same ObjectOutputStream must serve many writeObject calls, remember to call reset() to clear the retained references.

Spark's JavaSerializationStream takes the second approach:
private[spark] class JavaSerializationStream(
    out: OutputStream, counterReset: Int, extraDebugInfo: Boolean)
  extends SerializationStream {
  private val objOut = new ObjectOutputStream(out)
  private var counter = 0

  /**
   * Calling reset to avoid memory leak:
   * http://stackoverflow.com/questions/1281549/memory-leak-traps-in-the-java-standard-api
   * But only call it every 100th time to avoid bloated serialization streams (when
   * the stream 'resets' object class descriptions have to be re-written)
   */
  def writeObject[T: ClassTag](t: T): SerializationStream = {
    try {
      objOut.writeObject(t)
    } catch {
      case e: NotSerializableException if extraDebugInfo =>
        throw SerializationDebugger.improveException(t, e)
    }
    counter += 1
    // Each write increments the counter; once it reaches counterReset (100 by default), reset() is called.
    // This keeps performance reasonable while avoiding the memory leak
    if (counterReset > 0 && counter >= counterReset) {
      objOut.reset()
      counter = 0
    }
    this
  }

  def flush() { objOut.flush() }
  def close() { objOut.close() }
}

Spark counts the calls to objOut.writeObject and, once the counter reaches the configured threshold conf.getInt("spark.serializer.objectStreamReset", 100), calls reset().
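
A minimal sketch (not Spark code) of why reset() matters when one ObjectOutputStream is reused for many writes:

import java.io.{ByteArrayOutputStream, ObjectOutputStream}

val bos = new ByteArrayOutputStream()
val oos = new ObjectOutputStream(bos)
for (i <- 1 to 1000) {
  oos.writeObject(Array.fill(1024)(i.toByte))  // the stream keeps a reference to every object written
  if (i % 100 == 0) oos.reset()                // drop the internal handle table, as Spark does via counterReset
}
oos.close()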
