How a Spark executor executes user-defined functions

1. Create a working directory and cd into it

mkdir spark_test

cd spark_test

2. Create the pom file

touch pom.xml

vi  pom.xml

<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.packet.org</groupId>
    <artifactId>spark</artifactId>
    <version>1.0.0</version>
    <packaging>jar</packaging>

    <dependencies>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.11</artifactId>
            <version>2.4.0</version>
            <scope>provided</scope>
        </dependency>
    </dependencies>

    <build>
        <pluginManagement>
            <plugins>
                <plugin>
                    <groupId>org.apache.maven.plugins</groupId>
                    <artifactId>maven-compiler-plugin</artifactId>
                    <version>3.1</version>
                    <configuration>
                        <source>1.8</source>
                        <target>1.8</target>
                    </configuration>
                </plugin>
            </plugins>
        </pluginManagement>
    </build>
</project>

3. Create the source directory

mkdir -p src/main/java/com/packet/org

4. Create and edit the Java file

touch  src/main/java/com/packet/org/Test.java

vi src/main/java/com/packet/org/Test.java

package com.packet.org;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;

import java.util.ArrayList;
import java.util.List;

public class Test {

    public static void main(String[] args) {
        // Build the input data: the integers 0..99.
        List<Integer> l = new ArrayList<>(100);
        for (int i = 0; i < 100; i++) {
            l.add(i);
        }

        SparkConf conf = new SparkConf().setAppName("map");
        JavaSparkContext sc = new JavaSparkContext(conf);

        // Two chained map transformations; nothing runs until an action is called.
        JavaRDD<Integer> one = sc.parallelize(l);
        JavaRDD<Integer> two = one.map(new FunctionOne());
        JavaRDD<Integer> three = two.map(new FunctionTwo());

        // collect() is the action that triggers the job on the executors.
        System.out.println(three.collect());

        sc.stop();
    }
}

// User-defined functions shipped to the executors; each call is logged so it shows up in the executor log.
class FunctionOne implements Function<Integer, Integer> {
    public Integer call(Integer x) {
        System.out.println("class FunctionOne x = x + 1 , x = " + x);
        return x + 1;
    }
}

class FunctionTwo implements Function<Integer, Integer> {
    public Integer call(Integer x) {
        System.out.println("class FunctionTwo x = x + 2 , x = " + x);
        return x + 2;
    }
}

5. Package the jar

mvn clean package -DskipTests

6. Submit the program to Spark

spark-submit --master spark://192.168.1.16:7077 --class com.packet.org.Test ~/spark_test/target/spark-1.0.0.jar

7. Examine part of the executor log. Each trace line below names the source file, the class, the executor thread ("Executor task launch worker for task 0"), the method, and the statement being executed.

ResultTask.scala class ResultTask Executor task launch worker for task 0 def runTask(context: TaskContext) func(context, rdd.iterator(partition, context)) , func = class org.apache.spark.SparkContext$$anonfun$runJob$5

Before the org.apache.spark.SparkContext$$anonfun$runJob$5 function is invoked, the RDD's iterator method is called first to obtain an iterator over the values held by the partition object.
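A simplified sketch of ResultTask.runTask, paraphrased from the Spark 2.4 source quoted in the log (deserialization details and metrics bookkeeping omitted):

  override def runTask(context: TaskContext): U = {
    // Deserialize the (rdd, func) pair that the driver broadcast as the task binary.
    val ser = SparkEnv.get.closureSerializer.newInstance()
    val (rdd, func) = ser.deserialize[(RDD[T], (TaskContext, Iterator[T]) => U)](
      ByteBuffer.wrap(taskBinary.value), Thread.currentThread.getContextClassLoader)
    // Build the iterator for this partition first, then hand it to the job function.
    func(context, rdd.iterator(partition, context))
  }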

RDD.scala class RDD Executor task launch worker for task 0 def iterator(split: Partition, context: TaskContext) if (storageLevel != StorageLevel.NONE)

RDD.scala class RDD Executor task launch worker for task 0 def iterator(split: Partition, context: TaskContext) if (storageLevel != StorageLevel.NONE) else computeOrReadCheckpoint(split, context)

RDD.scala class RDD Executor task launch worker for task 0 def computeOrReadCheckpoint(split: Partition, context: TaskContext) if (isCheckpointedAndMaterialized)

RDD.scala class RDD Executor task launch worker for task 0 def computeOrReadCheckpoint(split: Partition, context: TaskContext) if (isCheckpointedAndMaterialized) else compute(split, context)
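The four log lines above correspond to RDD.iterator and RDD.computeOrReadCheckpoint. Paraphrased from the Spark 2.4 source, they do roughly this:

  final def iterator(split: Partition, context: TaskContext): Iterator[T] = {
    if (storageLevel != StorageLevel.NONE) {
      getOrCompute(split, context)             // the RDD was persisted/cached
    } else {
      computeOrReadCheckpoint(split, context)
    }
  }

  private[spark] def computeOrReadCheckpoint(split: Partition, context: TaskContext): Iterator[T] = {
    if (isCheckpointedAndMaterialized) {
      firstParent[T].iterator(split, context)  // read back checkpointed data
    } else {
      compute(split, context)                  // no cache, no checkpoint: compute this partition
    }
  }

The example neither caches nor checkpoints anything, so both checks fall through to the else branch and end up in compute, exactly as the log shows.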

MapPartitionsRDD.scala class MapPartitionsRDD Executor task launch worker for task 0 def compute(split: Partition, context: TaskContext) f(context, split.index, firstParent[T].iterator(split, context)) , split.index = 0 , f = class org.apache.spark.rdd.RDD$$anonfun$map$1$$anonfun$apply$5 , f = <function3>

Before org.apache.spark.rdd.RDD$$anonfun$map$1$$anonfun$apply$5 is invoked, the parent RDD's iterator method is called first to obtain the iterator for this partition of the parent RDD.
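MapPartitionsRDD.compute and the firstParent helper seen in the log, paraphrased from the Spark 2.4 source:

  // f is the (context, pid, iter) => iter.map(cleanF) closure stored when map was called on the driver.
  override def compute(split: Partition, context: TaskContext): Iterator[U] =
    f(context, split.index, firstParent[T].iterator(split, context))

  // firstParent is simply the first RDD this RDD depends on.
  protected[spark] def firstParent[U: ClassTag]: RDD[U] =
    dependencies.head.rdd.asInstanceOf[RDD[U]]

Each MapPartitionsRDD recurses into its parent's iterator before applying its own function, which is why the iterator/computeOrReadCheckpoint group of log lines repeats once per map in the chain.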

RDD.scala class RDD Executor task launch worker for task 0 def firstParent dependencies.head.rdd.asInstanceOf[RDD[U]]

RDD.scala class RDD Executor task launch worker for task 0 def iterator(split: Partition, context: TaskContext) if (storageLevel != StorageLevel.NONE)

RDD.scala class RDD Executor task launch worker for task 0 def iterator(split: Partition, context: TaskContext) if (storageLevel != StorageLevel.NONE) else computeOrReadCheckpoint(split, context)

RDD.scala class RDD Executor task launch worker for task 0 def computeOrReadCheckpoint(split: Partition, context: TaskContext) if (isCheckpointedAndMaterialized)

RDD.scala class RDD Executor task launch worker for task 0 def computeOrReadCheckpoint(split: Partition, context: TaskContext) if (isCheckpointedAndMaterialized) else compute(split, context)

MapPartitionsRDD.scala class MapPartitionsRDD Executor task launch worker for task 0 def compute(split: Partition, context: TaskContext) f(context, split.index, firstParent[T].iterator(split, context)) , split.index = 0 , f = class org.apache.spark.rdd.RDD$$anonfun$map$1$$anonfun$apply$5 , f = <function3>

Before org.apache.spark.rdd.RDD$$anonfun$map$1$$anonfun$apply$5 is invoked, the parent RDD's iterator method is called once again to obtain the iterator for this partition of the parent RDD.

RDD.scala class RDD Executor task launch worker for task 0 def firstParent dependencies.head.rdd.asInstanceOf[RDD[U]]

RDD.scala class RDD Executor task launch worker for task 0 def iterator(split: Partition, context: TaskContext) if (storageLevel != StorageLevel.NONE)

RDD.scala class RDD Executor task launch worker for task 0 def iterator(split: Partition, context: TaskContext) if (storageLevel != StorageLevel.NONE) else computeOrReadCheckpoint(split, context)

RDD.scala class RDD Executor task launch worker for task 0 def computeOrReadCheckpoint(split: Partition, context: TaskContext) if (isCheckpointedAndMaterialized)

RDD.scala class RDD Executor task launch worker for task 0 def computeOrReadCheckpoint(split: Partition, context: TaskContext) if (isCheckpointedAndMaterialized) else compute(split, context)

The original ParallelCollectionRDD holds the raw list of values; it wraps the list's iterator in an InterruptibleIterator and returns it.

ParallelCollectionRDD.scala class ParallelCollectionRDD Executor task launch worker for task 0 def compute(s: Partition, context: TaskContext) new InterruptibleIterator(context, s.asInstanceOf[ParallelCollectionPartition[T]].iterator)

ParallelCollectionRDD.scala class ParallelCollectionPartition Executor task launch worker for task 0 def iterator values.iterator
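ParallelCollectionRDD.compute, paraphrased from the Spark 2.4 source; the ParallelCollectionPartition carries the slice of the driver-side list assigned to this partition, and its iterator is simply the slice's iterator:

  override def compute(s: Partition, context: TaskContext): Iterator[T] = {
    // values holds this partition's slice of the collection passed to sc.parallelize.
    new InterruptibleIterator(context, s.asInstanceOf[ParallelCollectionPartition[T]].iterator)
  }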

RDD.scala class RDD def map[U: ClassTag](f: T => U) Executor task launch worker for task 0 (context, pid, iter) => {} iter.map(cleanF) , cleanF = class org.apache.spark.api.java.JavaPairRDD$$anonfun$toScalaFunction$1 , cleanF = <function1>

map is then called on the InterruptibleIterator that was just returned; the function added is the FunctionOne object written in the Java program.

RDD.scala class RDD def map[U: ClassTag](f: T => U) Executor task launch worker for task 0 (context, pid, iter) => {} iter.map(cleanF) , cleanF = class org.apache.spark.api.java.JavaPairRDD$$anonfun$toScalaFunction$1 , cleanF = <function1>

map is called once more on the iterator just returned; this time the function added is the FunctionTwo object written in the Java program.
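The closure the log prints, (context, pid, iter) => iter.map(cleanF), is created on the driver by RDD.map; paraphrased from the Spark 2.4 source (on the Java API side, toScalaFunction wraps the user's Function object into a Scala function, which is the cleanF shown in the log):

  def map[U: ClassTag](f: T => U): RDD[U] = withScope {
    val cleanF = sc.clean(f)  // closure-clean the user function so it can be serialized to executors
    // The new RDD only records how to transform its parent's iterator; no data is touched yet.
    new MapPartitionsRDD[U, T](this, (context, pid, iter) => iter.map(cleanF))
  }

iter.map only stacks a lazy wrapper on the iterator, so FunctionOne and FunctionTwo still have not run at this point.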

SparkContext.scala class SparkContext def runJob[T, U: ClassTag](rdd: RDD[T], func: Iterator[T] => U, partitions: Seq[Int]) runJob(rdd, (ctx: TaskContext, it: Iterator[T]) => {}Executor task launch worker for task 0 cleanedFunc(it)

The iterator just returned is then passed to the function defined in the RDD's collect() method, (iter: Iterator[T]) => iter.toArray, which finally triggers the action.

RDD.scala class RDD def collect() val results = sc.runJob(this, (iter: Iterator[T]) => {} Executor task launch worker for task 0 iter.toArray
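RDD.collect and the SparkContext.runJob step seen in the log, paraphrased from the Spark 2.4 source (intermediate overloads omitted):

  def collect(): Array[T] = withScope {
    // iter.toArray is the per-partition function that ends up as func in ResultTask.runTask.
    val results = sc.runJob(this, (iter: Iterator[T]) => iter.toArray)
    Array.concat(results: _*)
  }

  // SparkContext.runJob closure-cleans the function and adapts it to also take a TaskContext.
  def runJob[T, U: ClassTag](rdd: RDD[T], func: Iterator[T] => U, partitions: Seq[Int]): Array[U] = {
    val cleanedFunc = clean(func)
    runJob(rdd, (ctx: TaskContext, it: Iterator[T]) => cleanedFunc(it), partitions)
  }

cleanedFunc(it), i.e. iter.toArray, is what finally consumes the whole iterator chain.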

Execution begins:

InterruptibleIterator.scala class InterruptibleIterator Executor task launch worker for task 0 def next() delegate.next()

class FunctionOne x = x + 1 , x = 0

class FunctionTwo x = x + 2 , x = 1

InterruptibleIterator.scala class InterruptibleIterator Executor task launch worker for task 0 def next() delegate.next()

class FunctionOne x = x + 1 , x = 1

class FunctionTwo x = x + 2 , x = 2

InterruptibleIterator.scala class InterruptibleIterator Executor task launch worker for task 0 def next() delegate.next()

class FunctionOne x = x + 1 , x = 2

class FunctionTwo x = x + 2 , x = 3

...


Each partition object on an executor holds one slice of the data. When you call map several times on the driver, each of those functions is stored as the member variable f of a new RDD. When the task reaches func, the executor first turns the data held in the partition object into an iterator, then calls map on that iterator once per RDD in the chain, layering on each function you wrote on the driver. Only when the iterator's own terminal operation is invoked (here iter.toArray, from collect()) does any work happen: as each element is pulled through, it passes through every map you added, one after another.
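The interleaving in the log falls out of plain Scala iterator semantics; a minimal, self-contained sketch without Spark (names are illustrative):

  object IteratorChainDemo {
    def main(args: Array[String]): Unit = {
      // Stand-in for the partition data held by a ParallelCollectionPartition.
      val partitionData = Iterator(0, 1, 2)

      // Each map lazily wraps the previous iterator, like the stored f in MapPartitionsRDD.
      val afterOne = partitionData.map { x => println(s"FunctionOne x = $x"); x + 1 }
      val afterTwo = afterOne.map { x => println(s"FunctionTwo x = $x"); x + 2 }

      // Nothing has printed yet. toArray plays the role of collect(): it pulls each element
      // through the whole chain, so the two functions alternate per element, as in the executor log.
      println(afterTwo.toArray.mkString(", "))
    }
  }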
