我正在尝试利用 spark分区。我想做点什么
data.write.partitionBy("key").parquet("/location")
这里的问题是,每个分区都会创建大量的 parquet file,如果我试图从 root目录中读取,则会导致读取速度变慢。
为了避免我试图
data.coalese(numPart).write.partitionBy("key").parquet("/location")
但是,这会在每个分区中创建若干个 parquet file。
我应该如何使用分区来避免写后出现许多文件?
首先,我真的会避免使用coalesce,因为这通常会在 transformation链中进一步推高,并且可能会破坏 job的并行性(我在这里问了这个问题:如何防止Spark优化)
每个拼花板分区写入1个文件实际上很容易(请参见Spark dataframe 写入方法写入许多小文件):
data.repartition($"key").write.partitionBy("key").parquet("/location")
如果要设置任意数量的文件(或大小相同的文件),则需要使用另一个可以使用的属性进一步 repartition数据(我无法告诉您这种情况下可能是什么):
data.repartition($"key",$"another_key").write.partitionBy("key").parquet("/location")
另一个_ key可以是 DataSet 的另一个属性,也可以是对现有属性使用某些模运算或舍入运算的派生属性。您甚至可以在 key上使用行数为的窗口函数,然后将其取整如下
data.repartition($"key",floor($"row_number"/N)*N).write.partitionBy("key").parquet("/location")
这会将n个记录放入1个 parquet file中
使用orderby
您还可以通过相应地订购 DataFrame 来控制文件的数量,而无需 repartition:
data.orderBy($"key").write.partitionBy("key").parquet("/location")
这将导致跨越所有分区的spark.sql.shuffle.partitions总数(默认为200)。在$key之后添加第二个排序列甚至是有益的,因为 Parquet将记住 DataFrame 的排序并相应地编写统计信息。例如,可以按ID排序:
data.orderBy($"key",$"id").write.partitionBy("key").parquet("/location")
这不会改变文件的数量,但当您查询 parquet file以获取给定的 key和ID时,它会提高性能。请参见https://www.slideshare.net/ryanblue3/parquet-performance-tuning-the-missing-guide和https://db-blog.web.cern.ch/blog/luca-canali/2017-06-diving-spark-an。D-镶木地板- job负荷-示例
spark2.2+
从spark 2.2开始,您还可以使用新选项maxrecordsperfile来限制每个文件的记录数。如果有n个分区,您仍然可以获得至少n个文件,但是您可以将由1个分区(任务)写入的文件拆分为更小的块:
df.write.option("maxRecordsPerFile", 10000)...
参见http://www.gatorshime.io/expected-feature-in-spark-2-2-max-records-written-per-file/和spark-write-to-disk,其中n个文件小于n个分区
<small style="box-sizing: border-box; font-size: 12.8px; font-weight: 400;"></small><small style="box-sizing: border-box; font-size: 12.8px; font-weight: 400; margin-left: 5px;">[ 评论](javascript:void(0))</small>
[图片上传中...(image-7e3a5e-1593689132254-0)]
<small style="box-sizing: border-box; font-size: 12.8px; font-weight: 400; display: flex; flex-direction: column; margin-left: 5px;">社区小助手2019-10-06</small>
0
这对我很有效:
data.repartition(n, "key").write.partitionBy("key").parquet("/location")
它在每个输出分区(目录)中生成n个文件,并且(奇闻)比使用 coalesce 快,而且(奇闻)比仅对输出 repartition快。
如果您使用的是S3,我还建议您在 local驱动器上执行所有操作(Spark在写操作期间会执行大量文件创建/重命名/删除操作),一旦完成所有操作,请使用Hadoop FileUtil(或仅使用AWS CLI)复制所有操作:
import java.net.URIimport org.apache.hadoop.fs.{FileSystem, FileUtil, Path}// ... def copy( in : String, out : String, sparkSession: SparkSession ) = { FileUtil.copy( FileSystem.get(new URI(in), sparkSession.sparkContext.hadoopConfiguration), new Path(in), FileSystem.get(new URI(out), sparkSession.sparkContext.hadoopConfiguration), new Path(out), false, sparkSession.sparkContext.hadoopConfiguration ) }
编辑: root据评论中的讨论:
您是一个 DataSet ,其分区列为year,但每个给定的年份中的数据量都有很大的不同。因此,一年可能有1GB的数据,而另一年可能有100GB的数据。
以下是处理此问题的一种方法的psuedocode:
val partitionSize = 10000 // Number of rows you want per output file.val yearValues = df.select("YEAR").distinctdistinctGroupByValues.each((yearVal) -> { val subDf = df.filter(s"YEAR = $yearVal") val numPartitionsToUse = subDf.count / partitionSize subDf.repartition(numPartitionsToUse).write(outputPath + "/year=$yearVal")})
但是,我不知道这能起什么作用。Spark可能会在每个列分区的文件数可变的情况下出现读取问题。
另一种方法是编写自己的自定义分区程序,但我不知道其中包含什么,因此无法提供任何代码。