一、背景介绍
Kafka是一种高吞吐量的,分布式,快速、可扩展的,分区和可复制,基于发布/订阅模式的消息系统,由Linkedin开发,之后成为Apache项目的一部分。使用Scala语言编写,目前已被广泛应用于各行业各类型的数据管道和消息系统中。
kafka有如下特性:
- 通过O(1)的磁盘数据结构提供消息的持久化,这种结构对于即使数以TB的消息存储也能够保持长时间的稳定性能。
- 高吞吐量。即使是非常普通的硬件kafka也可以支持每秒数十万的消息。
- 支持通过kafka服务器和消费机集群来分区消息。
- 支持Hadoop并行数据加载。
kafka的设计目的是提供一个发布订阅解决方案,它可以处理消费者规模的网站中的所有动作流数据。 这种动作(网页浏览,搜索和其他用户的行动)是在现代网络上的许多社会功能的一个关键因素。 这些数据通常是由于吞吐量的要求而通过处理日志和日志聚合来解决。
二、基本组件
1. Broker [中间者,代理者]
Kafka集群包含多台服务器,一台Kafka服务器就是一个Broker,一个集群由多个broker组成,一个broker可以有多个topic。broker承担着中间缓存和分发的作用,broker将producer发送的数据分发到注册consumer中
2. Topic [主题,类别,话题]
topic,我们可以理解为是一种队列,每条发送消息都从属于一种类别,这种类别在kafka中被设计为一个topic,比如:用户信息类的消息的topic,我们定义为user-topic,那么凡是用户信息类的消息都将发送到这个topic中,从而我们所要处理用户信息类的消费者就可以从这topic中拉取。
3. Producer[生产者]
producer是生产者,意在向Topic中发送消息的一方
4. Consumer [消费者]
consumer是消费者,意在向Topic中拉取/消费消息的一方
Kafka拓扑结构
如上图所示,一个典型的Kafka集群中包含若干Producer,若干broker(Kafka支持水平扩展,一般broker数量越多,集群吞吐率越高),若干Consumer Group,以及一个Zookeeper集群。Kafka通过Zookeeper管理集群配置,选举leader,以及在Consumer Group发生变化时进行rebalance。Producer使用push模式将消息发布到broker,Consumer使用pull模式从broker订阅并消费消息。
5. Replications[备份,复制]
分区的备份,以便容错,分布在其他broker上,每个broker上只能有0个或者1个replications
6. Consumer Group[消费者群组]
消费者群组,是有若干个消费者组成的集体,每个consumer属于一个特定的consumer group,kafka采用将Consumer分组的方式实现一个主题(topic)的消息和广播(发给所有的consumer)和单播(发给单个的consumer)
7. Partition [分区]
Kafka内在就是分布式的,一个broker中可以有多个topic,一个topic可以设置多个partition(分区),每个Partition在物理上都对应一个文件夹,该文件夹存储这个Partition的所有消息和索引文件。Partition中的消息都会被分配为一个有序的ID(offset),一个partition对应多个Segment,每个Segment对应一个文件,Segment由一个个的不可变记录组成,该记录只会append到Segment中,不会单独的修改或者删除,可以设置Segment根据时间或者大小来定时删除文件。默认是根据大小128MB,当segment大小达到128MB时,则会删除一些Segment文件(这里有一点,删除的时候,会选择一个或者多个Segment来删除,也就是说删除三个Segment大小可能大于128MB,但是不会是小于128MB)
8.offset[偏移]
在每个partition分区下的消息都是顺序保存的,kakfa使用一个唯一的标识来记录它们在该分区下的位置,这个位置标识被称为offset(位移),位移是顺序递增的,一旦确定下来之后就不能修改。Kafka会维护分区下的消息顺序,但是不会维护跨partition(分区)间的顺序(假如,我们往topic1分别发送三条消息1,2,3 那么,1和3发送到了partition1中,2发送到了partition2中,那么kafka consumer在消费时,会按照1 然后 3的顺序消息,但是不保证 2 会在消费1之后在消费 )
那么我们知道了topic,partition和offset信息,我们就能唯一定位一条消息。所以说每条Kafka的消息本质上都是一个三元组(tuple):<topic, partition, offset>。我们可以称该三元组为消息的元数据(message metadata)。它们之间的关系如下图所示:
三、设计原理
Kafka是一个消息引擎(messaging system),Kafka本身提供了用于消息传递的一套完整功能及接口,而不仅仅是提供消息的表示。大型系统之间数据交互基本上都需要这样的消息引擎。作为一个基础设施,Kafka即扮演了这样的角色。
1. 持久化(Persistence)
Kafka是高度依赖文件系统和缓存的,Kafka对磁盘时append操作,磁盘检索的开支是较少的,同时为了减少磁盘写入的次数,broker会将消息暂时buffer起来,当消息的数量(offset)达到一定阀值时(可配置 offset可以设置为自动提交或者手动提交),再flush到磁盘中,这样减少了磁盘的开销。
2. 生产者(producer)
producer用于往broker中发送/生产消息,每一个broker中可以有多个topic,每个topic下面又会有多个partition,在负载均衡的情况,如果均衡的将消息发送到指定的partition中.(对于具体的producer在以后会有详细的介绍)
异步处理:将多条消息存储在buffer中,之后,批量的提交到broker中,从而提高了网络IO,但是也有一点,采用异步发送机制如果producer异常或者实效,那么消息将会丢失。
3. 消费者(Consumer)
Consumer通过链接broker,采用pull方式来拉取broker中的数据,consumer根据自己的消费能力,去消费信息。那么这里为什么采用pull而不是push呢?因为pull模式中1:broker是不需要感知有多少个consumer。2:如果采用push模式一旦消息量级超过consumer的承受范围,会压垮consumer。
前面我们说过,topic属于一个类别,topic由多个有序的partition序列组成,每个partition在同一时刻只会被一个consumer进行消费,同时会更新offset的index,记录消费到的位置,在consumer每消费完一条信息之后,kafka会自动的提交offset,当然我们也可以设置为手动提交,当我们在消费100000条消息之后,offset存储在一个list中,达到一定的消息量之后,提交这个offset list。
4. 常量时间存储能力
一个实现了数据持久化的队列,提供简单的数据读和数据追加写到文件末尾,例如在一个日志管理系统中。这种数据结构的最大优势是所有操作的算法复杂度都是O(1),磁盘读、写也不会互相阻塞。这使得Kafka具有了一个显著的性能优势,因为在这里性能与数据量实现了完全得解耦。一台服务器现在可以轻松利用到一组廉价的低转速、大容量(1+TB)磁盘能够提供的各种优势,虽然这些磁盘只有可怜的寻道速度但仍然能够基于大块数据的读、写提供可以接受的性能。这些磁盘普遍只有SAS磁盘价格的1/3和3倍以上的容量。
拥有访问几乎无限的磁盘空间的能力,却不会有任何性能惩罚,这意味着我们可以基于Kafka实现一些在传统消息中间件中很少看到的特性。例如,在传统消息中间件系统中往往会在消息一旦被获取后立即尝试删除该消息数据,而Kafka能够为消息数据保留一个相对来说很长的时间(如一周)。仅这一个特性,就为消息消费端提供了大量的灵活性。
5. 消费分发机制
- At most once 消息可能会丢,但绝不会重复传输
- At least one 消息绝不会丢,但可能会重复传输
- Exactly once 每条消息肯定会被传输一次且仅传输一次,很多时候这是用户所想要的
当Producer向broker发送消息时,一旦这条消息被commit,因数replication的存在,它就不会丢。但是如果Producer发送数据给broker后,遇到网络问题而造成通信中断,那Producer就无法判断该条消息是否已经commit。虽然Kafka无法确定网络故障期间发生了什么,但是Producer可以生成一种类似于主键的东西,发生故障时幂等性的重试多次,这样就做到了Exactly once。
Consumer在从broker读取消息后,可以选择commit,该操作会在Zookeeper中保存该Consumer在该Partition中读取的消息的offset。该Consumer下一次再读该Partition时会从下一条开始读取。如未commit,下一次读取的开始位置会跟上一次commit之后的开始位置相同。当然可以将Consumer设置为autocommit,即Consumer一旦读到数据立即自动commit。如果只讨论这一读取消息的过程,那Kafka是确保了Exactly once。但实际使用中应用程序并非在Consumer读取完数据就结束了,而是要进行进一步处理,而数据处理与commit的顺序在很大程度上决定了消息从broker和consumer的delivery guarantee semantic。
- 读完消息先commit再处理消息。这种模式下,如果Consumer在commit后还没来得及处理消息就crash了,下次重新开始工作后就无法读到刚刚已提交而未处理的消息,这就对应于At most once
- 读完消息先处理再commit。这种模式下,如果在处理完消息之后commit之前Consumer crash了,下次重新开始工作时还会处理刚刚未commit的消息,实际上该消息已经被处理过了。这就对应于At least once。在很多使用场景下,消息都有一个主键,所以消息的处理往往具有幂等性,即多次处理这一条消息跟只处理一次是等效的,那就可以认为是Exactly once。(笔者认为这种说法比较牵强,毕竟它不是Kafka本身提供的机制,主键本身也并不能完全保证操作的幂等性。而且实际上我们说delivery guarantee 语义是讨论被处理多少次,而非处理结果怎样,因为处理方式多种多样,我们不应该把处理过程的特性——如是否幂等性,当成Kafka本身的Feature)
- 如果一定要做到Exactly once,就需要协调offset和实际操作的输出。精典的做法是引入两阶段提交。如果能让offset和操作输入存在同一个地方,会更简洁和通用。这种方式可能更好,因为许多输出系统可能不支持两阶段提交。比如,Consumer拿到数据后可能把数据放到HDFS,如果把最新的offset和数据本身一起写到HDFS,那就可以保证数据的输出和offset的更新要么都完成,要么都不完成,间接实现Exactly once。(目前就high level API而言,offset是存于Zookeeper中的,无法存于HDFS,而low level API的offset是由自己去维护的,可以将之存于HDFS中)
6 .复制 Replication
kafka将每个partition数据复制到多个server上,任何一个partition有一个leader和多个follower(可以没有)。备份的个数可以通过broker配置文件来设定,Kafka自动维护leader和follower的失效转移。leader处理所有的read-write请求,follower需要和leader保持同步。Follower和consumer一样,消费消息并保存在本地日志中。leader负责跟踪所有的follower状态,如果follower"落后"太多或者失效,leader将会把它从replicas同步列表中删除。当所有的follower都将一条消息保存成功,此消息才被认为是"committed",此时consumer才能消费它。即使只有一个replicas实例存活,仍然可以保证消息的正常发送和接收,只要zookeeper集群存活即可(不同于其他分布式存储,比如Hbase需要"多数派"存活才行)。当然了,在Producer端是可以通过参数”acks”来控制自己是否要等待消息返”committed”的响应。
当leader失效时,需在followers中选取出新的leader,当然了只有处于”in-sync”状态的followers才有参选资格。可能此时follower落后于leader,因此需要选择一个"up-to-date"的follower。选择follower时需要兼顾一个问题就是新leader server上所已经承载的partition leader的个数。如果一个server上有过多的partition leader,意味着此server将承受着更多的IO压力。在选举新leader时需要考虑到"负载均衡"。Follower需要能够维护和ZooKeeper之间一个有效的会话,否则也会被判定为”unalive”。一个配置参数”replica.lag.time.max.ms”控制着一个follower数据同步滞后所能允许的最大延时。
四、应用场景
1. 消息系统
kafka本身作为一个消息发布订阅系统,具有很好的容错性以及可扩展性并且能够支撑高并发的数据量,kafka使用了多种效率的优化机制,采用来push/pull架构模式,更适合于异构集群。和大多数的消息系统相比,kafka具有更好的吞吐量,内置分区,复制和容错,这使得它成为一个大规模的消息处理应用程序。
消息的使用往往是相对较低的吞吐量,但可能需要低终端到终端的延迟,往往依赖于强大的耐用性。在这一领域消息传递系统如ActiveMQ和RabbitMQ。
2. 监控
Kafka通常被用于可操作的监控数据。这包括从分布式应用程序来的聚合统计用来生产集中的运营数据提要。
3. 日志聚合
kafka的特性决定它非常适合作为"日志收集中心";application可以将操作日志"批量""异步"的发送到kafka集群中,而不是保存在本地或者DB中;kafka可以批量提交消息/压缩消息等,这对producer端而言,几乎感觉不到性能的开支.此时consumer端可以使Hadoop等其他系统化的存储和分析系统