对Spark的那些【魔改】

前言

这两年做streamingpro时,不可避免的需要对Spark做大量的增强。就如同我之前吐槽的,Spark大量使用了new进行对象的创建,导致里面的实现基本没有办法进行替换。

比如SparkEnv里有个属性叫closureSerializer,是专门做任务的序列化反序列化的,当然也负责对函数闭包的序列化反序列化。我们看看内部是怎么实现的:

val serializer = instantiateClassFromConf[Serializer](
      "spark.serializer", "org.apache.spark.serializer.JavaSerializer")
    logDebug(s"Using serializer: ${serializer.getClass}")

    val serializerManager = new SerializerManager(serializer, conf, ioEncryptionKey)

    val closureSerializer = new JavaSerializer(conf)

val envInstance = new SparkEnv(
.....
 closureSerializer, ....

这里直接new了一个JavaSerializer,并不能做配置。如果不改源码,你没有任何办法可以替换掉掉这个实现。同理,如果我想替换掉Executor的实现,基本也是不可能的。

今年有两个大地方涉及到了对Spark的【魔改】,也就是不通过改源码,使用原有发型包,通过添加新代码的方式来对Spark进行增强。

二层RPC的支持

我们知道,在Spark里,我们只能通过Task才能touch到Executor。现有的API你是没办法直接操作到所有或者指定部分的Executor。比如,我希望所有Executor都加载一个资源文件,现在是没办法做到的。为了能够对Executor进行直接的操作,那就需要建立一个新的通讯层。那具体怎么做呢?
首先,在Driver端建立一个Backend,这个比较简单,

class PSDriverBackend(sc: SparkContext) extends Logging {

  val conf = sc.conf
  var psDriverRpcEndpointRef: RpcEndpointRef = null

  def createRpcEnv = {
    val isDriver = sc.env.executorId == SparkContext.DRIVER_IDENTIFIER
    val bindAddress = sc.conf.get(DRIVER_BIND_ADDRESS)
    val advertiseAddress = sc.conf.get(DRIVER_HOST_ADDRESS)
    var port = sc.conf.getOption("spark.ps.driver.port").getOrElse("7777").toInt
    val ioEncryptionKey = if (sc.conf.get(IO_ENCRYPTION_ENABLED)) {
      Some(CryptoStreamUtils.createKey(sc.conf))
    } else {
      None
    }
    logInfo(s"setup ps driver rpc env: ${bindAddress}:${port} clientMode=${!isDriver}")
    var createSucess = false
    var count = 0
    val env = new AtomicReference[RpcEnv]()
    while (!createSucess && count < 10) {
      try {
        env.set(RpcEnv.create("PSDriverEndpoint", bindAddress, port, sc.conf,
          sc.env.securityManager, clientMode = !isDriver))
        createSucess = true
      } catch {
        case e: Exception =>
          logInfo("fail to create rpcenv", e)
          count += 1
          port += 1
      }
    }
    if (env.get() == null) {
      logError(s"fail to create rpcenv finally with attemp ${count} ")
    }
    env.get()
  }

  def start() = {
    val env = createRpcEnv
    val pSDriverBackend = new PSDriverEndpoint(sc, env)
    psDriverRpcEndpointRef = env.setupEndpoint("ps-driver-endpoint", pSDriverBackend)
  }

}

这样,你可以理解为在Driver端启动了一个PRC Server。要运行这段代码也非常简单,直接在主程序里运行即可:

// parameter server should be enabled by default
    if (!params.containsKey("streaming.ps.enable") || params.get("streaming.ps.enable").toString.toBoolean) {
      logger.info("ps enabled...")
      if (ss.sparkContext.isLocal) {
        localSchedulerBackend = new LocalPSSchedulerBackend(ss.sparkContext)
        localSchedulerBackend.start()
      } else {
        logger.info("start PSDriverBackend")
        psDriverBackend = new PSDriverBackend(ss.sparkContext)
        psDriverBackend.start()
      }
    }

这里我们需要实现local模式和cluster模式两种。

Driver启动了一个PRC Server,那么Executor端如何启动呢?Executor端似乎没有任何一个地方可以让我启动一个PRC Server? 其实有的,只是非常trick,我们知道Spark是允许自定义Metrics的,并且会调用用户实现的metric特定的方法,我们只要开发一个metric Sink,在里面启动RPC Server,骗过Spark即可。具体时下如下:

class PSServiceSink(val property: Properties, val registry: MetricRegistry,
                    securityMgr: SecurityManager) extends Sink with Logging {
  def env = SparkEnv.get

  var psDriverUrl: String = null
  var psExecutorId: String = null
  var hostname: String = null
  var cores: Int = 0
  var appId: String = null
  val psDriverPort = 7777
  var psDriverHost: String = null
  var workerUrl: Option[String] = None
  val userClassPath = new mutable.ListBuffer[URL]()

  def parseArgs = {
    //val runtimeMxBean = ManagementFactory.getRuntimeMXBean();
    //var argv = runtimeMxBean.getInputArguments.toList
    var argv = System.getProperty("sun.java.command").split("\\s+").toList

   .....
    psDriverHost = host
    psDriverUrl = "spark://ps-driver-endpoint@" + psDriverHost + ":" + psDriverPort
  }

  parseArgs

  def createRpcEnv = {
    val isDriver = env.executorId == SparkContext.DRIVER_IDENTIFIER
    val bindAddress = hostname
    val advertiseAddress = ""
    val port = env.conf.getOption("spark.ps.executor.port").getOrElse("0").toInt
    val ioEncryptionKey = if (env.conf.get(IO_ENCRYPTION_ENABLED)) {
      Some(CryptoStreamUtils.createKey(env.conf))
    } else {
      None
    }
    //logInfo(s"setup ps driver rpc env: ${bindAddress}:${port} clientMode=${!isDriver}")
    RpcEnv.create("PSExecutorBackend", bindAddress, port, env.conf,
      env.securityManager, clientMode = !isDriver)
  }

  override def start(): Unit = {

    new Thread(new Runnable {
      override def run(): Unit = {
        logInfo(s"delay PSExecutorBackend 3s")
        Thread.sleep(3000)
        logInfo(s"start PSExecutor;env:${env}")
        if (env.executorId != SparkContext.DRIVER_IDENTIFIER) {
          val rpcEnv = createRpcEnv
          val pSExecutorBackend = new PSExecutorBackend(env, rpcEnv, psDriverUrl, psExecutorId, hostname, cores)
          PSExecutorBackend.executorBackend = Some(pSExecutorBackend)
          rpcEnv.setupEndpoint("ps-executor-endpoint", pSExecutorBackend)
        }
      }
    }).start()

  }
...
}

到这里,我们就能成功启动RPC Server,并且连接上Driver中的PRC Server。现在,你就可以在不修改Spark 源码的情况下,尽情的写通讯相关的代码了,让你可以更好的控制Executor。

比如在PSExecutorBackend 实现如下代码:

 override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
    case Message.TensorFlowModelClean(modelPath) => {
      logInfo("clean tensorflow model")
      TFModelLoader.close(modelPath)
      context.reply(true)
    }
    case Message.CopyModelToLocal(modelPath, destPath) => {
      logInfo(s"copying model: ${modelPath} -> ${destPath}")
      HDFSOperator.copyToLocalFile(destPath, modelPath, true)
      context.reply(true)
    }
  }

接着你就可以在Spark里写如下的代码调用了:

val psDriverBackend = runtime.asInstanceOf[SparkRuntime].psDriverBackend    psDriverBackend.psDriverRpcEndpointRef.send(Message.TensorFlowModelClean("/tmp/ok"))

是不是很酷。

修改闭包的序列化方式

Spark的任务调度开销非常大。对于一个复杂的任务,业务逻辑代码执行时间大约是3-7ms,但是整个spark运行的开销大概是1.3s左右。

经过详细dig发现,sparkContext里RDD转化时,会对函数进行clean操作,clean操作的过程中,默认会检查是不是能序列化(就是序列化一遍,没抛出异常就算可以序列化)。而序列化成本相当高(默认使用的JavaSerializer并且对于函数和任务序列化,是不可更改的),单次序列化耗时就达到200ms左右,在local模式下对其进行优化,可以减少600ms左右的请求时间。

当然,需要申明的是,这个是针对local模式进行修改的。那具体怎么做的呢?

我们先看看Spark是怎么调用序列化函数的,首先在SparkContext里,clean函数是这样的:

 private[spark] def clean[F <: AnyRef](f: F, checkSerializable: Boolean = true): F = {
    ClosureCleaner.clean(f, checkSerializable)
    f
  }

调用的是ClosureCleaner.clean方法,该方法里是这么调用学序列化的:

try {
      if (SparkEnv.get != null) {
        SparkEnv.get.closureSerializer.newInstance().serialize(func)
      }
    } catch {
      case ex: Exception => throw new SparkException("Task not serializable", ex)
    }

SparkEnv是在SparkContext初始化的时候创建的,该对象里面包含了closureSerializer,该对象通过new JavaSerializer创建。既然序列化太慢,又因为我们其实是在Local模式下,本身是可以不需要序列化的,所以我们这里想办法把closureSerializer的实现替换掉。正如我们前面吐槽,因为在Spark代码里写死了,没有暴露任何自定义的可能性,所以我们又要魔改一下了。

首先,我们新建一个SparkEnv的子类:

class WowSparkEnv(
                   ....) extends SparkEnv(

接着实现一个自定义的Serializer:

class LocalNonOpSerializerInstance(javaD: SerializerInstance) extends SerializerInstance {

  private def isClosure(cls: Class[_]): Boolean = {
    cls.getName.contains("$anonfun$")
  }

  override def serialize[T: ClassTag](t: T): ByteBuffer = {
    if (isClosure(t.getClass)) {
      val uuid = UUID.randomUUID().toString
      LocalNonOpSerializerInstance.maps.put(uuid, t.asInstanceOf[AnyRef])
      ByteBuffer.wrap(uuid.getBytes())
    } else {
      javaD.serialize(t)
    }

  }

  override def deserialize[T: ClassTag](bytes: ByteBuffer): T = {
    val s = StandardCharsets.UTF_8.decode(bytes).toString()
    if (LocalNonOpSerializerInstance.maps.containsKey(s)) {
      LocalNonOpSerializerInstance.maps.remove(s).asInstanceOf[T]
    } else {
      bytes.flip()
      javaD.deserialize(bytes)
    }

  }

  override def deserialize[T: ClassTag](bytes: ByteBuffer, loader: ClassLoader): T = {
    val s = StandardCharsets.UTF_8.decode(bytes).toString()
    if (LocalNonOpSerializerInstance.maps.containsKey(s)) {
      LocalNonOpSerializerInstance.maps.remove(s).asInstanceOf[T]
    } else {
      bytes.flip()
      javaD.deserialize(bytes, loader)
    }
  }

  override def serializeStream(s: OutputStream): SerializationStream = {
    javaD.serializeStream(s)
  }

  override def deserializeStream(s: InputStream): DeserializationStream = {
    javaD.deserializeStream(s)
  }

接着我们需要再封装一个LocalNonOpSerializer,

class LocalNonOpSerializer(conf: SparkConf) extends Serializer with Externalizable {
  val javaS = new JavaSerializer(conf)

  override def newInstance(): SerializerInstance = {
    new LocalNonOpSerializerInstance(javaS.newInstance())
  }

  override def writeExternal(out: ObjectOutput): Unit = Utils.tryOrIOException {
    javaS.writeExternal(out)
  }

  override def readExternal(in: ObjectInput): Unit = Utils.tryOrIOException {
    javaS.readExternal(in)
  }
}

现在,万事俱备,只欠东风了,我们怎么才能把这些代码让Spark运行起来。具体做法非常魔幻,实现一个enhance类:

def enhanceSparkEnvForAPIService(session: SparkSession) = {
      val env = SparkEnv.get
   //创建一个新的WowSparkEnv对象,然后将里面的Serializer替换成我们自己的LocalNonOpSerializer
    val wowEnv = new WowSparkEnv(
 .....
      new LocalNonOpSerializer(env.conf): Serializer,
 ....)
    // 将SparkEnv object里的实例替换成我们的
    //WowSparkEnv
    SparkEnv.set(wowEnv)
  //但是很多地方在SparkContext启动后都已经在使用之前就已经生成的SparkEnv,我们需要做些调整
//我们先把之前已经启动的LocalSchedulerBackend里的scheduer停掉
    val localScheduler = session.sparkContext.schedulerBackend.asInstanceOf[LocalSchedulerBackend]

    val scheduler = ReflectHelper.field(localScheduler, "scheduler")

    val totalCores = localScheduler.totalCores
    localScheduler.stop()

  //创建一个新的LocalSchedulerBackend
    val wowLocalSchedulerBackend = new WowLocalSchedulerBackend(session.sparkContext.getConf, scheduler.asInstanceOf[TaskSchedulerImpl], totalCores)
    wowLocalSchedulerBackend.start()
 //把SparkContext里的_schedulerBackend替换成我们的实现
    ReflectHelper.field(session.sparkContext, "_schedulerBackend", wowLocalSchedulerBackend)
  }

完工。

其实还有很多

比如在Spark里,Python Worker默认一分钟没有被使用是会被杀死的,但是在StreamingPro里,这些python worker因为都要加载模型,所以启动成本是非常高的,杀了之后再启动就没办法忍受了,通过类似的方式进行魔改,从而使得空闲时间是可配置的。如果大家感兴趣,可以翻看StreamingPro相关代码。

我的博客即将搬运同步至腾讯云+社区,邀请大家一同入驻:https://cloud.tencent.com/developer/support-plan?invite_code=45eorticfc31

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 215,133评论 6 497
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,682评论 3 390
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 160,784评论 0 350
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,508评论 1 288
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,603评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,607评论 1 293
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,604评论 3 415
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,359评论 0 270
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,805评论 1 307
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,121评论 2 330
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,280评论 1 344
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,959评论 5 339
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,588评论 3 322
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,206评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,442评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,193评论 2 367
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,144评论 2 352

推荐阅读更多精彩内容