介绍
该处理器有一个toBeApplied队列,用来存储那些已经被CommitProcessor处理过的可被提交的Proposal。其会将这些请求交付给FinalRequestProcessor处理器处理,待其处理完后,再将其从toBeApplied队列中移除。
源码在Leader.ToBeAppliedRequestProcessor,是leader的内部类
只有leader角色才有,其他角色没有该处理器,在leader中的位置如下
源码
代码很简单
static class ToBeAppliedRequestProcessor implements RequestProcessor {
private RequestProcessor next;
private ConcurrentLinkedQueue<Proposal> toBeApplied;//注意是concurrent的
/**
* This request processor simply maintains the toBeApplied list. For
* this to work next must be a FinalRequestProcessor and
* FinalRequestProcessor.processRequest MUST process the request
* synchronously!
*
* @param next
* a reference to the FinalRequestProcessor
*/
ToBeAppliedRequestProcessor(RequestProcessor next,
ConcurrentLinkedQueue<Proposal> toBeApplied) {
if (!(next instanceof FinalRequestProcessor)) {//下一个必须是FinalRequestProcessor
throw new RuntimeException(ToBeAppliedRequestProcessor.class
.getName()
+ " must be connected to "
+ FinalRequestProcessor.class.getName()
+ " not "
+ next.getClass().getName());
}
this.toBeApplied = toBeApplied;
this.next = next;
}
/*
* (non-Javadoc)
*
* @see org.apache.zookeeper.server.RequestProcessor#processRequest(org.apache.zookeeper.server.Request)
*/
public void processRequest(Request request) throws RequestProcessorException {
// request.addRQRec(">tobe");
next.processRequest(request);
Proposal p = toBeApplied.peek();
if (p != null && p.request != null
&& p.request.zxid == request.zxid) {
toBeApplied.remove();//进行相关清除操作
}
}
/*
* (non-Javadoc)
*
* @see org.apache.zookeeper.server.RequestProcessor#shutdown()
*/
public void shutdown() {
LOG.info("Shutting down");
next.shutdown();
}
}
思考
这个类的意义是什么
这个类就是消费toBeApplied队列的
之前讲leader源码的时候,processAck是用来生产toBeApplied队列的(用来代表),见源码42分析,来告诉leader,某事务请求已经被某个服务器写完了日志,发送ACK了
toBeApplied消费时的检查
其实这个和上一节讲的CommitProcessor有点像,本身是想维护一个事务顺序的,但是这里又没有对不按事务顺序的逻辑进行检查
可以参考refer最新的代码,会检查事务,然后检查顺序,这样才体现出来toBeApplied的意义