Flink DataSet

DataSet Development Overview

DataSource (data sources)

  • readTextFile
 
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.util.Collector;

import java.util.Arrays;

public class ReadTextFileDemo {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
//        DataSource<String> dataSource = env.readTextFile("file:///Users/baidu/Desktop/input");
        DataSource<String> dataSource = env.readTextFile("hdfs://master:9000/user/yuan/input/wc.count");
        dataSource.flatMap(new FlatMapFunction<String, S>() {
            @Override
            public void flatMap(String value, Collector<S> out) throws Exception {
                // split("") breaks the line into individual characters;
                // use split(" ") instead for whitespace-separated words
                Arrays.stream(value.split("")).forEach(o -> {
                    S s = new S();
                    s.setStr(o);
                    out.collect(s);
                });
            }
        }).print();

    }


    public static class S {
        String str;

        public String getStr() {
            return str;
        }

        public void setStr(String str) {
            this.str = str;
        }

        @Override
        public String toString() {
            return "S{" +
                    "str='" + str + '\'' +
                    '}';
        }
    }
}

Accumulators (counters)

    public static void main(String[] args) throws Exception {

        // ExecutionEnvironment 执行环境
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        DataSource<String> text = env.fromElements("Who's there?",
                "I think I hear them. Stand, ho! Who's there?");

        text
                .flatMap(new RichFlatMapFunction<String, Tuple2<String, Integer>>() {

                    IntCounter intCounter = new IntCounter(0);

                    @Override
                    public void open(Configuration parameters) throws Exception {
                         // registered as an accumulator so the total is correct with parallelism > 1
                         // (each parallel instance runs in its own thread, and threads do not share state)
                        getRuntimeContext().addAccumulator("c", intCounter);
                    }

                    @Override
                    public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {
                        for (String s : value.split(" ")) {
                            intCounter.add(1);
                            out.collect(new Tuple2<String, Integer>(s, 1));
                        }
                    }
                })
                // with n parallel instances, n output files are written
                .setParallelism(4)
                .writeAsText("file:///Users/baidu/Desktop/222", FileSystem.WriteMode.OVERWRITE);


        JobExecutionResult execute = env.execute("abcd");

        Object c = execute.getAccumulatorResult("c");

        System.out.println("c = " + c);

    }

Sink
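
    The DataSet API writes results through sink operations. A minimal sketch of the common built-in sinks; the /tmp output paths are hypothetical:

    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        DataSet<Tuple2<String, Integer>> result = env.fromElements(
                new Tuple2<>("a", 1),
                new Tuple2<>("b", 2));

        // one element per line, formatted with toString()
        result.writeAsText("file:///tmp/sink-text", FileSystem.WriteMode.OVERWRITE);
        // CSV output is only available for Tuple DataSets
        result.writeAsCsv("file:///tmp/sink-csv", "\n", ",", FileSystem.WriteMode.OVERWRITE);

        // write* sinks are lazy; env.execute() triggers the job
        // (print(), by contrast, executes the program immediately)
        env.execute("sink demo");
    }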

Distributed Cache
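
    Flink's distributed cache copies a registered file to every TaskManager so that rich functions can read it locally. A minimal sketch, assuming a hypothetical dictionary file dict.txt on HDFS:

    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // register the file under a lookup name; Flink ships it to all workers
        env.registerCachedFile("hdfs://master:9000/user/yuan/input/dict.txt", "dict");

        env.fromElements("a", "b")
                .map(new RichMapFunction<String, String>() {
                    @Override
                    public void open(Configuration parameters) throws Exception {
                        // fetch the local copy in open(), before any map() call
                        File dict = getRuntimeContext().getDistributedCache().getFile("dict");
                        // ... read the file contents here, e.g. with java.nio.file.Files
                    }

                    @Override
                    public String map(String value) throws Exception {
                        return value;
                    }
                })
                .print();
    }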

Transformation Operators

  • Map
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.DataSource;

public class MapDemo {

    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        DataSource<String> text = env.fromElements("1", "2", "3", "4", "5");
        // map: one-to-one transformation, parses each string into an integer
        text.map(new MapFunction<String, Integer>() {
            public Integer map(String value) throws Exception {
                return Integer.valueOf(value);
            }
        }).print();
    }

}
  • FlatMap
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        DataSource<String> text = env.fromElements("我是一个大傻瓜", "我是一个小朋友");

        text.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
            // one-to-many: one input line produces several output records
            // (split("") emits one record per character)
            public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {
                for (String s : value.split("")) {
                    out.collect(new Tuple2<String, Integer>(s, 1));
                }
            }
        }).print();
    }
  • MapPartition
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        ArrayList<Tuple2<Integer, String>> data = new ArrayList<>();
        data.add(new Tuple2<>(1, "demo1"));
        data.add(new Tuple2<>(1, "demo2"));
        data.add(new Tuple2<>(1, "demo3"));
        data.add(new Tuple2<>(2, "demo4"));
        data.add(new Tuple2<>(2, "demo5"));
        data.add(new Tuple2<>(2, "demo6"));
        data.add(new Tuple2<>(3, "demo7"));
        data.add(new Tuple2<>(3, "demo8"));
        data.add(new Tuple2<>(4, "demo9"));
        data.add(new Tuple2<>(4, "demo10"));
        data.add(new Tuple2<>(4, "demo11"));
        data.add(new Tuple2<>(4, "demo12"));
        data.add(new Tuple2<>(5, "demo13"));
        data.add(new Tuple2<>(5, "demo14"));
        data.add(new Tuple2<>(5, "demo15"));
        data.add(new Tuple2<>(5, "demo16"));
        data.add(new Tuple2<>(5, "demo17"));
        data.add(new Tuple2<>(6, "demo18"));
        data.add(new Tuple2<>(6, "demo19"));
        data.add(new Tuple2<>(6, "demo20"));
        data.add(new Tuple2<>(6, "demo21"));


        DataSource<Tuple2<Integer, String>> collection = env.fromCollection(data);

        // range-partition on field 0: equal keys land in the same partition
        collection
                .partitionByRange(0)
                .mapPartition(new MapPartitionFunction<Tuple2<Integer, String>, Tuple2<Integer, String>>() {
                    @Override
                    public void mapPartition(Iterable<Tuple2<Integer, String>> values, Collector<Tuple2<Integer, String>> out) throws Exception {
                        values.forEach(o -> {
                            System.out.println("线程id:" + Thread.currentThread().getId() + "," + o);
                        });
                        System.out.println();
                    }
                })
                .setParallelism(4)
                .print();
        System.out.println("partitionByRange");

        // hash-partition on field 0
        collection
                .partitionByHash(0)
                .mapPartition(new MapPartitionFunction<Tuple2<Integer, String>, Tuple2<Integer, String>>() {
                    @Override
                    public void mapPartition(Iterable<Tuple2<Integer, String>> values, Collector<Tuple2<Integer, String>> out) throws Exception {
                        values.forEach(o -> {
                            System.out.println("线程id:" + Thread.currentThread().getId() + "," + o);
                        });
                        System.out.println();
                    }
                })
                .setParallelism(4)
                .print();
        System.out.println("partitionByHash");

        // custom partitioner
        collection
                .partitionCustom(new Partitioner<Integer>() {
                    @Override
                    public int partition(Integer key, int numPartitions) {
                        System.out.println("key=" + key + "  numPartitions=" + numPartitions);
                        return key % numPartitions;
                    }
                }, 0)
                .mapPartition(new MapPartitionFunction<Tuple2<Integer, String>, Tuple2<Integer, String>>() {
                    @Override
                    public void mapPartition(Iterable<Tuple2<Integer, String>> values, Collector<Tuple2<Integer, String>> out) throws Exception {
                        values.forEach(o -> {
                            System.out.println("线程id:" + Thread.currentThread().getId() + "," + o);
                        });
                        //System.out.println();
                    }
                })
                .setParallelism(4)
                .print();
        System.out.println("partitionCustom");

    }

  • Filter
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        DataSource<Integer> dataSource = env.fromElements(1, 2, 3, 4, 5);

        dataSource.filter(new FilterFunction<Integer>() {
            @Override
            public boolean filter(Integer value) throws Exception {
                return value % 2 == 0;
            }
        }).print();

    }
  • Projection of Tuple DataSet
    The project transformation removes or reorders fields of a Tuple DataSet. The project(int...) method selects the tuple fields to keep by index and defines their order in the output tuple.
    project does not require a user-defined function body.
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        List<Tuple3<Integer, Integer, Integer>> data = new ArrayList<>();
        data.add(new Tuple3<>(1, 2, 3));
        data.add(new Tuple3<>(4, 5, 6));
        data.add(new Tuple3<>(7, 8, 9));

        // converts Tuple3<Integer, Integer, Integer> into Tuple2<Integer, Integer> 
        // selects the desired fields and can reorder them
        env
                .fromCollection(data)
                .project(2, 0)
                .print();

    }

Result after execution:
(3,1)
(6,4)
(9,7)

  • Transformations on Grouped DataSet
    The reduce operations can work on grouped data sets. The key used for grouping can be specified in several ways (a key-selector sketch follows this list):
    key expressions, groupBy("key")
    a key-selector function, implements KeySelector
    one or more field position keys (Tuple DataSets only), groupBy(0, 1)
    case class fields (case classes only), groupBy("a", "b")
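
    A minimal sketch of the key-selector style, reusing the (id, name) tuples from the surrounding examples:

    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        env.fromElements(new Tuple2<>(1, "demo1"), new Tuple2<>(1, "demo2"), new Tuple2<>(2, "demo3"))
                // KeySelector style: extract the grouping key from each element
                .groupBy(new KeySelector<Tuple2<Integer, String>, Integer>() {
                    @Override
                    public Integer getKey(Tuple2<Integer, String> value) throws Exception {
                        return value.f0;
                    }
                })
                .reduce(new ReduceFunction<Tuple2<Integer, String>>() {
                    @Override
                    public Tuple2<Integer, String> reduce(Tuple2<Integer, String> value1, Tuple2<Integer, String> value2) throws Exception {
                        return new Tuple2<>(value1.f0, value1.f1 + " + " + value2.f1);
                    }
                })
                .print();
    }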
  • Reduce on Grouped DataSet
    A Reduce transformation applied to a grouped DataSet reduces each group to a single element using a user-defined reduce function.
    For each group of input elements, the reduce function successively combines pairs of elements into one element until only a single element per group remains.
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        ArrayList<Tuple2<Integer, String>> data = new ArrayList<>();
        data.add(new Tuple2<>(1, "demo1"));
        data.add(new Tuple2<>(1, "demo2"));
        data.add(new Tuple2<>(1, "demo3"));
        data.add(new Tuple2<>(2, "demo4"));
        data.add(new Tuple2<>(2, "demo5"));
        data.add(new Tuple2<>(2, "demo6"));
        data.add(new Tuple2<>(3, "demo7"));
        data.add(new Tuple2<>(3, "demo8"));
        data.add(new Tuple2<>(4, "demo9"));
        data.add(new Tuple2<>(4, "demo10"));
        data.add(new Tuple2<>(4, "demo11"));
        data.add(new Tuple2<>(4, "demo12"));
        data.add(new Tuple2<>(5, "demo13"));
        data.add(new Tuple2<>(5, "demo14"));
        data.add(new Tuple2<>(5, "demo15"));
        data.add(new Tuple2<>(5, "demo16"));
        data.add(new Tuple2<>(5, "demo17"));
        data.add(new Tuple2<>(6, "demo18"));
        data.add(new Tuple2<>(6, "demo19"));
        data.add(new Tuple2<>(6, "demo20"));
        data.add(new Tuple2<>(6, "demo21"));

        DataSource<Tuple2<Integer, String>> collection = env.fromCollection(data);

        collection.groupBy(0).reduce(new ReduceFunction<Tuple2<Integer, String>>() {
            @Override
            public Tuple2<Integer, String> reduce(Tuple2<Integer, String> value1, Tuple2<Integer, String> value2) throws Exception {
                return new Tuple2<>(value1.f0,value1.f1+" + "+value2.f1);
            }
        }).print();


    }

  • GroupReduce on Grouped DataSet
    A GroupReduce transformation applied to a grouped DataSet calls a user-defined group-reduce function for each group.
    The difference to Reduce is that the user-defined function gets the whole group at once. The function is invoked with an Iterable over all elements of a group and can return an arbitrary number of result elements.
  public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        ArrayList<Tuple2<Integer, String>> data = new ArrayList<>();
        data.add(new Tuple2<>(1, "demo1"));
        data.add(new Tuple2<>(1, "demo2"));
        data.add(new Tuple2<>(1, "demo3"));
        data.add(new Tuple2<>(2, "demo4"));
        data.add(new Tuple2<>(2, "demo5"));
        data.add(new Tuple2<>(2, "demo6"));
        data.add(new Tuple2<>(3, "demo7"));
        data.add(new Tuple2<>(3, "demo8"));
        data.add(new Tuple2<>(4, "demo9"));
        data.add(new Tuple2<>(4, "demo10"));
        data.add(new Tuple2<>(4, "demo11"));
        data.add(new Tuple2<>(4, "demo12"));
        data.add(new Tuple2<>(5, "demo13"));
        data.add(new Tuple2<>(5, "demo14"));
        data.add(new Tuple2<>(5, "demo15"));
        data.add(new Tuple2<>(5, "demo16"));
        data.add(new Tuple2<>(5, "demo17"));
        data.add(new Tuple2<>(6, "demo18"));
        data.add(new Tuple2<>(6, "demo19"));
        data.add(new Tuple2<>(6, "demo20"));
        data.add(new Tuple2<>(6, "demo21"));

        DataSource<Tuple2<Integer, String>> collection = env.fromCollection(data);

        collection.groupBy(0)
                .reduceGroup(new RichGroupReduceFunction<Tuple2<Integer, String>, Tuple2<Integer, String>>() {
                    @Override
                    public void reduce(Iterable<Tuple2<Integer, String>> values, Collector<Tuple2<Integer, String>> out) throws Exception {

                        values.forEach(o -> {
                            if (o.f0 % 2 == 0) {
                                out.collect(o);
                            }
                        });
                    }
                })
                .print();
    }

  • GroupCombine on a Grouped DataSet (combinable group-reduce functions)

    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        ArrayList<Tuple2<Integer, String>> data = new ArrayList<>();
        data.add(new Tuple2<>(1, "demo1"));
        data.add(new Tuple2<>(1, "demo2"));
        data.add(new Tuple2<>(1, "demo3"));
        data.add(new Tuple2<>(2, "demo4"));
        data.add(new Tuple2<>(2, "demo5"));
        data.add(new Tuple2<>(2, "demo6"));
        data.add(new Tuple2<>(3, "demo7"));
        data.add(new Tuple2<>(3, "demo8"));
        data.add(new Tuple2<>(4, "demo9"));
        data.add(new Tuple2<>(4, "demo10"));
        data.add(new Tuple2<>(4, "demo11"));
        data.add(new Tuple2<>(4, "demo12"));
        data.add(new Tuple2<>(5, "demo13"));
        data.add(new Tuple2<>(5, "demo14"));
        data.add(new Tuple2<>(5, "demo15"));
        data.add(new Tuple2<>(5, "demo16"));
        data.add(new Tuple2<>(5, "demo17"));
        data.add(new Tuple2<>(6, "demo18"));
        data.add(new Tuple2<>(6, "demo19"));
        data.add(new Tuple2<>(6, "demo20"));
        data.add(new Tuple2<>(6, "demo21"));

        DataSource<Tuple2<Integer, String>> collection = env.fromCollection(data);

        collection.groupBy(0)
                .reduceGroup(new GroupReduce())
                .print();
    }

    private static class GroupReduce implements GroupCombineFunction<Tuple2<Integer, String>, Tuple2<Integer, String>>, GroupReduceFunction<Tuple2<Integer, String>, Tuple2<Integer, String>> {

        /**
         * Unlike a reduce function, a group-reduce function is not implicitly combinable.
         * To make a group-reduce function combinable, it must implement the GroupCombineFunction interface.
         *
         * Important: the generic input and output types of GroupCombineFunction must equal
         * the generic input type of the GroupReduceFunction, as this example shows.
         */
        // implements the GroupCombineFunction interface
        @Override
        public void combine(Iterable<Tuple2<Integer, String>> values, Collector<Tuple2<Integer, String>> out) throws Exception {
            Tuple2<Integer, String> t = StreamSupport.stream(values.spliterator(), false).reduce((o1, o2) -> {
                return new Tuple2<>(o1.f0, o1.f1 + " + " + o2.f1);
            }).get();
            out.collect(t);
        }

        @Override
        public void reduce(Iterable<Tuple2<Integer, String>> values, Collector<Tuple2<Integer, String>> out) throws Exception {
            values.forEach(o->{
                out.collect(o);
            });
        }
    }

  • Aggregate on Grouped Tuple DataSet
    There are aggregation operations that are commonly used. The Aggregate transformation provides the following built-in aggregation functions:
    Sum,
    Min, and
    Max.

The Aggregate transformation can only be applied to Tuple DataSets, and it only supports field position keys for grouping.

    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        ArrayList<Tuple2<Integer, Integer>> data = new ArrayList<>();
        data.add(new Tuple2<>(1, 1));
        data.add(new Tuple2<>(1, 2));
        data.add(new Tuple2<>(1, 3));
        data.add(new Tuple2<>(2, 4));
        data.add(new Tuple2<>(2, 5));
        data.add(new Tuple2<>(2, 6));
        data.add(new Tuple2<>(3, 7));
        data.add(new Tuple2<>(3, 8));
        data.add(new Tuple2<>(4, 9));
        data.add(new Tuple2<>(4, 10));
        data.add(new Tuple2<>(4, 11));
        data.add(new Tuple2<>(4, 12));
        data.add(new Tuple2<>(5, 13));
        data.add(new Tuple2<>(5, 14));
        data.add(new Tuple2<>(5, 15));
        data.add(new Tuple2<>(5, 16));
        data.add(new Tuple2<>(5, 17));
        data.add(new Tuple2<>(6, 18));
        data.add(new Tuple2<>(6, 19));
        data.add(new Tuple2<>(6, 20));
        data.add(new Tuple2<>(6, 21));

        DataSource<Tuple2<Integer, Integer>> collection = env.fromCollection(data);

        collection.groupBy(0)
                .aggregate(Aggregations.SUM, 1)
                .print();
    }

  • MinBy / MaxBy on Grouped Tuple DataSet
    The MinBy (MaxBy) transformation selects a single tuple for each group of tuples.
    The selected tuple is the one whose values of one or more specified fields are minimal (maximal). The fields used for comparison must be valid key fields, i.e. comparable.
    If multiple tuples have minimum (maximum) field values, an arbitrary one of these tuples is returned.
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        ArrayList<Tuple2<Integer, Integer>> data = new ArrayList<>();
        data.add(new Tuple2<>(1, 1));
        data.add(new Tuple2<>(1, 2));
        data.add(new Tuple2<>(1, 3));
        data.add(new Tuple2<>(2, 4));
        data.add(new Tuple2<>(2, 5));
        data.add(new Tuple2<>(2, 6));
        data.add(new Tuple2<>(3, 7));
        data.add(new Tuple2<>(3, 8));
        data.add(new Tuple2<>(4, 9));
        data.add(new Tuple2<>(4, 10));
        data.add(new Tuple2<>(4, 11));
        data.add(new Tuple2<>(4, 12));
        data.add(new Tuple2<>(5, 13));
        data.add(new Tuple2<>(5, 14));
        data.add(new Tuple2<>(5, 15));
        data.add(new Tuple2<>(5, 16));
        data.add(new Tuple2<>(5, 17));
        data.add(new Tuple2<>(6, 18));
        data.add(new Tuple2<>(6, 19));
        data.add(new Tuple2<>(6, 20));
        data.add(new Tuple2<>(6, 21));

        DataSource<Tuple2<Integer, Integer>> collection = env.fromCollection(data);

        collection.groupBy(0)
                .maxBy(1)
                .print();
    }

  • Reduce on full DataSet
    The Reduce transformation applies a user-defined reduce function to all elements of a DataSet. The reduce function then combines pairs of elements into one element until only a single element remains.
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        ArrayList<Tuple2<Integer, String>> data = new ArrayList<>();
        data.add(new Tuple2<>(1, "demo1"));
        data.add(new Tuple2<>(1, "demo2"));
        data.add(new Tuple2<>(1, "demo3"));
        data.add(new Tuple2<>(2, "demo4"));
        data.add(new Tuple2<>(2, "demo5"));
        data.add(new Tuple2<>(2, "demo6"));
        data.add(new Tuple2<>(3, "demo7"));
        data.add(new Tuple2<>(3, "demo8"));
        data.add(new Tuple2<>(4, "demo9"));
        data.add(new Tuple2<>(4, "demo10"));
        data.add(new Tuple2<>(4, "demo11"));
        data.add(new Tuple2<>(4, "demo12"));
        data.add(new Tuple2<>(5, "demo13"));
        data.add(new Tuple2<>(5, "demo14"));
        data.add(new Tuple2<>(5, "demo15"));
        data.add(new Tuple2<>(5, "demo16"));
        data.add(new Tuple2<>(5, "demo17"));
        data.add(new Tuple2<>(6, "demo18"));
        data.add(new Tuple2<>(6, "demo19"));
        data.add(new Tuple2<>(6, "demo20"));
        data.add(new Tuple2<>(6, "demo21"));

        DataSource<Tuple2<Integer, String>> collection = env.fromCollection(data);

        collection
                .reduce(new ReduceFunction<Tuple2<Integer, String>>() {
                    @Override
                    public Tuple2<Integer, String> reduce(Tuple2<Integer, String> value1, Tuple2<Integer, String> value2) throws Exception {
                        return new Tuple2<>(value1.f0 + value2.f0, value1.f1 + " + " + value2.f1);
                    }
                }).print();
    }

  • GroupReduce on full DataSet
    (The example below applies a plain reduce to the full DataSet, with the groupBy commented out; a GroupReduce on the full set is written the same way, using reduceGroup instead of reduce and no preceding groupBy.)
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        ArrayList<Tuple2<Integer, String>> data = new ArrayList<>();
        data.add(new Tuple2<>(1, "demo1"));
        data.add(new Tuple2<>(1, "demo2"));
        data.add(new Tuple2<>(1, "demo3"));
        data.add(new Tuple2<>(2, "demo4"));
        data.add(new Tuple2<>(2, "demo5"));
        data.add(new Tuple2<>(2, "demo6"));
        data.add(new Tuple2<>(3, "demo7"));
        data.add(new Tuple2<>(3, "demo8"));
      

        DataSource<Tuple2<Integer, String>> collection = env.fromCollection(data);

        collection
//                .groupBy(0)
                .reduce(new ReduceFunction<Tuple2<Integer, String>>() {
                    @Override
                    public Tuple2<Integer, String> reduce(Tuple2<Integer, String> value1, Tuple2<Integer, String> value2) throws Exception {
                        return new Tuple2<>(value1.f0 + value2.f0, value1.f1 + " + " + value2.f1);
                    }
                }).print();


    }

  • GroupCombine on a full DataSet
    The GroupCombine on a full DataSet works similar to the GroupCombine on a grouped DataSet.
    The data is partitioned on all nodes and then combined in a greedy fashion (i.e. only data fitting into memory is combined at once). A sketch using combineGroup follows.
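
    A minimal sketch using combineGroup on an ungrouped DataSet; note the result is a partial, per-partition combination, not a full reduction:

    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        env.fromElements(new Tuple2<>(1, "demo1"), new Tuple2<>(2, "demo2"), new Tuple2<>(3, "demo3"))
                .combineGroup(new GroupCombineFunction<Tuple2<Integer, String>, Tuple2<Integer, String>>() {
                    @Override
                    public void combine(Iterable<Tuple2<Integer, String>> values, Collector<Tuple2<Integer, String>> out) throws Exception {
                        // greedily combine whatever this partition holds in memory
                        int sum = 0;
                        StringBuilder names = new StringBuilder();
                        for (Tuple2<Integer, String> v : values) {
                            sum += v.f0;
                            names.append(names.length() == 0 ? "" : " + ").append(v.f1);
                        }
                        out.collect(new Tuple2<>(sum, names.toString()));
                    }
                })
                .print();
    }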
  • Aggregate on full Tuple DataSet
    The Aggregate transformation can only be applied to Tuple DataSets. A sketch follows.
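
    A minimal sketch; aggregations over the full DataSet can be chained into one pass with and():

    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        env.fromElements(new Tuple2<>(1, 10), new Tuple2<>(2, 20), new Tuple2<>(3, 30))
                .aggregate(Aggregations.SUM, 1) // sum of field 1
                .and(Aggregations.MIN, 0)       // minimum of field 0, computed in the same pass
                .print();                       // prints (1,60)
    }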

  • MinBy / MaxBy on full Tuple DataSet
    The MinBy (MaxBy) transformation selects a single tuple from the full DataSet.
    The selected tuple is the one whose values of one or more specified fields are minimal (maximal).
    The fields used for comparison must be valid key fields, i.e. comparable.
    If multiple tuples have minimum (maximum) field values, an arbitrary one of these tuples is returned. A sketch follows.
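
    A minimal sketch without a preceding groupBy:

    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        env.fromElements(new Tuple2<>(1, 5), new Tuple2<>(2, 9), new Tuple2<>(3, 7))
                // select the single tuple with the largest value in field 1
                .maxBy(1)
                .print(); // prints (2,9)
    }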

  • Distinct (deduplication)
    The Distinct transformation computes the DataSet of the distinct elements of the source DataSet. A sketch follows.
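
    A minimal sketch:

    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        env.fromElements(new Tuple2<>(1, "demo1"), new Tuple2<>(1, "demo1"), new Tuple2<>(2, "demo2"))
                .distinct() // deduplicate on the whole tuple; distinct(0) would use field 0 only
                .print();   // prints (1,demo1) and (2,demo2)
    }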

  • Join

  • OuterJoin: joins two data sets via join ... with. With a plain inner join neither side is ever null; the null checks in the JoinFunction below only matter for the outer-join variants (commented out).

    public static void main(String[] args) throws Exception {

        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        DataSource<Tuple2<Integer, String>> stringDataSource1 =
                env.fromElements(
                        new Tuple2(1, "小王"),
                        new Tuple2(2, "小里"),
                        new Tuple2(3, "小张"),
                        new Tuple2(4, "小四")
                );


        DataSource<Tuple2<Integer, String>> stringDataSource2 =
                env.fromElements(
                        new Tuple2(1, "北京"),
                        new Tuple2(2, "上海"),
                        new Tuple2(3, "成都"),
                        new Tuple2(5, "重庆")
                );


        stringDataSource1
                //.leftOuterJoin(stringDataSource2)
                //.rightOuterJoin(stringDataSource2)
                //.fullOuterJoin(stringDataSource2)
                .join(stringDataSource2)
                // key: first field of the first input
                .where(0)
                // key: first field of the second input
                .equalTo(0)
                .with(new JoinFunction<Tuple2<Integer, String>, Tuple2<Integer, String>, Tuple3<Integer, String, String>>() {
                    @Override
                    public Tuple3<Integer, String, String> join(Tuple2<Integer, String> first, Tuple2<Integer, String> second) throws Exception {
                        if (first == null) {
                            return new Tuple3<>(second.f0, "-", second.f1);
                        } else if (second == null) {
                            return new Tuple3<>(first.f0, first.f1, "-");
                        } else {
                            return new Tuple3<>(first.f0, first.f1, second.f1);
                        }
                    }
                })
                .print();
    }

  • Cross
    The Cross transformation combines two DataSets into one. It builds all pairwise combinations of the elements of both input DataSets, i.e. the Cartesian product.
    Cross either calls a user-defined cross function on each pair of elements or outputs a Tuple2.
    public static void main(String[] args) throws Exception {

        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        DataSource<Tuple1<Integer>> stringDataSource1 =
                env.fromElements(
                        new Tuple1(1),
                        new Tuple1(2)
                );

        DataSource<Tuple1<String>> stringDataSource2 =
                env.fromElements(
                        new Tuple1("北京"),
                        new Tuple1("上海"),
                        new Tuple1("重庆")
                );

        stringDataSource1
                .cross(stringDataSource2)
                .with(new CrossFunction<Tuple1<Integer>, Tuple1<String>, Tuple2<Integer, String>>() {
                    @Override
                    public Tuple2<Integer, String> cross(Tuple1<Integer> val1, Tuple1<String> val2) throws Exception {
                        return new Tuple2(val1.f0, val2.f0);
                    }
                }).print();
    }

(1,北京)
(1,上海)
(1,重庆)
(2,北京)
(2,上海)
(2,重庆)

  • CoGroup: unlike join, which is pairwise (one output record per matching pair), coGroup is many-to-many and groups both inputs by key.
    The CoGroup transformation jointly processes groups of two DataSets. Both DataSets are grouped on a defined key, and groups of both DataSets that share the same key are handed together to a user-defined co-group function.
    If for a specific key only one DataSet has a group, the co-group function is called with this group and an empty group. The co-group function can separately iterate over the elements of both groups and return an arbitrary number of result elements.

Similar to Reduce, GroupReduce, and Join, keys can be defined using the different key-selection methods.

    public static void main(String[] args) throws Exception {

        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        DataSource<Tuple2<Integer, String>> stringDataSource1 =
                env.fromElements(
                        new Tuple2(1, "小王"),
                        new Tuple2(1, "小周"),
                        new Tuple2(2, "小里"),
                        new Tuple2(3, "小张"),
                        new Tuple2(4, "小四")
                );


        DataSource<Tuple2<Integer, String>> stringDataSource2 =
                env.fromElements(
                        new Tuple2(1, "北京"),
                        new Tuple2(1, "邯郸"),
                        new Tuple2(2, "上海"),
                        new Tuple2(3, "成都"),
                        new Tuple2(5, "重庆")
                );


        stringDataSource1
                .coGroup(stringDataSource2)
                // key: first field of the first input
                .where(0)
                // key: first field of the second input
                .equalTo(0)
                .with(new CoGroupFunction<Tuple2<Integer, String>, Tuple2<Integer, String>, Tuple2<Integer, String>>() {
                    @Override
                    public void coGroup(Iterable<Tuple2<Integer, String>> first, Iterable<Tuple2<Integer, String>> second, Collector<Tuple2<Integer, String>> out) throws Exception {
                        Tuple2<Integer, String> t1 = StreamSupport.stream(first.spliterator(), false).reduce((o1, o2) -> {
                            return new Tuple2<>(o1.f0, o1.f1 + " + " + o2.f1);
                        }).orElse(null);
                        Tuple2<Integer, String> t2 = StreamSupport.stream(second.spliterator(), false).reduce((o1, o2) -> {
                            return new Tuple2<>(o1.f0, o1.f1 + " + " + o2.f1);
                        }).orElse(null);
                        if (t1 != null && t2 != null) {
                            out.collect(new Tuple2<>(t1.f0, t1.f1 + " + " + t2.f1));
                        }
                    }
                })
                .print();
    }
(3,小张 + 成都)
(1,小王 + 小周 + 北京 + 邯郸)
(2,小里 + 上海)

  • Union: merges two or more DataSets of the same type.
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        DataSource<Tuple2<Integer, String>> stringDataSource1 =
                env.fromElements(
                        new Tuple2(1, "小王"),
                        new Tuple2(1, "小周"),
                        new Tuple2(2, "小里"),
                        new Tuple2(3, "小张"),
                        new Tuple2(4, "小四")
                );


        DataSource<Tuple2<Integer, String>> stringDataSource2 =
                env.fromElements(
                        new Tuple2(1, "北京"),
                        new Tuple2(2, "上海"),
                        new Tuple2(3, "成都"),
                        new Tuple2(5, "重庆")
                );

        stringDataSource1.union(stringDataSource2)
                .print();
    }
(1,小王)
(4,小四)
(1,北京)
(1,小周)
(2,上海)
(2,小里)
(3,小张)
(3,成都)
(5,重庆)
  • Rebalance
    Evenly rebalances the parallel partitions of a DataSet to eliminate data skew.
    Important: this operation shuffles the whole DataSet over the network and can take significant time. A sketch follows.
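
    A minimal sketch:

    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        env.fromElements("a", "b", "c", "d")
                // round-robin redistribution to even out the partitions;
                // only map-like transformations may follow a rebalance
                .rebalance()
                .map(new MapFunction<String, String>() {
                    @Override
                    public String map(String value) throws Exception {
                        return value.toUpperCase();
                    }
                })
                .setParallelism(4)
                .print();
    }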

  • Hash-Partition; see the mapPartition example above
    Important: this operation shuffles the whole DataSet over the network and can take significant time.

  • Range-Partition
    Important: this operation requires an extra pass over the DataSet to compute the ranges, plus partitioning and shuffling of the whole DataSet over the network. This can take significant time.

  • Sort Partition
    Locally sorts all partitions of a DataSet on the specified fields in the specified order. Fields can be specified as field expressions or field positions.
    Partitions can be sorted on multiple fields by chaining sortPartition() calls, as sketched below.
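
    A minimal sketch with chained sortPartition() calls (Order is org.apache.flink.api.common.operators.Order):

    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        env.fromElements(new Tuple2<>(2, "demo1"), new Tuple2<>(1, "demo2"), new Tuple2<>(1, "demo3"))
                .sortPartition(0, Order.ASCENDING)  // primary sort on field 0
                .sortPartition(1, Order.DESCENDING) // chained: secondary sort on field 1
                .print();
    }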

  • First-n (the first n arbitrary elements)
    Returns the first n (arbitrary) elements of a DataSet. First-n can be applied to a regular DataSet, a grouped DataSet, or a grouped and sorted DataSet. Grouping keys can be specified as key-selector functions or field position keys. A sketch of the three variants follows.
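
    A minimal sketch of the three variants (each print() triggers a separate execution):

    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        DataSource<Tuple2<Integer, String>> ds = env.fromElements(
                new Tuple2<>(1, "demo1"), new Tuple2<>(1, "demo2"),
                new Tuple2<>(2, "demo3"), new Tuple2<>(2, "demo4"));

        ds.first(3).print();                                           // any 3 elements
        ds.groupBy(0).first(1).print();                                // any 1 element per group
        ds.groupBy(0).sortGroup(1, Order.DESCENDING).first(1).print(); // largest element per group
    }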
