转换算子可以把一个或多个DataStream转成一个新的DataStream.程序可以把多个复杂的转换组合成复杂的数据流拓扑.
map
作用
将数据流中的数据进行转换, 形成新的数据流,消费一个元素并产出一个元素参数
lambda表达式或MapFunction
实现类。返回
得到一个新的数据流: 新的流的元素是原来流的元素的平方
。示例
打印输入字符串的原始字符串及长度
如:hello -> (hello,5)程序
@Test
public void map() throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStreamSource<String> source = env.socketTextStream("hadoop102", 9999);
SingleOutputStreamOperator<Tuple2<String, Integer>> returns = source.map((MapFunction<String, Tuple2<String, Integer>>)
value -> Tuple2.of(value, value.length()))
.returns(Types.TUPLE(Types.STRING, Types.INT));
returns.print("map>>>");
env.execute();
}
- 输入
[admin@hadoop102 flink_kafka-0]$ nc -lk 9999
hello
- 输出
map>>>:1> (hello,5)
flatMap
作用
消费一个元素并产生零个或多个元素参数
FlatMapFunction实现类返回
DataStream → DataStream示例
将文件中每行内容按照空格
或者\t
进行分割,返回一个个单词。程序
@Test
public void flatMap() throws Exception {
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
DataSource<String> dataSource = env.readTextFile("D:\\project\\idea\\flink\\input\\wordcount.txt");
FlatMapOperator<String, String> returns = dataSource
.flatMap((FlatMapFunction<String, String>) (value, out)
-> Arrays.stream(value.split(" "))
.forEach(out::collect))
.returns(Types.STRING);
returns.print();
}
- wordcount.txt 内容
java python hello
pon xml log batch
python log java word
count xml python hello
exe txt log xml pon java
- 输出
java
python
hello
python
log
java
word
exe
txt
log
xml
pon
java
count
xml
python
hello
pon
xml
log
batch
filter
作用
根据指定的规则将满足条件(true)的数据保留,不满足条件(false)的数据丢弃参数
FlatMapFunction实现类返回
DataStream → DataStream示例
依然读取wordcount.txt
文件,过滤文件中为log
的数据程序
@Test
public void flatMap() throws Exception {
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
DataSource<String> dataSource = env.readTextFile("D:\\project\\idea\\flink\\input\\wordcount.txt");
// 扁平化
FlatMapOperator<String, String> flatMap = dataSource
.flatMap((FlatMapFunction<String, String>) (value, out)
-> Arrays.stream(value.split(" "))
.forEach(out::collect))
.returns(Types.STRING);
// 过滤
FilterOperator<String> filter = flatMap.filter(s -> !"log".equals(s));
filter.print();
}
- 输出
pon
xml
batch
java
python
hello
count
xml
python
hello
python
java
word
exe
txt
xml
pon
java
keyBy
作用
把流中的数据分到不同的分区(并行度)中.具有相同key的元素
会分到同一个分区中.一个分区中可以有多重不同的key.
在内部是使用的是key的hash分区
来实现的.参数
Key选择器函数: interface KeySelector<IN, KEY>
注意: 什么值不可以
作为KeySelector的Key:
- 没有覆写hashCode方法的POJO, 而是依赖Object的hashCode. 因为这样分组没有任何的意义: 每个元素都会得到一个独立无二的组. 实际情况是:可以运行, 但是分的组没有意义.
- 任何类型的
数组
返回
DataStream → KeyedStream示例
使用nc
模拟客户端,向程序输入一行字符串,按照空格切分,统计单词出现个数。程序
@Test
public void group() throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 监听 9999 端口
DataStreamSource<String> source = env.socketTextStream("hadoop102", 9999);
// 扁平化
SingleOutputStreamOperator<Tuple2<String, Integer>> flatMap = source.flatMap((FlatMapFunction<String, Tuple2<String, Integer>>) (value, out)
-> Arrays.stream(value.split(" "))
.forEach(s -> out.collect(Tuple2.of(s, 1))))
.returns(Types.TUPLE(Types.STRING, Types.INT));
// 分组
KeyedStream<Tuple2<String, Integer>, Object> keyBy = flatMap.keyBy((KeySelector<Tuple2<String, Integer>, Object>) value -> value.f0);
// 统计
SingleOutputStreamOperator<Tuple2<String, Integer>> sum = keyBy.sum(1);
sum.print();
env.execute();
}
- 输入
[admin@hadoop102 flink_kafka-0]$ nc -lk 9999
java python java java python hello
java python java hello
- 输出
3> (java,1)
5> (python,1)
3> (java,2)
5> (python,2)
3> (java,3)
5> (hello,1)
5> (python,3)
3> (java,4)
5> (hello,2)
3> (java,5)
shuffle
- 作用
把流中的元素随机
打乱. 对同一个组数据, 每次执行得到的结果都不同. - 参数
无 - 返回
DataStream → DataStream - 示例
监听9999
端口,打印数据,并将数据重新分配其他分区。 - 程序
@Test
public void shuffle() throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 监听 9999 端口
DataStreamSource<String> source = env.socketTextStream("hadoop102", 9999);
source.print("print>>>");
source.shuffle().print("shuffle>>>");
env.execute();
}
- 输入
[admin@hadoop102 flink_kafka-0]$ nc -lk 9999
java
python
scala
spark
- 输出
shuffle>>>:11> java
print>>>:5> java // 从11 分配到 5
print>>>:6> python
shuffle>>>:6> python // 从6 分配到 6
print>>>:7> scala
shuffle>>>:5> scala // 从7 分配到 5
print>>>:8> spark
shuffle>>>:4> spark // 从8 分配到 4
- shuffle分区策略
@Internal
public class ShufflePartitioner<T> extends StreamPartitioner<T> {
private static final long serialVersionUID = 1L;
private Random random = new Random();
@Override
public int selectChannel(SerializationDelegate<StreamRecord<T>> record) {
return random.nextInt(numberOfChannels);
}
}
numberOfChannels
:当前环境的最大并发度,若不设置默认并发度为cpu核数。
若并发度为10,那么每次分配策略则是1~10之间的数。
- 作用
若原始数据分配不均衡,可能造成数据倾斜,使用shuffle
能够一定程度上减少数据倾斜问题。
connect
作用
在某些情况下,我们需要将两个不同来源的数据流进行连接,实现数据匹配,比如订单支付和第三方交易信息,这两个信息的数据就来自于不同数据源,连接后,将订单支付和第三方交易信息进行对账,此时,才能算真正的支付完成。
Flink中的connect算子可以连接两个保持他们类型的数据流,两个数据流被connect之后,只是被放在了一个同一个流中,内部依然保持各自的数据和形式不发生任何变化,两个流相互独立。参数
另外一个流返回
DataStream[A], DataStream[B] -> ConnectedStreams[A,B]注意
- 两个流中存储的数据类型
可以不同
- 只是机械的合并在一起, 内部仍然是分离的2个流
- 只能2个流进行connect,
不能
有第3个参与
把两个流连接在一起: 貌合神离,输入类型可以不同,但是返回的类型必须是一致。
- 程序
@Test
public void connect() throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 字符串
DataStreamSource<String> strSource = env.fromElements("a", "b", "c", "d");
// 数字
DataStreamSource<Integer> numSource = env.fromElements(1,2,3,4,5,6,7,8,9);
// 连接
ConnectedStreams<String, Integer> connect = strSource.connect(numSource);
/**
* 两个
*/
SingleOutputStreamOperator<String> result = connect.map(new CoMapFunction<String, Integer, String>() {
@Override
public String map1(String value) {
return value;
}
@Override
public String map2(Integer value) {
return value.toString();
}
});
result.print();
env.execute();
}
- 输出
11> a
10> 4
14> d
13> c
12> b
8> 2
15> 9
9> 3
12> 6
13> 7
14> 8
11> 5
7> 1
- CoMapFunction
connect 也有自己所对应的算子,都会一
Co
开头,是connect的缩写。
每个Stream
都有自己独立处理逻辑(map1和map2),这也说明了为啥只能2个流进行connect, 不能有第3个参与的原因。
@Public
public interface CoMapFunction<IN1, IN2, OUT> extends Function, Serializable {
/**
* This method is called for each element in the first of the connected streams.
*
* @param value The stream element
* @return The resulting element
* @throws Exception The function may throw exceptions which cause the streaming program to fail
* and go into recovery.
*/
OUT map1(IN1 value) throws Exception;
/**
* This method is called for each element in the second of the connected streams.
*
* @param value The stream element
* @return The resulting element
* @throws Exception The function may throw exceptions which cause the streaming program to fail
* and go into recovery.
*/
OUT map2(IN2 value) throws Exception;
}
虽然输入参数不同,但是返回类型(OUT
)是一致的。
CoMapFunction<IN1, IN2, OUT>
union
作用
对两个或者两个以上
的DataStream进行union操作,产生一个包含所有DataStream元素的新DataStream
参数
另外一个流返回
DataStream<T>注意
- 参数类型必须一致
- 可以连续处理多个流
- 程序
@Test
public void union() throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 数字
DataStreamSource<Integer> numSource1 = env.fromElements(1,2,3,4,5,6,7,8,9);
DataStreamSource<Integer> numSource2 = env.fromElements(111,22,33,4,4,5,22);
DataStreamSource<Integer> numSource3 = env.fromElements(2,313,43,14,1);
DataStreamSource<Integer> numSource4 = env.fromElements(4,14,314,31,4);
// 合并
DataStream<Integer> union = numSource1.union(numSource1).union(numSource2).union(numSource3).union(numSource4);
union.print();
env.execute();
}
- 输出
2> 5
13> 111
2> 3
15> 33
5> 4
12> 14
15> 4
1> 4
9> 14
3> 3
12> 8
8> 43
11> 4
7> 313
9> 6
6> 5
13> 314
10> 1
16> 4
14> 22
10> 7
4> 4
14> 31
6> 2
13> 8
7> 5
11> 7
3> 22
8> 6
15> 1
15> 9
1> 2
14> 1
16> 2
14> 9
- connect与 union 区别:
- union之前两个或多个流的类型必须是一样,connect可以不一样
- connect只能操作两个流,union可以操作多个。
简单滚动聚合算子
常见的滚动聚合算子
sum, min,max
minBy,maxBy作用
KeyedStream的每一个支流做聚合。执行完成后,会将聚合的结果合成一个流返回,所以结果都是DataStream参数
如果流中存储的是POJO或者scala的样例类, 参数使用字段名. 如果流中存储的是元组, 参数就是位置(基于0...)返回
KeyedStream -> SingleOutputStreamOperator程序
@Test
public void sum3() throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStreamSource<String> source = env.socketTextStream("hadoop102", 9999);
SingleOutputStreamOperator<UserBean> map = source.map((MapFunction<String, UserBean>) value
-> new UserBean(value.split(",")))
.returns(Types.POJO(UserBean.class));
KeyedStream<UserBean, Integer> keyBy = map.keyBy(e -> e.getId());
SingleOutputStreamOperator<UserBean> sum = keyBy.sum("age");
sum.print();
env.execute();
}
- 执行情况
1:输入
8,zhangsan111,19,F
1:输出
2> UserBean(id=8, name=zhangsan111, age=19, sex=F)
2:输入
8,zhangsan111,1,F
2:输出
2> UserBean(id=8, name=zhangsan111, age=20, sex=F)
3:输入,修改性别M
8,zhangsan111,19,M
3:输出,除了年龄,其他都没有改变
2> UserBean(id=8, name=zhangsan111, age=39, sex=F)
4:输入,更改id
7,zhangfeilong,18,M
4:输出
15> UserBean(id=7, name=zhangfeilong, age=18, sex=M)
5:输入,更改id
7,zhangfeilong,10,F
5:输出,id为7,更改性别
15> UserBean(id=7, name=zhangfeilong, age=28, sex=M)
6:输入,id为8
8,wangfff,10,F
6:输出(统计上次id为8的数据结果)
2> UserBean(id=8, name=zhangsan111, age=49, sex=F)
总体输出情况
2> UserBean(id=8, name=zhangsan111, age=19, sex=F)
2> UserBean(id=8, name=zhangsan111, age=20, sex=F)
2> UserBean(id=8, name=zhangsan111, age=39, sex=F)
15> UserBean(id=7, name=zhangfeilong, age=18, sex=M)
15> UserBean(id=7, name=zhangfeilong, age=28, sex=M)
- 总结:
- 组与组之间相互隔离,状态不会被影响。
- 所有的数据除了聚合字段(
keyBy.sum("age")
),一旦状态确认将不会改变(第一次是什么之后的就是什么,即使发送修改,但是状态不会改变)。 - min、max 和 sum 同样如此,也只会更改聚合字段的状态。
- minBy,maxBy
min 和max 的作用就是去状态的最大值或最小值,使用方式和
sum
的方式一样,作用也一样只会更改聚合字段
的状态。若要更改除聚合字段以外的字段状态,那么需要加上By
,但是sum
并没有SumBy
。
以minBy
案例演示
@Test
public void sum3() throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStreamSource<String> source = env.socketTextStream("hadoop102", 9999);
SingleOutputStreamOperator<UserBean> map = source.map((MapFunction<String, UserBean>) value
-> new UserBean(value.split(",")))
.returns(Types.POJO(UserBean.class));
KeyedStream<UserBean, Integer> keyBy = map.keyBy(e -> e.getId());
SingleOutputStreamOperator<UserBean> sum = keyBy.minBy("age");
sum.print();
env.execute();
}
1: 输入
10,lifeng,10,F
1: 输出
10,lifeng,10,F
2: 输入
10,lifeng,10,M
2: 输出,不比上一个状态小,所以不更改状态
10,lifeng,10,F
3: 输入
10,李菲菲,8,F
1: 输出,比上一个状态小,更改
9> UserBean(id=10, name=李菲菲, age=8, sex=F)
- 整个输出
9> UserBean(id=10, name=lifeng, age=10, sex=F)
9> UserBean(id=10, name=lifeng, age=10, sex=F)
9> UserBean(id=10, name=李菲菲, age=8, sex=F)
不分组的聚合
- 方式一:map
@Test
public void sum1() throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStreamSource<Integer> source = env.fromElements(1, 2, 3, 34, 431, 3, 234, 311);
SingleOutputStreamOperator<Tuple2<String, Integer>> a = source.map(
(MapFunction<Integer, Tuple2<String, Integer>>) value
-> Tuple2.of("a", value))
.returns(Types.TUPLE(Types.STRING, Types.INT));
KeyedStream<Tuple2<String, Integer>, String> keyBy = a.keyBy(
(KeySelector<Tuple2<String, Integer>, String>) value -> value.f0);
SingleOutputStreamOperator<Tuple2<String, Integer>> sum = keyBy.sum(1);
sum.print();
env.execute();
}
- 方式二:process
@Test
public void sum2() throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStreamSource<Integer> source = env.fromElements(1, 2, 3, 4, 5, 6, 7, 8,9);
SingleOutputStreamOperator<Integer> process = source.process(new ProcessFunction<Integer, Integer>() {
int count = 0;
@Override
public void processElement(Integer value, Context ctx, Collector<Integer> out) throws Exception {
count += value;
out.collect(count);
}
}).setParallelism(1);
process.print().setParallelism(1);
env.execute();
}
reduce
作用
一个分组数据流的聚合操作,合并当前的元素和上次聚合的结果,产生一个新的值,返回的流中包含每一次聚合的结果,而不是只返回最后一次聚合的最终结果。
为什么还要把中间值也保存下来? 考虑流式数据的特点: 没有终点, 也就没有最终的概念了. 任何一个中间的聚合结果都是值!参数
interface ReduceFunction<T>返回
KeyedStream -> SingleOutputStreamOperator程序
计算:DataStreamSource<Integer> source = env.fromElements(1, 2, 3, 4, 5, 6, 7, 8,9);
的和
@Test
public void reduce() throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStreamSource<Integer> source = env.fromElements(1, 2, 3, 4, 5, 6, 7, 8,9);
SingleOutputStreamOperator<Tuple2<String, Integer>> map = source.map((MapFunction<Integer, Tuple2<String, Integer>>)
value -> Tuple2.of("a", value)).returns(Types.TUPLE(Types.STRING,Types.INT));
SingleOutputStreamOperator<Tuple2<String, Integer>> reduce = map.keyBy(s -> s.f0).reduce((ReduceFunction<Tuple2<String, Integer>>) (v1, v2) -> Tuple2.of("a", v1.f1 + v2.f1));
reduce.print().setParallelism(1);
env.execute();
}
- 结果
(a,5)
(a,12)
(a,21)
(a,24)
(a,32)
(a,38)
(a,40)
(a,44)
(a,45)