原作者:By Justin Kestelyn (@kestelyn)
第一部分是作者简介和接触到Spark的背景,此段略去。
知识整理
首先回顾一下Spark中的关键知识点,不求全,只说那些最为基本的。包括partitioning区块, caching缓存, serialization序列化和the shuffle operation洗牌操作。
Partitions-区块
Spark的基本抽象元素是RDD-弹性分布式数据集,Spark就是运用RDD来简化诸如join和groupBy等复杂操作,同时隐藏了一个事实:你其实在操作碎片化的数据。而碎片化,又是Spark能进行分布式处理的前提,碎片化的程度是RDD的区块数量的函数(即区块数量越大,碎片化程度越高。——译者)。区块的数量非常重要,因为Spark里的stage(阶段)每次只会处理一个区块(也只会将该区块内的数据载入内存)。也就是说,如果你的区块数低于活跃的阶段数,你的集群cluster就不会被充分利用。而且,区块数越少,每个区块内的数据则越多,程序的内存压力则越大;另一方面,区块数太多,也会降低表现,因为要花太多时间进行网络和磁盘读写。这里说的这些内容最后就会形成Spark的并行处理理念,并影响你将来如何优化程序的表现。
Caching-缓存
Spark的优点起码有二:简单方便的API处理复杂的事情;另一个就是与MapReduce不同,Spark允许你随时在内存缓存中间数据。如果你的数据结构有频繁使用的数据时,缓存就显得非常重要了,比如有查询表或者机器学习算法中的分数矩阵。不过,缓存本身也可能导致问题,比如它要求大量内存。如何调节缓存是个学问,当做好了这一点时,可以显著提高表现。
Serialization-序列化
对于分布式计算来说,一般需要避免频繁读写数据,因为代价昂贵。应对的办法是将代码传递给数据。这就是为什么很多框架都基于JVM,因为它能让你在拥有数据的那台机器上执行代码。序列化,就是将代码翻译为压缩格式的过程,以便与代码在网络间传输。默认情况下,Spark使用标准的Java序列化工具,如果你追求更快的速度,可以使用Kyro序列化工具,该工具还能减少集群的内存压力并提高稳定性。
Shuffle-洗牌
尽管移动数据代价昂贵,但有时也是必须的。比如,有些操作需要将数据合并到一个节点,以便一起部署到内存中,例如有一个键值对RDD,当你要对key使用reduceByKey操作时。这个昂贵的重新整理数据的过程就叫洗牌。该过程既包括序列化,也包括Akka-也就是Spark的内部消息系统,从而会导致相当的磁盘和网络IO,并增加内存压力。错误Akka和序列化参数可能导致的问题,取决于数据的大小。对于这部分,详见这篇。
教训
教训1: Spark消耗大量内存
如前所述,Spark有个优点就是能缓存数据到内存。但坏处是,一旦使用了这个功能,Spark就会变成内存怪兽。首先,JVM和YARN就会消耗很多内存,而剩下来给数据移动和缓存的内存往往就是不够的了。driver中还会不断积留洗牌操作产生的副产品,比如元数据,这就给需要长时间运行的程序带来了很大压力。最后,Java和Scala的类文件都会导致RDD有隐藏开销。10-Char字节的Java String实际上占用的字节数是60bytes。更严重的是,当出现问题想找原因时,Spark的分布式属性又导致log文件到处都是,执行数量不断叠加,最后得花极大的精力来追溯到底问题出在哪。
因此,首先要做的就是要好好调试一下Spark。对初学者来说,合理分区、妥善管理内存压力以充分利用资源,这几部比较重要。另外,熟悉自己手头的数据,包括其大小、类型以及分布规律。最后这个比较重要,因为不然的话就会数据分区错乱。解决之道就是使用自定义分区工具。最后,前文提到的Kryo序列化也更快速和有效。
对于积累起来的元数据,有两个选择。第一是可以设置spark.cleaner.ttl参数出发自动清理。但是这样做同时会将所有持久化了的RDD也清理掉,而且我还发现如果后面有HDFS的操作,会导致其他问题。另一个解决办法,也是我目前在用的方法,就是将长期的任务拆分,将中间的文件写入磁盘。这样一来,环境总会保持新鲜,不用担心清理问题。
教训2: 避免移动数据
总的来说,避免洗牌,最小化数据移动,可以提高运行速度和提高执行稳定性。不过也要注意,确实有些时候,额外的洗牌很有帮助,比如有些数据没法自动分为很多个区(啥时候洗牌越多越好)。
那又该如何避免移动数据呢?最明确的答案就是避免导致洗牌的操作,比如repartition, coalesce, ByKey, groupByKey, reduceByKey, cogroup以及join。
Spark还额外提供了两个机制帮助做到这一点。首先是Broadcast variables-广播变量,这是个只读变量,缓存在每台机器的本地内存,从而无需在每次做任务时重新拷贝了。使用广播变量还能让你在大的和小的RDD之间进行有效的joins操作,还能将查询表放入内存,这样就比RDD的lookup方法更快的能查询到数据。
Accumulatorsl累加器则可以在执行过程中并行更新变量。它与广播变量不一样的地方在于,其虽然只能在driver过程中被读取,却能允许Spark程序合计counters,sums,generated lists这些结果。关于累加器需要注意的是,它不仅局限于基础类型,也可以累加Accumulable类。
教训3: 避免数据移动太难了
使用上文的机制其实比较难,比如说,要想广播一个RDD变量,首先得在driver节点collect()。而想要累积分布式计算的执行结果,则需要将数据序列化回到driver并在那里累积。这么做的结果就是driver上的内存压力变大了。collect RDD,持久化元数据,累加器,这些会让你driver的内存迅速耗光。当然你也可以增加driver的内存,不过这么做也是有个度的。
上面我提到了几件事:更少的分区会增加每个分区的数据量,以及Spark如何用Akka传递消息的。如果你有2GB的RDD,但只分成了20个分区,那么每当需要将数据从一个分区序列化到另一个节点时,你要通过Akka传输100MB的数据包.但默认情况下,Akka的缓存只有10MB!当然你也可以设置akka.frameSize
,但这么做的前提是你完全了解自己的数据,以及这些数据移动是怎么进行的.
教训4: 速度!
前三个教训主要针对的稳定性.接下来我说一下我是怎么显著提升速度的.
第一个毋庸置疑是熟练运用缓存.当然,内存是有限的,不能瞎缓存.不过一般来所,要用到两次的数据都该缓存.另外,不用了的RDD一般会自动反持久化,当然你可以手动unpersist()
.
第二,我发现广播变量特别有用.我常用它们做大的映射操作和查询表格.它们比缓存了的RDD要有优势得多,因为RDD即便缓存了,查询速度也要O(m),m是在单独一个分区中数据的长度.而广播了的hash map则只需要O(1).
最后,有时候我还得调整Spark的并行执行过程.我的情况是,要做一些key-value的跨RDD查询,在第一个RDD查询值,然后在第二个RDD进行一些操作.简单地实现方式是让Spark一次处理一个key,这样的结果显然是资源利用率极低,每次只有几个工作在进行,大部分的executors根本没用到.原因是没有使用RDD的API来运行,从而无法利用到Spark的并行机制.最终的做法也很简单,在driver上建立一个线程池,一次处理多个key.每个线程都可以生成一些任务提交给YARN,这样之后我可以充分燃烧我的CPU了.
最后的这个教训告诉我们,想用Spark,得会用.
结语
我希望上述的简单概念梳理和教训总结(以及相应文档)能少许改善一点Spark的复杂度.每当我学到了一些的时候,都会记录下来那些内容,作为引用(译者),帮助我进一步理解Spark机制,配置,调试等高级功能.希望上述这些内容对你也有帮助.