1. The RPCFilter interface
It has two sub-interfaces:
- ClientFilter: invoked on the Consumer side
- ServerFilter: invoked on the Provider side
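2. The chained-invocation class for RPCFilter
The first parameter of RPCFilter's invoke method is not an RPCFilter but an InvocationHandler. Following the earlier one-to-many chain-of-responsibility idea, InvocationHandler is the final exit of the chain, so some class must implement InvocationHandler and hold an RPCFilter internally. RPCFilterNode is that implementation: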
public class RPCFilterNode implements InvocationHandler {
    static final AttributeKey GET_THROUGH_KEY;
    static final AttributeKey LAST_FILTER_KEY;
    protected RPCFilterNode pre;
    protected RPCFilterNode next;
    private RPCFilter filter;

    RPCFilterNode(RPCFilter filter) {
        this.filter = filter;
    }

    public ListenableFuture<RPCResult> invoke(Invocation invocation) throws Throwable {
        this.markNode(invocation);
        // Hand the next node to the filter, which decides whether to continue the chain
        return this.filter.invoke(this.next, invocation);
    }

    public void onResponse(Invocation invocation, RPCResult rpcResult) {
        this.filter.onResponse(invocation, rpcResult);
        // Propagate the callback backwards through the chain
        this.pre.onResponse(invocation, rpcResult);
    }
}
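The idea that a node is itself a handler wrapping one filter can be tried out without HSF. The following is a minimal sketch, not HSF code: Handler, Filter and Node are hypothetical stand-ins for InvocationHandler, RPCFilter and RPCFilterNode, and plain strings replace Invocation and ListenableFuture<RPCResult>.

```java
class FilterNodeSketch {
    /** Hypothetical stand-in for InvocationHandler: the exit of the chain. */
    interface Handler {
        String invoke(String request);
    }

    /** Hypothetical stand-in for RPCFilter: its first argument is the next handler. */
    interface Filter {
        String invoke(Handler next, String request);
    }

    /** A node is itself a Handler and wraps exactly one Filter. */
    static final class Node implements Handler {
        private final Filter filter;
        private final Handler next;

        Node(Filter filter, Handler next) {
            this.filter = filter;
            this.next = next;
        }

        @Override
        public String invoke(String request) {
            // The filter decides whether and how to call the next handler.
            return filter.invoke(next, request);
        }
    }

    static String demo() {
        Handler terminal = request -> "handled(" + request + ")";
        Filter tag = (next, request) -> next.invoke(request + "+a");
        Filter wrap = (next, request) -> "[" + next.invoke(request) + "]";
        // wrap runs first, then tag, then the terminal handler.
        Handler chain = new Node(wrap, new Node(tag, terminal));
        return chain.invoke("req");
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

Because a filter only ever sees a Handler, it cannot tell whether the next element is another filter node or the real target, which is what lets the terminal handler sit at the end of the chain.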
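2. Building the RPCFilter chain
RPCFilterBuilder builds the RPCFilter chain. It holds a HeadNode and a TailNode that mark the head and the tail of the chain, and every call starts at HeadNode.
2.1 Build process
The RPCFilterNode instances form a doubly linked list
- The initial state is HeadNode->TailNode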
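- Each added node is inserted immediately before TailNode, so nodes keep their insertion order: adding Node1 and then Node2 yields HeadNode->Node1->Node2->TailNode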
public static InvocationHandler buildInvokerChain(InvocationHandler nextHandler, ServiceMetadata metadata, boolean afterCluster) {
    List<RPCFilter> filters = HSFServiceContainer.getInstances(RPCFilter.class, (String[])metadata.getIncludeFilters().toArray(new String[0]), true);
    setMetadata(metadata, filters);
    filters = sift(filters, metadata.isProvider(), afterCluster);
    if (filters.isEmpty()) {
        // No applicable filters: call the target handler directly
        return nextHandler;
    }
    List<BlockAwareFilter> blockAwareFilters = getBlockAwareFilter(filters);
    RPCFilterNode head = new RPCFilterBuilder.HeadNode(blockAwareFilters);
    RPCFilterNode tail = new RPCFilterBuilder.TailNode(nextHandler);
    head.next = tail;
    tail.pre = head;
    for (RPCFilter rpcFilter : filters) {
        // Insert each node between the current last node and the tail
        RPCFilterNode current = new RPCFilterNode(rpcFilter);
        RPCFilterNode pre = tail.pre;
        current.pre = pre;
        current.next = tail;
        pre.next = current;
        tail.pre = current;
    }
    return head;
}
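To check the resulting order, the same pointer updates can be replayed on a bare node type. This is a sketch under the assumption that only the linking logic matters: Node, build and describe are hypothetical names, and the nodes carry a label instead of a filter.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class ChainBuildSketch {
    /** Hypothetical node that carries only a label and the two links. */
    static final class Node {
        final String name;
        Node pre;
        Node next;

        Node(String name) {
            this.name = name;
        }
    }

    /** Inserts every name immediately before the tail sentinel and returns the head. */
    static Node build(List<String> names) {
        Node head = new Node("HeadNode");
        Node tail = new Node("TailNode");
        head.next = tail;
        tail.pre = head;
        for (String name : names) {
            Node current = new Node(name);
            Node pre = tail.pre;   // current last node before the tail
            current.pre = pre;
            current.next = tail;
            pre.next = current;
            tail.pre = current;    // the new node is now the last one before the tail
        }
        return head;
    }

    /** Walks the next links from the head and joins the labels. */
    static String describe(Node head) {
        List<String> parts = new ArrayList<>();
        for (Node n = head; n != null; n = n.next) {
            parts.add(n.name);
        }
        return String.join("->", parts);
    }

    public static void main(String[] args) {
        // prints HeadNode->Node1->Node2->TailNode
        System.out.println(describe(build(Arrays.asList("Node1", "Node2"))));
    }
}
```

With Node1 added first, the walk from the head visits Node1 before Node2, so filters run in the order in which the filter list supplies them.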
3. GET_THROUGH_KEY and LAST_FILTER_KEY in RPCFilterNode
public class RPCFilterNode implements InvocationHandler {
    static final AttributeKey GET_THROUGH_KEY;
    static final AttributeKey LAST_FILTER_KEY;
    protected RPCFilterNode pre;
    protected RPCFilterNode next;
    private RPCFilter filter;

    RPCFilterNode(RPCFilter filter) {
        this.filter = filter;
    }

    public ListenableFuture<RPCResult> invoke(Invocation invocation) throws Throwable {
        this.markNode(invocation);
        return this.filter.invoke(this.next, invocation);
    }

    public void onResponse(Invocation invocation, RPCResult rpcResult) {
        this.filter.onResponse(invocation, rpcResult);
        this.pre.onResponse(invocation, rpcResult);
    }

    // Record this node as the most recent one that was invoked
    private void markNode(Invocation invocation) {
        invocation.put(LAST_FILTER_KEY, this);
    }

    // Read and clear the most recently invoked node
    final RPCFilterNode fetchMarkedNode(Invocation invocation) {
        return (RPCFilterNode)invocation.remove(LAST_FILTER_KEY);
    }

    // Flag that the call reached the end of the chain
    final void reachedTerminal(Invocation invocation) {
        invocation.put(GET_THROUGH_KEY, Boolean.TRUE);
    }

    final boolean isGetThrough(Invocation invocation) {
        return Boolean.TRUE == invocation.get(GET_THROUGH_KEY);
    }
}
- LAST_FILTER_KEY records which node the call has reached; markNode sets it from the invoke method
- GET_THROUGH_KEY indicates that the whole filter chain was traversed; reachedTerminal sets it from TailNode's invoke method
private static class TailNode extends RPCFilterNode {
    private InvocationHandler invocationHandler;

    private TailNode(InvocationHandler invocationHandler) {
        super((RPCFilter)null);
        this.invocationHandler = invocationHandler;
    }

    public ListenableFuture<RPCResult> invoke(Invocation invocation) throws Throwable {
        this.reachedTerminal(invocation);
        ListenableFuture<RPCResult> future = this.invocationHandler.invoke(invocation);
        return future;
    }

    public void onResponse(Invocation invocation, RPCResult rpcResult) {
        this.pre.onResponse(invocation, rpcResult);
    }
}
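The effect of the two keys is easiest to see when one filter stops the chain early. Below is a minimal sketch, assuming a plain Map in place of Invocation and string keys in place of AttributeKey; the Handler interface and the run method are hypothetical and only mimic markNode and reachedTerminal.

```java
import java.util.HashMap;
import java.util.Map;

class MarkerSketch {
    // Hypothetical string keys standing in for the AttributeKey constants.
    static final String LAST_FILTER_KEY = "lastFilter";
    static final String GET_THROUGH_KEY = "getThrough";

    interface Handler {
        String invoke(Map<String, Object> invocation);
    }

    /** Runs a two-filter chain; when block is true the first filter returns early. */
    static Map<String, Object> run(boolean block) {
        Map<String, Object> invocation = new HashMap<>();
        Handler tail = inv -> {
            inv.put(GET_THROUGH_KEY, Boolean.TRUE); // mimics reachedTerminal
            return "result";
        };
        Handler second = inv -> {
            inv.put(LAST_FILTER_KEY, "second");     // mimics markNode
            return tail.invoke(inv);
        };
        Handler first = inv -> {
            inv.put(LAST_FILTER_KEY, "first");      // mimics markNode
            return block ? "blocked" : second.invoke(inv);
        };
        first.invoke(invocation);
        return invocation;
    }

    public static void main(String[] args) {
        System.out.println(run(false));
        System.out.println(run(true));
    }
}
```

In the full run the last marked filter is the second one and the get-through flag is set. In the blocked run the last marked filter is the first one and the flag is absent, which is the condition isGetThrough tests for.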
4. The onResponse method of RPCFilter
onResponse is a callback on the ListenableFuture. The flow is set up in HeadNode's invoke method, and the callback is eventually fired from FilterFunc's call method:
public static class FilterFunc implements Func1<RPCResult, RPCResult> {
    private final Invocation invocation;
    private final RPCFilterNode rpcFilterNode;

    private FilterFunc(Invocation invocation, RPCFilterNode rpcFilterNode) {
        this.invocation = invocation;
        this.rpcFilterNode = rpcFilterNode;
    }

    public RPCResult call(RPCResult rpcResult) {
        // Start the reverse callback walk from the last marked node
        this.rpcFilterNode.onResponse(this.invocation, rpcResult);
        return rpcResult;
    }
}

private static class HeadNode extends RPCFilterNode {
    private final List<BlockAwareFilter> blockAwareFilters;

    private HeadNode(List<BlockAwareFilter> blockAwareFilters) {
        super((RPCFilter)null);
        this.blockAwareFilters = blockAwareFilters;
    }

    public ListenableFuture<RPCResult> invoke(Invocation invocation) throws Throwable {
        ListenableFuture<RPCResult> future = this.next.invoke(invocation);
        RPCFilterNode lastMarked = this.fetchMarkedNode(invocation);
        // Attach FilterFunc so that onResponse runs once the result is available
        ListenableFuture<RPCResult> filterFuture = Futures.map(future, new RPCFilterBuilder.FilterFunc(invocation, lastMarked), new ContextAwareWrapper(invocation));
        return filterFuture;
    }

    public void onResponse(Invocation invocation, RPCResult rpcResult) {
        // The chain was cut short before TailNode: notify the block-aware filters
        if (!this.isGetThrough(invocation)) {
            for (BlockAwareFilter blockAwareFilter : this.blockAwareFilters) {
                blockAwareFilter.onBlock(invocation, rpcResult);
            }
        }
    }
}
onResponse is invoked along the chain in reverse.
For a chain HeadNode->Node1->Node2->TailNode,
the callbacks run in the order TailNode->Node2->Node1->HeadNode
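The reverse walk can be reproduced with the JDK alone. The sketch below is an approximation, not HSF code: CompletableFuture.thenApply stands in for Futures.map with FilterFunc, and Node is a hypothetical type that only records the visiting order. In the HeadNode code above, the walk starts from the node returned by fetchMarkedNode; since TailNode.invoke does not call markNode, that appears to be the last filter node rather than TailNode, and the sketch follows that reading.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

class ResponseOrderSketch {
    /** Hypothetical node that records its name and forwards to its predecessor. */
    static final class Node {
        final String name;
        final Node pre;

        Node(String name, Node pre) {
            this.name = name;
            this.pre = pre;
        }

        void onResponse(List<String> visited) {
            visited.add(name);
            if (pre != null) {
                pre.onResponse(visited); // walk backwards until the head
            }
        }
    }

    static List<String> demo() {
        Node head = new Node("HeadNode", null);
        Node node1 = new Node("Node1", head);
        Node node2 = new Node("Node2", node1); // plays the role of the last marked node
        List<String> visited = new ArrayList<>();

        CompletableFuture<String> future = new CompletableFuture<>();
        // Register the callback first; it fires once the result is set.
        CompletableFuture<String> mapped = future.thenApply(result -> {
            node2.onResponse(visited);
            return result; // the result itself is passed through unchanged
        });
        future.complete("rpcResult");
        mapped.join();
        return visited;
    }

    public static void main(String[] args) {
        // prints [Node2, Node1, HeadNode]
        System.out.println(demo());
    }
}
```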
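4.1 BlockAwareFilter
When the chain is not executed all the way through, HeadNode's onResponse method calls onBlock on every BlockAwareFilter.
See the official call-flow diagram:
* RPC call filter; clients can implement {@link RPCFilter} to plug their own logic into the HSF call flow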
*
* ┌--------------------------------------------------------------------------------┐
* | com.taobao.hsf.invocation.filter.RPCFilterBuilder$HeadNode |
* └--------------------------------------------------------------------------------┘
* | nextHandler.invoke(invocation); ^
* v | onResponse(invocation, rpcResult)
* ┌--------------------------------------------------------------------------------┐
* | com.taobao.hsf.invocation.filter.TestFilter1 |
* └--------------------------------------------------------------------------------┘
* | nextHandler.invoke(invocation); ^
* v | onResponse(invocation, rpcResult)
* ┌--------------------------------------------------------------------------------┐
* | com.taobao.hsf.invocation.filter.TestFilter2222222 |
* └--------------------------------------------------------------------------------┘
* | nextHandler.invoke(invocation); ^
* v | onResponse(invocation, rpcResult)
* ┌--------------------------------------------------------------------------------┐
* | com.taobao.hsf.invocation.filter.RPCFilterBuilder$TailNode |
* └--------------------------------------------------------------------------------┘
* | ^
* v |
* nextHandler.invoke(invocation); ------sync or async-------> future.set(rpcResult);