通过demon 演示 rdd的操作
文件 student.txt
学生id 姓名 年龄 按 | 分隔
1|小红|22
2|王芳|24
3|李磊|22
4|王飞|30
student vo 类
@Data
public class StudentVo {
private Integer id;
private String name;
private Integer age;
public StudentVo(List<String> strList){
id = Integer.valueOf(strList.get(0));
name = strList.get(1);
age = Integer.valueOf(strList.get(2));
}
}
spark main 方法的初始化
SparkConf conf = new SparkConf()
.setAppName("MapReduceActionDemon")
.setMaster("local[1]");
JavaSparkContext sc = new JavaSparkContext(conf);
String studentfilePaht = "/Users/riverfan/mytest/spark/mrAction";
String scorefilePaht = "/Users/riverfan/mytest/spark/mrAction";
从文件转化rdd
JavaRDD<String> rdd1 = sc.textFile(studentfilePaht);
Transformations
1 map - 将原来RDD的每个数据项,使用map中用户自定义的函数func进行映射,转变为一个新的元素,并返回一个新的RDD。
sc.textFile(studentfilePaht)
.map(t->{
List<String> strList = SPLITTER.splitToList(t);
return new StudentVo(strList);
}).foreach(t-> System.out.println(t));
输出结果
StudentVo(id=1, name=语文, age=22)
StudentVo(id=1, name=化学, age=80)
StudentVo(id=2, name=语文, age=80)
StudentVo(id=3, name=数学, age=90)
StudentVo(id=3, name=应用, age=99)
2 filter(func)
使用函数func对原RDD中数据项进行过滤,将符合func中条件的数据项组成新的RDD返回
sc.textFile(studentfilePaht)
.map(t -> {
List<String> strList = SPLITTER.splitToList(t);
return new StudentVo(strList);
}).filter(t -> t.getAge() > 25)
.foreach(t -> System.out.println(t));
过来年纪大于25的学生
StudentVo(id=4, name=王飞, age=30)
3 flatMap(func)
类似于map,但是输入数据项可以被映射到0个或多个输出数据集合中,所以函数func的返回值是一个数据项集合而不是一个单一的数据项。??
sc.textFile(studentfilePaht)
.map(t -> {
List<String> strList = SPLITTER.splitToList(t);
return strList;
}).flatMap(t -> t.stream().iterator())
.foreach(t -> System.out.println(t));
数据由多个 list 压扁 为一个 list
1
小红
22
2
王芳
24
3
李磊
22
4
王飞
30
4 mapPartitions
类似于map,但是该操作是在每个分区上分别执行,所以当操作一个类型为T的RDD时func的格式必须是Iterator<T> => Iterator<U>。即mapPartitions需要获取到每个分区的迭代器,在函数中通过这个分区的迭代器对整个分区的元素进行操作。
// 两个分区
sc.textFile(studentfilePaht, 2)
.mapPartitions(t -> {
t.forEachRemaining(tt -> System.out.println(Thread.currentThread().getId() + "--> " + tt));
return t;
}).count();
结果如下
44--> 1|小红|22
44--> 2|王芳|24
44--> 3|李磊|22
45--> 4|王飞|30
5 mapPartitionsWithIndex
类似于mapPartitions,但是需要提供给func一个整型值,这个整型值是分区的索引,所以当处理T类型的RDD时,func的格式必须为(Int, Iterator<T>) => Iterator<U>。
sc.textFile(studentfilePaht, 2)
.mapPartitionsWithIndex((index,y)->{
System.out.println(Thread.currentThread().getId() + "--> index = " + index);
y.forEachRemaining(tt -> System.out.println(Thread.currentThread().getId() + "--> " + tt));
return y;
},false).count();
结果
44--> index = 0
44--> 1|小红|22
44--> 2|王芳|24
44--> 3|李磊|22
45--> index = 1
45--> 4|王飞|30
sample(withReplacement, fraction, seed)
6 union(otherDataset)
返回原数据集和参数指定的数据集合并后的数据集。
JavaRDD<String> jrdd = sc.parallelize(Arrays.asList("hello","river"));
sc.textFile(studentfilePaht, 2)
.union(jrdd)
.collect()
.forEach(t-> System.out.println(t));
结果 将两个数据联合在一起
1|小红|22
2|王芳|24
3|李磊|22
4|王飞|30
hello
river
7 intersection
返回两个数据集的交集。
JavaRDD<String> jrdd = sc.parallelize(Arrays.asList("3|李磊|22","river"));
sc.textFile(studentfilePaht)
.intersection(jrdd)
.collect()
.forEach(t-> System.out.println(t));
3|李磊|22
8 subtract(otherDataset)
返回this RDD中但不在other RDD中的元素
JavaRDD<String> jrdd = sc.parallelize(Arrays.asList("3|李磊|22","river"));
sc.textFile(studentfilePaht)
.subtract(jrdd)
.collect()
.forEach(t-> System.out.println(t));
1|小红|22
2|王芳|24
4|王飞|30
9 distinct([numTasks]))
将RDD中的元素进行去重操作
sc.parallelize(Arrays.asList("river","river","frank"))
.distinct().foreach(t-> System.out.println(t));
10 groupByKey([numTasks])
操作(K,V)格式的数据集,返回 (K, Iterable)格式的数据集。
注意,如果分组是为了按key进行聚合操作(例如,计算sum、average),此时使用reduceByKey或aggregateByKey计算效率会更高。
注意,默认情况下,并行情况取决于父RDD的分区数,但可以通过参数numTasks来设置任务数
未完待续
sc.textFile(studentfilePaht,2)
.mapToPair(t->new Tuple2<>(t.substring(0,t.indexOf("|")),t))
.groupByKey()
.map(t->{
System.out.println(Thread.currentThread().getId() + ":" +t);
return t;
})
.collect()
.forEach(t-> System.out.println(Thread.currentThread().getId() + ":" +t));
通过结果可以看到,groupby是在不同的线程(任务)进行的
44:(3,[3|李磊|22])
44:(1,[1|小红|22])
45:(4,[4|王飞|30])
45:(2,[2|王芳|24])
11 reduceByKey(func, [numTasks])
使用给定的func,将(K,V)对格式的数据集中key相同的值进行聚集,其中func的格式必须为(V,V) => V。可选参数numTasks可以指定reduce任务的数目。
sc.parallelize(Arrays.asList("river|hello", "river|cat", "frank|moon"))
.mapToPair(t -> new Tuple2<>(t.substring(0, t.indexOf("|")), t))
.reduceByKey((t1, t2) -> t1 + "+" + t2)
.collect()
.forEach(t -> System.out.println(t));
返回结果
(river,river|hello+river|cat)
(frank,frank|moon)
12 sortByKey
(K,V)格式的数据集,其中K已实现了Ordered,经过sortByKey操作返回排序后的数据集。指定布尔值参数ascending来指定升序或降序排列。
sc.parallelize(Arrays.asList("river|hello",
"frank|cat1",
"river|cat2",
"frank|cat3",
"river|cat4",
"frank|cat5",
"river|moon"))
.mapToPair(t -> new Tuple2<>(t.substring(0, t.indexOf("|")), t))
.sortByKey(false,2)
.collect()
.forEach(t -> System.out.println(t));
结果如下:
(river,river|hello)
(river,river|cat2)
(river,river|cat4)
(river,river|moon)
(frank,frank|cat1)
(frank,frank|cat3)
(frank,frank|cat5)
13 join(otherDataset, [numTasks])n
用于操作两个键值对格式的数据集,操作两个数据集(K,V)和(K,W)返回(K, (V, W))格式的数据集。通过leftOuterJoin、rightOuterJoin、fullOuterJoin完成外连接操作。
JavaPairRDD<String,String> jrdd =
sc.parallelize(Arrays.asList("1|good boy", "2|go on"))
.mapToPair(t->{
List<String> strList = SPLITTER.splitToList(t);
return new Tuple2<>(strList.get(0),strList.get(1));
});
sc.textFile(studentfilePaht)
.mapToPair(t->{
List<String> strList = SPLITTER.splitToList(t);
return new Tuple2<>(strList.get(0),t);
})
.join(jrdd)
.collect()
.forEach(t -> System.out.println(t));
结果:
(2,(2|王芳|24,go on))
(1,(1|小红|22,good boy))