spark-local模式详解

spark-local 详解

Spark的Local模式是在本地启动多个Threads(线程)来模拟分布式运行模式,每个Thread代表一个worker。l根据Spark官方文档,spark-Local模式下有以下集中设置mast url的方式,不同Local部署模式的不同之处在于任务失败后的重试次数。

Master-URL Meaning Max-Retry time
local 1 worker Thread(no parallelism) 失败的任务不会重新尝试
local[N] N worker Threads,ideally set to cores number 失败的任务不会重新尝试
local[*] as many worker Threads as your machine 失败的任务不会重新尝试
local[N,F] N worker Threads 失败的任务最多进行F-1次尝试
local[*,F] as many worker Threads as your machine 失败的任务最多进行F-1此尝试
local-cluster[numSlaves,coresPerSlave,memeoryPerySlave] 伪分布式模式,本地运行master和worker,master中指定了worker数目,CPU core数目和每个worker能使用的内存 其他与standalone运行模式相同

spark.task.maxFailures是Task失败时重试的次数。

Property Name Default Meaning
spark.task.maxFailures 4 Number of failures of any particular task before giving up on the job. The total number of failures spread across different tasks will not cause the job to fail; a particular task has to fail this number of attempts. Should be greater than or equal to 1. Number of allowed retries = this value - 1.

Spark在不同的部署模式下的区别是实现TaskScheduler and SchedulerBackend。Spark会在SparkContext的创建过程中通过传入的Master URL来确定不同的运行模式,并且创建不同的TaskScheduler和SchedulerBackend,具体实现是在org.apache.spark.SparkContext#createTaskScheduler中。
local模式的整体架构如下如所示,其中LocalBackend持有一个LocalActor,他与Executor之间的通信就是通过这个Executor来完成的。

image.png

local-cluster[N,F,M]整体架构如下,需要注意的是org.apache.spark.executor.CoarseGrainedExecutorBackendorg.apache.spark.deploy.worker.Worker两个不同的进程,两者之间没有包含与被包含的关系。当Worker接收到Master的消息后会创建一个ExecutorRunner实例,然后由ExecutorRunner来启动新的进程CoarseGrainedExecutorBackend。

image.png

在SparkContext.scala中有匹配Local和Spark-Standalone模式下Master URL的正则表达式。

    /**
     *Master URL正则表达式
     */
    private object SparkMasterRegex {
      // Regular expression used for local[N] and local[*] master formats
      val LOCAL_N_REGEX = """local\[([0-9]+|\*)\]""".r
      // Regular expression for local[N, maxRetries], used in tests with failing tasks
      val LOCAL_N_FAILURES_REGEX = """local\[([0-9]+|\*)\s*,\s*([0-9]+)\]""".r
      // Regular expression for simulating a Spark cluster of [N, cores, memory] locally
      val LOCAL_CLUSTER_REGEX = """local-cluster\[\s*([0-9]+)\s*,\s*([0-9]+)\s*,\s*([0-9]+)\s*]""".r
      // Regular expression for connecting to Spark deploy clusters
      // Standalone 正则表达式
      val SPARK_REGEX = """spark://(.*)""".r
    }

对于其他部署模式则直接通过getClusterManager方法创建ExternalClusterManager:

private def getClusterManager(url: String): Option[ExternalClusterManager] = {
        val loader = Utils.getContextOrSparkClassLoader
        val serviceLoaders =
          ServiceLoader.load(classOf[ExternalClusterManager], loader).asScala.filter(_.canCreate(url))
        if (serviceLoaders.size > 1) {
          throw new SparkException(
            s"Multiple external cluster managers registered for the url $url: $serviceLoaders")
        }
        serviceLoaders.headOption
      }
    }

Spark中创建TaskScheduler和SchedulerBackend实例的具体过程如下:

      /**
       * 基于Master URL创建不同的schedulerbackend and task scheduler实例
       */
      private def createTaskScheduler(
          sc: SparkContext,
          master: String,
          deployMode: String): (SchedulerBackend, TaskScheduler) = {
        import SparkMasterRegex._
        // 在Local模式下,失败的Task不进行重试
        val MAX_LOCAL_TASK_FAILURES = 1
    
        master match {
          case "local" =>
            val scheduler = new TaskSchedulerImpl(sc, MAX_LOCAL_TASK_FAILURES, isLocal = true)
            val backend = new LocalSchedulerBackend(sc.getConf, scheduler, 1)
            scheduler.initialize(backend)
            (backend, scheduler)

          case LOCAL_N_REGEX(threads) =>
            // 获取当前能使用的cores
            def localCpuCount: Int = Runtime.getRuntime.availableProcessors()
            // local[*] estimates the number of cores on the machine; local[N] uses exactly N threads.
            val threadCount = if (threads == "*") localCpuCount else threads.toInt
            if (threadCount <= 0) {
              throw new SparkException(s"Asked to run locally with $threadCount threads")
            }
            val scheduler = new TaskSchedulerImpl(sc, MAX_LOCAL_TASK_FAILURES, isLocal = true)
            val backend = new LocalSchedulerBackend(sc.getConf, scheduler, threadCount)
            scheduler.initialize(backend)
            (backend, scheduler)
    
          case LOCAL_N_FAILURES_REGEX(threads, maxFailures) =>
            def localCpuCount: Int = Runtime.getRuntime.availableProcessors()
            // 这里指定了每个Task的尝试次数
            val threadCount = if (threads == "*") localCpuCount else threads.toInt
            val scheduler = new TaskSchedulerImpl(sc, maxFailures.toInt, isLocal = true)
            val backend = new LocalSchedulerBackend(sc.getConf, scheduler, threadCount)
            scheduler.initialize(backend)
            (backend, scheduler)
          // Spark Standalone部署模式
          case SPARK_REGEX(sparkUrl) =>
            val scheduler = new TaskSchedulerImpl(sc)
            val masterUrls = sparkUrl.split(",").map("spark://" + _)
            val backend = new StandaloneSchedulerBackend(scheduler, sc, masterUrls)
            scheduler.initialize(backend)
            (backend, scheduler)
          // local-cluster 模式
          case LOCAL_CLUSTER_REGEX(numSlaves, coresPerSlave, memoryPerSlave) =>
            // Check to make sure memory requested <= memoryPerSlave. Otherwise Spark will just hang.
            val memoryPerSlaveInt = memoryPerSlave.toInt
            if (sc.executorMemory > memoryPerSlaveInt) {
              throw new SparkException(
                "Asked to launch cluster with %d MB RAM / worker but requested %d MB/worker".format(
                  memoryPerSlaveInt, sc.executorMemory))
            }
    
            val scheduler = new TaskSchedulerImpl(sc)
            val localCluster = new LocalSparkCluster(
              numSlaves.toInt, coresPerSlave.toInt, memoryPerSlaveInt, sc.conf)
            val masterUrls = localCluster.start()
            val backend = new StandaloneSchedulerBackend(scheduler, sc, masterUrls)
            scheduler.initialize(backend)
            backend.shutdownCallback = (backend: StandaloneSchedulerBackend) => {
              localCluster.stop()
            }
            (backend, scheduler)
          // 其他部署模式(Mesos,yarn,k8s,ec2)时使用getClusterManager
          case masterUrl =>
            val cm = getClusterManager(masterUrl) match {
              case Some(clusterMgr) => clusterMgr
              case None => throw new SparkException("Could not parse Master URL: '" + master + "'")
            }
            try {
              val scheduler = cm.createTaskScheduler(sc, masterUrl)
              val backend = cm.createSchedulerBackend(sc, masterUrl, scheduler)
              cm.initialize(scheduler, backend)
              (backend, scheduler)
            } catch {
              case se: SparkException => throw se
              case NonFatal(e) =>
                throw new SparkException("External scheduler cannot be instantiated", e)
            }
        }
      }

Reference:

[1] Spark技术内幕:深入解析Spark内核架构设计与原理实现(张安站)
[2] https://github.com/apache/spark/blob/branch-2.3/core/src/main/scala/org/apache/spark/SparkContext.scala

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 212,332评论 6 493
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,508评论 3 385
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 157,812评论 0 348
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,607评论 1 284
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 65,728评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 49,919评论 1 290
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,071评论 3 410
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,802评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,256评论 1 303
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,576评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,712评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,389评论 4 332
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,032评论 3 316
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,798评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,026评论 1 266
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,473评论 2 360
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,606评论 2 350

推荐阅读更多精彩内容