【Spark简介】
Apache Spark是一个围绕速度、易用性和复杂分析构建的大数据处理框架。最初在2009年由加州大学伯克利分校的AMPLab开发,并于2010年成为Apache的开源项目之一。Spark作为Apache顶级的开源项目,是一个快速、通用的大规模数据处理引擎,和Hadoop的MapReduce计算框架类似,但是相对于MapReduce,Spark凭借其可伸缩、基于内存计算等特点,以及可以直接读写Hadoop上任何格式数据的优势,进行批处理时更加高效,并有更低的延迟。Spark已经成为轻量级大数据快速处理的统一平台,各种不同的应用,如实时流处理、机器学习、交互式查询等,都可以通过Spark建立在不同的存储和运行系统上。相对于MapReduce上的批量计算、迭代型计算以及基于Hive的SQL查询,Spark可以带来上百倍的性能提升。
【Spark生态系统组件】
Spark Core:整个生态系统的核心组件,是一个分布式大数据处理框架。Spark Core提供了多种资源调度管理,通过内存计算、有向无环图(DAG)等机制保证分布式计算的快速,并引入了RDD的抽象保证数据的高容错性。
Spark Streaming:基于微批量方式的计算和处理,可以用于对实时数据流进行高吞吐、高容错的流式处理。
Spark SQL:通过JDBC API将Spark数据集暴露出去,而且还可以用传统的BI和可视化工具在Spark数据上执行类似SQL的查询。用户还可以用Spark SQL对不同格式的数据执行ETL,将其转化暴露给特定的查询。
Spark MLlib:一个可扩展的Spark机器学习库,由通用的学习算法和工具组成。打通数据收集、数据清理、特征提取,模型训练,测试、评估整个流程。
Spark GraphX:用于图计算和并行图计算的API。通过引入弹性分布式属性图,一种顶点和边都带有属性的有向多重图,扩展了Spark RDD。
【Spark应用场景】
1、快速查询系统:基于日志数据的快速查询系统业务构建于Spark之上,利用其快速查询以及内存表等优势,能够承担大部分日志数据的即时查询工作;在性能方面,普遍比Hive快2~10倍,如果使用内存表的功能,性能将会比Hive快百倍。
2、实时日志采集处理:通过Spark Streaming实时进行业务日志采集,快速迭代处理,并进行综合分析,能够满足线上系统分析要求。
3、业务推荐系统:使用Spark将业务推荐系统的小时和天级别的模型训练转变为分钟级别的模型训练,有效优化相关排名、个性化推荐以及热点点击分析等。
4、定制广告系统:在定制广告业务方面需要大数据做应用分析、效果分析、定向优化等,借助Spark快速迭代的优势,实现了“数据实时采集、算法实时训练、系统实时预测”的流程,支持上亿的请求量处理;模拟广告投放计算效率高、延迟小,同MapReduce相比延迟至少降低一个数量级。
【Spark四大特性】
1、速度快:与Hadoop的MapReduce相比,Spark基于内存的运算要快100倍以上;而基于磁盘的运算也要快10倍以上。Spark实现了高效的DAG执行引擎,可以通过基于内存来高效地处理数据流。
2、易使用:支持Java、Python和Scala的API,还支持超过80种高级算法,使用户可以快速构建不同应用。
3、通用性:Spark提供了统一的解决方案。Spark可以用于批处理、交互式查询(Spark SQL)、实时流处理(Spark Streaming)、机器学习(通过Spark MLlib)和图计算(Spark GraphX)。
这些不同类型的处理都可以在同一应用中无缝使用。Spark统一的解决方案非常具有吸引力,毕竟任何公司都想用统一的平台处理问题,减少开发和维护的人力成本和部署平台的物理成本。
4、兼容性:Spark可以非常方便地与其他开源产品进行融合。比如,可以使用Hadoop的YARN作为它的资源管理和调度器,并且可以处理所有Hadoop支持的数据,包括HDFS、HBase等。这对于已部署Hadoop集群的用户特别重要,因为不需要做任何数据迁移就可以使用Spark强大的处理能力。Spark也可以不依赖第三方的资源管理器和调度器,它实现了Standalone作为其内置资源管理器和调度框架。
【Spark基本架构】
Cluster Manager:Spark的集群管理器,主要负责资源的分配与管理。集群管理器分配的资源属于一级分配,它将各个Worker上的内存、CPU等资源分配给应用程序,但是并不负责对Executor的资源分配。目前,Standalone、YARN、Mesos、EC2等都可以作为Spark的集群管理器。
Worker:Spark的工作节点。对Spark应用程序来说,由集群管理器分配得到资源的Worker节点主要负责以下工作:创建Executor,将资源和任务进一步分配给Executor,同步资源信息给Cluster Manager。
Executor:执行计算任务的一线进程。主要负责任务的执行以及与Worker、Driver App的信息同步。
Driver App:客户端驱动程序,也可以理解为客户端应用程序,用于将任务程序转换为RDD和DAG,并与Cluster Manager进行通信与调度。
【Spark三种部署方式】
1、Standalone模式:Spark框架自带了完整的资源调度管理服务,可以独立部署到一个集群中,而不需要依赖其他系统来为其提供资源管理调度服务。在架构的设计上,Spark与MapReduce1.0完全一致,都是由一个Master和若干个Slave构成,并且以槽(slot)作为资源分配单位。不同的是,Spark中的槽不再像MapReduce1.0那样分为Map槽和Reduce槽,而是只设计了统一的一种槽提供给各种任务来使用。
2、Spark on Mesos模式:Mesos是一种资源调度管理框架,可以为运行在它上面的Spark提供服务。Spark on Mesos模式中,Spark程序所需要的各种资源,都由Mesos负责调度。
3、Spark on YARN模式:Spark可运行于YARN之上,与Hadoop进行统一部署,即“Spark on YARN”,依赖于YARN的资源管理和调度依赖,分布式存储则依赖HDFS。
【Spark一些基本概念】
RDD:Resillient Distributed Dataset(弹性分布式数据集)的简称,是分布式内存的一个抽象概念,提供了一种高度受限的共享内存模型。
DAG:是Directed Acyclic Graph(有向无环图)的简称,反映RDD之间的依赖关系。
Executor:是运行在工作节点(WorkerNode)的一个进程,负责运行Task。
Application:用户编写的Spark应用程序。
Task:运行在Executor上的工作单元。
Job:一个Job包含多个RDD及作用于相应RDD上的各种操作。
Stage:是Job的基本调度单位,一个Job会分为多组Task,每组Task被称为Stage,或者也被称为TaskSet,代表了一组关联的、相互之间没有Shuffle依赖关系的任务组成的任务集。
【Spark RDD设计背景】
许多迭代式算法(比如机器学习、图算法等)和交互式数据挖掘工具,共同之处是,不同计算阶段之间会重用中间结果。目前的MapReduce框架都是把中间结果写入到HDFS中,带来了大量的数据复制、磁盘IO和序列化开销。RDD就是为了满足这种需求而出现的,它提供了一个抽象的数据架构,不必关心底层数据的分布式特性,只需将具体的应用逻辑表达为一系列转换处理,不同RDD之间的转换操作形成依赖关系,可以实现管道化,避免中间数据存储。一个RDD就是一个分布式对象集合,本质上是一个只读的分区记录集合,每个RDD可分成多个分区,每个分区就是一个数据集片段,并且一个RDD的不同分区可以被保存到集群中不同的节点上,从而可以在集群中的不同节点上进行并行计算。
【Spark RDD基本概念】
RDD是一种高度受限的共享内存模型,即RDD是只读的记录分区的集合,不能直接修改,只能基于稳定的物理存储中的数据集创建RDD,或者通过在其他RDD上执行确定的转换操作(如map、join和group by)而创建得到新的RDD。RDD提供了丰富的操作以支持常见的数据运算,分为“动作”(Action)和“转换”(Transformation)两种类型。RDD提供的转换接口都非常简单,都是类似map、filter、groupBy、join等粗粒度的数据转换操作,而不是针对某个数据项的细粒度修改。RDD典型的执行过程如下:1、RDD读入外部数据源进行创建。2、RDD经过系列的转换(Transformation)操作,每一次都会产生不同的RDD,供给下一个转换操作使用。3、最后一个RDD经过“动作”操作进行转换,并输出到外部数据源。
【Spark任务运行流程】
1、首先为应用构建起基本的运行环境,即由Driver创建一个SparkContext,进行资源的申请、任务的分配和监控。
2、资源管理器为Executor分配资源,并启动Executor进程。
3、SparkContext根据RDD的依赖关系构建DAG图,DAG图提交给DAGScheduler解析成Stage,然后把一个个TaskSet提交给底层调度器TaskScheduler处理;Executor向SparkContext申请Task,Task Scheduler将Task发放给Executor运行,并提供应用程序代码。
4、Task在Executor上运行,把执行结果反馈给TaskScheduler和DAGScheduler,运行完毕后写入数据并释放所有资源。
【RDD间的依赖关系】
窄依赖:表现为一个父RDD的分区对应于一个子RDD的分区或多个父RDD的分区对应于一个子RDD的分区.
宽依赖:表现为存在一个父RDD的一个分区对应一个子RDD的多个分区。
Spark通过分析各个RDD的依赖关系生成了DAG,再通过分析各个RDD中的分区之间的依赖关系来决定如何划分Stage,具体划分方法是:
- 在DAG中进行反向解析,遇到宽依赖就断开
- 遇到窄依赖就把当前的RDD加入到Stage中
- 将窄依赖尽量划分在同一个Stage中,可以实现流水线计算
【Stage的类型】
Spark Stage的类型包括两种:ShuffleMapStage和ResultStage,具体如下:
- ShuffleMapStage:不是最终的Stage,在它之后还有其他Stage,所以,它的输出一定需要经过Shuffle过程,并作为后续Stage的输入;这种Stage是以Shuffle为输出边界,其输入边界可以是从外部获取数据,也可以是另一个ShuffleMapStage的输出,其输出可以是另一个Stage的开始;在一个Job里可能有该类型的Stage,也可能没有该类型Stage;
- ResultStage:最终的Stage,没有输出,而是直接产生结果或存储。这种Stage是直接输出结果,其输入边界可以是从外部获取数据,也可以是另一个ShuffleMapStage的输出。在一个Job里必定有该类型Stage。因此,一个Job含有一个或多个Stage,其中至少含有一个ResultStage。
【Spark RDD算子】
Spark 算子大致可以分为以下两类:
1、Transformation 变换/转换算子:这种变换并不触发提交作业,完成作业中间过程处理。
Transformation 操作是延迟计算的,也就是说从一个RDD 转换生成另一个 RDD 的转换操作不是马上执行,需要等到有 Action 操作的时候才会真正触发运算。典型算子:map、flatMap、filter、reduceByKey。
2、Action 行动算子:这类算子会触发 SparkContext 提交 Job 作业。
Action 算子会触发 Spark 提交作业(Job),并将数据输出 Spark系统。典型算子:take、count、savaAsTextFile等。
从小方向来说,Spark 算子大致可以分为以下三类:
1、Value数据类型的Transformation算子,这种变换并不触发提交作业,针对处理的数据项是Value型的数据。
2、Key-Value数据类型的Transfromation算子,这种变换并不触发提交 作业,针对处理的数据项是Key-Value型的数据对。
3、Action算子,这类算子会触发SparkContext提交Job作业。
【RDD持久化工作原理】
Spark非常重要的一个功能特性就是可以将RDD 持久化在内存中,当对RDD执行持久化操作时,每个节点都会将自己操作的RDD的partition持久化到内存中,并且在之后对该RDD的反复使用中,直接使用内存缓存的partition,这样针对一个RDD反复执行多个操作的场景,就只要对RDD计算一次即可,后面直接使用该RDD ,而不需要计算多次该RDD。RDD持久化对于迭代式算法和快速交互式应用来说是非常重要的。
要持久化一个RDD,只要调用其cache()或者persist()方法即可。在该RDD第一次被计算出来时,就会直接缓存在每个节点中。Spark的持久化机制是自动容错的,如果持久化的RDD的任何partition丢失了,那么Spark会自动通过其源RDD,使用transformation操作重新计算该partition。
cache()和persist()的区别在于,cache()是persist()的一种简化方式,cache()的底层就是调用的persist()的无参版本,同时就是调用persist(MEMORY_ONLY),将数据持久化到内存中。如果需要从内存中去除缓存,那么可以使用unpersist()方法。
【RDD持久化策略】
- MEMORY_ONLY:以非序列化的Java对象的方式持久化在JVM内存中。如果内存无法完全存储RDD所有的partition,那么那些没有持久化的partition就会在下一次需要使用它的时候,重新被计算。
- MEMORY_AND_DISK:同上,但是当某些partition无法存储在内存中时,会持久化到磁盘中。下次需要使用这些partition时,需要从磁盘上读取。
- MEMORY_ONLY_SER:同MEMORY_ONLY,但是会使用Java序列化方式,将Java对象序列化后进行持久化。可以减少内存开销,但是需要进行反序列化,因此会加大CPU开销。
- MEMORY_AND_DSK_SER:同MEMORY_AND_DSK。但是使用序列化方式持久化Java对象。
- DISK_ONLY:使用非序列化Java对象的方式持久化,完全存储到磁盘上。
- MEMORY_ONLY_2或者MEMORY_AND_DISK_2等:如果是尾部加了2的持久化级别,表示会将持久化数据复用一份,保存到其他节点,从而在数据丢失时,不需要再次计算,只需要使用备份数据即可。
【RDD持久化策略选择原则】
Spark 的存储级别的选择,核心问题是在内存使用率和 CPU 效率之间进行权衡。建议按下面的过程进行存储级别的选择 :
- 如果使用默认的存储级别(MEMORY_ONLY),存储在内存中的 RDD 没有发生溢出,那么就选择默认的存储级别。默认存储级别可以最大程度的提高 CPU 的效率,可以使在 RDD 上的操作以最快的速度运行。
- 如果内存不能全部存储 RDD,那么使用 MEMORY_ONLY_SER,并挑选一个快速序列化库将对象序列化,以节省内存空间。使用这种存储级别,计算速度仍然很快。
- 除了在计算该数据集的代价特别高,或者在需要过滤大量数据的情况下,尽量不要将溢出的数据存储到磁盘。因为,重新计算这个数据分区的耗时与从磁盘读取这些数据的耗时差不多。
- 如果想快速还原故障,建议使用多副本存储级别(例如,使用 Spark 作为 web 应用的后台服务,在服务出故障时需要快速恢复的场景下)。所有的存储级别都通过重新计算丢失的数据的方式,提供了完全容错机制。但是多副本级别在发生数据丢失时,不需要重新计算对应的数据库,可以让任务继续运行。
【Spark共享变量】
Spark中的两个重要抽象是RDD和共享变量。在默认情况下,当Spark在集群的多个不同节点的多个任务上并行运行一个函数时,它会把函数中涉及到的每个变量,在每个任务上都生成一个副本,更新这些副本的值也不会影响驱动器中的对应变量。但是有时候需要在多个任务之间共享变量,或者在任务(Task)和任务控制节点(Driver Program)之间共享变量。为了满足这种需求,Spark提供了两种类型的变量:广播变量(broadcast variables)和累加器(accumulators)。广播变量用来把变量在所有节点的内存之间进行共享。累加器则支持在所有不同节点之间进行累加计算(比如计数或者求和)。
【广播变量(broadcast variable)】
广播变量用来高效分发较大的公共对象。如果要在分布式计算里面分发大对象,例如:字典,集合等,都由Driver端进行分发,广播变量允许在每个机器上缓存一个只读的变量,而不是为每个任务都生成一个副本。如果这个变量不是广播变量,那么每个task就会分发一份,这在task数目十分多的情况下Driver的带宽会成为系统的瓶颈,而且会大量消耗task服务器上的资源,如果将这个变量声明为广播变量,那么只是每个executor拥有一份,这个executor启动的task会共享这个变量,节省了通信的成本和服务器的资源。通过这种方式,就可以非常高效地给每个节点提供一个大的输入数据集的副本。Spark的“动作”操作会跨越多个阶段(stage),对于每个阶段内的所有任务所需要的公共数据,Spark都会自动进行广播,通过广播方式进行传播的变量。广播变量在Driver端定义,只能在Driver端改变广播变量的值,Executor端无法修改。
【累加器(Accumulator)】
累加器用来对信息进行聚合,累加器是仅仅被相关操作累加的变量,通常可以被用来实现计数器(counter)和求和(sum)。Spark原生支持数值型(numeric)的累加器,程序开发人员可以编写对新类型的支持。如果创建累加器时指定了名字,则可以在Spark UI界面看到,这有利于理解每个执行阶段的进程。
一个数值型的累加器,可以通过调用SparkContext.accumulator()来创建。运行在集群中的任务,就可以使用add方法来把数值累加到累加器上,但是,这些任务只能做累加操作,不能读取累加器的值,只有任务控制节点(Driver Program)可以使用value方法来读取累加器的值。
累加器提供了将工作节点中的值聚合到驱动器程序中的语法。解决driver和excutor端的数据不能共享的问题。累加器在Driver端定义赋初始值,且只能在Driver端读取,在Excutor端更新。常用在调试时对作业执行过程中的事件进行计数。
【Spark SQL】
Spark SQL是Spark中用于结构化数据处理的模块。与基础的Spark RDD API不同,Spark SQL的接口提供了更多关于数据的结构信息和计算任务的运行时信息。在Spark应用中,可以无缝的使用SQL语句亦或是DataFrame API对结构化数据进程查询。
Spark SQL有以下几个特点:
- 数据兼容:可从Hive表、外部数据库(JDBC)、RDD、Parquet文件、JSON文件获取数据,可通过Scala方法或SQL方式操作这些数据,并把结果转回RDD。
- 组件扩展:SQL语法解析器、分析器、优化器均可重新定义。
- 性能优化:内存列存储、动态字节码生成等优化技术,内存缓存数据。
- 多语言支持:Scala、Java、Python、R。
【Spark SQL的优点】
1、集成:Apache Spark SQL将SQL查询与Spark程序集成。可以将结构化数据作为分布式数据集(RDD)查询,可以使用Spark SQL紧密集成属性与复杂的分析算法一起运行SQL查询。
2、统一数据访问:使用Spark SQL,可以加载和查询不同来源数据。Schema-RDD允许单个接口高效处理结构化数据。例如,Apache Hive tables, parquet files, and JSON files.
3、高兼容性:在Apache Spark SQL中,可以在现有仓库上运行未修改的Hive查询,兼容现有Hive数据以及UDF。
4、标准连接:可通过JDBC或ODBC连接,包括具有行业标准JDBC和ODBC连接的服务器模式。
5、可扩展性:为了支持查询容错和大型作业,利用了RDD模型,使用相同的引擎进行交互式查询。
6、性能优化:Spark SQL中的查询优化引擎在整个计划中选择最佳的执行计划。
【Spark SQL之DataFrame】
DataFrame是一个以命名列方式组织的分布式数据集。在概念上类似于关系型数据库中的一张表。DataFrame让Spark具备了处理大规模结构化数据的能力,不仅比原有的RDD转化方式更加简单易用,而且获得了更高的计算性能。RDD是分布式的 Java对象的集合,比如,RDD[Person]是以Person为类型参数,但是,Person类的内部结构对于RDD而言却是不可知的。DataFrame是一种以RDD为基础的分布式数据集,也就是分布式的Row对象的集合(每个Row对象代表一行记录),提供了详细的结构信息,也就是常说的模式(schema),Spark SQL可以清楚地知道该数据集中包含哪些列、每列的名称和类型。和RDD一样,DataFrame的各种变换操作也采用惰性机制,只是记录了各种转换的逻辑转换路线图(DAG图),不会发生真正的计算,这个DAG图相当于一个逻辑查询计划,最终,会被翻译成物理查询计划,生成RDD DAG,按照RDD DAG的执行方式去完成最终的计算得到结果。
【Hive on Mapreduce VS SparkSQL】
适用场景上:Hive on Mapreduce的出现可以让那些精通SQL技能、但是不熟悉MapReduce 、编程能力较弱与不擅长Java语言的用户能够在HDFS大规模数据集上很方便地利用SQL 语言查询、汇总、分析数据,毕竟精通SQL语言的人要比精通Java语言的多得多,Hive适合处理离线非实时数据。
SparkSQL既可以运行本地local模式,也可以以Standalone、cluster等多种模式运行在Yarn、Mesos上,还可以运行在云端例如EC2。此外,Spark的数据来源非常广泛,可以处理来自HDFS、HBase、 Hive、Cassandra、Tachyon上的各种类型的数据。适用于实时性要求或者速度要求较高的场所。
性能上:sparksql和hive on spark时间差不多,但都比hive on mapreduce快很多,官方数据认为spark会被传统mapreduce快10-100倍。
【Spark MLlib】
MLlib是Spark的机器学习(ML)库。旨在简化机器学习的工程实践工作,并方便扩展到更大规模。MLlib由一些通用的学习算法和工具组成,包括:1.机器学习算法:如常规机器学习算法包括分类、回归、聚类和协同过滤。2.特征工程:特征提取、特征转换、特征选择以及降维。3.管道:构造、评估和调整的管道的工具。4.存储:保存和加载算法、模型及管道。5.实用工具:线性代数,统计,数据处理等。
MLlib目前分为两个代码包:spark.mllib 包含基于RDD的原始算法API。spark.ml 则提供了基于DataFrames 高层次的API,可以用来构建机器学习管道。推荐使用spark.ml,因为基于DataFrames的API更加通用而且灵活,包括Spark数据源,SQL/DataFrame查询,为多种机器学习算法与编程语言提供统一的API,DataFrames有助于实现机器学习管道,特别是特征转换。spark2.0开始,基于RDD的API已经进入的维护模式。
【Spark Streaming】
Spark streaming是Spark核心API的一个扩展,它对实时流式数据的处理具有可扩展性、高吞吐量、可容错性等特点。可以从kafka、flume、Twitter、ZeroMQ、Kinesis等源获取数据,也可以通过由高阶函数map、reduce、join、window等组成的复杂算法计算出数据。最后,处理后的数据可以推送到文件系统、数据库、实时仪表盘中,或者可以将处理后的数据应用到Spark的机器学习算法、图处理算法中去。(数据流是连续到达的无穷序列。流处理将不断流动的输入数据分成独立的单元进行处理。流处理是对流数据的低延迟处理和分析。实时处理用例包括:网站监控,网络监控,欺诈识别,网页点击,广告,物联网传感器等。)