1.case 模式匹配
case 模式匹配的使用样例:
# 1. 匹配特定的数据类型
def processValue(value: Any): String = value match {
case s: String => s"String: $s"
case i: Int => s"Int: $i"
case d: Double => s"Double: $d"
case _ => "Other"
}
val result1 = processValue("Hello") // 输出 "String: Hello"
val result2 = processValue(123) // 输出 "Int: 123"
val result3 = processValue(3.14) // 输出 "Double: 3.14"
val result4 = processValue(true) // 输出 "Other"
# 2.根据不同的输入执行不同的逻辑
def processInput(input: Any): String = input match {
case 1 => "One"
case "two" => "Two"
case _: Double => "A Double"
case _ => "Other"
}
val result1 = processInput(1) // 输出 "One"
val result2 = processInput("two") // 输出 "Two"
val result3 = processInput(3.14) // 输出 "A Double"
val result4 = processInput(true) // 输出 "Other"
# 3.解构数据结构
val tuple = (1, "two", 3.14)
val (a, b, c) = tuple // 解构元组
println(a) // 输出 1
println(b) // 输出 "two"
println(c) // 输出 3.14
val list = List(1, 2, 3, 4, 5)
val result = list.map {
case x if x % 2 == 0 => "Even"
case _ => "Odd"
}
println(result) // 输出 List("Odd", "Even", "Odd", "Even", "Odd")
spark-sql 源码中的 case 模式匹配:Analyzer
的 ResolveRelations
object ResolveRelations extends Rule[LogicalPlan] {
def resolveViews(plan: LogicalPlan): LogicalPlan = plan match {
case view @ View(desc, isTempView, _, child) if !child.resolved =>
// 省略 ...
case p @ SubqueryAlias(_, view: View) =>
p.copy(child = resolveViews(view))
case _ => plan
}
}
2.case 类
case 类的使用场景:
-
数据传递:
case class
可以用于封装一组相关的数据,并且很容易进行复制和传递。 -
模式匹配:
case class
可以与模式匹配结合使用,方便地根据不同的数据类型进行处理。
// 定义一个 case class
case class Person(name: String, age: Int)
// 创建一个 Person 对象
val person1 = Person("Alice", 25)
// 复制一个 Person 对象,并修改部分属性
val person2 = person1.copy(age = 30)
// 打印 person1 和 person2
println(person1) // 输出 Person(Alice,25)
println(person2) // 输出 Person(Alice,30)
// 模式匹配
def processPerson(person: Person): String = person match {
case Person(name, age) if age < 30 => s"$name is young"
case Person(name, age) if age >= 30 => s"$name is old"
}
val result1 = processPerson(person1) // 输出 "Alice is young"
val result2 = processPerson(person2) // 输出 "Alice is old"
spark-sql 源码中的 case 类:LogicalPlan
的子类 Filter
// case 类 Filter
case class Filter(condition: Expression, child: LogicalPlan)
extends OrderPreservingUnaryNode with PredicateHelper {
override def output: Seq[Attribute] = child.output
override def maxRows: Option[Long] = child.maxRows
override protected lazy val validConstraints: ExpressionSet = {
val predicates = splitConjunctivePredicates(condition)
.filterNot(SubqueryExpression.hasCorrelatedSubquery)
child.constraints.union(ExpressionSet(predicates))
}
}
// case 模式匹配
// EliminateOuterJoin的apply
def apply(plan: LogicalPlan): LogicalPlan = plan transform {
case f @ Filter(condition, j @ Join(_, _, RightOuter | LeftOuter | FullOuter, _, _)) =>
val newJoinType = buildNewJoinType(f, j)
if (j.joinType == newJoinType) f else Filter(condition, j.copy(joinType = newJoinType))
}
3.嵌套函数
spark-sql 源码中的嵌套函数:QueryPlan
的嵌套函数 mapExpressions
def mapExpressions(f: Expression => Expression): this.type = {
var changed = false
// 嵌套函数A
@inline def transformExpression(e: Expression): Expression = {
val newE = CurrentOrigin.withOrigin(e.origin) {
f(e)
}
if (newE.fastEquals(e)) {
e
} else {
changed = true
newE
}
}
// 嵌套函数B
def recursiveTransform(arg: Any): AnyRef = arg match {
case e: Expression => transformExpression(e) // 执行嵌套函数B
case Some(value) => Some(recursiveTransform(value))
case m: Map[_, _] => m
case d: DataType => d // Avoid unpacking Structs
case stream: Stream[_] => stream.map(recursiveTransform).force
case seq: Iterable[_] => seq.map(recursiveTransform)
case other: AnyRef => other
case null => null
}
// 执行嵌套函数A
val newArgs = mapProductIterator(recursiveTransform)
if (changed) makeCopy(newArgs).asInstanceOf[this.type] else this
}
4.偏函数
偏函数的使用样例:
- 过滤和转换数据:偏函数可以用于过滤和转换数据。通过定义适当的条件,可以使用偏函数来过滤掉不需要的数据或者将数据进行转换。
- 对特定输入进行处理:偏函数可以对特定类型或特定条件的输入进行处理,而对其他输入不进行处理。
// 定义一个偏函数,只处理正整数和字符串类型的输入
val partialFunc: PartialFunction[Any, String] = {
case i: Int if i > 0 => s"Positive integer: $i"
case s: String => s"String: $s"
}
// applyOrElse接口接受两个参数:输入值和默认值。如果偏函数对输入值进行定义,则返回偏函数的结果;如果偏函数没有对输入值进行定义,则返回默认值。
println(partialFunc.applyOrElse(10, (x: Any) => "Not defined")) // 输出:Positive integer: 10
println(partialFunc.applyOrElse(-5, (x: Any) => "Not defined")) // 输出:Not defined
println(partialFunc.applyOrElse("hello", (x: Any) => "Not defined")) // 输出:String: hello
println(partialFunc.applyOrElse(3.14, (x: Any) => "Not defined")) // 输出:Not define
spark-sql 源码中的偏函数:
① AnalysisHelper
的函数 resolveOperatorsUp
// 跳过已经分析过的rule,并递归获取子节点
def resolveOperatorsUp(rule: PartialFunction[LogicalPlan, LogicalPlan]): LogicalPlan = {
if (!analyzed) {
AnalysisHelper.allowInvokingTransformsInAnalyzer {
val afterRuleOnChildren = mapChildren(_.resolveOperatorsUp(rule))
if (self fastEquals afterRuleOnChildren) {
CurrentOrigin.withOrigin(origin) {
// rule是偏函数,applyOrElse会执行这个函数
// 如果偏函数对输入值进行定义,则返回偏函数的结果;如果偏函数没有对输入值进行定义,则返回默认值。
rule.applyOrElse(self, identity[LogicalPlan])
}
} else {
CurrentOrigin.withOrigin(origin) {
val afterRule = rule.applyOrElse(afterRuleOnChildren, identity[LogicalPlan])
afterRule.copyTagsFrom(self)
afterRule
}
}
}
} else {
self
}
}
② Optimizer
的函数 ConvertToLocalRelation
// Optimizer
object ConvertToLocalRelation extends Rule[LogicalPlan] {
def apply(plan: LogicalPlan): LogicalPlan = plan transform { // 下面函数的内容整体是偏函数,作为transform的入参
case Project(projectList, LocalRelation(output, data, isStreaming))
if !projectList.exists(hasUnevaluableExpr) =>
val projection = new InterpretedMutableProjection(projectList, output)
projection.initialize(0)
LocalRelation(projectList.map(_.toAttribute), data.map(projection(_).copy()), isStreaming)
case Limit(IntegerLiteral(limit), LocalRelation(output, data, isStreaming)) =>
LocalRelation(output, data.take(limit), isStreaming)
case Filter(condition, LocalRelation(output, data, isStreaming))
if !hasUnevaluableExpr(condition) =>
val predicate = Predicate.create(condition, output)
predicate.initialize(0)
LocalRelation(output, data.filter(row => predicate.eval(row)), isStreaming)
}
// 省略...
}
// TreeNode
def transform(rule: PartialFunction[BaseType, BaseType]): BaseType = {
transformDown(rule)
}
def transformDown(rule: PartialFunction[BaseType, BaseType]): BaseType = {
val afterRule = CurrentOrigin.withOrigin(origin) {
// rule是偏函数,applyOrElse会执行这个函数
// 如果偏函数对输入值进行定义,则返回偏函数的结果;如果偏函数没有对输入值进行定义,则返回默认值。
rule.applyOrElse(this, identity[BaseType])
}
if (this fastEquals afterRule) {
// 获取子节点,递归执行transformDown
mapChildren(_.transformDown(rule))
} else {
afterRule.copyTagsFrom(this)
afterRule.mapChildren(_.transformDown(rule))
}
}
5.柯里化函数
柯里化函数的使用样例:
- 部分应用:柯里化函数可以通过部分应用的方式,先给函数提供部分参数,然后返回一个接受剩余参数的新函数。这样可以在不同的上下文中复用同一个函数。
- 函数组合:柯里化函数可以方便地进行函数组合,将多个函数串联起来。通过将一个函数的输出作为另一个函数的输入,可以构建更复杂的函数逻辑。
如下案例中,add
是一个柯里化函数,它接受两个参数 x
和 y
。通过部分应用的方式,我们先给 add
函数提供一个参数 1
,然后返回一个新的函数 addOne
,这个新函数只接受一个参数 y
。最后,我们调用 addOne
函数,传递剩余参数 2
,得到结果 3
def add(x:Int)(y:Int): Int = x + y
val addOne = add(1) _ // 部分应用,返回一个接受一个参数的新函数
val result = addOne(2) // 调用新函数,传递剩余参数
println(result) // 输出 3
spark-sql 源码中的柯里化函数:ParseDriver
的方法 parse
// 参数1是command,参数2是toResult
protected def parse[T](command: String)(toResult: SqlBaseParser => T): T = {
logDebug(s"Parsing command: $command")
// 使用参数command
val lexer = new SqlBaseLexer(new UpperCaseCharStream(CharStreams.fromString(command)))
lexer.removeErrorListeners()
lexer.addErrorListener(ParseErrorListener)
val tokenStream = new CommonTokenStream(lexer)
val parser = new SqlBaseParser(tokenStream)
parser.addParseListener(PostProcessor)
// 使用参数command
parser.addParseListener(UnclosedCommentProcessor(command, tokenStream))
// 省略 ...
try {
try {
// first, try parsing with potentially faster SLL mode
parser.getInterpreter.setPredictionMode(PredictionMode.SLL)
// 使用参数toResult
// parser里面包含参数command,parser再作为toResult函数的入参
toResult(parser)
}
catch {
case e: ParseCancellationException =>
// 省略...
// 使用参数toResult
// parser里面包含参数command,parser再作为toResult函数的入参
toResult(parser)
}
}
// 省略 ...
}
6.基于 Product 实现的 TreeNode
Product 的使用样例:
-
元组操作:
Product
接口为元组类提供了一些常用的方法,如productElement
用于获取元素值,productArity
用于获取元素数量。 -
模式匹配:
Product
接口可以与模式匹配结合使用,方便地对元组进行解构和处理。
// 导入Product接口
import scala.Product
// 定义一个元组类,继承自Product接口
class MyTuple(val first: Int, val second: String) extends Product {
// 实现Product接口的抽象方法
def productElement(n: Int): Any = n match {
case 0 => first
case 1 => second
case _ => throw new IndexOutOfBoundsException(s"Tuple index out of range: $n")
}
// 实现Product接口的抽象方法
def productArity: Int = 2
// 重写toString方法
override def toString: String = s"MyTuple($first, $second)"
}
// 创建一个MyTuple对象
val tuple = new MyTuple(42, "Hello")
// 获取元素值
val firstElement = tuple.productElement(0)
val secondElement = tuple.productElement(1)
// 获取元素数量
val arity = tuple.productArity
// 打印结果
println(firstElement) // 输出 42
println(secondElement) // 输出 "Hello"
println(arity) // 输出 2
println(tuple) // 输出 "MyTuple(42, Hello)"
spark-sql 源码中的 Product
:基于 Product
实现的 TreeNode
abstract class TreeNode[BaseType <: TreeNode[BaseType]] extends Product {
// 省略 ...
protected def mapProductIterator[B: ClassTag](f: Any => B): Array[B] = { // 入参是参数类型为B的scala function,返回值是Array[B]
val arr = Array.ofDim[B](productArity)
var i = 0
while (i < arr.length) {
arr(i) = f(productElement(i)) // productElement会执行传入的函数mp,然后f会执行apply
i += 1
}
arr
}
// 省略 ...
}
private def mapChildren(
f: BaseType => BaseType,
forceCopy: Boolean): BaseType = {
// 省略...
val newArgs = mapProductIterator { // mapProductIterator 的入参是下面的函数mp
case arg: TreeNode[_] if containsChild(arg) =>
// 省略...
case Some(arg: TreeNode[_]) if containsChild(arg) =>
// 省略...
case m: Map[_, _] => m.mapValues {
// 省略...
}.view.force.toMap // `mapValues` is lazy and we need to force it to materialize
case d: DataType => d // Avoid unpacking Structs
case args: Stream[_] => args.map(mapChild).force // Force materialization on stream
case args: Iterable[_] => args.map(mapChild)
case nonChild: AnyRef => nonChild
case null => null
}
// 省略...
}