spark-sql源码解读

一、开发一个spark应用

//初始化sparksession
    val spark = SparkSession.builder.appName("SparkSQL Test").master("local[4]").getOrCreate() 
    //transform操作,生成dataframe,可继续执行dataframe相关dsl api,
    val sqlDf = spark.sql("select count(*) from table")
    //action操作,spark-core开始执行
    sqlDf.show(false)

二、初始化sparksession-sessionState构造过程

//1:sparksession懒加载sessionstate
        lazy val sessionState: SessionState = {
            parentSessionState
              .map(_.clone(this))
              .getOrElse {
                val state = SparkSession.instantiateSessionState(
                  SparkSession.sessionStateClassName(sparkContext.conf),
                  self)
                initialSessionOptions.foreach { case (k, v) => state.conf.setConfString(k, v) }
                state
              }
          }
//2:实例化sessionstate
    /**
       * Helper method to create an instance of `SessionState` based on `className` from conf.
       * The result is either `SessionState` or a Hive based `SessionState`.
       */
      private def instantiateSessionState(
          className: String,
          sparkSession: SparkSession): SessionState = {
        try {
          // invoke `new [Hive]SessionStateBuilder(SparkSession, Option[SessionState])`
          val clazz = Utils.classForName(className)
          val ctor = clazz.getConstructors.head
          //默认:BaseSessionStateBuilder, hive:HiveSessionStateBuilder
          ctor.newInstance(sparkSession, None).asInstanceOf[BaseSessionStateBuilder].build()
            ...
//3: 构建SessionState,初始化catalog、sqlparser、analyzer、optimzizer,内置函数以及udf函数等等
            def build(): SessionState = {
              new SessionState(
                session.sharedState,
                conf,
                experimentalMethods,
                functionRegistry,
                udfRegistration,
                () => catalog,
                sqlParser,
                () => analyzer,
                () => optimizer,
                planner,
                streamingQueryManager,
                listenerManager,
                () => resourceLoader,
                createQueryExecution,
                createClone)
            }

注:QueryExecution、SessionState、BaseSessionStateBuilder之间的关系:
(1)QueryExecution的analyzed、optimizedPlan是懒加载的,被调用时实际调用的是SessionState中的analyzer、optimizer的相关方法做解析和优化
(2)SessionState的catalog、analyzer、optimizer、resourceLoader也是懒加载的,被调用时实际调用的是在BaseSessionStateBuilder初始化SessionState的时候生成的匿名函数
三、transform-生成dataframe-resolved logicalPlan

/*
1: paserplan生成unresolved logicalPlan, ofRows方法中调用QueryExecution.assertAnalyzed(),
        其实是sparkSession.sessionState.analyzer.executeAndCheck(logical),
        再使用定义的各种解析规则,resolving unresolved attributes and relations,生成resolved logicalPlan,
        最终new Dataset[Row](sparkSession, qe, RowEncoder(qe.analyzed.schema))生成dataframe*/
def sql(sqlText: String): DataFrame = {Dataset.ofRows(self, sessionState.sqlParser.parsePlan(sqlText))}
//2: 使用访问者模式,astBuilder遍历antlr sql语法树,解析成catalyst的ast语法树,生成unresolved的逻辑计划
  override def parsePlan(sqlText: String): LogicalPlan = parse(sqlText) { parser =>
    astBuilder.visitSingleStatement(parser.singleStatement()) match {
      case plan: LogicalPlan => plan
      case _ =>
        val position = Origin(None, None)
        throw new ParseException(Option(sqlText), "Unsupported SQL statement", position, position)
    }
  }
 //代码3:将sql命令传给antlr,使用SqlBase.g4生成的词汇解析器SqlBaseLexer和语法解析器SqlBaseParser,对词和语法校验
  protected def parse[T](command: String)(toResult: SqlBaseParser => T): T = {
    logDebug(s"Parsing command: $command")

    val lexer = new SqlBaseLexer(new UpperCaseCharStream(CharStreams.fromString(command)))
    lexer.removeErrorListeners()
    lexer.addErrorListener(ParseErrorListener)
    lexer.legacy_setops_precedence_enbled = SQLConf.get.setOpsPrecedenceEnforced

    val tokenStream = new CommonTokenStream(lexer)
    val parser = new SqlBaseParser(tokenStream)
    parser.addParseListener(PostProcessor)
    parser.removeErrorListeners()
    parser.addErrorListener(ParseErrorListener)
    parser.legacy_setops_precedence_enbled = SQLConf.get.setOpsPrecedenceEnforced

    try {
      try {
        // first, try parsing with potentially faster SLL mode
        parser.getInterpreter.setPredictionMode(PredictionMode.SLL)
        toResult(parser)
        ...

四、action-触发执行-优化逻辑计划,生成物理计划,转为rdd提交给sparkContex

//1:拉取20行数据到driver端,调用take(),最终调用head()
        def show(): Unit = show(20)
            
        def head(n: Int): Array[T] = withAction("head", limit(n).queryExecution)(collectFromPlan)
//2:Wrap一个action,监控查询执行过程和时间花费,执行用户注册的回调函数
        private def withAction[U](name: String, qe: QueryExecution)(action: SparkPlan => U) = {
            try {
                /*触发optimizer优化器采用一系列优化规则(eg:谓词下推)对resolved logicalPlan进行优化,
                /sparkplanner选择出最优策略(eg:广播表)将optimizedPlan转化为sparkplan,
                    sparkplan应用一系列规则,转化为可预备执行的物理计划
                    */
              qe.executedPlan.foreach { plan =>
                plan.resetMetrics()
              }
              val start = System.nanoTime()
              val result = SQLExecution.withNewExecutionId(sparkSession, qe) {
                 //调用collectFromPlan,交给spark-core,执行物理计划,转为rdd操作
                action(qe.executedPlan)
              }
              val end = System.nanoTime()
              sparkSession.listenerManager.onSuccess(name, qe, end - start)
              result
            } catch {
              case e: Exception =>
                sparkSession.listenerManager.onFailure(name, qe, e)
                throw e
            }
          }
//3:QueryExecution中从优化到生成可预备执行的物理计划工作流
          lazy val optimizedPlan: LogicalPlan = sparkSession.sessionState.optimizer.execute(withCachedData)

          lazy val sparkPlan: SparkPlan = {
            SparkSession.setActiveSession(sparkSession)
            // TODO: We use next(), i.e. take the first plan returned by the planner, here for now,
            //       but we will implement to choose the best plan.
            planner.plan(ReturnAnswer(optimizedPlan)).next()
          }

          // executedPlan should not be used to initialize any SparkPlan. It should be
          // only used for execution.
          lazy val executedPlan: SparkPlan = prepareForExecution(sparkPlan)
              
          protected def prepareForExecution(plan: SparkPlan): SparkPlan = {
              preparations.foldLeft(plan) { case (sp, rule) => rule.apply(sp) }
            }

            /** A sequence of rules that will be applied in order to the physical plan before execution. */
            protected def preparations: Seq[Rule[SparkPlan]] = Seq(
              python.ExtractPythonUDFs,
              PlanSubqueries(sparkSession),
              EnsureRequirements(sparkSession.sessionState.conf),
              CollapseCodegenStages(sparkSession.sessionState.conf),
              ReuseExchange(sparkSession.sessionState.conf),
              ReuseSubquery(sparkSession.sessionState.conf))
//4:执行自定义的回调函数函数,该函数底层最终执行sparkplan的do把物理计划转化为rdd操作
            /**
               * Collect all elements from a spark plan.
               */
              private def collectFromPlan(plan: SparkPlan): Array[T] = {
                // This projection writes output to a `InternalRow`, which means applying this projection is not
                // thread-safe. Here we create the projection inside this method to make `Dataset` thread-safe.
                val objProj = GenerateSafeProjection.generate(deserializer :: Nil)
                plan.executeCollect().map { row =>
                  // The row returned by SafeProjection is `SpecificInternalRow`, which ignore the data type
                  // parameter of its `get` method, so it's safe to use null here.
                  objProj(row).get(0, null).asInstanceOf[T]
                }
              }
//5:将sparkplan转为rdd,交给sparkContext提交job
              /**
                 * Runs this query returning the result as an array.
                 */
                def executeCollect(): Array[InternalRow] = {
                    //getByteArrayRdd调用execute(),再调用doExecute()方法,将sparkplan转为RDD
                  val byteArrayRdd = getByteArrayRdd()

                  val results = ArrayBuffer[InternalRow]()
                      //byteArrayRdd.collect()是rdd的action算子,会运行sc.runJob()提交job给spark集群
                  byteArrayRdd.collect().foreach { countAndBytes =>
                    decodeUnsafeRows(countAndBytes._2).foreach(results.+=)
                  }
                  results.toArray
                }
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 220,639评论 6 513
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 94,093评论 3 396
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 167,079评论 0 357
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 59,329评论 1 295
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 68,343评论 6 397
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 52,047评论 1 308
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,645评论 3 421
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 39,565评论 0 276
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 46,095评论 1 319
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 38,201评论 3 340
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 40,338评论 1 352
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 36,014评论 5 347
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,701评论 3 332
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 32,194评论 0 23
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 33,320评论 1 272
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 48,685评论 3 375
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 45,345评论 2 358

推荐阅读更多精彩内容