Flink(1.13) Transform

image.png

转换算子可以把一个或多个DataStream转成一个新的DataStream.程序可以把多个复杂的转换组合成复杂的数据流拓扑.

map

  • 作用
    将数据流中的数据进行转换, 形成新的数据流,消费一个元素并产出一个元素

  • 参数
    lambda表达式或MapFunction实现类。

  • 返回
    得到一个新的数据流: 新的流的元素是原来流的元素的平方

  • 示例
    打印输入字符串的原始字符串及长度
    如:hello -> (hello,5)

  • 程序

    @Test
    public void map() throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();


        DataStreamSource<String> source = env.socketTextStream("hadoop102", 9999);


        SingleOutputStreamOperator<Tuple2<String, Integer>> returns = source.map((MapFunction<String, Tuple2<String, Integer>>)
                value -> Tuple2.of(value, value.length()))
                .returns(Types.TUPLE(Types.STRING, Types.INT));

        returns.print("map>>>");

        env.execute();

    }
  • 输入
[admin@hadoop102 flink_kafka-0]$ nc -lk 9999
hello
  • 输出
map>>>:1> (hello,5)

flatMap

  • 作用
    消费一个元素并产生零个或多个元素

  • 参数
    FlatMapFunction实现类

  • 返回
    DataStream → DataStream

  • 示例
    将文件中每行内容按照空格或者\t进行分割,返回一个个单词。

  • 程序

    @Test
    public void flatMap() throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        DataSource<String> dataSource = env.readTextFile("D:\\project\\idea\\flink\\input\\wordcount.txt");


        FlatMapOperator<String, String> returns = dataSource
                .flatMap((FlatMapFunction<String, String>) (value, out)
                -> Arrays.stream(value.split(" "))
                .forEach(out::collect))
                .returns(Types.STRING);

        returns.print();

    }
  • wordcount.txt 内容
java python hello
pon xml log batch
python log java word
count xml python hello
exe txt log xml pon java
  • 输出
java
python
hello
python
log
java
word
exe
txt
log
xml
pon
java
count
xml
python
hello
pon
xml
log
batch

filter

  • 作用
    根据指定的规则将满足条件(true)的数据保留,不满足条件(false)的数据丢弃

  • 参数
    FlatMapFunction实现类

  • 返回
    DataStream → DataStream

  • 示例
    依然读取wordcount.txt文件,过滤文件中为log的数据

  • 程序

    @Test
    public void flatMap() throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        DataSource<String> dataSource = env.readTextFile("D:\\project\\idea\\flink\\input\\wordcount.txt");


        // 扁平化
        FlatMapOperator<String, String> flatMap = dataSource
                .flatMap((FlatMapFunction<String, String>) (value, out)
                -> Arrays.stream(value.split(" "))
                .forEach(out::collect))
                .returns(Types.STRING);

        // 过滤
        FilterOperator<String> filter = flatMap.filter(s -> !"log".equals(s));


        filter.print();

    }
  • 输出
pon
xml
batch
java
python
hello
count
xml
python
hello
python
java
word
exe
txt
xml
pon
java

keyBy

  • 作用
    把流中的数据分到不同的分区(并行度)中.具有相同key的元素会分到同一个分区中.一个分区中可以有多重不同的key.
    在内部是使用的是key的hash分区来实现的.

  • 参数
    Key选择器函数: interface KeySelector<IN, KEY>
    注意: 什么值不可以作为KeySelector的Key:

  1. 没有覆写hashCode方法的POJO, 而是依赖Object的hashCode. 因为这样分组没有任何的意义: 每个元素都会得到一个独立无二的组. 实际情况是:可以运行, 但是分的组没有意义.
  2. 任何类型的数组
  • 返回
    DataStream → KeyedStream

  • 示例
    使用nc模拟客户端,向程序输入一行字符串,按照空格切分,统计单词出现个数。

  • 程序

    @Test
    public void group() throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();


        // 监听 9999 端口
        DataStreamSource<String> source = env.socketTextStream("hadoop102", 9999);


        // 扁平化
        SingleOutputStreamOperator<Tuple2<String, Integer>> flatMap = source.flatMap((FlatMapFunction<String, Tuple2<String, Integer>>) (value, out)
                -> Arrays.stream(value.split(" "))
                .forEach(s -> out.collect(Tuple2.of(s, 1))))
                .returns(Types.TUPLE(Types.STRING, Types.INT));

        // 分组
        KeyedStream<Tuple2<String, Integer>, Object> keyBy = flatMap.keyBy((KeySelector<Tuple2<String, Integer>, Object>) value -> value.f0);


        // 统计
        SingleOutputStreamOperator<Tuple2<String, Integer>> sum = keyBy.sum(1);


        sum.print();

        env.execute();

    }

  • 输入
[admin@hadoop102 flink_kafka-0]$ nc -lk 9999
java python java java python hello
java python java hello
  • 输出
3> (java,1)
5> (python,1)
3> (java,2)
5> (python,2)
3> (java,3)
5> (hello,1)
5> (python,3)
3> (java,4)
5> (hello,2)
3> (java,5)

shuffle

  • 作用
    把流中的元素随机打乱. 对同一个组数据, 每次执行得到的结果都不同.
  • 参数
  • 返回
    DataStream → DataStream
  • 示例
    监听9999端口,打印数据,并将数据重新分配其他分区。
  • 程序
  @Test
    public void shuffle() throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 监听 9999 端口
        DataStreamSource<String> source = env.socketTextStream("hadoop102", 9999);

        source.print("print>>>");

        source.shuffle().print("shuffle>>>");

        env.execute();
    }
  • 输入
[admin@hadoop102 flink_kafka-0]$ nc -lk 9999
java
python
scala
spark
  • 输出
shuffle>>>:11> java
print>>>:5> java // 从11 分配到 5
print>>>:6> python 
shuffle>>>:6> python   // 从6 分配到 6
print>>>:7> scala
shuffle>>>:5> scala  // 从7 分配到 5
print>>>:8> spark
shuffle>>>:4> spark  // 从8 分配到 4
  • shuffle分区策略
@Internal
public class ShufflePartitioner<T> extends StreamPartitioner<T> {
    private static final long serialVersionUID = 1L;

    private Random random = new Random();

    @Override
    public int selectChannel(SerializationDelegate<StreamRecord<T>> record) {
        return random.nextInt(numberOfChannels);
    }
}

numberOfChannels:当前环境的最大并发度,若不设置默认并发度为cpu核数。
若并发度为10,那么每次分配策略则是1~10之间的数。

  • 作用
    若原始数据分配不均衡,可能造成数据倾斜,使用shuffle能够一定程度上减少数据倾斜问题。

connect

  • 作用
    在某些情况下,我们需要将两个不同来源的数据流进行连接,实现数据匹配,比如订单支付和第三方交易信息,这两个信息的数据就来自于不同数据源,连接后,将订单支付和第三方交易信息进行对账,此时,才能算真正的支付完成。
    Flink中的connect算子可以连接两个保持他们类型的数据流,两个数据流被connect之后,只是被放在了一个同一个流中,内部依然保持各自的数据和形式不发生任何变化,两个流相互独立。

  • 参数
    另外一个流

  • 返回
    DataStream[A], DataStream[B] -> ConnectedStreams[A,B]

  • 注意

  1. 两个流中存储的数据类型可以不同
  2. 只是机械的合并在一起, 内部仍然是分离的2个流
  3. 只能2个流进行connect, 不能有第3个参与

把两个流连接在一起: 貌合神离,输入类型可以不同,但是返回的类型必须是一致。

  • 程序
    @Test
    public void connect() throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 字符串
        DataStreamSource<String> strSource = env.fromElements("a", "b", "c", "d");

        // 数字
        DataStreamSource<Integer> numSource = env.fromElements(1,2,3,4,5,6,7,8,9);

        // 连接
        ConnectedStreams<String, Integer> connect = strSource.connect(numSource);


        /**
         * 两个
         */
        SingleOutputStreamOperator<String> result = connect.map(new CoMapFunction<String, Integer, String>() {
            @Override
            public String map1(String value) {
                return value;
            }

            @Override
            public String map2(Integer value) {
                return value.toString();
            }
        });


        result.print();

        env.execute();

    }
  • 输出
11> a
10> 4
14> d
13> c
12> b
8> 2
15> 9
9> 3
12> 6
13> 7
14> 8
11> 5
7> 1
  • CoMapFunction

connect 也有自己所对应的算子,都会一Co开头,是connect的缩写。

每个Stream都有自己独立处理逻辑(map1和map2),这也说明了为啥只能2个流进行connect, 不能有第3个参与的原因。

@Public
public interface CoMapFunction<IN1, IN2, OUT> extends Function, Serializable {

    /**
     * This method is called for each element in the first of the connected streams.
     *
     * @param value The stream element
     * @return The resulting element
     * @throws Exception The function may throw exceptions which cause the streaming program to fail
     *     and go into recovery.
     */
    OUT map1(IN1 value) throws Exception;

    /**
     * This method is called for each element in the second of the connected streams.
     *
     * @param value The stream element
     * @return The resulting element
     * @throws Exception The function may throw exceptions which cause the streaming program to fail
     *     and go into recovery.
     */
    OUT map2(IN2 value) throws Exception;
}

虽然输入参数不同,但是返回类型(OUT)是一致的。

 CoMapFunction<IN1, IN2, OUT> 

union

  • 作用
    两个或者两个以上的DataStream进行union操作,产生一个包含所有DataStream元素的新DataStream

  • 参数
    另外一个流

  • 返回
    DataStream<T>

  • 注意

  1. 参数类型必须一致
  2. 可以连续处理多个流
  • 程序
@Test
    public void union() throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 数字
        DataStreamSource<Integer> numSource1 = env.fromElements(1,2,3,4,5,6,7,8,9);
        DataStreamSource<Integer> numSource2 = env.fromElements(111,22,33,4,4,5,22);
        DataStreamSource<Integer> numSource3 = env.fromElements(2,313,43,14,1);
        DataStreamSource<Integer> numSource4 = env.fromElements(4,14,314,31,4);

        // 合并
        DataStream<Integer> union = numSource1.union(numSource1).union(numSource2).union(numSource3).union(numSource4);

        union.print();

        env.execute();
    }
  • 输出
2> 5
13> 111
2> 3
15> 33
5> 4
12> 14
15> 4
1> 4
9> 14
3> 3
12> 8
8> 43
11> 4
7> 313
9> 6
6> 5
13> 314
10> 1
16> 4
14> 22
10> 7
4> 4
14> 31
6> 2
13> 8
7> 5
11> 7
3> 22
8> 6
15> 1
15> 9
1> 2
14> 1
16> 2
14> 9
  • connect与 union 区别:
  1. union之前两个或多个流的类型必须是一样,connect可以不一样
  2. connect只能操作两个流,union可以操作多个。

简单滚动聚合算子

  • 常见的滚动聚合算子
    sum, min,max
    minBy,maxBy

  • 作用
    KeyedStream的每一个支流做聚合。执行完成后,会将聚合的结果合成一个流返回,所以结果都是DataStream

  • 参数
    如果流中存储的是POJO或者scala的样例类, 参数使用字段名. 如果流中存储的是元组, 参数就是位置(基于0...)

  • 返回
    KeyedStream -> SingleOutputStreamOperator

  • 程序

    @Test
    public void sum3() throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<String> source = env.socketTextStream("hadoop102", 9999);

        SingleOutputStreamOperator<UserBean> map = source.map((MapFunction<String, UserBean>) value
                -> new UserBean(value.split(",")))
                .returns(Types.POJO(UserBean.class));

        KeyedStream<UserBean, Integer> keyBy = map.keyBy(e -> e.getId());

        SingleOutputStreamOperator<UserBean> sum = keyBy.sum("age");

        sum.print();

        env.execute();
    }
  • 执行情况

1:输入

8,zhangsan111,19,F

1:输出

2> UserBean(id=8, name=zhangsan111, age=19, sex=F)

2:输入

8,zhangsan111,1,F

2:输出

2> UserBean(id=8, name=zhangsan111, age=20, sex=F)

3:输入,修改性别M

8,zhangsan111,19,M

3:输出,除了年龄,其他都没有改变

2> UserBean(id=8, name=zhangsan111, age=39, sex=F)

4:输入,更改id

7,zhangfeilong,18,M

4:输出

15> UserBean(id=7, name=zhangfeilong, age=18, sex=M)

5:输入,更改id

7,zhangfeilong,10,F

5:输出,id为7,更改性别

15> UserBean(id=7, name=zhangfeilong, age=28, sex=M)

6:输入,id为8

8,wangfff,10,F

6:输出(统计上次id为8的数据结果)

2> UserBean(id=8, name=zhangsan111, age=49, sex=F)

总体输出情况

2> UserBean(id=8, name=zhangsan111, age=19, sex=F)
2> UserBean(id=8, name=zhangsan111, age=20, sex=F)
2> UserBean(id=8, name=zhangsan111, age=39, sex=F)
15> UserBean(id=7, name=zhangfeilong, age=18, sex=M)
15> UserBean(id=7, name=zhangfeilong, age=28, sex=M)
  • 总结:
  1. 组与组之间相互隔离,状态不会被影响。
  2. 所有的数据除了聚合字段(keyBy.sum("age")),一旦状态确认将不会改变(第一次是什么之后的就是什么,即使发送修改,但是状态不会改变)。
  3. min、max 和 sum 同样如此,也只会更改聚合字段的状态。
  • minBy,maxBy

min 和max 的作用就是去状态的最大值或最小值,使用方式和sum的方式一样,作用也一样只会更改聚合字段的状态。若要更改除聚合字段以外的字段状态,那么需要加上By,但是sum并没有SumBy

minBy案例演示

  @Test
    public void sum3() throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<String> source = env.socketTextStream("hadoop102", 9999);

        SingleOutputStreamOperator<UserBean> map = source.map((MapFunction<String, UserBean>) value
                -> new UserBean(value.split(",")))
                .returns(Types.POJO(UserBean.class));

        KeyedStream<UserBean, Integer> keyBy = map.keyBy(e -> e.getId());

        SingleOutputStreamOperator<UserBean> sum = keyBy.minBy("age");

        sum.print();

        env.execute();
    }

1: 输入

10,lifeng,10,F

1: 输出

10,lifeng,10,F

2: 输入

10,lifeng,10,M

2: 输出,不比上一个状态小,所以不更改状态

10,lifeng,10,F

3: 输入

10,李菲菲,8,F

1: 输出,比上一个状态小,更改

9> UserBean(id=10, name=李菲菲, age=8, sex=F)
  • 整个输出
9> UserBean(id=10, name=lifeng, age=10, sex=F)
9> UserBean(id=10, name=lifeng, age=10, sex=F)
9> UserBean(id=10, name=李菲菲, age=8, sex=F)

不分组的聚合

  • 方式一:map
   @Test
    public void sum1() throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStreamSource<Integer> source = env.fromElements(1, 2, 3, 34, 431, 3, 234, 311);

        SingleOutputStreamOperator<Tuple2<String, Integer>> a = source.map(
                (MapFunction<Integer, Tuple2<String, Integer>>) value
                        -> Tuple2.of("a", value))
                .returns(Types.TUPLE(Types.STRING, Types.INT));

        KeyedStream<Tuple2<String, Integer>, String> keyBy = a.keyBy(
                (KeySelector<Tuple2<String, Integer>, String>) value -> value.f0);

        SingleOutputStreamOperator<Tuple2<String, Integer>> sum = keyBy.sum(1);

        sum.print();
        env.execute();
    }
  • 方式二:process
    @Test
    public void sum2() throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStreamSource<Integer> source = env.fromElements(1, 2, 3, 4, 5, 6, 7, 8,9);

        SingleOutputStreamOperator<Integer> process = source.process(new ProcessFunction<Integer, Integer>() {
            int count = 0;

            @Override
            public void processElement(Integer value, Context ctx, Collector<Integer> out) throws Exception {
                count += value;
                out.collect(count);
            }
        }).setParallelism(1);

        process.print().setParallelism(1);

        env.execute();
    }

reduce

  • 作用
    一个分组数据流的聚合操作,合并当前的元素和上次聚合的结果,产生一个新的值,返回的流中包含每一次聚合的结果,而不是只返回最后一次聚合的最终结果。
    为什么还要把中间值也保存下来? 考虑流式数据的特点: 没有终点, 也就没有最终的概念了. 任何一个中间的聚合结果都是值!

  • 参数
    interface ReduceFunction<T>

  • 返回
    KeyedStream -> SingleOutputStreamOperator

  • 程序
    计算:DataStreamSource<Integer> source = env.fromElements(1, 2, 3, 4, 5, 6, 7, 8,9);的和

    @Test
    public void reduce() throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStreamSource<Integer> source = env.fromElements(1, 2, 3, 4, 5, 6, 7, 8,9);

        SingleOutputStreamOperator<Tuple2<String, Integer>> map = source.map((MapFunction<Integer, Tuple2<String, Integer>>)
                value -> Tuple2.of("a", value)).returns(Types.TUPLE(Types.STRING,Types.INT));

        SingleOutputStreamOperator<Tuple2<String, Integer>> reduce = map.keyBy(s -> s.f0).reduce((ReduceFunction<Tuple2<String, Integer>>) (v1, v2) -> Tuple2.of("a", v1.f1 + v2.f1));

        reduce.print().setParallelism(1);

        env.execute();

    }
  • 结果
(a,5)
(a,12)
(a,21)
(a,24)
(a,32)
(a,38)
(a,40)
(a,44)
(a,45)
©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容