spark parquet分区:大量文件

我正在尝试利用 spark分区。我想做点什么

data.write.partitionBy("key").parquet("/location")
这里的问题是,每个分区都会创建大量的 parquet file,如果我试图从 root目录中读取,则会导致读取速度变慢。

为了避免我试图

data.coalese(numPart).write.partitionBy("key").parquet("/location")
但是,这会在每个分区中创建若干个 parquet file。

我应该如何使用分区来避免写后出现许多文件?

首先,我真的会避免使用coalesce,因为这通常会在 transformation链中进一步推高,并且可能会破坏 job的并行性(我在这里问了这个问题:如何防止Spark优化)

每个拼花板分区写入1个文件实际上很容易(请参见Spark dataframe 写入方法写入许多小文件):

data.repartition($"key").write.partitionBy("key").parquet("/location")

如果要设置任意数量的文件(或大小相同的文件),则需要使用另一个可以使用的属性进一步 repartition数据(我无法告诉您这种情况下可能是什么):

data.repartition($"key",$"another_key").write.partitionBy("key").parquet("/location")

另一个_ key可以是 DataSet 的另一个属性,也可以是对现有属性使用某些模运算或舍入运算的派生属性。您甚至可以在 key上使用行数为的窗口函数,然后将其取整如下

data.repartition($"key",floor($"row_number"/N)*N).write.partitionBy("key").parquet("/location")

这会将n个记录放入1个 parquet file中

使用orderby

您还可以通过相应地订购 DataFrame 来控制文件的数量,而无需 repartition:

data.orderBy($"key").write.partitionBy("key").parquet("/location")

这将导致跨越所有分区的spark.sql.shuffle.partitions总数(默认为200)。在$key之后添加第二个排序列甚至是有益的,因为 Parquet将记住 DataFrame 的排序并相应地编写统计信息。例如,可以按ID排序:

data.orderBy($"key",$"id").write.partitionBy("key").parquet("/location")

这不会改变文件的数量,但当您查询 parquet file以获取给定的 key和ID时,它会提高性能。请参见https://www.slideshare.net/ryanblue3/parquet-performance-tuning-the-missing-guidehttps://db-blog.web.cern.ch/blog/luca-canali/2017-06-diving-spark-an。D-镶木地板- job负荷-示例

spark2.2+

从spark 2.2开始,您还可以使用新选项maxrecordsperfile来限制每个文件的记录数。如果有n个分区,您仍然可以获得至少n个文件,但是您可以将由1个分区(任务)写入的文件拆分为更小的块:

df.write.option("maxRecordsPerFile", 10000)...

参见http://www.gatorshime.io/expected-feature-in-spark-2-2-max-records-written-per-file/和spark-write-to-disk,其中n个文件小于n个分区

<small style="box-sizing: border-box; font-size: 12.8px; font-weight: 400;"></small><small style="box-sizing: border-box; font-size: 12.8px; font-weight: 400; margin-left: 5px;">[ 评论](javascript:void(0))</small>

[图片上传中...(image-7e3a5e-1593689132254-0)]

<small style="box-sizing: border-box; font-size: 12.8px; font-weight: 400; display: flex; flex-direction: column; margin-left: 5px;">社区小助手2019-10-06</small>


0

这对我很有效:

data.repartition(n, "key").write.partitionBy("key").parquet("/location")

它在每个输出分区(目录)中生成n个文件,并且(奇闻)比使用 coalesce 快,而且(奇闻)比仅对输出 repartition快。

如果您使用的是S3,我还建议您在 local驱动器上执行所有操作(Spark在写操作期间会执行大量文件创建/重命名/删除操作),一旦完成所有操作,请使用Hadoop FileUtil(或仅使用AWS CLI)复制所有操作:

import java.net.URIimport org.apache.hadoop.fs.{FileSystem, FileUtil, Path}// ...  def copy(          in : String,          out : String,          sparkSession: SparkSession          ) = {    FileUtil.copy(      FileSystem.get(new URI(in), sparkSession.sparkContext.hadoopConfiguration),      new Path(in),      FileSystem.get(new URI(out), sparkSession.sparkContext.hadoopConfiguration),      new Path(out),      false,      sparkSession.sparkContext.hadoopConfiguration    )  }

编辑: root据评论中的讨论:

您是一个 DataSet ,其分区列为year,但每个给定的年份中的数据量都有很大的不同。因此,一年可能有1GB的数据,而另一年可能有100GB的数据。

以下是处理此问题的一种方法的psuedocode:

val partitionSize = 10000 // Number of rows you want per output file.val yearValues = df.select("YEAR").distinctdistinctGroupByValues.each((yearVal) -> {  val subDf = df.filter(s"YEAR = $yearVal")  val numPartitionsToUse = subDf.count / partitionSize  subDf.repartition(numPartitionsToUse).write(outputPath + "/year=$yearVal")})

但是,我不知道这能起什么作用。Spark可能会在每个列分区的文件数可变的情况下出现读取问题。

另一种方法是编写自己的自定义分区程序,但我不知道其中包含什么,因此无法提供任何代码。

©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。