目录
1、Flink使用WaterMark处理乱序事件
2、累加器和计数器
3、Window使用
4、流的切分和合并
5、任务链
6、Flink消费kafka数据起始offset配置
7、Flink消费kafka数据,消费者offset提交配置
8、数据源
9、数据存放
10、运行时环境的区别
11、keyedStream中进行聚合操作
一.Flink使用WaterMark处理乱序事件[1]
watermark是用于处理乱序事件的,而正确的处理乱序事件,通常用watermark机制结合window来实现。
流处理从事件产生,到流经source,再到operator,中间是有一个过程和时间的。虽然大部分情况下,流到operator的数据都是按照事件产生的时间顺序来的,但是也不排除由于网络、背压等原因,导致乱序的产生(out-of-order或者说late element)
但是对于late element,我们又不能无限期的等下去,必须要有个机制来保证一个特定的时间后,必须触发window去进行计算了。这个特别的机制,就是watermark。
关于水位线机制这篇博客讲的比较通俗易懂,Flink流计算编程--watermark(水位线)简介
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
//将TimeCharactersistic设置为EventTime。
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
val text = env.socketTextStream("localhost", 9999)
// 指定watemark的实现
text.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessGenerator )
自定义的实现AssignerWithPeriodicWatermarks接口的类
class BoundedOutOfOrdernessGenerator extends AssignerWithPeriodicWatermarks[MyEvent] {
//最大允许的乱序时间是10s,告诉Flink希望消息最多有10s的延迟,每个窗口仅在waterMark通过时被处理
val maxOutOfOrderness = 10000L;
var currentMaxTimestamp: Long;
//extractTimestamp方法是从数据本身中提取EventTime
override def extractTimestamp(element: String, previousElementTimestamp: Long): Long = {
val timestamp = element.getCreationTime()
currentMaxTimestamp = max(timestamp, currentMaxTimestamp)
timestamp;
}
//getCurrentWatermar方法,是用currentMaxTimestamp - maxOutOfOrderness来获取的
override def getCurrentWatermark(): Watermark = {
new Watermark(currentMaxTimestamp - maxOutOfOrderness);
}
}
使用WaterMark是,window的触发时机?
1.首先
window的设定无关数据本身,而是系统定义好了的。如果window大小是3秒,那么1分钟内会把window划分为如下的形式:
[00:00:00,00:00:03)
[00:00:03,00:00:06)
...
[00:00:57,00:01:00)
2.其次
输入的数据中,根据自身的Event Time,将数据划分到不同的window中,如果window中有数据,则当watermark时间>=Event Time时,就符合了window触发的条件了,最终决定window触发,还是由数据本身的Event Time所属的window中的window_end_time决定。
3.最终
window的触发要符合以下几个条件:
watermark时间 >= window_end_time
- 在
[window_start_time,window_end_time)
中有数据存在
4.注意
- watermark是一个全局的值,不是某一个key下的值,所以即使不是同一个key的数据,其warmark也会增加。
- 对于late element太多的数据而言,由于Event Time < watermark时间,所以来一条就触发一个window。
- 也可以使用AllowedLateness功能设置消息的最大允许时间来解决处理延迟的消息。
二.累加器和计数器[2]
Flink和Spark一样,都提供了累加器供我们使用,它们大多用于一些计数,计算一些指标的场景。
计数器也是一种累加器,它是最简单的累加器,作计数功能使用。在Flink类部,内置了很多计数器,比如IntCounter,LongCounter和DoubleCounter等等。
那么如何使用累加器呢?主要分为下面的几部:
- 第一步:在自定义的转换操作里创建累加器对象:
private IntCounter numLines = new IntCounter();
- 第二步:注册累加器对象,通常是在
rich function的open()
方法中。这里你还需要定义累加器的名字getRuntimeContext().addAccumulator(“num-lines”, this.numLines);
- 第三步:在operator函数的任何地方使用累加器,包括在open()和close()方法中this.numLines.add(1);
- 第四步:结果存储在JobExecutionResult里:
JobExecutionResult jobExecutionResult =env.execute("Accumulator"); jobExecutionResult .getAccumulatorResult("num-lines");
如果需要可以选择实现Accumulator或者SimpleAccumulator来自定义累加器。
- Accumulator<V, R>是最灵活的:它定义了需要进行累加的值的类型V以及最后结果的类型R
- SimpleAccumulator则是在进行累计数据类型和返回的数据类型一致的情况下使用的,例如计数器。
/**
* Flink的累加器使用
*/
object flinkBatch {
def main(args: Array[String]): Unit = {
val env = ExecutionEnvironment.getExecutionEnvironment
val text = env.fromElements("Hello Jason What are you doing Hello world")
val counts = text
.flatMap(_.toLowerCase.split(" "))
.map(new RichMapFunction[String, String] {
//创建累加器
val acc = new IntCounter()
override def open(parameters: Configuration): Unit = {
super.open(parameters)
//注册累加器
getRuntimeContext.addAccumulator("accumulator", acc)
}
override def map(in: String): String = {
//使用累加器
this.acc.add(1)
in
}
}).map((_,1))
.groupBy(0)
.sum(1)
counts.writeAsText("d:/test.txt/").setParallelism(1)
val res = env.execute("Accumulator Test")
//获取累加器的结果
val num = res.getAccumulatorResult[Int]("accumulator")
println(num)
}
}
三.Window使用[3]
在KeyedStream中进行使用,根据某个特征针对每个key用windows进行分组
dataStream.keyBy(0).window(TumblingEventTimeWindows.of(Time.seconds(5)));
dataStream.keyBy(0).timeWindow(Time.seconds(5),Time.seconds(2));
四.流的切分和合并[4]
1.将流分割成多个流
SplitStream split = dataStream.split(new OutputSelector() {
@Override
public Iterable select(Integer value) {
List output = new ArrayList();
if (value % 2 == 0) {
output.add("even");
}
else {
output.add("odd");
}
return output;
}
});
从split stream中选择一个流
// SplitStream split;
DataStream even = split.select("even");
DataStream odd = split.select("odd");
DataStream all = split.select("even","odd");
2.合并多个数据流成一个新的数据流
dataStream.union(otherStream1, otherStream2, ...);
五.任务链[5]
把很多转化操作的任务链接在一起放到同一个thread中执行,可以获得更好的性能。使用 StreamExecutionEnvironment.disableOperatorChaining()可以在整个job中去除某个链节点。
(1)Start new chain
omeStream.filter(...).map(...).startNewChain().map(...);
(2)Disable chaining
someStream.map(...).disableChaining();
(3)Set slot sharing group
someStream.filter(...).slotSharingGroup("name");
六.Flink消费kafka数据起始offset配置[6]
Flink读取Kafka数据确定开始位置有以下几种设置方式:
1.从topic的最早offset位置开始处理数据,如果kafka中保存有消费者组的消费位置将被忽略。
flinkKafkaConsumer.setStartFromEarliest()
2.从topic的最新offset位置开始处理数据,如果kafka中保存有消费者组的消费位置将被忽略。
flinkKafkaConsumer.setStartFromLatest()
3.从指定的时间戳(毫秒)开始消费数据,Kafka中每个分区中数据大于等于设置的时间戳的数据位置将被当做开始消费的位置。如果kafka中保存有消费者组的消费位置将被忽略。
flinkKafkaConsumer.setStartFromTimestamp(…)
4.默认的设置。根据代码中设置的group.id设置的消费者组,去kafka中或者zookeeper中找到对应的消费者
offset位置消费数据。如果没有找到对应的消费者组的位置,那么将按照auto.offset.reset设置的策略读取offset。
flinkKafkaConsumer.setStartFromGroupOffsets()
七.Flink消费kafka数据,消费者offset提交配置[7]
Flink提供了消费kafka数据的offset如何提交给Kafka的配置。注意,Flink并不依赖提交给Kafka或者zookeeper中的offset来保证容错。提交的offset只是为了外部来查询监视kafka数据消费的情况。配置offset的提交方式取决于是否为job设置开启checkpoint。可以使用env.enableCheckpointing(5000)
来设置开启checkpoint。
1.关闭checkpoint
如果禁用了checkpoint,那么offset位置的提交取决于Flink读取kafka客户端的配置,例如enable.auto.commit 配置是否开启自动提交offset, auto.commit.interval.ms决定自动提交offset的周期。
2.开启checkpoint
如果开启了checkpoint,那么当checkpoint保存状态完成后,将checkpoint中保存的offset位置提交到kafka。这样保证了Kafka中保存的offset和checkpoint中保存的offset一致,可以通过配置setCommitOffsetsOnCheckpoints(true/false)
来配置是否将checkpoint中的offset提交到kafka中(默认是true)。如果使用这种方式,那么properties中配置的kafka offset自动提交参数enable.auto.commit和周期提交参数auto.commit.interval.ms参数将被忽略。
八.数据源[8]
StreamExecutionEnvironment提供的一些访问数据源的接口
1.基于文件的数据源
readTextFile(path)
readFile(fileInputFormat, path)
readFile(fileInputFormat, path, watchType, interval, pathFilter, typeInfo)
2.基于Socket的数据源
socketTextStream //Linux中启动Socket端口 nc -lk 9999
3.基于Collection的数据源
fromCollection(Collection)
fromCollection(Iterator, Class)
fromElements(T ...)
fromParallelCollection(SplittableIterator, Class)
generateSequence(from, to)
4.addSource
添加新的源函数,例如从kafka中读取数据,
// SimpleStringSchema 指定key的格式
FlinkKafkaConsumer011<String> consumer011 = new FlinkKafkaConsumer011<>(
"flink-topic",
new SimpleStringSchema(),
props);
env.addSource(consumer);
九.数据存放[9]
writeAsText() / 以字符串的形式逐行写入文件
writeAsCsv(...) / 将元组写出以逗号分隔的csv文件。注意:只能作用到元组数据上。
print() / printToErr() / 控制台直接输出结果,调用对象的toString()方法得到输出结果
writeUsingOutputFormat() / FileOutputFormat
writeToSocket
addSink / 自定义接收函数或使用FlinkKafkaProducer将数据写入到Kafka
十.运行时环境的区别[10]
创建StreamExecutionEnvironment 对象有多种方式,如下所示。但是通常用默认方式就可以,它可以根据所处环境自动做出正确的选择。
//默认
StreamExecutionEnvironment.getExecutionEnvironment();
//从本地环境创建
StreamExecutionEnvironment.createLocalEnvironment();
//远程创建
StreamExecutionEnvironment.createRemoteEnvironment(String host, int port, String... jarFiles);
十一.keyedStream中进行聚合操作[11]
对于Key的指定方式可以参考我的这篇文章 Flink中指定Key的几种方式
keyedStream.sum(0);
keyedStream.sum("key");
keyedStream.min(0);
keyedStream.min("key");
keyedStream.max(0);
keyedStream.max("key");
keyedStream.minBy(0);
keyedStream.minBy("key");
keyedStream.maxBy(0);
keyedStream.maxBy("key");