DataSet 开发概述
DataSource 数据来源
- readTextFile
public class ReadTextFileDemo {
public static void main(String[] args) throws Exception {
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
// DataSource<String> dataSource = env.readTextFile("file:///Users/baidu/Desktop/input");
DataSource<String> dataSource = env.readTextFile("hdfs://master:9000/user/yuan/input/wc.count");
dataSource.flatMap(new FlatMapFunction<String, S>() {
@Override
public void flatMap(String value, Collector<S> out) throws Exception {
Arrays.stream(value.split("")).forEach(o -> {
S s = new S();
s.setStr(o);
out.collect(s);
});
}
}).print();
}
public static class S {
String str;
public String getStr() {
return str;
}
public void setStr(String str) {
this.str = str;
}
@Override
public String toString() {
return "S{" +
"str='" + str + '\'' +
'}';
}
}
}
计数器
public static void main(String[] args) throws Exception {
// ExecutionEnvironment 执行环境
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
DataSource<String> text = env.fromElements("Who's there?",
"I think I hear them. Stand, ho! Who's there?");
text
.flatMap(new RichFlatMapFunction<String, Tuple2<String, Integer>>() {
IntCounter intCounter = new IntCounter(0);
@Override
public void open(Configuration parameters) throws Exception {
// 解决多并行度时 计数总数不正确(因为每个并行度就是一个线程 每个线程之前是不共享数据的)
getRuntimeContext().addAccumulator("c", intCounter);
}
@Override
public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {
for (String s : value.split(" ")) {
intCounter.add(1);
out.collect(new Tuple2<String, Integer>(s, 1));
}
}
})
// 设置几个并行度 就写几份文件
.setParallelism(4)
.writeAsText("file:///Users/baidu/Desktop/222", FileSystem.WriteMode.OVERWRITE);;
JobExecutionResult execute = env.execute("abcd");
Object c = execute.getAccumulatorResult("c");
System.out.println("c = " + c);
}
Sink
分布式缓存
Transformation算子
- Map
public class MapDemo {
public static void main(String[] args) throws Exception {
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
DataSource<String> text = env.fromElements("1", "2", "3", "4", "5");
// map 一对一转换 字符串数值
text.map(new MapFunction<String, Integer>() {
public Integer map(String value) throws Exception {
return Integer.valueOf(value);
}
}).print();
}
}
- FlatMap
public static void main(String[] args) throws Exception {
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
DataSource<String> text = env.fromElements("我是一个大傻瓜", "我是一个小朋友");
text.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
// 一对多 一行内容转换了好几行
public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {
for (String s : value.split("")) {
out.collect(new Tuple2<String, Integer>(s, 1));
}
}
}).print();
}
- MapPartition
public static void main(String[] args) throws Exception {
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
ArrayList<Tuple2<Integer, String>> data = new ArrayList<>();
data.add(new Tuple2<>(1, "demo1"));
data.add(new Tuple2<>(1, "demo2"));
data.add(new Tuple2<>(1, "demo3"));
data.add(new Tuple2<>(2, "demo4"));
data.add(new Tuple2<>(2, "demo5"));
data.add(new Tuple2<>(2, "demo6"));
data.add(new Tuple2<>(3, "demo7"));
data.add(new Tuple2<>(3, "demo8"));
data.add(new Tuple2<>(4, "demo9"));
data.add(new Tuple2<>(4, "demo10"));
data.add(new Tuple2<>(4, "demo11"));
data.add(new Tuple2<>(4, "demo12"));
data.add(new Tuple2<>(5, "demo13"));
data.add(new Tuple2<>(5, "demo14"));
data.add(new Tuple2<>(5, "demo15"));
data.add(new Tuple2<>(5, "demo16"));
data.add(new Tuple2<>(5, "demo17"));
data.add(new Tuple2<>(6, "demo18"));
data.add(new Tuple2<>(6, "demo19"));
data.add(new Tuple2<>(6, "demo20"));
data.add(new Tuple2<>(6, "demo21"));
DataSource<Tuple2<Integer, String>> collection = env.fromCollection(data);
// 按照数字分区 相同的数据会分到一个分区里
collection
.partitionByRange(0)
.mapPartition(new MapPartitionFunction<Tuple2<Integer, String>, Tuple2<Integer, String>>() {
@Override
public void mapPartition(Iterable<Tuple2<Integer, String>> values, Collector<Tuple2<Integer, String>> out) throws Exception {
values.forEach(o -> {
System.out.println("线程id:" + Thread.currentThread().getId() + "," + o);
});
System.out.println();
}
})
.setParallelism(4)
.print();
System.out.println("partitionByRange");
// 按照hash分区
collection
.partitionByHash(0)
.mapPartition(new MapPartitionFunction<Tuple2<Integer, String>, Tuple2<Integer, String>>() {
@Override
public void mapPartition(Iterable<Tuple2<Integer, String>> values, Collector<Tuple2<Integer, String>> out) throws Exception {
values.forEach(o -> {
System.out.println("线程id:" + Thread.currentThread().getId() + "," + o);
});
System.out.println();
}
})
.setParallelism(4)
.print();
System.out.println("partitionByHash");
// 按照自定义分区
collection
.partitionCustom(new Partitioner<Integer>() {
@Override
public int partition(Integer key, int numPartitions) {
System.out.println("key=" + key + " numPartitions=" + numPartitions);
return key % numPartitions;
}
}, 0)
.mapPartition(new MapPartitionFunction<Tuple2<Integer, String>, Tuple2<Integer, String>>() {
@Override
public void mapPartition(Iterable<Tuple2<Integer, String>> values, Collector<Tuple2<Integer, String>> out) throws Exception {
values.forEach(o -> {
System.out.println("线程id:" + Thread.currentThread().getId() + "," + o);
});
//System.out.println();
}
})
.setParallelism(4)
.print();
System.out.println("partitionCustom");
}
- Filter
public static void main(String[] args) throws Exception {
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
DataSource<Integer> dataSource = env.fromElements(1, 2, 3, 4, 5);
dataSource.filter(new FilterFunction<Integer>() {
@Override
public boolean filter(Integer value) throws Exception {
return value % 2 == 0;
}
}).print();
}
- Projection of Tuple DataSet(元组数据集投影)
project 转换将删除或移动元组数据集的元组字段。该 project(int…) 方法选择应由其索引保留的元组字段,并在输出元组中定义其顺序。
project 不需要定义函数体
public static void main(String[] args) throws Exception {
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
List<Tuple3<Integer, Integer, Integer>> data = new ArrayList<>();
data.add(new Tuple3<>(1, 2, 3));
data.add(new Tuple3<>(4, 5, 6));
data.add(new Tuple3<>(7, 8, 9));
// converts Tuple3<Integer, Integer, Integer> into Tuple2<Integer, Integer>
//选出合适的字段且可调换顺序
env
.fromCollection(data)
.project(2, 0)
.print();
}
执行后的结果
(3,1)
(6,4)
(9,7)
- Transformations on Grouped DataSet (分组数据集的转换)
reduce 操作可以对分组的数据集进行操作。指定用于分组的密钥可以通过多种方式完成:
关键表达,groupBy("key")
键选择器功能,implements KeySelector
一个或多个字段位置键(仅限元组数据集),groupBy(0, 1)
案例类别字段(仅案例类别),groupBy("a", "b")
- Reduce on Grouped DataSet(减少分组数据集)
应用于分组数据集的 Reduce 转换使用用户定义的 reduce 函数将每个组简化为单个元素。
对于每组输入元素,reduce 函数依次将成对的元素组合为一个元素,直到每组只剩下一个元素为止。
public static void main(String[] args) throws Exception {
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
ArrayList<Tuple2<Integer, String>> data = new ArrayList<>();
data.add(new Tuple2<>(1, "demo1"));
data.add(new Tuple2<>(1, "demo2"));
data.add(new Tuple2<>(1, "demo3"));
data.add(new Tuple2<>(2, "demo4"));
data.add(new Tuple2<>(2, "demo5"));
data.add(new Tuple2<>(2, "demo6"));
data.add(new Tuple2<>(3, "demo7"));
data.add(new Tuple2<>(3, "demo8"));
data.add(new Tuple2<>(4, "demo9"));
data.add(new Tuple2<>(4, "demo10"));
data.add(new Tuple2<>(4, "demo11"));
data.add(new Tuple2<>(4, "demo12"));
data.add(new Tuple2<>(5, "demo13"));
data.add(new Tuple2<>(5, "demo14"));
data.add(new Tuple2<>(5, "demo15"));
data.add(new Tuple2<>(5, "demo16"));
data.add(new Tuple2<>(5, "demo17"));
data.add(new Tuple2<>(6, "demo18"));
data.add(new Tuple2<>(6, "demo19"));
data.add(new Tuple2<>(6, "demo20"));
data.add(new Tuple2<>(6, "demo21"));
DataSource<Tuple2<Integer, String>> collection = env.fromCollection(data);
collection.groupBy(0).reduce(new ReduceFunction<Tuple2<Integer, String>>() {
@Override
public Tuple2<Integer, String> reduce(Tuple2<Integer, String> value1, Tuple2<Integer, String> value2) throws Exception {
return new Tuple2<>(value1.f0,value1.f1+" + "+value2.f1);
}
}).print();
}
- GroupReduce on Grouped DataSet(分组数据集上的 GroupReduce)
应用于分组数据集的 GroupReduce 转换为每个组调用用户定义的 group-reduce 函数。
此与 Reduce 的区别在于,用户定义的函数可一次获取整个组。该函数在组的所有元素上使用 Iterable 调用,并且可以返回任意数量的结果元素。
public static void main(String[] args) throws Exception {
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
ArrayList<Tuple2<Integer, String>> data = new ArrayList<>();
data.add(new Tuple2<>(1, "demo1"));
data.add(new Tuple2<>(1, "demo2"));
data.add(new Tuple2<>(1, "demo3"));
data.add(new Tuple2<>(2, "demo4"));
data.add(new Tuple2<>(2, "demo5"));
data.add(new Tuple2<>(2, "demo6"));
data.add(new Tuple2<>(3, "demo7"));
data.add(new Tuple2<>(3, "demo8"));
data.add(new Tuple2<>(4, "demo9"));
data.add(new Tuple2<>(4, "demo10"));
data.add(new Tuple2<>(4, "demo11"));
data.add(new Tuple2<>(4, "demo12"));
data.add(new Tuple2<>(5, "demo13"));
data.add(new Tuple2<>(5, "demo14"));
data.add(new Tuple2<>(5, "demo15"));
data.add(new Tuple2<>(5, "demo16"));
data.add(new Tuple2<>(5, "demo17"));
data.add(new Tuple2<>(6, "demo18"));
data.add(new Tuple2<>(6, "demo19"));
data.add(new Tuple2<>(6, "demo20"));
data.add(new Tuple2<>(6, "demo21"));
DataSource<Tuple2<Integer, String>> collection = env.fromCollection(data);
collection.groupBy(0)
.reduceGroup(new RichGroupReduceFunction<Tuple2<Integer, String>, Tuple2<Integer, String>>() {
@Override
public void reduce(Iterable<Tuple2<Integer, String>> values, Collector<Tuple2<Integer, String>> out) throws Exception {
values.forEach(o -> {
if (o.f0 % 2 == 0) {
out.collect(o);
}
});
}
})
.print();
}
- GroupCombine on a Grouped DataSet 可组合的 GroupReduce 函数
public static void main(String[] args) throws Exception {
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
ArrayList<Tuple2<Integer, String>> data = new ArrayList<>();
data.add(new Tuple2<>(1, "demo1"));
data.add(new Tuple2<>(1, "demo2"));
data.add(new Tuple2<>(1, "demo3"));
data.add(new Tuple2<>(2, "demo4"));
data.add(new Tuple2<>(2, "demo5"));
data.add(new Tuple2<>(2, "demo6"));
data.add(new Tuple2<>(3, "demo7"));
data.add(new Tuple2<>(3, "demo8"));
data.add(new Tuple2<>(4, "demo9"));
data.add(new Tuple2<>(4, "demo10"));
data.add(new Tuple2<>(4, "demo11"));
data.add(new Tuple2<>(4, "demo12"));
data.add(new Tuple2<>(5, "demo13"));
data.add(new Tuple2<>(5, "demo14"));
data.add(new Tuple2<>(5, "demo15"));
data.add(new Tuple2<>(5, "demo16"));
data.add(new Tuple2<>(5, "demo17"));
data.add(new Tuple2<>(6, "demo18"));
data.add(new Tuple2<>(6, "demo19"));
data.add(new Tuple2<>(6, "demo20"));
data.add(new Tuple2<>(6, "demo21"));
DataSource<Tuple2<Integer, String>> collection = env.fromCollection(data);
collection.groupBy(0)
// .reduceGroup(new RichGroupReduceFunction<Tuple2<Integer, String>, Tuple2<Integer, String>>() {
// @Override
// public void reduce(Iterable<Tuple2<Integer, String>> values, Collector<Tuple2<Integer, String>> out) throws Exception {
//
// values.forEach(o -> {
// if (o.f0 % 2 == 0) {
// out.collect(o);
// }
// });
// }
// })
.reduceGroup(new GroupReduce())
.print();
}
private static class GroupReduce implements GroupCombineFunction<Tuple2<Integer, String>, Tuple2<Integer, String>>, GroupReduceFunction<Tuple2<Integer, String>, Tuple2<Integer, String>> {
/**
* 与 reduce 函数相比,group-reduce 函数不是可隐式组合的。
* 为了使 group-reduce 函数可组合,它必须实现 GroupCombineFunction 接口。
*
* 要点:GroupCombineFunction 接口的通用输入和输出类型必须等于 GroupReduceFunction 的通用输入类型,
* 如以下示例所示:
*/
// 实现comblinefunction接口
@Override
public void combine(Iterable<Tuple2<Integer, String>> values, Collector<Tuple2<Integer, String>> out) throws Exception {
Tuple2<Integer, String> t = StreamSupport.stream(values.spliterator(), false).reduce((o1, o2) -> {
return new Tuple2<>(o1.f0, o1.f1 + " + " + o2.f1);
}).get();
out.collect(t);
}
@Override
public void reduce(Iterable<Tuple2<Integer, String>> values, Collector<Tuple2<Integer, String>> out) throws Exception {
values.forEach(o->{
out.collect(o);
});
}
}
- Aggregate on Grouped Tuple DataSet(在分组元组数据集聚合)
有一些常用的聚合操作。聚合转换提供以下内置聚合功能:
Sum
Min, and
Max
聚合转换只能应用于元组数据集,并且仅支持用于分组的字段位置键
public static void main(String[] args) throws Exception {
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
ArrayList<Tuple2<Integer, Integer>> data = new ArrayList<>();
data.add(new Tuple2<>(1, 1));
data.add(new Tuple2<>(1, 2));
data.add(new Tuple2<>(1, 3));
data.add(new Tuple2<>(2, 4));
data.add(new Tuple2<>(2, 5));
data.add(new Tuple2<>(2, 6));
data.add(new Tuple2<>(3, 7));
data.add(new Tuple2<>(3, 8));
data.add(new Tuple2<>(4, 9));
data.add(new Tuple2<>(4, 10));
data.add(new Tuple2<>(4, 11));
data.add(new Tuple2<>(4, 12));
data.add(new Tuple2<>(5, 13));
data.add(new Tuple2<>(5, 14));
data.add(new Tuple2<>(5, 15));
data.add(new Tuple2<>(5, 16));
data.add(new Tuple2<>(5, 17));
data.add(new Tuple2<>(6, 18));
data.add(new Tuple2<>(6, 19));
data.add(new Tuple2<>(6, 20));
data.add(new Tuple2<>(6, 21));
DataSource<Tuple2<Integer, Integer>> collection = env.fromCollection(data);
collection.groupBy(0)
.aggregate(Aggregations.SUM,1)
.print();
}
- MinBy / MaxBy on Grouped Tuple DataSet(分组元组数据集上的 MinBy / MaxBy)
MinBy(MaxBy)转换为每个元组组选择一个元组。
选定的元组是其一个或多个指定字段的值最小(最大)的元组。用于比较的字段必须是有效的关键字段,即可比较的字段。
如果多个元组具有最小(最大)字段值,则返回这些元组中的任意元组。
public static void main(String[] args) throws Exception {
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
ArrayList<Tuple2<Integer, Integer>> data = new ArrayList<>();
data.add(new Tuple2<>(1, 1));
data.add(new Tuple2<>(1, 2));
data.add(new Tuple2<>(1, 3));
data.add(new Tuple2<>(2, 4));
data.add(new Tuple2<>(2, 5));
data.add(new Tuple2<>(2, 6));
data.add(new Tuple2<>(3, 7));
data.add(new Tuple2<>(3, 8));
data.add(new Tuple2<>(4, 9));
data.add(new Tuple2<>(4, 10));
data.add(new Tuple2<>(4, 11));
data.add(new Tuple2<>(4, 12));
data.add(new Tuple2<>(5, 13));
data.add(new Tuple2<>(5, 14));
data.add(new Tuple2<>(5, 15));
data.add(new Tuple2<>(5, 16));
data.add(new Tuple2<>(5, 17));
data.add(new Tuple2<>(6, 18));
data.add(new Tuple2<>(6, 19));
data.add(new Tuple2<>(6, 20));
data.add(new Tuple2<>(6, 21));
DataSource<Tuple2<Integer, Integer>> collection = env.fromCollection(data);
collection.groupBy(0)
.maxBy(1)
.print();
}
- Reduce on full DataSet减少完整的 DataSet)
Reduce 转换将用户定义的 reduce 函数应用于 DataSet 的所有元素。reduce 函数随后将成对的元素组合为一个元素,直到仅剩下一个元素为止。
public static void main(String[] args) throws Exception {
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
ArrayList<Tuple2<Integer, String>> data = new ArrayList<>();
data.add(new Tuple2<>(1, "demo1"));
data.add(new Tuple2<>(1, "demo2"));
data.add(new Tuple2<>(1, "demo3"));
data.add(new Tuple2<>(2, "demo4"));
data.add(new Tuple2<>(2, "demo5"));
data.add(new Tuple2<>(2, "demo6"));
data.add(new Tuple2<>(3, "demo7"));
data.add(new Tuple2<>(3, "demo8"));
data.add(new Tuple2<>(4, "demo9"));
data.add(new Tuple2<>(4, "demo10"));
data.add(new Tuple2<>(4, "demo11"));
data.add(new Tuple2<>(4, "demo12"));
data.add(new Tuple2<>(5, "demo13"));
data.add(new Tuple2<>(5, "demo14"));
data.add(new Tuple2<>(5, "demo15"));
data.add(new Tuple2<>(5, "demo16"));
data.add(new Tuple2<>(5, "demo17"));
data.add(new Tuple2<>(6, "demo18"));
data.add(new Tuple2<>(6, "demo19"));
data.add(new Tuple2<>(6, "demo20"));
data.add(new Tuple2<>(6, "demo21"));
DataSource<Tuple2<Integer, String>> collection = env.fromCollection(data);
collection
.reduce(new ReduceFunction<Tuple2<Integer, String>>() {
@Override
public Tuple2<Integer, String> reduce(Tuple2<Integer, String> value1, Tuple2<Integer, String> value2) throws Exception {
return new Tuple2<>(value1.f0 + value2.f0, value1.f1 + " + " + value2.f1);
}
}).print();
}
- GroupReduce on full DataSet
public static void main(String[] args) throws Exception {
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
ArrayList<Tuple2<Integer, String>> data = new ArrayList<>();
data.add(new Tuple2<>(1, "demo1"));
data.add(new Tuple2<>(1, "demo2"));
data.add(new Tuple2<>(1, "demo3"));
data.add(new Tuple2<>(2, "demo4"));
data.add(new Tuple2<>(2, "demo5"));
data.add(new Tuple2<>(2, "demo6"));
data.add(new Tuple2<>(3, "demo7"));
data.add(new Tuple2<>(3, "demo8"));
DataSource<Tuple2<Integer, String>> collection = env.fromCollection(data);
collection
// .groupBy(0)
.reduce(new ReduceFunction<Tuple2<Integer, String>>() {
@Override
public Tuple2<Integer, String> reduce(Tuple2<Integer, String> value1, Tuple2<Integer, String> value2) throws Exception {
return new Tuple2<>(value1.f0 + value2.f0, value1.f1 + " + " + value2.f1);
}
}).print();
}
- GroupCombine on a full DataSet(完整数据集上的 GroupCombine)
完整数据集上的 GroupCombine 与分组数据集上的 GroupCombine 相似。
数据在所有节点上进行分区,然后以贪婪的方式进行组合(即,仅将适合内存的数据进行一次组合)。
Aggregate on full Tuple DataSet(完整数据集上聚合)
聚合转换只能应用于元组数据集。MinBy / MaxBy on full Tuple DataSet(完整数据集上的 MinBy/MaxBy)
MinBy(MaxBy)转换从元组的数据集中选择一个元组。
选定的元组是其一个或多个指定字段的值最小(最大)的元组。
用于比较的字段必须是有效的关键字段,即可比较的字段。
如果多个元组具有最小(最大)字段值,则返回这些元组中的任意元组。Distinct(去重)
Distinct 转换计算源数据集的不同元素的数据集。Join
OuterJoin 合并两个数据流 join with
public static void main(String[] args) throws Exception {
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
DataSource<Tuple2<Integer, String>> stringDataSource1 =
env.fromElements(
new Tuple2(1, "小王"),
new Tuple2(2, "小里"),
new Tuple2(3, "小张"),
new Tuple2(4, "小四")
);
DataSource<Tuple2<Integer, String>> stringDataSource2 =
env.fromElements(
new Tuple2(1, "北京"),
new Tuple2(2, "上海"),
new Tuple2(3, "成都"),
new Tuple2(5, "重庆")
);
stringDataSource1
//.leftOuterJoin(stringDataSource2)
//.rightOuterJoin(stringDataSource2)
//.fullOuterJoin(stringDataSource2)
.join(stringDataSource2)
// 第一个数据 中 第一个字段
.where(0)
// 第二个数据 中 第一个字段
.equalTo(0)
.with(new JoinFunction<Tuple2<Integer, String>, Tuple2<Integer, String>, Tuple3<Integer, String, String>>() {
@Override
public Tuple3<Integer, String, String> join(Tuple2<Integer, String> first, Tuple2<Integer, String> second) throws Exception {
if (first == null) {
return new Tuple3<>(second.f0, "-", second.f1);
} else if (second == null) {
return new Tuple3<>(first.f0, first.f1, "-");
} else {
return new Tuple3<>(first.f0, first.f1, second.f1);
}
}
})
.print();
}
- Cross
Cross 转换将两个数据集组合为一个数据集。它构建两个输入数据集的元素的所有成对组合,即构建笛卡尔积。
Cross 转换要么在每对元素上调用用户定义的交叉函数,要么输出 Tuple2
public static void main(String[] args) throws Exception {
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
DataSource<Tuple1<Integer>> stringDataSource1 =
env.fromElements(
new Tuple1(1),
new Tuple1(2)
);
DataSource<Tuple1<String>> stringDataSource2 =
env.fromElements(
new Tuple1("北京"),
new Tuple1("上海"),
new Tuple1("重庆")
);
stringDataSource1
.cross(stringDataSource2)
.with(new CrossFunction<Tuple1<Integer>, Tuple1<String>, Tuple2<Integer, String>>() {
@Override
public Tuple2<Integer, String> cross(Tuple1<Integer> val1, Tuple1<String> val2) throws Exception {
return new Tuple2(val1.f0, val2.f0);
}
}).print();
}
(1,北京)
(1,上海)
(1,重庆)
(2,北京)
(2,上海)
(2,重庆)
- CoGroup 与join不同的是 join是一对一有多少相同的数据就产出多少条。 cogroup是多对多 会按照key分组
CoGroup 转换共同处理两个数据集的组。两个数据集都分组在一个定义的键上,并且两个共享相同键的数据集的组一起交给用户定义的 co-group function。
如果对于一个特定的键,只有一个 DataSet 有一个组,则使用该组和一个空组调用共同组功能。协同功能可以分别迭代两个组的元素并返回任意数量的结果元素。
与 Reduce,GroupReduce 和 Join 相似,可以使用不同的键选择方法来定义键。
public static void main(String[] args) throws Exception {
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
DataSource<Tuple2<Integer, String>> stringDataSource1 =
env.fromElements(
new Tuple2(1, "小王"),
new Tuple2(1, "小周"),
new Tuple2(2, "小里"),
new Tuple2(3, "小张"),
new Tuple2(4, "小四")
);
DataSource<Tuple2<Integer, String>> stringDataSource2 =
env.fromElements(
new Tuple2(1, "北京"),
new Tuple2(1, "邯郸"),
new Tuple2(2, "上海"),
new Tuple2(3, "成都"),
new Tuple2(5, "重庆")
);
stringDataSource1
.coGroup(stringDataSource2)
// 第一个数据 中 第一个字段
.where(0)
// 第二个数据 中 第一个字段
.equalTo(0)
.with(new CoGroupFunction<Tuple2<Integer, String>, Tuple2<Integer, String>, Tuple2<Integer, String>>() {
@Override
public void coGroup(Iterable<Tuple2<Integer, String>> first, Iterable<Tuple2<Integer, String>> second, Collector<Tuple2<Integer, String>> out) throws Exception {
Tuple2<Integer, String> t1 = StreamSupport.stream(first.spliterator(), false).reduce((o1, o2) -> {
return new Tuple2<>(o1.f0, o1.f1 + " + " + o2.f1);
}).orElse(null);
Tuple2<Integer, String> t2 = StreamSupport.stream(second.spliterator(), false).reduce((o1, o2) -> {
return new Tuple2<>(o1.f0, o1.f1 + " + " + o2.f1);
}).orElse(null);
if (t1 != null && t2 != null) {
out.collect(new Tuple2<>(t1.f0, t1.f1 + " + " + t2.f1));
}
}
})
.print();
}
(3,小张 + 成都)
(1,小王 + 小周 + 北京 + 邯郸)
(2,小里 + 上海)
- Union 合并两个以上相同类型的数据集
public static void main(String[] args) throws Exception {
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
DataSource<Tuple2<Integer, String>> stringDataSource1 =
env.fromElements(
new Tuple2(1, "小王"),
new Tuple2(1, "小周"),
new Tuple2(2, "小里"),
new Tuple2(3, "小张"),
new Tuple2(4, "小四")
);
DataSource<Tuple2<Integer, String>> stringDataSource2 =
env.fromElements(
new Tuple2(1, "北京"),
new Tuple2(2, "上海"),
new Tuple2(3, "成都"),
new Tuple2(5, "重庆")
);
stringDataSource1.union(stringDataSource2)
.print();
}
(1,小王)
(4,小四)
(1,北京)
(1,小周)
(2,上海)
(2,小里)
(3,小张)
(3,小张)
(5,重庆)
Rebalance(重新平衡)
均匀地重新平衡数据集的并行分区,以消除数据偏斜。
重要:此操作会通过网络重新整理整个 DataSet。可能会花费大量时间。Hash-Partition (哈希分区) 详见mapPartition
重要:此操作会通过网络重新整理整个 DataSet。可能会花费大量时间。Range-Partition(范围分区)
重要:此操作需要在 DataSet 上额外传递一次以计算范围,通过网络对整个 DataSet 进行边界划分和改组。这会花费大量时间。Sort Partition(分区排序)
以指定顺序对指定字段上的 DataSet 的所有分区进行本地排序。可以将字段指定为字段表达式或字段位置。
可以通过链接 sortPartition() 调用在多个字段上对分区进行排序。First-n(前 n 个(任意)元素)
返回数据集的前 n 个(任意)元素。First-n 可以应用于常规数据集,分组的数据集或分组排序的数据集。可以将分组键指定为键选择器功能或字段位置键。