通过上篇文章的学习,我们对Kafka时间流平台有了初步的认识,并且也了解了组成Kafka的核心组件。整个Kafka的最核心部分其实就是我们所有的服务器端,也叫Broker,Broker在整个Kafka平台上承担了服务器端和数据存储层的职责。
Broker作为数据的存储层,需要负责数据管理,过期数据管理,数据复制来保证可靠性等任务,过期数据管理决定了我们的消息数据会被保存多长的时间,而数据复制机制会将数据在多个节点上进行备份,当某台Broker由于软硬件故障出现问题,数据不会因此而丢失。
除了管理数据的职责,Broker也负责处理从生产者客户端来的消息发送请求处理,下图生产者客户端和Broker进行数据交互的示意图:

如上图所示,生产者客户端发送请求给Broker,Broker收到请求后,将消息保存到自己的文件系统,然后发送响应给客户端,这是最基本的交互方式。但是基于笔者最近在多个项目上代码评审的结果看,消息发送端代码写的不规范时有发生。以下是笔者最近在代码评审过程中总结的MQ代码编写规范,大家可以参考一下:
1,在生产者客户端要主动捕捉消息发送的异常,特别是采用异步发送的接口的生产者代码。而在消费者客户端,首先必须确保关闭自动点位提交功能。
2,大部分消息队列出现消息丢失的场景都发生在消费者端,因此我们必须遵守:在编写消费者端业务处理代码的时候,请确保先进行业务逻辑处理,处理成功后才提交点位,防止提前提交了点位,但是后续消费处理出现问题,消息消费丢失。
3,消费者端的消息处理代码必须简洁,简单和高效。并且必须做到幂等,当出现重复消息的时候,不至于造成业务异常。
4,消费者的数量不要超过分区的数量,造成资源浪费。
5,消息队列在主题层面并不能保证消息的有序,因此如果我们需要保证有序消费,要给需要有序消费的消息设置相同的key,这样分区分配拦截器就可以将持有相同key的消息发送到相同的分区上。我们也应该从业务上给每个数据设置版本号,这样当同一个数据多个版本消息乱序被消费的时候,确保数据不会出现老版本更新新版本。
6,采用MQ来进行系统数据交互要考虑一致性的问题,因为这种模式是数据最终一致方案,不是强一致。如果用户对数据一致性要求极高,建议采用事务的方式。
7,基于MQ的核心系统方案中,要做好业务监控,以及以人工处理作为兜底方案。因此需要引入统一日志收集和监控告警,全链路监控和告警等机制,特别是在MQ出现消息堆积的时候,能够第一时间通知到运维团队,采取必要的手段。
好了,对MQ使用的规范有深入的了解之后,我们接下来从Broker如何处理生产者客户端发送的请求来开始说起,带领大家来看看消息是如何产生的。
【生产者客户端如何生产消息】
当生产者有信息要发给Broker的时候,生产者客户端通过创建一个叫produce的请求,来讲数据发送给Broker,如下图所示:

如上图所示,会有多个客户端同时给Broker发送消息请求,Broker一般都是集群部署,会有多台Broker服务器统一对外提供服务,并且一个客户端可能会连接到多个Broker服务器上,具体的消息发送步骤如下:
1,生产者将收集的一批消息发送给Broker。对于Kafka来说,无论是生产者还是消费者客户端,消息总是按批处理,来提升整个系统的吞吐量。
2,Broker从接受缓存队列中出队消息数据。
3,消息被保存到主题中,在主题内部,数据按分区来存储,我们可以讲分区看成放不同苹果的篮子。Broker接收到的批量消息总是被保存到某个确定的分区中,并且消息总是被追加到存储数据文件的末尾。
4,当Broker成功保存了数据之后,会发送响应给生产者客户端。
以上就是生产者客户端发送消息的过程,接下来让我们将目标移动到消费者客户端,看看发送到消息队列Broker的数据,是如何被消费的。
【消费者客户端】
消费者客户端发送消息读取请求到Broker,请求中会制定具体要读取的主题名称,这里需要特别注意的是,Kafka实现了发布和订阅模式,因此一个消费者对消息的消费操作,并不会影响消息的可用性。Kafka Broker可以同时处理数以百计的客户端从同一个主题上读取数据,而相互之间不会影响。
对于基于消息队列通信的两端来说,相互之间不存在基于时间的依赖(依赖有三种类型,基于时间的依赖是我们在系统设计中需要重点考虑的模式,基于时间的依赖需要两个系统同时在线,才能正常工作),因此生产者生产消息和消费者消费消息其实没有直接的关系,如下图所示:

如上图所示,消费者从Broker上读取消息会分为4步,详细的介绍如下:
1,消费者客户端发送消息拉取请求,并指定读取消息的位移。
2,Broker从请求缓冲区队列中出队请求。
3,基于请求消息中的主题,分区和位移信息,从存储介质上读取对应的消息数据。
4,Broker将读取到的一批消息作为响应返回给消费者客户端。
通过上边简单的介绍,希望大家对Kafka体系中的生产和消费消息有一个直观的认识,由于笔者会在后边的文章详细介绍发送和消费消息的机制,因此这里如果你的不是很懂,没有关系,我们还会回到这些细节讨论上的。
主题和分区这两个概念在前边的内容中出现过,但是笔者并没有做过多的解释,这两个概念是理解Kafka工作原理的基石,咱们接下来就详细说说主题和分区的概念。
【Kafka消息队列的主题和分区】
在上一篇文章中,笔者介绍过Kafka的消息数据保存在Broker中,并且Kafka提供的持久化机制可以让我们存储几乎无上限的key-value数据,除了持久化消息数据,Kafka还提供了跨节点的数据冗余,来提升数据的可靠性,特别是在某台Broker出现磁盘故障,不至于造成数据丢失。
具体来说,Kafka Broker使用文件系统来存储消息数据,并且从生产者客户端发送的新消息会被追加到文件的末尾。逻辑上来讲,主题是磁盘上的一个文件夹,这个文件夹中保存了所有的数据文件,而写入数据到某个主题就是给和主题名相同的文件夹中的数据文件追加消息记录。
注:Kafka接受到的消息数据是字节码,并且Broker对数据不做任何变动,直接将字节码保存到磁盘上。对于消费者客户端来说,请求消息数据返回的也是字节码,和发送端发送的数据基本一致。Broker处理的直接是字节码,因为没有序列化和反序列化这样的操作,因此Kafka Broker的处理性能会非常高。
主题的数据会被分成多个分区来保存,分区从编号0开始,因此如果我们的主体在创建的时候指定了分区数量是3,那么分区编号分别是0,1,2。Kafka通过把分区编号附加到主题名称上,在主题所代表的文件夹中创建对应的子文件夹,比如yunpan-0,yunpan-1,yunpan-2(yunpan是笔者创建的主题名称)。
Broker有个叫log.dirs的配置项,指定了数据文件的目录路径,比如在笔者的macOS上,log.dirs的值如下图所在的配置文件截图所示:

基于这个目录,我们就能找到具体的日志文件,主题和分区的关系,比如笔者在本地有second-topic这么个主题,并且指定了分区数量为2,那么从文件系统中就可以看到有两个对应的子文件夹,及数据文件的关系如下图所示:

通过上边的目录结构,你就能看到这个叫second-topic的主题和两个分区(0,1)最终会产生second-topic-0和second-topic-1这样的文件夹结构。从这个实际生成的文件夹结构上看,我们说主题只是一个逻辑的文件夹,而分区是存储单元一点都不为过啊。
对于Kafka来说,分区数量代表这并发度,在大部分场景下,分区数量越多,并发度越高,同时也意味着吞吐量的提升。作为整个消息中间件唯一的存储介质,分区这种方式可以极大的提升整个Kafka集群的数据存储量和稳定性,消息会被分发到不同的分区上,不同的分区在多节点集群中意味着不同的机器。
由于整个集群的存储容量不再受限于一台Broker机器的磁盘大小,因此整个集群存储数据量近乎无限。另外通过使用数据在多个节点之间复制机制,单节点的故障不会造成数据丢失,整个集群的数据可靠性得到保障。
我们会在后续的文章中讨论负载分发,复制机制,leader选举等机制,以及多级存储体系功能,我们就可以把冷数据同步到外部系统上,让Kafka集群只保存热数据,降低磁盘的占用空间,也提升的系统的稳定性。
读到这里,你可能会问,Kafka是如何将给消息找到对应的分区呢?其实啊,在生产者客户端,有一个叫分区分配拦截器的组件,我们可以自定义这个组件,消息在发送到Broker之前,通过这个拦截器会计算出分区编号,当Broker收到消息的时候,只需要按照消息的数据中制定的分区编号,来保存到对应文件夹下的数据文件中。
生产者客户端配置的拦截器有三种方式来给消息指定分区编号:
1,如果待发送消息的key不为null,那么Kafka就可以确定消息发送的分区,计算方式为获取key,进行哈希计算,然后和分区的总数进行取模,因此笔者在开篇提到的保序最佳实践中,如果后续消息都设置了相同的key,那么消息会被发送到相同的分区上。
2,当我们在代码中直接通过ProduceRecord来构建待发送消息的时候,我们可以直接给这个消息对象制定分区编号,Broker会基于这个指定的分区编号来讲数据保存到对应的文件夹。
3,如果消息没有设置key,也就是说key是null,并且未人为指定分区编号,那么消息的生产者端会采用轮流的方式来发送消息到所有的分区上。
到这里为止我们对Kafka如何把消息映射到分区上已经非常清楚了,接下来我们来聊聊新消息总是被追加到数据文件尾部这个机制。回顾一下笔者展示的在自己机器上看到的文件夹中的数据文件,是以.log为文件扩展名。但是这个log文件和开发人员打印的日志不太一样,和我们在Kubernetes中查看的容器日志也不一样。
在Kafka或者大部分数据库系统中,比如MYSQL有redo log和bin log,redis有append only log,日志log这个概念代表的是事务日志,表示造成数据库状态变化的一系列事件。因此每个分区的文件夹中都分别保存了自己的事务日志,这个时候你可能会说,如果是这种机制,日志文件的size岂不是不断增长,我们后边会介绍rention的机制。
当Broker将收到的消息追加到日志文件末尾时,会给每个消息分配一个叫offset的id编号,offset从0开始计数,被增加一条消息就加1,offset是消息在日志文件中的逻辑位置。而”逻辑“一次的含义是,offset代表的是消息是日志文件中的第N条消息,由于每条消息的size都可能不一致,因此消息的物理位置和所有前边消息的size有关系,我们会在后续的文章中详细介绍Broker是如何在日志文件中确定消息的物理位置。下图展示了给刚接收到的新消息分配offset的原理:

如上图所示,新消息总是被追加到日志文件的末尾,Kafka会保证同一个分区上消息的有序性,但是无法保证跨分区的有序性。这个也不难理解,同一个分区上的消息,总是按offset有序排列。时间具备单调有序,因此我们很自然就会想到按时间有序。Kafka中每条消息有两个时间属性,arrival time(达到时间)和 event time(时间发生时间),这两个时间的业务含义不太一样,我们会在后续章节详细介绍。
消费者客户端通过offsets来维护消费进度,这样的话,Broker就可以为每个消费者按照offset单调递增的方式来读取下一批消息,如下图所示:

如上图所示,如果消费A第一次成功消费了0-5位移的多条消息,那么下一次消费读取到的数据就从offset6开始,Broker为每个消费者维护了offset位移信息,这些信息被保存在一个内部的叫_consumer_offsets的主题中。
读到这里,大家可能会问,那么我们应该如何确定分区的数量呢?从笔者过往的经验来看,确定准确的分区数量和数据量有很大的关系。我们需要考虑特定主题会接收到的最大数据量,因此简单看数据量越大,我们需要的分区越多,并且吞吐量也会越好,但是系统架构设计导出充斥着取舍,而确定分区数量也不列外,没有一劳永逸的事情。
具体来说,增加分区数量会造成数据同步机制的负担增加,整个集群的网络上会有更多的TCP连接和数据。另外消费者端处理消息的效率也会影响吞吐量,如果我们的消费端处理的非常缓慢,那么短期内增加分区可能提升吞吐量,但是总会到天花板。
基于笔者过往的经验,有这么些最佳时间可以分享:1,我们一开始需要创建尽可能多个分区数量,具体来说对于生产环境,这个数量可以是30,因为30可以被很多数整除,这就意味着分区可以更加平均的分配到多个Broker节点上,而分区被分配的越均衡,数据就可能越平衡分配。整个集群的吞吐量就越高。
好了,今天的这篇文章就这么多内容了,接下来笔者会介绍Kafka的数据管理机制,主要介绍数据是如何在磁盘上保存以及数据的过期清理机制,敬请期待!