Spark RDD
怎么理解 RDD 的粗粒度模式 ?对比 细粒度模式
Spark RDD 的 task 数量是由什么决定的?
一份待处理的原始数据会被按照相应的逻辑(例如jdbc和hdfs的split逻辑)切分成n份,每份数据对应到RDD中的一个Partition,Partition的数量决定了task的数量,影响着程序的并行度
支持保存点(checkpoint)
虽然RDD可以通过lineage实现fault recovery,但是这个恢复可能是很耗时的,因此提供保存点很有必要,通常保存点在有宽依赖时(shuffle耗时)很有用,相反,窄依赖时则不值得使用
Spark RDD 之 Partition
参考:https://blog.csdn.net/u011564172/article/details/53611109
分析Partition主要是分析其子类。我们关注两个常用的子类,JdbcPartition和HadoopPartition。
此外,RDD源码中有5个方法,代表其组成,如下:
getPartitions 是数据源如何被切分的逻辑,返回值是Partition组成的数组,第一个方法compute是消费切割后的Partition的方法,所以学习Partition,要结合getPartitions和compute方法。
JdbcPartition
在 JdbcRDD.scala 文件中,定义了两个类:JdbcPartition 和 JdbcRDD,和一个对象:JdbcRDD。如图所示:
先看下 class JdbcRDD 的定义:
class JdbcRDD[T: ClassTag](
sc: SparkContext,
getConnection: () => Connection,
sql: String,
lowerBound: Long,
upperBound: Long,
numPartitions: Int,
mapRow: (ResultSet) => T = JdbcRDD.resultSetToObjectArray _)
extends RDD[T](sc, Nil) with Logging {
...
}
然后举一个 JdbcRDD 的例子,每一个参数都和上面的参数对应:
val sc = new SparkContext("local[1]", "test")
val rdd = new JdbcRDD(sc,
() => { DriverManager.getConnection("jdbc:derby:target/JdbcRDDSuiteDb") },
"SELECT DATA FROM FOO WHERE ? <= ID AND ID <= ?",
1, 100, 3,
(r: ResultSet) => { r.getInt(1) } ).count()
我们再看下 JdbcPartition 的定义:
private[spark] class JdbcPartition(idx: Int, val lower: Long, val upper: Long) extends Partition {
override def index: Int = idx
}
private[spark],用来限制访问权限,表示只有 org.apache.spark 包下的类可以访问 class JdbcPartition 。
JdbcRDD 类继承了 RDD,并重写了 getPartitions 方法,我们看下getPartitions 的实现,其返回了 JdbcPartition,如图所示:
我们分析下 上图中的 getPartitions,按照如上图所示算法将1到100分为3份(partition数量),结果为(1,33)、(34,66)、(67,100),封装为JdbcPartition并返回,这样数据切分的部分就完成了。
再看下 JdbcRDD 的compute方法:
逻辑清晰,将Partition强转为JdbcPartition,获取连接并预处理sql,将
例子中的”SELECT DATA FROM FOO WHERE ? <= ID AND ID <= ?”问号分别用Partition的lower和upper替换(即getPartitions切分好的(1,33)、(34,66)、(67,100))并执行查询。至此,JdbcPartition如何发挥作用就分析完了。
HadoopPartition
举个简单例子:
val sc = new SparkContext("local[1]", "test")
sc.textFile("hdfs://your-file-path").count()
略。。。
决定partition数量的因素
Partition数量可以在初始化RDD时指定(如JdbcPartition例子),不指定的话(如HadoopPartition例子),则
读取spark.default.parallelism配置,不同类型资源管理器取值不同,如下
比较计算得到的goalSize和block大小blockSize,取其中较小者,再和minSize(由属性mapreduce.input.fileinputformat.split.minsize确定,默认值为0,则minSize默认值为1)取较大的。
假设待处理文件大小fSize=512M(视为一个大文件,不考虑1.1系数),block大小bSize=128M,
sc.textFile(…, 3)
根据上面的公式goalSize=512M/3 > bSize=128M
取其较小者bSize,则按照bSize切分,split数=512M/128=4,即partition数=4sc.textFile(…, 5)
根据上面的公式goalSize=512M/5 < bSize=128M
取其较小者goalSize,则按照goalSize切分,split数=512M/(512M/5)=5,即partition数=5
可见指定numPartitions,小于block数时无效,大于则生效。
上面分析了决定Partition数量的因数,接下来就该考虑Partition数量的影响以及合适的值。
-
Partition数量的影响
- Partition数量太少
太少的影响显而易见,就是资源不能充分利用,例如local模式下,有16core,但是Partition数量仅为8的话,有一半的core没利用到。 - Partition数量太多
太多,资源利用没什么问题,但是导致task过多,task的序列化和传输的时间开销增大。
那么多少的partition数是合适的呢,这里我们参考spark doc给出的建议,Typically you want 2-4 partitions for each CPU in your cluster。
- Partition数量太少
-
Partition调整
- repartition
reparation是coalesce(numPartitions, shuffle = true),repartition不仅会调整Partition数,也会将Partitioner修改为hashPartitioner,产生shuffle操作。 - coalesce
coalesce函数可以控制是否shuffle,但当shuffle为false时,只能减小Partition数,无法增大。
- repartition