PySpark 基础和RDD编程指南

 Spark - 概述

Apache Spark是一个闪电般快速的实时处理框架。它进行内存计算以实时分析数据。由于 Apache Hadoop MapReduce 仅执行批处理并且缺乏实时处理功能,因此它开始出现。因此,引入了Apache Spark,因为它可以实时执行流处理,也可以处理批处理。

除了实时和批处理之外,Apache Spark还支持交互式查询和迭代算法。Apache Spark有自己的集群管理器,可以托管其应用程序。它利用Apache Hadoop进行存储和处理。它使用 HDFS (Hadoop分布式文件系统)进行存储,它也可以在 YARN 上运行Spark应用程序。

PySpark - 概述

Apache Spark是用 Scala编程语言 编写的。为了用Spark支持Python,Apache Spark社区发布了一个工具PySpark。使用PySpark,您也可以使用Python编程语言中的 RDD 。正是由于一个名为 Py4j 的库,他们才能实现这一目标。

PySpark提供了 PySpark Shell ,它将Python API链接到spark核心并初始化Spark上下文。今天,大多数数据科学家和分析专家都使用Python,因为它具有丰富的库集。将Python与Spark集成对他们来说是一个福音

一:安装:

pip install pyspark    包比较大, 安装时间挺长的

二 SparkContext 对象和SparkConf

2.1:SparkContext对象

SparkContext是任何spark功能的入口点,相当于main函数;每个JVM进程中,只能有一个活跃(active)的 SparkContext 对象。 关闭用:stop()

class pyspark.SparkContext (

  master = None,

  appName = None,

  sparkHome = None,

  pyFiles = None,

  environment = None,

  batchSize = 0,

  serializer = PickleSerializer(),

  conf = None,

  gateway = None,

  jsc = None,

  profiler_cls = <class 'pyspark.profiler.BasicProfiler'>

)

以下是SparkContext的参数。

Master - 它是连接到的集群的URL。

appName - 您的工作名称。

sparkHome - Spark安装目录。

pyFiles - 要发送到集群并添加到PYTHONPATH的.zip或.py文件。

environment - 工作节点环境变量。

batchSize - 表示为单个Java对象的Python对象的数量。 设置1以禁用批处理,设置0以根据对象大小自动选择批处理大小,或设置为-1以使用无限批处理大小。

serializer - RDD序列化器。

Conf - L {SparkConf}的一个对象,用于设置所有Spark属性。

gateway - 使用现有网关和JVM,否则初始化新JVM。

JSC - JavaSparkContext实例。

profiler_cls - 用于进行性能分析的一类自定义Profiler(默认为pyspark.profiler.BasicProfiler)

用法:

from pyspark import SparkContext

sc = SparkContext("local", "First App")

2.2:SparkConf对象

提供运行Spark应用程序的配置,

以下是SparkConf最常用的一些属性

set(key,value) - 设置配置属性。

setMaster(value) - 设置主URL。

setAppName(value) - 设置应用程序名称。

get(key,defaultValue = None) - 获取密钥的配置值。

setSparkHome(value) - 在工作节点上设置Spark安装路径。

用法:

conf = SparkConf().setAppName("PySpark App").setMaster("spark://master:7077")

sc = SparkContext(conf=conf)


三:弹性分布式数据集(RDD)

总体上来说,每个 Spark 应用程序都包含一个驱动器(driver)程序,驱动器程序运行用户的 main 函数,并在集群上执行各种并行操作。Spark 最重要的一个抽象概念就是弹性分布式数据集(resilient distributed dataset – RDD), RDD是一个可分区的元素集合,这些元素分布在集群的各个节点上,并且可以在这些元素上执行并行操作。RDD通常是通过HDFS(或者Hadoop支持的其它文件系统)上的文件,或者驱动器中的Scala集合对象来创建或转换得到;其次,用户也可以请求Spark将RDD持久化到内存里,以便在不同的并行操作里复用之;最后,RDD具备容错性,可以从节点失败中自动恢复。

Spark 第二个重要抽象概念是共享变量,共享变量是一种可以在并行操作之间共享使用的变量。默认情况下,当Spark把一系列任务调度到不同节点上运行时,Spark会同时把每个变量的副本和任务代码一起发送给各个节点。但有时候,我们需要在任务之间,或者任务和驱动器之间共享一些变量。Spark 支持两种类型的共享变量:广播变量 和 累加器,广播变量可以用于在各个节点上缓存数据,而累加器则是用来执行跨节点的 “累加” 操作,例如:计数和求和。

3.1: RDD代表 Resilient Distributed Dataset,它们是在多个节点上运行和操作以在集群上进行并行处理的元素

RDD是一个可容错、可并行操作的分布式元素集合。有两种方法可以创建 RDD 对象:

1:由驱动程序中的集合对象通过并行化操作创建,

    例如:sc.parallelize([1, 2, 3, 4, 5, 6])

2:或者从外部存储系统中数据集加载(如:共享文件系统、HDFS、HBase或者其他Hadoop支持的数据源)

    例如:

        path = "hdfs://master:9000/examples/examples/src/main/resources/people.txt"

        logData = sc.textFile(path).cache()

RDD 支持两种类型的算子:transformation 和 action

transformation算子:可以将已有RDD转换得到一个新的RDD,  transformation 算子都是懒惰的,并不立即计算结果,而是记录下对基础数据集(如:一个数据文件)的转换操作。只有等到某个 action 算子需要计算一个结果返回给驱动器的时候,transformation 算子所记录的操作才会被计算。默认情况下,每次调用 action 算子的时候,每个由 transformation 转换得到的RDD都会被重新计算

action算子:是基于RDD的计算,并将结果返回给驱动器(driver)

3.2 闭包

在本地模式下运行,所有代码都在运行于单个JVM中,所以RDD的元素都能够被累加并保存到counter变量中,这是因为本地模式下,counter变量和驱动器节点在同一个内存空间中。

然而,在集群模式下,情况会更复杂。为了执行这个作业,Spark会将 RDD 算子的计算过程分割成多个独立的任务(task)- 每个任务分发给不同的执行器(executor)去执行。而执行之前,Spark需要计算闭包。闭包是由执行器执行RDD算子(本例中的foreach())时所需要的变量和方法组成的。闭包将会被序列化,并发送给每个执行器。由于本地模式下,只有一个执行器,所有任务都共享同样的闭包。而在其他模式下,情况则有所不同,每个执行器都运行于不同的worker节点,并且都拥有独立的闭包副本

3.3 转换算子 – transformation

Transformation算子 含义

map(func) 返回一个新的分布式数据集,其中每个元素都是由源RDD中一个元素经func转换得到的。

filter(func) 返回一个新的数据集,其中包含的元素来自源RDD中元素经func过滤后(func返回true时才选中)的结果

flatMap(func) 类似于map,但每个输入元素可以映射到0到n个输出元素(所以要求func必须返回一个Seq而不是单个元素)

mapPartitions(func) 类似于map,但基于每个RDD分区(或者数据block)独立运行,所以如果RDD包含元素类型为T,则 func 必须是 Iterator<T> => Iterator<U> 的映射函数。

mapPartitionsWithIndex(func) 类似于 mapPartitions,只是func 多了一个整型的分区索引值,因此如果RDD包含元素类型为T,则 func 必须是 Iterator<T> => Iterator<U> 的映射函数。

sample(withReplacement, fraction, seed) 采样部分(比例取决于 fraction )数据,同时可以指定是否使用回置采样(withReplacement),以及随机数种子(seed)

union(otherDataset) 返回源数据集和参数数据集(otherDataset)的并集

intersection(otherDataset) 返回源数据集和参数数据集(otherDataset)的交集

distinct([numTasks])) 返回对源数据集做元素去重后的新数据集

groupByKey([numTasks]) 只对包含键值对的RDD有效,如源RDD包含 (K, V) 对,则该算子返回一个新的数据集包含 (K, Iterable<V>) 对。注意:如果你需要按key分组聚合的话(如sum或average),推荐使用 reduceByKey或者 aggregateByKey 以获得更好的性能。注意:默认情况下,输出计算的并行度取决于源RDD的分区个数。当然,你也可以通过设置可选参数 numTasks 来指定并行任务的个数。

reduceByKey(func, [numTasks]) 如果源RDD包含元素类型 (K, V) 对,则该算子也返回包含(K, V) 对的RDD,只不过每个key对应的value是经过func聚合后的结果,而func本身是一个 (V, V) => V 的映射函数。另外,和 groupByKey 类似,可以通过可选参数 numTasks 指定reduce任务的个数。

aggregateByKey(zeroValue)(seqOp, combOp, [numTasks]) 如果源RDD包含 (K, V) 对,则返回新RDD包含 (K, U) 对,其中每个key对应的value都是由 combOp 函数 和 一个“0”值zeroValue 聚合得到。允许聚合后value类型和输入value类型不同,避免了不必要的开销。和 groupByKey 类似,可以通过可选参数 numTasks 指定reduce任务的个数。

sortByKey([ascending], [numTasks]) 如果源RDD包含元素类型 (K, V) 对,其中K可排序,则返回新的RDD包含 (K, V) 对,并按照 K 排序(升序还是降序取决于 ascending 参数)

join(otherDataset, [numTasks]) 如果源RDD包含元素类型 (K, V) 且参数RDD(otherDataset)包含元素类型(K, W),则返回的新RDD中将包含内关联后key对应的 (K, (V, W)) 对。外关联(Outer joins)操作请参考 leftOuterJoin、rightOuterJoin 以及 fullOuterJoin 算子。

cogroup(otherDataset, [numTasks]) 如果源RDD包含元素类型 (K, V) 且参数RDD(otherDataset)包含元素类型(K, W),则返回的新RDD中包含 (K, (Iterable<V>, Iterable<W>))。该算子还有个别名:groupWith

cartesian(otherDataset) 如果源RDD包含元素类型 T 且参数RDD(otherDataset)包含元素类型 U,则返回的新RDD包含前二者的笛卡尔积,其元素类型为 (T, U) 对。

pipe(command, [envVars]) 以shell命令行管道处理RDD的每个分区,如:Perl 或者 bash 脚本。RDD中每个元素都将依次写入进程的标准输入(stdin),然后按行输出到标准输出(stdout),每一行输出字符串即成为一个新的RDD元素。

coalesce(numPartitions) 将RDD的分区数减少到numPartitions。当以后大数据集被过滤成小数据集后,减少分区数,可以提升效率。

repartition(numPartitions) 将RDD数据重新混洗(reshuffle)并随机分布到新的分区中,使数据分布更均衡,新的分区个数取决于numPartitions。该算子总是需要通过网络混洗所有数据。

repartitionAndSortWithinPartitions(partitioner) 根据partitioner(spark自带有HashPartitioner和RangePartitioner等)重新分区RDD,并且在每个结果分区中按key做排序。这是一个组合算子,功能上等价于先 repartition 再在每个分区内排序,但这个算子内部做了优化(将排序过程下推到混洗同时进行),因此性能更好。

3.4:动作算子 – action

Action算子 作用

reduce(func) 将RDD中元素按func进行聚合(func是一个 (T,T) => T 的映射函数,其中T为源RDD元素类型,并且func需要满足 交换律 和 结合律 以便支持并行计算)

collect() 将数据集中所有元素以数组形式返回驱动器(driver)程序。通常用于,在RDD进行了filter或其他过滤操作后,将一个足够小的数据子集返回到驱动器内存中。

count() 返回数据集中元素个数

first() 返回数据集中首个元素(类似于 take(1) )

take(n) 返回数据集中前 n 个元素

takeSample(withReplacement,num, [seed]) 返回数据集的随机采样子集,最多包含 num 个元素,withReplacement 表示是否使用回置采样,最后一个参数为可选参数seed,随机数生成器的种子。

takeOrdered(n, [ordering]) 按元素排序(可以通过 ordering 自定义排序规则)后,返回前 n 个元素

saveAsTextFile(path) 将数据集中元素保存到指定目录下的文本文件中(或者多个文本文件),支持本地文件系统、HDFS 或者其他任何Hadoop支持的文件系统。保存过程中,Spark会调用每个元素的toString方法,并将结果保存成文件中的一行。

saveAsSequenceFile(path)(Java and Scala) 将数据集中元素保存到指定目录下的Hadoop Sequence文件中,支持本地文件系统、HDFS 或者其他任何Hadoop支持的文件系统。适用于实现了Writable接口的键值对RDD。在Scala中,同样也适用于能够被隐式转换为Writable的类型(Spark实现了所有基本类型的隐式转换,如:Int,Double,String 等)

saveAsObjectFile(path)(Java and Scala) 将RDD元素以Java序列化的格式保存成文件,保存结果文件可以使用 SparkContext.objectFile 来读取。

countByKey() 只适用于包含键值对(K, V)的RDD,并返回一个哈希表,包含 (K, Int) 对,表示每个key的个数。

foreach(func) 在RDD的每个元素上运行 func 函数。通常被用于累加操作,如:更新一个累加器(Accumulator ) 或者 和外部存储系统互操作。

    nums = sc.parallelize([1, 2, 3, 4, 5, 6])

    temp = nums.reduce(add)

    print("Number of elements in RDD -> {}".format(temp))

    x = sc.parallelize([('spark', 1), ('hadoop', 4)])

    y = sc.parallelize([('spark', 2), ('hadoop', 5)])

    joined = x.join(y)

    joined_map = joined.collect()

def textFile( 

      path: String, 

      minPartitions: Int = defaultMinPartitions): RDD[String] = withScope { 

    assertNotStopped() 

    hadoopFile(path, classOf[TextInputFormat], classOf[LongWritable], classOf[Text], 

      minPartitions).map(pair => pair._2.toString).setName(path) 

  } 

      path: String 是一个URI,這个URI可以是HDFS、本地文件(全部的节点都可以),

      或者其他Hadoop支持的文件系统URI返回的是一个字符串类型的RDD,也就是是RDD的内部形式是Iterator[(String)]

minPartitions=  math.min(defaultParallelism, 2) 是指定数据的分区,如果不指定分区,当你的核数大于2的时候,不指定分区数那么就是 2

当你的数据大于128M时候,Spark是为每一个快(block)创建一个分片(Hadoop-2.X之后为128m一个block)

从HDFS读取一个文件: path = "hdfs://master:9000/examples/examples/src/main/resources/people.txt"

logData = sc.textFile(path).cache()

sc.addFile 上传文件(sc是您的默认SparkContext),并使用 SparkFiles.get 获取工作者的路径。

    因此,SparkFiles解析通过 SparkContext.addFile() 添加的文件的路径

    get(filename):它指定通过SparkContext.addFile()添加的文件的路径

    getrootdirectory():它指定根目录的路径,该目录包含通过SparkContext.addFile()添加的文件


file_path = "./data.txt"

    file_name = "data.txt"

    sc = SparkContext('local', 'addfile app')

    sc.addFile(file_path)

    print("Absolute Path -> {}".format(SparkFiles.get(file_name)))

3.5:混洗(Shuffle)算子

有一些Spark算子会触发众所周知的混洗(Shuffle)事件。Spark中的混洗机制是用于将数据重新分布,其结果是所有数据将在各个分区间重新分组。一般情况下,混洗需要跨执行器(Executor)或跨机器复制数据,这也是混洗操作一般都比较复杂而且开销大的原因

3.6:RDD持久化

Spark的一项关键能力就是它可以持久化(或者缓存)数据集在内存中,从而跨操作复用这些数据集。如果你持久化了一个RDD,那么每个节点上都会存储该RDD的一些分区,这些分区是由对应的节点计算出来并保持在内存中,后续可以在其他施加在该RDD上的action算子中复用(或者从这些数据集派生新的RDD)。这使得后续动作的速度提高很多(通常高于10倍)。因此,缓存对于迭代算法和快速交互式分析是一个很关键的工具

可以用persist() 或者 cache() 来标记一下需要持久化的RDD

cache()和persist()的区别在于,cahe()是persist()的一种简化方式,cache()的底层就是调用的persist()的无参版本,

即persist(MEMORY_ONLY),将数据持久化到内存中,如果需要从内存中清除缓存,调用unpersist()方法即可

class pyspark.StorageLevel(useDisk, useMemory, useOffHeap, deserialized, replication = 1)

存储级别 含义

MEMORY_ONLY 以未序列化的 Java 对象形式将 RDD 存储在 JVM 内存中。如果RDD不能全部装进内存,那么将一部分分区缓存,而另一部分分区将每次用到时重新计算。这个是Spark的RDD的默认存储级别。

MEMORY_AND_DISK 以未序列化的Java对象形式存储RDD在JVM中。如果RDD不能全部装进内存,则将不能装进内存的分区放到磁盘上,然后每次用到的时候从磁盘上读取。

MEMORY_ONLY_SER 以序列化形式存储 RDD(每个分区一个字节数组)。通常这种方式比未序列化存储方式要更省空间,尤其是如果你选用了一个比较好的序列化协议(fast serializer),但是这种方式也相应的会消耗更多的CPU来读取数据。

MEMORY_AND_DISK_SER 和 MEMORY_ONLY_SER 类似,只是当内存装不下的时候,会将分区的数据吐到磁盘上,而不是每次用到都重新计算。

DISK_ONLY RDD 数据只存储于磁盘上。

MEMORY_ONLY_2, MEMORY_AND_DISK_2等 和上面没有”_2″的级别相对应,只不过每个分区数据会在两个节点上保存两份副本。

OFF_HEAP(实验性的) 将RDD以序列化格式保存到Tachyon。与MEMORY_ONLY_SER相比,OFF_HEAP减少了垃圾回收开销,并且使执行器(executor)进程更小且可以共用同一个内存池,这一特性在需要大量消耗内存和多Spark应用并发的场景下比较吸引人。而且,因为RDD存储于Tachyon中,所以一个执行器挂了并不会导致数据缓存的丢失。这种模式下Tachyon 的内存是可丢弃的。因此,Tachyon并不会重建一个它逐出内存的block。如果你打算用Tachyon做为堆外存储,Spark和Tachyon具有开箱即用的兼容性。请参考这里,有建议使用的Spark和Tachyon的匹配版本对:page。

3.7 RDD共享变量

广播变量:一种只读的共享变量,它是在每个机器节点上保存一个缓存,广播变量创建之后,集群中任何函数都不应该再使用原始变量v,这样才能保证v不会被多次复制到同一个节点上

累加器:累加器是一种只支持满足结合律的“累加”操作的变量,因此它可以很高效地支持并行计算。利用累加器可以实现计数(类似MapReduce中的计数器)或者求和

3.7.1:广播变量可以通过一个变量v来创建,只需调用 SparkContext.broadcast(v)

class pyspark.Broadcast (

  sc = None,

  value = None,

  pickle_registry = None,

  path = None

)

word_new = sc.broadcast(['python', 'spark', 'pyspark', 'kafka', 'hadoop', 'java'])

data = word_new.value

3.7.2:创捷累加器时需要赋一个初始值v,调用 SparkContext.accumulator(v) 可以创建一个累加器

    num = sc.accumulator(10)

    print(f"num:{num}")

    num_list = sc.parallelize([20, 30, 40, 50, 60])

    def f(x):

        num += x

    fiall = num_list.foreach(f)

参考文档:

 http://codingdict.com/article/8882 

https://github.com/gm19900510/data_analysis_python

https://spark-reference-doc-cn.readthedocs.io/zh_CN/latest/programming-guide/quick-start.html

https://www.iteblog.com/archives/1400.html  

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容