写这篇是因为最近用spark做了很多数据合并的事情,并且很诡异地出现了两种不同的情况,当数据量较大(千万级)时,有时rdd.join卡住了,有时却是reduceByKey失败了。当然最后发现可能和其他原因也有关系,但是不管怎样我觉得有必要看看源码。
大纲
- join源码解析
- reduceByKey vs. groupByKey
- 总结
1. join源码解析
join源码
关于join的代码主要是下面这段:
def _do_python_join(rdd, other, numPartitions, dispatch):
vs = rdd.mapValues(lambda v: (1, v))
ws = other.mapValues(lambda v: (2, v))
return vs.union(ws).groupByKey(numPartitions).flatMapValues(lambda x: dispatch(x.__iter__()))
没错,join操作其实就是先union两个rdd,然后再进行groupByKey的操作。印象中,groupByKey的性能不如reduceByKey,事实真的是这样吗?下面是groupByKey的官方解释
"""
Group the values for each key in the RDD into a single sequence.
Hash-partitions the resulting RDD with numPartitions partitions.
.. note:: If you are grouping in order to perform an aggregation (such as a
sum or average) over each key, using reduceByKey or aggregateByKey will
provide much better performance.
>>> rdd = sc.parallelize([("a", 1), ("b", 1), ("a", 1)])
>>> sorted(rdd.groupByKey().mapValues(len).collect())
[('a', 2), ('b', 1)]
>>> sorted(rdd.groupByKey().mapValues(list).collect())
[('a', [1, 1]), ('b', [1])]
"""
如果我的理解没错的话,注解的意思是说如果group是为了做aggregation操作(比如加法等),reduceByKey 或 aggregateByKey比groupByKey好。但是如果只是为了做groupByKey呢?注解里并没有说明。很明显,当我们做数据合并时,其实只是为了做group操作,所以join和union+reduceByKey, 到底哪个更好呢?
2. reduceByKey vs. groupByKey
这部分的代码在class RDD中,源码地址
reduceByKey调用的是combineByKey,对比groupByKey和combineByKey后发现两者的区别有两点:
# serializer
def groupByKey(self, numPartitions=None, partitionFunc=portable_hash):
......
serializer = self._jrdd_deserializer
......
def groupByKey(it):
merger = ExternalGroupBy(agg, memory, serializer)
merger.mergeCombiners(it)
return merger.items()
return shuffled.mapPartitions(groupByKey, True).mapValues(ResultIterable)
def combineByKey(self, createCombiner, mergeValue, mergeCombiners,
numPartitions=None, partitionFunc=portable_hash):
......
serializer = self.ctx.serializer
......
def _mergeCombiners(iterator):
merger = ExternalMerger(agg, memory, serializer)
merger.mergeCombiners(iterator)
return merger.items()
return shuffled.mapPartitions(_mergeCombiners, preservesPartitioning=True)
一是serializer;二是merger。
2.1. ctx.serializer vs. _jrdd_deserializer
从上面的代码可以发现,combineByKey调用的是self.ctx.serializer,而groupByKey调用的是self._jrdd_deserializer,所以看看RDD的这两个属性到底是什么。

ctx是传参,_jrdd_deserializer有默认值,所以还是得看看ctx是什么。一般我都是以sc.textFile()的方式读取数据,因此看看这个方法怎么得到的rdd。
源码
@ignore_unicode_prefix
def textFile(self, name, minPartitions=None, use_unicode=True):
"""
Read a text file from HDFS, a local file system (available on all
nodes), or any Hadoop-supported file system URI, and return it as an
RDD of Strings.
If use_unicode is False, the strings will be kept as `str` (encoding
as `utf-8`), which is faster and smaller than unicode. (Added in
Spark 1.2)
>>> path = os.path.join(tempdir, "sample-text.txt")
>>> with open(path, "w") as testFile:
... _ = testFile.write("Hello world!")
>>> textFile = sc.textFile(path)
>>> textFile.collect()
[u'Hello world!']
"""
minPartitions = minPartitions or min(self.defaultParallelism, 2)
return RDD(self._jsc.textFile(name, minPartitions), self,
UTF8Deserializer(use_unicode))
可以看出ctx=self, _jrdd_deserializer=UTF8Deserializer(use_unicode)
接着看看这里的self是什么(由于代码有点分散直接截图):


现在就清楚了,self.serializer=AutoBatchedSerializer(PickleSerializer())。因此,如果是通过textFile()得到rdd,那么self.ctx.serializer与self._jrdd_deserializer就有区别。PickleSerializer()与UTF8Deserializer(use_unicode)比较的话,个人觉得应该是PickleSerializer()的效率更高些。可见代码
在此说明下,不同方式得到的rdd这两种serializer可能不同,因此这里不能统一下结论。
额外发现:
If use_unicode is False, the strings will be kept as
str(encoding
asutf-8), which is faster and smaller than unicode. (Added in
Spark 1.2)
2.2. ExternalMerger vs. ExternalGroupBy
源码
可以详细看下这两个类的注解,我理解的大概过程是这样:两个类都是先将数据存到内存中,如果数据超出内存则通过hash code分到不同的partition中,并且导出到磁盘。一直重复这种操作,将相同key的value先合并到内存,然后导出到磁盘。但是,ExternalGroupBy在导出的时候会先进行sort的操作,ExternalMerger是直接导出。因此,如果只是合并数据的话,ExternalMerger明显比ExternalGroupBy性能要好。

2.3.代码实践比较groupByKey & reduceByKey
本想试验下,看看groupByKey是否真的有排序功能,结果发现并不完全是。。。
l = [('Ankit',25),('Jalfaizy',22),('saurabh',20),('Bala',26),('Bala',20),('Bala',16),('Jalfaizy',12),('Ankit',15)]
l_r = sc.parallelize(l)
print l_r.groupByKey().map(lambda (x, y): (x, list(y))).collect()
# [('Jalfaizy', [12, 22]), ('Bala', [20, 16, 26]), ('Ankit', [25, 15]), ('saurabh', [20])]
print l_r.map(lambda (x, y): (x, [y])).reduceByKey(lambda x, y: x + y).collect()
# [('Jalfaizy', [12, 22]), ('Ankit', [25, 15]), ('Bala', [26, 20, 16]), ('saurabh', [20])]
v = [('Ankit',5),('Jalfaizy',32),('saurabh',2),('Bala',6),('Bala',10),('Bala',36),('Jalfaizy',11),('Ankit',10)]
v_r = sc.parallelize(v)
print v_r.groupByKey().map(lambda (x, y): (x, list(y))).collect()
# [('Jalfaizy', [32, 11]), ('Bala', [6, 10, 36]), ('Ankit', [5, 10]), ('saurabh', [2])]
print v_r.union(l_r).groupByKey().map(lambda (x, y): (x, list(y))).collect()
# [('Jalfaizy', [11, 12, 22, 32]), ('Bala', [20, 16, 10, 36, 26, 6]), ('Ankit', [5, 10, 15, 25]), ('saurabh', [2, 20])]
print l_r.union(v_r).groupByKey().map(lambda (x, y): (x, list(y))).collect()
# [('Jalfaizy', [22, 32, 12, 11]), ('Ankit', [25, 5, 15, 10]), ('Bala', [20, 16, 10, 36, 26, 6]), ('saurabh', [20, 2])]
print l_r.union(v_r).map(lambda (x, y): (x, [y])).reduceByKey(lambda x, y: x + y).collect()
# [('Jalfaizy', [22, 32, 12, 11]), ('Bala', [20, 16, 6, 26, 10, 36]), ('Ankit', [15, 10, 25, 5]), ('saurabh', [20, 2])]
发现reduceByKey和groupByKey的顺序确实不一样,但是像上面的这种情况,reduceByKey比groupByKey要多一步map操作。
另外,如果细心可以发现groupByKey返回结果用到了ResultIterable,因此结果是以iterator的方式返回。在python中,对于大数据而言,iterator优于list。
For small datasets, iterator and list based approaches have similar performance. For larger datasets, iterators save both time and space.
| -- | reduceByKey | groupByKey |
|---|---|---|
| serializer | AutoBatchedSerializer(PickleSerializer()) | UTF8Deserializer(use_unicode) |
| merger | 无排序 | 有排序 |
| 结果 | list | iterator |
3. 总结
结果对比完后,我也不知道reduceByKey和join哪个更好了。。。于是,用程序说话。同样的数据,同样的数据处理方式,partition数量也相同,资源也差不多,数据量大约:300w(rdd1), 180w(rdd2), 8700w(rdd3)。结果如下:
reduceByKey大约7min,join约10min。
--| reduceByKey|join|groupByKey
---|---|---
rdd1+rdd2+rdd3 |约6min| 约10min | 约10min
rdd1+rdd3|约6min|约8min|--
对比了一下过程,当有多个rdd一起合并时,reduceByKey需要考虑到数据量的问题,但是可以一次性合并多个rdd;join每次只能合并2个rdd,因此需要多次join过程,但我们通过上面的代码就可以看出,其实join是union+groupByKey的过程,因此要重复groupByKey的过程。照上面的数据情况看,虽然这么对比reduceByKey和join耗费的时间相差不多,但个人还是更建议reduceByKey。
最后多一句嘴:因数据量大spark程序卡住时,一定要看看partition数量是否足够大!!!