转行写spark程序快一年时间了,我最深刻的体会是实现功能容易,但如何提高程序的执行效率却是个难题。我们用的spark主要是spark sql框架,使用spark sql实现数据的清洗、抽取以及计算。期间,我们用了大部分的时间对程序做优化,现将对程序的优化方法总结如下:
1. 数据存储优化
在数据存储上,经过了从hdfs切换到cassandra,再从cassandra换到内存文件系统alluxio上。
1.1 hadfs存储
hdfs是hadoop的分布式文件系统,被设计成适合运行在通用硬件上的分布式文件系统。能够存储多种格式的数据,包括文本及parquet格式等等。它是一个主从结构,一个hdfs集群是由一个名字节点,它是一个管理文件命名空间和调节客户端访问文件的主服务器,当然还有一些数据节点,通常是一个节点一个机器,它来管理对应节点的存储,如下图1所示。hdfs对外开放文件命名空间并允许用户数据以文件形式存储。
hdfs内部是将一个文件分割成一个或多个块,这些块被存储在一组数据节点中。名字节点用来操作文件命名空间的文件或目录操作,如打开,关闭,重命名等等。它同时确定块与数据节点的映射。数据节点负责来自文件系统客户的读写请求。数据节点同时还要执行块的创建,删除,和来自名字节点的块复制指令。
1.2 cassandra存储
cassandra是一个面向带索引的Nosql数据库,数据是以松散结构的多维哈希表存储在数据库中。它最大的优点就是写速度非常快,并且由于有索引,可以避免重复数据的出现,有自动去重的功能,而读的速度却不那么尽如人意。
1.3 alluxio+parquet存储
由于使用hdfs和cassandra加载数据时,数据的加载时间就占用了很大部分,甚至占用了一半的时间。基于数据加载速度的考虑,后来采用alluxio存储需要加载的数据,使用hdfs作为持久层,这样数据可以直接从内存加载,加载的速度大大提高了。
现在的spark程序主要使用spark sql框架,程序结构如下图2所示。spark sql程序运行在yarn集群上,直接访问alluxio内存文件系统加载数据,持久化的数据存储在hdfs防止丢失。alluxio作为一个内存的文件系统,也可以存放多种类型的格式的文件,为了节省存储开销,提高访问速度,此处我们在alluxio上使用parquet格式存储数据。
2. 逻辑结构优化
2.1 数据加载逻辑优化
由于程序计算需要加载的历史数据较多,而程序的每轮执行都要加载历史数据,历史数据的加载占用了数据加载的大部分时间。因此我们后来改用数据增量加载的形式,即将数据分天存放。每次程序重启时,先加载历史数据,即当天以前的数据,将历史数据加载后进行预处理后,保存在内存中。以后每轮就只需加载当天数据,再跟历史数据合并。这样以后每轮都避免了重新加载历史数据,并进行预处理的时间,显著地提高了程序的执行效率。
2.2 数据分区合并
我们使用alluxio,以parquet的格式存储源数据时,如果小文件过多,数据的加载速度会慢很多。因此在存储源数据时,建议使用coalesce将数据合并到几个分区中,以防止小文件过多。虽然reparation也能够实现分区聚合功能,但reparation会出现shuffle,严重影响spark程序的执行效率。
3. 配置参数优化
3.1 任务调度模式修改
spark的任务调度模式分为FIFO(先进先出)和FAIR(公平竞争)。FIFO的调度机制是将队列中的job按照先进先出的方式进行调度执行,而FAIR则按照是使需要资源较少的任务先执行,如所有任务都得按先进先出的方式,则小作业也被阻塞不能执行。设置调度模式的方式,只需一行代码如下:
sparkConf.set(“spark.scheduler.mode”,”FAIR”)
3.2 执行并行度参数设置
并行度决定了spark作业划分task的数量,一般情况下,task越多,程序执行的并行度就越高。但task数量也不能太多,因为task的创建也需要耗时间和内存资源。一般建议设置的并行度是num-executorsexecutor-cores的2-3倍。如果不设置,程序默认的并行度只是num-executorsexecutor-cores的不到2倍。
4.shuffle调优
4.1 定义广播变量
大多数spark作业的性能主要就是消耗在了shuffle环节,因为该环节包含了大量的磁盘IO、序列化、网络数据传输等操作。因此,如果要让作业的性能更上一层楼,就有必要对shuffle过程进行调优。shuffle调优的一个途径就是尽可能地减少shuffle,在两个表join的过程中,如果一个表不会经常改变,同时数据量又不会太大时,将这个表广播出去,这样集群上的每个节点上都会保存这个表,这样需要join操作的另一个表就可以在自己的节点上完成关联操作,可以尽可能地减少shuffle。
4.2 减少数据倾斜
在进行shuffle的时候,必须将各个节点上相同的key拉取到某个节点上的一个task来进行处理,比如按照key进行聚合或join等操作。此时如果某个key对应的数据量特别大的话,就会发生数据倾斜。比如大部分key对应10条数据,但是个别key却对应了100万条数据,那么大部分task可能就只会分配到10条数据,然后1秒钟就运行完了;但是个别task可能分配到了100万数据,要运行一两个小时。因此,整个Spark作业的运行进度是由运行时间最长的那个task决定的。
因此出现数据倾斜的时候,spark作业运行得非常缓慢,甚至可能因为某个task处理的数据量过大导致内存溢出。数据倾斜的例子如下图所示,在三个节点上对应的以hello为key的总共有6条数据,这些数据都会被拉取到同一个task中进行处理,而以world为key的只有3条。实际情况中task可能个别key的数据量可能更大,比key少的可能多n多倍。因此key多的task的运行速度可能会比key少的task执行速度要慢n倍,而整个程序的执行速度是由最慢的task决定的。同时,如果某一个task处理的数据量过多的话,还会出现内存溢出的危险。
实际写spark程序时,要尽量避免出现数据倾斜的情况,如果出现上述现象时,原则是采用两阶段聚合的方式。首先针对某些key较多的数据,进行拆分,将一个key拆分成多个,比如上图3,对hello进行拆分,将hello前面加上1-10之间的任意随机数,变成1_hello,2_hello直到10_hello,对这些拆分后的key首先聚合,聚合后变成(1_hello,2),(2_hello,2)在再将这些随机数前缀去掉,再进行聚合,实现方式如下图4。
5. 升级spark版本
还有一种较为有效的办法是升级spark版本,spark 2.1版本较spark 1.6版本性能有更多的提升。其中的优势在《spark sql执行流程》的最后有所介绍。