Zab系列7 集群工作原理Leader篇

Zab系列博客

Raft Vs Zab
https://www.jianshu.com/p/24307e7ca9da
Zab系列1 核心概念
https://www.jianshu.com/p/76e5dba31ea4
Zab系列2 角色和存储
https://www.jianshu.com/p/d80f9250ffd1
Zab系列3 选举
https://www.jianshu.com/p/0d2390c242f6
Zab系列4 zookeeper特性
https://www.jianshu.com/p/08b62ca1fe4e
Zab系列5 选举恢复(源码分析)
https://www.jianshu.com/p/b6acd99921b7
Zab系列6 zk单机版工作原理
https://www.jianshu.com/p/ed45982b18b4
Zab系列7 集群工作原理Leader篇
https://www.jianshu.com/p/59240c36ba1b
Zab系列8 集群工作原理Follower篇
https://www.jianshu.com/p/8d7c7f1b2838
Zab系列9 消息顺序性
https://www.jianshu.com/p/0aa96b6a2070

Zk集群的事务操作过程

事务请求转发到leader-->封装Proposal后发起proposal()-->所有节点完成Proposal持久化后ACK响应-->leader监听到过半ACK后广播Commit-->把Proposal丢到toBeApplied-->apply内存树,响应客户端-->toBeApplied中删除

  1. Node收到事务请求的话,会转发到Leader处理
  2. Leader会生成一个Proposal(PrepRequestProcessor中完成),并且在ProposalRequestProcessor发起leader的proposal
  3. Follower有个消息监听程序,监听到Proposal的消息时,会先存在自己内存中,再持久化到本地的日志中,最终通过消息反馈Leader,自己Proposal保存成功(Follower.processPacket监听Leader的消息,SyncProcessor持久化Proposal,SendAckRequestProcessor反馈leader自己持久化成功)
  4. Leader自己也会通过自己的SyncProcessor持久化Proposal,并且调用投票收集器 Leader.processAck
  5. 当Leader.processAck统计到某个proposal的ack过半了的时候,会广播commit(在tryToCommit方法里),并且把这个request丢到各自的CommitProcessor里面处理
  6. CommitProcessor是一个线程,没有请求的时候会阻塞,有写任务的时候,会完成commit操作,把这个消息丢到leader.toBeApplied当中,leader会转发到ToBeAppliedRequestProcessor
  7. ToBeAppliedRequestProcessor会先把消息丢给 FinalRequestProcessor,完成内存树应用,客户端反馈之后,再把这个Proposal从toBeApplied中删除掉。

一致性分析

  1. 只有超过半数节点Ack了的事务操作,才会被commit,才会最终响应到客户端。
  • 所以响应了客户端的操作,不管leader是否挂了,新leader中肯定存了这个日志,否则选举中不会获胜。
  • 不存在反馈client操作成功,而实际事务操作日志又丢失的情况,即使leader挂了。
  1. 未完成半数Ack的事务操作,leader挂了,新leader可能保存这个日志,也可能没有保存这个日志。
  • 如果新leader没有这个事务操作的日志,依赖客户端的超时重试机制,来完成这个proposal。
  • 如果新leader有这个uncommitted的事务操作日志,则会替代老leader继续完成这个操作

RequestProcessor概述

ZK的设置非常巧妙,通过责任链的方式,降低了流程处理的耦合度。而根据集群与否、角色、和请求类型有不同的处理逻辑。

  1. 节点在启动的时候,会判断是集群模式还是单机模式
  2. 如果是集群模式,节点在初始化的时候,根据Node角色的不同,拼装不同的链条(RequestProcessor)
  3. 事务请求和非事务请求公用相同的责任链,RequestProcessor会对这个请求类型做判断,做不同的处理

LeaderNode的责任链拼装方式

调用LeaderZooKeeperServer.setupRequestProcessors,从后往前拼装,把自己作为参数,赋值到上一个RequestProcessor
倒着讲:
firstProcessor:LeaderRequestProcessor -->PrepRequestProcessor -->CommitProcessor -->ToBeAppliedRequestProcessor-->FinalRequestProcessor

    protected void setupRequestProcessors() {
        RequestProcessor finalProcessor = new FinalRequestProcessor(this);
        RequestProcessor toBeAppliedProcessor = new Leader.ToBeAppliedRequestProcessor(finalProcessor, getLeader());
       
        commitProcessor = new CommitProcessor(toBeAppliedProcessor,Long.toString(getServerId()), false,getZooKeeperServerListener());
        commitProcessor.start();
        
        ProposalRequestProcessor proposalProcessor = new ProposalRequestProcessor(this,commitProcessor);
        proposalProcessor.initialize();
        
        prepRequestProcessor = new PrepRequestProcessor(this, proposalProcessor);
        prepRequestProcessor.start();
        //第一个责任链LeaderRequestProcessor
        firstProcessor = new LeaderRequestProcessor(this, prepRequestProcessor);
        setupContainerManager();
    }

其中ProposalRequestProcessor的构造函数里面自带了一个SyncRequestProcessor和AckRequestProcessor,会在拼装的时候,调用初始化

    public ProposalRequestProcessor(LeaderZooKeeperServer zks,
            RequestProcessor nextProcessor) {
        this.zks = zks;
        this.nextProcessor = nextProcessor;
        AckRequestProcessor ackProcessor = new AckRequestProcessor(zks.getLeader());
        syncProcessor = new SyncRequestProcessor(zks, ackProcessor);
    }

 proposalProcessor.initialize();

    public void initialize() {
        syncProcessor.start();
    }

Leader事务请求过程的源码分析

请求的主要流程:(另外还会涉及到Leader和Follower的消息交互操作)
LeaderRequestProcessor -->PrepRequestProcessor -->ProposalRequestProcessor-->SyncRequestProcessor-->AckRequestProcessor-->CommitProcessor-->ToBeAppliedRequestProcessor-->FinalRequestProcessor

核心的代码主要在ProposalRequestProcessor(发起投票),SyncRequestProcessor(持久化日志)、AckRequestProcessor(反馈ACK,并且触发统计,最终commit消息)、CommitProcessor(单线程执行,确保了所有消息的有序性执行)、FinalRequestProcessor(事务操作结果应用到内存树中,反馈客户端操作成功)

LeaderRequestProcessor

leader.firstProcessor,leader的所有操作都从这里走,对所有操作进行转发之外,还会对非事务操作进行校验session的操作

PrepRequestProcessor请求预处理器。

作用:

  • 对事务请求:组装事务处理消息头,从leader中获取自增的zxid写到消息头中
  • 转发到下一个处理器
  1. PrepRequestProcessor有一个线程循环的从submittedRequest里面来消费请求(可能是事务请求,可能是非事务请求)
  2. 消费线程通过request.type,来选择不同的处理方式,如果是事务请求,则会自增zks的Zxid,且会封装消息头,header里面有这个操作的zxid、sessionId、request.cxid
  3. 而非事务请求则不需要自增Zxid,消息头为null,只需要校验Session即可

ProposalRequestProcessor

  1. 转发请求到下一个处理器
  2. 如果事务请求,那么发起leader.propose(request),会被follower监听到,最后触发follower的SyncRequestProcessor和SendAck
  3. 并且自己调用proposal的持久化
    public void processRequest(Request request) throws RequestProcessorException {
            nextProcessor.processRequest(request);
            if (request.getHdr() != null) {
                zks.getLeader().propose(request);
                syncProcessor.processRequest(request);
            }
}

Leader.proposal

向所有的follower发起proposal消息

public Proposal propose(Request request) throws XidRolloverException {
        Proposal p = new Proposal();
        p.request = request;
        lastProposed = p.packet.getZxid();
        outstandingProposals.put(lastProposed, p);
        sendPacket(pp);
    }

SyncRequestProcessor

既可以处理事务请求,也可以处理非事务请求(单机版zk中会调用它)

  • 事务请求则添加到 内存里面,然后进行持久化,持久化结束之后,触发下一个处理器
  • 非事务请求,触发下一个处理器(因为单机版的Zk,没有ProposalRequestProcessor帮忙做过滤,需要在SyncRequestProcessor做逻辑判断,判断有没有消息头,来决定是否会持久化)

事务持久化的2种情况:可以提升性能,这样支持批处理持久化。

  1. 当前toFlush不为空,而且当前queuedRequests为空
if (toFlush.isEmpty()) {
                    si = queuedRequests.take();
} else {
    si = queuedRequests.poll();
    if (si == null) {
        flush(toFlush);
        continue;
    }
}
  1. toFlush.size>1000
if (toFlush.size() > 1000) {
                        flush(toFlush);
}

flush的时候涉及2个动作:

  • 对所有的已经预提交的日志,一起写到磁盘commit();
  • 如果有nextProcess,则让下一个Process继续处理
    private void flush(LinkedList<Request> toFlush){
        zks.getZKDatabase().commit();
        while (!toFlush.isEmpty()) {
            Request i = toFlush.remove();
            if (nextProcessor != null) {
                nextProcessor.processRequest(i);
            }
        }
    }

AckRequestProcessor(leader专属)

leader会向投票收集器告知,leader的日志持久化已经完成了
主要是leader.processAck()会触发投票统计,统计成功会触发commit

    public void processRequest(Request request) {
        QuorumPeer self = leader.self;
        leader.processAck(self.getId(), request.zxid, null);
    }

SendAckRequestProcessor(follower专属)

follower会向投票收集器告知,follower的日志持久化已经完成了

    public void processRequest(Request si) {
            QuorumPacket qp = new QuorumPacket(Leader.ACK, si.getHdr().getZxid(), null, null);
            learner.writePacket(qp, false);
    }

注意区别

AckRequestProcessor和SendAckRequestProcessor很像,都是告知投票管理器,自己完成了某个事务请求的日志持久化,但是调用者发起人角色不同,调用的接收者也不同

  • leader调用leader.processAck,follower调用learner.writePacket
    QuorumPeer self = leader.self;
    leader.processAck(self.getId(), request.zxid, null);
  
    QuorumPacket qp = new QuorumPacket(Leader.ACK, si.getHdr().getZxid(), null,null);
    learner.writePacket(qp, false);

Leader.tryToCommit

如果统计超过半数ack则广播commit

 synchronized public boolean tryToCommit() {
        if (!p.hasAllQuorums()) {
           return false;
        }
        commit(zxid);
        inform(p);
        zk.commitProcessor.commit(p.request);

CommitProcessor

如果当前没有请求,则线程阻塞,当tryToCommit提交成功时会触发唤醒,并且开始下一步操作

                if (requestsToProcess == 0 && !commitIsWaiting){
                    synchronized (this) {
                        while (!stopped && requestsToProcess == 0&& !commitIsWaiting) {
                            wait();
                            commitIsWaiting = !committedRequests.isEmpty();
                            requestsToProcess = queuedRequests.size();
                        }
                    }
                }

ToBeAppliedRequestProcessor

很简单,但是注意是个串行的流程,只有完成了内存树的应用,客户端的反馈,最后一步才是把这个Proposal从ToBeApplied中删除

参考

这个大佬很牛逼,一篇博客讲明白了整个事务操作的流程,以及各个Processor的各个细节
https://www.jianshu.com/p/64f6e8124625

大佬视角很高,简单明了
https://www.jianshu.com/p/a8b5783eec63

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 213,752评论 6 493
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,100评论 3 387
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 159,244评论 0 349
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,099评论 1 286
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,210评论 6 385
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,307评论 1 292
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,346评论 3 412
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,133评论 0 269
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,546评论 1 306
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,849评论 2 328
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,019评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,702评论 4 337
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,331评论 3 319
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,030评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,260评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,871评论 2 365
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,898评论 2 351

推荐阅读更多精彩内容

  • 声明:本文写的时候,当时就是完全不懂zk,边看网上的文章边学习归纳和整理,这不是我的产出,不用点赞打赏。大家理智友...
    _Zy阅读 75,980评论 38 129
  • 姓名:周小蓬 16019110037 转载自:http://blog.csdn.net/YChenFeng/art...
    aeytifiw阅读 34,719评论 13 425
  • Apache Zookeeper是由Apache Hadoop的子项目发展而来,于2010年11月正式成为Apac...
    olostin阅读 6,108评论 2 9
  • 概述 ZAB (Zookeeper Atomic Broadcast)协议是为分布式协调服务 ZooKeeper ...
    jiangmo阅读 2,991评论 0 3
  • 笔者的话:我是一名特殊教育工作者,在这个行业坚持了11年。在一次机构参访过程中,听到了这则故事,很温暖,也希望分享...
    梦在雨巷阅读 205评论 0 1