原创文章,转载请注明出处
Kafka,近年来比较流行的一款分布式消息订阅发布系统。由于其高吞吐量、可持久化以及分布式等特点得到了广泛的应用。最近部门搭建 Monitor 日志系统,需要在 Monitor Server 和产生日志的应用中间放置一个消息队列做为缓冲,在解耦应用的同时提高 Monitor 系统的峰值处理能力,最终选定了使用 Kafka。在做系统的过程中学习到了不少Kafka的设计逻辑,对互联网系统的认识很有帮助。下面主要就 Kafka 的基本概念和配置进行一些总结。
消息队列
在目前的消息队列系统中,有几个重要的概念,可能在不同的系统中叫法不一样,但是其在系统中的作用是互通的。
生产者(Producer):消息的产生方,将“消息”这个实体发送到队列中。
消费者(Consumer):消息的使用方,从队列中获取消息。
消息(Record):生产和消费的内容,实际要传输的数据。
主题(Topic):用于将消息进行分类,生产者和消费者可以指定 topic 进行生产和消费行为,可以将topic 理解为一个用于存放消息的队列。
偏移量(Offset):用于标识被消费的消息所在队列中的位置。
除了这些,Kafka 中还有一些特有的概念:
消费者组(Consumer Group):将消费者分组,维护其在不同 topic 之间的订阅权限。
Broker:分布式系统中一个具体的服务,一个 Kafka 集群一般由多个 Broker 组成,以达到高可用的目的。
分区(Partition):物理概念,在一个 topic 内部可以有若干个 Partition,每个 Partition 是实际储存消息的队列,若有多台Broker,Partition会尽量被分配到不同的Broker上。生产者产生的消息只会进入到其中一个 Partiton 中。
Consumer Group
简单来讲,消息队列模型可以分为两种:基础队列模式和发布-订阅模式。在基础队列模式中,一条消息只能由一个消费者进行处理,而在发布-订阅模式中,一条消息可以分发给所有“订阅”了此主题(Topic)的消费者。Kafka 将这两种模型合二为一,产生了消费者组(Group)的概念。在不同的Group间,消息分发遵循发布-订阅模式,即消息会被分发到所有订阅了此Topic的 Group 中,而在 Group 内部,消息分发遵循基础队列模式,即消息在 Group 中最终只会由一个消费者消费掉。
Partition
Kafka 不仅在 Consumer 层面做了分组控制,在 Topic 队列中同样存在着“分区”的概念。在基本模式中,一个 Topic 就是一个队列,这样虽然非常清晰,但是在大数据量下,其性能直接受限于物理机。在分布式环境中,若采用简单的“一个队列”模型,会导致服务器压力分布不均,假设服务器A(Broker A) 中的 Topic 消息压力较大,而其他机器无法帮助其分担,那么可能会直接导致服务器A(Broker A) 崩掉。
在 Kafka 的设计中,Topic 是逻辑概念,Partition 是物理概念,对于简单的 Producer 和 Consumer ,两者均不需要考虑 Partiton 的存在,只需要在 Topic 层面进行操作即可。Partiton 的设计对于分布式系统有重要意义,Kafka 将一个 Topic 打散成为若干 Partition,并且尽量保证 Partition 能平均分布到集群中的服务器(Broker)上。这样也就解决了两个重要问题— 负载均衡 和 水平扩展。
Producer 产生的消息最终只会进入到一个 Partition 中,至于进入哪个 Partition,默认的策略是平均分配,当然在 Java 中我们也可以继承Partitioner
类来编写自己的分配策略。
在 Patition 方面还涉及到分区备份以及 Leader 的选取,将在后面的扩展中进行总结。
场景
- 消息系统
Kafka最基本的使用方式,基于订阅-发布的消息系统。和其他消息队列产品相同,主要为系统带来了异步处理和解耦的特性。
在这种场景下,Kafka是应用之间联系的纽带,也是一个标准,有的文章将其比作人体的神经系统。这是很准确的,身体的许多器官之间之所以可以相互协作调用(最常见的就是大脑控制身体的运动),完全依靠神经系统传递电信号。这些信号就是一个个消息,大脑产生这些信号,并将信号交给神经系统让它传递给具体的器官,器官接收到信号后根据信号的指令进行具体的操作。
我们可以看一下消息系统的特点:
解耦。各个应用之间更独立,不必受其他应用的影响。大家都通过一个简单的消息队列来联系,不必受其他应用返回值的影响,当一个应用挂掉之后其他应用还是可以正常运行
异步,提速。所谓提速,是指有大量并发的情况下,系统能够尽快的做出回应。如果是传统的同步调用服务,在这项业务没有完成之前,线程一直处于繁忙的状态,当并发量增加时,新的请求将会被阻塞,直至前面的线程处理完。更糟糕的情况是处于多个服务协作的系统中,当有RPC请求时,当前系统的线程将会处于阻塞状态,对于分秒必争的CPU资源,线程阻塞是非常昂贵的开销。
通过消息系统,可以有效使CPU避免由于RPC引起的阻塞。当应用需要通过RPC调用其他服务时,可以直接向消息队列发送一份信息,之后马上返回继续处理。这样便可以将CPU的资源最大化利用。但是这需要架构师考虑清楚几点:
系统是否能够容忍暂时的不一致性,或者说系统是否必须立马知道RPC执行的结果。显然,引入消息系统,当上层消息发送完消息并继续往下执行时,有一个潜在的逻辑是有下游系统会去做这件事, 但是实际做没做,并不知道。
引入消息系统带来的收益是否超过了引入的成本。引入成本自然不必说,新系统的布置维护等都是需要考虑的因素。如果盲目的照搬,“别人都这样用,我们也这样用”是架构师的大忌,只有适合自己业务的架构才是最好的架构。
- 日志系统
其实日志系统使用Kafka也算消息队列的一种应用,只是由于日志系统的特性,使得消息队列在这里格外重要。
对于日志系统来说,需要将若干应用的日志统一搜集,然后进行整理与存储。日志系统的特点在于其量大、可能存在峰值,若使用单一的App向日志系统Push的模式,服务器的压力可想而知。更关键的一点在于日志系统作为Monitor System的一部分,其本身的稳定性要求就很高,一旦日志系统挂掉,可能会丢失大量的系统日志。
这时候Kafka(消息队列)的有势就显现出来了:
第一,切断了单个App与日志系统的直接联系,解耦。因为日志的搜集与分析实际上App本身并不很关心,将其发送到MQ就够了,至于后续怎么处理,那App本身可以不管。
第二,削峰,有了MQ作为缓冲,当不同应用有大量Log需要传输时,数据有了合适的积压点,在大流量下不至于将日志系统冲垮。而Kafka中的数据是以文件的形式落地在硬盘上的,保证了数据不会丢失,即便日志系统未来得及消费,MQ中的消息也会保存一段时间(可配置,一般一周就可以了)。
第三,扩展性。这一点是显而易见的,现在微服务的概念非常流行,当企业自身的服务越来越多,日志的量也会越来越大。得益于Kafka天生的集群特性,很容易可以添加Broker以拓展MQ的性能与容量。
上述的使用场景描述的还是比较笼统,实际业务中需要仔细考虑遇到的瓶颈在哪里,而引入MQ是否能够解决这些问题,这需要对MQ的特点有详细的了解。再者对于开发人员,不能一味照搬,无论在成本(开发成本,服务器成本,后期维护成本)还是使用方法上都需要慎重考虑。同时也是为自己提个醒,在使用一项技术之前一定要搞清楚为什么要用它。
简单启动
了解完Kafka的基本概念之后我们可以简单的上手操作一下,通过Kafka自带的命令行工具和对一些参数的调整,直观感受一下Kafka的运作。
Kafka依赖zookeeper实现集群,所以我们先使用默认的配置文件启动zk:
ZK启动之后,我们继续使用默认配置启动一个Kafka实例(Broker):
现在我们有一个运行的zk实例和一个Kafka broker,它们已经可以实现MQ的基本功能。接下来我们需要一个Producer和一个Consumer来进行测试。Kafka的包中为我们提供了两个命令行工具:kafka-console-producer和kafka-console-consumer,使得我们可以通过命令行来发送和消费消息。
通过命令行启动Producer和Consumer,并发送一条测试消息:
这里为了演示使用的是官方提供的命令行工具,Kafka还提供了非常易用的Java API供开发者使用,能够很方便的建立Producer和Consumer。
扩展
Leader如何选举?
首先何谓Leader?在之前的内容中我们了解到了Kafka将整个Topic拆分为若干个Partition的作用,使每个Partition可以分布在不同的Broker上,以保证负载均衡。但是这样还不够,因为无法避免单点故障。
读者可能会问:我这已经将消息分布到不同的Partition中了,怎么还有单点故障呢?这里有一个非常重要的逻辑,那就是一条消息只能进入一个Partition中,我们区分Partition、使其分布在不同的Broker上,只是为了保证更大的负载量和更有效的利用服务器资源。但是当其中一个Broker突然挂掉了怎么办呢?在这个Broker上的那些Partition中还有未被消费的消息,如果没有补救措施,它们就这样丢失了吗?
Kafka当然考虑到了这点,所以有了副本和Leader的概念。
为了保证系统的高可用,每一个分区(Partition)都会有一定数量的副本(Replica),这样如果部分服务器不可用,副本所在的服务器就会接管上来,保证应用的持续性。同时为了保证较高的处理效率,消息的读写都是固定在同一个副本上的,这个副本就是所谓的Leader,其他作为备份存在的副本则被称为Follower。值得注意的一点是在创建Topic配置参数时,副本(Replica)的数量不可以超过Broker的数量。
先来看一下 Leader 和 Follower 在 Broker 上的分布情况。
Kafka 使用分配算法来保证每个 Broker 上的 Replica 数量尽量平均,从而使单个 Broker 的压力不会过大。关于详细的分配算法我们可以从 Kafka 官方的文档中查到,这里我们只需要简单的理解为:
Kafka在分配 Partition 时会尽量平均的将 Partition 分配到不同的 Broker 中,然后从当前 Broker 开始顺序地为此 Partition 创建 Replica。
为了保证较高的处理效率,消息的读写都是由Leader来完成,那怎样从众多Follower中选举出Leader就成了重要的话题。
这里不得不提到两个概念:AR(Assigned Replicas)和ISR(In-Sync Replicas)。当我们为一个Topic设置了多个Replica的时候,这些所有的副本统称为AR,AR中的Follower会不断的从Leader中同步数据,同步的过程必然有所延迟,这是由于不同机器的性能等因素导致的。
ISR是AR的子集,它表示的是与Leader同步的Follower的集合。当Follower从Leader同步,延迟超过了阈值,将会被Leader从ISR中踢出,Kafka 0.10.x中的参数replica.lag.time.max.ms
,表示了延迟的阈值。
例如上图中的 Partition 1只有一个Replica,R1,那么 R1就属于 Partition 1的AR,如果 R1能在延迟阈值之内保持与 Leader 的同步,那么 R1就属于 ISR。
ISR这个集合非常重要,正是它保证了Kafka的高可用性。当一条消息被Produce或者需要被Consume的时候,Leader需要等到ISR中的所有副本将本次操作的状态同步,再返回出结果。这样虽然在性能上有所下降,但是大大提高了Kafka的高可用性。
当Leader挂掉,Kafka会从ISR中选取一个Follower作为新的Leader,因为ISR中Follower的数据是与前任Leader同步的,所以其中任何一个Follower都可以直接充当Leader的角色。
Kafka的高可用性有很多值得探讨的地方,篇幅所限这里就不再深入讨论,推荐一篇不错的文章,有兴趣的朋友可以看看: