为了能用scala开发spark,简单的介绍一下scala特性.scala即面向对象编程,也同时面向方法编程,也可以说是面向容器编程,在编译阶段会把scala代码编译成java字节码,运行于jvm上.
在这里要介绍闭包,迭代器,隐式函数,因为这几个特性在开发spark,或者查看源码的时候,都会经常遇到的. 代码列子
闭包
定义:
闭包是一个函数,返回值依赖于声明在函数外部的一个或多个变量。
闭包通常来讲可以简单的认为是可以访问一个函数里面局部变量的另外一个函数.
我先介绍scala单机闭包.
def main(args: Array[String]) {
closure1
println("-----------------")
closure2
}
def closure2(): Unit ={
val fs = new Array[()=>Int](4)
def set(i:Int) { fs(i) = () => i }
var j = 0
while(j < 4) {set(j); j=j+1}
fs.foreach( f => println(f()))
}
def closure1(): Unit ={
val fs = new Array[()=>Int](4)
var i = 0
while(i < 4) { fs(i) = () => i; i=i+1}
fs.foreach( f => println(f()))
}
//输出
4
4
4
4
-----------------
0
1
2
3
可以看出闭包的绑定的是变量的引用,或者是地址. 所以说closure1输出都是4,当fs.foreach的时候i=4. closure2中传入set函数,是重新生成一个变量i.
scala闭包到spark分布式计算是如何展现的.我举个例子就明白了.
val spark = SparkSession
.builder()
// .master("local[*]")
.appName("hello world")
.config("spark.some.config.option", "some-value")
.getOrCreate()
val rdd = spark.sparkContext.parallelize(Seq(1,3,4,5,6,1000,10))
var counter = 0
rdd.foreach(x => counter += x)
println("result " + counter)
结果会是多少,1033? 实际输出0
rdd.foreach是一个分布式操作,counter+=x操作可能在多个jvm中进行,假设有两个executor,在driver,executor1,executor2中都会有counter,初始化值都为0,相互不干扰. 因为foreach操作都是在executor上进行的,driver中并没有操作,所以counter还是0
迭代器
scala其实也是面向集合编程的,万物皆集合.你会发现对象都会有map,foreach,flatMap等.Iterator并不是集合,但是操作集合的方法. 了解它,你就会更加了解scala的集合和spark RDD的底层实现.
举个例子:
val it = Iterator[String]("name1","name2","name3")
def rename(name:String): String ={
println(name)
name+"1"
}
//并未实际执行rename
val it2 = it.map(name=>rename(name))
it2.foreach(name=>{
println(name)
})
如果是存储至mysql或者外部的数据库,foreach可以控制每1000条处理写入一次.
你认为输出结果?
name1
name2
name3
name11
name21
name31
实际输出结果:
name1
name11
name2
name21
name3
name31
以下是scala Iterator map实现原理,其实就是重新定义next函数. spark也是基于此原来来实现懒处理的
/** Creates a new iterator that maps all produced values of this iterator
* to new values using a transformation function.
*
* @param f the transformation function
* @return a new iterator which transforms every value produced by this
* iterator by applying the function `f` to it.
* @note Reuse: $consumesAndProducesIterator
*/
def map[B](f: A => B): Iterator[B] = new AbstractIterator[B] {
def hasNext = self.hasNext
def next() = f(self.next())
}
隐式函数
我们经常引入第三方库,但当我们想要扩展新功能的时候通常是很不方便的,因为我们不能直接修改其代码。scala提供了隐式转换机制和隐式参数帮我们解决诸如这样的问题。
Scala中的隐式转换是一种非常强大的代码查找机制。当函数、构造器调用缺少参数或者某一实例调用了其他类型的方法导致编译不通过时,编译器会尝试搜索一些特定的区域,尝试使编译通过.
假设有这样一个类,如果你想在它上面添加eat方法,你会怎么做?
class People{
var state:String = "walking"
def doSomething(): Unit ={
println(s"i'm $state")
}
}
如果java做起来,可能比较麻烦,当然可以把在源码基础上改。但是一个是违反java的设计原则,一个使用起来比较麻烦.介绍一下scala可以怎么实现:
class PeopleFunction(people: People){
def eat(): Unit ={
people.state = "eating"
people.doSomething()
}
}
package object action {
implicit def peopleImpl(people: People): PeopleFunction ={
new PeopleFunction(people)
}
}
//main函数
val people = new People
people.eat()
这样就可以实现了,在编译阶段,会自动把eat函数的字节码,写入people中,这样实现就非常简单.