翻译:Hadoop权威指南之Spark-4

本文原始地址

Persistence

回到本章开头的例子,我们可以把“年度-气温”的中间数据集缓存在内存中:

scala> tuples.cache()
res1: tuples.type = MappedRDD[4] at map at <console>:18

调用cache()不会立刻把RDD缓存到内存中,只是对这个RDD做一个标记,当Spark job运行的时候,实际的缓存行为才会发生。因此我们首先强制运行一个job:

scala> tuples.reduceByKey((a, b) => Math.max(a, b)).foreach(println(_))
INFO BlockManagerInfo: Added rdd_4_0 in memory on 192.168.1.90:64640
INFO BlockManagerInfo: Added rdd_4_1 in memory on 192.168.1.90:64640
(1950,22)
(1949,111)

关于BlockManagerInfo的日志显示,作为job运行的一部分,RDD的分区会被保持在内存中。日志显示这个RDD的编号是4(在调用cache()方法之后的控制台输出中,也能看到这个信息),它包含两个分区,标签分别是0和1。如果在这个缓存的数据集上运行另一个job,我们会看到这个RDD将从内存中加载。这次我们计算最低气温:

scala> tuples.reduceByKey((a, b) => Math.min(a, b)).foreach(println(_))
INFO BlockManager: Found block rdd_4_0 locally
INFO BlockManager: Found block rdd_4_1 locally
(1949,78)
(1950,-11)

这是在微小数据集上的简单示例,但是对于更大的job,节省的时间将很可观。在MapReduce中,为了执行另一个计算,输入数据集必须再次从磁盘加载。即使中间数据可以作为输入(比如一个清洗后的数据集,无效行和不必要的字段都已移除),也不能改变“数据必须从磁盘加载”的事实,这是很慢的。Spark会把数据集缓存在一个跨集群的内存高速缓存中,这就意味着任何基于此数据集的计算都会执行的非常快。

在对数据进行交互式探索时,这种效率是极其有用的。这也自然适合某些类型的算法,比如迭代算法,一次迭代计算的结果可以缓存在内存中,成为下次迭代计算的输入。这种算法也可以用MapReduce实现,每次迭代都是一个单独的MapReduce job,因此每次迭代的结果必须写入磁盘,然后在下次迭代时再读回来。

缓存的RDD只能被同一个application中的job获取。要在不同的application之间共享数据集,第一个application必须使用某个saveAs*()方法(saveAsTextFile(),saveAsHadoopFile()等等)来写到外部存储中,然后第二个application使用SparkContext中的对应方法(textFile(),hadoopFile()等等)再次加载。同样的,当一个application终止时,它缓存的所有RDD都被销毁,除非显式的保存下来,否则不能再次访问。

Persistence levels

调用cache()会把RDD的每个分区持久化到执行器(executor)的内存中。如果执行器没有足够的内存来存储这个RDD分区,计算不会失败,相反该分区将会根据需要进行重算。对于带有很多trsansformation的复杂程序,这是很昂贵的。因此Spark提供了不同类型的持久化行为供用户选择,在调用persist()时指定StorageLevel参数即可。

默认的持久化级别是MEMORY_ONLY,这种方式使用对象的常规内存表示。要使用更紧凑的表现形式,可以把分区中的元素序列化为字节数组(byte array)。这种级别是MEMORY_ONLY_SER,相比MEMORY_ONLY,这种级别会导致CPU的压力,如果序列化之后的RDD分区能够适应内存,而常规的内存表示不适合,那么这种压力就是值得的。MEMORY_ONLY_SER还会减轻垃圾回收的压力,因为每个RDD都以字节数组的形式存储,而不是很多的对象。

在driver程序的日志文件中,检查BlockManager相关的信息,可以看到一个RDD分区是否不适合内存。另外,每个driver的SparkContext会在4040端口启动一个HTTP服务,提供关于运行环境以及正在运行的job的有用信息,包括缓存的RDD分区的信息。

默认情况下,使用常规的Java序列化框架来序列化RDD分区,不过Kryo序列化框架(下节讨论)通常是更好的选择,在大小和速度两方面都更优秀。如果把序列化后的分区进行压缩,可以节省更多的空间(再一次付出CPU的代价),设置spark.rdd.compress属性为true来启用压缩,属性spark.io.compression.codec是可选设置。

如果重算一个数据集非常昂贵,那么MEMORY_AND_DISK(如果数据集在内存中放不下,就写到磁盘上)或者MEMORY_AND_DISK_SER(如果序列化后的数据集在内存中放不下,就写到磁盘上)是合适的。

还有一些更高级的和实验中的持久化级别,用来在集群中的多个节点上复制分区,或者使用off-heap内存——更多细节,查看Spark文档。

Serialization

在Spark中需要考虑序列化的两个方面:序列化数据和序列化函数(或闭包)。

Data

首先来看数据的序列化。默认情况下,Spark使用Java序列化框架在执行器之间的网络上传输数据,或者以序列化的形式来缓存数据。对程序员来说,Java的序列化很好理解,只需确定你使用的类实现了java.io.Serializable接口或者java.io.Externalizable接口,但从性能和大小的角度来看,这种方式的效率不高。

对于大多数的Spark程序,更好的选择是Kryo序列化框架。Kryo是一个高效的通用的Java序列化库。要使用Kryo,在driver程序的SparkConf上设置spark.serializer属性如下:

conf.set("spark.serializer",  "org.apache.spark.serializer.KryoSerializer")

Kryo不要求你的类实现特定接口,因此简单的Java对象不需要任何改动即可在RDD中使用。话虽如此,如果在使用一个类之前把它注册到Kryo会更加高效。这是因为Kryo会创建一个引用,指向那个序列化对象的类(一个对象对应一个引用),如果类已注册,该引用是个整数ID,如果类没有注册,该引用是类的全名。这个引导仅仅适用于你自己的类,Scala类和许多其他的框架类(比如Avro Generic或者Thrift类)已经由Spark注册了。

向Kryo注册类也很简单。创建一个KryoRegistrator的子类,覆盖registerClasses()方法:

class CustomKryoRegistrator extends KryoRegistrator {
  override def registerClasses(kryo: Kryo) {
    kryo.register(classOf[WeatherRecord])
  }
}

最后,在driver程序中,把属性spark.kryo.registrator设置为你的KryoRegistrator实现类的完整类名:

conf.set("spark.kryo.registrator", "CustomKryoRegistrator")

Functions

通常,函数的序列化将"刚好工作":在Scala中,函数都是可序列化的,使用标准Java序列化机制。这也是Spark向远程执行器节点发送函数时使用的方式。即使在本地模式下运行,Spark也会序列化函数。如果你在无意中引入了不可序列化的函数(比如,从一个非序列化类的方法转换过来的函数),你会在开发过程的早期阶段发现它。

Shared Variables

Spark程序经常需要访问一些数据,这些数据不是一个RDD的一部分。例如,下面的程序在一个map()操作中使用了一个查找表(lookup table):

val lookup = Map(1 -> "a", 2 -> "e", 3 -> "i", 4 -> "o", 5 -> "u")
val result = sc.parallelize(Array(2, 1, 3)).map(lookup(_))
assert(result.collect().toSet === Set("a", "e", "i"))

这段程序会正确工作(变量lookup被序列化为闭包的一部分,传递给map()),但是还有一个更高效的方式来达到同样的目的:使用广播变量。

Broadcast Variables

广播变量在序列化之后发送给每一个执行器,在那里缓存起来,因此后续的任务可以在需要时访问。这与普通的变量不同。普通的变量会序列化为闭包的一部分,然后在网络上传输,一个任务一次传输。广播变量的角色,与MapReduce中的分布式缓存相似,不过Spark内部的实现是把数据存储在内存中,仅当内存被耗尽时才写到磁盘。

广播变量的创建方法是,把需要广播的变量传递给SparkContext的broadcast()方法。T类型的变量被包装进Broadcast[T],然后返回:

val lookup: Broadcast[Map[Int, String]] =
    sc.broadcast(Map(1 -> "a", 2 -> "e", 3 -> "i", 4 -> "o", 5 -> "u"))
val result = sc.parallelize(Array(2, 1, 3)).map(lookup.value(_))
assert(result.collect().toSet === Set("a", "e", "i"))

在RDD的map()操作中,调用这个广播变量的value来访问它。

顾名思义,广播变量是单向传送的,从driver到task——没有办法更新一个广播变量,然后回传给driver。为此,我们需要一个累加器。

Accumulators

累加器是一个共享变量,和MapReduce中的计数器一样,任务只能对其增加。在job完成以后,累加器的最终值可以在driver程序中获取。下面的例子中,使用累加器计算一个整数RDD中的元素数量,同时使用reduce()操作对RDD中的值求和:

val count: Accumulator[Int] = sc.accumulator(0)
val result = sc.parallelize(Array(1, 2, 3))
  .map(i => { count += 1; i })
  .reduce((x, y) => x + y)
assert(count.value === 3)
assert(result === 6)

第一行代码使用SparkContext的accumulator()方法,创建了一个累加器变量count。map()操作是一个恒等函数,副作用是增加count。当Spark job的结果计算出来之后,累加器的值通过调用value来访问。

在这个例子中,我们使用一个Int作为累加器,但任何的数值类型都是可以的。Spark还提供了两种方法,一是使用累加器的结果类型与“被增量”的类型不同(参见SparkContext的accumulable()方法),二是可以累加可变集合中的值(通过accumulableCollection())。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 215,294评论 6 497
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,780评论 3 391
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 161,001评论 0 351
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,593评论 1 289
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,687评论 6 388
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,679评论 1 294
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,667评论 3 415
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,426评论 0 270
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,872评论 1 307
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,180评论 2 331
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,346评论 1 345
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,019评论 5 340
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,658评论 3 323
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,268评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,495评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,275评论 2 368
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,207评论 2 352

推荐阅读更多精彩内容