开篇
这篇文章的主要目的是想梳理下Dubbo框架中server端处理consumer端请求的过程,主要为了解答自己心中的困惑:server端如何处理查找对应的provider接口、执行接口、返回数据的过程。
在这个过程中我们会了解到provider在export的过程中会维护interface和exporter的映射的map对象,处理请求的过程中会通过查找map对象来定位具体执行的接口。
DubboProtocol - provider - export过程
说明:
- DubboProtocol的exporterMap维护provider端的interface的key和interface映射关系。
- DubboProtocol的export过程中exporterMap.put(key, exporter)添加映射关系。
- Dubboprotocol中接口对应的exporter的key通过serviceKey方法生成。
- exporterMap的key组装格式serviceGroup/serviceName: serviceVersion:port。
- 通过exporterMap的key可以间接理解provider的interface可以通过group进行隔离。
public abstract class AbstractProtocol implements Protocol {
protected final Map<String, Exporter<?>> exporterMap = new ConcurrentHashMap<String, Exporter<?>>();
}
public class DubboProtocol extends AbstractProtocol {
public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
URL url = invoker.getUrl();
// export service.
String key = serviceKey(url);
DubboExporter<T> exporter = new DubboExporter<T>(invoker, key, exporterMap);
exporterMap.put(key, exporter);
//export an stub service for dispatching event
Boolean isStubSupportEvent = url.getParameter(Constants.STUB_EVENT_KEY, Constants.DEFAULT_STUB_EVENT);
Boolean isCallbackservice = url.getParameter(Constants.IS_CALLBACK_SERVICE, false);
if (isStubSupportEvent && !isCallbackservice) {
String stubServiceMethods = url.getParameter(Constants.STUB_EVENT_METHODS_KEY);
if (stubServiceMethods == null || stubServiceMethods.length() == 0) {
if (logger.isWarnEnabled()) {
logger.warn(new IllegalStateException("consumer [" + url.getParameter(Constants.INTERFACE_KEY) +
"], has set stubproxy support event ,but no stub methods founded."));
}
} else {
stubServiceMethodsMap.put(url.getServiceKey(), stubServiceMethods);
}
}
openServer(url);
optimizeSerialization(url);
return exporter;
}
}
public class ProtocolUtils {
public static String serviceKey(URL url) {
return serviceKey(url.getPort(), url.getPath(), url.getParameter(Constants.VERSION_KEY),
url.getParameter(Constants.GROUP_KEY));
}
public static String serviceKey(int port, String serviceName, String serviceVersion, String serviceGroup) {
StringBuilder buf = new StringBuilder();
if (serviceGroup != null && serviceGroup.length() > 0) {
buf.append(serviceGroup);
buf.append("/");
}
buf.append(serviceName);
if (serviceVersion != null && serviceVersion.length() > 0 && !"0.0.0".equals(serviceVersion)) {
buf.append(":");
buf.append(serviceVersion);
}
buf.append(":");
buf.append(port);
return buf.toString();
}
}
dubbo - provider - receive过程
说明
说明:
- Dubbo - provider端处理时序图,便于理解模块之间调用顺序。
NettyCodecAdapter & DubboCountCodec
说明:
NettyServer的doOpen方法内部通过addLast方法添加NettyCodecAdapter的decoder。
NettyCodecAdapter的decoder是InternalDecoder对象。
InternalDecoder内部的decoder是DubboCountCodec对象。
DubboCountCodec内部的decoder是DubboCodec对象。
DubboCodec的decodeBody方法负责生成Request对象和DecodeableRpcInvocation对象。
public class NettyServer extends AbstractServer implements Server {
protected void doOpen() throws Throwable {
NettyHelper.setNettyLoggerFactory();
ExecutorService boss = Executors.newCachedThreadPool(new NamedThreadFactory("NettyServerBoss", true));
ExecutorService worker = Executors.newCachedThreadPool(new NamedThreadFactory("NettyServerWorker", true));
ChannelFactory channelFactory = new NioServerSocketChannelFactory(boss, worker, getUrl().getPositiveParameter(Constants.IO_THREADS_KEY, Constants.DEFAULT_IO_THREADS));
bootstrap = new ServerBootstrap(channelFactory);
final NettyHandler nettyHandler = new NettyHandler(getUrl(), this);
channels = nettyHandler.getChannels();
// https://issues.jboss.org/browse/NETTY-365
// https://issues.jboss.org/browse/NETTY-379
// final Timer timer = new HashedWheelTimer(new NamedThreadFactory("NettyIdleTimer", true));
bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
public ChannelPipeline getPipeline() {
NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyServer.this);
ChannelPipeline pipeline = Channels.pipeline();
/*int idleTimeout = getIdleTimeout();
if (idleTimeout > 10000) {
pipeline.addLast("timer", new IdleStateHandler(timer, idleTimeout / 1000, 0, 0));
}*/
pipeline.addLast("decoder", adapter.getDecoder());
pipeline.addLast("encoder", adapter.getEncoder());
pipeline.addLast("handler", nettyHandler);
return pipeline;
}
});
// bind
channel = bootstrap.bind(getBindAddress());
}
}
final class NettyCodecAdapter {
private final ChannelHandler decoder = new InternalDecoder();
private final Codec2 codec;
private class InternalDecoder extends SimpleChannelUpstreamHandler {
@Override
public void messageReceived(ChannelHandlerContext ctx, MessageEvent event) throws Exception {
Object o = event.getMessage();
// 省略核心读消息部分逻辑
try {
// 解码数据
do {
saveReaderIndex = message.readerIndex();
try {
msg = codec.decode(channel, message);
} catch (IOException e) {}
} while (message.readable());
} finally {
// 省略相关代码
}
}
}
public final class DubboCountCodec implements Codec2 {
private DubboCodec codec = new DubboCodec();
public Object decode(Channel channel, ChannelBuffer buffer) throws IOException {
int save = buffer.readerIndex();
MultiMessage result = MultiMessage.create();
do {
Object obj = codec.decode(channel, buffer);
if (Codec2.DecodeResult.NEED_MORE_INPUT == obj) {
buffer.readerIndex(save);
break;
} else {
// 保存result数据
result.addMessage(obj);
logMessageLength(obj, buffer.readerIndex() - save);
save = buffer.readerIndex();
}
} while (true);
return result;
}
}
public class DubboCodec extends ExchangeCodec implements Codec2 {
protected Object decodeBody(Channel channel, InputStream is, byte[] header) throws IOException {
byte flag = header[2], proto = (byte) (flag & SERIALIZATION_MASK);
Serialization s = CodecSupport.getSerialization(channel.getUrl(), proto);
long id = Bytes.bytes2long(header, 4);
if ((flag & FLAG_REQUEST) == 0) {
// 解析response,省略相关代码
return res;
} else {
// decode request.
Request req = new Request(id);
req.setVersion("2.0.0");
req.setTwoWay((flag & FLAG_TWOWAY) != 0);
if ((flag & FLAG_EVENT) != 0) {
req.setEvent(Request.HEARTBEAT_EVENT);
}
try {
Object data;
if (req.isHeartbeat()) {
data = decodeHeartbeatData(channel, deserialize(s, channel.getUrl(), is));
} else if (req.isEvent()) {
data = decodeEventData(channel, deserialize(s, channel.getUrl(), is));
} else {
DecodeableRpcInvocation inv;
if (channel.getUrl().getParameter(
Constants.DECODE_IN_IO_THREAD_KEY,
Constants.DEFAULT_DECODE_IN_IO_THREAD)) {
inv = new DecodeableRpcInvocation(channel, req, is, proto);
inv.decode();
} else {
inv = new DecodeableRpcInvocation(channel, req,
new UnsafeByteArrayInputStream(readMessageData(is)), proto);
}
data = inv;
}
req.setData(data);
} catch (Throwable t) {}
// 返回解析代码
return req;
}
}
}
NettyHandler
说明:
继承自Netty的原生网络时间处理器实现类SimpleChannelHandler,定义了网络建连(channelConnected)、断连(channelDisconnected)、消息接收(messageReceived)、异常(exceptionCaught)等事件处理方法;
维护了<ip:port, channel>的对应关系Map<String, Channel>channels,在网络建连/断连时进行相应put/remove操作,并暴露给NettyServer使用;
接收到网络消息时,执行messageReceived()方法,将Netty的原生Channel转换为Dubbo封装的NettyChannel,并将事件传递给其包含的ChannelHandler处理;
public class NettyHandler extends SimpleChannelHandler {
private final Map<String, Channel> channels = new ConcurrentHashMap<String, Channel>();
@Override
public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
NettyChannel channel = NettyChannel.getOrAddChannel(ctx.getChannel(), url, handler);
try {
handler.received(channel, e.getMessage());
} finally {
NettyChannel.removeChannelIfDisconnected(ctx.getChannel());
}
}
@Override
public void writeRequested(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
super.writeRequested(ctx, e);
NettyChannel channel = NettyChannel.getOrAddChannel(ctx.getChannel(), url, handler);
try {
handler.sent(channel, e.getMessage());
} finally {
NettyChannel.removeChannelIfDisconnected(ctx.getChannel());
}
}
}
MultiMessageHandler
说明:
- 处理MultiMessage,将其拆分成多个Message处理;
public abstract class AbstractChannelHandlerDelegate implements ChannelHandlerDelegate {
public void sent(Channel channel, Object message) throws RemotingException {
handler.sent(channel, message);
}
public void received(Channel channel, Object message) throws RemotingException {
handler.received(channel, message);
}
public void caught(Channel channel, Throwable exception) throws RemotingException {
handler.caught(channel, exception);
}
}
public class MultiMessageHandler extends AbstractChannelHandlerDelegate {
public MultiMessageHandler(ChannelHandler handler) {
super(handler);
}
@SuppressWarnings("unchecked")
@Override
public void received(Channel channel, Object message) throws RemotingException {
if (message instanceof MultiMessage) {
MultiMessage list = (MultiMessage) message;
for (Object obj : list) {
handler.received(channel, obj);
}
} else {
handler.received(channel, message);
}
}
}
HeartbeatHandler
说明:
消息收发时重置当前通道的最新消息收发时间,用于配合HeaderExchangeServer和HeaderExchangeClient中的心跳检测任务HeartBeatTask;
拦截并处理心跳请求/响应消息。对心跳请求消息,构建对应的心跳响应消息并通过Channel发送回去;对心跳响应消息,仅记录日志后返回,不做功能上的处理;
public class HeartbeatHandler extends AbstractChannelHandlerDelegate {
public void sent(Channel channel, Object message) throws RemotingException {
setWriteTimestamp(channel);
handler.sent(channel, message);
}
public void received(Channel channel, Object message) throws RemotingException {
setReadTimestamp(channel);
if (isHeartbeatRequest(message)) {
Request req = (Request) message;
if (req.isTwoWay()) {
Response res = new Response(req.getId(), req.getVersion());
res.setEvent(Response.HEARTBEAT_EVENT);
channel.send(res);
if (logger.isInfoEnabled()) {
int heartbeat = channel.getUrl().getParameter(Constants.HEARTBEAT_KEY, 0);
if (logger.isDebugEnabled()) {
logger.debug("Received heartbeat from remote channel " + channel.getRemoteAddress()
+ ", cause: The channel has no data-transmission exceeds a heartbeat period"
+ (heartbeat > 0 ? ": " + heartbeat + "ms" : ""));
}
}
}
return;
}
if (isHeartbeatResponse(message)) {
if (logger.isDebugEnabled()) {
logger.debug(
new StringBuilder(32)
.append("Receive heartbeat response in thread ")
.append(Thread.currentThread().getName())
.toString());
}
return;
}
handler.received(channel, message);
}
}
AllChannelHandler
说明:
Dubbo通过该处理器完成了 IO线程 与 业务线程 的解耦!
内部封装了业务线程池,默认使用FixedThreadPool;
将接收到的网络消息事件封装成可执行任务ChannelEventRunnable,交由业务线程池处理;
public class AllChannelHandler extends WrappedChannelHandler {
public void received(Channel channel, Object message) throws RemotingException {
ExecutorService cexecutor = getExecutorService();
try {
cexecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message));
} catch (Throwable t) {
//TODO A temporary solution to the problem that the exception information can not be sent to the opposite end after the thread pool is full. Need a refactoring
//fix The thread pool is full, refuses to call, does not return, and causes the consumer to wait for time out
if(message instanceof Request && t instanceof RejectedExecutionException){
Request request = (Request)message;
if(request.isTwoWay()){
String msg = "Server side(" + url.getIp() + "," + url.getPort() + ") threadpool is exhausted ,detail msg:" + t.getMessage();
Response response = new Response(request.getId(), request.getVersion());
response.setStatus(Response.SERVER_THREADPOOL_EXHAUSTED_ERROR);
response.setErrorMessage(msg);
channel.send(response);
return;
}
}
throw new ExecutionException(message, channel, getClass() + " error when process received event .", t);
}
}
}
ChannelEventRunnable
说明:
- 将接收到的网络消息事件封装成可执行任务ChannelEventRunnable,交由业务线程池处理;
public class ChannelEventRunnable implements Runnable {
private final ChannelHandler handler;
private final Channel channel;
private final ChannelState state;
private final Throwable exception;
private final Object message;
public ChannelEventRunnable(Channel channel, ChannelHandler handler, ChannelState state, Object message, Throwable exception) {
this.channel = channel;
this.handler = handler;
this.state = state;
this.message = message;
this.exception = exception;
}
public void run() {
switch (state) {
case CONNECTED:
try {
handler.connected(channel);
} catch (Exception e) {}
break;
case DISCONNECTED:
try {
handler.disconnected(channel);
} catch (Exception e) {}
break;
case SENT:
try {
handler.sent(channel, message);
} catch (Exception e) {}
break;
case RECEIVED:
try {
handler.received(channel, message);
} catch (Exception e) {}
break;
case CAUGHT:
try {
handler.caught(channel, exception);
} catch (Exception e) {}
break;
default:
logger.warn("unknown state: " + state + ", message is " + message);
}
}
}
DecodeHandler
说明:
进行业务请求响应的解码工作;
对Request和Response中携带的消息体或结果体,如果其实现了Decodeable接口,则进行一次解码处理;
public class DecodeHandler extends AbstractChannelHandlerDelegate {
public DecodeHandler(ChannelHandler handler) {
super(handler);
}
public void received(Channel channel, Object message) throws RemotingException {
if (message instanceof Decodeable) {
decode(message);
}
if (message instanceof Request) {
decode(((Request) message).getData());
}
if (message instanceof Response) {
decode(((Response) message).getResult());
}
handler.received(channel, message);
}
}
HeaderExchangeHandler
说明:
交换层真正完成请求响应收发功能的处理器!
将网络层Channel转换为交换层ExchangeChannel,为其增加了请求响应方法request();
判断收到的网络消息类型,根据类型分别执行不同的处理逻辑;
a)请求响应模型的Request消息:调用ExchangeHandlerAdapter.reply()获取执行结果Result ->将本地执行结果Result封装成RPC响应Response -> 通过channel.send()发送RPC响应;
b)单向请求消息的处理:调用ExchangeHandlerAdapter.received()处理请求消息,如果该消息是Invocation则执行reply()逻辑但不主动发送RPC响应Response;
public class HeaderExchangeHandler implements ChannelHandlerDelegate {
public void received(Channel channel, Object message) throws RemotingException {
channel.setAttribute(KEY_READ_TIMESTAMP, System.currentTimeMillis());
// 将网络层Channel转换为交换层ExchangeChannel
ExchangeChannel exchangeChannel = HeaderExchangeChannel.getOrAddChannel(channel);
try {
if (message instanceof Request) {
// handle request.
Request request = (Request) message;
if (request.isEvent()) {
handlerEvent(channel, request);
} else {
if (request.isTwoWay()) {
//case a: 请求响应模型的请求处理
Response response = handleRequest(exchangeChannel, request);
channel.send(response);
} else {
//case b: 单向消息接收的处理
handler.received(exchangeChannel, request.getData());
}
}
}
// 省略相关代码
} finally {
HeaderExchangeChannel.removeChannelIfDisconnected(channel);
}
}
Response handleRequest(ExchangeChannel channel, Request req) throws RemotingException {
Response res = new Response(req.getId(), req.getVersion());
Object msg = req.getData();
try {
// handle data.
Object result = handler.reply(channel, msg);
res.setStatus(Response.OK);
res.setResult(result);
} catch (Throwable e) {
res.setStatus(Response.SERVICE_ERROR);
res.setErrorMessage(StringUtils.toString(e));
}
return res;
}
}
DubboProtocol
说明:
ExchangeHandlerAdapter由DubboProtocol创建,并实现了reply()方法;
reply()方法实际通过RPC调用参数Invocation从DubboProtocol.exporterMap中获取到对应的本地实现DubboExporter -> 进而获取到对应的本地执行AbstractProxyInvoker -> 最终通过AbstractProxyInvoker.invoke()方法,以反射的方式执行真正实现类的对应方法,完成RPC请求。
通过在DubboProtocol中的exporterMap查找exporter,进而查找invoker,进而执行返回结果
public class DubboProtocol extends AbstractProtocol {
private ExchangeHandler requestHandler = new ExchangeHandlerAdapter() {
public Object reply(ExchangeChannel channel, Object message) throws RemotingException {
if (message instanceof Invocation) {
Invocation inv = (Invocation) message;
// 获取服务发布时候注册的exporter对象
Invoker<?> invoker = getInvoker(channel, inv);
// need to consider backward-compatibility if it's a callback
if (Boolean.TRUE.toString().equals(inv.getAttachments().get(IS_CALLBACK_SERVICE_INVOKE))) {
String methodsStr = invoker.getUrl().getParameters().get("methods");
boolean hasMethod = false;
if (methodsStr == null || methodsStr.indexOf(",") == -1) {
hasMethod = inv.getMethodName().equals(methodsStr);
} else {
String[] methods = methodsStr.split(",");
for (String method : methods) {
if (inv.getMethodName().equals(method)) {
hasMethod = true;
break;
}
}
}
if (!hasMethod) {
logger.warn(new IllegalStateException("The methodName " + inv.getMethodName() + " not found in callback service interface ,invoke will be ignored. please update the api interface. url is:" + invoker.getUrl()) + " ,invocation is :" + inv);
return null;
}
}
RpcContext.getContext().setRemoteAddress(channel.getRemoteAddress());
return invoker.invoke(inv);
}
throw new RemotingException(channel, "Unsupported request: " + message == null ? null : (message.getClass().getName() + ": " + message) + ", channel: consumer: " + channel.getRemoteAddress() + " --> provider: " + channel.getLocalAddress());
}
// 省略一些相关的代码
};
Invoker<?> getInvoker(Channel channel, Invocation inv) throws RemotingException {
boolean isCallBackServiceInvoke = false;
boolean isStubServiceInvoke = false;
int port = channel.getLocalAddress().getPort();
String path = inv.getAttachments().get(Constants.PATH_KEY);
// if it's callback service on client side
isStubServiceInvoke = Boolean.TRUE.toString().equals(inv.getAttachments().get(Constants.STUB_EVENT_KEY));
if (isStubServiceInvoke) {
port = channel.getRemoteAddress().getPort();
}
//callback
isCallBackServiceInvoke = isClientSide(channel) && !isStubServiceInvoke;
if (isCallBackServiceInvoke) {
path = inv.getAttachments().get(Constants.PATH_KEY) + "." + inv.getAttachments().get(Constants.CALLBACK_SERVICE_KEY);
inv.getAttachments().put(IS_CALLBACK_SERVICE_INVOKE, Boolean.TRUE.toString());
}
// 获取相应的key并获取
String serviceKey = serviceKey(port, path, inv.getAttachments().get(Constants.VERSION_KEY), inv.getAttachments().get(Constants.GROUP_KEY));
DubboExporter<?> exporter = (DubboExporter<?>) exporterMap.get(serviceKey);
if (exporter == null)
throw new RemotingException(channel, "Not found exported service: " + serviceKey + " in " + exporterMap.keySet() + ", may be version or group mismatch " + ", channel: consumer: " + channel.getRemoteAddress() + " --> provider: " + channel.getLocalAddress() + ", message:" + inv);
return exporter.getInvoker();
}
}