PySpark 核心概念和操作(词频统计)

1. Spark核心概念

1.1 Spark简介

Apache Spark是新兴的一种快速通用的大规模数据处理引擎。它的优势有三个方面:

  • 通用计算引擎 能够运行MapReduce、数据挖掘、图运算、流式计算、SQL等多种框架
  • 基于内存 数据可缓存在内存中,特别适用于需要迭代多次运算的场景
  • 与Hadoop集成 能够直接读写HDFS中的数据,并能运行在YARN之上

Spark是用Scala语言编写的,所提供的API也很好地利用了这门语言的特性,当然作为数据科学的一环,它也可以使用Java和Python编写应用。这里我们将用Python给大家做讲解。

1.2 Spark核心

Spark支持多种运行模式。单机部署下,既可以用本地(Local)模式运行,也可以使用伪分布式模式来运行;当以分布式集群部署的时候,可以根据实际情况选择Spark自带的独立(Standalone)运行模式、YARN运行模式或者Mesos模式。虽然模式多,但是Spark的运行架构基本由三部分组成,包括SparkContext(驱动程序)ClusterManager(集群资源管理器)Executor(任务执行进程)。 

  • SparkContext提交作业,向ClusterManager申请资源;
  • ClusterManager会根据当前集群的资源使用情况,进行有条件的FIFO策略:先分配的应用程序尽可能多地获取资源,后分配的应用程序则在剩余资源中筛选,没有合适资源的应用程序只能等待其他应用程序释放资源;
  • ClusterManager默认情况下会将应用程序分布在尽可能多的Worker上,这种分配算法有利于充分利用集群资源,适合内存使用多的场景,以便更好地做到数据处理的本地性;另一种则是分布在尽可能少的Worker上,这种适合CPU密集型且内存使用较少的场景;
  • Excutor创建后与SparkContext保持通讯,SparkContext分配任务集给Excutor,Excutor按照一定的调度策略执行任务集。

Spark包含1个driver(笔记本电脑或者集群网关机器上)和若干个executor(在各个节点上),通过SparkContext(简称sc)连接Spark集群创建RDD累加器(accumlator)广播变量(broadcast variables),简单可以认为SparkContext(驱动程序)是Spark程序的根本。

Driver会把计算任务分成一系列小的task,然后送到executor执行。executor之间可以通信,在每个executor完成自己的task以后,所有的信息会被传回。

1.3 RDD(弹性分布式数据集)介绍

在Spark里,所有的处理和计算任务都会被组织成一系列Resilient Distributed Dataset(弹性分布式数据集,简称RDD)上的transformations(转换)actions(动作)

RDD是一个包含诸多元素、被划分到不同节点上进行并行处理的数据集合,可以将RDD持久化到内存中,这样就可以有效地在并行操作中复用(在机器学习这种需要反复迭代的任务中非常有效)。在节点发生错误时RDD也可以自动恢复。

说起来,RDD就像一个NumPy array或者一个Pandas Series,可以视作一个有序的item集合。

只不过这些item并不存在driver端的内存里,而是被分割成很多个partitions,每个partition的数据存在集群的executor的内存中。

1.4 RDD transformations和actions

大家还对python的list comprehension有印象吗,RDDs可以进行一系列的变换得到新的RDD,有点类似那个过程,我们先给大家提一下RDD上最最常用到的transformation:

  • map() 对RDD的每一个item都执行同一个操作
  • flatMap() 对RDD中的item执行同一个操作以后得到一个list,然后以平铺的方式把这些list里所有的结果组成新的list
  • filter() 筛选出来满足条件的item
  • distinct() 对RDD中的item去重
  • sample() 从RDD中的item中采样一部分出来,有放回或者无放回
  • sortBy() 对RDD中的item进行排序

特别注意:Spark的一个核心概念是惰性计算。当你把一个RDD转换成另一个的时候,这个转换不会立即生效执行!!!Spark会把它先记在心里,等到真的需要拿到转换结果的时候,才会重新组织你的transformations(因为可能有一连串的变换)
这样可以避免不必要的中间结果存储和通信。记住哦,transformation属于多行计算

刚才提到了惰性计算,那么什么东西能让它真的执行转换与运算呢?
是的,就是我们马上提到的Actions,下面是常见的action,当他们出现的时候,表明我们需要执行刚才定义的transform了:

  • collect(): 计算所有的items并返回所有的结果到driver端,接着 collect()会以Python list的形式返回结果
  • first(): 和上面是类似的,不过只返回第1个item
  • take(n): 类似,但是返回n个item
  • count(): 计算RDD中item的个数
  • top(n): 返回头n个items,按照自然结果排序
  • reduce(): 对RDD中的items做聚合

1.5 针对更复杂的transformations和actions

咱们刚才已经见识到了Spark中最常见的transform和action,但是有时候我们会遇到更复杂的结构,比如非常非常经典的是以元组形式组织的k-v对(key, value)

我们把它叫做pair RDDs,而Sark中针对这种item结构的数据,定义了一些transformation和action:

  • reduceByKey(): 对所有有着相同key的items执行reduce操作
  • groupByKey(): 返回类似(key, listOfValues)元组的RDD,后面的value List 是同一个key下面的
  • sortByKey(): 按照key排序
  • countByKey(): 按照key去对item个数进行统计
  • collectAsMap(): 和collect有些类似,但是返回的是k-v的字典

2. PySpark之词频统计

首先,我们导入pyspark的包,创建SparkContext,建立RDD

import pyspark
from pyspark import SparkContext
from pyspark import SparkConf
conf=SparkConf().setAppName("miniProject").setMaster("local[*]")
sc=SparkContext.getOrCreate(conf)

然后读取文本文件
textFile = sc.textFile("file:///usr/local/spark/mycode/wordcount/word.txt")
textFile是一个方法,可以用来加载文本数据,默认是从HDFS上加载,如果要加载本地文件,就必须使用file:///加路径的形式
从文本中读取数据后就要开始进行词频统计了

wordCount = textFile.flatMap(lambda line:line.split(" ")).\
                  map(lambda word:(word, 1)).reduceByKey(lambda x,y:x+y)

flatMap会逐行遍历文本内容,然后对每行内容进行lambda函数的操作,即line:line.split(" "),该操作会把每一行内容赋值给line,然后对每一个line进行split(" ")操作,即对每一行用空格分隔成单独的单词,这样每一行都是一个由单词组成的集合,因为有很多行,所以就有很多歌这样的单词集合,执行完 textFile.flatMap(lambda line:line.split(" "))后会把这些单词集合组成一个大的单词集合

map(lambda word:(word, 1))中的map对上述产生的单词集合进行遍历,对于每一个单词进行map函数内的操作,即lambda word:(word,1),该操作会把每个单词赋值给word,然后组成一个键值对,这个键值对的key是这个单词,而value是1,这样就把每个单词变成了这个单词的键值对形式。执行完这个map后就会获得一个RDD,这个RDD的每一个元素是很多个键值对

reduceByKey(lambda x,y:x+y)会对RDD中的每个元素根据key进行分组,然后对该分组进行括号内的操作,即lambda x,y:x+y,通过对具有相同key的元素进行该操作,reduce操作会把具有相同key的元素的value进行相加,这样最后变成一个大的键值对,key是相同的,value是具有相同key的键值对的个数,这样,词频统计就完成了。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 216,142评论 6 498
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,298评论 3 392
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 162,068评论 0 351
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,081评论 1 291
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,099评论 6 388
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,071评论 1 295
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,990评论 3 417
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,832评论 0 273
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,274评论 1 310
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,488评论 2 331
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,649评论 1 347
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,378评论 5 343
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,979评论 3 325
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,625评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,796评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,643评论 2 368
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,545评论 2 352