1. Create a working directory and cd into it
mkdir spark_test
cd spark_test
2. Create the pom.xml
touch pom.xml
vi pom.xml
<project>
    <modelVersion>4.0.0</modelVersion>
    <groupId>com.packet.org</groupId>
    <artifactId>spark</artifactId>
    <version>1.0.0</version>
    <packaging>jar</packaging>

    <dependencies>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.11</artifactId>
            <version>2.4.0</version>
            <scope>provided</scope>
        </dependency>
    </dependencies>

    <build>
        <pluginManagement>
            <plugins>
                <plugin>
                    <groupId>org.apache.maven.plugins</groupId>
                    <artifactId>maven-compiler-plugin</artifactId>
                    <version>3.1</version>
                    <configuration>
                        <source>1.8</source>
                        <target>1.8</target>
                    </configuration>
                </plugin>
            </plugins>
        </pluginManagement>
    </build>
</project>
3. Create the source directory structure
mkdir -p src/main/java/com/packet/org
4. Create and edit the Java source file
touch src/main/java/com/packet/org/Test.java
vi src/main/java/com/packet/org/Test.java
package com.packet.org;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;

import java.util.ArrayList;
import java.util.List;

public class Test {
    public static void main(String[] args) {
        // Build the input data: the integers 0..99.
        List<Integer> l = new ArrayList<>(100);
        for (int i = 0; i < 100; i++) {
            l.add(i);
        }

        SparkConf conf = new SparkConf().setAppName("map");
        JavaSparkContext sc = new JavaSparkContext(conf);

        // Two chained map transformations; nothing runs until collect() is called.
        JavaRDD<Integer> one = sc.parallelize(l);
        JavaRDD<Integer> two = one.map(new FunctionOne());
        JavaRDD<Integer> three = two.map(new FunctionTwo());
        System.out.println(three.collect());
    }
}

class FunctionOne implements Function<Integer, Integer> {
    public Integer call(Integer x) {
        System.out.println("class FunctionOne x = x + 1 , x = " + x);
        return x + 1;
    }
}

class FunctionTwo implements Function<Integer, Integer> {
    public Integer call(Integer x) {
        System.out.println("class FunctionTwo x = x + 2 , x = " + x);
        return x + 2;
    }
}
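(Optional) To try the program on a single machine before going to a cluster, you can set the master programmatically instead of passing it to spark-submit. This is a hypothetical local-test variant, not part of the steps below:

SparkConf conf = new SparkConf()
        .setAppName("map")
        .setMaster("local[2]");   // two local worker threads; remove this when submitting to a real cluster
JavaSparkContext sc = new JavaSparkContext(conf);

Keep in mind that spark-core is declared with provided scope, so the Spark classes still need to be on the classpath at run time (spark-submit takes care of that; for an IDE run you would temporarily drop the provided scope).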
5. Package the project
mvn clean package -DskipTests
6. Submit the program to Spark
spark-submit --master spark://192.168.1.16:7077 --class com.packet.org.Test ~/spark_test/target/spark-1.0.0.jar
7. Part of the executor's execution log
ResultTask.scala class ResultTask Executor task launch worker for task 0 def runTask(context: TaskContext) func(context, rdd.iterator(partition, context)) , func = class org.apache.spark.SparkContext$$anonfun$runJob$5
Before invoking org.apache.spark.SparkContext$$anonfun$runJob$5, the task first calls the RDD's iterator method to obtain an iterator over the values held by the partition object
RDD.scala class RDD Executor task launch worker for task 0 def iterator(split: Partition, context: TaskContext) if (storageLevel != StorageLevel.NONE)
RDD.scala class RDD Executor task launch worker for task 0 def iterator(split: Partition, context: TaskContext) if (storageLevel != StorageLevel.NONE) else computeOrReadCheckpoint(split, context)
RDD.scala class RDD Executor task launch worker for task 0 def computeOrReadCheckpoint(split: Partition, context: TaskContext) if (isCheckpointedAndMaterialized)
RDD.scala class RDD Executor task launch worker for task 0 def computeOrReadCheckpoint(split: Partition, context: TaskContext) if (isCheckpointedAndMaterialized) else compute(split, context)
MapPartitionsRDD.scala class MapPartitionsRDD Executor task launch worker for task 0 def compute(split: Partition, context: TaskContext) f(context, split.index, firstParent[T].iterator(split, context)) , split.index = 0 , f = class org.apache.spark.rdd.RDD$$anonfun$map$1$$anonfun$apply$5 , f = <function3>
Before invoking org.apache.spark.rdd.RDD$$anonfun$map$1$$anonfun$apply$5, the parent RDD's iterator method is called first to obtain the iterator for this partition of the parent RDD
RDD.scala class RDD Executor task launch worker for task 0 def firstParent dependencies.head.rdd.asInstanceOf[RDD[U]]
RDD.scala class RDD Executor task launch worker for task 0 def iterator(split: Partition, context: TaskContext) if (storageLevel != StorageLevel.NONE)
RDD.scala class RDD Executor task launch worker for task 0 def iterator(split: Partition, context: TaskContext) if (storageLevel != StorageLevel.NONE) else computeOrReadCheckpoint(split, context)
RDD.scala class RDD Executor task launch worker for task 0 def computeOrReadCheckpoint(split: Partition, context: TaskContext) if (isCheckpointedAndMaterialized)
RDD.scala class RDD Executor task launch worker for task 0 def computeOrReadCheckpoint(split: Partition, context: TaskContext) if (isCheckpointedAndMaterialized) else compute(split, context)
MapPartitionsRDD.scala class MapPartitionsRDD Executor task launch worker for task 0 def compute(split: Partition, context: TaskContext) f(context, split.index, firstParent[T].iterator(split, context)) , split.index = 0 , f = class org.apache.spark.rdd.RDD$$anonfun$map$1$$anonfun$apply$5 , f = <function3>
Before invoking org.apache.spark.rdd.RDD$$anonfun$map$1$$anonfun$apply$5, the parent RDD's iterator method is called once again to obtain the iterator for this partition of the parent RDD
RDD.scala class RDD Executor task launch worker for task 0 def firstParent dependencies.head.rdd.asInstanceOf[RDD[U]]
RDD.scala class RDD Executor task launch worker for task 0 def iterator(split: Partition, context: TaskContext) if (storageLevel != StorageLevel.NONE)
RDD.scala class RDD Executor task launch worker for task 0 def iterator(split: Partition, context: TaskContext) if (storageLevel != StorageLevel.NONE) else computeOrReadCheckpoint(split, context)
RDD.scala class RDD Executor task launch worker for task 0 def computeOrReadCheckpoint(split: Partition, context: TaskContext) if (isCheckpointedAndMaterialized)
RDD.scala class RDD Executor task launch worker for task 0 def computeOrReadCheckpoint(split: Partition, context: TaskContext) if (isCheckpointedAndMaterialized) else compute(split, context)
The original ParallelCollectionRDD holds the original list of values; the list's iterator is wrapped in an InterruptibleIterator and returned
ParallelCollectionRDD.scala class ParallelCollectionRDD Executor task launch worker for task 0 def compute(s: Partition, context: TaskContext) new InterruptibleIterator(context, s.asInstanceOf[ParallelCollectionPartition[T]].iterator)
ParallelCollectionRDD.scala class ParallelCollectionPartition Executor task launch worker for task 0 def iterator values.iterator
RDD.scala class RDD def map[U: ClassTag](f: T => U) Executor task launch worker for task 0 (context, pid, iter) => {} iter.map(cleanF) , cleanF = class org.apache.spark.api.java.JavaPairRDD$$anonfun$toScalaFunction$1 , cleanF = <function1>
map is then called on the InterruptibleIterator that was just returned; the function it attaches is the FunctionOne object written in the Java program
RDD.scala class RDD def map[U: ClassTag](f: T => U) Executor task launch worker for task 0 (context, pid, iter) => {} iter.map(cleanF) , cleanF = class org.apache.spark.api.java.JavaPairRDD$$anonfun$toScalaFunction$1 , cleanF = <function1>
map is called again on the iterator just returned; this time the attached function is the FunctionTwo object written in the Java program
SparkContext.scala class SparkContext def runJob[T, U: ClassTag](rdd: RDD[T], func: Iterator[T] => U, partitions: Seq[Int]) runJob(rdd, (ctx: TaskContext, it: Iterator[T]) => {}Executor task launch worker for task 0 cleanedFunc(it)
The iterator that was just returned is finally passed into the function (iter: Iterator[T]) => iter.toArray defined in the RDD's collect() method, which triggers the actual computation
RDD.scala class RDD def collect() val results = sc.runJob(this, (iter: Iterator[T]) => {} Executor task launch worker for task 0 iter.toArray
Execution begins
InterruptibleIterator.scala class InterruptibleIterator Executor task launch worker for task 0 def next() delegate.next()
class FunctionOne x = x + 1 , x = 0
class FunctionTwo x = x + 2 , x = 1
InterruptibleIterator.scala class InterruptibleIterator Executor task launch worker for task 0 def next() delegate.next()
class FunctionOne x = x + 1 , x = 1
class FunctionTwo x = x + 2 , x = 2
InterruptibleIterator.scala class InterruptibleIterator Executor task launch worker for task 0 def next() delegate.next()
class FunctionOne x = x + 1 , x = 2
class FunctionTwo x = x + 2 , x = 3
... (the same pattern repeats for the remaining elements)
Each partition object on an executor holds a slice of the data. When you chain several map calls on the driver, each of the functions you pass is stored as the member variable f of the corresponding RDD. When the task reaches func, it first turns the data held in the partition object into an iterator, then calls map on that iterator once per RDD in the chain, attaching the functions you wrote on the driver. Finally, the action method of the iterator itself is invoked, which triggers the work you asked for: as each element is traversed, it passes through every map operation you added.
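The same mechanism can be sketched in plain Java without Spark. The MappedIterator class below is a hypothetical stand-in for the iterator chain Spark builds on the executor (it is not Spark's actual code); it only illustrates that each map call merely wraps the previous iterator, and that the attached functions run element by element, in order, once the final iterator is drained by the action.

import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.function.Function;

public class LazyMapSketch {

    // Wraps a parent iterator and applies f lazily, one element per next() call.
    static class MappedIterator<A, B> implements Iterator<B> {
        private final Iterator<A> parent;
        private final Function<A, B> f;

        MappedIterator(Iterator<A> parent, Function<A, B> f) {
            this.parent = parent;
            this.f = f;
        }

        public boolean hasNext() { return parent.hasNext(); }

        public B next() { return f.apply(parent.next()); }
    }

    public static void main(String[] args) {
        List<Integer> partitionData = Arrays.asList(0, 1, 2);   // the slice of data one partition holds

        // Chaining the "maps" only wraps the iterator; nothing is printed here.
        Iterator<Integer> one = partitionData.iterator();
        Iterator<Integer> two = new MappedIterator<>(one,
                x -> { System.out.println("FunctionOne x = " + x); return x + 1; });
        Iterator<Integer> three = new MappedIterator<>(two,
                x -> { System.out.println("FunctionTwo x = " + x); return x + 2; });

        // Draining the final iterator (like iter.toArray inside collect()) triggers the work:
        // each element passes through FunctionOne and then FunctionTwo before the next one starts.
        while (three.hasNext()) {
            System.out.println("result = " + three.next());
        }
    }
}

Running this prints FunctionOne and then FunctionTwo for element 0, then for element 1, and so on, mirroring the interleaving seen in the executor log above.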