1、原理
Producer:
消息生产者
Broker:
通过zookeeper把多个broker连成了一体,对topic来说,多个broker其实是一个整体,所以topic的partition可以分布在一个broker里面,也可以分布在多个broker里面。
Topic:
topic是逻辑的,partition是物理的;
一个topic对应多个partition; 一个topic中能有多少个partition是在kafka的配置文件里配置好的默认值。
一个topic中的partition可以通过设置replication factor来设置副本的个数,默认是1,就是没副本,没冗余,一旦一台broker挂了,那消息就没了,所以一般要设置成2,这样就有备份了。
Replication:
一个partition可以有多个副本,但是给消费者用的时候,只用一个,这里就区分哪个是Leader,哪些是follower。leader提交数据之后,会同步给follower,保证follower和leader的消息状态是一模一样的。
Loader的选举:
Zookeeper会从所有的broker中选出一个作为controller,负责选leader;
Controller会在所有broker中的所有partition中选leader。
Partition:
Partition其实就是一个消息queue,先进先出,里面存的都是消息。
消息的分区机制:
默认是取KEY的正值后取总分区数量的模,基本做到平均分配;当然,如果不传KEY,那就直接轮询,直接遍历过去;也可以自己实现分区机制,根据KEY的范围区分用哪个分区。当然,不管设置什么样的机制,保证每个KEY都只能分到某一个固定的分区。
消息在partition的存储是顺序存储的,所以在写入的的时候会非常快,不需要浪费IO资源,partition的存储默认每500M生成一个segment文件,一个segment文件包括一个index文件和一个log文件,其中log文件是用来存消息数据的,index文件是用来存每个消息的offset的。Index文件的文件名其实就是该文件中最小的offset。
Consumer:
Consumer从partition中读出的数据是有序但不连续的,有序是因为paritition中的数据本身就是一个队列先进先出,是顺序存储的;不连续是因为一个topic的消息分散在多个partition中,所以partition中存储的数据本身就不是完整的,因此读的时候肯定就不连续。
一个partition只能被一个group中的一个consumer消费,不能被多个consumer消费;当然,不同group可以消费同一个topic,当然也肯定能消费这个topic中的同一个partition。
Consumer Group:
一个Consumer group可以有多个consumer,一个group中的所有consumer协调一起来消费topic中的数据。
Consumer Rebalance:
Consumer group开始消费数据后,每个consumer对应哪个partition都是分配好的,但是一旦出现异常情况,就会导致这个分配关系的rebalance,异常情况包括:
Group消费的Topic的paritition数量变了;
Group消费的Topic的数量变了;
consumer挂了一个或加了一个。
每个consumer group都会有一个coodinator,专门用来负责该group的rebalance工作。每个group加入的时候,都会注册到coodinator。Coodinator从所有的consumer里面选一个leader,由这个leader来分配哪个consumer消费哪个partition。
Offset:
Partition中的每个消息进去之后,都会被分配一个offset,作为他的下标,也可以当成他的标识,这就是offset的产生过程。
下面是offset的使用过程,也就是用offset拿partition中的数据:
由于一个partition可以分成多个片段,那么会有多个index索引文件,多个log数据文件,我们可以对所有index索引文件的文件名,也就是最小offset,进行二分排序,然后跟指定的offset比较,确定该offset指定的数据在哪个segment里面。
Offset产生于paritition,但是在实际使用时,我们是用来确定consumer消费到哪个offset了,以便确定下一个数据读哪个。
老版本的Offset的消费数据是存储在zookeeper里面的,新版本换了地方,是因为zookeeper对于频率的读写操作支持不好,所以把offset存到一个叫consumer_offsets的topic里了,这个topic默认50个partiition,里面存的每个消息是当然消费组的groupId,被消费的topic,被消费的partition,被消费消息的offset。
Offset消费数据会定期自动提交到consumer_offsets的topic里。
2、用法
消息堆积问题:
原因是消费太慢。消息堆在paritition里太多,consumer来不及消费,超出了session.timeout的时间,然后就报错了,报错之后,consumer当前消费的offset就没法提交了。因为没提交offset,consumer又会重新从老的offset位置开始读数据,然后因为来不及消费,又超时,又挂了,无限循环。
解决办法是关了offset的自动提交,然后用spring-kafka自带的offset提交机制处理offset的提交问题。Spring kafka新建了一个阻塞队列,临时放需要消费的数据,相当于多了一个中间队列。SpringKafka把partition中的数据拿到阻塞队列,然后启一个线程去消费他,如果这个阻塞队列中的数据消费完了,就把offset提交了;如果阻塞队列满了,会直接暂停consumer从队列里拿数据。
代码:
KafkaTemplate.send
@kafkaListener(topics groupId)
Listen(message)