PySpark
PySpark 是 Spark 为 Python 开发者提供的 API。
-
子模块
pyspark.sql 模块
pyspark.streaming 模块
pyspark.ml 包
pyspark.mllib 包、
-
PySpark 提供的类
- pyspark.SparkConf
pyspark.SparkConf 类提供了对一个 Spark 应用程序配置的操作方法。用于将各种Spark参数设置为键值对。
- pyspark.SparkContext
pyspark.SparkContext 类提供了应用与 Spark 交互的主入口点,表示应用与 Spark 集群的连接,基于这个连接,应用可以在该集群上创建 RDD 和 广播变量 (pyspark.Broadcast)
- pyspark.SparkFiles、
SparkFiles 只包含类方法,开发者不应创建 SparkFiles 类的实例[2] 。
- pyspark.RDD
这个类是为 PySpark 操作 RDD 提供了基础方法[1] 。
first() 是 pyspark.RDD 类提供的方法,返回 RDD 的第一个元素。
aggregate() 方法使用给定的组合函数和中性“零值,先聚合每个分区的元素,然后再聚合所有分区的结果。
cache() 使用默认存储级别(MEMORY_ONLY)对此 RDD 进行持久化[3] 。
collect() 返回一个列表,包含此 RDD 中所有元素[3] 。
- pyspark.Accumulator
一种“只允许添加”的共享变量,Spark 任务只能向其添加值[1] 。
- pyspark.Broadcast
Spark 提供了两种共享变量[3] :广播变量 和 累加器,pyspark.Broadcast 类提供了对广播变量的操作方法。
- pyspark.Accumulator
pyspark.Accumulator 提供了对累加器变量的操作方法[3] 。
累加器是仅仅被相关操作累加的变量,因此可以在并行中被有效地支持。
#python 2
text=sc.textFile("shakespare.txt")
print text
for operator import add
def tokenize(text):
return text.split()
words = text.flatMap(tokenize)
print words
wc = words.map(lambda x: (x,1))
print wc.toDebugString()
counts = wc.reduceBykey(add)
counts.saveAsTextFile("wc")
Spark应用基本模板
## Spark Application - execute with spark-submit
## Imports
from pyspark import SparkConf, SparkContext
## Module Constants
APP_NAME = "My Spark Application"
## Closure Functions
## Main functionality
def main(sc):
pass
if __name__ == "__main__":
# Configure Spark
conf = SparkConf().setAppName(APP_NAME)
conf = conf.setMaster("local[*]")
sc = SparkContext(conf=conf)
# Execute Main functionality
main(sc)
下面这段代码做了什么呢搜索?app.py同目录下的ontime目录下的2个CSV文件。最终结果显示,4月的总延误时间(单位分钟),既有早点的(如果你从美国大陆飞往夏威夷或者阿拉斯加),但对大部分大型航空公司都是延误的。注意,我们在app.py中使用matplotlib直接将结果可视化出来了:
## Spark Application - execute with spark-submit
## Imports
import csv
import matplotlib.pyplot as plt
from StringIO import StringIO
from datetime import datetime
from collections import namedtuple
from operator import add, itemgetter
from pyspark import SparkConf, SparkContext
## Module Constants
APP_NAME = "Flight Delay Analysis"
DATE_FMT = "%Y-%m-%d"
TIME_FMT = "%H%M"
fields = ('date', 'airline', 'flightnum', 'origin', 'dest', 'dep',
'dep_delay', 'arv', 'arv_delay', 'airtime', 'distance')
Flight = namedtuple('Flight', fields)
## Closure Functions
def parse(row):
"""
Parses a row and returns a named tuple.
"""
row[0] = datetime.strptime(row[0], DATE_FMT).date()
row[5] = datetime.strptime(row[5], TIME_FMT).time()
row[6] = float(row[6])
row[7] = datetime.strptime(row[7], TIME_FMT).time()
row[8] = float(row[8])
row[9] = float(row[9])
row[10] = float(row[10])
return Flight(*row[:11])
def split(line):
"""
Operator function for splitting a line with csv module
"""
reader = csv.reader(StringIO(line))
return reader.next()
def plot(delays):
"""
Show a bar chart of the total delay per airline
"""
airlines = [d[0] for d in delays]
minutes = [d[1] for d in delays]
index = list(xrange(len(airlines)))
fig, axe = plt.subplots()
bars = axe.barh(index, minutes)
# Add the total minutes to the right
for idx, air, min in zip(index, airlines, minutes):
if min > 0:
bars[idx].set_color('#d9230f')
axe.annotate(" %0.0f min" % min, xy=(min+1, idx+0.5), va='center')
else:
bars[idx].set_color('#469408')
axe.annotate(" %0.0f min" % min, xy=(10, idx+0.5), va='center')
# Set the ticks
ticks = plt.yticks([idx+ 0.5 for idx in index], airlines)
xt = plt.xticks()[0]
plt.xticks(xt, [' '] * len(xt))
# minimize chart junk
plt.grid(axis = 'x', color ='white', linestyle='-')
plt.title('Total Minutes Delayed per Airline')
plt.show()
## Main functionality
def main(sc):
# Load the airlines lookup dictionary
airlines = dict(sc.textFile("ontime/airlines.csv").map(split).collect())
# Broadcast the lookup dictionary to the cluster
airline_lookup = sc.broadcast(airlines)
# Read the CSV Data into an RDD
flights = sc.textFile("ontime/flights.csv").map(split).map(parse)
# Map the total delay to the airline (joined using the broadcast value)
delays = flights.map(lambda f: (airline_lookup.value[f.airline],
add(f.dep_delay, f.arv_delay)))
# Reduce the total delay for the month to the airline
delays = delays.reduceByKey(add).collect()
delays = sorted(delays, key=itemgetter(1))
# Provide output from the driver
for d in delays:
print "%0.0f minutes delayed\t%s" % (d[1], d[0])
# Show a bar chart of the delays
plot(delays)
if __name__ == "__main__":
# Configure Spark
conf = SparkConf().setMaster("local[*]")
conf = conf.setAppName(APP_NAME)
sc = SparkContext(conf=conf)
# Execute Main functionality
main(sc)
Spark不能解决分布式存储问题(通常Spark从HDFS中获取数据),但是它为分布式计算提供了丰富的函数式编程API。这个框架建立在伸缩分布式数据集(RDD)之上。RDD是种编程抽象,代表被分区的对象集合,允许进行分布式操作。RDD有容错能力(可伸缩的部分),更重要的时,可以存储到节点上的worker内存里进行立即重用。内存存储提供了快速和简单表示的迭代算法,以及实时交互分析