以 flink 源码中的一个example 做为例子, 这里是我的一个copy修改过 WordCountStreaming
下面是节略后的代码
public class WordCountStreaming {
public static void main(String[] args) throws Exception{
final ParameterTool params = ParameterTool.fromArgs(args);
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
env.getConfig().setGlobalJobParameters(params);
DataStream<String> text = env.fromElements(WordCountData.WORDS);
DataStream<Tuple2<String, Integer>> counts =
text.flatMap(new Tokenizer())
.keyBy(0)
// .timeWindow(Time.seconds(2))
.sum(1);
counts.print();
env.execute("WordCountStreaming");
}
}
困惑1
如下所示,"你看这个【the】呀, 重复输出了呀!"
为什么输出结果中会重复输出每个单词, 而不是一个单词对应一个最终结果?
...
(the,9)
(oppressor,1)
...
(the,10)
(proud,1)
...
(the,11)
(pangs,1)
...
(the,12)
(law,1)
...
(the,13)
(insolence,1)
...
(the,14)
(spurns,1)
...
原因解析
这是因为我们没有划分窗口,这样Flink在处理数据时,是基于每个数据项(分割出来的单词)进行一次计算并输出结果。 如下图所示, 第一个 数据a 进来,会统计当前有1个a 并输出 {count:1, word:a}, 后面再陆续有其它的a进来, 都会加到之前的sum结果上, 并且会输出当前这个数据项a 加完后的结果 {count: n, word:a }, 在输出结果上看感觉是a重复输出了。 而有些新道友可能期待的是只需要最后结果: a 总共有多少个?
原因嘛还是对流的概念没有理解到位。 流是无界的,永远没有最后一个数据项。 也就不知道 哪个a会是最后一个a啦。 我们就可以简单的把每一个数据项都当做为最后一个数据进行计算输出, 就有了我们看到的输出结果: 每处理到一个数据项, 就会有一次输出。
而我们是例子因为是一个文件,是有界的!可能会让人在潜意识里用批处理的思维来理解了。 如果用的是一个socket 连接, 可能更好理解些吧。
可参考下图理解一下:
如果我们想分段进行统计并得到一次聚合的结果怎么办呢?
这时就需要窗口的概念了。如下图所示
困惑2
我直接加上了 timeWindow来划分发 2s 的窗口,但是为毛鸟都不输出了。
一去掉就又变成上面那样样子了~
.keyBy(0)
.timeWindow(Time.seconds(2))
原因解析
首先我们要知道 Flink 中是怎么定义时间的。Flink 定义了三种时间
- ProcessingTime : 执行操作时对应的机器系统时间
- EventTime : 产生数据项的事件发生时间。
- IngestTime : flink 接收到数据的时间
可以参考官网的图来说明
Flink 窗口默认是基于 ProcessingTime的。 我们上面代码划分的窗口就是一个基于ProcessingTime 的窗口。 ProcessingTime 只与当前的系统时间有关, 窗口划分完毕后, 只有系统时间到了窗口结束时间后, 才会触发对应窗口的计算逻辑。
而我们有示例代码中, 源数据是有限的,在计算机看来太少了, 几个毫秒也就发送完了。 源数据发送完后,Flink 会发送一个 超级无敌大的 watemark,试图触发所有还没有触发的窗口。 但 watermark 只对 EventTime 才有意义。 基于ProcessingTime 的窗口忽视所有的 watermark,心心念念地等待对应的系统时间的到来。 Flink 发达完最后一个watermark 后,若没有触发到哪个窗口的计算,会认为整个 job 也应该结束了, 没有触发的 windown 因为触发条件不满足而丢弃不执行。 由于我们的源数据很快就发完,还没有到达第一个时间窗口的结束时间(没撑到2s),所以就没有触发对应的窗口计算逻辑且没有任何输出,整个job 就结束了。 可参考下图。
怎么让这个示例代码有输出呢 ?
- 使用 EventTime。 source 数据源发送完时, Flink 会发送一个超级无敌大的watermark 来触发最后一个 EventTime-base 的窗口的计算处理。 这样即使数据时间不能打满一个窗口,也是会触发到计算输出的。
- 控制一下 source 发送数据节奏, 让它打满几个时间窗口。比如发送一个单词出来就 sleep 个0.5秒。可能存在最后一个窗口没有触发,数据会丢弃掉的情况。对于学习试验的道友来说, 至少能看到输出。 要解决数据丢失的问题就需要其它的方式, 这里不探讨了。