How Apache Flink manages Kafka consumer offsets
链接:https://data-artisans.com/blog/how-apache-flink-manages-kafka-consumer-offsets
Step 1:
例子:一个kafka topic,有两个partition,每个含有"A,B,C,D,E"信息。offset从0开始。
Step 2:
第二步,kafka consumer开始从partition 0开始读取信息,"A"正在处理,第一个consumer的offset变成了1。
Step 3:
第三步,“A”到达了Flink Map Task。每个consumer继续读取他们下一个记录(partition 0读取“B”,partition 1读取“A”)。各自更新offset,同时Job Master开始触发checkpoint。
Step 4:
接下来,kafka consumer已经创建他们的状态快照(“offset = 2, 1”),存在了Job Master。Source从partition 0和1分别发出“B”和“A”后面发出了checkpoint barrier。在operator task中对barriers进行align操作,保证了一致性。消息A到达Flink Map Task,而上面的consumer继续读取下一个记录(消息“C”)。
Step 5:
当 Flink Map Task 从sources接收到了全部的checkpoint barriers(同一版本的barrier),那么就会checkpoint他的state到Job Master里面。同时,consumers继续读取记录。
Step 6:
当所有的task报告完成了他们的state checkpoint后,那么Job Master就会完成这个版本的checkpont。那么这个checkpoint就可以用于故障恢复了。值得一提的是,Apache Flink并不依赖于kafka的offsets来从潜在的系统故障中恢复。
Recovery in case of a failure
当故障发生时(instance,worker故障),所有的operator task重启,并且重设到最近完成的checkpoint。根据最近完成的checkpoint,kafka source分别从offset 2 和 1开始。当job重启后,就跟没发生过故障的正常系统一样。
Kafka + Flink: A Practical, How-To Guide
链接:https://data-artisans.com/blog/kafka-flink-a-practical-how-to
Flink with kafka:
- guarantees exactly-once delivery of events
- does not create problems due to backpressure
- has high throughput
- is easy to use for application developers.
A 5-minute Introduction to Kafka
Apache Kafka是一个分布式,高吞吐的消息队列系统,专为多个数据消费者提供流数据。kafka通过使用日志数据结构将传入的消息持久化到磁盘上,从而使流数据持久。下游的consumers可以从不同的partition,按不同的速度,也可以读取过去的信息(比如,“replaying history”)读取stream数据。
topic是kafka中重要的抽象。一个topic表示一个逻辑上的数据流,含有多个partition。partition是topic的data 子集,存在于不同的物理node中。
-
producers
是把数据存入到topic的service 。比如“console producer”。 -
consumer
是从topic中读取数据的service 。 -
Kafka broker
管理topic的individual partitions,安装在节点上的service,使consumers和producers能操纵topic中的数据。要是一个partition有副本,那么多个broker可能管理相同的partition。其中一个broker是leader,其他的是followers。 -
message offset
是kafka用来给每个partition中的每条信息配置的独一无二的ID,表示增量的逻辑timestamp。 -
consumer groups
由多个消费者组成。每个consumer group,信息保证至少一次消费。如下图所示,web服务端输入数据到一个具有4个partition的topic中。2个broker各自管理2个partition。3个consumers划分到2个consumer groups。两个consumer groups都会看到写入topic的所有消息,即使它们都消耗了topic的重叠子集(partitions)。
Hands-on: Use Kafka topics with Flink
1.Preparation: Get Kafka and start it locally
2.Consume data using Flink
使用flink的consumer订阅topic。需要添加依赖:
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka</artifactId>
<version>0.9.1</version>
</dependency>
#我使用的
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka- 0.10_2.11</artifactId>
<version>1.4.1</version>
</dependency>
这里介绍如何使用flink消费kafka中的数据。从一个topic中读取string,做些简单的修改,打印输出到standard output。这里使用kafka自带的console producer。从命令行中读取kafka参数。
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
ParameterTool parameterTool = ParameterTool.fromArgs(args);
DataStream < String > messageStream = env.addSource(new FlinkKafkaConsumer082 < >
(parameterTool.getRequired("topic"),
new SimpleStringSchema(),
parameterTool.getProperties()));
//命令行:--topic test --bootstrap.servers localhost:9092 --zookeeper.connect localhost:2181 --group.id myGroup
创建好一个datastream后,可以对其进行transform。比如,对每个word加入固定的前缀,然后打印出来。
messageStream
.rebalance()
.map ( s -> “Kafka and Flink says: ” + s)
.print();
rebalance() 使数据重新partition,如此以致所有机器接收到messages(比如,当kafka的partition的数目比flink的parallel instances数目少时)
3.Produce data using Flink
介绍如何使用flink往kafka的topic中写入数据。创建一个简单的string generator源头。
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream < String > ;
messageStream = env.addSource(new SimpleStringGenerator());
从命令行中读取kafka参数:
ParameterTool parameterTool = ParameterTool.fromArgs(args);
messageStream.addSink(new KafkaSink < > (parameterTool.getRequired("bootstrap.servers"), parameterTool.getRequired("topic"), new SimpleStringSchema()));
//命令行: --topic test --bootstrap.servers localhost:9092
Running the example in a cluster
当在集群中创建新topic时,建议设置适当的partition数目,使所有的flink parallel instances 能接收到数据,为了达到这个目的,partition的数目至少等于Flink instances的数目。
FAQ
Q:How are Kafka partitions assigned to Flink workers?
A:当flink task数目比kafka partition数目大的时候,某些consumer将会空闲,不读取任何数据。
反之,task会订阅多个partition。
在失败之后,Flink将partitions 分配给Flink实例,保持partitions 的确定性,这对于exactly-once 是至关重要的。
Q:How does Flink retain the order of messages in Kafka partitions?
A:kafka partition是有序的,在flink的job中,在 record-at-a-time 的转变中(e.g., map, flatMap, filter, etc)维持着输入的顺序。Partitioning 和grouping 的转变中顺序变了。flink写入kafka的时候,可以指定partition,或者不使用partition的时候,flink将使用直接的map flink实例到kafka partitions。这意味着使用kafka作为中间管道的多个Flink jobs保留了分区内的顺序:
Q:How does Flink guarantee exactly-once processing with Kafka?
A:flink kafka consumer是一个有状态的operator,状态是kafka offsets。
Q:How does Flink handle backpressure when reading from Kafka topics?
A:backpressure :如果下游节点(sink)处理速度赶不上上游节点(kafka spout),那么flink将会减缓上游节点。flink的kafka consumer自带backpressure:只有后面的operators赶不上处理kafka messages,flink将会减缓kafka的消费,导致brokers的请求减少。当operator速度提高时,flink的消费messages的速度也会提升。这行为使sources和flink之间加入了个缓冲区,在峰值的时候持久化缓冲events。
flink中提交kafka offset
这个offset和容错无关,主要用于更新offset用于外部的检测