CS190 Scalable Machine Learning Spark - Spark Tutorial

spark ML


References:

Apache Spark
Spark's Python API
Python


Spark Context

After a SparkContext is created, it requests resources from the master, and the master allocates resources to sc.


[Figure: executors]

Common commands:

# Display the type of the Spark Context sc
type(sc)
#Out: pyspark.context.SparkContext
# List sc's attributes
dir(sc)
# Use help to obtain more detailed information
help(sc)

# After reading the help we've decided we want to use sc.version to see what version of Spark we are running
sc.version


Using RDD

Distributed data and using a collection to create an RDD

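In Spark, a dataset is represented as a list that is split into many partitions, which are stored in memory on different machines.

[Figure: partitions. The dataset is split into partitions, each stored in the memory of a different worker.]
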
data = xrange(1, 10001)
# Parallelize data using 8 partitions
# This operation is a transformation of data into an RDD
# Spark uses lazy evaluation, so no Spark jobs are run at this point
xrangeRDD = sc.parallelize(data, 8)

# Let's see what type sc.parallelize() returned
print 'type of xrangeRDD: {0}'.format(type(xrangeRDD))

#Out: type of xrangeRDD: <class 'pyspark.rdd.PipelinedRDD'>

# How about if we use a range
dataRange = range(1, 10001)
rangeRDD = sc.parallelize(dataRange, 8)
print 'type of rangeRDD: {0}'.format(type(rangeRDD))
#Out: type of rangeRDD: <class 'pyspark.rdd.RDD'>

# Each RDD gets a unique ID
print 'xrangeRDD id: {0}'.format(xrangeRDD.id())

# We can name each newly created RDD using the setName() method
xrangeRDD.setName('My first RDD')

# Let's view the lineage (the set of transformations) of the RDD using toDebugString()
print xrangeRDD.toDebugString()

# Let's see how many partitions the RDD will be split into by using the getNumPartitions()
xrangeRDD.getNumPartitions()
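To make the partitioning idea concrete, here is a minimal pure-Python sketch (Python 3, no Spark needed) that splits a collection into `num_slices` contiguous chunks. The helper name `slice_into_partitions` is hypothetical, and the even split from index `i*n//k` to `(i+1)*n//k` is an assumption about how `parallelize` slices a local list; it happens to agree with the `mapPartitionsWithIndex` output shown later in this tutorial.

```python
def slice_into_partitions(data, num_slices):
    """Split `data` into `num_slices` contiguous chunks of near-equal size (assumed scheme)."""
    data = list(data)
    n = len(data)
    return [data[i * n // num_slices:(i + 1) * n // num_slices]
            for i in range(num_slices)]

# Same input as xrangeRDD: 1..10000 split into 8 partitions.
partitions = slice_into_partitions(range(1, 10001), 8)
print(len(partitions))                          # 8
print([len(p) for p in partitions])             # eight chunks of 1250 elements each
print(partitions[0][:3], partitions[-1][-1])    # [1, 2, 3] 10000
```

Each inner list stands in for one partition that would live in a different executor's memory.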

map()

Subtract one from each value using map

# Create sub function to subtract 1
def sub(value):
    """Subtracts one from `value`.
    Args:
        value (int): A number.
    Returns:
        int: `value` minus one.
    """
    return value - 1

# Transform xrangeRDD through a map transformation using the sub function.
# Because map is a transformation and Spark uses lazy evaluation, no jobs, stages,
# or tasks will be launched when we run this code.

subRDD = xrangeRDD.map(sub)
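Lazy evaluation can be illustrated without Spark: in Python 3 the built-in `map` also returns a lazy iterator, so the function is not called until something consumes the results. This is only an analogy (Spark records the transformation in the RDD's lineage and launches tasks when an action runs); the `calls` log is introduced purely for illustration.

```python
calls = []  # every value that `sub` is actually applied to

def sub(value):
    """Subtract one from `value`, recording the call."""
    calls.append(value)
    return value - 1

lazy_result = map(sub, range(1, 11))  # builds a lazy iterator; sub has not run yet
calls_before_action = len(calls)      # 0

collected = list(lazy_result)         # consuming the iterator plays the role of an action
print(calls_before_action, collected)  # 0 [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
```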

collect()

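collect() converts the contents of the RDD into a Python list on the driver.
It is an action, so this is the point at which the computation actually runs.

[Figure: collect]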
# Let's collect the data
print subRDD.collect()
#Out: [0, 1, 2, 3, 4, ..., 9999]
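Conceptually, collect() runs one task per partition and concatenates the partition results, in partition order, into a single list on the driver. The hypothetical helper `collect` below models only that final step in plain Python 3; it also hints at why collect() should be reserved for small RDDs, since the whole result must fit in the driver's memory.

```python
def collect(partitions):
    """Concatenate per-partition results in partition order, as the driver would."""
    result = []
    for part in partitions:
        result.extend(part)
    return result

# Each inner list stands in for the output of one task (illustrative partition layout).
partitions = [[0, 1, 2], [3, 4, 5], [6, 7, 8, 9]]
print(collect(partitions))  # [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
```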

count()

count() returns the number of records in the RDD.
It is an action.

[Figure: count]
print xrangeRDD.count()
#Out: 10000
print subRDD.count()
#Out: 10000
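Unlike collect(), count() only needs to send one small number per partition back to the driver. A minimal Python 3 sketch of that idea, with a hypothetical `count` helper and an illustrative uneven partition layout:

```python
def count(partitions):
    """Count elements inside each partition, then sum the per-partition counts."""
    per_partition = [sum(1 for _ in part) for part in partitions]
    return per_partition, sum(per_partition)

per_partition, total = count([range(1, 1251), range(1251, 2501), range(2501, 10001)])
print(per_partition)  # [1250, 1250, 7500]
print(total)          # 10000
```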

filter()

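filter() keeps the records of the original RDD that satisfy a condition and returns them as a new RDD.
It is a transformation.

[Figure: filter, keeping only the values less than 10]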
# Define a function to filter a single value
def ten(value):
    """Return whether value is below ten.
    Args:
        value (int): A number.
    Returns:
        bool: Whether `value` is less than ten.
    """
    if (value < 10):
        return True
    else:
        return False
# The ten function could also be written concisely as: def ten(value): return value < 10

# Pass the function ten to the filter transformation
# Filter is a transformation so no tasks are run
filteredRDD = subRDD.filter(ten)

# View the results using collect()
# Collect is an action and triggers the filter transformation to run
print filteredRDD.collect()
#Out:[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
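filter() runs independently inside each partition and does not move data between partitions, so the result can be very unbalanced. The Python 3 sketch below replays `sub` and `ten` per partition; assuming the even slicing used by the hypothetical `slice_into_partitions` helper, all ten surviving values of filteredRDD end up in the first of the eight partitions.

```python
def slice_into_partitions(data, num_slices):
    """Split `data` into contiguous, near-equal chunks (assumed partitioning scheme)."""
    data = list(data)
    n = len(data)
    return [data[i * n // num_slices:(i + 1) * n // num_slices] for i in range(num_slices)]

# map(sub) then filter(ten), applied partition by partition
sub_partitions = [[v - 1 for v in part] for part in slice_into_partitions(range(1, 10001), 8)]
filtered_partitions = [[v for v in part if v < 10] for part in sub_partitions]
print([len(p) for p in filtered_partitions])  # [10, 0, 0, 0, 0, 0, 0, 0]
```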

Using Python lambda functions

#filter values less than 10
lambdaRDD = subRDD.filter(lambda x: x < 10)
lambdaRDD.collect()
#Out: [0, 1, 2, 3, 4, 5, 6, 7, 8, 9] 

# Let's collect the even values less than 10
evenRDD = lambdaRDD.filter(lambda x: x % 2 == 0)
evenRDD.collect()
#Out:[0, 2, 4, 6, 8]

Common actions

| Action | Description |
|---|---|
| first() | Return the first element in this RDD. |
| take(num) | Take the first num elements of the RDD. |
| top(num, key=None) | Get the top N elements from an RDD. Note: it returns the list sorted in descending order. |
| takeOrdered(num, key=None) | Get the N elements from an RDD ordered in ascending order or as specified by the optional key function. |
| reduce(f) | Reduce the elements of this RDD using the specified commutative and associative binary operator. |
| countByValue() | Return the count of each unique value in this RDD as a dictionary of (value, count) pairs. |
| takeSample(withReplacement, num, seed=None) | Return a list containing a random sample of num elements from the RDD. |
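top() and takeOrdered() do not need a full sort: each partition can keep only its own best `num` candidates, and the driver merges those short lists. The Python 3 sketch below follows that general strategy with the standard `heapq` module; the helper names `take_ordered` and `top` and the partition layout are illustrative, not PySpark's internals.

```python
import heapq

def take_ordered(partitions, num, key=None):
    """Keep the `num` smallest items per partition, then merge the candidates."""
    candidates = []
    for part in partitions:
        candidates.extend(heapq.nsmallest(num, part, key=key))
    return heapq.nsmallest(num, candidates, key=key)

def top(partitions, num, key=None):
    """Keep the `num` largest items per partition; result is in descending order."""
    candidates = []
    for part in partitions:
        candidates.extend(heapq.nlargest(num, part, key=key))
    return heapq.nlargest(num, candidates, key=key)

parts = [[0, 1, 2], [3, 4, 5], [6, 7, 8, 9]]
print(take_ordered(parts, 3))                    # [0, 1, 2]
print(top(parts, 5))                             # [9, 8, 7, 6, 5]
print(take_ordered(parts, 4, key=lambda s: -s))  # [9, 8, 7, 6]
```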

The reduce() action reduces the elements of an RDD to a single value by applying a function that takes two parameters and returns a single value. It first reduces the elements within each partition and then combines the per-partition results; reducing locally at the partitions is what makes reduce() very efficient.
Because the function is applied at the partition level and then again across partitions, it should be commutative and associative. If these rules don't hold, the result of reduce() can change depending on how the data is partitioned.
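The two-level evaluation can be simulated in Python 3 with `functools.reduce`: reduce each non-empty partition, then reduce the partial results in partition order. `partitioned_reduce` and `slice_into_partitions` are hypothetical helpers, and the even slicing is an assumption; that is why the 4-partition subtraction below gives 15 rather than the 21 printed by Spark's repartition(4), while addition gives 45 either way.

```python
from functools import reduce
from operator import add, sub

def slice_into_partitions(data, num_slices):
    """Split `data` into contiguous, near-equal chunks (assumed partitioning scheme)."""
    data = list(data)
    n = len(data)
    return [data[i * n // num_slices:(i + 1) * n // num_slices] for i in range(num_slices)]

def partitioned_reduce(f, data, num_partitions):
    """Reduce each non-empty partition locally, then reduce the partial results in order."""
    partials = [reduce(f, part) for part in slice_into_partitions(data, num_partitions) if part]
    return reduce(f, partials)

data = range(10)
print(partitioned_reduce(add, data, 1), partitioned_reduce(add, data, 4))  # 45 45
print(partitioned_reduce(sub, data, 1), partitioned_reduce(sub, data, 4))  # -45 15
```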

# Let's get the first element
print filteredRDD.first()
#Out:0

# The first 4
print filteredRDD.take(4)
#Out:[0, 1, 2, 3]

# Note that it is ok to take more elements than the RDD has
print filteredRDD.take(12)
#Out:[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]

# Retrieve the three smallest elements in ascending order
print filteredRDD.takeOrdered(3)
#Out:[0, 1, 2]

# Retrieve the five largest elements in descending order
print filteredRDD.top(5)
#Out:[9, 8, 7, 6, 5]

# Pass a lambda function to takeOrdered to reverse the order
filteredRDD.takeOrdered(4, lambda s: -s)
#Out:[9, 8, 7, 6]
# Obtain Python's add function
from operator import add
# Efficiently sum the RDD using reduce
print filteredRDD.reduce(add)
# Sum using reduce with a lambda function
print filteredRDD.reduce(lambda a, b: a + b)
# Note that subtraction is neither associative nor commutative,
# so its result depends on how the data is partitioned
print filteredRDD.reduce(lambda a, b: a - b)
print filteredRDD.repartition(4).reduce(lambda a, b: a - b)
# Addition is both, so repartitioning does not change the sum
print filteredRDD.repartition(4).reduce(lambda a, b: a + b)

#Out:45
#Out:45
#Out:-45
#Out:21
#Out:45
# takeSample reusing elements
print filteredRDD.takeSample(withReplacement=True, num=6)
# takeSample without reuse
print filteredRDD.takeSample(withReplacement=False, num=6)

# Set seed for predictability
print filteredRDD.takeSample(withReplacement=False, num=6, seed=500)
# Try rerunning this cell and the cell above: the results from this cell will remain constant
# Use ctrl-enter to run without moving to the next cell

# Create new base RDD to show countByValue
repetitiveRDD = sc.parallelize([1, 2, 3, 1, 2, 3, 1, 2, 1, 2, 3, 3, 3, 4, 5, 4, 6])
print repetitiveRDD.countByValue()
#Out:defaultdict(<type 'int'>, {1: 4, 2: 4, 3: 5, 4: 2, 5: 1, 6: 1})
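countByValue() can be pictured as counting inside each partition and merging the per-partition tallies into one dictionary on the driver. A Python 3 sketch with `collections.Counter`; the helper `count_by_value` and the three-way split of repetitiveRDD's data are illustrative assumptions. Because the whole dictionary comes back to the driver, this action suits data with a moderate number of distinct values.

```python
from collections import Counter

def count_by_value(partitions):
    """Count values inside each partition, then merge the per-partition counters."""
    total = Counter()
    for part in partitions:
        total.update(Counter(part))  # adds counts key by key
    return dict(total)

# The same 17 values as repetitiveRDD, split into three illustrative partitions.
parts = [[1, 2, 3, 1, 2], [3, 1, 2, 1, 2, 3, 3], [3, 4, 5, 4, 6]]
print(count_by_value(parts))  # {1: 4, 2: 4, 3: 5, 4: 2, 5: 1, 6: 1}
```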

Common transformations

flatMap()

Return a new RDD by first applying a function to all elements of this RDD, and then flattening the results.
# Let's create a new base RDD to work from
wordsList = ['cat', 'elephant', 'rat', 'rat', 'cat']
wordsRDD = sc.parallelize(wordsList, 4)

# Use map
singularAndPluralWordsRDDMap = wordsRDD.map(lambda x: (x, x + 's'))
# Use flatMap
singularAndPluralWordsRDD = wordsRDD.flatMap(lambda x: (x, x + 's'))

# View the results
print singularAndPluralWordsRDDMap.collect()
#Out:[('cat', 'cats'), ('elephant', 'elephants'), ('rat', 'rats'), ('rat', 'rats'), ('cat', 'cats')]

print singularAndPluralWordsRDD.collect()
#Out:['cat', 'cats', 'elephant', 'elephants', 'rat', 'rats', 'rat', 'rats', 'cat', 'cats']

# View the number of elements in the RDD
print singularAndPluralWordsRDDMap.count()
#Out:5
print singularAndPluralWordsRDD.count()
#Out:10
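The difference between map() and flatMap() is the same as the difference between a list comprehension and `itertools.chain.from_iterable` over it in plain Python 3: map produces exactly one output per input (here a tuple), while flatMap splices each returned iterable into the result.

```python
from itertools import chain

words = ['cat', 'elephant', 'rat', 'rat', 'cat']
pairs = [(w, w + 's') for w in words]                          # like map: one tuple per word
flat = list(chain.from_iterable((w, w + 's') for w in words))  # like flatMap: tuples flattened
print(len(pairs), len(flat))  # 5 10
print(flat[:4])               # ['cat', 'cats', 'elephant', 'elephants']
```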

groupByKey and reduceByKey

groupByKey(numPartitions=None)
Group the values for each key in the RDD into a single sequence. Hash-partitions the resulting RDD with numPartitions partitions.

reduceByKey (func, numPartitions=None)
Merge the values for each key using an associative reduce function.
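This works like a MapReduce combiner: values are first merged on each mapper (within each partition), and only the merged results are then sent to the reducers.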

[Figure: groupByKey()]

[Figure: reduceByKey()]

Use groupByKey() sparingly; prefer reduceByKey() or one of the following two:
combineByKey() can be used when you are combining elements but your return type differs from your input value type.
foldByKey() merges the values for each key using an associative function and a neutral "zero value".
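The classic case for combineByKey() is a per-key average, where the input values are numbers but the combined value is a (sum, count) pair. The Python 3 sketch below models the three functions combineByKey() is given (create a combiner from the first value of a key, merge a value into a combiner within a partition, merge combiners across partitions); `combine_by_key` and the sample data are hypothetical, and this is not Spark's implementation.

```python
def combine_by_key(partitions, create_combiner, merge_value, merge_combiners):
    """Pure-Python model: combine within each partition, then across partitions."""
    per_partition = []
    for part in partitions:
        combiners = {}
        for key, value in part:
            if key in combiners:
                combiners[key] = merge_value(combiners[key], value)
            else:
                combiners[key] = create_combiner(value)
        per_partition.append(combiners)
    result = {}
    for combiners in per_partition:
        for key, combined in combiners.items():
            result[key] = merge_combiners(result[key], combined) if key in result else combined
    return result

# Values are ints, but the combined type is a (sum, count) tuple.
parts = [[('a', 1), ('a', 2)], [('b', 1), ('a', 6)]]
sums_counts = combine_by_key(
    parts,
    create_combiner=lambda v: (v, 1),
    merge_value=lambda acc, v: (acc[0] + v, acc[1] + 1),
    merge_combiners=lambda x, y: (x[0] + y[0], x[1] + y[1]),
)
averages = {k: s / c for k, (s, c) in sums_counts.items()}
print(sums_counts)  # {'a': (9, 3), 'b': (1, 1)}
print(averages)     # {'a': 3.0, 'b': 1.0}
```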

pairRDD = sc.parallelize([('a', 1), ('a', 2), ('b', 1)])
# mapValues only used to improve format for printing
print pairRDD.groupByKey().mapValues(lambda x: list(x)).collect()

# Different ways to sum by key
print pairRDD.groupByKey().map(lambda (k, v): (k, sum(v))).collect()
# Using mapValues, which is recommended when the key doesn't change
print pairRDD.groupByKey().mapValues(lambda x: sum(x)).collect()
# reduceByKey is more efficient / scalable
print pairRDD.reduceByKey(add).collect()

#Out:[('a', [1, 2]), ('b', [1])]
#Out:[('a', 3), ('b', 1)]
#Out:[('a', 3), ('b', 1)]
#Out:[('a', 3), ('b', 1)]
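Why reduceByKey() scales better can be seen by counting how many records each approach would shuffle across the network. In this simplified Python 3 cost model (hypothetical helper names, illustrative data, records counted rather than bytes), groupByKey() ships every pair, while reduceByKey() ships one pre-merged record per distinct key per partition.

```python
from operator import add

def records_shuffled_group_by_key(partitions):
    """groupByKey sends every (key, value) pair to the reducers."""
    return sum(len(part) for part in partitions)

def records_shuffled_reduce_by_key(partitions, func):
    """reduceByKey merges values per key inside each partition first (map-side combine)."""
    shuffled = 0
    for part in partitions:
        local = {}
        for key, value in part:
            local[key] = func(local[key], value) if key in local else value
        shuffled += len(local)  # one record per distinct key per partition
    return shuffled

# Two partitions with many repeated keys (illustrative data).
parts = [[('a', 1)] * 1000 + [('b', 1)] * 500, [('a', 1)] * 800]
print(records_shuffled_group_by_key(parts))        # 2300
print(records_shuffled_reduce_by_key(parts, add))  # 3
```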

mapPartitions()

# mapPartitions takes a function that takes an iterator and returns an iterator
print wordsRDD.getNumPartitions()
print wordsRDD.collect()
itemsRDD = wordsRDD.mapPartitions(lambda iterator: [','.join(iterator)])
print itemsRDD.collect()

#Out:4
#Out:['cat', 'elephant', 'rat', 'rat', 'cat']
#Out:['cat', 'elephant', 'rat', 'rat,cat']
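mapPartitions() calls the function once per partition with an iterator over that partition's elements, and concatenates whatever the function returns; this makes it a good place for per-partition setup work. A Python 3 model with hypothetical helpers, assuming the same even slicing as before, reproduces the output above.

```python
def slice_into_partitions(data, num_slices):
    """Split `data` into contiguous, near-equal chunks (assumed partitioning scheme)."""
    data = list(data)
    n = len(data)
    return [data[i * n // num_slices:(i + 1) * n // num_slices] for i in range(num_slices)]

def map_partitions(partitions, func):
    """Call `func` once per partition with an iterator; concatenate what it returns."""
    result = []
    for part in partitions:
        result.extend(func(iter(part)))
    return result

words = ['cat', 'elephant', 'rat', 'rat', 'cat']
items = map_partitions(slice_into_partitions(words, 4), lambda it: [','.join(it)])
print(items)  # ['cat', 'elephant', 'rat', 'rat,cat']
```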

mapPartitionsWithIndex()

itemsByPartRDD = wordsRDD.mapPartitionsWithIndex(lambda index, iterator: [(index, list(iterator))])
# We can see that three of the (partitions) workers have one element and the fourth worker has two
# elements, although things may not bode well for the rat...
print itemsByPartRDD.collect()
# Rerun without returning a list (acts more like flatMap)
itemsByPartRDD = wordsRDD.mapPartitionsWithIndex(lambda index, iterator: (index, list(iterator)))
print itemsByPartRDD.collect()

#Out: [(0, ['cat']), (1, ['elephant']), (2, ['rat']), (3, ['rat', 'cat'])]
#Out: [0, ['cat'], 1, ['elephant'], 2, ['rat'], 3, ['rat', 'cat']]
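The flattening in the second run happens because the results returned for each partition are iterated and spliced together: a one-element list contributes one `(index, items)` tuple, whereas a bare tuple `(index, items)` is itself iterated and contributes two separate elements. A Python 3 model using the partition layout printed above (the helper name is hypothetical):

```python
def map_partitions_with_index(partitions, func):
    """Call func(index, iterator) per partition and splice the returned iterables together."""
    result = []
    for index, part in enumerate(partitions):
        result.extend(func(index, iter(part)))
    return result

parts = [['cat'], ['elephant'], ['rat'], ['rat', 'cat']]
as_list = map_partitions_with_index(parts, lambda i, it: [(i, list(it))])
as_tuple = map_partitions_with_index(parts, lambda i, it: (i, list(it)))
print(as_list)   # [(0, ['cat']), (1, ['elephant']), (2, ['rat']), (3, ['rat', 'cat'])]
print(as_tuple)  # [0, ['cat'], 1, ['elephant'], 2, ['rat'], 3, ['rat', 'cat']]
```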

Caching RDDs and storage options

.cache() keeps the RDD in memory.

# Name the RDD
filteredRDD.setName('My Filtered RDD')
# Cache the RDD
filteredRDD.cache()
# Is it cached
print filteredRDD.is_cached

When you no longer need the RDD, unpersist() removes its data from memory and disk.

# If we are done with the RDD we can unpersist it so that its memory can be reclaimed
filteredRDD.unpersist()
# Storage level for a non cached RDD
print filteredRDD.getStorageLevel()
filteredRDD.cache()
# Storage level for a cached RDD
print filteredRDD.getStorageLevel()

#Out:Serialized 1x Replicated
#Out:Memory Serialized 1x Replicated
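Without caching, every action replays the RDD's whole lineage. The Python 3 sketch below uses a hypothetical `CachedComputation` class (not a Spark API) whose `cache()`, like RDD.cache(), only marks the data; it is stored on the next action and reused until `unpersist()`. Real Spark caches per partition and may evict partitions under memory pressure, which this sketch ignores.

```python
class CachedComputation:
    """Hypothetical stand-in for an RDD: reruns its lineage on every action unless cached."""

    def __init__(self, compute):
        self._compute = compute  # function that recomputes the data from scratch
        self._data = None
        self.is_cached = False

    def cache(self):
        # Only marks the data; it is materialized on the next action.
        self.is_cached = True
        return self

    def unpersist(self):
        self.is_cached = False
        self._data = None
        return self

    def collect(self):
        if self._data is not None:
            return self._data
        result = self._compute()
        if self.is_cached:
            self._data = result
        return result


runs = []  # one entry per full recomputation of the lineage

def lineage():
    """Replays map(sub) followed by filter(ten)."""
    runs.append(1)
    return [v - 1 for v in range(1, 10001) if v - 1 < 10]

rdd = CachedComputation(lineage)
rdd.collect()
rdd.collect()
runs_uncached = len(runs)          # 2: each action recomputed everything

rdd.cache()
rdd.collect()                      # computed once more and stored
rdd.collect()                      # served from the cache
runs_cached = len(runs)            # 3

rdd.unpersist()
rdd.collect()                      # recomputed after unpersist
runs_after_unpersist = len(runs)   # 4
```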

Code style

Wrap the whole expression in parentheses.
Put each method, transformation, and action on its own line.

# Final version
(sc
 .parallelize(data)
 .map(lambda y: y - 1)
 .filter(lambda x: x < 10)
 .collect())