源码分析基于dubbo 2.7.1
NettyServerHandler继承了netty的ChannelDuplexHandler,这里只关注channelRead方法
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
NettyChannel channel = NettyChannel.getOrAddChannel(ctx.channel(), url, handler);
try {
handler.received(channel, msg);
} finally {
NettyChannel.removeChannelIfDisconnected(ctx.channel());
}
}
异步处理
handler就是NettyServer, 前面说过NettyServer是一个装饰类,它会调用到AllChannelHandler.received
public void received(Channel channel, Object message) throws RemotingException {
ExecutorService cexecutor = getExecutorService();
try {
cexecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message));
} catch (Throwable t) {
...
}
}
就是给ExecutorService提交一个任务,异步处理请求。
看看ChannelEventRunnable.run,会根据state进行不同的处理。但实际操作都转发到handler继续处理。
解码
再看看DecodeHandler.received
public void received(Channel channel, Object message) throws RemotingException {
// 解码
if (message instanceof Decodeable) {
decode(message);
}
if (message instanceof Request) {
decode(((Request) message).getData());
}
if (message instanceof Response) {
decode(((Response) message).getResult());
}
// 调用下一个节点
handler.received(channel, message);
}
处理请求
HeaderExchangeHandler
public void received(Channel channel, Object message) throws RemotingException {
channel.setAttribute(KEY_READ_TIMESTAMP, System.currentTimeMillis());
ExchangeChannel exchangeChannel = HeaderExchangeChannel.getOrAddChannel(channel);
try {
if (message instanceof Request) {
// 处理请求
Request request = (Request) message;
if (request.isEvent()) {
handlerEvent(channel, request);
} else {
if (request.isTwoWay()) {
Response response = handleRequest(exchangeChannel, request);
channel.send(response);
} else {
handler.received(exchangeChannel, request.getData());
}
}
} else if (message instanceof Response) {
// 处理响应
handleResponse(channel, (Response) message);
} ...
} finally {
HeaderExchangeChannel.removeChannelIfDisconnected(channel);
}
}
twoWay表示需要响应的请求。这里调用handleRequest方法,将返回一个Response,最后通过channel将它发送给客户端。
void handleRequest(final ExchangeChannel channel, Request req) throws RemotingException {
Response res = new Response(req.getId(), req.getVersion());
...
Object msg = req.getData();
try {
// 处理请求
CompletableFuture<Object> future = handler.reply(channel, msg);
if (future.isDone()) {
res.setStatus(Response.OK);
res.setResult(future.get());
channel.send(res); // 发送结果
return;
}
future.whenComplete((result, t) -> {
...
});
} catch (Throwable e) {
...
}
}
处理完成后通过channel.send(res);
发送结果给客户端,这个方法会调用NettyChannel
public void send(Object message, boolean sent) throws RemotingException {
super.send(message, sent);
boolean success = true;
int timeout = 0;
try {
// 发送结果
ChannelFuture future = channel.write(message);
if (sent) { // 是否发送超时
timeout = getUrl().getPositiveParameter(Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT);
success = future.await(timeout);
}
Throwable cause = future.getCause(); // 抛出异常
if (cause != null) {
throw cause;
}
} catch (Throwable e) {
throw new RemotingException(this, "Failed to send message " + message + " to " + getRemoteAddress() + ", cause: " + e.getMessage(), e);
}
if (!success) {
...
}
可以看到,也是通过netty发送结果。
业务逻辑处理
handler.reply
终于调用到DubboProtocol.requestHandler了
public CompletableFuture<Object> reply(ExchangeChannel channel, Object message) throws RemotingException {
Invocation inv = (Invocation) message;
// 获取Invoker
Invoker<?> invoker = getInvoker(channel, inv);
...
RpcContext rpcContext = RpcContext.getContext();
rpcContext.setRemoteAddress(channel.getRemoteAddress());
// 调用Invoker
Result result = invoker.invoke(inv);
if (result instanceof AsyncRpcResult) {
return ((AsyncRpcResult) result).getResultFuture().thenApply(r -> (Object) r);
} else {
return CompletableFuture.completedFuture(result);
}
}
这里也是通过invoker调用,逐渐调用到我们的业务方法。
先看看getInvoker方法
Invoker<?> getInvoker(Channel channel, Invocation inv) throws RemotingException {
...
DubboExporter<?> exporter = (DubboExporter<?>) exporterMap.get(serviceKey);
return exporter.getInvoker();
}
很简单,从exporterMap中获取DubboExporter,再获取对应的Invoker。
这里可以回顾上篇文章说到的DubboProtocol.export方法,它用invoker构建了DubboExporter,缓存到exporterMap中。
invoker的创建
那么invoker在哪里创建的呢?
这时要回到dubbo server启动,那里说到ServiceConfig.doExportUrlsFor1Protocol
if (registryURLs != null && registryURLs.size() > 0) {
for (URL registryURL : registryURLs) {
Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, registryURL.addParameterAndEncoded(Constants.EXPORT_KEY, url.toFullString()));
DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this);
Exporter<?> exporter = protocol.export(wrapperInvoker);
}
}
是的,invoker就是在这里创建的。(注意,参数ref就是业务逻辑的实现类)
proxyFactory默认使用的是JavassistProxyFactory。
public <T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) {
// 生成动态代理类
final Wrapper wrapper = Wrapper.getWrapper(proxy.getClass().getName().indexOf('$') < 0 ? proxy.getClass() : type);
return new AbstractProxyInvoker<T>(proxy, type, url) {
@Override
protected Object doInvoke(T proxy, String methodName,
Class<?>[] parameterTypes,
Object[] arguments) throws Throwable {
return wrapper.invokeMethod(proxy, methodName, parameterTypes, arguments);
}
};
}
再回顾一下RegistryProtocol.doLocalExport方法,为Invoker添加了包装类,生成了InvokerDelegete。所以invoker装饰层次为ProtocolFilterWrapper&Invoker ---> RegistryProtocol&InvokerDelegete ---> DelegateProviderMetaDataInvoker ---> JavassistProxyFactory&AbstractProxyInvoker。
(ProtocolFilterWrapper下面会说到)
JavassistProxyFactory中创建的invoke,会通过Wrapper调用到实际的业务方法。
Wrapper.getWrapper
也是动态生成代理类,JavassistProxyFactory正是通过它调用业务方法。
Wrapper会拼凑代码字符串,再通过javassist生成代理类。
过程比较繁琐,直接看生成的代理类吧。
原接口
public interface HelloService {
String hello(String user) ;
String hello2(String user) ;
}
生成的Wrapper,关键的方法在invokeMethod
public Object invokeMethod(Object o, String n, Class[] p, Object[] v) throws java.lang.reflect.InvocationTargetException{
com.dubbo.start.service.HelloService w;
try{
w = ((com.dubbo.start.service.HelloService)$1);
} catch(Throwable e){
throw new IllegalArgumentException(e);
}
try{
if( "hello".equals( $2 ) && $3.length == 1 ) {
return ($w)w.hello((java.lang.String)$4[0]);
}
if( "hello2".equals( $2 ) && $3.length == 1 ) {
return ($w)w.hello2((java.lang.String)$4[0]);
}
} catch(Throwable e) {
throw new java.lang.reflect.InvocationTargetException(e);
}
}
$1, $2, $3
在javassist中表示方法参数。
可以看到,通过方法名和参数数调用对应的逻辑方法。
所以dubbo暴露的接口就尽量不要方法重构了
Filter
前面也说过了,DubboProtocol处理前,会经过包装类ProtocolListenerWrapper,ProtocolFilterWrapper。
ProtocolFilterWrapper.export 会调用buildInvokerChain,为每一个filter创建一个invoker,
private static <T> Invoker<T> buildInvokerChain(final Invoker<T> invoker, String key, String group) {
Invoker<T> last = invoker;
List<Filter> filters = ExtensionLoader.getExtensionLoader(Filter.class).getActivateExtension(invoker.getUrl(), key, group);
if (filters.size() > 0) {
for (int i = filters.size() - 1; i >= 0; i--) {
final Filter filter = filters.get(i);
final Invoker<T> next = last;
last = new Invoker<T>() {
...
public Result invoke(Invocation invocation) throws RpcException {
return filter.invoke(next, invocation);
}
};
}
}
return last;
}
dubbo通过url中的service.filter参数查找filter扩展类,可以dubbo:service标签上添加filter属性,用逗号分隔多个filter。
dubbo默认有如下Filter
- EchoFilter
- ClassLoaderFilter
- GenericFilter
- ContextFilter
- TraceFilter
- TimeoutFilter
- MonitorFilter
- ExceptionFilter
- ValidationFilter
- TpsLimitFilter
可以通过Filter实现限流:
Dubbo之限流TpsLimitFilter源码分析