Rocketmq
使用 Netty
实现了 remoting
模块(即 RPC
模块)。
一. RemotingClient
和 RemotingServer
接口
1.1 RemotingService
接口
/**
* 远程RPC调用服务接口
*/
public interface RemotingService {
// 服务开启
void start();
// 服务停止
void shutdown();
// 注册RPC调用的钩子对象RPCHook, 可以监控RPC调用请求和响应数据。
void registerRPCHook(RPCHook rpcHook);
}
RemotingService
是 RemotingClient
和 RemotingServer
接口公共父接口,表示远程RPC
调用服务接口,提供了三个方法。
1.2 RemotingClient
接口
/**
* 远程RPC调用服务客户端
*/
public interface RemotingClient extends RemotingService {
/**
* 更新 NameServer 服务器的地址列表
* @param addrs
*/
void updateNameServerAddressList(final List<String> addrs);
/**
* 获取 NameServer 服务器的地址列表
* @return
*/
List<String> getNameServerAddressList();
/**
* 向远程服务器 addr 地址发送数据,并同步阻塞等待响应。
* 超过给的时间,没有数据响应,就抛出异常。
*/
RemotingCommand invokeSync(final String addr, final RemotingCommand request,
final long timeoutMillis) throws InterruptedException, RemotingConnectException,
RemotingSendRequestException, RemotingTimeoutException;
/**
* 向远程服务器 addr 地址发送数据,立刻返回。
* 当远程服务器有响应,那么就回调 InvokeCallback 的方法,传递响应数据;
* 如果超过给的时间,也会回调 InvokeCallback 的方法,响应失败。
*/
void invokeAsync(final String addr, final RemotingCommand request, final long timeoutMillis,
final InvokeCallback invokeCallback) throws InterruptedException, RemotingConnectException,
RemotingTooMuchRequestException, RemotingTimeoutException, RemotingSendRequestException;
void invokeOneway(final String addr, final RemotingCommand request, final long timeoutMillis)
throws InterruptedException, RemotingConnectException, RemotingTooMuchRequestException,
RemotingTimeoutException, RemotingSendRequestException;
/**
* 注册远程请求命令处理程序,
*/
void registerProcessor(final int requestCode, final NettyRequestProcessor processor,
final ExecutorService executor);
/**
* 设置异步请求响应回调 InvokeCallback 的方法线程池执行器
*/
void setCallbackExecutor(final ExecutorService callbackExecutor);
ExecutorService getCallbackExecutor();
/**
* 通道是否可写
*/
boolean isChannelWritable(final String addr);
}
RemotingClient
是RPC
服务的客户端接口,有如下方法:
-
invokeSync
,invokeAsync
和invokeOneway
: 都是向远程服务器addr
地址发送数据。区别是
invokeSync
同步阻塞等待响应;invokeAsync
异步发送,在InvokeCallback
回调方法中传递响应结果;invokeOneway
只是发送数据,不管响应结果。
注意如果addr
值为null
,就表示向NameServer
服务器地址发送数据。 -
updateNameServerAddressList
和getNameServerAddressList
: 更新和获取NameServer
服务器的地址列表。 -
registerProcessor
: 注册远程请求命令处理程序。注意客户端不只是接收到服务端的响应结果,也会接收到服务端的请求数据的,一般都是服务端主动通知客户端的数据信息;在
MQClientAPIImpl
类中调用了这个方法。 -
setCallbackExecutor
和getCallbackExecutor
: 异步请求响应回调InvokeCallback
方法的线程池执行器。 -
isChannelWritable
: 通道是否可写。
1.3 RemotingServer
接口
/**
* 远程RPC调用服务服务端
*/
public interface RemotingServer extends RemotingService {
/**
* 注册特定请求(requestCode)的处理器和对应线程池执行器
*/
void registerProcessor(final int requestCode, final NettyRequestProcessor processor,
final ExecutorService executor);
/**
* 注册默认请求的处理器和对应线程池执行器
*/
void registerDefaultProcessor(final NettyRequestProcessor processor, final ExecutorService executor);
/**
* 服务端监听的端口
*/
int localListenPort();
/**
* 根据请求 requestCode,获取对应请求命令处理器和线程池
*/
Pair<NettyRequestProcessor, ExecutorService> getProcessorPair(final int requestCode);
/**
* 向指定客户端 `channel` 发送数据,并同步阻塞等待响应。
* 超过给的时间,没有数据响应,就抛出异常。
*/
RemotingCommand invokeSync(final Channel channel, final RemotingCommand request,
final long timeoutMillis) throws InterruptedException, RemotingSendRequestException,
RemotingTimeoutException;
/**
* 向指定客户端 `channel` 发送数据,立刻返回。
* 当远程服务器有响应,那么就回调 InvokeCallback 的方法,传递响应数据;
* 如果超过给的时间,也会回调 InvokeCallback 的方法,响应失败。
*/
void invokeAsync(final Channel channel, final RemotingCommand request, final long timeoutMillis,
final InvokeCallback invokeCallback) throws InterruptedException,
RemotingTooMuchRequestException, RemotingTimeoutException, RemotingSendRequestException;
/**
* 向指定客户端 `channel` 发送数据,不管响应结果。
*/
void invokeOneway(final Channel channel, final RemotingCommand request, final long timeoutMillis)
throws InterruptedException, RemotingTooMuchRequestException, RemotingTimeoutException,
RemotingSendRequestException;
}
RemotingServer
是 RPC
服务的服务端接口,有如下方法:
-
registerProcessor
: 注册特定请求(requestCode
)的处理器和对应线程池执行器。 -
registerDefaultProcessor
: 注册默认请求的处理器和对应线程池执行器。 -
localListenPort
: 获取服务端监听的端口。 -
getProcessorPair
: 根据请求requestCode
,获取对应请求命令处理器和线程池。 -
invokeSync
,invokeAsync
和invokeOneway
: 向指定客户端channel
发送数据。
二. NettyRemotingAbstract
类
NettyRemotingAbstract
是 RPC
服务基础抽样类,客户端和服务端实现类都继承这个抽样类。
2.1 重要成员变量
/**
* Semaphore to limit maximum number of on-going one-way requests, which protects system memory footprint.
*
* 就是为了限制 invokeOneway(...) 方法的最大请求数量,保护系统内存占用。
*/
protected final Semaphore semaphoreOneway;
/**
* Semaphore to limit maximum number of on-going asynchronous requests, which protects system memory footprint.
*
* 就是为了限制 invokeAsync(...) 方法的最大请求数量,保护系统内存占用。
*/
protected final Semaphore semaphoreAsync;
/**
* This map caches all on-going requests.
*
* 缓存所有正在进行的请求。
*/
protected final ConcurrentMap<Integer /* opaque */, ResponseFuture> responseTable =
new ConcurrentHashMap<Integer, ResponseFuture>(256);
/**
* This container holds all processors per request code, aka, for each incoming request, we may look up the
* responding processor in this map to handle the request.
*
* 储存所有请求命令(requestCode)对应的处理器 NettyRequestProcessor 和 线程池执行器,处理请求命令返回响应结果
*/
protected final HashMap<Integer/* request code */, Pair<NettyRequestProcessor, ExecutorService>> processorTable =
new HashMap<Integer, Pair<NettyRequestProcessor, ExecutorService>>(64);
/**
* Executor to feed netty events to user defined {@link ChannelEventListener}.
*
* 将netty事件提供给用户定义 ChannelEventListener 的后台服务执行器。
*/
protected final NettyEventExecutor nettyEventExecutor = new NettyEventExecutor();
/**
* The default request processor to use in case there is no exact match in {@link #processorTable} per request code.
*
* 默认的所有请求命令对应的处理器;
* 当请求命令没有精确匹配到处理器时使用。
*/
protected Pair<NettyRequestProcessor, ExecutorService> defaultRequestProcessor;
/**
* SSL context via which to create {@link SslHandler}.
*
* 用于处理 SSL
*/
protected volatile SslContext sslContext;
/**
* custom rpc hooks
*
* 用户自定义的 RPC 回调钩子
*/
protected List<RPCHook> rpcHooks = new ArrayList<RPCHook>();
-
semaphoreOneway
和semaphoreAsync
: 通过信号量Semaphore
来限制oneway
和async
请求的数量,因为这两种请求都是异步。同步请求不需要这个,因为同步请求本身就是阻塞的。
-
responseTable
: 记录所有正在进行的请求,包括同步请求和异步请求,但是没有oneway
类型请求。 -
processorTable
: 储存指定请求命令(requestCode
)对应的处理器NettyRequestProcessor
和 线程池执行器。即通过registerProcessor
方法注册的。 -
defaultRequestProcessor
: 默认的所有请求命令对应的处理器,当processorTable
中没有找到对应的处理器时,就会使用这个。注意这个值只在服务端 (
NettyRemotingServer
) 实现中赋值了;在客户端(NettyRemotingClient
) 实现中这个值就是null
,也就是说客户端只能处理指定requestCode
请求命令。 -
nettyEventExecutor
: 将Netty
事件提供给用户定义ChannelEventListener
接口的后台服务执行器。 -
sslContext
: 用于处理SSL
。 -
rpcHooks
: 用户自定义的RPC
回调钩子。
2.2 重要方法
2.2.1 processMessageReceived
方法
// 处理远程命令,包括请求命令和响应命令
public void processMessageReceived(ChannelHandlerContext ctx, RemotingCommand msg) throws Exception {
final RemotingCommand cmd = msg;
if (cmd != null) {
switch (cmd.getType()) {
case REQUEST_COMMAND:
// 请求命令
processRequestCommand(ctx, cmd);
break;
case RESPONSE_COMMAND:
// 响应命令
processResponseCommand(ctx, cmd);
break;
default:
break;
}
}
}
这个方法一般都是在 Netty
接收到数据,转成 RemotingCommand
对象,然后调用这个方法;分为请求命令和响应命令。
2.2.2 processRequestCommand
方法
/**
* Process incoming request command issued by remote peer.
*
* 处理远程请求命令
* @param ctx channel handler context.
* @param cmd request command.
*/
public void processRequestCommand(final ChannelHandlerContext ctx, final RemotingCommand cmd) {
// 通过请求命令的 code 获取对应的处理器
final Pair<NettyRequestProcessor, ExecutorService> matched = this.processorTable.get(cmd.getCode());
// 如果没有匹配到处理器,就用默认的处理器
final Pair<NettyRequestProcessor, ExecutorService> pair = null == matched ? this.defaultRequestProcessor : matched;
// 这次请求的 opaque,用这个值来实现响应和请求一一对应
final int opaque = cmd.getOpaque();
if (pair != null) {
Runnable run = new Runnable() {
@Override
public void run() {
try {
// 请求处理开始,回调 RPC 钩子方法
doBeforeRpcHooks(RemotingHelper.parseChannelRemoteAddr(ctx.channel()), cmd);
final RemotingResponseCallback callback = new RemotingResponseCallback() {
@Override
public void callback(RemotingCommand response) {
// 请求处理结束,回调 RPC 钩子方法
doAfterRpcHooks(RemotingHelper.parseChannelRemoteAddr(ctx.channel()), cmd, response);
// 请求命令不是一次性命令 Oneway
if (!cmd.isOnewayRPC()) {
if (response != null) {
// 设置响应的 opaque 就是请求的 opaque
response.setOpaque(opaque);
response.markResponseType();
try {
// 将响应发送回去
ctx.writeAndFlush(response);
} catch (Throwable e) {
log.error("process request over, but response failed", e);
log.error(cmd.toString());
log.error(response.toString());
}
} else {
}
}
}
};
if (pair.getObject1() instanceof AsyncNettyRequestProcessor) {
AsyncNettyRequestProcessor processor = (AsyncNettyRequestProcessor)pair.getObject1();
processor.asyncProcessRequest(ctx, cmd, callback);
} else {
NettyRequestProcessor processor = pair.getObject1();
RemotingCommand response = processor.processRequest(ctx, cmd);
callback.callback(response);
}
} catch (Throwable e) {
log.error("process request exception", e);
log.error(cmd.toString());
// 不是 Oneway 类型,就要有响应,返回 SYSTEM_ERROR 响应
if (!cmd.isOnewayRPC()) {
final RemotingCommand response = RemotingCommand.createResponseCommand(RemotingSysResponseCode.SYSTEM_ERROR,
RemotingHelper.exceptionSimpleDesc(e));
response.setOpaque(opaque);
ctx.writeAndFlush(response);
}
}
}
};
// 请求处理器 NettyRequestProcessor,是不是拒绝处理请求
if (pair.getObject1().rejectRequest()) {
final RemotingCommand response = RemotingCommand.createResponseCommand(RemotingSysResponseCode.SYSTEM_BUSY,
"[REJECTREQUEST]system busy, start flow control for a while");
// 设置响应的 opaque 就是请求的 opaque
response.setOpaque(opaque);
// 将响应发送回去
ctx.writeAndFlush(response);
return;
}
try {
final RequestTask requestTask = new RequestTask(run, ctx.channel(), cmd);
// 请求处理过程是在请求处理器对应的线程执行器ExecutorService 中运行
pair.getObject2().submit(requestTask);
} catch (RejectedExecutionException e) {
// RejectedExecutionException 表示线程池拒绝执行任务。
if ((System.currentTimeMillis() % 10000) == 0) {
log.warn(RemotingHelper.parseChannelRemoteAddr(ctx.channel())
+ ", too many requests and system thread pool busy, RejectedExecutionException "
+ pair.getObject2().toString()
+ " request code: " + cmd.getCode());
}
// 异常处理的响应
if (!cmd.isOnewayRPC()) {
final RemotingCommand response = RemotingCommand.createResponseCommand(RemotingSysResponseCode.SYSTEM_BUSY,
"[OVERLOAD]system busy, start flow control for a while");
response.setOpaque(opaque);
ctx.writeAndFlush(response);
}
}
} else {
// 没有对应请求处理器的响应
String error = " request type " + cmd.getCode() + " not supported";
final RemotingCommand response =
RemotingCommand.createResponseCommand(RemotingSysResponseCode.REQUEST_CODE_NOT_SUPPORTED, error);
response.setOpaque(opaque);
// 将响应数据 response 返回
ctx.writeAndFlush(response);
log.error(RemotingHelper.parseChannelRemoteAddr(ctx.channel()) + error);
}
}
处理远程请求,这个方法虽然长,但是流程其实很简单:
- 通过
processorTable
和defaultRequestProcessor
来得到这个请求命令对应的处理器NettyRequestProcessor
和 线程执行器ExecutorService
。 - 获取请求的
opaque
, 这个值很重要,用这个值来实现响应和请求一一对应。 - 如果没有对应处理器
NettyRequestProcessor
,那么就返回code
是RemotingSysResponseCode.REQUEST_CODE_NOT_SUPPORTED
的响应。 - 创建一个
run
来包装实际处理请求的代码,因为请求处理过程要放在指定线程执行器ExecutorService
中执行。通过
NettyRequestProcessor
的processRequest
方法或者asyncProcessRequest
方法处理请求,获取响应结果response
, 最后通过ctx.writeAndFlush(response)
方法,将响应返回给请求端。 - 判断请求处理器
NettyRequestProcessor
,是否拒绝处理请求。 - 在
ExecutorService
的线程池中执行run
。
2.2.3 processResponseCommand
方法
/**
* Process response from remote peer to the previous issued requests.
*
* 处理响应
*
* @param ctx channel handler context.
* @param cmd response command instance.
*/
public void processResponseCommand(ChannelHandlerContext ctx, RemotingCommand cmd) {
final int opaque = cmd.getOpaque();
// 通过响应的 opaque,来获取对应请求的 ResponseFuture
final ResponseFuture responseFuture = responseTable.get(opaque);
if (responseFuture != null) {
// 设置响应结果
responseFuture.setResponseCommand(cmd);
responseTable.remove(opaque);
if (responseFuture.getInvokeCallback() != null) {
// 异步请求,设置响应
executeInvokeCallback(responseFuture);
} else {
// 同步请求,设置响应
responseFuture.putResponse(cmd);
responseFuture.release();
}
} else {
// 收到响应,却没有对应匹配的请求
log.warn("receive response, but not matched any request, " + RemotingHelper.parseChannelRemoteAddr(ctx.channel()));
log.warn(cmd.toString());
}
}
处理响应结果,方法流程:
- 通过响应的
opaque
,从responseTable
中获取对应请求的ResponseFuture
。 - 设置响应值,并从
responseTable
中移除这个ResponseFuture
。 - 如果是异步请求,那么通过
executeInvokeCallback
方法,在回调线程池中响应回调。 - 如果是同步请求,通过
responseFuture.putResponse(cmd)
方法,唤醒正在阻塞等待的线程。
2.2.4 invokeSyncImpl
方法
/**
* 同步发送请求
*/
public RemotingCommand invokeSyncImpl(final Channel channel, final RemotingCommand request,
final long timeoutMillis)
throws InterruptedException, RemotingSendRequestException, RemotingTimeoutException {
// 获取这次请的 opaque
final int opaque = request.getOpaque();
try {
// 构建一个响应ResponseFuture
final ResponseFuture responseFuture = new ResponseFuture(channel, opaque, timeoutMillis, null, null);
// 将这个请求存入 responseTable 中
this.responseTable.put(opaque, responseFuture);
final SocketAddress addr = channel.remoteAddress();
// 通过 channel.writeAndFlush(...) 方法将请求发送到远端
channel.writeAndFlush(request).addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture f) throws Exception {
// 这里的回调,说明请求已经发送出去了
if (f.isSuccess()) {
// 设置请求发送成功,直接返回
responseFuture.setSendRequestOK(true);
return;
} else {
// 设置请求发送失败,也就是不需要等待响应了,
// 这次请求就直接失败了
responseFuture.setSendRequestOK(false);
}
// 执行到这里,表明请求发送失败了。
// 从正在请求集合中移除这次请求
responseTable.remove(opaque);
responseFuture.setCause(f.cause());
// 调用 putResponse 方法,唤醒通过
// `responseFuture.waitResponse(timeoutMillis)` 等待请求返回的线程
responseFuture.putResponse(null);
log.warn("send a request command to channel <" + addr + "> failed.");
}
});
// 同步发送请求,这里需要等待远端的响应。
// 用到了countDownLatch 来阻塞当前线程,
// 等待响应回来之后唤醒,或者超时唤醒
RemotingCommand responseCommand = responseFuture.waitResponse(timeoutMillis);
if (null == responseCommand) {
// null == responseCommand,说明响应没有,需要抛出异常
if (responseFuture.isSendRequestOK()) {
// 如果 isSendRequestOK 是true,说明请求发送出去了,但是响应没有,响应超时
throw new RemotingTimeoutException(RemotingHelper.parseSocketAddressAddr(addr), timeoutMillis,
responseFuture.getCause());
} else {
// 如果 isSendRequestOK 是false,说明请求都没有发送出去,发送请求超时。
throw new RemotingSendRequestException(RemotingHelper.parseSocketAddressAddr(addr), responseFuture.getCause());
}
}
return responseCommand;
} finally {
this.responseTable.remove(opaque);
}
}
发送同步请求,方法流程:
- 创建
ResponseFuture
对象,并存入到responseTable
中。 - 通过
channel.writeAndFlush()
方法,将请求发送到远端。并添加
ChannelFutureListener
监控,如果请求发送成功,那么设置responseFuture
的sendRequestOK
为true
;如果请求发送失败,那么从responseTable
移除responseFuture
,并通过putResponse
方法唤醒阻塞等待的线程。 - 调用
responseFuture.waitResponse(timeoutMillis)
方法,同步阻塞等待响应。 - 如果
responseCommand
为null
,那么就抛出异常。
2.2.5 invokeAsyncImpl
方法
/**
* 异步发送请求
*/
public void invokeAsyncImpl(final Channel channel, final RemotingCommand request, final long timeoutMillis,
final InvokeCallback invokeCallback)
throws InterruptedException, RemotingTooMuchRequestException, RemotingTimeoutException, RemotingSendRequestException {
// 开始时间
long beginStartTime = System.currentTimeMillis();
// 获取这次请的 opaque
final int opaque = request.getOpaque();
// 通过 semaphoreAsync 来限制异步请求最大数量
boolean acquired = this.semaphoreAsync.tryAcquire(timeoutMillis, TimeUnit.MILLISECONDS);
if (acquired) {
// 通过 SemaphoreReleaseOnlyOnce 保证异步请求,
// 只会释放一个 semaphoreAsync 的许可
final SemaphoreReleaseOnlyOnce once = new SemaphoreReleaseOnlyOnce(this.semaphoreAsync);
long costTime = System.currentTimeMillis() - beginStartTime;
if (timeoutMillis < costTime) {
// 已经超时,释放许可,并抛出异常
once.release();
throw new RemotingTimeoutException("invokeAsyncImpl call timeout");
}
// 构建一个响应ResponseFuture,
// 注意异步请求就会有 invokeCallback 对象,
// 也要将 SemaphoreReleaseOnlyOnce 对象传递进去,用于释放semaphoreAsync 许可。
final ResponseFuture responseFuture = new ResponseFuture(channel, opaque, timeoutMillis - costTime, invokeCallback, once);
// 将这个请求存入 responseTable 中
this.responseTable.put(opaque, responseFuture);
try {
channel.writeAndFlush(request).addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture f) throws Exception {
// 这里的回调,说明请求已经发送出去了
if (f.isSuccess()) {
// 设置请求发送成功,直接返回
responseFuture.setSendRequestOK(true);
return;
}
// 执行到这里,表明请求发送失败了。
requestFail(opaque);
// 这里没有调用 responseFuture.setCause(f.cause());, ChannelFuture的异常丢失了
log.warn("send a request command to channel <{}> failed.", RemotingHelper.parseChannelRemoteAddr(channel));
}
});
} catch (Exception e) {
// 释放semaphoreAsync 许可。
responseFuture.release();
log.warn("send a request command to channel <" + RemotingHelper.parseChannelRemoteAddr(channel) + "> Exception", e);
throw new RemotingSendRequestException(RemotingHelper.parseChannelRemoteAddr(channel), e);
}
} else {
// 执行到这里,表示没有获取到发送异步请求的许可,直接抛出超时异常
if (timeoutMillis <= 0) {
throw new RemotingTooMuchRequestException("invokeAsyncImpl invoke too fast");
} else {
String info =
String.format("invokeAsyncImpl tryAcquire semaphore timeout, %dms, waiting thread nums: %d semaphoreAsyncValue: %d",
timeoutMillis,
this.semaphoreAsync.getQueueLength(),
this.semaphoreAsync.availablePermits()
);
log.warn(info);
throw new RemotingTimeoutException(info);
}
}
}
发送异步请求,方法流程:
- 通过
semaphoreAsync
来限制异步请求最大数量。 - 如果没有获取到许可,那么就抛出异常。
- 获取到许可,先创建
SemaphoreReleaseOnlyOnce
对象,保证只会释放一次semaphoreAsync
的许可。 - 创建
ResponseFuture
对象,并存入到responseTable
中。 - 通过
channel.writeAndFlush(request)
方法,将请求送到到远端。并添加
ChannelFutureListener
监控,如果请求发送成功,那么设置responseFuture
的sendRequestOK
为true
;如果请求发送失败,那么调用requestFail()
方法,进行失败通知。 - 与
invokeSyncImpl
方法不同,就是不会阻塞等待,通过processResponseCommand
方法,调用executeInvokeCallback
方法,通知异步请求的响应结果。
2.2.6 invokeOnewayImpl
方法
/**
* 发送一次性请求
*/
public void invokeOnewayImpl(final Channel channel, final RemotingCommand request, final long timeoutMillis)
throws InterruptedException, RemotingTooMuchRequestException, RemotingTimeoutException, RemotingSendRequestException {
// 设置请求是一次性请求 RPC_ONEWAY
request.markOnewayRPC();
// 通过 semaphoreAsync 来限制一次性请求最大数量
boolean acquired = this.semaphoreOneway.tryAcquire(timeoutMillis, TimeUnit.MILLISECONDS);
if (acquired) {
// 通过SemaphoreReleaseOnlyOnce保证,只会释放一个 semaphoreOneway 的许可
final SemaphoreReleaseOnlyOnce once = new SemaphoreReleaseOnlyOnce(this.semaphoreOneway);
try {
channel.writeAndFlush(request).addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture f) throws Exception {
// 请求发送成功,就表示完成了,不需要等待响应
once.release();
if (!f.isSuccess()) {
log.warn("send a request command to channel <" + channel.remoteAddress() + "> failed.");
}
}
});
} catch (Exception e) {
once.release();
log.warn("write send a request command to channel <" + channel.remoteAddress() + "> failed.");
throw new RemotingSendRequestException(RemotingHelper.parseChannelRemoteAddr(channel), e);
}
} else {
// 执行到这里,表示没有获取到发送一次性请求的许可,直接抛出超时异常
if (timeoutMillis <= 0) {
throw new RemotingTooMuchRequestException("invokeOnewayImpl invoke too fast");
} else {
String info = String.format(
"invokeOnewayImpl tryAcquire semaphore timeout, %dms, waiting thread nums: %d semaphoreAsyncValue: %d",
timeoutMillis,
this.semaphoreOneway.getQueueLength(),
this.semaphoreOneway.availablePermits()
);
log.warn(info);
throw new RemotingTimeoutException(info);
}
}
}
发送一次性请求,与 invokeAsyncImpl
方法相比,就是不需要创建ResponseFuture
对象,存入到 responseTable
中。
2.2.7 scanResponseTable
方法
/**
* <p>
* This method is periodically invoked to scan and expire deprecated request.
* </p>
*
* 扫描所有正在进行的请求,发现超时的请求,就移除它,并进行失败通知
*/
public void scanResponseTable() {
// 记录所有过期的请求
final List<ResponseFuture> rfList = new LinkedList<ResponseFuture>();
Iterator<Entry<Integer, ResponseFuture>> it = this.responseTable.entrySet().iterator();
// 遍历所有正在进行的请求
while (it.hasNext()) {
Entry<Integer, ResponseFuture> next = it.next();
ResponseFuture rep = next.getValue();
// 如果这个请求的时间已经超过设置的超时时间TimeoutMillis,
// 那么就要从 responseTable 中移除它,添加到 rfList 集合中,进行失败通知。
if ((rep.getBeginTimestamp() + rep.getTimeoutMillis() + 1000) <= System.currentTimeMillis()) {
rep.release();
it.remove();
rfList.add(rep);
log.warn("remove timeout request, " + rep);
}
}
// 遍历超时的请求,通知它们
for (ResponseFuture rf : rfList) {
try {
executeInvokeCallback(rf);
} catch (Throwable e) {
log.warn("scanResponseTable, operationComplete Exception", e);
}
}
}
扫描所有正在进行的请求,发现超时的请求,就从 responseTable
中移除它,并进行失败通知。
注意这里虽然扫描所有正在进行请求(包括同步请求和异步请求) 的超时情况,但是只调用了
executeInvokeCallback()
方法,进行异步请求的通知;而没有调用responseFuture.putResponse()
方法,唤醒同步请求,因为同步请求waitResponse()
方法,等超时了会自动唤醒。
2.3 ResponseFuture
类
// 请求编号
private final int opaque;
// 发送请求的通道 Channel
private final Channel processChannel;
// 请求超时时间
private final long timeoutMillis;
// 异步请求回调接口实例
private final InvokeCallback invokeCallback;
// 开始时间,用于判断是否超时
private final long beginTimestamp = System.currentTimeMillis();
// 用于同步请求,阻塞当前线程
private final CountDownLatch countDownLatch = new CountDownLatch(1);
// 用于保证只释放一次许可 Semaphore
private final SemaphoreReleaseOnlyOnce once;
// 保证异步回调只调用一次
private final AtomicBoolean executeCallbackOnlyOnce = new AtomicBoolean(false);
// 响应结果对象
private volatile RemotingCommand responseCommand;
// 发送请求成功
private volatile boolean sendRequestOK = true;
// 失败原因
private volatile Throwable cause;
通过 countDownLatch
来实现同步阻塞,通过 executeCallbackOnlyOnce
保证异步回调只调用一次。
注:这里
executeCallbackOnlyOnce
不应该使用AtomicBoolean
类型,因为ResponseFuture
对象每次请求的时候都会创建,使用AtomicBoolean
对象,非常占用内存,应该使用AtomicIntegerFieldUpdater
+volatile int
的模式。
2.4 小结
-
通过
processMessageReceived()
方法,处理远程命令包括请求命令和响应命令。- 通过
processRequestCommand()
方法处理请求命令,根据请求code
获取对应的请求处理器和线程池执行器,在对应线程池中处理请求命令。 - 通过
processResponseCommand()
方法处理响应,如果是同步请求,就是唤醒阻塞等待线程,并获取响应结果;如果是异步线程, 就在异步线程池执行器getCallbackExecutor()
中,将响应结果回调。
- 通过
invokeSyncImpl()
方法,发送同步请求。创建ResponseFuture
对象放入responseTable
集合中,通过channel.writeAndFlush(request)
方法发送请求数据,通过responseFuture.waitResponse()
方法阻塞当前线程,等待响应结果。invokeAsyncImpl
方法,发送异步请求。通过semaphoreAsync
限制异步请求并发数,然后创建ResponseFuture
对象放入responseTable
集合中,通过channel.writeAndFlush(request)
方法发送请求数据。invokeOnewayImpl
方法,发送一次性请求。通过semaphoreOneway
限制异步请求并发数,通过channel.writeAndFlush(request)
方法发送请求数据。scanResponseTable
方法,定时巡查超时请求,并进行通知。
三. NettyRemotingClient
类
这个是RPC
服务的客户端具体实现类。
3.1 重要成员属性
// Netty的配置项
private final NettyClientConfig nettyClientConfig;
// Netty客户端引导类
private final Bootstrap bootstrap = new Bootstrap();
// 用来处理当前客户端所有 Socket连接的 IO事件
private final EventLoopGroup eventLoopGroupWorker;
// 控制 channelTables 并发修改的锁
private final Lock lockChannelTables = new ReentrantLock();
// 缓存地址 addr 对应的通道 channel,这样可以直接通过地址获取 channel,进行数据传输
private final ConcurrentMap<String /* addr */, ChannelWrapper> channelTables = new ConcurrentHashMap<String, ChannelWrapper>();
// 用于检查请求是否过期的定时器
private final Timer timer = new Timer("ClientHouseKeepingService", true);
// namesrv 的地址列表
private final AtomicReference<List<String>> namesrvAddrList = new AtomicReference<List<String>>();
// 当前被选中 namesrv 地址
private final AtomicReference<String> namesrvAddrChoosed = new AtomicReference<String>();
// 记录当前选中 namesrv 的索引值
private final AtomicInteger namesrvIndex = new AtomicInteger(initValueIndex());
// 用于并发修改 namesrvAddrList 和 namesrvAddrChoosed 值的锁
private final Lock lockNamesrvChannel = new ReentrantLock();
// 公共线程池执行器,
// 如果调用 `registerProcessor(...)`方法注册请求处理器NettyRequestProcessor时,没有设置ExecutorService,那么就是publicExecutor;
// 如果没有设置异步请求响应回调处理线程池 callbackExecutor,那么也直接使用这个公共线程池 publicExecutor。
private final ExecutorService publicExecutor;
/**
* 异步请求响应回调处理线程池 callbackExecutor
*/
private ExecutorService callbackExecutor;
// Netty 事件的监听接口
private final ChannelEventListener channelEventListener;
// 用来处理 ChannelHandler 的方法,线程数是 NettyClientConfig 中的 clientWorkerThreads 值
private DefaultEventExecutorGroup defaultEventExecutorGroup;
-
nettyClientConfig
:Netty
的一些配置项值。 -
bootstrap
:Netty
客户端引导类。 -
lockChannelTables
和channelTables
: 缓存地址addr
对应的通道channel
。 -
timer
: 用于检查请求是否过期的定时器。 -
namesrvAddrList
,namesrvAddrChoosed
,namesrvIndex
和lockNamesrvChannel
: 记录namesrv
地址列表,和当前选中的namesrv
地址。 -
publicExecutor
: 公共线程池执行器。 -
callbackExecutor
: 异步请求响应回调处理线程池执行器。 -
channelEventListener
:Netty
事件的监听接口。 -
eventLoopGroupWorker
: 用来处理当前客户端所有Socket
连接的IO
事件,只需要一个线程就可以了。 -
defaultEventExecutorGroup
: 用来处理ChannelHandler
的方法,线程数是NettyClientConfig
中的clientWorkerThreads
值。
3.2 重要方法
3.2.1 构造方法
public NettyRemotingClient(final NettyClientConfig nettyClientConfig) {
this(nettyClientConfig, null);
}
public NettyRemotingClient(final NettyClientConfig nettyClientConfig,
final ChannelEventListener channelEventListener) {
super(nettyClientConfig.getClientOnewaySemaphoreValue(), nettyClientConfig.getClientAsyncSemaphoreValue());
this.nettyClientConfig = nettyClientConfig;
this.channelEventListener = channelEventListener;
// 获取公共线程池的线程数
int publicThreadNums = nettyClientConfig.getClientCallbackExecutorThreads();
if (publicThreadNums <= 0) {
publicThreadNums = 4;
}
// 创建公共线程池 publicExecutor,线程名都是 NettyClientPublicExecutor_ 开头
// 如果调用 `registerProcessor(...)`方法注册 请求处理器NettyRequestProcessor,
// 没有设置线程池,那么就用这个公共线程池,也就是处理请求的操作就在 publicExecutor 线程池中执行。
// 如果没有设置异步请求响应回调处理线程池 callbackExecutor,那么也直接使用这个公共线程池 publicExecutor
this.publicExecutor = Executors.newFixedThreadPool(publicThreadNums, new ThreadFactory() {
private AtomicInteger threadIndex = new AtomicInteger(0);
@Override
public Thread newThread(Runnable r) {
return new Thread(r, "NettyClientPublicExecutor_" + this.threadIndex.incrementAndGet());
}
});
// 用来处理当前客户端所有 Socket连接的 IO事件,就使用一个线程处理
this.eventLoopGroupWorker = new NioEventLoopGroup(1, new ThreadFactory() {
private AtomicInteger threadIndex = new AtomicInteger(0);
@Override
public Thread newThread(Runnable r) {
return new Thread(r, String.format("NettyClientSelector_%d", this.threadIndex.incrementAndGet()));
}
});
// 是否要使用 SSL
if (nettyClientConfig.isUseTLS()) {
try {
sslContext = TlsHelper.buildSslContext(true);
log.info("SSL enabled for client");
} catch (IOException e) {
log.error("Failed to create SSLContext", e);
} catch (CertificateException e) {
log.error("Failed to create SSLContext", e);
throw new RuntimeException("Failed to create SSLContext", e);
}
}
}
就是创建 publicExecutor
和 eventLoopGroupWorker
对象,如果支持 SSL
, 那么再创建 sslContext
对象。
3.2.2 start
方法
@Override
public void start() {
// 这个线程池 defaultEventExecutorGroup 是用来处理 ChannelHandler 的方法
this.defaultEventExecutorGroup = new DefaultEventExecutorGroup(
nettyClientConfig.getClientWorkerThreads(),
new ThreadFactory() {
private AtomicInteger threadIndex = new AtomicInteger(0);
@Override
public Thread newThread(Runnable r) {
return new Thread(r, "NettyClientWorkerThread_" + this.threadIndex.incrementAndGet());
}
});
// 通过 eventLoopGroupWorker 线程来接收 IO 事件,
// 然后交给 defaultEventExecutorGroup 线程,来进行事件处理,
// 这样不会阻塞 处理 IO 事件线程。
Bootstrap handler = this.bootstrap.group(this.eventLoopGroupWorker).channel(NioSocketChannel.class)
.option(ChannelOption.TCP_NODELAY, true)
// Socket 的 keepalive 是false
.option(ChannelOption.SO_KEEPALIVE, false)
// Socket 连接建立的超时时间
.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, nettyClientConfig.getConnectTimeoutMillis())
// Socket 发送缓存区大小
.option(ChannelOption.SO_SNDBUF, nettyClientConfig.getClientSocketSndBufSize())
// Socket 接收缓存区大小
.option(ChannelOption.SO_RCVBUF, nettyClientConfig.getClientSocketRcvBufSize())
.handler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
if (nettyClientConfig.isUseTLS()) {
// 是否需要使用 SSL 加密
if (null != sslContext) {
pipeline.addFirst(defaultEventExecutorGroup, "sslHandler", sslContext.newHandler(ch.alloc()));
log.info("Prepend SSL handler");
} else {
log.warn("Connections are insecure as SSLContext is null!");
}
}
// 使用 defaultEventExecutorGroup 里面的线程
// 来处理接收到Socket IO事件数据的解析和处理
pipeline.addLast(
defaultEventExecutorGroup,
// 将命令RemotingCommand 对象转成缓存区对象ByteBuf,以便发送到远端
new NettyEncoder(),
// 将接收到数据对象ByteBuf 解析成命令RemotingCommand 对象
new NettyDecoder(),
// 进行心跳处理,当当前通道Channel 超过一定时间没有发送或者读取到数据,就当失效处理
new IdleStateHandler(0, 0, nettyClientConfig.getClientChannelMaxIdleTimeSeconds()),
// 主要是做监控用的,用来发送 Netty 的 CONNECT, CLOSE, IDLE, EXCEPTION 事件的
new NettyConnectManageHandler(),
// 这个就是用来处理解析后得到的 远程命令RemotingCommand,
// 其实就是调用了 processMessageReceived(...) 方法
new NettyClientHandler());
}
});
this.timer.scheduleAtFixedRate(new TimerTask() {
@Override
public void run() {
try {
// 每隔三秒扫描有没有过期请求
NettyRemotingClient.this.scanResponseTable();
} catch (Throwable e) {
log.error("scanResponseTable exception", e);
}
}
}, 1000 * 3, 1000);
if (this.channelEventListener != null) {
this.nettyEventExecutor.start();
}
}
这个方法很长,但是流程其实很简单:
- 创建
defaultEventExecutorGroup
线程池执行器,用于执行ChannelHandler
方法。 - 初始化
bootstrap
对象,使用eventLoopGroupWorker
处理通道的IO
事件,使用defaultEventExecutorGroup
执行添加到ChannelPipeline
的处理器ChannelHandler
的方法。 - 定时器
timer
每隔三秒扫描有没有过期请求。 - 如果
channelEventListener
不为null
, 那么开启nettyEventExecutor
线程, 将Netty
事件提供给用户定义ChannelEventListener
。
3.2.3 closeChannel
方法
public void closeChannel(final String addr, final Channel channel) {
if (null == channel)
return;
// 从通道channel 获取对应的远端地址
final String addrRemote = null == addr ? RemotingHelper.parseChannelRemoteAddr(channel) : addr;
try {
// 加锁,因为要改变 channelTables 集合数据
if (this.lockChannelTables.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) {
try {
boolean removeItemFromTable = true;
// 得到channelTables 中的通道 ChannelWrapper
final ChannelWrapper prevCW = this.channelTables.get(addrRemote);
log.info("closeChannel: begin close the channel[{}] Found: {}", addrRemote, prevCW != null);
// 如果 prevCW 没有,或者和关闭的通道 channel 不是同一个,都不用移除
if (null == prevCW) {
log.info("closeChannel: the channel[{}] has been removed from the channel table before", addrRemote);
removeItemFromTable = false;
} else if (prevCW.getChannel() != channel) {
log.info("closeChannel: the channel[{}] has been closed before, and has been created again, nothing to do.",
addrRemote);
removeItemFromTable = false;
}
if (removeItemFromTable) {
this.channelTables.remove(addrRemote);
log.info("closeChannel: the channel[{}] was removed from channel table", addrRemote);
}
// 关闭通道
RemotingUtil.closeChannel(channel);
} catch (Exception e) {
log.error("closeChannel: close the channel exception", e);
} finally {
this.lockChannelTables.unlock();
}
} else {
log.warn("closeChannel: try to lock channel table, but timeout, {}ms", LOCK_TIMEOUT_MILLIS);
}
} catch (InterruptedException e) {
log.error("closeChannel exception", e);
}
}
将这个 channel
从 channelTables
中移除,并通过 lockChannelTables
锁进行并发修改控制。
3.2.4 getAndCreateChannel
方法
/**
* 根据远程地址 addr 获取通道Channel,和远端进行交互
*/
private Channel getAndCreateChannel(final String addr) throws RemotingConnectException, InterruptedException {
if (null == addr) {
// 如果地址 addr 为null,那么就从 namesrvAddrList 列表中选择一个
return getAndCreateNameserverChannel();
}
// 记录了每个地址对应的通道
ChannelWrapper cw = this.channelTables.get(addr);
if (cw != null && cw.isOK()) {
// 如果这个通道还能用,那么直接返回
return cw.getChannel();
}
// 根据地址addr 创建通道 Channel
return this.createChannel(addr);
}
- 如果
addr == null
,那么就是namesrv
地址,通过getAndCreateNameserverChannel()
方法,获取对应的通道Channel
。 - 根据地址
addr
从channelTables
中获取对应的Channel
。 - 如果没有可用通道
Channel
,通过createChannel()
方法,创建这个地址addr
的通道。
3.2.5 createChannel
方法
private Channel createChannel(final String addr) throws InterruptedException {
ChannelWrapper cw = this.channelTables.get(addr);
// 如果 channelTables 中有地址addr 对应的通道,并且还是能用的直接返回
if (cw != null && cw.isOK()) {
return cw.getChannel();
}
if (this.lockChannelTables.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) {
try {
boolean createNewConnection;
cw = this.channelTables.get(addr);
if (cw != null) {
// 如果 channelTables 中有对应的通道
if (cw.isOK()) {
// 通道还能用,直接返回
return cw.getChannel();
} else if (!cw.getChannelFuture().isDone()) {
// 通道还没有创建完成,那么也不需要再创建了,等待它创建完成
createNewConnection = false;
} else {
// 说明通道坏了,移除它,设置createNewConnection为 true,重新创建
this.channelTables.remove(addr);
createNewConnection = true;
}
} else {
createNewConnection = true;
}
if (createNewConnection) {
// 调用 bootstrap.connect(...) 方法创建通道
ChannelFuture channelFuture = this.bootstrap.connect(RemotingHelper.string2SocketAddress(addr));
log.info("createChannel: begin to connect remote host[{}] asynchronously", addr);
cw = new ChannelWrapper(channelFuture);
// 存入 channelTables 中。
this.channelTables.put(addr, cw);
}
} catch (Exception e) {
log.error("createChannel: create channel exception", e);
} finally {
this.lockChannelTables.unlock();
}
} else {
log.warn("createChannel: try to lock channel table, but timeout, {}ms", LOCK_TIMEOUT_MILLIS);
}
if (cw != null) {
ChannelFuture channelFuture = cw.getChannelFuture();
// 等待通道创建完成
if (channelFuture.awaitUninterruptibly(this.nettyClientConfig.getConnectTimeoutMillis())) {
if (cw.isOK()) {
log.info("createChannel: connect remote host[{}] success, {}", addr, channelFuture.toString());
return cw.getChannel();
} else {
log.warn("createChannel: connect remote host[" + addr + "] failed, " + channelFuture.toString(), channelFuture.cause());
}
} else {
log.warn("createChannel: connect remote host[{}] timeout {}ms, {}", addr, this.nettyClientConfig.getConnectTimeoutMillis(),
channelFuture.toString());
}
}
return null;
}
通过 this.bootstrap.connect()
方法创建通道,但是要保证不能多次创建,所以通过 lockChannelTables
进行并发控制。
3.2.6 invokeSync
方法
/**
* 向远程服务器 addr 地址发送数据,并同步阻塞等待响应。
*/
@Override
public RemotingCommand invokeSync(String addr, final RemotingCommand request, long timeoutMillis)
throws InterruptedException, RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException {
long beginStartTime = System.currentTimeMillis();
// 根据远程地址 addr 获取通道Channel,和远端进行交互
final Channel channel = this.getAndCreateChannel(addr);
if (channel != null && channel.isActive()) {
// 通道还是活跃的,能进行数据交互
try {
// 执行钩子
doBeforeRpcHooks(addr, request);
long costTime = System.currentTimeMillis() - beginStartTime;
if (timeoutMillis < costTime) {
// 超时就抛出异常
throw new RemotingTimeoutException("invokeSync call timeout");
}
// 调用 NettyRemotingAbstract 的invokeSyncImpl 方法,发送请求
RemotingCommand response = this.invokeSyncImpl(channel, request, timeoutMillis - costTime);
// 执行钩子
doAfterRpcHooks(RemotingHelper.parseChannelRemoteAddr(channel), request, response);
return response;
} catch (RemotingSendRequestException e) {
log.warn("invokeSync: send request exception, so close the channel[{}]", addr);
// 发生异常,关闭通道
this.closeChannel(addr, channel);
throw e;
} catch (RemotingTimeoutException e) {
if (nettyClientConfig.isClientCloseSocketIfTimeout()) {
this.closeChannel(addr, channel);
log.warn("invokeSync: close socket because of timeout, {}ms, {}", timeoutMillis, addr);
}
log.warn("invokeSync: wait response timeout exception, the channel[{}]", addr);
throw e;
}
} else {
// 如果通道不活跃了,就关闭通道
this.closeChannel(addr, channel);
throw new RemotingConnectException(addr);
}
}
- 通过
getAndCreateChannel()
方法,获取地址addr
对应的通道channel
。 - 如果通道不活跃了,就关闭通道,抛出异常。
- 通道可用,就调用父类
NettyRemotingAbstract
的invokeSyncImpl()
方法。
3.2.7 invokeAsync
方法
@Override
public void invokeAsync(String addr, RemotingCommand request, long timeoutMillis, InvokeCallback invokeCallback)
throws InterruptedException, RemotingConnectException, RemotingTooMuchRequestException, RemotingTimeoutException,
RemotingSendRequestException {
long beginStartTime = System.currentTimeMillis();
// 根据远程地址 addr 获取通道Channel,和远端进行交互
final Channel channel = this.getAndCreateChannel(addr);
if (channel != null && channel.isActive()) {
// 通道还是活跃的,能进行数据交互
try {
doBeforeRpcHooks(addr, request);
long costTime = System.currentTimeMillis() - beginStartTime;
if (timeoutMillis < costTime) {
// 超时抛出异常
throw new RemotingTooMuchRequestException("invokeAsync call timeout");
}
// 调用 NettyRemotingAbstract 的 invokeAsyncImpl 方法,发送异步请求
this.invokeAsyncImpl(channel, request, timeoutMillis - costTime, invokeCallback);
} catch (RemotingSendRequestException e) {
log.warn("invokeAsync: send request exception, so close the channel[{}]", addr);
this.closeChannel(addr, channel);
throw e;
}
} else {
// 如果获取的通道 Channel 不能用了,那么从 channelTables 中移除,并抛出异常。
this.closeChannel(addr, channel);
throw new RemotingConnectException(addr);
}
}
与 invokeSync
方法流程类似。
3.3 相关 Netty
的 ChannelHandler
.handler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
if (nettyClientConfig.isUseTLS()) {
// 是否需要使用 SSL 加密
if (null != sslContext) {
pipeline.addFirst(defaultEventExecutorGroup, "sslHandler", sslContext.newHandler(ch.alloc()));
log.info("Prepend SSL handler");
} else {
log.warn("Connections are insecure as SSLContext is null!");
}
}
// 使用 defaultEventExecutorGroup 里面的线程
// 来处理接收到Socket IO事件数据的解析和处理
pipeline.addLast(
defaultEventExecutorGroup,
// 将命令RemotingCommand 对象转成缓存区对象ByteBuf,以便发送到远端
new NettyEncoder(),
// 将接收到数据对象ByteBuf 解析成命令RemotingCommand 对象
new NettyDecoder(),
// 进行心跳处理,当当前通道Channel 超过一定时间没有发送或者读取到数据,就当失效处理
new IdleStateHandler(0, 0, nettyClientConfig.getClientChannelMaxIdleTimeSeconds()),
// 主要是做监控用的,用来发送 Netty 的 CONNECT, CLOSE, IDLE, EXCEPTION 事件的
new NettyConnectManageHandler(),
// 这个就是用来处理解析后得到的 远程命令RemotingCommand,
// 其实就是调用了 processMessageReceived(...) 方法
new NettyClientHandler());
}
-
SslHandler
: 处理SSL
加密,通过sslContext.newHandler(ch.alloc())
创建。 -
NettyEncoder
: 将命令RemotingCommand
对象转成缓存区对象ByteBuf
,以便发送到远端。 -
NettyDecoder
: 将接收到数据对象ByteBuf
解析成命令RemotingCommand
对象。 -
IdleStateHandler
: 进行心跳处理,当通道Channel
超过一定时间没有发送或者读取到数据,就会发送事件进行提醒。 -
NettyConnectManageHandler
: 主要是做监控用的,用来发送Netty
的CONNECT
,CLOSE
,IDLE
,EXCEPTION
事件。 -
NettyClientHandler
: 调用父类NettyRemotingAbstract
的processMessageReceived
方法,处理远程命令。
3.3.1 NettyEncoder
类
/**
* 将 RemotingCommand 转成缓存区 ByteBuffer 对象,
*/
@ChannelHandler.Sharable
public class NettyEncoder extends MessageToByteEncoder<RemotingCommand> {
private static final InternalLogger log = InternalLoggerFactory.getLogger(RemotingHelper.ROCKETMQ_REMOTING);
@Override
public void encode(ChannelHandlerContext ctx, RemotingCommand remotingCommand, ByteBuf out)
throws Exception {
try {
// 得到远程命令头信息数据
ByteBuffer header = remotingCommand.encodeHeader();
// 先写头数据
out.writeBytes(header);
byte[] body = remotingCommand.getBody();
if (body != null) {
// 再写内容体数据
out.writeBytes(body);
}
} catch (Exception e) {
// 发生错误,那么就关闭通道 Channel。
log.error("encode exception, " + RemotingHelper.parseChannelRemoteAddr(ctx.channel()), e);
if (remotingCommand != null) {
log.error(remotingCommand.toString());
}
RemotingUtil.closeChannel(ctx.channel());
}
}
}
通过 encodeHeader()
方法获取头信息header
, 写入头信息,然后再写入体信息 body
。
// 在 RemotingCommand 类中 encodeHeader 方法。
public ByteBuffer encodeHeader() {
return encodeHeader(this.body != null ? this.body.length : 0);
}
public ByteBuffer encodeHeader(final int bodyLength) {
// 1> header length size
// 1. 整个远程命令 RemotingCommand 的总长度
int length = 4;
// 2> header data length
// 2. 得到头数据
byte[] headerData;
headerData = this.headerEncode();
// 增加头数据长度
length += headerData.length;
// 3> body data length
// 3. 增加内容体数据长度
length += bodyLength;
// 内容体先不用添加,
// 那么 ByteBuffer 大小就是 4(总长度) + 4(头长度) + 数据头内容
ByteBuffer result = ByteBuffer.allocate(4 + length - bodyLength);
// 先将总长度存入
result.putInt(length);
// header length
// 头数据长度 headerData.length 存入,要进行处理
// 第一个字节储存类型,后三个字节储存头长度 headerData.length
result.put(markProtocolType(headerData.length, serializeTypeCurrentRPC));
// header data
result.put(headerData);
result.flip();
return result;
}
请求命令RemotingCommand
转成ByteBuffer
的数据格式:
4个字节(总长度) + 4个字节(数据头长度) + 数据头(header)字节内容 + 数据体(body)字节内容
其中4个字节数据头长度中,第一个字节储存类型,后三个字节储存头长度,也就是说 数据头长度不会超过三个字节大小。
3.3.2 NettyDecoder
类
/**
* 将接收到数据对象 `ByteBuf` 解析成命令 `RemotingCommand` 对象
*/
public class NettyDecoder extends LengthFieldBasedFrameDecoder {
private static final InternalLogger log = InternalLoggerFactory.getLogger(RemotingHelper.ROCKETMQ_REMOTING);
private static final int FRAME_MAX_LENGTH =
Integer.parseInt(System.getProperty("com.rocketmq.remoting.frameMaxLength", "16777216"));
public NettyDecoder() {
/**
* 0->4 用4个字节表示整个内容帧的总长度
* initialBytesToStrip == 4,表示最后得到的数据,是跳过这个总长度字段。
*/
super(FRAME_MAX_LENGTH, 0, 4, 0, 4);
}
@Override
public Object decode(ChannelHandlerContext ctx, ByteBuf in) throws Exception {
ByteBuf frame = null;
try {
// 得到完整的远程命令数据对应的缓存区ByteBuf
frame = (ByteBuf) super.decode(ctx, in);
if (null == frame) {
return null;
}
ByteBuffer byteBuffer = frame.nioBuffer();
// 从缓存区ByteBuf 中解析出一个远程命令RemotingCommand对象
return RemotingCommand.decode(byteBuffer);
} catch (Exception e) {
log.error("decode exception, " + RemotingHelper.parseChannelRemoteAddr(ctx.channel()), e);
RemotingUtil.closeChannel(ctx.channel());
} finally {
if (null != frame) {
frame.release();
}
}
return null;
}
}
继承自 LengthFieldBasedFrameDecoder
类,得到完整的远程命令数据对应的缓存区 ByteBuf
,再通过RemotingCommand.decode(byteBuffer)
从缓存区 ByteBuf
中解析出一个远程命令RemotingCommand
对象。
关于
LengthFieldBasedFrameDecoder
用法,请看Netty源码_编解码器。
public static RemotingCommand decode(final ByteBuffer byteBuffer) {
// length 去掉了表示总长度的4个字节,也就是说只包括 4(头长度) + 头内容 + 体内容
int length = byteBuffer.limit();
// 一个四个字节,最高一个字节记录类型,即 JSON 或者 ROCKETMQ
// 剩下三个字节才代表头数据长度,即 oriHeaderLen & 0xFFFFFF
int oriHeaderLen = byteBuffer.getInt();
int headerLength = getHeaderLength(oriHeaderLen);
byte[] headerData = new byte[headerLength];
// 将头数据存入 字节数组headerData 中
byteBuffer.get(headerData);
// 解析头内容
RemotingCommand cmd = headerDecode(headerData, getProtocolType(oriHeaderLen));
// 得到数据体的字节长度
int bodyLength = length - 4 - headerLength;
byte[] bodyData = null;
if (bodyLength > 0) {
bodyData = new byte[bodyLength];
byteBuffer.get(bodyData);
}
cmd.body = bodyData;
return cmd;
}
根据数据头长度和数据体长度,从缓存区中解析出,数据头和数据头的内容。
3.3.3 NettyConnectManageHandler
类
class NettyConnectManageHandler extends ChannelDuplexHandler {
@Override
public void connect(ChannelHandlerContext ctx, SocketAddress remoteAddress, SocketAddress localAddress,
ChannelPromise promise) throws Exception {
final String local = localAddress == null ? "UNKNOWN" : RemotingHelper.parseSocketAddressAddr(localAddress);
final String remote = remoteAddress == null ? "UNKNOWN" : RemotingHelper.parseSocketAddressAddr(remoteAddress);
log.info("NETTY CLIENT PIPELINE: CONNECT {} => {}", local, remote);
super.connect(ctx, remoteAddress, localAddress, promise);
// 发送连接 CONNECT 事件通知
if (NettyRemotingClient.this.channelEventListener != null) {
NettyRemotingClient.this.putNettyEvent(new NettyEvent(NettyEventType.CONNECT, remote, ctx.channel()));
}
}
@Override
public void disconnect(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
final String remoteAddress = RemotingHelper.parseChannelRemoteAddr(ctx.channel());
log.info("NETTY CLIENT PIPELINE: DISCONNECT {}", remoteAddress);
// 关闭通道
closeChannel(ctx.channel());
super.disconnect(ctx, promise);
// 发送关闭 CLOSE 事件通知
if (NettyRemotingClient.this.channelEventListener != null) {
NettyRemotingClient.this.putNettyEvent(new NettyEvent(NettyEventType.CLOSE, remoteAddress, ctx.channel()));
}
}
@Override
public void close(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
final String remoteAddress = RemotingHelper.parseChannelRemoteAddr(ctx.channel());
log.info("NETTY CLIENT PIPELINE: CLOSE {}", remoteAddress);
// 关闭通道
closeChannel(ctx.channel());
super.close(ctx, promise);
NettyRemotingClient.this.failFast(ctx.channel());
// 发送关闭 CLOSE 事件通知
if (NettyRemotingClient.this.channelEventListener != null) {
NettyRemotingClient.this.putNettyEvent(new NettyEvent(NettyEventType.CLOSE, remoteAddress, ctx.channel()));
}
}
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
// 接收到通道长时间空闲事件,即心跳检测
if (evt instanceof IdleStateEvent) {
IdleStateEvent event = (IdleStateEvent) evt;
if (event.state().equals(IdleState.ALL_IDLE)) {
final String remoteAddress = RemotingHelper.parseChannelRemoteAddr(ctx.channel());
log.warn("NETTY CLIENT PIPELINE: IDLE exception [{}]", remoteAddress);
// 关闭通道
closeChannel(ctx.channel());
if (NettyRemotingClient.this.channelEventListener != null) {
// 发送空闲 IDLE 事件通知
NettyRemotingClient.this
.putNettyEvent(new NettyEvent(NettyEventType.IDLE, remoteAddress, ctx.channel()));
}
}
}
ctx.fireUserEventTriggered(evt);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
final String remoteAddress = RemotingHelper.parseChannelRemoteAddr(ctx.channel());
log.warn("NETTY CLIENT PIPELINE: exceptionCaught {}", remoteAddress);
log.warn("NETTY CLIENT PIPELINE: exceptionCaught exception.", cause);
// 关闭通道
closeChannel(ctx.channel());
// 发送异常 EXCEPTION 事件通知
if (NettyRemotingClient.this.channelEventListener != null) {
NettyRemotingClient.this.putNettyEvent(new NettyEvent(NettyEventType.EXCEPTION, remoteAddress, ctx.channel()));
}
}
}
主要是做监控用的,用来发送 Netty 的 CONNECT, CLOSE, IDLE, EXCEPTION 事件。
3.3.4 NettyClientHandler
类
class NettyClientHandler extends SimpleChannelInboundHandler<RemotingCommand> {
@Override
protected void channelRead0(ChannelHandlerContext ctx, RemotingCommand msg) throws Exception {
// 处理远程命令
processMessageReceived(ctx, msg);
}
}
3.4 小结
NettyRemotingClient
主要功能:
- 通过
bootstrap
创建连接通道channel
,并使用channelTables
缓存地址addr
和 通道channel
对应关系,不需要每次都创建通道。 - 记录
namesrv
的地址列表,当发送请求时,没有写地址addr
,那么就向namesrv
的地址发送请求。 - 通过
invokeSync
,invokeAsync
和invokeOneway
发送请求,其实就是先根据地址addr
获取可使用的通道channel
,调用父类对应方法发送请求数据。
四. NettyRemotingServer
类
这个是 RPC
服务的服务端具体实现类。
4.1 重要的成员属性
// Netty 服务端引导类
private final ServerBootstrap serverBootstrap;
// 处理连接上服务端的所有 Socket 的IO 事件
private final EventLoopGroup eventLoopGroupSelector;
// 处理服务端接收客户端连接的线程池
private final EventLoopGroup eventLoopGroupBoss;
// Netty的配置项
private final NettyServerConfig nettyServerConfig;
// 公共线程池
private final ExecutorService publicExecutor;
// Netty 事件的监听接口
private final ChannelEventListener channelEventListener;
4.2 重要方法
4.2.1 构造方法
public NettyRemotingServer(final NettyServerConfig nettyServerConfig) {
this(nettyServerConfig, null);
}
public NettyRemotingServer(final NettyServerConfig nettyServerConfig,
final ChannelEventListener channelEventListener) {
super(nettyServerConfig.getServerOnewaySemaphoreValue(), nettyServerConfig.getServerAsyncSemaphoreValue());
this.serverBootstrap = new ServerBootstrap();
this.nettyServerConfig = nettyServerConfig;
this.channelEventListener = channelEventListener;
int publicThreadNums = nettyServerConfig.getServerCallbackExecutorThreads();
if (publicThreadNums <= 0) {
publicThreadNums = 4;
}
this.publicExecutor = Executors.newFixedThreadPool(publicThreadNums, new ThreadFactory() {
private AtomicInteger threadIndex = new AtomicInteger(0);
@Override
public Thread newThread(Runnable r) {
return new Thread(r, "NettyServerPublicExecutor_" + this.threadIndex.incrementAndGet());
}
});
if (useEpoll()) {
this.eventLoopGroupBoss = new EpollEventLoopGroup(1, new ThreadFactory() {
private AtomicInteger threadIndex = new AtomicInteger(0);
@Override
public Thread newThread(Runnable r) {
return new Thread(r, String.format("NettyEPOLLBoss_%d", this.threadIndex.incrementAndGet()));
}
});
this.eventLoopGroupSelector = new EpollEventLoopGroup(nettyServerConfig.getServerSelectorThreads(), new ThreadFactory() {
private AtomicInteger threadIndex = new AtomicInteger(0);
private int threadTotal = nettyServerConfig.getServerSelectorThreads();
@Override
public Thread newThread(Runnable r) {
return new Thread(r, String.format("NettyServerEPOLLSelector_%d_%d", threadTotal, this.threadIndex.incrementAndGet()));
}
});
} else {
this.eventLoopGroupBoss = new NioEventLoopGroup(1, new ThreadFactory() {
private AtomicInteger threadIndex = new AtomicInteger(0);
@Override
public Thread newThread(Runnable r) {
return new Thread(r, String.format("NettyNIOBoss_%d", this.threadIndex.incrementAndGet()));
}
});
this.eventLoopGroupSelector = new NioEventLoopGroup(nettyServerConfig.getServerSelectorThreads(), new ThreadFactory() {
private AtomicInteger threadIndex = new AtomicInteger(0);
private int threadTotal = nettyServerConfig.getServerSelectorThreads();
@Override
public Thread newThread(Runnable r) {
return new Thread(r, String.format("NettyServerNIOSelector_%d_%d", threadTotal, this.threadIndex.incrementAndGet()));
}
});
}
loadSslContext();
}
创建公共线程池 publicExecutor
,根据是否使用 useEpoll()
,创建不同的 eventLoopGroupBoss
和 eventLoopGroupSelector
实现。
4.2.2 start
方法
public void start() {
// 这个线程池 defaultEventExecutorGroup 是用来处理 ChannelHandler 的方法
this.defaultEventExecutorGroup = new DefaultEventExecutorGroup(
nettyServerConfig.getServerWorkerThreads(),
new ThreadFactory() {
private AtomicInteger threadIndex = new AtomicInteger(0);
@Override
public Thread newThread(Runnable r) {
return new Thread(r, "NettyServerCodecThread_" + this.threadIndex.incrementAndGet());
}
});
// 创建共享的 ChannelHandler
prepareSharableHandlers();
ServerBootstrap childHandler =
this.serverBootstrap.group(this.eventLoopGroupBoss, this.eventLoopGroupSelector)
.channel(useEpoll() ? EpollServerSocketChannel.class : NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 1024)
.option(ChannelOption.SO_REUSEADDR, true)
.option(ChannelOption.SO_KEEPALIVE, false)
.childOption(ChannelOption.TCP_NODELAY, true)
.childOption(ChannelOption.SO_SNDBUF, nettyServerConfig.getServerSocketSndBufSize())
.childOption(ChannelOption.SO_RCVBUF, nettyServerConfig.getServerSocketRcvBufSize())
.localAddress(new InetSocketAddress(this.nettyServerConfig.getListenPort()))
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ch.pipeline()
.addLast(defaultEventExecutorGroup, HANDSHAKE_HANDLER_NAME, handshakeHandler)
.addLast(defaultEventExecutorGroup,
encoder,
new NettyDecoder(),
new IdleStateHandler(0, 0, nettyServerConfig.getServerChannelMaxIdleTimeSeconds()),
connectionManageHandler,
serverHandler
);
}
});
if (nettyServerConfig.isServerPooledByteBufAllocatorEnable()) {
childHandler.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
}
try {
// 服务端绑定监控端口
ChannelFuture sync = this.serverBootstrap.bind().sync();
InetSocketAddress addr = (InetSocketAddress) sync.channel().localAddress();
this.port = addr.getPort();
} catch (InterruptedException e1) {
throw new RuntimeException("this.serverBootstrap.bind().sync() InterruptedException", e1);
}
if (this.channelEventListener != null) {
this.nettyEventExecutor.start();
}
this.timer.scheduleAtFixedRate(new TimerTask() {
@Override
public void run() {
try {
// 每隔三秒扫描有没有过期请求
NettyRemotingServer.this.scanResponseTable();
} catch (Throwable e) {
log.error("scanResponseTable exception", e);
}
}
}, 1000 * 3, 1000);
}
- 创建
defaultEventExecutorGroup
线程池。 - 创建共享的
ChannelHandler
实例。 - 初始化
serverBootstrap
服务端。添加的
ChannelHandler
与NettyRemotingClient
中的类似,这里就不再展开分析了。 - 服务端
serverBootstrap
绑定监控端口。 - 定时器
timer
每隔三秒扫描有没有过期请求。