pyspark:源码分析rdd.join,reduceByKey, groupByKey

写这篇是因为最近用spark做了很多数据合并的事情,并且很诡异地出现了两种不同的情况,当数据量较大(千万级)时,有时rdd.join卡住了,有时却是reduceByKey失败了。当然最后发现可能和其他原因也有关系,但是不管怎样我觉得有必要看看源码。

大纲

  1. join源码解析
  2. reduceByKey vs. groupByKey
  3. 总结

1. join源码解析

join源码
关于join的代码主要是下面这段:

def _do_python_join(rdd, other, numPartitions, dispatch):
    vs = rdd.mapValues(lambda v: (1, v))
    ws = other.mapValues(lambda v: (2, v))
    return vs.union(ws).groupByKey(numPartitions).flatMapValues(lambda x: dispatch(x.__iter__()))

没错,join操作其实就是先union两个rdd,然后再进行groupByKey的操作。印象中,groupByKey的性能不如reduceByKey,事实真的是这样吗?下面是groupByKey的官方解释

 """
        Group the values for each key in the RDD into a single sequence.
        Hash-partitions the resulting RDD with numPartitions partitions.
        .. note:: If you are grouping in order to perform an aggregation (such as a
            sum or average) over each key, using reduceByKey or aggregateByKey will
            provide much better performance.
        >>> rdd = sc.parallelize([("a", 1), ("b", 1), ("a", 1)])
        >>> sorted(rdd.groupByKey().mapValues(len).collect())
        [('a', 2), ('b', 1)]
        >>> sorted(rdd.groupByKey().mapValues(list).collect())
        [('a', [1, 1]), ('b', [1])]
        """

如果我的理解没错的话,注解的意思是说如果group是为了做aggregation操作(比如加法等),reduceByKey 或 aggregateByKey比groupByKey好。但是如果只是为了做groupByKey呢?注解里并没有说明。很明显,当我们做数据合并时,其实只是为了做group操作,所以join和union+reduceByKey, 到底哪个更好呢?

2. reduceByKey vs. groupByKey

这部分的代码在class RDD中,源码地址
reduceByKey调用的是combineByKey,对比groupByKey和combineByKey后发现两者的区别有两点:

# serializer
def groupByKey(self, numPartitions=None, partitionFunc=portable_hash):
    ......
    serializer = self._jrdd_deserializer
    ......
    def groupByKey(it):
            merger = ExternalGroupBy(agg, memory, serializer)
            merger.mergeCombiners(it)
            return merger.items()
    return shuffled.mapPartitions(groupByKey, True).mapValues(ResultIterable)

def combineByKey(self, createCombiner, mergeValue, mergeCombiners,
                     numPartitions=None, partitionFunc=portable_hash):
    ......
    serializer = self.ctx.serializer
    ......
    def _mergeCombiners(iterator):
            merger = ExternalMerger(agg, memory, serializer)
            merger.mergeCombiners(iterator)
            return merger.items()
    return shuffled.mapPartitions(_mergeCombiners, preservesPartitioning=True)

一是serializer;二是merger。

2.1. ctx.serializer vs. _jrdd_deserializer

从上面的代码可以发现,combineByKey调用的是self.ctx.serializer,而groupByKey调用的是self._jrdd_deserializer,所以看看RDD的这两个属性到底是什么。


RDD初始化

ctx是传参,_jrdd_deserializer有默认值,所以还是得看看ctx是什么。一般我都是以sc.textFile()的方式读取数据,因此看看这个方法怎么得到的rdd。
源码

    @ignore_unicode_prefix
    def textFile(self, name, minPartitions=None, use_unicode=True):
        """
        Read a text file from HDFS, a local file system (available on all
        nodes), or any Hadoop-supported file system URI, and return it as an
        RDD of Strings.
        If use_unicode is False, the strings will be kept as `str` (encoding
        as `utf-8`), which is faster and smaller than unicode. (Added in
        Spark 1.2)
        >>> path = os.path.join(tempdir, "sample-text.txt")
        >>> with open(path, "w") as testFile:
        ...    _ = testFile.write("Hello world!")
        >>> textFile = sc.textFile(path)
        >>> textFile.collect()
        [u'Hello world!']
        """
        minPartitions = minPartitions or min(self.defaultParallelism, 2)
        return RDD(self._jsc.textFile(name, minPartitions), self,
                   UTF8Deserializer(use_unicode))

可以看出ctx=self, _jrdd_deserializer=UTF8Deserializer(use_unicode)

接着看看这里的self是什么(由于代码有点分散直接截图):


SparkContext初始化

现在就清楚了,self.serializer=AutoBatchedSerializer(PickleSerializer())。因此,如果是通过textFile()得到rdd,那么self.ctx.serializer与self._jrdd_deserializer就有区别。PickleSerializer()与UTF8Deserializer(use_unicode)比较的话,个人觉得应该是PickleSerializer()的效率更高些。可见代码

在此说明下,不同方式得到的rdd这两种serializer可能不同,因此这里不能统一下结论。

额外发现:

If use_unicode is False, the strings will be kept as str (encoding
as utf-8), which is faster and smaller than unicode. (Added in
Spark 1.2)

2.2. ExternalMerger vs. ExternalGroupBy

源码
可以详细看下这两个类的注解,我理解的大概过程是这样:两个类都是先将数据存到内存中,如果数据超出内存则通过hash code分到不同的partition中,并且导出到磁盘。一直重复这种操作,将相同key的value先合并到内存,然后导出到磁盘。但是,ExternalGroupBy在导出的时候会先进行sort的操作,ExternalMerger是直接导出。因此,如果只是合并数据的话,ExternalMerger明显比ExternalGroupBy性能要好。

ExternalGroupBy的注解

2.3.代码实践比较groupByKey & reduceByKey

本想试验下,看看groupByKey是否真的有排序功能,结果发现并不完全是。。。

l = [('Ankit',25),('Jalfaizy',22),('saurabh',20),('Bala',26),('Bala',20),('Bala',16),('Jalfaizy',12),('Ankit',15)]
l_r = sc.parallelize(l)
print l_r.groupByKey().map(lambda (x, y): (x, list(y))).collect()
# [('Jalfaizy', [12, 22]), ('Bala', [20, 16, 26]), ('Ankit', [25, 15]), ('saurabh', [20])]
print l_r.map(lambda (x, y): (x, [y])).reduceByKey(lambda x, y: x + y).collect()
# [('Jalfaizy', [12, 22]), ('Ankit', [25, 15]), ('Bala', [26, 20, 16]), ('saurabh', [20])]

v = [('Ankit',5),('Jalfaizy',32),('saurabh',2),('Bala',6),('Bala',10),('Bala',36),('Jalfaizy',11),('Ankit',10)]
v_r = sc.parallelize(v)
print v_r.groupByKey().map(lambda (x, y): (x, list(y))).collect()
# [('Jalfaizy', [32, 11]), ('Bala', [6, 10, 36]), ('Ankit', [5, 10]), ('saurabh', [2])]

print v_r.union(l_r).groupByKey().map(lambda (x, y): (x, list(y))).collect()
# [('Jalfaizy', [11, 12, 22, 32]), ('Bala', [20, 16, 10, 36, 26, 6]), ('Ankit', [5, 10, 15, 25]), ('saurabh', [2, 20])]
print l_r.union(v_r).groupByKey().map(lambda (x, y): (x, list(y))).collect()
# [('Jalfaizy', [22, 32, 12, 11]), ('Ankit', [25, 5, 15, 10]), ('Bala', [20, 16, 10, 36, 26, 6]), ('saurabh', [20, 2])]
print l_r.union(v_r).map(lambda (x, y): (x, [y])).reduceByKey(lambda x, y: x + y).collect()
# [('Jalfaizy', [22, 32, 12, 11]), ('Bala', [20, 16, 6, 26, 10, 36]), ('Ankit', [15, 10, 25, 5]), ('saurabh', [20, 2])]

发现reduceByKey和groupByKey的顺序确实不一样,但是像上面的这种情况,reduceByKey比groupByKey要多一步map操作。

另外,如果细心可以发现groupByKey返回结果用到了ResultIterable,因此结果是以iterator的方式返回。在python中,对于大数据而言,iterator优于list。

For small datasets, iterator and list based approaches have similar performance. For larger datasets, iterators save both time and space.

-- reduceByKey groupByKey
serializer AutoBatchedSerializer(PickleSerializer()) UTF8Deserializer(use_unicode)
merger 无排序 有排序
结果 list iterator

3. 总结

结果对比完后,我也不知道reduceByKey和join哪个更好了。。。于是,用程序说话。同样的数据,同样的数据处理方式,partition数量也相同,资源也差不多,数据量大约:300w(rdd1), 180w(rdd2), 8700w(rdd3)。结果如下:
reduceByKey大约7min,join约10min。
--| reduceByKey|join|groupByKey
---|---|---
rdd1+rdd2+rdd3 |约6min| 约10min | 约10min
rdd1+rdd3|约6min|约8min|--

对比了一下过程,当有多个rdd一起合并时,reduceByKey需要考虑到数据量的问题,但是可以一次性合并多个rdd;join每次只能合并2个rdd,因此需要多次join过程,但我们通过上面的代码就可以看出,其实join是union+groupByKey的过程,因此要重复groupByKey的过程。照上面的数据情况看,虽然这么对比reduceByKey和join耗费的时间相差不多,但个人还是更建议reduceByKey。

最后多一句嘴:因数据量大spark程序卡住时,一定要看看partition数量是否足够大!!!

参考资料

  1. pyspark github
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容