Spark学习记录

Spark基础知识

  1. Spark是基于内存的计算框架,但是也存在磁盘IO。
  2. 使用的排序算法:归并排序(大数据里基本都使用该排序算法)
  3. 四种运行模式:
    • Local (一般用于测试)
    • Standalone(Spark自带模式)
    • Mesos
    • YARN
  4. Spark容错的几种方法:
    • 缓存
    • checkpoint
    • 重新计算

基本术语

  • Application: Spark应用程序,即我们编写的包含main函数的整个程序
  • Driver Program: 运行main函数并创建SparkContext
  • Cluster Manager: 在集群上获取资源的外部服务(Standalone模式下是Master,Yarn模式下是ResourceManager)
  • Worker Node: 集群中任何可以运行Application代码的节点,每个节点可起一个或多个Executor
  • Executor: 在某个Worker Node上为某个Application启动的一个进程,该进程负责运行Task,并负责将数据存储到内存或磁盘上,每个Executor由若干core(核)组成,即一个进程由若干线程组成,Executor核数和内存大小可调节,每个Application都有各自独立的Executors
  • Task: 被送到某个Executor上的工作单元(最小单位)
  • Job: 一个Job包含多个Stage
  • Stage: 一个Stage包含多个Task

Block和partition

  • HDFS中,Block是分布式存储的最小单元,位于存储空间,大小固定(一般设置为128M),Block存在冗余(即副本),一个Block内的内容只能来自同一份文件,不允许跨文件
  • partition是RDD的最小单元,位于内存空间,大小不固定,Stage根据pipeline划分Tasks时,一个Task就对应一个partition

Spark运行简单原理

工作原理

工作原理:

  1. 集群启动,Worker Node注册到Cluster Manger并持续发送心跳
  2. 提交Application,Driver Pragram创建SparkContext上下文
  3. SparkContext向Cluster Manager申请资源
  4. Cluster Manger找到资源后告诉SparkContext去哪些Worker Node申请资源
  5. SparkContext向对应的Worker Node申请资源
  6. Worker Node创建Executor后告诉SparkContext资源已经分配好了
  7. SparkContext把Task分配到各Executor上执行
  8. Executor将计算结果返回给SparkContext
  9. 整个Application完成后,SparkContext通知Cluster Manager释放资源
  10. Culster Manger让对应的Worker Node释放资源,Executor被释放

注意:

  • 运行模式有多种,区别在于使用了不同的资源管理框架来分配集群资源。
  • Spark应用程序(Application)的原理是相同的,注意不要和资源管理框架混淆。

Standalone Client模式运行原理

编写代码,打jar包,以Client模式提交到Standalone集群中运行Spark程序。

  1. 在Shell中用spark-submit脚本提交Spark Application
  2. 在该机器上启动一个Driver进程
  3. 在Driver进程中new一个SparkContext对象
  4. SparkContext初始化的时候,首先会构造一个DAGScheduler
  5. SparkContext初始化的时候,其次会构造一个TaskScheduler
  6. TaskSchedulerBackend(TaskScheduler的后台进程)连接Master并向Master注册当前Application
  7. Master收到该Application的注册请求后,使用资源调度算法,在Spark集群的Worker Nodes上为该Application启动多个Executor
  8. Executor启动后反向注册到TaskScheduler上, 当所有Executor都反向注册完后,SparkContext初始化完毕
  9. SparkContext初始化完毕后,继续执行我们编写的程序,DAGScheduler开始切分Job,遇到一个Action算子(源码中使用sc.runJob())后,构成一个Job
  10. DAGScheduler根据宽依赖切分Job,构成多个Stage,然后DAGScheduler根据Pipeline将各个Stage封装成对应的TaskSet
  11. DAGScheduler将TaskSet传给TaskScheduler
  12. TaskScheduler根据之前获得的Executor信息把TaskSet里的每个Task提交到相应的Executor进程中执行
  13. Task被放到TaskRunner中,Executor进程启多个线程并行执行TaskRunner

注意:

  1. 各模式的运行原理是相同,区别在于:
  • Standalone模式:Master、Worker集群管理
  • Yarn模式:ResourceManager、NodeManager集群管理
  1. 两种提交模式的区别:
  • Client模式:Driver进程运行在提交Application的Client上
  • Cluster模式:在Client上提交Application后,Master分配一个Worker来启动Driver进程完成SparkContext的初始化

RDD基础知识

RDD:弹性分布式数据集(Resilient Distributed Dataset)

RDD是一个抽象的概念,具有如下特性:

  1. 一个RDD由多个partition(分区)组成,每个partition对应一个节点上的一块内存,因此partition不是抽象的概念
  2. 多个partition并行计算,因此partition数就等于计算并行度。
  3. RDD之间存在依赖列表(关系),可用于重新计算丢失的数据。
  4. [可选]K-V格式的RDD可以重新分区。
  5. [可选]Spark择优选择每个partition的计算位置(本地性),例如HDFS系统。
RDD原理图

Hadoop集群一般与Spark集群建在相同的节点上,各节点磁盘上分布式存储文件,构成HDFS系统。当Spark需要加载内容到内存中时,根据本地性,会从本节点的磁盘上获取内容并加载到内存中,HDFS系统中的Block对应于Spark中的partition。多个partition并行计算。

RDD持久化

使用持久化的时机:

  • 当RDD需要被重复使用的时候
  • 需要做容错的时候

几种RDD持久化策略:

  //持久化策略定义
  class StorageLevel private(
        private var _useDisk: Boolean,    // 是否使用本地磁盘(不是HDFS)
        private var _useMemory: Boolean,    // 是否使用内存
        private var _useOffHeap: Boolean,    // 是否不适用Java堆内存
        private var _deserialized: Boolean,    // 是否不序列化
        private var _replication: Int = 1    // 副本数(默认为1)
  )
  // 多种持久化策略定义
  val NONE = new StorageLevel(false, false, false, false)
  val DISK_ONLY = new StorageLevel(true, false, false, false)
  val DISK_ONLY_2 = new StorageLevel(true, false, false, false, 2)
  val MEMORY_ONLY = new StorageLevel(false, true, false, true)
  val MEMORY_ONLY_2 = new StorageLevel(false, true, false, true, 2)
  val MEMORY_ONLY_SER = new StorageLevel(false, true, false, false)
  val MEMORY_ONLY_SER_2 = new StorageLevel(false, true, false, false, 2)
  val MEMORY_AND_DISK = new StorageLevel(true, true, false, true)
  val MEMORY_AND_DISK_2 = new StorageLevel(true, true, false, true, 2)
  val MEMORY_AND_DISK_SER = new StorageLevel(true, true, false, false)
  val MEMORY_AND_DISK_SER_2 = new StorageLevel(true, true, false, false, 2)
  val OFF_HEAP = new StorageLevel(true, true, true, false, 1)

持久化策略的选择建议:

  1. 优先选择默认的 MEMORY_ONLY
  2. 如果内存空间紧张,选择 XXX_SER
  3. 如果需要对缓存的数据具备一定的容错性,选择 XXX_2
  4. 如果某RDD的计算代价比较大,则选择 MEMORY_AND_DISK

MEMORY_ONLYMEMORY_AND_DISK区别

  1. MEMORY_ONLY将RDD缓存到内存中的持久化区域,如果该区域内存不够,则多余的数据不缓存
  2. MEMORY_AND_DISK优先缓存RDD到内存中的持久化区域,如果该区域内存不够,则将多余的数据存到本地磁盘

RDD算子

分类:Transformation算子、Action算子

Transformation算子
  1. filter(f : T => Boolean) : RDD[T] => RDD[T]
  2. map(f : T => U) : RDD[T] => RDD[U]
def map[U: ClassTag](f: T => U): RDD[U] = withScope {
    val cleanF = sc.clean(f)
    new MapPartitionsRDD[U, T](this, (context, pid, iter) => iter.map(cleanF))
}
  1. flatMap(f : T => Seq[U]) : RDD[T] => RDD[U]
def flatMap[U: ClassTag](f: T => TraversableOnce[U]): RDD[U] = withScope {
    val cleanF = sc.clean(f)
    new MapPartitionsRDD[U, T](this, (context, pid, iter) => iter.flatMap(cleanF))
}
  1. sample(fraction : Float) : RDD[T] => RDD[T] 取样
  2. groupByKey() : RDD[(K,V)] => RDD[(K,Seq[V])]
  3. reduceByKey(f : (V,V) => V) : RDD[(K,V)] => RDD[(K,V)]
  4. union() : (RDD[T],RDD[T]) => RDD[T]
  5. join() : (RDD[(K,V)],RDD[(K,W)]) => RDD[(K,(V,W))]
  6. cogroup() : (RDD[(K,V)],RDD[(K,W)]) => RDD[(K,(Seq[V],Seq[W]))]
  7. crossProduct() : (RDD[T],RDD[U]) => RDD[(T,U)]
  8. mapValues(f : V => W) : RDD[(K,V)] => RDD[(K,W)]
  9. sort(c : Comparator[K]) : RDD[(K,V)] => RDD[(K,V)]
  10. partitionBy(p: Partitioner[K]) : RDD[(K,V)] => RDD[(K,V)]
Action算子
  1. count() : RDD[T] => Long
  2. collect() : RDD[T] => Seq[T] 慎用,数据量大时会OOM
  3. reduce(f : (T,T) => T) : RDD[T] => T
  4. lookup(k : K) : RDD[(K,V)] => Seq[V]
  5. save(path : String) : RDD存储到磁盘

注意:
在一个Application可以有多个Action操作,那么运行时就会提交多个Job。
这些Job是按顺序依次执行的,前一个Job执行完才会提交后一个Job执行。

RDD的宽窄依赖

RDD的宽窄依赖是根据RDD算子判定的。例如:map、filter是窄依赖;groupByKey是宽依赖。

窄依赖:父RDD的一个partition只对应子RDD的一个partition,即子RDD的各partition计算不依赖于父RDD的其他partition,不发生shuffle(洗牌)操作

宽依赖:父RDD的一个partition对应子RDD的多个partition,即子RDD的各partition计算依赖于父RDD的其他partition,发生shuffle(洗牌)操作

注意:如果算子是窄依赖,则父RDD和子RDD的partition数量始终是相同的;如果算子是宽依赖(shuffle算子),一般情况下父RDD和子RDD的partition数量是相同的(除非是shuffle算子本身改变了partition数量或者设置了默认partition数)。
例如:父RDD的partition数为5,groupByKey()后,子RDD的partition数还是5,但是如果使用groupByKey(10),那么子RDD的partition数为10。
如果设置了spark.default.parallelism=6,那么groupByKey()后,子RDD的partition数是6,groupByKey(10)不受影响,子RDD的partition数还是10。

因此优先级如下:

  1. shuffle算子传入partition个数参数,则改变RDD的partition数为该参数值;
  2. 设置spark.default.parallelism,则改变partition数为该设置值;
  3. 否则子RDD的partition数和父RDD的partition数保持一样。

shuffle算子(宽依赖)理解:
shuffle算子本身分为两个阶段:map阶段reduce阶段

  • map阶段就是map算子,是窄依赖,父RDD和子RDD各partition一一对应。
  • reduce阶段是宽依赖,根据目标partition数对map阶段计算出来的RDD内各partition进行划分,然后取每个partition的对应部分组成新的partition,最终形成新的RDD(partition数就变成目标partition数了)。

shuffle算子有:

  • repartition、coalesce(可控制是否shuffle)
  • *ByKey:groupByKey、reduceByKey等
  • join、cogroup

共享变量

  1. 广播变量
    广播变量是只读的,且会被缓存到集群的各个台机器中,而不是像普通变量一样被copy到Task内(网络传输中需要序列化和反序列化),当广播变量创建(sc.broadcast(v))完成后不允许被修改。

  2. 累加器
    累加器只能用于累加计数(数字类型和自定义类型)。创建方法(sc.accumulator(v))。
    累加器只能在Driver端被读取,在Executor内只能累加不能读取。

----------------------
欢迎补充修正

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 213,014评论 6 492
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,796评论 3 386
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 158,484评论 0 348
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,830评论 1 285
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 65,946评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,114评论 1 292
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,182评论 3 412
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,927评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,369评论 1 303
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,678评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,832评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,533评论 4 335
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,166评论 3 317
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,885评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,128评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,659评论 2 362
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,738评论 2 351

推荐阅读更多精彩内容