zk源码阅读47:SyncRequestProcessor源码解析

摘要

在上一节ProposalRequestProcessor处理器中,后续会有两个处理器CommitProcessor和SyncRequestProcessor
如果是事务请求,会经过SyncRequestProcessor,本节对SyncRequestProcessor进行讲解
SyncRequestProcessor:事务日志记录处理器。用来将事务请求记录到事务日志文件中,同时会触发Zookeeper进行数据快照。

主要讲解

介绍
属性
函数
  构造函数
  processRequest:生产者,加入请求队列
  run:核心方法,消费请求队列,批处理进行快照以及刷到事务日志
  flush:批处理的思想,把事务日志刷到磁盘,让下一个处理器处理
  shutdown:队列添加requestOfDeath请求,线程结束后,调用flush函数,最后关闭nextProcessor
思考

介绍

可以参考类的注释,该处理器在leader,Follower,Observer中都存在

/**
 * This RequestProcessor logs requests to disk. It batches the requests to do
 * the io efficiently. The request is not passed to the next RequestProcessor
 * until its log has been synced to disk.
 *
 * SyncRequestProcessor is used in 3 different cases
 * 1. Leader - Sync request to disk and forward it to AckRequestProcessor which
 *             send ack back to itself.
 * 2. Follower - Sync request to disk and forward request to
 *             SendAckRequestProcessor which send the packets to leader.
 *             SendAckRequestProcessor is flushable which allow us to force
 *             push packets to leader.
 * 3. Observer - Sync committed request to disk (received as INFORM packet).
 *             It never send ack back to the leader, so the nextProcessor will
 *             be null. This change the semantic of txnlog on the observer
 *             since it only contains committed txns.
 */

在leader中处理链的位置如下

在leader处理链的位置

属性

    private static final Logger LOG = LoggerFactory.getLogger(SyncRequestProcessor.class);
    private final ZooKeeperServer zks;
    private final LinkedBlockingQueue<Request> queuedRequests =
        new LinkedBlockingQueue<Request>();// 请求队列
    private final RequestProcessor nextProcessor;//下一个处理器

    private Thread snapInProcess = null;//处理快照的线程
    volatile private boolean running;//是否在运行

    /**
     * Transactions that have been written and are waiting to be flushed to
     * disk. Basically this is the list of SyncItems whose callbacks will be
     * invoked after flush returns successfully.
     */
    private final LinkedList<Request> toFlush = new LinkedList<Request>();//等待被刷到磁盘的请求队列
    private final Random r = new Random(System.nanoTime());
    /**
     * The number of log entries to log before starting a snapshot
     */
    private static int snapCount = ZooKeeperServer.getSnapCount();//快照的个数
    
    /**
     * The number of log entries before rolling the log, number
     * is chosen randomly
     */
    private static int randRoll;// 一个随机数,用来帮助判断何时让事务日志从当前“滚”到下一个

    private final Request requestOfDeath = Request.requestOfDeath;// 结束请求标识

函数

构造函数

    public SyncRequestProcessor(ZooKeeperServer zks,
            RequestProcessor nextProcessor) {
        super("SyncThread:" + zks.getServerId(), zks
                .getZooKeeperServerListener());
        this.zks = zks;
        this.nextProcessor = nextProcessor;//下一个处理器
        running = true;
    }

processRequest

生产者,加入请求队列

    public void processRequest(Request request) {//生产者,加入请求队列
        // request.addRQRec(">sync");
        queuedRequests.add(request);
    }

run

核心方法,消费请求队列,批处理进行快照以及刷到事务日志

    public void run() {//核心方法,消费请求队列,批处理进行快照以及刷到事务日志
        try {
            int logCount = 0;

            // we do this in an attempt to ensure that not all of the servers
            // in the ensemble take a snapshot at the same time
            setRandRoll(r.nextInt(snapCount/2)); //randRoll是一个 snapCount/2以内的随机数, 避免所有机器同时进行snapshot
            while (true) {
                Request si = null;
                if (toFlush.isEmpty()) { //没有要刷到磁盘的请求
                    si = queuedRequests.take();//消费请求队列
                } else {//有需要刷到磁盘的请求
                    si = queuedRequests.poll();
                    if (si == null) {//如果请求队列的当前请求为空
                        flush(toFlush);//刷到磁盘
                        continue;
                    }
                }
                if (si == requestOfDeath) {//结束标识请求
                    break;
                }
                if (si != null) {//请求队列取出了请求
                    // track the number of records written to the log
                    if (zks.getZKDatabase().append(si)) {//请求添加至日志文件,只有事务性请求才会返回true
                        logCount++;
                        if (logCount > (snapCount / 2 + randRoll)) {//如果logCount到了一定的量
                            randRoll = r.nextInt(snapCount/2);//下一次的随机数重新选
                            // roll the log
                            zks.getZKDatabase().rollLog();//事务日志滚动到另外一个文件记录
                            // take a snapshot
                            if (snapInProcess != null && snapInProcess.isAlive()) {//正在进行快照
                                LOG.warn("Too busy to snap, skipping");
                            } else {
                                snapInProcess = new ZooKeeperThread("Snapshot Thread") {
                                        public void run() {
                                            try {
                                                zks.takeSnapshot();//进行快照,将sessions和datatree保存至snapshot文件
                                            } catch(Exception e) {
                                                LOG.warn("Unexpected exception", e);
                                            }
                                        }
                                    };
                                snapInProcess.start();//启动线程
                            }
                            logCount = 0;//重置
                        }
                    } else if (toFlush.isEmpty()) {//刷到磁盘的队列为空
                        // optimization for read heavy workloads
                        // iff this is a read, and there are no pending
                        // flushes (writes), then just pass this to the next
                        // processor
                        if (nextProcessor != null) {
                            nextProcessor.processRequest(si);//下个处理器处理
                            if (nextProcessor instanceof Flushable) {
                                ((Flushable)nextProcessor).flush();//下个处理器可以刷,就刷
                            }
                        }
                        continue;
                    }
                    toFlush.add(si);//刷的队列添加记录
                    if (toFlush.size() > 1000) {//超过了1000条就一起刷到磁盘
                        flush(toFlush);
                    }
                }
            }
        } catch (Throwable t) {
            handleException(this.getName(), t);
            running = false;
        }
        LOG.info("SyncRequestProcessor exited!");
    }

几个注意的点在思考里面说

里面调用了flush函数

flush

批处理的思想,把事务日志刷到磁盘,让下一个处理器处理

    private void flush(LinkedList<Request> toFlush)
        throws IOException, RequestProcessorException// 刷新到磁盘
    {
        if (toFlush.isEmpty())//队列为空,没有需要刷的
            return;

        zks.getZKDatabase().commit();//事务日志刷到磁盘
        while (!toFlush.isEmpty()) {
            Request i = toFlush.remove();
            if (nextProcessor != null) {
                nextProcessor.processRequest(i);//下一个处理器处理
            }
        }
        if (nextProcessor != null && nextProcessor instanceof Flushable) {
            ((Flushable)nextProcessor).flush();//下个处理器也可以刷,就刷
        }
    }

shutdown

队列添加requestOfDeath请求,线程结束后,调用flush函数,最后关闭nextProcessor

    public void shutdown() {//队列添加requestOfDeath请求,线程结束后,调用flush函数,最后关闭nextProcessor
        LOG.info("Shutting down");
        queuedRequests.add(requestOfDeath);
        try {
            if(running){
                this.join();
            }
            if (!toFlush.isEmpty()) {
                flush(toFlush);
            }
        } catch(InterruptedException e) {
            LOG.warn("Interrupted while wating for " + this + " to finish");
        } catch (IOException e) {
            LOG.warn("Got IO exception during shutdown");
        } catch (RequestProcessorException e) {
            LOG.warn("Got request processor exception during shutdown");
        }
        if (nextProcessor != null) {
            nextProcessor.shutdown();
        }
    }

思考

SyncRequestProcessor和快照,事务日志的关系

就和介绍里面说的一样,他就是事务日志记录处理器。用来将事务请求记录到事务日志文件中,同时会触发Zookeeper进行数据快照。
里面调用了很多相关方法,如rollLog,append,commit,takeSnapshot等方法,底层都是FileTxnLog,FileSnap来执行的

run方法注意的点

1.randRoll的意义,可以看到语句 if (logCount > (snapCount / 2 + randRoll))
这是用来判断logCount是否足够,如果足够了,代表一个事务日志记录的量够了,
下面调用rollLog,就会生成下一个事务日志文件了。

2.“批”处理的思想
当 logCount > (snapCount / 2 + randRoll) 时,批处理的思想提箱
对于事务日志,此时才调用rollLog写入到下一个事务日志
对于快照,如果可行,就调用zks.takeSnapshot()进行快照

而不是每一个请求就一个事务日志,不是每一个请求就生成一次快照

然后对于事务日志,当toFlush.size() > 1000才会调用flush函数

refer

http://www.cnblogs.com/leesf456/p/6438411.html
http://www.cnblogs.com/leesf456/p/6279956.html (FileTxnLog)
zk源码阅读3:FileTxnLog
《paxos到zk》

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容

  • MySQL技术内幕:InnoDB存储引擎(第2版) 姜承尧 第1章 MySQL体系结构和存储引擎 >> 在上述例子...
    沉默剑士阅读 12,145评论 0 16
  • Spring Cloud为开发人员提供了快速构建分布式系统中一些常见模式的工具(例如配置管理,服务发现,断路器,智...
    卡卡罗2017阅读 135,384评论 19 139
  • 很实用的编程英语词库,共收录一千五百余条词汇。 第一部分: application 应用程式 应用、应用程序app...
    春天的蜜蜂阅读 5,306评论 0 22
  • 花,可谓俗语物,也可谓圣洁之物!爱美之心人人皆有,有喜欢高贵般的牡丹;有喜欢清雅的白玉兰;有喜欢出淤泥而不染的荷花...
    啊阳瑟喔阅读 3,106评论 1 0
  • 金色圈阅读 536评论 0 0