1.研究的重点和方法
本文主要就是研究rdd的执行流程,主要重点是如何形成rdd的图谱,以及如何形成递归的迭代器进行数据的迭代计算的;
笔者对与rdd的执行的流程的研究了网络上的相关文章、相关的rdd的代码的阅读,简单的样例代码的调试,以及原理代码的编写;其中样例代码的调试个人觉得非常重要,可以清晰的实践一下你的代码的执行流程。
2.RDD图谱的形成
2.1编写简单的样例代码
因为重点是研究rdd的执行流程,样例代码只设计了窄依赖的变换,因为其流程相对于款依赖相对简单。
文件系统我们只读取本地文件
集群模式为本机模式,为了调试方便,只启动一个执行线程
样例代码如下:
var sparkConf = new SparkConf().setAppName("Test").setMaster("local[1]"); //启动一个线程
var sc = new SparkContext(sparkConf);
//读取本地文件、并调用两次简单的窄依赖变换
sc.textFile("/home/jack/t.txt")
.map((_, 1))
.filter(_._2 == 1)
.collect(); //通过collect 提交真正的action,开始执行任务
2.2 SparkContext.textFile
执行代码进行调试模式,断点进入textFile,来看看textFile的执行
def textFile(
path: String,
minPartitions: Int = defaultMinPartitions): RDD[String] = withScope {
assertNotStopped()
hadoopFile(path, classOf[TextInputFormat], classOf[LongWritable], classOf[Text],
minPartitions).map(pair => pair._2.toString).setName(path)
}
继续跟踪hadoopFile方法,可以看到hadoopFile关键逻辑是实例化了一个hadoopRDD类,并返回;
new HadoopRDD(
this,
confBroadcast,
Some(setInputPathsFunc),
inputFormatClass,
keyClass,
valueClass,
minPartitions).setName(path)
基于以上,textFile的关键代码是调用了hadoopFile方法,hadoopFile方法返回一个HadoopRDD,而hadoopRDD.map又返回了一个MapPartitionRDD,最终textFile最终返回了这个RDD。
再看hadoopRDD.map 的逻辑
def map[U: ClassTag](f: T => U): RDD[U] = withScope {
val cleanF = sc.clean(f)
new MapPartitionsRDD[U, T](this, (context, pid, iter) => iter.map(cleanF))
}
HadoopRDD的map方法是其父类RDD实现的,在map方法中返回了一个新的MarPartitionRDD,并将当前RDD实例和用户的map函数传递进去。
MapPartitionRDD与HadoopRDD同样都是继承于RDD类,MapPartitionRDD的特点是会通过传递进来的参数,持有父RDD和用户的执行函数。
而textFile为什么没有直接返回HadoopRDD,我觉得应该是HadoopRDD的迭代器是一个key,Value迭代器,其中key为行所在的字节offset值、value是行值,而我们一般来说只关心value值,所以其做了一次map变换,只返回了value的迭代。
在RDD类中,同样有filter、flatMap、distinct 等窄依赖等实现,基本套路都是生成一个新的MapPartitionRDD,并将当前的RDD作为父RDD传递给新的RDD。
综上所述, SparkContext.textFile,在核心流程上既是返回了new HadoopRDD().map(),也就是一个MapPartitionRDD。
2.3 RDD.map、filter等变换方法
如上文所述, map、filter、flatmap 等方法的思路其实大同小异,都是返回一个携带前置父RDD实例以及用户传递的具体transform方法的MapParittionRDD实例。而这里要重点强调的是三个点:
-
MarPartitionRDD要持有前置的父RDD
MapPartitionRDD会持有前置的、变换前的RDD实例,这样形成一个可以向前追溯的RDD图谱,为后续的迭代计算提供依据。 -
MapParitionRDD要持有用户传递的具体transform方法
具体算法就也会在MapPartitionRDD中存储。这样后续的迭代变化时能够回溯计算; - map、filter、flatMap的具体迭代算法实现在scala的 Iterator中
以map方法为例展开讲述以上三点,我们看到返回的MapPartitionRDD的构造形式是:
new MapPartitionsRDD[U, T](this, (context, pid, iter) => iter.map(cleanF))
this即当前调用map的父RDD实例,这个较容易理解。
cleanF是用户传递的map具体用户函数, 但(context,pid,iter)=>iter.map(cleanF)这个形式,如果scala不那么熟悉的话,一眼望过去着实看得有点吃力,其实这个类似于与c语言中的函数指针、java中的函数接口,相当与定义了一个方法的类型;例如在scala中 var f:(Int)=>Int,即定义了一个方法类型:入参为Int,返回值为Int,翻到MapPartitionRDD的源码,可以看到第二个参数的定义为为 f: (TaskContext, Int, Iterator[T]) => Iterator[U], 那么这个f函数变量到底干了啥呢,结合map具体传递形式,我们可以看到f就是一个函数,内部调用了第三个参数传递进来的迭代器(Iterator[T]), 调用到Iterator的map方法,map方法的参数是用户的cleanF,并返回一个新的迭代器(Iterator[U]); 这些形成的transform的方法最终也会形成一个调用链, 最终由ResultTask的runTask从rdd的最后一个节点开始调用,这个下文再详细介绍。
关于迭代器算法的实现,最终都在Iterator中,这些实现并非Spark框架中的,而都是scala语言框架的,我想这也是spark首选scala来实现其理念的原因吧。Iterator的map、filter、flatMap 等方法同样接收用户的方法,并返回一个新的Iterator,持续的进行迭代,以下就是Iterator中map的实现,可以看出他的核心代码就是返回一个迭代器,而next方法的返回值, 是把前置的Iterator (self)的next方法返回值传递给用户定义的回调方法(f)进行返回,从而形成了一个递归的迭代调用:
def map[B](f: A => B): Iterator[B] = new AbstractIterator[B] {
def hasNext = self.hasNext
def next() = f(self.next())
}
综上所述,RDD通过不断的map、filter、flatMap等变换,生成了一系列保存前置rdd和用户变换函数的新的RDD, 这些rdd按照调用顺序形成了一个有序的图谱,等待最后的action动作最终的执行,如下图所示。在《spark性能调优与原理分析》一书中,作者把该过程做了一个我认为比较形象的比喻:rdd的图谱形成的过程就像一个一步步形成的菜谱,每次transform调用都会记录rdd的一次转换,菜谱会记录做菜的先后步骤,但只有要吃这道菜的时候,才会按照菜谱进行操作。而吃菜的动作就是后续我们要讲述的collect、foreach等action操作触发的数据迭代计算的过程。
3. RDD数据的递归迭代计算
在rdd的图谱形成后,终归我们是要调用一些action动作函数触发数据的迭代计算的,最终返回我们的数据结果。
3.1 触发计算的时机和位置
我们通过实际的方法调用的堆栈中最终找到,触发数据计算的位置是在 ResultTask中的runTask方法中;
override def runTask(context: TaskContext): U = {
// Deserialize the RDD and the func using the broadcast variables.
val threadMXBean = ManagementFactory.getThreadMXBean
val deserializeStartTime = System.currentTimeMillis()
val deserializeStartCpuTime = if (threadMXBean.isCurrentThreadCpuTimeSupported) {
threadMXBean.getCurrentThreadCpuTime
} else 0L
val ser = SparkEnv.get.closureSerializer.newInstance()
val (rdd, func) = ser.deserialize[(RDD[T], (TaskContext, Iterator[T]) => U)](
ByteBuffer.wrap(taskBinary.value), Thread.currentThread.getContextClassLoader)
_executorDeserializeTime = System.currentTimeMillis() - deserializeStartTime
_executorDeserializeCpuTime = if (threadMXBean.isCurrentThreadCpuTimeSupported) {
threadMXBean.getCurrentThreadCpuTime - deserializeStartCpuTime
} else 0L
func(context, rdd.iterator(partition, context))
}
runTask的最后,会调用最后一个子rdd的iterator方法,从而开始调用链条的头端iterator。
3.2 iterator和compute方法
iterator 方法是由RDD类实现的,其返回一个Iterator迭代器,iterator的内部经过几次判定最终还是调用到rdd的compute方法;
final def iterator(split: Partition, context: TaskContext): Iterator[T] = {
if (storageLevel != StorageLevel.NONE) {
getOrCompute(split, context)
} else {
computeOrReadCheckpoint(split, context)
}
}
private[spark] def computeOrReadCheckpoint(split: Partition, context: TaskContext): Iterator[T] =
{
if (isCheckpointedAndMaterialized) {
firstParent[T].iterator(split, context)
} else {
compute(split, context)
}
}
@DeveloperApi
def compute(split: Partition, context: TaskContext): Iterator[T]
而翻看到compute方法,发现其是个抽象方法,约定了其返回值类型是Iterator,其实现是下方给MapPartitionRDD或者 HadoopRDD来实现的。
-
MapPartitionRDD的 compute
我们先来看MapPartitionRDD的实现
override def compute(split: Partition, context: TaskContext): Iterator[U] =
f(context, split.index, firstParent[T].iterator(split, context))
可以看到调用了其保存的函数变量f, 而重要的第三个参数是firstParent.iterator(), 其实就是调用了父RDD对象的iterator()方法,而父iterator的调用就是父rdd的compute,例如如果是map调用,那就是iter.map(cleanF),f(,,firstParent.iterator)的展开就是 firstParent[T].iterator().map(clieanF);这样依次的触发上一级rdd的iterator,形成了一个迭代式的调用,最终得到返回的数据。
-
HadoopRDD的 compute
这时我们肯定会想,sc.TextFile第一次创建的HadoopRDD的compute返回的迭代器是什么呢?这个迭代器总归要去读取数据源了吧;我们翻开HadoopRDD的compute实现:
override def compute(theSplit: Partition, context: TaskContext): InterruptibleIterator[(K, V)] = {
val iter = new NextIterator[(K, V)] {
//....................删除了很多代码
private var reader: RecordReader[K, V] = null
private val inputFormat = getInputFormat(jobConf)
reader = inputFormat.getRecordReader(split.inputSplit.value, jobConf, Reporter.NULL)
private val key: K = if (reader == null) null.asInstanceOf[K] else reader.createKey()
private val value: V = if (reader == null) null.asInstanceOf[V] else reader.createValue()
override def getNext(): (K, V) = {
try {
finished = !reader.next(key, value)
} catch {
case e: FileNotFoundException if ignoreMissingFiles =>
logWarning(s"Skipped missing file: ${split.inputSplit}", e)
finished = true
// Throw FileNotFoundException even if `ignoreCorruptFiles` is true
case e: FileNotFoundException if !ignoreMissingFiles => throw e
case e: IOException if ignoreCorruptFiles =>
logWarning(s"Skipped the rest content in the corrupted file: ${split.inputSplit}", e)
finished = true
}
if (!finished) {
inputMetrics.incRecordsRead(1)
}
if (inputMetrics.recordsRead % SparkHadoopUtil.UPDATE_INPUT_METRICS_INTERVAL_RECORDS == 0) {
updateBytesRead()
}
(key, value)
}
}
new InterruptibleIterator[(K, V)](context, iter)
}
果然compute方法不一样了, 代码比较多,删除了很多,但其核心实现还是返回一个Iterator;而这个Iterator的getNext方法,是利用一个LineRecorderReader,不断的去读取数据源中的行来实现的。也即是迭代一次,就去数据源读取一行,直到所在分区数据结束,具体的数据的行数据的迭代原理可以参看hadoop的LineRecordReader、UncompressedSplitLineReader、SplitLineReader以及LineReader等实现。
3.2 迭代计算数据
最后我们可以看到数据的迭代计算是与图谱的形成反向触发的的,图谱形成后,action动作触发的计算由最后的子rdd开始触发,通过iterator和compute方法,依次向前获取数据,最终到达首个hadoopRdd的compute,读取文件中的一行,这样形成一个类似迭代计算的流,如图所示:
而迭代的计算也有一个非常巨大的优势,既是我们在读取文件的时候不用一次性的把文件都加载都内存中,而是形成一个不断的计算流一条条的加载、计算、保存,节省了巨量的内存,这种思路是非常值得我们参考和学习的。
4.使用java来实现一个简单的rdd迭代调用
4.1 基本思路
在研究完spark的rdd的调用模式后,发现其实现思路还是挺不错的,再研究过程中因为spark的rdd部分都是用scala语言编写的,因为scala的不熟悉带来了一些难度,在研究的过程中其实笔者还采取了一个办法就是把相关的代码看的大体明白后,用java又实现了一个简约模式,当java版的rdd成功运行后,也对其模式调用有了一些更深的理解。
4.2 基本实现类
java 简约版的rdd实现包括了如下的组件:
- Iterable类, 其实就是iterator,模仿了scala的Iterator,参考其实现了最基础的map、filter和foreach
- RDD类, java版rdd,实现了spark中的RDD,实现RDD的map、filter和foreach,实现iterator,就是返回compute, 留着 compute作为抽象方法
- MapRDD类, 模仿MapPartitionRDD,重写了compute方法
- ListRDD类, 模仿HadoopRDD,不同的是ListRDD的数据源是一个ArrayList,compute返回的迭代器是对ArrayList的迭代。
4.2 实现代码
Iterable类
import java.util.NoSuchElementException;
public abstract class Iterable<T> {
@FunctionalInterface
interface MapCallback<T1, T2> {
T2 invoke(T1 num);
}
@FunctionalInterface
interface ForeachCallback<T1> {
void invoke(T1 num);
}
@FunctionalInterface
interface FilterCallback<T1> {
boolean invoke(T1 t);
}
public <RT> Iterable<RT> map(MapCallback<T, RT> callback) {
Iterable<T> self = Iterable.this;
return new Iterable<RT>() {
@Override
public boolean hasNext() {
return self.hasNext();
}
@Override
public RT next() {
return callback.invoke(self.next());
}
};
}
public Iterable<T> filter(FilterCallback<T> callback) {
return new Iterable<T>() {
Iterable<T> self = Iterable.this;
T nextValue = null;
@Override
public boolean hasNext() {
if (self.hasNext() == false) {
nextValue = null;
return false;
}
else {
nextValue = self.next();
while (callback.invoke(nextValue) == false) {
if (self.hasNext() == false) {
nextValue = null;
return false;
}
nextValue = self.next();
}
return true;
}
}
@Override
public T next() {
if (nextValue != null) {
T curr = nextValue;
nextValue = null;
return curr;
}
else if (this.hasNext()) {
T curr = nextValue;
nextValue = null;
return curr;
}
return null;
}
};
}
public void foreach(ForeachCallback<T> callback) {
while (this.hasNext()) {
callback.invoke(this.next());
}
}
public abstract boolean hasNext();
public abstract T next();
}
RDD类
public abstract class RDD<T> {
interface RDDCallback<T1, T2> {
Iterable<T2> invoke(Iterable<T1> it);
}
public Iterable<T> iterator() {
return compute();
}
public <U> RDD<U> map(Iterable.MapCallback<T, U> callback) {
return new MapRDD<U, T>(this, (it -> it.map(callback)));
}
public RDD<T> filter(Iterable.FilterCallback<T> callback) {
return new MapRDD<T, T>(this, (it -> it.filter(callback)));
}
public abstract Iterable<T> compute();
public void foreach(Iterable.ForeachCallback<T> callback) {
this.iterator().foreach(callback);
}
}
MapRDD
public class MapRDD<U, T> extends RDD<U> {
RDDCallback<T, U> f;
RDD<T> parent;
public MapRDD(RDD<T> parent, RDDCallback<T, U> f) {
this.parent = parent;
this.f = f;
}
@Override
public Iterable<U> compute() {
Iterable<U> it = f.invoke(parent.iterator());
return it;
}
}
ListRDD
import java.util.Iterator;
import java.util.List;
public class ListRDD<T> extends RDD<T> {
List<T> list;
private ListRDD(List<T> list) {
this.list = list;
}
@Override
public Iterable<T> compute() {
return new Iterable<T>() {
Iterator<T> iterator = list.iterator();
@Override
public boolean hasNext() {
return iterator.hasNext();
}
@Override
public T next() {
return iterator.next();
}
};
}
public static <T> RDD<T> makeRDD(List<T> list) {
return new ListRDD<>(list);
}
}
ok,下面是主函数,开始调用
import java.util.ArrayList;
import java.util.List;
public class JavaMain {
public static void main(String[] args) {
//创建一个List作为数据源
List<Integer> list = new ArrayList<>();
for(int i = 0; i <= 5; i++) list.add(i);
//通过list创建一个ListRDD,并依次调用map、filter和foreach
ListRDD.makeRDD(list)
.map((num)->new Pair<>(String.valueOf(num), num))
.filter(p->p.getValue() > 1)
.foreach((p)->{System.out.println(p.toString());});
}
}
运行后,输出结果
2=2
3=3
4=4
5=5