spark学习笔记1-基础部分

本文是对Spark基础知识的一个学习总结,包含如下几部分的内容:

  • 概述
  • 运行模式
  • Spark Shell
  • RDD/DataFrame/DataSet
  • 独立可执行程序
  • 小结

参考资料:

1、Spark的核心代码是用scala语言开发的,且提供了针对scala,java,python几种语言的官方API,在本文的示例中,我们采用的是基于scala语言的API。所以需要对scala语言有个基础的了解。可以参考scala系列文档,如《Scala学习笔记(1)-快速起步》《Scala学习笔记(2)-基础语法》

2、Spark对数据的处理,处处体现了函数式编程的思想,尤其是集合对象的几个原子操作(filter,map,reduce),熟悉这些对理解spark的使用非常有帮助,可参考文档《函数式编程之集合操作》《Scala学习笔记(5)-函数式编程》

一、概述

(一)简介

Apache Spark是一种通用的用于大数据处理的集群计算框架。Spark是由UC Berkeley AMP lab (加州大学伯克利分校的AMP实验室)开发和开源的一个通用计算框架,目前在大数据计算领域得到了广泛的应用,形成了一个高速发展应用广泛的生态系统。

spark的核心是采用scala语言开发的,同时它提供了多种编程语言(如java,scala.python)的客户端Api接口。spark框架可以运行在各种操作系统上。

spark目前最新的版本是2.4.0,是在2018年11月份发布的。本系列文章所介绍的都基于2.4.0版本。

整个spark系统主要由两部分组成:

1)spark core库:包括支持Spark通用执行引擎的API,以及一些核心的用户API。

2)spark扩展库:基于spark core库开发的一些重要的在不同场景下使用的功能,在实际的产品开发中,我们更多的基于这些扩展库来开发自己的应用。

(二)spark扩展库

在Spark Core的基础上,Spark提供了一系列面向不同应用需求的组件,主要有Spark SQL、StructuredStreaming/Spark Streaming、MLlib、GraphX。

下面分别简单介绍下。

1、Spark SQL

Spark SQL是Spark用来操作结构化数据的组件。通过Spark SQL,用户可以使用SQL或者Apache Hive版本的SQL方言(HQL)来查询数据。Spark SQL支持多种数据源类型,例如Hive表、Parquet以及JSON等。Spark SQL不仅为Spark提供了一个SQL接口,还支持开发者将SQL语句融入到Spark应用程序开发过程中,无论是使用Python、Java还是Scala,用户可以在单个的应用中同时进行SQL查询和复杂的数据分析。

2、StructuredStreaming/Spark Streaming

Spark Streaming是spark中一个非常重要的扩展库,它是Spark核心API的一个扩展,可以实现高吞吐量的、具备容错机制的实时流数据的处理。支持从多种数据源获取数据,包括Kafk、Flume、以及TCP socket等,从数据源获取数据之后,可以使用诸如map、reduce和window等高级函数进行复杂算法的处理。最后还可以将处理结果存储到文件系统和数据库等。

但从Spark2.0开始,提出了新的实时流框架 Structured Streaming (2.0和2.1是实验版本,从Spark2.2开始为稳定版本)来替代Spark streaming,这时Spark streaming就进入维护模式。相比Spark Streaming,Structured Streaming的Api更加好用,功能强大。

3、MLlib

MLlib是Spark提供的一个机器学习算法库,其中包含了多种经典、常见的机器学习算法,主要有分类、回归、聚类、协同过滤等。MLlib不仅提供了模型评估、数据导入等额外的功能,还提供了一些更底层的机器学习原语,包括一个通用的梯度下降优化基础算法。所有这些方法都被设计为可以在集群上轻松伸缩的架构。

4、GraphX

GraphX是Spark面向图计算提供的框架与算法库。GraphX中提出了弹性分布式属性图的概念,并在此基础上实现了图视图与表视图的有机结合与统一;同时针对图数据处理提供了丰富的操作,例如取子图操作subgraph、顶点属性操作mapVertices、边属性操作mapEdges等。GraphX还实现了与Pregel的结合,可以直接使用一些常用图算法,如PageRank、三角形计数等。

(三)版本下载与安装

我们可以从官方网站https://spark.apache.org/downloads.html下载所需的版本,我们是下载的二进制安装包,版本是2.4.0,下载的压缩文件是 spark-2.4.0-bin-hadoop2.7.tgz。然后把上面的压缩文件解压到某个目录即可。

spark的运行依赖jdk环境,对于2.4.0版本,需要java8及以上的版本。对于python客户端,需要Python 2.7 及以上版本 或 Python3.4及以上版本。对于scala客户端,需要Scala 2.11版本。

因为spark是一个分布式集群系统,我们需要在每台节点上去安装spark。当然,如果只是开发和学习,只需在一台机器上安装。下面章节会介绍spark的运行方式。

二、运行模式

spark有多种运行模式,可以在本地运行,也可以在分布式集群模式下运行,而且集群模式下可以支持多种集群管理器,下面一一介绍。

(一)local(本地模式)

只需要一台机器,运行该模式非常简单,只需要把Spark的安装包解压后,默认也不需修改任何配置文件,取默认值。不用启动Spark的Master、Worker守护进程( 只有集群的Standalone方式时,才需要这两个角色),也不用启动Hadoop的各服务(除非你要用到HDFS)。

运行客户端程序(可以是spark自带的命令行程序,如spark-shell,也可以是程序员利用spark api编写的程序),就可以完成相应的运行。相当于这一个客户端进程,充当了所有的角色。

这种模式,只适合开发阶段使用,我们可以在该模式下开发和测试代码,使的代码的逻辑没问题,后面再提交到集群上去运行和测试。

本文中的例子主要是学习spark的一些核心API,为了搭建环境的简化,采用的是独立模式。

在实际生产环境,spark会采用集群模式来运行,即分布式式运行,spark可以使用多种集群资源管理器来管理自己的集群。

(二)standalone(集群模式之一)

Standalone模式,即独立模式,自带完整的服务,使用spark自带的集群资源管理功能。可单独部署到一个集群中,无需依赖任何其他资源管理系统。即每台机器上只需部署下载的Spark版本即可。

这种模式需要提前启动spark的master和Worker守护进程,才能运行spark客户端程序。

因为Standalone模式不需要依赖任何第三方组件,如果数据量比较小,且不需要hadoop(如不需要访问hdfs服务),则使用Standalone模式是一种可选的简单方便的方案。

(三)On YARN模式(集群模式之二)

该模式,使用hadoop的YARN作为集群资源管理器。这种模式下因为使用yarn的服务进行资源管理,所以不需要启动Spark的Master、Worker守护进程。

如果你的应用不仅使用spark,还用到hadoop生态圈的其它服务,从兼容性上考虑,使用Yarn作为统一的资源管理是更好的选择,这样选择这种模式就比较适合。

(四)On Mesos模式(集群模式之三)

该模式,使用Mesos作为集群资源管理器。如果你的应用还使用了docker,则选择此模式更加通用。

(五)伪分布式集群模式

即在一台机器上模拟集群下的分布式场景,会启动多个进程。上述的2,3,4三种集群模式都可以启动伪分布式集群模式,当然要求机器的配置满足要求。这种模式主要是开发阶段和学习使用。

说明:因为本文主要是介绍spark的基础知识,会通过一些实例介绍如何使用spark来进行数据分析和计算。为了简单化,我们采用的是lcoal模式。

三、Spark Shell

Spark 版本中提供了系列的交互式命令行程序,用它来进行spark API的学习是最方便的了,它支持scala、python、R多种语言的api(在本文中,我们使用scala语言),我们可以利用它来进行API的学习;同时它也是一种以交互方式来进行数据分析的强大工具,我们可以直接利用它进行数据的分析。

spark提供了多个脚本程序(为不同的编程语言提供不同的脚本),位于spark压缩包解压后目录的bin目录下。比如针对scala语言,linux下脚本文件名为spark-shell,windows下文件名为spark-shell.cmd。

在控制台下,运行spark-shell,出现交互式界面,就可以输入scala代码,输入:quit退出交互式程序(注意quit前要有冒号)。如下面的界面:

scala> val value=3+2

value: Int = 5

scala> val str = "hello"

str: String = hello

scala> str.length()

res14: Int = 5

scala> :quit

运行spark shell,会出现scala提示符,然后就可以在该提示符下输入scala代码,按回车执行。退出时输入:quit命令。

在下面的章节中,我们会在spark shell中来举例说明spark的一些核心api的概念和使用方式。

四、RDD /DataFrame/DataSet

(一)基本概念

spark是用来处理数据的,这样就需要一种数据模型来表示数据。RDD,DsataFrame,DataSet是其版本发展过程出现的三种模型,也就是三种API。spark对数据的处理的主要操作就是围绕这些API的处理。

其中RDD(Resilient Distributed Dataset)被称为弹性的分布式数据集,是spark旧的核心API,在Spark 2.0之前,Spark的主要编程接口是RDD。但是在Spark 2.0之后,RDD被新的DataSet数据集取代,DataSet像RDD一样强类型,但在底层有更丰富的优化。DataSet与RDD一样,内置了各种函数操作,通过函数式操作可以完成各种强大的计算。spark2.4.0版本仍然支持RDD接口,但是强烈建议切换到使用数据集,它具有比RDD更好的性能。

DataFrame开始是在spark1.3.0版本提出来的,开始时以RDD为基础,它在概念上等同于关系数据库中的表或R / Python中的数据框,但是进行了更丰富的优化。DataFrame与RDD的主要区别在于,前者带有schema元数据信息,既DataFrame所表示的二维数据集的每一列都带有名称和类型。而RDD中存储的只是一个对象。如下图所示:


如上图所示,左侧的RDD[Person]虽然以Person为类型参数,但Spark框架本身不了解Person类的内部结构。而右侧的DataFrame却提供了详细的结构信息,使得Spark SQL可以清楚地知道该数据集中包含哪些列,每列的名称和类型各是什么。DataFrame多了数据的结构信息,即schema。

Dataset是分布式数据集合。是Spark1.6中添加的新接口,它提供了RDD的优点以,并在性能上进行了优化。在Spark2.0之后,自Spark2.0之后,DataFrame和DataSet合并为更高级的DataSet,新的DataSet具有两个不同的API特性:

  1. 非强类型(untyped),DataSet[Row]是泛型对象的集合,它的别名是DataFrame;

  2. 强类型(strongly-typed),DataSet[T]是具体对象的集合,如scala和java中定义的类.

下面我们通过spark shell来举例说明如何使用DataSet,因为RDD已经全面被DataSet替换,本文中不再介绍RDD的使用。DataFrame的使用我们将在Spark SQL学习笔记中介绍。

(二)创建DataSet对象

使用DataSet数据集,首先要创建DataSet,可以从Hadoop InputFormats(例如HDFS文件)或通过转换其他数据集来创建DataSet。

不够最简单的方式是从列表对象来创建DataSet对象,如下面例子:

scala> val ds = Seq("hello","world").toDS()

ds: org.apache.spark.sql.Dataset[String] = [value: string]

上述代码先利用Seq创建了一个列表,调用toDS方法生成一个Dataset对象。

我们可以调用Dataset对象的show方法来输出其中数据,show方法会以表格的方式输出,这里Dataset中的元素是字符串对象,默认的列名为value。

scala> ds.show()

+-----+

|value|

+-----+

|hello|

|world|

+-----+

我们也可以创建数值型的DataSet,如下面例子:

scala> val ds = Seq(12,15,16).toDS()

ds: org.apache.spark.sql.Dataset[Int] = [value: int]

scala> ds.show()

+-----+

|value|

+-----+

|   12|

|   15|

|   16|

+-----+

下面我们来自定义一个类,DataSet中存放该类的对象,如下面例子代码:

scala> case class Person(name:String,age:Long)

defined class Person

scala> val ds = Seq(Person("tom",12),Person("kad",22)).toDS()

ds: org.apache.spark.sql.Dataset[Person] = [name: string, age: bigint]

scala> ds.show()

+----+---+

|name|age|

+----+---+

| tom| 12|

| kad| 22|

+----+---+

上面代码定义了一个Person类,类中有两个成员变量。调用show方法时,自动将类中的变量作为列名来显示。

更多时候,我们会从外部数据源(如文件)来创建DataSet对象,下面举例从Spark安装目录中的本地README文件的文本中创建一个新的DataSet。如下面代码:

scala> val textFile = spark.read.textFile("README.md")

textFile: org.apache.spark.sql.Dataset[String] = [value: string]

上面创建了一个DataSet[String],DataSet中存储的是字符串对象,每个字符串对应README文件中的一行。

当然还有更多创建DataSet对象的方法,上面只是一些最简单的方式。下面我们来接着介绍DataSet的一些常见操作。

(三)操作DataSet对象

DataSet类提供了丰富的方法来对数据进行计算,下面通过举例的方式来介绍一些常见的方法。

scala> textFile.count()

res1: Long = 104

上面代码我们调用了DataSet的count函数,获取了DataSet中包含的对象个数,返回的值就是README.md文件中的行数。

scala> textFile.first()

res4: String = # Apache Spark

上面代码调用了DataSet的first函数,返回了DataSet中的第一个对象,这里返回的是一个String对象,内容是“# Apache Spark”。

如果了解java8,python等编程语言中数据集合的高阶函数,这些语言的数据集(如列表)都支持高阶函数filter,map等。同样spark中的DataSet提供了强大的高阶函数功能,也是我们使用DataSet主要的方式。如下面例子:

scala> val linesWithSpark = textFile.filter(line => line.contains("Spark"))

linesWithSpark: org.apache.spark.sql.Dataset[String] = [value: string]

scala> linesWithSpark.count()

res5: Long = 20

上面代码先调用了DataSet的filter函数返回一个新的数据集,然后调用count方法获取新数据集中的对象个数。与其它语言中的filter函数一样,spark中的filter函数是一个高阶函数,用于返回满足条件的数据生成的一个新的数据集。上面的filter函数参数是一个Lambda表达式,只有字符串中包含”Spark“子串的字符串才不被过滤。

再比如,如果我们要返回所有字符串长度大于100的数据集,代码如下:

val linesWithSpark = textFile.filter(line => line.length>100)

下面我们再看一个map函数的使用,map函数也是一个常见的集合操作中的高阶函数,其作用是将一个数据集映射成一个新的数据集,map函数与filter函数不一样,它返回的新的数据集中的元素个数与原数据集一样,但数据集中的数据发生了变化,可以是新的数据类型。如下面例子代码:

scala> val linesWithSpark = textFile.map(line => line.length)

linesWithSpark: org.apache.spark.sql.Dataset[Int] = [value: int]

scala> linesWithSpark.count()

res8: Long = 104

scala> linesWithSpark.first()

res9: Int = 14

上面代码中,map函数返回一个新的数据集,新数据集中每个数据是原数据集中字符串对象的长度。

我们再来看集合操作中另一个常见的原子操作reduce,spark的DataSet也提供了reduce函数,reduce函数的作用用来对数据做汇总等计算,比如前面的例子中的count函数实际上就是一个reduce操作的特列。我们可以直接使用reduce函数来统计DataSet中的元素个数,代码如:

scala> val textFile = spark.read.textFile("README.md")

textFile: org.apache.spark.sql.Dataset[String] = [value: string]

scala>  textFile.count()

res11: Long = 104

scala> textFile.map(line=>1).reduce((re,a)=>re+a)

res12: Int = 104

上面代码中最后一行代码,我们先调用了map函数,返回一个新的数据集,数据集中每个元素都是数值1,然后对新的数据集调用reduce函数(这里是连在一起写,没用单独的变量,这正是函数式编程的常用方式,多个函数调用串在一起完成所需的功能)。reduce函数有两个参数,第1个参数是用来存放结果的,第2个参数是代表集合中元素。关于reduce函数的详细含义这里不详细介绍。

从上面结果,可以看出对数据集调用map和reduce函数得到的结果和直接调用count函数得到的结果是一样的。

我们再看一个稍微复杂的例子,textFile数据集中每个元素是对应文本文件中的一行,每行由多个英文单词,我们希望计算出含单词数最多的行所包含的单词数。这个功能的实现代码如下:

scala> textFile.map(line => line.split(" ").size).reduce((a, b) => if (a > b) a else b)

res13: Int = 22

上面代码先调用了DataSet的map函数,返回一个新的数据集,数据集中的每个元素是文件中每行文本中包含的单词数。然后调用reduce函数,计算出新的数据集中的最大值。这里传给reduce函数的Lambda表达式函数体是一个if语句(在scala中,if语句也是一个表达式),我们可以直接用scala库中的Math类的max函数来代替,代码如:

scala> textFile.map(line => line.split(" ").size).reduce((a, b) => Math.max(a, b))

res19: Int = 22

可以看出,结果是一样的。

我们在看一个更复杂的例子,就是统计文件中的单词重复出现的次数,如果学习过hadoop的mapreduce功能的会知道,这是mapreduce应用的一个经典例子,我们看看使用spark是怎么实现的?代码如下:

scala> val textFile = spark.read.textFile("README.md")

textFile: org.apache.spark.sql.Dataset[String] = [value: string]

scala> val wordCounts = textFile.flatMap(line => line.split(" ")).groupByKey(identity).count()

wordCounts: org.apache.spark.sql.Dataset[(String, Long)] = [value: string, count(1): bigint]

scala> wordCounts.collect()

res20: Array[(String, Long)] = Array((online,1), (graphs,1), (["Parallel,1), (["Building,1), (thread,1),......

上面代码,第一行语句,首先创建了一个DataSet,加载了文件中内容,这个上面例子已经反复用过。

第2行语句,先调用DataSet的flatMap函数,flatMap函数也是一个高阶函数,将行数据集转换为单词数据集(即新数据集中的元素是文件中的每个单词),再对新的数据集组合调用DataSet的groupByKey函数和count函数,返回一个数据集,该数据集中的每个元素是一个二维元组(类似key-value键值对),元组中第1个值是单词本身,第2个值是数值1。

第3行语句,针对返回的数据集调用collect函数,该函数返回的是一个数组,数组中每个元素也是一个二维元组,元组中第1个值是单词本身(不重复的单词),第2个值是单词出现的个数。

可以看出,用scala的Dataset来实现mapreduce的功能,比编写Mapreduce代码简单多了,而且更加清晰明了。关于Mapreduce程序的编写可参考文档《mapreduce学习笔记》。

五、独立可执行程序

下面我们用spark提供的scala api来编写一个可独立运行的scala程序,例子代码如下:

import org.apache.spark.sql.SparkSession

object SimpleApp {
  def main(args: Array[String]) {
    val logFile = "YOUR_SPARK_HOME/README.md" // Should be some file on your system
    val spark = SparkSession.builder.appName("Simple Application").getOrCreate()
    val logData = spark.read.textFile(logFile).cache()
    val numAs = logData.filter(line => line.contains("a")).count()
    val numBs = logData.filter(line => line.contains("b")).count()
    println(s"Lines with a: $numAs, Lines with b: $numBs")
    spark.stop()
  }
}

上面代码创建了一个scala的单例对象,包含main函数,是一个典型的可独立运行的scala程序。可以看出,相比在spark shell中执行,区别在于

编译和运行spark的scala程序,最好方式是使用scala的sbt工具(类似java中的maven工具)。关于如何使用sbt,我们这里不作详细介绍。

六、小结

在本文中,我们介绍了Spark的基本概念,并通过spark shell演示了spark中的核心Api DataSet的使用。在后面的文章中,我们将会介绍spark中两个重要的扩展库Spark SQL和StructruedStreaming,它们为数据的处理提供了更加方便和强大的操作。

需要说明的是,Spark依然处于快速发展阶段中,其提供的功能可能随着版本的演进也会在不停的演进,就如RDD被DataSet替换,Spark Streaming被StructuredStreaming替换,我们需要关注其最新发展。shi

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 194,457评论 5 459
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 81,837评论 2 371
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 141,696评论 0 319
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 52,183评论 1 263
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 61,057评论 4 355
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 46,105评论 1 272
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 36,520评论 3 381
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 35,211评论 0 253
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 39,482评论 1 290
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 34,574评论 2 309
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 36,353评论 1 326
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 32,213评论 3 312
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 37,576评论 3 298
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 28,897评论 0 17
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 30,174评论 1 250
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 41,489评论 2 341
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 40,683评论 2 335

推荐阅读更多精彩内容