【Spark 精选】源码阅读 — Scala 高级语法

1.case 模式匹配

case 模式匹配的使用样例

# 1. 匹配特定的数据类型
def processValue(value: Any): String = value match {
  case s: String => s"String: $s"
  case i: Int => s"Int: $i"
  case d: Double => s"Double: $d"
  case _ => "Other"
}

val result1 = processValue("Hello") // 输出 "String: Hello"
val result2 = processValue(123) // 输出 "Int: 123"
val result3 = processValue(3.14) // 输出 "Double: 3.14"
val result4 = processValue(true) // 输出 "Other"


# 2.根据不同的输入执行不同的逻辑
def processInput(input: Any): String = input match {
  case 1 => "One"
  case "two" => "Two"
  case _: Double => "A Double"
  case _ => "Other"
}

val result1 = processInput(1) // 输出 "One"
val result2 = processInput("two") // 输出 "Two"
val result3 = processInput(3.14) // 输出 "A Double"
val result4 = processInput(true) // 输出 "Other"


# 3.解构数据结构
val tuple = (1, "two", 3.14)
val (a, b, c) = tuple // 解构元组
println(a) // 输出 1
println(b) // 输出 "two"
println(c) // 输出 3.14

val list = List(1, 2, 3, 4, 5)
val result = list.map {
  case x if x % 2 == 0 => "Even"
  case _ => "Odd"
}
println(result) // 输出 List("Odd", "Even", "Odd", "Even", "Odd")

spark-sql 源码中的 case 模式匹配AnalyzerResolveRelations

  object ResolveRelations extends Rule[LogicalPlan] {
    def resolveViews(plan: LogicalPlan): LogicalPlan = plan match {
      case view @ View(desc, isTempView, _, child) if !child.resolved =>
          // 省略 ...
      case p @ SubqueryAlias(_, view: View) =>
        p.copy(child = resolveViews(view))
      case _ => plan
    }
}

2.case 类

case 类的使用场景

  • 数据传递case class可以用于封装一组相关的数据,并且很容易进行复制和传递。
  • 模式匹配case class可以与模式匹配结合使用,方便地根据不同的数据类型进行处理。
// 定义一个 case class
case class Person(name: String, age: Int)

// 创建一个 Person 对象
val person1 = Person("Alice", 25)

// 复制一个 Person 对象,并修改部分属性
val person2 = person1.copy(age = 30)

// 打印 person1 和 person2
println(person1) // 输出 Person(Alice,25)
println(person2) // 输出 Person(Alice,30)

// 模式匹配
def processPerson(person: Person): String = person match {
  case Person(name, age) if age < 30 => s"$name is young"
  case Person(name, age) if age >= 30 => s"$name is old"
}

val result1 = processPerson(person1) // 输出 "Alice is young"
val result2 = processPerson(person2) // 输出 "Alice is old"

spark-sql 源码中的 case 类LogicalPlan 的子类 Filter

// case 类 Filter
case class Filter(condition: Expression, child: LogicalPlan)
  extends OrderPreservingUnaryNode with PredicateHelper {
  override def output: Seq[Attribute] = child.output

  override def maxRows: Option[Long] = child.maxRows

  override protected lazy val validConstraints: ExpressionSet = {
    val predicates = splitConjunctivePredicates(condition)
      .filterNot(SubqueryExpression.hasCorrelatedSubquery)
    child.constraints.union(ExpressionSet(predicates))
  }
}

// case 模式匹配
// EliminateOuterJoin的apply
  def apply(plan: LogicalPlan): LogicalPlan = plan transform {
    case f @ Filter(condition, j @ Join(_, _, RightOuter | LeftOuter | FullOuter, _, _)) =>
      val newJoinType = buildNewJoinType(f, j)
      if (j.joinType == newJoinType) f else Filter(condition, j.copy(joinType = newJoinType))
  }

3.嵌套函数

spark-sql 源码中的嵌套函数QueryPlan 的嵌套函数 mapExpressions

  def mapExpressions(f: Expression => Expression): this.type = {
    var changed = false

     // 嵌套函数A
    @inline def transformExpression(e: Expression): Expression = {
      val newE = CurrentOrigin.withOrigin(e.origin) {
        f(e)
      }
      if (newE.fastEquals(e)) {
        e
      } else {
        changed = true
        newE
      }
    }
    
    // 嵌套函数B
    def recursiveTransform(arg: Any): AnyRef = arg match {
      case e: Expression => transformExpression(e)    // 执行嵌套函数B
      case Some(value) => Some(recursiveTransform(value))
      case m: Map[_, _] => m
      case d: DataType => d // Avoid unpacking Structs
      case stream: Stream[_] => stream.map(recursiveTransform).force
      case seq: Iterable[_] => seq.map(recursiveTransform)
      case other: AnyRef => other
      case null => null
    }
    // 执行嵌套函数A
    val newArgs = mapProductIterator(recursiveTransform)

    if (changed) makeCopy(newArgs).asInstanceOf[this.type] else this
  }

4.偏函数

偏函数的使用样例

  • 过滤和转换数据:偏函数可以用于过滤和转换数据。通过定义适当的条件,可以使用偏函数来过滤掉不需要的数据或者将数据进行转换。
  • 对特定输入进行处理:偏函数可以对特定类型或特定条件的输入进行处理,而对其他输入不进行处理。
// 定义一个偏函数,只处理正整数和字符串类型的输入
val partialFunc: PartialFunction[Any, String] = {
  case i: Int if i > 0 => s"Positive integer: $i"
  case s: String => s"String: $s"
}

// applyOrElse接口接受两个参数:输入值和默认值。如果偏函数对输入值进行定义,则返回偏函数的结果;如果偏函数没有对输入值进行定义,则返回默认值。
println(partialFunc.applyOrElse(10, (x: Any) => "Not defined")) // 输出:Positive integer: 10
println(partialFunc.applyOrElse(-5, (x: Any) => "Not defined")) // 输出:Not defined
println(partialFunc.applyOrElse("hello", (x: Any) => "Not defined")) // 输出:String: hello
println(partialFunc.applyOrElse(3.14, (x: Any) => "Not defined")) // 输出:Not define

spark-sql 源码中的偏函数
AnalysisHelper 的函数 resolveOperatorsUp

  // 跳过已经分析过的rule,并递归获取子节点
  def resolveOperatorsUp(rule: PartialFunction[LogicalPlan, LogicalPlan]): LogicalPlan = {
    if (!analyzed) {
      AnalysisHelper.allowInvokingTransformsInAnalyzer {
        val afterRuleOnChildren = mapChildren(_.resolveOperatorsUp(rule))
        if (self fastEquals afterRuleOnChildren) {
          CurrentOrigin.withOrigin(origin) {
            // rule是偏函数,applyOrElse会执行这个函数
           // 如果偏函数对输入值进行定义,则返回偏函数的结果;如果偏函数没有对输入值进行定义,则返回默认值。
            rule.applyOrElse(self, identity[LogicalPlan])
          }
        } else {
          CurrentOrigin.withOrigin(origin) {
            val afterRule = rule.applyOrElse(afterRuleOnChildren, identity[LogicalPlan])
            afterRule.copyTagsFrom(self)
            afterRule
          }
        }
      }
    } else {
      self
    }
  }

Optimizer 的函数 ConvertToLocalRelation

// Optimizer 
object ConvertToLocalRelation extends Rule[LogicalPlan] {
  def apply(plan: LogicalPlan): LogicalPlan = plan transform {   // 下面函数的内容整体是偏函数,作为transform的入参
    case Project(projectList, LocalRelation(output, data, isStreaming))
        if !projectList.exists(hasUnevaluableExpr) =>
      val projection = new InterpretedMutableProjection(projectList, output)
      projection.initialize(0)
      LocalRelation(projectList.map(_.toAttribute), data.map(projection(_).copy()), isStreaming)

    case Limit(IntegerLiteral(limit), LocalRelation(output, data, isStreaming)) =>
      LocalRelation(output, data.take(limit), isStreaming)

    case Filter(condition, LocalRelation(output, data, isStreaming))
        if !hasUnevaluableExpr(condition) =>
      val predicate = Predicate.create(condition, output)
      predicate.initialize(0)
      LocalRelation(output, data.filter(row => predicate.eval(row)), isStreaming)
  }
 // 省略...
}

  // TreeNode
  def transform(rule: PartialFunction[BaseType, BaseType]): BaseType = {
    transformDown(rule)
  }

  def transformDown(rule: PartialFunction[BaseType, BaseType]): BaseType = {
    val afterRule = CurrentOrigin.withOrigin(origin) {
      // rule是偏函数,applyOrElse会执行这个函数
      // 如果偏函数对输入值进行定义,则返回偏函数的结果;如果偏函数没有对输入值进行定义,则返回默认值。
      rule.applyOrElse(this, identity[BaseType])
    }

    if (this fastEquals afterRule) {
      // 获取子节点,递归执行transformDown
      mapChildren(_.transformDown(rule))
    } else {
      afterRule.copyTagsFrom(this)
      afterRule.mapChildren(_.transformDown(rule))
    }
  }

5.柯里化函数

柯里化函数的使用样例

  • 部分应用:柯里化函数可以通过部分应用的方式,先给函数提供部分参数,然后返回一个接受剩余参数的新函数。这样可以在不同的上下文中复用同一个函数。
  • 函数组合:柯里化函数可以方便地进行函数组合,将多个函数串联起来。通过将一个函数的输出作为另一个函数的输入,可以构建更复杂的函数逻辑。

如下案例中,add是一个柯里化函数,它接受两个参数 xy。通过部分应用的方式,我们先给 add 函数提供一个参数 1,然后返回一个新的函数 addOne,这个新函数只接受一个参数 y。最后,我们调用 addOne 函数,传递剩余参数 2,得到结果 3

def add(x:Int)(y:Int): Int = x + y

val addOne = add(1) _ // 部分应用,返回一个接受一个参数的新函数

val result = addOne(2) // 调用新函数,传递剩余参数

println(result) // 输出 3

spark-sql 源码中的柯里化函数ParseDriver 的方法 parse

  // 参数1是command,参数2是toResult
  protected def parse[T](command: String)(toResult: SqlBaseParser => T): T = {
    logDebug(s"Parsing command: $command")
   
    // 使用参数command
    val lexer = new SqlBaseLexer(new UpperCaseCharStream(CharStreams.fromString(command)))
    lexer.removeErrorListeners()
    lexer.addErrorListener(ParseErrorListener)

    val tokenStream = new CommonTokenStream(lexer)
    val parser = new SqlBaseParser(tokenStream)
    parser.addParseListener(PostProcessor)
    // 使用参数command 
    parser.addParseListener(UnclosedCommentProcessor(command, tokenStream))
    // 省略 ...
    try {
      try {
        // first, try parsing with potentially faster SLL mode
        parser.getInterpreter.setPredictionMode(PredictionMode.SLL)
       // 使用参数toResult
       // parser里面包含参数command,parser再作为toResult函数的入参
        toResult(parser)
      }
      catch {
        case e: ParseCancellationException =>
          // 省略...
          // 使用参数toResult
          // parser里面包含参数command,parser再作为toResult函数的入参
          toResult(parser)
      }
    }
    // 省略 ...
  }

6.基于 Product 实现的 TreeNode

Product 的使用样例

  • 元组操作Product 接口为元组类提供了一些常用的方法,如 productElement 用于获取元素值,productArity 用于获取元素数量。
  • 模式匹配Product 接口可以与模式匹配结合使用,方便地对元组进行解构和处理。
// 导入Product接口
import scala.Product

// 定义一个元组类,继承自Product接口
class MyTuple(val first: Int, val second: String) extends Product {
  // 实现Product接口的抽象方法
  def productElement(n: Int): Any = n match {
    case 0 => first
    case 1 => second
    case _ => throw new IndexOutOfBoundsException(s"Tuple index out of range: $n")
  }

  // 实现Product接口的抽象方法
  def productArity: Int = 2

  // 重写toString方法
  override def toString: String = s"MyTuple($first, $second)"
}

// 创建一个MyTuple对象
val tuple = new MyTuple(42, "Hello")

// 获取元素值
val firstElement = tuple.productElement(0)
val secondElement = tuple.productElement(1)

// 获取元素数量
val arity = tuple.productArity

// 打印结果
println(firstElement)  // 输出 42
println(secondElement) // 输出 "Hello"
println(arity)         // 输出 2
println(tuple)         // 输出 "MyTuple(42, Hello)"

spark-sql 源码中的 Product:基于 Product 实现的 TreeNode

abstract class TreeNode[BaseType <: TreeNode[BaseType]] extends Product {
  // 省略 ...
  protected def mapProductIterator[B: ClassTag](f: Any => B): Array[B] = {    // 入参是参数类型为B的scala function,返回值是Array[B]
    val arr = Array.ofDim[B](productArity)
    var i = 0
    while (i < arr.length) {
      arr(i) = f(productElement(i))    // productElement会执行传入的函数mp,然后f会执行apply
      i += 1
    }
    arr
  }  
  // 省略 ...
}

  private def mapChildren(
      f: BaseType => BaseType,
      forceCopy: Boolean): BaseType = {
    // 省略...

    val newArgs = mapProductIterator {    // mapProductIterator 的入参是下面的函数mp
      case arg: TreeNode[_] if containsChild(arg) =>
        // 省略...
      case Some(arg: TreeNode[_]) if containsChild(arg) =>
        // 省略...
      case m: Map[_, _] => m.mapValues {
        // 省略...
      }.view.force.toMap // `mapValues` is lazy and we need to force it to materialize
      case d: DataType => d // Avoid unpacking Structs
      case args: Stream[_] => args.map(mapChild).force // Force materialization on stream
      case args: Iterable[_] => args.map(mapChild)
      case nonChild: AnyRef => nonChild
      case null => null
    }
    // 省略...
  }
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 203,456评论 5 477
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,370评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 150,337评论 0 337
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,583评论 1 273
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,596评论 5 365
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,572评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,936评论 3 395
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,595评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,850评论 1 297
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,601评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,685评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,371评论 4 318
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,951评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,934评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,167评论 1 259
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 43,636评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,411评论 2 342

推荐阅读更多精彩内容