用好Kafka的前提是理解Kafka基本运行方式,本文希望说明通过一些Kafka的基本概念,为建立一个Kafka使用模型进行准备。
基本过程
Kafka不仅仅是一收一发这样简单,它为高效、可靠地传递消息提供了大量特性。下图是Kafka的基本消息处理过程。
- 生产阶段:生产者(Producer)处理消息,和之前的消息打包,等待批量发布;
- 发布阶段:生产者将批量消息发送给Broker,主Broker将消息记录在自己的日志文件中;
- 提交阶段:将消息复制到追随者(Follower Broker)的日志中;
- 追赶阶段:Broker处理从消费者上一次处理位置(Offset)到新提交消息之间的消息;
- 获取节点:消费者(Comsuer)批量从Broker获取消息。
在上述过程中要特别注意三点:1、Kafka的生产者和消费者都是按批处理消息;2、消费者端通过消费组(Consumer Group)构成消费集群;3、消费位置。
理解批处理
Kafka生产端和消费端都是按批传递消息,这样可以减少Kafka消息递送逻辑执行的次数,例如:网络传输,资源调度等,降低消息处理的平均时延,提高吞吐量。下面分别从生产端和消费端了解批量控制的相关参数。
生产端
生产端和发送方法(KafkaProducer.send()
)相关的参数:
参数 | 定义 | 默认值 | 解释 |
---|---|---|---|
batch.size | The producer will attempt to batch records together into fewer requests whenever multiple records are being sent to the same partition. | 16384byte | 等凑够了发。 |
linger.ms | The producer groups together any records that arrive in between request transmissions into a single batched request. | 0 | 等多长时间发。 |
compression.type | Specify the final compression type for a given topic. | 压缩后传输 | |
acks | The number of acknowledgments the producer requires the leader to have received before considering a request complete. | 1 |
batch.size
和linger.ms
两个参数可以控制批次的大小。批次越大系统整体的吞吐量就越大,但是人为引入的延时也越长,因此,这两个参数应该根据业务的实际情况进行调优。linger.ms
的默认值是0ms
,但是这不代表消息是单条发送的,Kafka会将同时到达消息打包发送。一般建议将linger.ms
的值设置为5ms
。
另外,需要注意参数acks=all
和max.in.flight.requests.per.connection=5
。发送请求需要被Broker确认,参数max.in.flight.requests.per.connection
指定允许同时执行的没有被确认的发送请求数,超过了无法发送。acks
指定了确认条件,包括:
-
acks=0
,Broker收到请求后就回复; -
acks=1
,消息已经写到Leader的日志中; -
acks=all
,消息已经写到所有的Broker日志中。
消费端
在消费端和拉取(KafkaConsumer.poll()
)批次相关的参数有如下几个:
参数 | 定义 | 默认值 | 解释 |
---|---|---|---|
fetch.min.bytes | The minimum amount of data the server should return for a fetch request. | 1byte |
broker 中数据小于这个值时,fetch 操作被阻塞等待数据。 |
fetch.max.wait.ms | The maximum amount of time the server will block before answering the fetch request if there isn't sufficient data to immediately satisfy the requirement given by fetch.min.bytes. | 500ms |
broker 数据不够时最多等多长时间响应fetch 操作。 |
max.partition.fetch.bytes | The maximum amount of data per-partition the server will return. | 1M | 允许每次fetch 每个分区返回的最大数据量。 |
消费者执行poll
方法时并不是直接访问Broker的数据,而是通过fetch
循环。取数据时,如果获得的数据小于fetch.min.bytes
,那么Broker会阻塞poll
直到获得了足够的数据后再返回给消费者。如果一直没有足够的数据怎么办?参数fetch.max.wait.ms
指定了Broker最长的阻塞时间,如果数据不够,但是达到了等待时间,那么也会返回数据。参数max.partition.fetch.bytes
控制了每个分区(Partition)一次可获取的最大数据,这个主要和内存控制有关,如果分区很多,那个又都包含很多数据,就需要配置相应的内存。
消费端提交消费位置(Offset)
Kafka中,一个分区只能被同一个消费组中一个消费者消费,消费按顺序进行(也可以指定),每条消息都有自己的offset
代表其在分区中的位置。poll
方法从上一次已提交的offset
之后拉取数据,因此,消费完数据必须执行commit
,才能保证消费向后进行。
参数 | 定义 | 默认值 |
---|---|---|
enable.auto.commit | If true the consumer's offset will be periodically committed in the background. | true |
auto.commit.interval.ms | The frequency in milliseconds that the consumer offsets are auto-committed to Kafka if enable.auto.commit is set to true. | 5000 |
上面两个参数的含义很好理解。需要注意的是,多次执行poll
方法不会触发commit
,如果auto.commit.interval.ms
较长,就会执行多个poll
之后再提交。KafkaConsumer.close()
方法会执行提交,所以,只要保证执行了close
,消费者程序退出时也能进行提交。
不自动提交,就需要进行手动提交。如果不特别指定,提交的是最后一次poll
的最后一个offset
。这样带来一个问题,应该在何时执行提交操作,获得消息后还是消息处理后?先提交可能会消息丢失,后提交可能会重复消费。
理解消费者集群(Consumer Group)
消费者组应该是Kafka中最重要的概念,它提供了管理消费者集群的机制。
和消费组相关的主要参数如下:
参数 | 定义 | 默认值 |
---|---|---|
group.id | A unique string that identifies the consumer group this consumer belongs to. | |
group.instance.id | A unique identifier of the consumer instance provided by the end user. | |
partition.assignment.strategy | A list of class names or class types, ordered by preference, of supported partition assignment strategies that the client will use to distribute partition ownership amongst consumer instances when group management is used. | RangeAssignor |
session.timeout.ms | The timeout used to detect client failures when using Kafka's group management facility. 检查多长时间没有收到心跳。 | 10秒 |
heartbeat.interval.ms | Heartbeats are used to ensure that the consumer's session stays active and to facilitate rebalancing when new consumers join or leave the group. 建议不要大于session.timeout.out值的1/3。 | 3秒 |
Kafka解决横向扩展的方法是添加分区(Partition)和消费者(Consumer)。生产者生产的消息可以分发到多个分区中,每个分区都由消费组(可多个消费组)内的唯一的消费者消费,一个消费者可以消费多个分区。设置相同group.id
的消费者构成一个消费组。消费者集群管理机制就是处理分区和消费者的分配关系。
Kafka通过subscribe
方法实现自动分配,通过assign
方法实现手工分配。通常都会采用自动分配的方式,这样才能充分发挥Kafka的特性。Kafka内置几种分配策略可以通过参数partition.assignment.strategy
指定。
如果采用
assgin
方式手工指定,和组管理相关的机制不再生效。
消费者集群管理机制在kafka中叫Rebalance
,它解决的是当分区和消费者分配关系发生变化时的重新分配。触发Rebalance
有两种情况:1、消费组内的成员数量发生变化,消费者加入或离开;2、订阅信息发生变化,分区数量变化或订阅主题发生变化。
Rebalance
的默认机制是组内消费者全员业务中断。显然这在很多场景下都是不合理的,也很难接受,所以如何避免无效的Rebalance
始终是个热点问题。针对这个问题Kafka已经提供了优化方法。
方法一,修改分配策略。将参数partition.assignment.strategy
设置为CooperativeStickyAssignor
可以优化业务中断的问题,它会尽量保证业务进行,避免不必要的重新分配。
方法二,静态成员关系(Static Membership)。通过参数group.instance.id
给消费组中的每个消费者分配一个固定的id,当这个消费者下线再上线时(在session.timeout.ms
范围内)不会触发Rebalance
,而是将之前已有的分配关系直接给这个消费者。
注意:session.timeout.ms
参数受限于Broker参数group.min.session.timeout.ms(6 sec)
和group.max.session.timeout.ms(30 min)
,即大小不能超过这个上下限。
消费应用的思考
无论Kafka提供了多少高级特性,如果消费应用本身存在问题,仍然不可能有效支撑业务处理的需求,因此,应该关注一下实现消费应用时面临的基本问题。
参数 | 定义 | 默认值 |
---|---|---|
max.poll.interval.ms | The maximum delay between invocations of poll() when using consumer group management. For consumers using a non-null group.instance.id which reach this timeout, partitions will not be immediately reassigned. Instead, the consumer will stop sending heartbeats and partitions will be reassigned after expiration of session.timeout.ms . |
5分钟 |
max.poll.records | The maximum number of records returned in a single call to poll(). Note, that max.poll.records does not impact the underlying fetching behavior. The consumer will cache the records from each fetch request and returns them incrementally from each poll. | 500 |
client.id | An id string to pass to the server when making requests. |
max.poll.interval.ms
参数非常重要,如果消费者没有指定group.instance.id
,poll
间隔超时后会触发Rebalance
,如果指定了group.instance.id
,等待session.timeout.ms
超时再触发Rebalance
。max.poll.records
参数的目的是控制消息批次的总体时间,避免发生消费超时。
PS:
max.poll.records
参数和前面的fetch.xxx
参数是什么关系?如果max.poll.records
很小,fetch.min.bytes
很大,那么fetch
的结果缓存起来?这个问题意义不大,这样设置并不合理。
从这个两个参数可以看出,Kafka认为消费应用必须考虑消息处理时长的问题,如果处理消息的业务逻辑耗时与参数设置不匹配,有可能发生意料之外的结果。例如我们的业务逻辑是自动提交,但是因为poll
间隔超时触发Rebalance
,如果提交方法已经执行,那么会导致提交之后到超时发生之间的消息产生重复消费。如果是所有消息处理完成后手动提交,那么也会导致未进行提交操作产生重复消费。
控制消费应用执行时间是一个必须认真对待的问题,虽然可以通过减少max.poll.records
参数缩小一次poll
的执行时间,但是,如果处理逻辑中包含对外部服务的调用,那么就有可能因为外部服务的延时阻塞整个消费应用,即使减少消息数量也没有用。更可控的方式应该是给单个消息的处理设置超时时间,保证每条消息的处理都在指定时间范围内,从而保证整体不超时。还可以将poll
方法和消息处理独立开,用不同的线程执行,如果消息没有处理完,poll
方法线程用KafkaConsumer.pause()
方法暂停获取数据,这样poll
方法继续按时间间隔执行,但是不获取新数据。当调用外部服务时,还需要考虑外部服务是否支持并发调用,如果需要应该引入多线程或NIO的机制,提高整个业务的吞吐能力。并发操作又会使超时控制变得更复杂。
消费应用不可避免会发生异常情况,由于Kafka中数据是按批处理的,提交的也是最后消费位置,那么就一定会发生分区中消息状态和消费应用处理状态不一致的情况。根据前面已经提到的提交策略,要么接受丢失消息,要么接受重复消费。通常我们不能接受消息丢失,因此必须支持处理重复消息,也就是消费应用必须具备幂等性。
消费应用避免不了进行代码升级、修改配置等维护操作,因此,还必须考虑优雅关机问题,保证结束应用前将消息和状态提交处理完毕。(前面的重复消费和幂等性一定程度上可以解决这个问题,但还是应该进行主动控制。)
最后要说明的是,Broker,Java版的生产客户端和消费客户端都支持通过JMX获取运行指标,也可以在消费应用中通过metrics
方法获取。这些指标是进行精细调优的基础。没了便于监控,应该给消费者设置client.id
参数(生产者也支持),这样提取运行指标时就可以直接指定是哪个客户端。
最后
Kafka的功能真的是太多了,本文只是介绍了最常用的一些参数。还有很多特性应该深入研究,例如:事物,日志管理(保存周期等),保持消费顺序,消费端异步提交,安全,自定义分区分配策略等。
建议阅读
99th Percentile Latency at Scale with Apache Kafka
kafka 重平衡解决方案: cooperative协议和static membership功能详解
From Eager to Smarter in Apache Kafka Consumer Rebalances
Incremental Cooperative Rebalancing in Apache Kafka: Why Stop the World When You Can Change It?
Apache Kafka Rebalance Protocol for the Cloud: Static Membership