Spark学习总结

Spark相关

Spark是用于大数据处理的集群计算框架,没有使用MapReduce作为执行引擎,而是使用了自研的分布式运行环境(DAG引擎)在集群上执行工作。Spark可以在YARN上运行,并支持Hadoop文件及HDFS。

Spark最突出的表现在于它能将作业与作业之间产生的大规模的工作数据集存储在内存中,在性能上要优于等效的MapReduce工作流,通常可以高出一个数量级。因为MapReduce的数据集始终需要从磁盘上加载。

Spark常用于迭代算法(对一个数据集重复应用某个函数,直到满足条件退出)和交互式分析(用户向数据集发出一系列专用的探索性查询)

Spark与MapReduce的区别

  • Spark与MapReduce一样,也有作业(job)的概念,Spark的作业比MapReduce的作业更为通用,Spark作业是由任意的多阶段(stages)有向无环图(DAG)构成,其中每个阶段相当于MapReduce中的map阶段或者reduce阶段。

  • 这些阶段在Spark运行环境中被分解成多个任务(task),任务并行运行在分布于集群中的RDD(弹性分布式数据集Resilient Distributed Dataset)分区上。像MapReduce中的任务一样。

  • Spark作业始终运行在应用上下文中(applicationContext,用实例SparkContext表示),它提供了RDD分组以及共享变量。一个应用(application)可以串行或者并行运行多个作业,并为这些作业提供访问由同一应用的先前作业所缓存的RDD的机制。

弹性分布式数据集RDD

RDD是Spark最核心的概念,它是在集群中跨多个机器分区存储的一个只读的对象集合。在典型的Spark程序中,首先要加载一个或多个RDD,作为输入再通过一系列转换得到一组目标RDD,然后对这些目标RDD执行一个动作,如计算出结果或者写入持久存储器。

“弹性分布式数据集”中的“弹性”指的是Spark可以通过重新安排计算来自动重建丢失的分区。

加载RDD或者执行转换不会立即触发任何数据处理操作,只是重建了一个计算的计划。只有当对RDD执行某个动作时,才会出发真正的计算。

创建

RDD的创建有三种方法:

1、并行化一个集合(内存中的对象集合):该方法适用于对少量的输入数据进行并行的CPU密集型计算

2、使用外部存储器(如:HDFS)中的数据集:创建一个对外部数据集的引用,如:为文本文件创建一个String对象的RDD

 val text: RDD[String] = sc.textFile(inputPath)

Spark内部使用MapReduce API的TextInputFormat来读取文件,其文件分割行为与Hadoop一致,因此在使用HDFS的情况下,每个HDFS块对应于一个Spark分区。

3、对现有的RDD进行转换。

转换和动作

Spark为RDD提供了两大类操作:转换(transformation)和动作(action)。转换时从现有的RDD生成新的RDD,而动作则触发对RDD的计算并对计算结果执行某种操作,返回给用户或保存在外部存储器(计算在内存中进行)。动作时立即性的,而转换则是惰性的,因为在对RDD执行一个动作之前都不会为该RDD的任何转换操作采取实际行动。

判断一个操作是转换还是动作:观察其返回类型,如果返回的类型是RDD,则是一个转换操作,否则是一个动作。

Spark API文档:https://spark.apache.org/docs/2.2.0/api/java/index.html

聚合转换:按键为键值对RDD进行聚合操作的三个主要转换函数是:reduceByKey()、foldByKey()、aggregateByKey()

持久化

调用cache()方法会在executor的内存中持久化保存RDD的每个分区(缓存),如果executor没有足够的内存来存储RDD分区,计算不会失败,但需要重新计算分区。因此Spark提供了不同级别的持久化行为。

  • 默认的持久化级别是MEMORY_ONLY,使用对象在内存中的常规表示方法

  • MEMORY_ONLY_SER级别:通过把分区中的元素序列化为字节数组来实现,多了一份序列化CPU开销,但生成的序列化RDD分区大小适合被保存在内存中。

  • MEMORY_AND_DISK级别:如果数据集的大小不适合保存在内存中,就将其溢出到磁盘。

  • MEMORY_AND_DISK_SER级别:如果序列化数据集的大小不适合保存在内存中,就将其溢出到磁盘。

序列化

在使用Spark时,要从两方面来考虑序列化:数据序列化和函数序列化(闭包函数)

数据序列化

默认情况下,Spark在通过网络将数据从一个executor发送到另一个executor时,或者以序列化的形式缓存(持久化)数据时,使用的是Java序列化机制:类实现java.io.Serializable或者java.io.Externalizable接口,该机制性能、效率不高。

使用Kryo序列化机制是更好的选择,Kryo是一个高效的Java序列化库,在驱动程序的SparkConf中设置spark.serializer属性即可使用Kryo

conf.set("spark.serializer","org.apache.spark.serializer.KryoSerializer")

在Kryo中注册类:创建一个KryoRegistrator子类,然后重写registerClasses()方法,可提升序列化性能。

函数序列化:通常函数序列化会使用默认的Java序列化机制。

共享变量

共享变量分为广播变量和累加器变量两种。

广播变量

广播变量(broadcast variable)在经过序列化后被发送到各个executor,然后缓存在那里,以便后期任务可以在需要时访问。相当于MapReduce中的分布式缓存,区别在于Spark将数据保存在内存中,只有在内存耗尽时才会溢出到磁盘上。创建一个广播变量:通过SparkContext的broadcast()方法传递。广播变量是单向传播的。

累加器

累加器(accumulator)是在任务中只能对它做加法的共享变量,类似于MapReduce中的累加器,当作业完成后,driver程序可以检索累加器的最终值。通过SparkContext的accumulator()方法来创建一个累加器变量。

Spark作业的运行机制

实体

  • driver:负责托管应用(SparkContext)并未作业调度任务。通常作为一个不由集群管理器(cluster manager)管理的客户端来运行。

  • executor:专属于应用,在应用运行期运行,并执行该应用的任务,一般运行在集群的计算机上。

作业的提交

Spark作业运行机制.png

Spark作业运行过程:

  • 当对RDD执行一个动作(比如count())时,会自动提交一个Soark作业,导致内部的SprakContext调用runJob()。步骤1。

  • 然后将调用传递给作为driver的一部分运行的调度程序。步骤2。调度程序由两部分组成:DAG调度程序和任务调度程序。其中DAG调度程序把作业分解为若干阶段(stage),并由这些阶段构成一个DAG。任务调度程序则负责把每个阶段中的任务提交到集群中。

  • 当DAG调度程序已构建一个完整的多阶段DAG,它就将每个阶段的任务集合提交给任务调度程序。步骤3

  • 当任务集合被发送到任务调度程序后,任务调度程序开始为executor分配任务,分配的任务通过调度程序后端启动。步骤4

  • 调度程序后端向executor发送远程启动任务的消息。步骤5.

  • executor接收到消息通知后开始运行任务。步骤6

DAG的构建

任务可分为两种类型:shuffle map任务和result任务。

  • shuffle map任务:与MapReduce中的shuffle的map任务,每个shuffle map任务在一个RDD分区上运算,然后把结果发送回driver,再由driver将每个分区的计算结果汇集成最终结果

DAG调度程序负责将一个阶段分解成若干任务以提交给任务调度程序。DAG调度程序会为每个任务赋予一个位置偏好(placement preference),以允许任务调度程序充分利用数据本地化(data locality),例如:对于存储在HDFS上的输入RDD分区而言,它的任务的位置偏好就是托管这些分区的数据块的datanode(称为node local),而对于在内存中缓存的RDD分区,其任务位置偏好则是那些保存RDD分区的executor(称为process local)

任务调度

当任务集合被发送到任务调度程序后,任务调度程序用为该应用运行的executor的列表,在斟酌位置偏好的同时构建任务到executor的映射。接着任务调度程序将任务分配给具有可用内核的executor,并且在executor完成任务时继续分配更多的任务,直到任务集合全部完成。

任务调度程序在为某个executor分配任务时,首先分配的是进程本地任务(process-local),再分配节点本地任务(node-local),然后分配机架本地任务(rack-local),最后分配任意任务(非本地)或者推测任务(speculative task)。推测任务时现有任务的复本,如果任务运行得比预期缓慢,则调度器可以将其作为备份来运行。

当任务完成或者失败时,executor会向driver发送状态更新消息,如果失败了,任务调度程序将在另一个executor上重新提交任务,若是启用了推测任务,则还会为运行缓慢的任务启动推测任务。

任务执行

executor按如下方式运行任务:

  • 首先,确保任务的JAR包和文件依赖关系都是最新的,executor在本地高速缓存中保留了先前任务已使用的所有依赖,因此只有在它们更新的情况下才会重新下载。

  • 然后,反序列化任务代码,因为任务代码是以启动任务消息的一部分而发生的序列化字节

  • 最后JVM执行任务代码,任务运行在与executor相同的JVM,因此任务的启动没有进程开销。

任务执行结果被序列化并发送到executor后端,然后以状态更新消息的形式返回driver。

执行器和集群管理器

Spark依靠executor(执行器)来运行构成Sprak作业的任务,负责管理executor生命周期的是集群管理器(cluster manager),Spark有多种不同特性的集群管理器,可分为如下几种:

  • 本地模式:在本地模式下,一个executor与driver运行在用一个JVM中,该模式对于测试或运行小规模作业时非常有用。其主URL为local、local[n]n 个线程或者local(*)机器的每个内核一个线程。

  • 独立模式:独立模式的集群管理器是一个简单的分布式实现,它运行了一个master以及多个worker。当Spark应用启动时,master要求worker代表应用生成多个executor进程。其主URL为:spark://host:port

  • Mesos模式:Apache Mesos是一个通用的集群资源管理器,允许根据组织策略在不同的应用之间细化资源共享。这种模式的主URL为:mesos://host:port

  • YARN模式:每个运行的Spark应用对应于一个YARN应用实例,每个executor在自己的YARN容器中运行。其主URL为:yarn-client或yarn-cluster

运行在YARN上的Spark

为了在YARN上运行,Spark提供了两种部署方式:YARN客户端模式和YARN集群模式。YRAN客户端模式的driver在客户端运行,而YARN集群模式的driver在YARN的application master集群上运行。

1.YARN客户端模式:

YARN客户端模式Spark.png
  • 在YARN客户端模式下,当driver构建新的SparkContext实例时,便启动了与YARN之间的交互。步骤1

  • SparkContext向YARN资源管理器提交一个YARN应用。步骤2

  • YARN资源管理器则启动集群节点管理器上的YARN容器,并为在其中运行一个SparkExecutorLauncher的application master。步骤3

  • ExecutorLauncher向资源管理器请求资源。步骤4

  • 启动ExecutorBackend进程作为分配给它的容器。步骤5

每个executor在启动时会连接回SparkContext,注册自身。即向SparkContext提供了关于可用于运行任务的executor的数量及其位置信息。这些信息被用在任务的位置偏好策略中。启动的executor的数量在saprk-shell、spark-submit或py-spark中设置。executor使用的内核数默认为1个,内存默认2014MB

2.YARN集群模式:

YARN集群模式Spark.png

启动流程与客户端模式基本一致,仅是在步骤1,sprak-submit不会允许任何用户代码,代码在集群上运行。


参考资料:《Hadoop权威指南》

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 222,104评论 6 515
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 94,816评论 3 399
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 168,697评论 0 360
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 59,836评论 1 298
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 68,851评论 6 397
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 52,441评论 1 310
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,992评论 3 421
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 39,899评论 0 276
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 46,457评论 1 318
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 38,529评论 3 341
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 40,664评论 1 352
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 36,346评论 5 350
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 42,025评论 3 334
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 32,511评论 0 24
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 33,611评论 1 272
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 49,081评论 3 377
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 45,675评论 2 359

推荐阅读更多精彩内容