Spark runJob方法别用

学习博主的文章《Spark driver端得到executor返回值的方法》之后有感

文章中通过阅读count方法的源码,count方法将task的返回值返回到driver端,然后进行聚合,源码如下

defcount(): Long = sc.runJob(this, Utils.getIteratorSize _).sum

sparkcontext的runJob方法,Utils.getIteratorSize _这个方法主要是计算每个iterator的元素个数,也即是每个分区的元素个数,返回值就是元素个数:

/**

* Counts the number of elements of an iterator using a while loop rather than calling

* [[scala.collection.Iterator#size]] because it uses a for loop, which is slightly slower

* in the current version of Scala.

*/

def getIteratorSize(iterator:Iterator[_]):Long = {

var count =0L

  while (iterator.hasNext) {

count +=1L

    iterator.next()

}

count

}

返回结果为各个分区的元素个数,使用sum方法进行统计。

博主的文章中使用实际案例代码说明了使用方法,学习后有如下启发,在driver端不止可以获取每个task中的数据量,是否还可以获取具体的变量值呢?做如下测试:



val rdd = sc.parallelize(1 to 10 ,3)

import org.apache.spark.TaskContext

import scala.collection.mutable._

val funccc = (itr : Iterator[Int]) => {

  val lst = new ListBuffer[Int]

      itr.foreach(each=>{

        lst.append(each)

      })

      (TaskContext.getPartitionId(),lst)

    }

val res = sc.runJob(rdd,funccc)


在driver端查看获取的结果

res(1)._2.foreach(println)

4

5

6

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
【社区内容提示】社区部分内容疑似由AI辅助生成,浏览时请结合常识与多方信息审慎甄别。
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

相关阅读更多精彩内容

友情链接更多精彩内容