1.常用的Transformation算子
类型 | 功能描述 |
---|---|
map | 将RDD中的每个元素传入自定义函数,获取一个新的元素,然后用新的元素组成新的RDD |
filter | 对RDD中每个元素进行判断,如果返回true则保留,返回false则剔除 |
flatMap | 与map类似,但是对每个元素可以返回零个、一个或多个新元素,所有返回的元素会被展平后组成新的RDD |
groupByKey | 根据key进行分组,每个key对应一个Iterable<value> |
reduceByKey | 对相同的key对应的多个value进行reduce操作 |
sortByKey | 对每个元素按照key进行排序,默认升序,传入false则降序,还可以传入自定义的比较器 |
join | 对两个包含<key,value>对的RDD进行join操作,类似于mysql中的表关联,返回<key,(value1,value2)> |
cogroup | 同join,但是每个key对应的元组中是两个Iterable类型,即<key,(Iterable<value1>,Iterable<value2>)> |
2.实战演练
Java版本
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.VoidFunction;
import scala.Tuple2;
import java.util.*;
/**
 * Runnable demos of common Spark RDD transformation operators, written against the Java API.
 *
 * <p>Every demo creates its own local-mode {@link JavaSparkContext}, applies one transformation
 * to a small in-memory data set and prints the outcome. The context is opened in a
 * try-with-resources statement so it is closed even when the job fails.
 *
 * <p>Functions passed to Spark are written as explicit lambdas such as
 * {@code x -> System.out.println(x)} rather than {@code System.out::println}: a bound method
 * reference would capture the {@code PrintStream} instance, which is not serializable.
 */
public class TransformationStudy {

    public static void main(String[] args) {
        // Uncomment the demo(s) to run; each one manages its own context.
        //map();
        //filter();
        //flatMap();
        //groupByKey();
        //reduceByKey();
        //sortByKey();
        //join();
        cogroup();
    }

    /**
     * Creates a context that runs on the "local" master and logs at "warn" level.
     *
     * @param appName application name registered with Spark
     * @return a started context; the caller is responsible for closing it
     */
    private static JavaSparkContext newContext(String appName) {
        SparkConf conf = new SparkConf()
                .setAppName(appName)
                .setMaster("local");
        JavaSparkContext sc = new JavaSparkContext(conf);
        sc.setLogLevel("warn");
        return sc;
    }

    /** map: multiplies each of the numbers 1..10 by two and prints every result. */
    private static void map() {
        try (JavaSparkContext sc = newContext("map")) {
            JavaRDD<Integer> numberRDD = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10));
            JavaRDD<Integer> doubledRDD = numberRDD.map(n -> n * 2);
            doubledRDD.foreach(n -> System.out.println(n));
        }
    }

    /** filter: keeps only the even numbers out of 1..10 and prints them. */
    private static void filter() {
        try (JavaSparkContext sc = newContext("filter")) {
            JavaRDD<Integer> numberRDD = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10));
            // A cleared lowest bit means the number is even.
            JavaRDD<Integer> evenNumberRDD = numberRDD.filter(n -> (n & 1) == 0);
            evenNumberRDD.foreach(n -> System.out.println(n));
        }
    }

    /** flatMap: splits each line on a single space and prints every resulting word. */
    private static void flatMap() {
        try (JavaSparkContext sc = newContext("flatMap")) {
            List<String> sentences = Arrays.asList("hello you", "hello me", "hello world");
            JavaRDD<String> lines = sc.parallelize(sentences);
            JavaRDD<String> words = lines.flatMap(line -> Arrays.asList(line.split(" ")).iterator());
            words.foreach(word -> System.out.println(word));
        }
    }

    /** groupByKey: groups scores by class name and prints each class followed by its scores. */
    private static void groupByKey() {
        try (JavaSparkContext sc = newContext("groupByKey")) {
            List<Tuple2<String, Integer>> scores = Arrays.asList(
                    new Tuple2<>("class1", 90),
                    new Tuple2<>("class2", 80),
                    new Tuple2<>("class1", 75),
                    new Tuple2<>("class2", 65)
            );
            JavaPairRDD<String, Integer> scoresRDD = sc.parallelizePairs(scores);
            JavaPairRDD<String, Iterable<Integer>> groupedScoresRDD = scoresRDD.groupByKey();
            groupedScoresRDD.foreach(group -> {
                System.out.println("class:" + group._1());
                for (Integer score : group._2()) {
                    System.out.println(score);
                }
                System.out.println("================================");
            });
        }
    }

    /**
     * reduceByKey: computes the average score per class.
     *
     * <p>Each score is first turned into a (sum, count) pair so that a single reduce can
     * accumulate both values; the average is sum / count.
     */
    private static void reduceByKey() {
        try (JavaSparkContext sc = newContext("reduceByKey")) {
            List<Tuple2<String, Integer>> scores = Arrays.asList(
                    new Tuple2<>("class1", 90),
                    new Tuple2<>("class2", 80),
                    new Tuple2<>("class1", 75),
                    new Tuple2<>("class2", 65)
            );
            JavaPairRDD<String, Tuple2<Integer, Integer>> sumAndCountRDD = sc.parallelizePairs(scores)
                    .mapValues(score -> new Tuple2<Integer, Integer>(score, 1));
            // Both the sum and the count must combine the left AND the right operand;
            // otherwise the count is wrong as soon as a key has more than two values.
            JavaPairRDD<String, Tuple2<Integer, Integer>> reducedScoresRDD = sumAndCountRDD.reduceByKey(
                    (left, right) -> new Tuple2<>(left._1() + right._1(), left._2() + right._2()));
            reducedScoresRDD.foreach(entry ->
                    System.out.println(entry._1() + ":" + entry._2()._1().doubleValue() / entry._2()._2()));
        }
    }

    /** sortByKey: sorts (score, name) pairs by score in descending order and prints "name: score". */
    private static void sortByKey() {
        try (JavaSparkContext sc = newContext("sortByKey")) {
            List<Tuple2<Integer, String>> scores = Arrays.asList(
                    new Tuple2<>(75, "leo"),
                    new Tuple2<>(50, "tom"),
                    new Tuple2<>(100, "marray"),
                    new Tuple2<>(86, "jack")
            );
            JavaPairRDD<Integer, String> scoresRDD = sc.parallelizePairs(scores);
            // Passing false requests descending order.
            JavaPairRDD<Integer, String> sortedScoresRDD = scoresRDD.sortByKey(false);
            sortedScoresRDD.foreach(pair -> System.out.println(pair._2() + ": " + pair._1()));
        }
    }

    /** join: joins students and scores on the student id and prints one record per id, sorted by id. */
    private static void join() {
        try (JavaSparkContext sc = newContext("join")) {
            List<Tuple2<Integer, String>> students = Arrays.asList(
                    new Tuple2<>(1, "leo"),
                    new Tuple2<>(2, "tom"),
                    new Tuple2<>(3, "marray"),
                    new Tuple2<>(4, "jack")
            );
            List<Tuple2<Integer, Integer>> scores = Arrays.asList(
                    new Tuple2<>(1, 75),
                    new Tuple2<>(2, 50),
                    new Tuple2<>(3, 100),
                    new Tuple2<>(4, 86)
            );
            JavaPairRDD<Integer, String> studentsRDD = sc.parallelizePairs(students);
            JavaPairRDD<Integer, Integer> scoresRDD = sc.parallelizePairs(scores);
            JavaPairRDD<Integer, Tuple2<String, Integer>> joinedRDD =
                    studentsRDD.join(scoresRDD).sortByKey();
            joinedRDD.foreach(row -> {
                System.out.println("students id : " + row._1());
                System.out.println("students name : " + row._2()._1());
                System.out.println("students score : " + row._2()._2());
                System.out.println("==================================");
            });
        }
    }

    /**
     * cogroup: groups students and scores by student id and prints, per id, the collection of
     * names and the collection of scores, sorted by id.
     */
    private static void cogroup() {
        try (JavaSparkContext sc = newContext("cogroup")) {
            List<Tuple2<Integer, String>> students = Arrays.asList(
                    new Tuple2<>(1, "leo1"),
                    new Tuple2<>(2, "tom1"),
                    new Tuple2<>(3, "marray1"),
                    new Tuple2<>(4, "jack1"),
                    new Tuple2<>(1, "leo2"),
                    new Tuple2<>(2, "tom2"),
                    new Tuple2<>(3, "marray2"),
                    new Tuple2<>(4, "jack2")
            );
            List<Tuple2<Integer, Integer>> scores = Arrays.asList(
                    new Tuple2<>(1, 75),
                    new Tuple2<>(2, 50),
                    new Tuple2<>(3, 100),
                    new Tuple2<>(4, 86),
                    new Tuple2<>(1, 67),
                    new Tuple2<>(2, 61),
                    new Tuple2<>(3, 98),
                    new Tuple2<>(4, 78)
            );
            JavaPairRDD<Integer, String> studentsRDD = sc.parallelizePairs(students);
            JavaPairRDD<Integer, Integer> scoresRDD = sc.parallelizePairs(scores);
            JavaPairRDD<Integer, Tuple2<Iterable<String>, Iterable<Integer>>> cogroupRDD =
                    studentsRDD.cogroup(scoresRDD).sortByKey();
            cogroupRDD.foreach(row -> {
                System.out.println("students id : " + row._1());
                System.out.println("students name : " + row._2()._1());
                System.out.println("students score : " + row._2()._2());
                System.out.println("==================================");
            });
        }
    }
}
Scala版本
import org.apache.spark.{SparkConf, SparkContext}
/**
 * Runnable demos of common Spark RDD transformation operators, written against the Scala API.
 *
 * Every demo runs inside `withContext`, which creates a local-mode `SparkContext` and stops
 * it again when the demo finishes, whether it succeeds or throws.
 */
object TransformationStudy {

  def main(args: Array[String]): Unit = {
    cogroup()
  }

  /**
   * Runs `body` with a fresh context on the "local" master and always stops the context.
   *
   * @param appName application name registered with Spark
   * @param body    the demo code that uses the context
   */
  private def withContext(appName: String)(body: SparkContext => Unit): Unit = {
    val conf = new SparkConf().setAppName(appName).setMaster("local")
    val sc = new SparkContext(conf)
    try {
      sc.setLogLevel("warn")
      body(sc)
    } finally {
      sc.stop()
    }
  }

  /** map: multiplies each number by two and prints every result. */
  private def map(): Unit = withContext("map") { sc =>
    val numbers = Array(1, 2, 3, 4, 5, 6)
    val numbersRDD = sc.parallelize(numbers, 1)
    numbersRDD.map(n => 2 * n).foreach(n => println(n))
  }

  /** filter: keeps only the even numbers and prints them. */
  private def filter(): Unit = withContext("filter") { sc =>
    val numbers = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
    val numbersRDD = sc.parallelize(numbers, 1)
    // Filter the RDD (not the local array) so the demo exercises the Spark operator.
    numbersRDD.filter(n => (n & 1) == 0).foreach(n => println(n))
  }

  /** flatMap: splits each line on a single space and prints every resulting word. */
  private def flatMap(): Unit = withContext("flatMap") { sc =>
    val lines = Array("hello me", "hello you", "hello world")
    val linesRDD = sc.parallelize(lines, 1)
    linesRDD.flatMap(line => line.split(" ")).foreach(word => println(word))
  }

  /** groupByKey: groups scores by class name and prints each (class, scores) pair. */
  private def groupByKey(): Unit = withContext("groupByKey") { sc =>
    val scores = Array(("class1", 80), ("class2", 76), ("class1", 90), ("class2", 93))
    val scoresRDD = sc.parallelize(scores, 1)
    scoresRDD.groupByKey().foreach(group => println(group))
  }

  /**
   * reduceByKey: computes the average score per class by reducing (sum, count) pairs and
   * dividing the sum by the count.
   */
  private def reduceByKey(): Unit = withContext("reduceByKey") { sc =>
    val scores = Array(("class1", 80), ("class2", 76), ("class1", 90), ("class2", 93))
    val scoresRDD = sc.parallelize(scores, 1)
    val sumAndCountRDD = scoresRDD.map(entry => (entry._1, (entry._2, 1)))
    sumAndCountRDD
      .reduceByKey((a, b) => (a._1 + b._1, a._2 + b._2))
      .map(entry => (entry._1, entry._2._1.toDouble / entry._2._2))
      .foreach(println)
  }

  /** sortByKey: sorts (score, name) pairs by score in descending order and prints them. */
  private def sortByKey(): Unit = withContext("sortByKey") { sc =>
    val scores = Array((75, "leo"), (50, "tom"), (100, "marry"), (86, "jack"))
    val scoresRDD = sc.parallelize(scores, 1)
    scoresRDD.sortByKey(ascending = false).foreach(println)
  }

  /** join: joins students and scores on the student id and prints the rows sorted by id. */
  private def join(): Unit = withContext("join") { sc =>
    val students = Array((1, "leo"), (2, "tom"), (3, "marry"), (4, "jack"))
    val scores = Array((1, 75), (2, 50), (3, 100), (4, 86))
    val studentsRDD = sc.parallelize(students, 1)
    val scoresRDD = sc.parallelize(scores, 1)
    studentsRDD.join(scoresRDD).sortByKey().foreach(println)
  }

  /** cogroup: groups students and scores by student id and prints the rows sorted by id. */
  private def cogroup(): Unit = withContext("cogroup") { sc =>
    val students = Array((1, "leo"), (2, "tom"), (3, "marry"), (4, "jack"))
    val scores = Array((1, 75), (2, 50), (3, 100), (4, 86), (1, 33), (2, 45), (3, 99), (4, 67))
    val studentsRDD = sc.parallelize(students, 1)
    val scoresRDD = sc.parallelize(scores, 1)
    studentsRDD.cogroup(scoresRDD).sortByKey().foreach(println)
  }
}
3.常用的Action算子
类型 | 功能描述 |
---|---|
reduce | 将RDD中的所有元素进行聚合操作:第一个和第二个元素聚合,得到的值再与第三个元素聚合,其结果再与第四个元素聚合,以此类推 |
collect | 将RDD中所有元素获取到本地客户端,若数据量特别大,可能会造成网络拥堵 |
count | 获取RDD元素的总个数 |
take(n) | 获取RDD中前n个元素 |
saveAsTextFile | 将RDD元素保存到文件中,对每个元素调用toString方法,saveAsTextFile按照执行task的多少生成多少个文件,如果只想生成一个文件,则在RDD上调用coalesce(1,true).saveAsTextFile(),此时,Spark只起一个task来执行保存的动作,也就只有一个文件产生了,又或者,可以调用repartition(1),它其实是coalesce的一个包装,默认第二个参数为true |
countByKey | 对每个key对应的值进行count计数 |
foreach | 遍历RDD中的每个元素,与collect不同,foreach是在worker节点上对数据进行遍历操作,与collect把数据传回客户端进行遍历操作相比,性能更好 |
4.实战演练
Java版本
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function2;
import scala.Tuple2;
import java.util.*;
/**
 * Runnable demos of common Spark RDD action operators, written against the Java API.
 *
 * <p>Each demo starts a local-mode context, runs one action on a small in-memory data set,
 * prints (or saves) the result and closes the context.
 */
public class ActionStudy {

    public static void main(String[] args) {
        // Uncomment the demo(s) to run; each one manages its own context.
        //reduce();
        //collect();
        //count();
        //take();
        //saveAsTextFile();
        countByKey();
    }

    /**
     * Builds a context on the "local" master with the log level set to "warn".
     *
     * @param appName application name registered with Spark
     * @return a started context that the caller must close
     */
    private static JavaSparkContext openContext(String appName) {
        JavaSparkContext context = new JavaSparkContext(
                new SparkConf().setAppName(appName).setMaster("local"));
        context.setLogLevel("warn");
        return context;
    }

    /** reduce: adds up the numbers 1..10 (held in a single partition) and prints the sum. */
    private static void reduce() {
        JavaSparkContext sc = openContext("reduce");
        JavaRDD<Integer> numberRDD =
                sc.parallelize(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10), 1);
        Integer total = numberRDD.reduce((left, right) -> left + right);
        System.out.println(total);
        sc.close();
    }

    /** collect: fetches all elements into a local list and prints them one per line. */
    private static void collect() {
        JavaSparkContext sc = openContext("collect");
        JavaRDD<Integer> numberRDD =
                sc.parallelize(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10));
        List<Integer> collected = numberRDD.collect();
        collected.forEach(value -> System.out.println(value));
        sc.close();
    }

    /** count: prints the number of elements in the RDD. */
    private static void count() {
        JavaSparkContext sc = openContext("count");
        JavaRDD<Integer> numberRDD =
                sc.parallelize(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10));
        long total = numberRDD.count();
        System.out.println(total);
        sc.close();
    }

    /** take: fetches the first three elements and prints them one per line. */
    private static void take() {
        JavaSparkContext sc = openContext("take");
        JavaRDD<Integer> numberRDD =
                sc.parallelize(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10));
        List<Integer> firstThree = numberRDD.take(3);
        firstThree.forEach(value -> System.out.println(value));
        sc.close();
    }

    /** saveAsTextFile: writes the numbers 1..10, split over three partitions, to "./output". */
    private static void saveAsTextFile() {
        JavaSparkContext sc = openContext("saveAsTextFile");
        JavaRDD<Integer> numberRDD =
                sc.parallelize(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10), 3);
        numberRDD.saveAsTextFile("./output");
        sc.close();
    }

    /** countByKey: counts how many pairs share each key and prints the resulting map. */
    private static void countByKey() {
        JavaSparkContext sc = openContext("countByKey");
        List<Tuple2<Integer, String>> pairs = Arrays.asList(
                new Tuple2<>(75, "leo"),
                new Tuple2<>(50, "tom"),
                new Tuple2<>(100, "marray"),
                new Tuple2<>(86, "jack"),
                new Tuple2<>(75, "leo"),
                new Tuple2<>(50, "tom"),
                new Tuple2<>(100, "marray"),
                new Tuple2<>(86, "jack")
        );
        JavaPairRDD<Integer, String> scoresRDD = sc.parallelizePairs(pairs);
        Map<Integer, Long> occurrences = scoresRDD.countByKey();
        System.out.println(occurrences);
        sc.close();
    }
}