Consumer需要通过Netty(默认)把请求信息发送到远程Provider,Provider也是通过Netty接收并处理请求;这个处理流程的核心实现在NettyServer中,如下图所示,接下来重点分析Provider如何使用Netty的:
NettyServer
在NettyServer中绑定服务时,指定了handler为:new NettyHandler(getUrl(), this);
,所以dubbo在NettyHandler中处理所有消息;NettyServer中doOpen()
方法如下:
@Override
protected void doOpen() throws Throwable {
NettyHelper.setNettyLoggerFactory();
ExecutorService boss = Executors.newCachedThreadPool(new NamedThreadFactory("NettyServerBoss", true));
ExecutorService worker = Executors.newCachedThreadPool(new NamedThreadFactory("NettyServerWorker", true));
ChannelFactory channelFactory = new NioServerSocketChannelFactory(boss, worker, getUrl().getPositiveParameter(Constants.IO_THREADS_KEY, Constants.DEFAULT_IO_THREADS));
bootstrap = new ServerBootstrap(channelFactory);
final NettyHandler nettyHandler = new NettyHandler(getUrl(), this);
channels = nettyHandler.getChannels();
bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
public ChannelPipeline getPipeline() {
NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec() ,getUrl(), NettyServer.this);
ChannelPipeline pipeline = Channels.pipeline();
// 设置编码&解码
pipeline.addLast("decoder", adapter.getDecoder());
pipeline.addLast("encoder", adapter.getEncoder());
// 设置消息处理handler
pipeline.addLast("handler", nettyHandler);
return pipeline;
}
});
// 在端口上开启Netty服务
channel = bootstrap.bind(getBindAddress());
}
NettyHandler
NettyServer中已经设置了处理消息的handler为NettyHandler,NettyHandler只需继承org.jboss.netty.channel.SimpleChannelHandler即可(这只是Netty中成为handler的若干实现中的一种方法),源码如下:
public class NettyHandler extends SimpleChannelHandler {
... ...
@Override
public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
NettyChannel channel = NettyChannel.getOrAddChannel(ctx.getChannel(), url, handler);
try {
handler.received(channel, e.getMessage());
} finally {
NettyChannel.removeChannelIfDisconnected(ctx.getChannel());
}
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) throws Exception {
NettyChannel channel = NettyChannel.getOrAddChannel(ctx.getChannel(), url, handler);
try {
handler.caught(channel, e.getCause());
} finally {
NettyChannel.removeChannelIfDisconnected(ctx.getChannel());
}
}
}
从NettyHandler的源码可知,不止接收消息在这里处理,建立连接(
channelConnected
),断开连接(channelDisconnected
),异常处理(exceptionCaught
)都在这里;备注:该系列文章主要是讲解dubbo,不打算发散Netty相关知识点,如果想稍微了解Netty中的SimpleChannelHandler,请参考SimpleChannelHandler官方介绍;
AllChannelHandler
接收请求的核心源码如下:
// 初始化一个共享的线程池
protected static final ExecutorService SHARED_EXECUTOR = Executors.newCachedThreadPool(new NamedThreadFactory("DubboSharedHandler", true));
protected final ExecutorService executor;
// 构造方法中给executor赋值,通过ThreadPool的SPI注解可知,Provider的默认线程池为FixedThreadPool
public WrappedChannelHandler(ChannelHandler handler, URL url) {
... ...
// 选择一种线程池实现方式处理请求,想了解dubbo如何选择线程池实现方式,请参考[dubbo源码-ExtensionLoader扩展机制](http://www.jianshu.com/p/4691f98663aa)
executor = (ExecutorService) ExtensionLoader.getExtensionLoader(ThreadPool.class).getAdaptiveExtension().getExecutor(url);
... ...
}
public void received(Channel channel, Object message) throws RemotingException {
// 先得到线程池
ExecutorService cexecutor = getExecutorService();
try {
// 线程异步处理请求
cexecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message));
} catch (Throwable t) {
throw new ExecutionException(message, channel, getClass() + " error when process received event .", t);
}
}
private ExecutorService getExecutorService() {
ExecutorService cexecutor = executor;
// 如果构造方法中初始化的ExecutorService为空,或者线程池已经被shutdown,那么用共享连接池处理请求
if (cexecutor == null || cexecutor.isShutdown()) {
cexecutor = SHARED_EXECUTOR;
}
return cexecutor;
}
ChannelEventRunnable
dubbo只接收5种类型的消息,由枚举ChannelState定义;实现源码如下:
public void run() {
switch (state) {
case CONNECTED:
... ...
break;
case DISCONNECTED:
... ...
break;
case SENT:
... ...
break;
case RECEIVED:
try{
// 前面分析用线程池异步处理请求时ChannelState.RECEIVED,所以处理请求的业务逻辑在这里
handler.received(channel, message);
}catch (Exception e) {
logger.warn("ChannelEventRunnable handle " + state + " operation error, channel is " + channel + ", message is "+ message,e);
}
break;
case CAUGHT:
... ...
break;
default:
logger.warn("unknown state: " + state + ", message is " + message);
}
}
DecodeHandler
DecodeHandler的received()
接收到请求消息,判断是否需要解码;核心源码如下:
public void received(Channel channel, Object message) throws RemotingException {
if (message instanceof Decodeable) {
decode(message);
}
// 如果是consumer调用provider,那么message就是Request类型,需要在这里解码
if (message instanceof Request) {
decode(((Request)message).getData());
}
if (message instanceof Response) {
decode( ((Response)message).getResult());
}
// 如果使用telnet方式调用provider,那么message是String类型,不需要再做任何处理,直接运行到这里
handler.received(channel, message);
}
HeaderExchangeHandler
这里分析的是Provider处理Consumer的请求,所以message就是Request类型,实现的核心源码如下:
*** ***
if (message instanceof Request) {
// handle request.
Request request = (Request) message;
if (request.isEvent()) {
handlerEvent(channel, request);
} else {
// twoway就是指Consumer需要拿到Provider的结果的调用
if (request.isTwoWay()) {
Response response = handleRequest(exchangeChannel, request);
// 将RPC结果发送到请求的channel(NettyChannel中调用send()方法)
channel.send(response);
} else {
handler.received(exchangeChannel, request.getData());
}
}
}
*** ***
Response handleRequest(ExchangeChannel channel, Request req) throws RemotingException {
Response res = new Response(req.getId(), req.getVersion());
if (req.isBroken()) {
... ...
return res;
}
// 从Request类型的req中取出请求msg,RpcInvocation类型.
Object msg = req.getData();
try {
// handle data. 将返回的result(RpcResult类型)包装成dubbo统一的返回数据类型Reponse
Object result = handler.reply(channel, msg);
res.setStatus(Response.OK);
res.setResult(result);
} catch (Throwable e) {
res.setStatus(Response.SERVICE_ERROR);
res.setErrorMessage(StringUtils.toString(e));
}
return res;
}
DubboProtocol
由DubboProtol中绑定端口开启Netty服务server = Exchangers.bind(url, requestHandler);
可知,dubbo把请求交给requestHandler处理,requestHandler定义如下,从源码可知,requestHandler中定义了reply()
,received()
,connected()
,disconnected()
这4个方法,而这4个方法最终都调用到reply()
,上面的源码调用handler.reply(channel, msg)
,其核心实现源码如下:
private ExchangeHandler requestHandler = new ExchangeHandlerAdapter() {
public Object reply(ExchangeChannel channel, Object message) throws RemotingException {
if (message instanceof Invocation) {
Invocation inv = (Invocation) message;
// 根据调用参数Invocation就能得到需要调用的Invoker
Invoker<?> invoker = getInvoker(channel, inv);
//如果是callback 需要处理高版本调用低版本的问题
if (Boolean.TRUE.toString().equals(inv.getAttachments().get(IS_CALLBACK_SERVICE_INVOKE))){
String methodsStr = invoker.getUrl().getParameters().get("methods");
boolean hasMethod = false;
if (methodsStr == null || methodsStr.indexOf(",") == -1){
hasMethod = inv.getMethodName().equals(methodsStr);
} else {
String[] methods = methodsStr.split(",");
for (String method : methods){
if (inv.getMethodName().equals(method)){
hasMethod = true;
break;
}
}
}
if (!hasMethod){
logger.warn(new IllegalStateException("The methodName "+inv.getMethodName()+" not found in callback service interface ,invoke will be ignored. please update the api interface. url is:" + invoker.getUrl()) +" ,invocation is :"+inv );
return null;
}
}
RpcContext.getContext().setRemoteAddress(channel.getRemoteAddress());
// 经过Filter链后,动态代理调用Provider的方法实现,并返回结果
return invoker.invoke(inv);
}
throw new RemotingException(channel, "Unsupported request: " + message == null ? null : (message.getClass().getName() + ": " + message) + ", channel: consumer: " + channel.getRemoteAddress() + " --> provider: " + channel.getLocalAddress());
}
... ...
};
reply()源码解读:
方法入参ExchangeChannel channel: dubbo服务处理请求/响应的渠道,例如NettyChannel, MinaChannel
方法入参Object message: Consumer调用Provider时的请求数据,包括调用的接口, 调用的参数,参数类型,dubbo的版本,请求服务的版本等信息;
- if (message instanceof Invocation){
所有dubbo请求都被会封装成Invocation类型对象,所以如果请求message不是Invocation类型,那么抛出异常:throw new RemotingException(channel, "Unsupported request: " - Invocation inv = (Invocation) message;
Invoker<?> invoker = getInvoker(channel, inv);
根据调用参数message得到Invoke - return invoker.invoke(inv);
根据封装后的请求参数inv进行invoke调用; - HeaderExchangeHandler中把 invoker.invoke(inv)得到的RpcResult封装成Response对象: