一、为什么需要消息系统
1.解耦:
允许你独立的扩展或修改两边的处理过程,只要确保它们遵守同样的接口约束。
2.冗余:
消息队列把数据进行持久化直到它们已经被完全处理,通过这一方式规避了数据丢失风险。许多消息队列所采用的"插入-获取-删除"范式中,在把一个消息从队列中删除之前,需要你的处理系统明确的指出该消息已经被处理完毕,从而确保你的数据被安全的保存直到你使用完毕。
3.扩展性:
因为消息队列解耦了你的处理过程,所以增大消息入队和处理的频率是很容易的,只要另外增加处理过程即可。
4.灵活性 & 峰值处理能力:
在访问量剧增的情况下,应用仍然需要继续发挥作用,但是这样的突发流量并不常见。如果为以能处理这类峰值访问为标准来投入资源随时待命无疑是巨大的浪费。使用消息队列能够使关键组件顶住突发的访问压力,而不会因为突发的超负荷的请求而完全崩溃。
5.可恢复性:
系统的一部分组件失效时,不会影响到整个系统。消息队列降低了进程间的耦合度,所以即使一个处理消息的进程挂掉,加入队列中的消息仍然可以在系统恢复后被处理。
6.顺序保证:
在大多使用场景下,数据处理的顺序都很重要。大部分消息队列本来就是排序的,并且能保证数据会按照特定的顺序来处理。(Kafka 保证一个 Partition 内的消息的有序性)
7.缓冲:
有助于控制和优化数据流经过系统的速度,解决生产消息和消费消息的处理速度不一致的情况。
8.异步通信:
很多时候,用户不想也不需要立即处理消息。消息队列提供了异步处理机制,允许用户把一个消息放入队列,但并不立即处理它。想向队列中放入多少消息就放多少,然后在需要的时候再去处理它们。
二、kafka 架构
2.1 拓扑结构
如下图:
2.2 相关概念
如图.1中,kafka 相关名词解释如下:
1.producer:
消息生产者,发布消息到 kafka 集群的终端或服务。
2.broker:
kafka 集群中包含的服务器。
3.topic:
每条发布到 kafka 集群的消息属于的类别,即 kafka 是面向 topic 的。
4.partition:
partition 是物理上的概念,每个 topic 包含一个或多个 partition。kafka 分配的单位是 partition。
5.consumer:
从 kafka 集群中消费消息的终端或服务。
6.Consumer group:
high-level consumer API 中,每个 consumer 都属于一个 consumer group,每条消息只能被 consumer group 中的一个 Consumer 消费,但可以被多个 consumer group 消费。
7.replica:
partition 的副本,保障 partition 的高可用。
8.leader:
replica 中的一个角色, producer 和 consumer 只跟 leader 交互。
9.follower:
replica 中的一个角色,从 leader 中复制数据。
10.controller:
kafka 集群中的其中一个服务器,用来进行 leader election 以及 各种 failover。
12.zookeeper:
kafka 通过 zookeeper 来存储集群的 meta 信息。
2.3 zookeeper 节点
kafka 在 zookeeper 中的存储结构如下图所示:
2.4 kafka controller
Kakfa Broker Leader的选举:Kakfa Broker集群受Zookeeper管理。所有的Kafka Broker节点一起去Zookeeper上注册一个临时节点,因为只有一个Kafka
Broker会注册成功,其他的都会失败,所以这个成功在Zookeeper上注册临时节点的这个Kafka Broker会成为Kafka Broker Controller,其他的Kafka broker叫Kafka Broker follower。(这个过程叫Controller在ZooKeeper注册Watch)。这个Controller会监听其他的Kafka Broker的所有信息,如果这个kafka broker controller宕机了,在zookeeper上面的那个临时节点就会消失,此时所有的kafka
broker又会一起去Zookeeper上注册一个临时节点,因为只有一个Kafka Broker会注册成功,其他的都会失败,所以这个成功在Zookeeper上注册临时节点的这个Kafka Broker会成为Kafka Broker Controller,其他的Kafka broker叫Kafka Broker follower。
三、producer 发布消息
3.1 写入方式
producer 采用 push 模式将消息发布到 broker,每条消息都被 append 到 patition 中,属于顺序写磁盘(顺序写磁盘效率比随机写内存要高,保障 kafka 吞吐率)。
3.2 消息路由
producer 发送消息到 broker 时,会根据分区算法选择将其存储到哪一个 partition。其路由机制为:
1\. 指定了 patition,则直接使用;
2\. 未指定 patition 但指定 key,通过对 key 的 value 进行hash 选出一个 patition
3\. patition 和 key 都未指定,使用轮询选出一个 patition。
3.3 写入流程
producer 写入消息序列图如下所示:
1\. producer 先从 zookeeper 的 "/brokers/.../state" 节点找到该 partition 的 leader
2\. producer 将消息发送给该 leader
3\. leader 将消息写入本地 log
4\. followers 从 leader pull 消息,写入本地 log 后 leader 发送 ACK
5\. leader 收到所有 ISR 中的 replica 的 ACK 后,增加 HW(high watermark,最后 commit 的 offset) 并向 producer 发送 ACK
3.4 producer delivery guarantee
一般情况下存在三种情况:
1\. At most once 消息可能会丢,但绝不会重复传输
2\. At least one 消息绝不会丢,但可能会重复传输
3\. Exactly once 每条消息肯定会被传输一次且仅传输一次
当 producer 向 broker 发送消息时,一旦这条消息被 commit,由于 replication 的存在,它就不会丢。但是如果 producer 发送数据给 broker 后,遇到网络问题而造成通信中断,那 Producer 就无法判断该条消息是否已经 commit。虽然 Kafka 无法确定网络故障期间发生了什么,但是 producer 可以生成一种类似于主键的东西,发生故障时幂等性的重试多次,这样就做到了 Exactly once,但目前还并未实现。所以目前默认情况下一条消息从 producer 到 broker 是确保了 At least once,可通过设置 producer 异步发送实现At most once。
四、broker 保存消息
4.1 存储方式
物理上把 topic 分成一个或多个 patition(对应 server.properties 中的 num.partitions=3 配置),每个 patition 物理上对应一个文件夹(该文件夹存储该 patition 的所有消息和索引文件),如下:
4.2 存储策略
无论消息是否被消费,kafka 都会保留所有消息。有两种策略可以删除旧数据:
- 基于时间:log.retention.hours=168
- 基于大小:log.retention.bytes=1073741824
需要注意的是,因为Kafka读取特定消息的时间复杂度为O(1),即与文件大小无关,所以这里删除过期文件与提高 Kafka 性能无关。
4.3 topic 创建与删除
4.3.1 创建 topic
创建 topic 的序列图如下所示:
controller 在 ZooKeeper 的 /brokers/topics 节点上注册 watcher,当 topic 被创建,则 controller 会通过 watch 得到该 topic 的 partition/replica 分配。
controller从 /brokers/ids 读取当前所有可用的 broker 列表,对于 set_p 中的每一个 partition:
2.1 从分配给该 partition 的所有 replica(称为AR)中任选一个可用的 broker 作为新的 leader,并将AR设置为新的 ISR
2.2 将新的 leader 和 ISR 写入 /brokers/topics/[topic]/partitions/[partition]/statecontroller 通过 RPC 向相关的 broker 发送 LeaderAndISRRequest。
4.3.2 删除 topic
删除 topic 的序列图如下所示:
流程说明:
controller 在 zooKeeper 的 /brokers/topics 节点上注册 watcher,当 topic 被删除,则 controller 会通过 watch 得到该 topic 的 partition/replica 分配。
若 delete.topic.enable=false,结束;否则 controller 注册在 /admin/delete_topics 上的 watch 被 fire,controller 通过回调向对应的 broker 发送 StopReplicaRequest。
五、kafka 高可用机制
5.1 replication
如图.1所示,同一个 partition 可能会有多个 replica(对应 server.properties 配置中的 default.replication.factor=N)。没有 replica 的情况下,一旦 broker 宕机,其上所有 patition 的数据都不可被消费,同时 producer 也不能再将数据存于其上的 patition。引入replication 之后,同一个 partition 可能会有多个 replica,而这时需要在这些 replica 之间选出一个 leader,producer 和 consumer 只与这个 leader 交互,其它 replica 作为 follower 从 leader 中复制数据。
Kafka 分配 Replica 的算法如下:
- 将所有 broker(假设共 n 个 broker)和待分配的 partition 排序
- 将第 i 个 partition 分配到第(i mod n)个 broker 上
- 将第 i 个 partition 的第 j 个 replica 分配到第((i + j) mode n)个 broker上
5.2 leader failover
当 partition 对应的 leader 宕机时,需要从 follower 中选举出新 leader。在选举新leader时,一个基本的原则是,新的 leader 必须拥有旧 leader commit 过的所有消息。
kafka 在 zookeeper 中(/brokers/.../state)动态维护了一个 ISR(in-sync replicas),由3.3节的写入流程可知 ISR 里面的所有 replica 都跟上了 leader,只有 ISR 里面的成员才能选为 leader。对于 f+1 个 replica,一个 partition 可以在容忍 f 个 replica 失效的情况下保证消息不丢失。
当所有 replica 都不工作时,有两种可行的方案:
- 等待 ISR 中的任一个 replica 活过来,并选它作为 leader。可保障数据不丢失,但时间可能相对较长。
- 选择第一个活过来的 replica(不一定是 ISR 成员)作为 leader。无法保障数据不丢失,但相对不可用时间较短。
kafka 0.8.* 使用第二种方式。
kafka 通过 Controller 来选举 leader,流程请参考5.3节。
5.3 broker failover
kafka broker failover 序列图如下所示:
- controller 在 zookeeper 的 /brokers/ids/[brokerId] 节点注册 Watcher,当 broker 宕机时 zookeeper 会 fire watch
- controller 从 /brokers/ids 节点读取可用broker
- controller决定set_p,该集合包含宕机 broker 上的所有 partition
- 对 set_p 中的每一个 partition
4.1 从/brokers/topics/[topic]/partitions/[partition]/state 节点读取 ISR
4.2 决定新 leader(如4.3节所描述)
4.3 将新 leader、ISR、controller_epoch 和 leader_epoch 等信息写入 state 节点 - 通过 RPC 向相关 broker 发送 leaderAndISRRequest 命令
6. consumer 消费消息
6.1 consumer API
kafka 提供了两套 consumer API:
- The high-level Consumer API
- The SimpleConsumer API
其中 high-level consumer API 提供了一个从 kafka 消费数据的高层抽象,而 SimpleConsumer API 则需要开发人员更多地关注细节。
以下针对 high-level Consumer API 进行说明。
6.2 consumer group
如 2.2 节所说, kafka 的分配单位是 patition。每个 consumer 都属于一个 group,一个 partition 只能被同一个 group 内的一个 consumer 所消费(也就保障了一个消息只能被 group 内的一个 consuemr 所消费),但是多个 group 可以同时消费这个 partition。
kafka 的设计目标之一就是同时实现离线处理和实时处理,根据这一特性,可以使用 spark/Storm 这些实时处理系统对消息在线处理,同时使用 Hadoop 批处理系统进行离线处理,还可以将数据备份到另一个数据中心,只需要保证这三者属于不同的 consumer group。如下图所示:
6.3 消费方式
consumer 采用 pull 模式从 broker 中读取数据。
push 模式很难适应消费速率不同的消费者,因为消息发送速率是由 broker 决定的。它的目标是尽可能以最快速度传递消息,但是这样很容易造成 consumer 来不及处理消息,典型的表现就是拒绝服务以及网络拥塞。而 pull 模式则可以根据 consumer 的消费能力以适当的速率消费消息。
kafka文件的存储机制
(1)、kafka文件存储基本结构
- 在Kafka文件存储中,同一个topic下有多个不同partition,每个partition为一个目录,partiton命名规则为topic名称+有序序号,第一个partiton序号从0开始,序号最大值为partitions数量减1。
如,前篇里面的orderMq这个topic对应的partitions在三台机器上名称分别为
drwxr-xr-x. 2 root root 4096 11月 21 22:25 orderMq-0
drwxr-xr-x. 2 root root 4096 11月 21 22:25 orderMq-2
drwxr-xr-x. 2 root root 4096 11月 14 18:45 orderMq-1
drwxr-xr-x. 2 root root 4096 11月 14 18:45 orderMq-2
drwxr-xr-x. 2 root root 4096 11月 21 22:25 orderMq-0
drwxr-xr-x. 2 root root 4096 11月 21 22:25 orderMq-1
注:重复的是副本,partition是为orderMq-0,orderMq-1,orderMq-2
- 每个partion(目录)相当于一个巨型文件被平均分配到多个大小相等segment(段)数据文件中。但每个段segment file消息数量不一定相等,这种特性方便old segment file快速被删除。默认保留7天的数据。
如orderMq-0目录下(index和log为后缀名的文件合称就是segment 文件)
[root@mini3 orderMq-0]# ll
总用量 4
-rw-r--r--. 1 root root 10485760 11月 21 22:31 00000000000000000000.index
-rw-r--r--. 1 root root 219 11月 22 05:22 00000000000000000000.log
-每个partiton只需要支持顺序读写就行了,segment文件生命周期由服务端配置参数决定。(什么时候创建,什么时候删除)
(2)kafka的segment 文件
- Segment file组成:由2大部分组成,分别为index file和data file,此2个文件一一对应,成对出现,后缀”.index”和“.log”分别表示为segment索引文件、数据文件。
Segment文件命名规则:partion全局的第一个segment从0开始,后续每个segment文件名为上一个segment文件最后一条消息的offset值。数值最大为64位long大小,19位数字字符长度,没有数字用0填充。
索引文件存储大量元数据,数据文件存储大量消息,索引文件中元数据指向对应数据文件中message的物理偏移地址。
3,497:当前log文件中的第几条信息,存放在磁盘上的那个地方
上述图中索引文件存储大量元数据,数据文件存储大量消息,索引文件中元数据指向对应数据文件中message的物理偏移地址。
其中以索引文件中元数据3,497为例,依次在数据文件中表示第3个message(在全局partiton表示第368772个message)、以及该消息的物理偏移地址为497。
(3)、kafka查找message
读取offset=368776的message,需要通过下面2个步骤查找。
第一步:查找segment file
00000000000000000000.index表示最开始的文件,起始偏移量(offset)为0
00000000000000368769.index的消息量起始偏移量为368770 = 368769 + 1
00000000000000737337.index的起始偏移量为737338=737337 + 1
其他后续文件依次类推。
以起始偏移量命名并排序这些文件,只要根据offset **二分查找**文件列表,就可以快速定位到具体文件。当offset=368776时定位到00000000000000368769.index和对应log文件。
第二步:通过segment file查找message
当offset=368776时,依次定位到00000000000000368769.index的元数据物理位置和00000000000000368769.log的物理偏移地址
然后再通过00000000000000368769.log顺序查找直到offset=368776为止。