Pulsar工作原理
参考文档:
Pulsar:
https://jack-vanlightly.com/blog/2018/10/2/understanding-how-apache-pulsar-works
https://jack-vanlightly.com/blog/2018/10/21/how-to-not-lose-messages-on-an-apache-pulsar-cluster
https://www.splunk.com/en_us/blog/it/effectively-once-semantics-in-apache-pulsar.html
BookKeeper:
https://medium.com/splunk-maas/apache-bookkeeper-internals-part-1-high-level-6dce62269125
https://medium.com/splunk-maas/apache-bookkeeper-internals-part-2-writes-359ffc17c497
https://medium.com/splunk-maas/apache-bookkeeper-internals-part-3-reads-31637b118bf
https://medium.com/splunk-maas/apache-bookkeeper-internals-part-4-back-pressure-7847bd6d1257
中文(翻译的不太行,建议看英文):
https://blog.51cto.com/u_13491808/3124971
https://juejin.cn/post/7034135607869702157
https://juejin.cn/post/6948044887941971998
https://blog.csdn.net/zhaijia03/article/details/112691063
https://xie.infoq.cn/article/5dc2c3a2afa62d1eec3061fec
https://xie.infoq.cn/article/35b764ea84155a0f41542df6f
本文暂不关心Pulsar的消息消费(如批量消费),Topic分区,和消息存储时间(TTL和Retention)等的具体细节,而主要探讨和总结Pulsar的工作原理,部分地方会和Kafka进行对比。
Pulsar的总体架构
Pulsar的总体架构如下,
总共有3个部分,Broker、BookKeeper、Zookeeper。通常,我们讲Pulsar的话,主要都是指Broker,另外两个模块也是独立的Apache项目。
Broker本身没有存储,因此也不会丢失数据,Broker运行需要的数据都来自BookKeeper和Zookeeper。Broker对client提供服务,处理client的请求。
BookKeeper主要存储消息数据,因而也是需要存储最大的地方,单个BookKeeper也叫Bookie。BookKeeper本身是有WAL的消息存储引擎。
Zookeeper主要存储元数据,包括Broker和BookKeeper的元数据。
Pulsar提供的消息读写模式与Kafka比较类似,都是按Topic来关系消息,都有消息顺序性保证,都有Offset机制,在Pulsar中叫Cursor。
Client发起请求,包括读/写请求,会先发送到Broker,Broker来判定,当前请求消息的Cursor在哪个Bookie上,然后访问对应的Bookie读取/写入消息并返回。这个只是最简单的流程,Pulsar中有Cache机制,实际的流程会比这个更复杂一些。
出于消息可靠性的考虑,Pulsar会将消息存储多份,也就是说,相同的消息会存在多个Bookie上。
数据存储模型
关于消息数据的存储,首先要理解下面这张图,
上图中,从上到下,每个层次的名称是:Topic(主题),Ledger(账本),Fragment(片段),Entry(条目)。逐一解释一下,
- Entry是存储的最小单位,在Pulsar中,Entry可以是一条消息或一组消息;
- Fragment是多个Entry的组合,是BookKeeper上最小的分布单位,以Fragment为单位在多个Bookie上做数据冗余复制;
- Ledger包含1个或多个Fragment,是BookKeeper的管理单元,进行可用性管理,Ledger一旦关闭就是不可变的(immutable),以Ledger为单位进行删除,无法删除单个Entry;
- Topic包括多个Ledger,是最上层的逻辑概念,消费者可以对Topic进行订阅;
其中,Ledger、Fragment、Entry是BookKeeper中的概念,Topic是Pulsar中的概念。在Pulsar官方文档中还出现了Managed Ledger概念,没有特别看出和Ledger的差别,应该是和BookKeeper的Ledger等价的Pulsar概念。
在实际的物理结构上,数据存储的分布如下图,
可以看到,一个Topic对应多个Ledger,一个Ledger有1个或多个Fragment,不同的Fragment分布在不同的Bookie上,存储了多份。Ledger、Fragment如何切分,如何分布在Bookie的元数据,统一存储在Zookeeper。
存储结构与Kafka的最大差别是:一个Topic包含多个Ledger,Kafka是1个Ledger;以Fragment为单位进行分布式存储,Kafka是以Ledger为单位分布式存储。可以很明显的感觉到,Pulsar的存储分的很细,而且做到了物理存储结构与逻辑结构相隔离,最终达到,只要扩展Bookie集群就能提升整体可用性和性能。
什么时候切分Ledger和Fragment?
切分Ledger的时机有以下几个地方:
- 新建Topic;
- 当前Ledger达到大小上限,或时间上限;
- Topic的Broker所有权发生变化;
切分Fragment的时机有以下几个地方:
- 新建Ledger;
- 写入Bookie失败;
也就是说,如果Bookie没有发生停机的情况下,Ledger和Fragment会是一对一的。
存储的高可用
存储的配置是以Ledger为单位来管理的,最重要的配置有三个,
- Ensemble size (E),全体数量
- Write quorum size (Qw),写入数量
- Ack quorum size (Qa),响应数量
全体数量,E,表示Ledger可以写入的总体Bookie池的Bookie数量;写入数量,Qw,表示对于每个Entry,Ledger需要写入的份数;响应数量,Qa,表示当写入返回多少个Ack时,返回给客户端,即写入成功。通常情况下,E >= Qw >= Qa。
Qa和Qw
先来看Qa和Qw,举例,Qa=2,Qw=3。也就是说,对于每个写入的Entry,需要复制3份,也就是存储在3个Bookie上;但只要已经收到成功写入2份的Ack,就表示成功写入,返回给客户端。在这个配置下,如果宕机了1个Bookie,数据是完全可以恢复回来的,但是宕机2个Bookie的话,数据就可能出现丢失。如果想宕机2个Bookie数据仍然可以不丢失,那么至少需要配置Qa=3。
也就是说,Qa是保证数据不丢失的最小数据复制份数,这个取决于应用场景,需要恢复何种宕机程度的数据。这个概念和Kafka的in sync replica很相似。
Qw和E
再来看Qw和E,举例,Qw=3,E=3。这个情况下,对于每个写入的Entry,需要复制到当前Fragment每个Bookie上,如下图,
可以看到,Entry按写入的顺序紧密排列,如果是Qw=3,E=5的情况下,可用Bookie的数量比写入Bookie的数量要多,写入的Entry的排列会出现空洞,如下图,
这样的现象,Pulsar称为Striping。这种情况下,写入的tps会提高,但是读取的性能会下降,最终增大整体的延迟。在这种情况下,BookKeeper的顺序读取被打破,降低整体性能,因此不建议使用。
因此,通常情况下,取E = Qw >= Qa,例如,E=3,Qw=3,Qa=2。
同样不建议取Qa=1。这是一个危险的设置,如果唯一的Bookie宕机,那么就不知道Entry是否已写入。Bookie的恢复会因无法进行而停止。
Brookie也可以配置机架感知(rack-awareness),当配置了机架感知策略时,Broker会尝试选取不同机架的Bookie节点。当然也可以自定义其他选取策略。
Broker和Topic所有权
Pulsar的Broker不存储数据,因此也不会丢失。Jack Vanlightly的博客原文是这样,
Pulsar brokers have no persistent state that cannot be lost.
这里并没有无状态的意思,很多中文翻译博客把这里翻译成,Broker是无状态的,甚至把这一句放在非常开头的地方,但其实是不对的。Broker只是不存储有状态的数据而已,本身在内存中是有状态的。Broker和其他Broker并不对等。
每个Topic都归一个Broker所有,所有的读写都需要通过这个Broker进行。写入过程如下图,
可以看到,上图例子中,Qw=3,Broker收到写入请求的时候,先写入Bookie,Bookie完成写入请求后返回Ack,Broker收到Qa个Ack后返回Ack给客户端。如果Bookie返回失败或者无返回,那么Broker会发起创建新Fragment。
读取过程如下图,
因为Topic所有的请求都需要通过所有者Broker,那么,我们可以在这个Broker上引入Cache机制,提升读的QPS。
这样的缓存机制会对不同消费者有比较大的性能差异,如果是追尾消费者(tail reader),即一直追踪Entry最新变化的消费者,当有Entry写入时,会更新Cache,于是直接从缓存中取走Entry;但如果是追赶消费者(catch-up reader),即读取的是老的Entry,例如消费者宕机后重启,中间堆积了一段时间的消息的情况,此时,缓存中没有数据,必须去Bookie上读取,再返回给客户端。由于无法直接从缓存获取Entry,追赶消费者获取消息的性能是要比追尾消费者差很多的。
Broker故障恢复
Broker因为是有状态的,无法做到非常完美的灾备切换,只能在故障后尽快恢复Broker的工作场景。
Broker故障恢复中有一个非常重要的概念,最新确认序号(Last Added Confirmed ,LAC)。这个表示当前Ledger最后commit的序号,也就是收到Qa个Ack的Entry的序号。Pulsar约定,读取数据不可以读取LAC之后的数据,读取LAC之后的数据是没用一致性和正确性保障的,视为脏读。
理解LAC之后,就可以理解Broker故障恢复的栅栏阻挡机制(Fencing)了。步骤如下,
- 当前Broker B1,拥有Topic X,被Zookeeper确认为不可用;
- 另一个Broker B2 将Topic X 的当前Ledger状态从OPEN修改为IN_RECOVERY(应该是修改Zookeeper的状态);
- B2发起栅栏阻挡LAC请求(Fencing LAC Request)给当前Fragment/Ledger的所有Bookie,并等待(Qw-Qa)+1个响应。一旦收集齐,Ledger就被阻挡了。即使B1仍然存活(如B1网络断线重连场景),也无法写入消息,因为无法获得Qa个Ack,会返回Fencing异常;
- B2得到最大的LAC,然后从LAC+1开始对Bookie上的数据进行恢复读取,确保从LAC+1开始,每个Entry都被复制到Qw个Bookie。这主要是由于一些Bookie的Ack之前没有传输到B1。一旦B2处理完Bookie上所有Entry,无法读取到新的Entry了,数据恢复就完成了;
- B2将Ledger的状态设置为CLOSED;
- B2创建一个新Ledger,并开始接收Topic的消息写入和读取;
Fencing解决方案解决了脑裂问题,也没有数据丢失。故障恢复,Ledger的状态流程图,
这个方案和 Raft Leader 的故障恢复机制实际上是没有什么差别的,应该是有所借鉴。至于解决了脑裂,这个也不是真正解决,也是由于消息系统的特性导致的直接结果。为什么这么说呢?
Raft 中也有类似的概念,叫 committed index(Pulsar与之对应的是LAC),只有在收到多数节点写入 Entry 返回成功之后,才可以更新 committed index,再更新 Entry 到状态机中,并返回给客户端。对尚未更新 committed index 的 Entry,Raft 也是不可读的。可以发现,Pulsar 的 Fencing 和 Raft 的机制几乎一致,但是 Raft 有脑裂问题。
先回顾一下 Raft 的脑裂问题。当 Raft Leader 节点故障发生时,例如 Raft Leader 网络断开,其他节点已经发现当前 Leader 超时,并发起下一轮选举投票,快速选举出新的 Leader,但是老 Raft Leader 的 Follower 无响应超时时间尚未到达,导致老 Leader 仍然认为自己是真正的 Leader,并响应客户端的请求,因此导致客户端读取到了旧的数据。而与此同时,部分客户端连接到了新 Raft Leader,写入并读取到新的数据,造成不一致,这是 Raft 发生脑裂的原因。Raft 发生脑裂不会持续很长时间,当老 Leader 发现长时间没有收到 Follower 响应而超时(主要取决于超时参数的配置),或者发现有新 Leader 产生时,老 Leader 就会将自己重置为 Follower。
那使用了同样机制的 Pulsar 为什么就没有脑裂问题呢?那是因为,Pulsar 是个消息系统,写入的消息类似 WAL,是不可变的(immutable),追加的。当发生和 Raft 一样的故障的时候,老的 Pulsar Broker 也会读到老的数据,但老的数据仍然合法,因为对同样的 Cursor,在新的 Broker 上也是读到相同的数据,只要读取 Entry 不超过 LAC 就没问题,最多只是无法获取到最新的消息而已,获取的消息并不会错。而 Raft 的存储是偏向于通用存储场景,因此就会有新旧数据版本不一致的问题。
脑裂一般都是指读取数据发生的不一致,如果是写入数据的脑裂,那可能是分布式算法有问题,成熟的算法一般不会有这个问题。
Bookie存储
BookKeeper的存储引擎是可插拔的,默认是DbLedgerStorage,整体架构如下,
Bookie写入的流程如下,
Bookie是一个有WAL的消息存储,写入时,会先写入WAL(Journal),再写入Write Cache。Write Cache会定期的将数据排序并写入磁盘的Entry Log文件中。排序过程,将不同Ledger的消息聚合在一起,这样,在读取Ledger的时候,就是完全的磁盘顺序读。如果没有排序聚合的话,就无法获得顺序读的性能。
写入Write Cache的时候,也会把索引信息写入RocksDB,索引信息很简单,就是 (ledgerId, entryId) 到 (entryLogId, 文件偏移量) 的映射。
Bookie可以缓存最近写入的Entry和最近读取的Entry,读取的顺序是: Write Cache -> Read Cache -> Bookie上的Entry。当两个缓存都没有命中的时候,会到RocksDB中查找该Entry所在的文件和偏移量,并读取该Entry,然后缓存再Read Cache中,以期之后可以命中。
BookKeeper可以支持磁盘IO分离,将写入WAL的放在一个高速磁盘上,其他数据放在低速磁盘上。当有写入Entry请求时,只会发生写WAL的磁盘同步IO操作,其他都是写入内存缓存。同时,以异步的方式,将Write Cache中的数据以批量写的方式写入到Entry Log文件和RocksDB中。
Bookie的Journal可以有多个,但和Ledger并不是一一对应的。4.5.0之后的BookKeeper可以配置journalDirectories参数,如,journalDirectories=/tmp/bk-journal1,/tmp/bk-journal2,配置多个目录,由Bookie统一管理。
Bookie故障恢复
当Bookie故障的时候,所有在这个Bookie上有Fragment的Ledger都需要复制。恢复过程是重复制Fragment,来确保每个Ledger满足Qw个复制因子。
有两种恢复方法:自动和手动,主要讨论自动方案。自动方案包括内置的故障节点检测机制,手动就需要人为干预。具体的复制过程,两者是一致的。
恢复过程可以通过在Bookie集群上运行AutoRecoveryMain来完成。其中一个自动恢复进程被选举为Auditor,Auditor来检测故障的Bookie,然后,
- 从Zookeeper读取所有Ledger列表,找到故障Bookie上的Ledger;
- 对上述每个Ledger创建一个重复制任务,并记录在Zookeeper的/underreplicated Znode上;
如果Auditor失败,就再选举一个Auditor。Auditor只是AutoRecoveryMain的一个线程。AutoRecoveryMain也有运行Replication Task Worker的线程,每个Worker监听/underreplicated Znode获取任务。发现任务后,就尝试lock住这个任务,如果lock失败,说明其他Worker已经拿到这个任务,就去寻找下一个任务。
如果获取到了锁,那么需要,
- 扫描Ledger的Fragments,找到那些当前Bookie不属于的Fragment;
- 对那些匹配的Fragment,从另一个Bookie上把数据复制到本地,然后更新Zookeeper,并将此Fragment标记为完全复制;
如果Ledger的所有Fragment都已经完全复制,则删除/underreplicated任务;如果仍然存在未完全复制的Fragment,则释放锁,等待其他Worker处理。
如果一个Fragment没有结束Entry id,Worker的复制任务会等待并再次检查。如果还是没有,说明之前的数据副本可能没有完全写入,会发起Fencing任务,然后再继续重复制。
注意:自动恢复机制和Fencing机制是有差别的。
Fencing机制主要是处理Broker故障的场景;自动恢复机制是处理Bookie故障的场景。
虽然自动恢复机制在某些边界情况下回调用到Fencing机制。
总结
总结部分直接抄了中文翻译,
- 每个Topic都有一个归属的Broker。
- 每个Topic在逻辑上分解为Ledgers、Fragments和Entries。
- Fragments分布在Bookie集群中。Topic与Bookie并不耦合。
- Fragments可以跨多个Bookies带状(Striping)分布。
- 当Pulsar Broker不可用时,该Broker持有的Topic所有权将转移至其他的Broker。Fencing机制避免了同一个Topic当前的Ledger同时有两个所有者(Broker)。
- 当Bookie不可用时,自动恢复(如果启用)将自动进行数据重新复制到其他的Bookies。如果禁用,则可以手动启动此过程。
- Broker缓存尾部消息日志,可以非常高效的为尾部读取操作提供服务。
- Bookies使用Journal提供持久化保证。该日志可用于故障恢复时恢复尚未写入Entry Log文件的数据。
- 所有Topic的的条目都保存在Entry Log文件中。查找索引保存在RocksDB中。
- Bookies读取逻辑如下:Write Cache -> Read Cache -> Log Entry Files(RocksDB 作为索引)
- Bookies可以通过单独的磁盘做IO读写分离。
- Zookeeper存储Pulsar和BookKeeper的所有元数据。如果Zookeeper不可用整个Pulsar将不可用。
- 存储可以单独扩展。如果存储是瓶颈,那么只需要添加更多的Bookies,他们会自动承担负载,不需要Rebalance。
后记
Jack Vanlightly在博客中表示,Pulsar的两个突出特点是,
- 将Broker与存储分离,结合BookKeeper的Fencing功能,优雅的解决了脑裂问题,并防止了数据丢失;
- 将Topic分割为Ledger和Fragment,然后将将他们分布在整个Pulsar集群上,因此扩展变的容易。新的数据自然会写到新的Bookie上,不需要再进行再平衡(Rebalancing);
后一点应该是独创,前一点应该是借鉴了Raft,之前提到过。