Apache Pulsar 源码走读(五)服务端消息流转逻辑概述

版权归本人所有,如果转载请联系本人

这篇大致说一下消息从写入到读取在Apache Pulsar服务端是怎么串起来的。
( 这篇不会详细说明每个逻辑怎么走的,不过会给读者一个整体的俯瞰印象。)

首先说一下表示业务逻辑的几个对象。(都在org.apache.pulsar.broker.service 这个包里面)

Pulsar服务端的主要逻辑对象

Topic: 这个对象在服务端就表示一个topic。这个类是最上层的,所有逻辑都被组织到这个对象里面。

  • 负责管理org.apache.pulsar.broker.service.Producer (这里是服务端跟踪状态的对象)
  • 负责管理Subscription
  • 负责数据写入

Subscription 这个对象表示一份数据的消费进度。

  • 负责管理Consumer(这里是服务端跟踪状态的对象)
  • 负责维护一份数据的消费进度(单独ack,累积ack,哪些已经消费哪些还没有消费)
  • 读取消息,负责消息的分发(哪个消费分给哪个consumer)
  • 消息重发,延迟投递。

上面这2个对象,在存储层对应的就是Topic -> ManagedLedgerSubscription -> ManagedCursor

消息写入

通过找到服务端记录的Producer对象,经过一些逻辑处理(消息去重,加密,状态检查等)
确定Topic 最后写入到ManagedLedger里面。

消息读取

则是客户端通过CommandFlow 告知自己当前可以处理消息的状态,
触发消息的分发流程。通过从Consumer 拿到 Subscription 触发消息的读取和Dispatch的过程。

写入逻辑相对读取逻辑来说比较直观。写入到ManagedLedger 即可。主要说下Subscription

Pulsar的消息派发流程(Dispatcher)

我们先单独看一下一个分区的topic

  • 有多个producer
  • 有一个subscription按照Shared方式消费。
  • 这个subscription里面有多个consumer(比如说3个consumer)
  • 每个consumer可能会单独ack某一个消息。

假设这个topic的数据是一个纸带,如果确认消费好了一个消息就涂黑一段。
写入的话就是不断给纸带增加长度。
我们把那些已经分发出去的消息但是还没有ack的消息认为是灰色的。

服务端会记录每个consumer当前的一个队列容量。
如果consumer的队列可以接受更多消息的时候会主动发送CommandFlow
请求给服务端,来标识自己能接受更多的消息。

C1 -> 10
C2 -> 4
C3 -> 8

有新消息写入的时候,如果为了减少延迟,最好马上能通知每个Subscription 有新的消息到来了。
这样的话,比如说新写入了20条消息,则会触发一个消息分发的动作来把这20条消息读取出来(很可能走的cache)之后按照客户端和消息状态分给这3个consumer。

Dispatcher

这个类是Subscription 对象里面的一个成员变量。

当消息从ManagedLedger 里面成功读取之后,这个类需要按照consumer的状态和消息的一些属性
把消息推送给consumer(客户端)。

根据订阅方式的不同分发的逻辑也有区别

  • Exclusive: 这个模式比较容易,所有消息都会分给唯一的一个Active Consumer
  • Failover: 这个模式在Exclusive的基础上,增加了一个切换consumer的逻辑。不过还是分发消息给一个consumer
  • Shared: 这个模式是按照轮训的方式将这个topic的消息分发给多个consumer,同时consumer发生变化的时候也需要做一些逻辑来调整分发逻辑。
  • Key_Shared:这个模式也是分发给多个consumer,不过可以一定程度上保证一个key上面的消息是有顺序的消费的(不一定严格保证)

这样一个Dispatcher 需要有以下功能

  • 跟踪consumer状态:consumer队列容量,consumer个数变化。(因为会影响消息派发的逻辑)
  • 消息分发:根据当前状态确定读取成功的消息要分配到哪些consumer上面,并把消息推送给consumer
  • 消息重新投递的逻辑(replay):有的消息被consumer标记需要重新投递了,这样的话需要重新读取并分发。

消费进度跟踪

1. 消息确认方式

消息确认可以是累积确认或者是单独确认的。

  • 累积确认比较容易。
    如果一个位置被确认那么之前的消息都认为是成功消费了的(kafka是这样的方式)
    不过这样灵活性比较差,如果一个消费不成功的话,可能就卡在这个位置了,影响其他消息的处理。

  • 单独确认的话,某条消息可以直接ack,而不会影响其他消息ack的状态被覆盖。

2. 单条消息ack状态的维护(需要持久化)

单独确认的话,相当于纸带上有一些位置之前全是黑色的,这种是某个点位之前全都消费完的。
我们称这个点位是deletePoint。

有一些位置是黑色和灰色相间的,这种就是某些消息已经被标记ack了,
有些已经被投递到客户端但是还没有ack。
随着单独确认逐渐累积,这样黑色的部分会慢慢连接起来。
这样的话这个deletePoint就可以推进了。

我们再看一下batch的情况,上面说的消息都是针对单条消息的(一个Entry)
如果这个Entry里面的消息是做了batch发送的,里面会包含多条信息。

客户端可能会单独ack这个batch里面的某个消息。
这样的话我们需要记录这个Entry的batch里面哪些消息是已经被ack的。
对于这种消息,Dispatcher在投递消息的时候会带着一个位图来标记这个Entry里面哪些消息是已经处理的。
这样consumer会按照这个bitmap来进行过滤。

3. 消息重发状态跟踪(非持久化)

灰色的位置我们称为当前的readPoint,标记的是当前读取结尾的位置。如果有新的数据写入的话,dispatcher会从这个位置尝试读取新写入的数据,并推进这个readPoint。

如果有的消息已经被发送给consumer了,但是这个消息consumer又通知服务端说需要重新投递。
则这个时候就需要标记readPoint之前的某个消息需要重新读取,这个重新读取的话不会更改readPoint。
同时因为这个消息没有被ack。所以这个redeliver的消息不需要被持久化。
(重新加载之后就可以根据ack的状态推断出来)

4. 消费进度持久化状态

那么如果这个topic被unload到其他的broker。对于一个Subscription需要加载哪些状态呢?

主要状态就是哪些消息已经ack了。同时确认下一次读取的位置是什么。

需要知道记录单独ack消息的状态和batch ack信息的状态。

ManagedCursor 从 ledger(状态被写到bookkeeper里面)或者zk(写入一直失败的话会fallback到zk上)
读取到这个状态的话,按照相应逻辑恢复即可。


// MLDataFormats.proto

message ManagedCursorInfo {
    // If the ledger id is -1, then the mark-delete position is
    // the one from the (ledgerId, entryId) snapshot below
    required int64 cursorsLedgerId = 1;

    // Last snapshot of the mark-delete position
    optional int64 markDeleteLedgerId = 2;
    optional int64 markDeleteEntryId  = 3;


    // 记录了单独某个消息被ack的状态
    repeated MessageRange individualDeletedMessages = 4;

    // Additional custom properties associated with
    // the current cursor position
    repeated LongProperty properties = 5;

    // 时间戳
    optional int64 lastActive = 6;

    // Store which index in the batch message has been deleted
   
    // batch 的 ack状态记录
    repeated BatchedEntryDeletionIndexInfo batchedEntryDeletionIndexInfo = 7;
}
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 215,076评论 6 497
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,658评论 3 389
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 160,732评论 0 350
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,493评论 1 288
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,591评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,598评论 1 293
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,601评论 3 415
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,348评论 0 270
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,797评论 1 307
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,114评论 2 330
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,278评论 1 344
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,953评论 5 339
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,585评论 3 322
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,202评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,442评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,180评论 2 367
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,139评论 2 352

推荐阅读更多精彩内容