问题
- 项目中碰到一个新问题,需要利用Spark分析海量小文件,具体大概是这样的:
- 海量的双层压缩包:
- 数量可能几十万或更多,文件大小从几KB到几MB不等,可能是zip+zip压缩,也可能是zip+gz压缩。分析发现,外层的ZIP包中可能含有多个文件,内层的zip或gz包中只会有一个csv文件。
- 实际表头格式也有多种,但处理方法和具体业务结合的较多,本问不再赘述。
- 不同压缩方式的文件混在一起,无法在获取的时候自动分拣。
- 非主流的Spark使用方法:
- 由于存在一些现实因素,再加上担心hadoop对海量小文件支持的不好,因此实验环境是一个单机多进程(多个worker)的spark环境,文件存储在本地,而非HDFS,诸如newapihadoopfile等高大上的接口并没进行分析测试,但貌似整个apache生态圈都不太支持zip。
- 对于rdd和Dataframe的困惑:
- 过去提到Spark,接下来肯定是讨论rdd了。但Spark2.0之后,强化了Dataframe体系(DF),比如Spark机器学习的Mlib库,现在有两套接口,一套rdd的,一套DF的,而且官方更推荐使用后者。基于DF可以建立表,可以使用SQL,非常的方便,但我还不太熟悉这个体系。
- 对于本项目的情况,无论RDD还是DF体系都不能原生支持读取zip。但是都可以透明的读取gz,也就是把gz文件(目录)当作txt或csv等格式直接读取。(没卵用啊)
我的方案
搞了两种方案供大家参考:
方案1:
手动将外层zip解压,得到第二层文件,再将内层是zip格式的文件拿出来进行第二次手动解压,如果内层是gz或csv,则不需要在进行手动操作。
之后可以用RDD或DF方式读取gz或csv,但本项目的表头格式是不统一的(多种表混在一个目录里,分拣不出来),因此采用df的read方法直接读取文件,设置option("header", "true"),这样就能自动得到第一行的表头。
val df = spark.read.option("header", "true").option("encoding", "GBK").csv(args(0))
优点:
- 可以自动识别表头,多种csv表格式混合在一个目录也不怕。后续可以用SQL处理数据,代码很好写。
缺点:
- 需要提前解压zip,如果进行双层zip解压,文件空间占用增大很多倍。测试发现,解压外层zip,空间占用不超过1倍,但再解压内层的zip,占用空间就会暴涨。
- 用rdd方式测试发现,直接读取双层zip和读取解压后的csv,处理时间是相似的!我在虚拟机+SSD环境下,和一个较高性能的服务器上测试,结论相同。因此手动解压方案在时间性能和空间性能上都不占优势。
- 完全基于df体系编写业务逻辑:读文件->过滤无效数据->做处理。数据量变大之后,运行的非常慢,可能需要基于rdd方法的2倍或更长的时间。
- 这套代码暂时也没发现有明显写的不合适的地方,但细节上肯定有不到位之处。一些rdd下的优化措施不太起作用(比如尝试过滤没用的列),后来感觉方案2更有前途,这个版本也懒得再优化了。
方案2(目前的方法):
其实关键代码都是借鉴了网上的一个利用binaryFiles直接读取zip的方案,小改进了一下,支持双层解压了。但原始方案经过多次转载也搞不清原创是谁了,无论是谁都表示感谢吧!!!
具体过程如下:
利用rdd的binaryFiles方法,以二进制方式读取双层压缩包。而DF体系没有提供binaryFiles方法,即不能直接以二进制方式读取zip。
自定义flatMap方法,在里面调用scala的ZipInputStream方法,首先将外层文件通过ZipInputStream解压,得到多个二层文件。
再写一个flatMap ,嵌套到第一层flatMap 当中,完成第二层解压和按行读取文本输出。
3.1. 第二层文件有多个,循环获得第二层文件名,如果扩展名含有gz,则通过scala的GZIPInputStream进行gz解压。(注意此时无法再调用原生方式读取gz,必须自己手动解压)
3.2. 如果第二层扩展名是zip,则再进行一次zip解压。
优点:
- 自动化程度高。这种方式的过滤和容错机制比方案一更好,在存在错误文件或错误数据格式的情况下,留下的有效数据略多一些。
- 实测双层解压过程并没有显著增加处理时间。
关键代码说明:
缩进乱套了......
import java.util.zip.{ ZipInputStream, GZIPInputStream, ZipEntry }
import org.apache.spark.sql.SparkSession
import org.apache.spark.input.PortableDataStream
val spark = SparkSession.builder.appName("doubleziptest")
.config("spark.files.ignoreCorruptFiles", "true") //过滤损坏的文件
.getOrCreate
val dataAndPortableRDD = spark.sparkContext.binaryFiles(args(0))
.flatMap {
case (name: String, content: PortableDataStream) =>
val zis = new ZipInputStream(content.open) //解压第一层zip
Stream.continually(zis.getNextEntry).takeWhile(_ != null)
.flatMap { x => //内层flatmap,解压第二层或直接获得文本流
if (StringUtils.contains(x.getName.toUpperCase(), ".GZ")) { //第二层是gz
val zis2 = new GZIPInputStream(zis) //解压gz
//注意gz没有getNextEntry的概念
val br = new BufferedReader(new InputStreamReader(zis2))
Stream.continually(br.readLine()).takeWhile(_ != null)
} else if (StringUtils.contains(x.getName.toUpperCase(), ".ZIP")){ //第二层是zip
val zis2 = new ZipInputStream(zis) //解压第二层zip
Stream.continually(zis2.getNextEntry).takeWhile(_ != null)
.flatMap { _ =>
val br = new BufferedReader(new InputStreamReader(zis2))
Stream.continually(br.readLine()).takeWhile(_ != null) //解压之后的文件分割为行
}
} else {
//其他的情况...略...
}
}
}
其他的坑:
RDD的表头识别:
- 由于rdd体系无法自动识别csv表头,本项目中表头又是多样且混合在一个目录中的。 因此在解压获得文本之后,要解决识别表头的问题,实际上使用的策略是二层解压之后手动获取csv的第一行,通过字符串分割匹配的方法获得表头,并附加到每一行数据前面。这种方式牺牲了部分性能(慢了50%)。此外还尝试过其他方法,最后感觉这种方法坑小一些。
Spark监控多个FTP:
- 文件来源于多个FTP,尝试了几个Spark读取FTP的方式,感觉都特别坑,网上找到一些帖子,都不能很好的解决问题。此外尝试用flume监控多个FTP源,但遇到一些具体问题之后也放弃了。(github上有个基于DF实现的插件,不能读取整个FTP目录,只能读取单个文件,适合通过FTP读取单个大文件)
RDD vs DF:
- 如果只用RDD完成复杂的业务处理,代码会比较难看,实际对比发现性能上也并没有优势,如果直接用DF方式读取数据,则就是上面的方案一,不支持zip,且不是很好优化。
- 在方案二中,完成上述解压过程之后,将RDD中进行无效行的过滤,然后进行无用列的过滤,再进行一些重分区的优化,最后将有效的数据.toDF(),再通过createTempView方法映射为表,之后就可以用SQL语句了,而且(在不考虑双层解压的情况下),效率和直接用RDD操作基本一样,明显快于完全使用DF的方案一。
- 当然,目前只能说我对DF的理解实在是不够好。
其他的事情还在摸索中……