Spark读取双层ZIP压缩包

问题

  • 项目中碰到一个新问题,需要利用Spark分析海量小文件,具体大概是这样的:
  1. 海量的双层压缩包:
  • 数量可能几十万或更多,文件大小从几KB到几MB不等,可能是zip+zip压缩,也可能是zip+gz压缩。分析发现,外层的ZIP包中可能含有多个文件,内层的zip或gz包中只会有一个csv文件。
  • 实际表头格式也有多种,但处理方法和具体业务结合的较多,本问不再赘述。
  • 不同压缩方式的文件混在一起,无法在获取的时候自动分拣。
  1. 非主流的Spark使用方法:
  • 由于存在一些现实因素,再加上担心hadoop对海量小文件支持的不好,因此实验环境是一个单机多进程(多个worker)的spark环境,文件存储在本地,而非HDFS,诸如newapihadoopfile等高大上的接口并没进行分析测试,但貌似整个apache生态圈都不太支持zip。
  1. 对于rdd和Dataframe的困惑:
  • 过去提到Spark,接下来肯定是讨论rdd了。但Spark2.0之后,强化了Dataframe体系(DF),比如Spark机器学习的Mlib库,现在有两套接口,一套rdd的,一套DF的,而且官方更推荐使用后者。基于DF可以建立表,可以使用SQL,非常的方便,但我还不太熟悉这个体系。
  • 对于本项目的情况,无论RDD还是DF体系都不能原生支持读取zip。但是都可以透明的读取gz,也就是把gz文件(目录)当作txt或csv等格式直接读取。(没卵用啊)

我的方案

搞了两种方案供大家参考:

方案1:

  1. 手动将外层zip解压,得到第二层文件,再将内层是zip格式的文件拿出来进行第二次手动解压,如果内层是gz或csv,则不需要在进行手动操作。

  2. 之后可以用RDD或DF方式读取gz或csv,但本项目的表头格式是不统一的(多种表混在一个目录里,分拣不出来),因此采用df的read方法直接读取文件,设置option("header", "true"),这样就能自动得到第一行的表头。

val df = spark.read.option("header", "true").option("encoding", "GBK").csv(args(0)) 

优点:

  • 可以自动识别表头,多种csv表格式混合在一个目录也不怕。后续可以用SQL处理数据,代码很好写。

缺点:

  • 需要提前解压zip,如果进行双层zip解压,文件空间占用增大很多倍。测试发现,解压外层zip,空间占用不超过1倍,但再解压内层的zip,占用空间就会暴涨。
  • 用rdd方式测试发现,直接读取双层zip和读取解压后的csv,处理时间是相似的!我在虚拟机+SSD环境下,和一个较高性能的服务器上测试,结论相同。因此手动解压方案在时间性能和空间性能上都不占优势。
  • 完全基于df体系编写业务逻辑:读文件->过滤无效数据->做处理。数据量变大之后,运行的非常慢,可能需要基于rdd方法的2倍或更长的时间。
  • 这套代码暂时也没发现有明显写的不合适的地方,但细节上肯定有不到位之处。一些rdd下的优化措施不太起作用(比如尝试过滤没用的列),后来感觉方案2更有前途,这个版本也懒得再优化了。

方案2(目前的方法):

其实关键代码都是借鉴了网上的一个利用binaryFiles直接读取zip的方案,小改进了一下,支持双层解压了。但原始方案经过多次转载也搞不清原创是谁了,无论是谁都表示感谢吧!!!

具体过程如下:

  1. 利用rdd的binaryFiles方法,以二进制方式读取双层压缩包。而DF体系没有提供binaryFiles方法,即不能直接以二进制方式读取zip。

  2. 自定义flatMap方法,在里面调用scala的ZipInputStream方法,首先将外层文件通过ZipInputStream解压,得到多个二层文件。

  3. 再写一个flatMap ,嵌套到第一层flatMap 当中,完成第二层解压和按行读取文本输出。
    3.1. 第二层文件有多个,循环获得第二层文件名,如果扩展名含有gz,则通过scala的GZIPInputStream进行gz解压。(注意此时无法再调用原生方式读取gz,必须自己手动解压)
    3.2. 如果第二层扩展名是zip,则再进行一次zip解压。

优点:

  • 自动化程度高。这种方式的过滤和容错机制比方案一更好,在存在错误文件或错误数据格式的情况下,留下的有效数据略多一些。
  • 实测双层解压过程并没有显著增加处理时间。

关键代码说明:

缩进乱套了......

import java.util.zip.{ ZipInputStream, GZIPInputStream, ZipEntry }
import org.apache.spark.sql.SparkSession
import org.apache.spark.input.PortableDataStream

 val spark = SparkSession.builder.appName("doubleziptest")
      .config("spark.files.ignoreCorruptFiles", "true") //过滤损坏的文件
      .getOrCreate

 val dataAndPortableRDD = spark.sparkContext.binaryFiles(args(0))
       .flatMap {
         case (name: String, content: PortableDataStream) =>
         val zis = new ZipInputStream(content.open) //解压第一层zip
         Stream.continually(zis.getNextEntry).takeWhile(_ != null)
             .flatMap { x => //内层flatmap,解压第二层或直接获得文本流
              if (StringUtils.contains(x.getName.toUpperCase(), ".GZ")) { //第二层是gz
                 val zis2 = new GZIPInputStream(zis) //解压gz
                 //注意gz没有getNextEntry的概念
                 val br = new BufferedReader(new InputStreamReader(zis2))  
                 Stream.continually(br.readLine()).takeWhile(_ != null) 
               } else if (StringUtils.contains(x.getName.toUpperCase(), ".ZIP")){ //第二层是zip
                 val zis2 = new ZipInputStream(zis) //解压第二层zip
                 Stream.continually(zis2.getNextEntry).takeWhile(_ != null)
                   .flatMap { _ =>
                     val br = new BufferedReader(new InputStreamReader(zis2))
                     Stream.continually(br.readLine()).takeWhile(_ != null) //解压之后的文件分割为行
                   }
               } else { 
                    //其他的情况...略...
               }
            }
       }

其他的坑:

RDD的表头识别:

  • 由于rdd体系无法自动识别csv表头,本项目中表头又是多样且混合在一个目录中的。 因此在解压获得文本之后,要解决识别表头的问题,实际上使用的策略是二层解压之后手动获取csv的第一行,通过字符串分割匹配的方法获得表头,并附加到每一行数据前面。这种方式牺牲了部分性能(慢了50%)。此外还尝试过其他方法,最后感觉这种方法坑小一些。

Spark监控多个FTP:

  • 文件来源于多个FTP,尝试了几个Spark读取FTP的方式,感觉都特别坑,网上找到一些帖子,都不能很好的解决问题。此外尝试用flume监控多个FTP源,但遇到一些具体问题之后也放弃了。(github上有个基于DF实现的插件,不能读取整个FTP目录,只能读取单个文件,适合通过FTP读取单个大文件)

RDD vs DF:

  • 如果只用RDD完成复杂的业务处理,代码会比较难看,实际对比发现性能上也并没有优势,如果直接用DF方式读取数据,则就是上面的方案一,不支持zip,且不是很好优化。
  • 在方案二中,完成上述解压过程之后,将RDD中进行无效行的过滤,然后进行无用列的过滤,再进行一些重分区的优化,最后将有效的数据.toDF(),再通过createTempView方法映射为表,之后就可以用SQL语句了,而且(在不考虑双层解压的情况下),效率和直接用RDD操作基本一样,明显快于完全使用DF的方案一。
  • 当然,目前只能说我对DF的理解实在是不够好。

其他的事情还在摸索中……

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
【社区内容提示】社区部分内容疑似由AI辅助生成,浏览时请结合常识与多方信息审慎甄别。
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

相关阅读更多精彩内容

友情链接更多精彩内容