spark RDD的迭代执行流程

1.研究的重点和方法

本文主要就是研究rdd的执行流程,主要重点是如何形成rdd的图谱,以及如何形成递归的迭代器进行数据的迭代计算的;
笔者对与rdd的执行的流程的研究了网络上的相关文章、相关的rdd的代码的阅读,简单的样例代码的调试,以及原理代码的编写;其中样例代码的调试个人觉得非常重要,可以清晰的实践一下你的代码的执行流程。

2.RDD图谱的形成

2.1编写简单的样例代码

因为重点是研究rdd的执行流程,样例代码只设计了窄依赖的变换,因为其流程相对于款依赖相对简单。
文件系统我们只读取本地文件
集群模式为本机模式,为了调试方便,只启动一个执行线程
样例代码如下:

var sparkConf = new SparkConf().setAppName("Test").setMaster("local[1]"); //启动一个线程
var sc = new SparkContext(sparkConf);
//读取本地文件、并调用两次简单的窄依赖变换
sc.textFile("/home/jack/t.txt")
  .map((_, 1))
  .filter(_._2 == 1)
  .collect();   //通过collect 提交真正的action,开始执行任务
2.2 SparkContext.textFile

执行代码进行调试模式,断点进入textFile,来看看textFile的执行

  def textFile(
      path: String,
      minPartitions: Int = defaultMinPartitions): RDD[String] = withScope {
    assertNotStopped()
    hadoopFile(path, classOf[TextInputFormat], classOf[LongWritable], classOf[Text],
      minPartitions).map(pair => pair._2.toString).setName(path)
  }

继续跟踪hadoopFile方法,可以看到hadoopFile关键逻辑是实例化了一个hadoopRDD类,并返回;

    new HadoopRDD(
      this,
      confBroadcast,
      Some(setInputPathsFunc),
      inputFormatClass,
      keyClass,
      valueClass,
      minPartitions).setName(path)

基于以上,textFile的关键代码是调用了hadoopFile方法,hadoopFile方法返回一个HadoopRDD,而hadoopRDD.map又返回了一个MapPartitionRDD,最终textFile最终返回了这个RDD。

再看hadoopRDD.map 的逻辑

  def map[U: ClassTag](f: T => U): RDD[U] = withScope {
    val cleanF = sc.clean(f)
    new MapPartitionsRDD[U, T](this, (context, pid, iter) => iter.map(cleanF))
  }

  HadoopRDD的map方法是其父类RDD实现的,在map方法中返回了一个新的MarPartitionRDD,并将当前RDD实例和用户的map函数传递进去。
  MapPartitionRDD与HadoopRDD同样都是继承于RDD类,MapPartitionRDD的特点是会通过传递进来的参数,持有父RDD和用户的执行函数。


RDD继承关系

  而textFile为什么没有直接返回HadoopRDD,我觉得应该是HadoopRDD的迭代器是一个key,Value迭代器,其中key为行所在的字节offset值、value是行值,而我们一般来说只关心value值,所以其做了一次map变换,只返回了value的迭代。
  在RDD类中,同样有filter、flatMap、distinct 等窄依赖等实现,基本套路都是生成一个新的MapPartitionRDD,并将当前的RDD作为父RDD传递给新的RDD。
  综上所述, SparkContext.textFile,在核心流程上既是返回了new HadoopRDD().map(),也就是一个MapPartitionRDD。

2.3 RDD.map、filter等变换方法

  如上文所述, map、filter、flatmap 等方法的思路其实大同小异,都是返回一个携带前置父RDD实例以及用户传递的具体transform方法的MapParittionRDD实例。而这里要重点强调的是三个点:

  • MarPartitionRDD要持有前置的父RDD
    MapPartitionRDD会持有前置的、变换前的RDD实例,这样形成一个可以向前追溯的RDD图谱,为后续的迭代计算提供依据。
  • MapParitionRDD要持有用户传递的具体transform方法
    具体算法就也会在MapPartitionRDD中存储。这样后续的迭代变化时能够回溯计算;
  • map、filter、flatMap的具体迭代算法实现在scala的 Iterator中

以map方法为例展开讲述以上三点,我们看到返回的MapPartitionRDD的构造形式是:

new MapPartitionsRDD[U, T](this, (context, pid, iter) => iter.map(cleanF))

  this即当前调用map的父RDD实例,这个较容易理解。
  cleanF是用户传递的map具体用户函数, 但(context,pid,iter)=>iter.map(cleanF)这个形式,如果scala不那么熟悉的话,一眼望过去着实看得有点吃力,其实这个类似于与c语言中的函数指针、java中的函数接口,相当与定义了一个方法的类型;例如在scala中 var f:(Int)=>Int,即定义了一个方法类型:入参为Int,返回值为Int,翻到MapPartitionRDD的源码,可以看到第二个参数的定义为为 f: (TaskContext, Int, Iterator[T]) => Iterator[U], 那么这个f函数变量到底干了啥呢,结合map具体传递形式,我们可以看到f就是一个函数,内部调用了第三个参数传递进来的迭代器(Iterator[T]), 调用到Iterator的map方法,map方法的参数是用户的cleanF,并返回一个新的迭代器(Iterator[U]); 这些形成的transform的方法最终也会形成一个调用链, 最终由ResultTask的runTask从rdd的最后一个节点开始调用,这个下文再详细介绍。
  关于迭代器算法的实现,最终都在Iterator中,这些实现并非Spark框架中的,而都是scala语言框架的,我想这也是spark首选scala来实现其理念的原因吧。Iterator的map、filter、flatMap 等方法同样接收用户的方法,并返回一个新的Iterator,持续的进行迭代,以下就是Iterator中map的实现,可以看出他的核心代码就是返回一个迭代器,而next方法的返回值, 是把前置的Iterator (self)的next方法返回值传递给用户定义的回调方法(f)进行返回,从而形成了一个递归的迭代调用:

  def map[B](f: A => B): Iterator[B] = new AbstractIterator[B] {
    def hasNext = self.hasNext
    def next() = f(self.next())
  }

  综上所述,RDD通过不断的map、filter、flatMap等变换,生成了一系列保存前置rdd和用户变换函数的新的RDD, 这些rdd按照调用顺序形成了一个有序的图谱,等待最后的action动作最终的执行,如下图所示。在《spark性能调优与原理分析》一书中,作者把该过程做了一个我认为比较形象的比喻:rdd的图谱形成的过程就像一个一步步形成的菜谱,每次transform调用都会记录rdd的一次转换,菜谱会记录做菜的先后步骤,但只有要吃这道菜的时候,才会按照菜谱进行操作。而吃菜的动作就是后续我们要讲述的collect、foreach等action操作触发的数据迭代计算的过程。


RDD执行变换流程.png

3. RDD数据的递归迭代计算

  在rdd的图谱形成后,终归我们是要调用一些action动作函数触发数据的迭代计算的,最终返回我们的数据结果。

3.1 触发计算的时机和位置

我们通过实际的方法调用的堆栈中最终找到,触发数据计算的位置是在 ResultTask中的runTask方法中;

  override def runTask(context: TaskContext): U = {
    // Deserialize the RDD and the func using the broadcast variables.
    val threadMXBean = ManagementFactory.getThreadMXBean
    val deserializeStartTime = System.currentTimeMillis()
    val deserializeStartCpuTime = if (threadMXBean.isCurrentThreadCpuTimeSupported) {
      threadMXBean.getCurrentThreadCpuTime
    } else 0L
    val ser = SparkEnv.get.closureSerializer.newInstance()
    val (rdd, func) = ser.deserialize[(RDD[T], (TaskContext, Iterator[T]) => U)](
      ByteBuffer.wrap(taskBinary.value), Thread.currentThread.getContextClassLoader)
    _executorDeserializeTime = System.currentTimeMillis() - deserializeStartTime
    _executorDeserializeCpuTime = if (threadMXBean.isCurrentThreadCpuTimeSupported) {
      threadMXBean.getCurrentThreadCpuTime - deserializeStartCpuTime
    } else 0L

    func(context, rdd.iterator(partition, context))
  }

runTask的最后,会调用最后一个子rdd的iterator方法,从而开始调用链条的头端iterator。

3.2 iterator和compute方法

  iterator 方法是由RDD类实现的,其返回一个Iterator迭代器,iterator的内部经过几次判定最终还是调用到rdd的compute方法;

  final def iterator(split: Partition, context: TaskContext): Iterator[T] = {
    if (storageLevel != StorageLevel.NONE) {
      getOrCompute(split, context)
    } else {
      computeOrReadCheckpoint(split, context)
    }
  }

  private[spark] def computeOrReadCheckpoint(split: Partition, context: TaskContext): Iterator[T] =
  {
    if (isCheckpointedAndMaterialized) {
      firstParent[T].iterator(split, context)
    } else {
      compute(split, context)
    }
  }

  @DeveloperApi
  def compute(split: Partition, context: TaskContext): Iterator[T]

而翻看到compute方法,发现其是个抽象方法,约定了其返回值类型是Iterator,其实现是下方给MapPartitionRDD或者 HadoopRDD来实现的。

  • MapPartitionRDD的 compute
    我们先来看MapPartitionRDD的实现
  override def compute(split: Partition, context: TaskContext): Iterator[U] =
    f(context, split.index, firstParent[T].iterator(split, context))

  可以看到调用了其保存的函数变量f, 而重要的第三个参数是firstParent.iterator(), 其实就是调用了父RDD对象的iterator()方法,而父iterator的调用就是父rdd的compute,例如如果是map调用,那就是iter.map(cleanF),f(,,firstParent.iterator)的展开就是 firstParent[T].iterator().map(clieanF);这样依次的触发上一级rdd的iterator,形成了一个迭代式的调用,最终得到返回的数据。

  • HadoopRDD的 compute
      这时我们肯定会想,sc.TextFile第一次创建的HadoopRDD的compute返回的迭代器是什么呢?这个迭代器总归要去读取数据源了吧;我们翻开HadoopRDD的compute实现:
  override def compute(theSplit: Partition, context: TaskContext): InterruptibleIterator[(K, V)] = {
    val iter = new NextIterator[(K, V)] {
      //....................删除了很多代码
      private var reader: RecordReader[K, V] = null
      private val inputFormat = getInputFormat(jobConf)
      reader =   inputFormat.getRecordReader(split.inputSplit.value, jobConf, Reporter.NULL)
      private val key: K = if (reader == null) null.asInstanceOf[K] else reader.createKey()
      private val value: V = if (reader == null) null.asInstanceOf[V] else reader.createValue()

      override def getNext(): (K, V) = {
        try {
          finished = !reader.next(key, value)
        } catch {
          case e: FileNotFoundException if ignoreMissingFiles =>
            logWarning(s"Skipped missing file: ${split.inputSplit}", e)
            finished = true
          // Throw FileNotFoundException even if `ignoreCorruptFiles` is true
          case e: FileNotFoundException if !ignoreMissingFiles => throw e
          case e: IOException if ignoreCorruptFiles =>
            logWarning(s"Skipped the rest content in the corrupted file: ${split.inputSplit}", e)
            finished = true
        }
        if (!finished) {
          inputMetrics.incRecordsRead(1)
        }
        if (inputMetrics.recordsRead % SparkHadoopUtil.UPDATE_INPUT_METRICS_INTERVAL_RECORDS == 0) {
          updateBytesRead()
        }
        (key, value)
      }
    }
    new InterruptibleIterator[(K, V)](context, iter)
  }

  果然compute方法不一样了, 代码比较多,删除了很多,但其核心实现还是返回一个Iterator;而这个Iterator的getNext方法,是利用一个LineRecorderReader,不断的去读取数据源中的行来实现的。也即是迭代一次,就去数据源读取一行,直到所在分区数据结束,具体的数据的行数据的迭代原理可以参看hadoop的LineRecordReader、UncompressedSplitLineReader、SplitLineReader以及LineReader等实现。

3.2 迭代计算数据

  最后我们可以看到数据的迭代计算是与图谱的形成反向触发的的,图谱形成后,action动作触发的计算由最后的子rdd开始触发,通过iterator和compute方法,依次向前获取数据,最终到达首个hadoopRdd的compute,读取文件中的一行,这样形成一个类似迭代计算的流,如图所示:


rdd调用迭代流程.png

而迭代的计算也有一个非常巨大的优势,既是我们在读取文件的时候不用一次性的把文件都加载都内存中,而是形成一个不断的计算流一条条的加载、计算、保存,节省了巨量的内存,这种思路是非常值得我们参考和学习的。

4.使用java来实现一个简单的rdd迭代调用

4.1 基本思路

  在研究完spark的rdd的调用模式后,发现其实现思路还是挺不错的,再研究过程中因为spark的rdd部分都是用scala语言编写的,因为scala的不熟悉带来了一些难度,在研究的过程中其实笔者还采取了一个办法就是把相关的代码看的大体明白后,用java又实现了一个简约模式,当java版的rdd成功运行后,也对其模式调用有了一些更深的理解。

4.2 基本实现类

java 简约版的rdd实现包括了如下的组件:

  • Iterable类, 其实就是iterator,模仿了scala的Iterator,参考其实现了最基础的map、filter和foreach
  • RDD类, java版rdd,实现了spark中的RDD,实现RDD的map、filter和foreach,实现iterator,就是返回compute, 留着 compute作为抽象方法
  • MapRDD类, 模仿MapPartitionRDD,重写了compute方法
  • ListRDD类, 模仿HadoopRDD,不同的是ListRDD的数据源是一个ArrayList,compute返回的迭代器是对ArrayList的迭代。
4.2 实现代码

Iterable类

import java.util.NoSuchElementException;

public abstract class Iterable<T> {
    @FunctionalInterface
    interface MapCallback<T1, T2> {
        T2 invoke(T1 num);
    }

    @FunctionalInterface
    interface ForeachCallback<T1> {
        void invoke(T1 num);
    }

    @FunctionalInterface
    interface FilterCallback<T1> {
        boolean invoke(T1 t);
    }

    public <RT> Iterable<RT> map(MapCallback<T, RT> callback) {
        Iterable<T> self = Iterable.this;
        return new Iterable<RT>() {
            @Override
            public boolean hasNext() {
                return self.hasNext();
            }

            @Override
            public RT next() {
                return callback.invoke(self.next());
            }
        };
    }

    public Iterable<T> filter(FilterCallback<T> callback) {

        return new Iterable<T>() {
            Iterable<T> self = Iterable.this;
            T nextValue = null;

            @Override
            public boolean hasNext() {
                if (self.hasNext() == false) {
                    nextValue = null;
                    return false;
                }
                else {
                    nextValue = self.next();
                    while (callback.invoke(nextValue) == false) {
                        if (self.hasNext() == false) {
                            nextValue = null;
                            return false;
                        }
                        nextValue = self.next();
                    }
                    return true;
                }
            }

            @Override
            public T next() {
                if (nextValue != null) {
                    T curr = nextValue;
                    nextValue = null;
                    return curr;
                }
                else if (this.hasNext()) {
                    T curr = nextValue;
                    nextValue = null;
                    return curr;
                }
                return null;
            }
        };
    }

    public void foreach(ForeachCallback<T> callback) {
        while (this.hasNext()) {
            callback.invoke(this.next());
        }
    }

    public abstract boolean hasNext();

    public abstract T next();

}

RDD类

public abstract class RDD<T> {
    interface RDDCallback<T1, T2> {
        Iterable<T2> invoke(Iterable<T1> it);
    }

    public Iterable<T> iterator() {
        return compute();
    }

    public <U> RDD<U> map(Iterable.MapCallback<T, U> callback) {
        return new MapRDD<U, T>(this, (it -> it.map(callback)));
    }

    public RDD<T> filter(Iterable.FilterCallback<T> callback) {
        return new MapRDD<T, T>(this, (it -> it.filter(callback)));
    }

    public abstract Iterable<T> compute();

    public void foreach(Iterable.ForeachCallback<T> callback) {
        this.iterator().foreach(callback);
    }
    
}

MapRDD

public class MapRDD<U, T> extends RDD<U> {

    RDDCallback<T, U> f;
    RDD<T> parent;

    public MapRDD(RDD<T> parent, RDDCallback<T, U> f) {
        this.parent = parent;
        this.f = f;
    }

    @Override
    public Iterable<U> compute() {
        Iterable<U> it = f.invoke(parent.iterator());
        return it;
    }
}

ListRDD

import java.util.Iterator;
import java.util.List;

public class ListRDD<T> extends RDD<T> {
    List<T> list;

    private ListRDD(List<T> list) {
        this.list = list;
    }

    @Override
    public Iterable<T> compute() {
        return new Iterable<T>() {
            Iterator<T> iterator = list.iterator();

            @Override
            public boolean hasNext() {
                return iterator.hasNext();
            }

            @Override
            public T next() {
                return iterator.next();
            }
        };
    }

    public static <T> RDD<T> makeRDD(List<T> list) {
        return new ListRDD<>(list);
    }
}

ok,下面是主函数,开始调用

import java.util.ArrayList;
import java.util.List;

public class JavaMain {
    public static void main(String[] args) {
        //创建一个List作为数据源
        List<Integer>  list = new ArrayList<>();
        for(int i = 0; i <= 5; i++) list.add(i);
        //通过list创建一个ListRDD,并依次调用map、filter和foreach
        ListRDD.makeRDD(list)
                .map((num)->new Pair<>(String.valueOf(num), num))
                .filter(p->p.getValue() > 1)
                .foreach((p)->{System.out.println(p.toString());});
    }
}

运行后,输出结果

2=2
3=3
4=4
5=5
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 212,686评论 6 492
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,668评论 3 385
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 158,160评论 0 348
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,736评论 1 284
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 65,847评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,043评论 1 291
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,129评论 3 410
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,872评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,318评论 1 303
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,645评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,777评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,470评论 4 333
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,126评论 3 317
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,861评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,095评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,589评论 2 362
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,687评论 2 351

推荐阅读更多精彩内容