由于rocketMQ采用netty通信组件进行服务互通,所以需要定义好协议的格式。有些协议采用google的protobf进行可编辑并自动生成的协议,但是rocketMQ是也可以实现自定义协议。
RemotingCommand 通信命令格式
看一下该类的属性
private int code;
private LanguageCode language = LanguageCode.JAVA;
private int version = 0;
private int opaque = requestId.getAndIncrement();
private int flag = 0;
private String remark;
private HashMap<String, String> extFields;
private transient CommandCustomHeader customHeader;
private SerializeType serializeTypeCurrentRPC = serializeTypeConfigInThisServer;
private transient byte[] body;
1.code 可以理解为协议的请求类型,用于区别不同的请求协议,从而达到处理相应功能
2.language 协议的语言,默认java
3.version 协议版本
4.opaque 协议的编号,每个请求协议都有自己的编号,用于返回响应时能够与请求的协议标识为同一个动作。
5.flag,从标识中可以知道,该协议是请求协议(也可以存在无需返回协议),响应协议
6.extFields 将customHeader中的属性转换成map
7.body 即为协议的根本内容
public ByteBuffer encode() {
// 1> header length size
int length = 4;
// 2> header data length
byte[] headerData = this.headerEncode();
length += headerData.length;
// 3> body data length
if (this.body != null) {
length += body.length;
}
ByteBuffer result = ByteBuffer.allocate(4 + length);
// length
result.putInt(length);
// header length
result.put(markProtocolType(headerData.length, serializeTypeCurrentRPC));
// header data
result.put(headerData);
// body data;
if (this.body != null) {
result.put(this.body);
}
result.flip();
return result;
}
private byte[] headerEncode() {
this.makeCustomHeaderToNet();
if (SerializeType.ROCKETMQ == serializeTypeCurrentRPC) {
return RocketMQSerializable.rocketMQProtocolEncode(this);
} else {
return RemotingSerializable.encode(this);
}
}
public void makeCustomHeaderToNet() {
if (this.customHeader != null) {
Field[] fields = getClazzFields(customHeader.getClass());
if (null == this.extFields) {
this.extFields = new HashMap<String, String>();
}
for (Field field : fields) {
if (!Modifier.isStatic(field.getModifiers())) {
String name = field.getName();
if (!name.startsWith("this")) {
Object value = null;
try {
field.setAccessible(true);
value = field.get(this.customHeader);
} catch (Exception e) {
log.error("Failed to access field [{}]", name, e);
}
if (value != null) {
this.extFields.put(name, value.toString());
}
}
}
}
}
}
这一组方法是将协议转换成byte数组,一个协议大致组成为头内容,包括了code,flag,opaque等数据内容,还有一部分就是body内容了。
NettyRemotingAbstract
该抽象类,是对请求协议的处理,事件的处理,响应协议的处理进行了封装。
处理协议中,该协议会存在2种类型 请求类型和响应类型,一般正常流程是客户端创建一个请求类型的协议并向服务端发起请求,如果需要等待,则一直等待服务端处理请求协议后,又封装了一个响应结果返回给客户端,此时客户端接受到的协议是响应协议,并且唤醒等待的线程,请求线程就完成一次请求结果了。
a. 服务端处理请求协议
public void processRequestCommand(final ChannelHandlerContext ctx, final RemotingCommand cmd) {
final Pair<NettyRequestProcessor, ExecutorService> matched = this.processorTable.get(cmd.getCode());
final Pair<NettyRequestProcessor, ExecutorService> pair = null == matched ? this.defaultRequestProcessor : matched;
final int opaque = cmd.getOpaque();
if (pair != null) {
Runnable run = new Runnable() {
@Override
public void run() {
try {
doBeforeRpcHooks(RemotingHelper.parseChannelRemoteAddr(ctx.channel()), cmd);
final RemotingCommand response = pair.getObject1().processRequest(ctx, cmd);
doAfterRpcHooks(RemotingHelper.parseChannelRemoteAddr(ctx.channel()), cmd, response);
if (!cmd.isOnewayRPC()) {
if (response != null) {
response.setOpaque(opaque);
response.markResponseType();
try {
ctx.writeAndFlush(response);
} catch (Throwable e) {
log.error("process request over, but response failed", e);
log.error(cmd.toString());
log.error(response.toString());
}
} else {
}
}
} catch (Throwable e) {
log.error("process request exception", e);
log.error(cmd.toString());
if (!cmd.isOnewayRPC()) {
final RemotingCommand response = RemotingCommand.createResponseCommand(RemotingSysResponseCode.SYSTEM_ERROR,
RemotingHelper.exceptionSimpleDesc(e));
response.setOpaque(opaque);
ctx.writeAndFlush(response);
}
}
}
};
if (pair.getObject1().rejectRequest()) {
final RemotingCommand response = RemotingCommand.createResponseCommand(RemotingSysResponseCode.SYSTEM_BUSY,
"[REJECTREQUEST]system busy, start flow control for a while");
response.setOpaque(opaque);
ctx.writeAndFlush(response);
return;
}
try {
final RequestTask requestTask = new RequestTask(run, ctx.channel(), cmd);
pair.getObject2().submit(requestTask);
} catch (RejectedExecutionException e) {
if ((System.currentTimeMillis() % 10000) == 0) {
log.warn(RemotingHelper.parseChannelRemoteAddr(ctx.channel())
+ ", too many requests and system thread pool busy, RejectedExecutionException "
+ pair.getObject2().toString()
+ " request code: " + cmd.getCode());
}
if (!cmd.isOnewayRPC()) {
final RemotingCommand response = RemotingCommand.createResponseCommand(RemotingSysResponseCode.SYSTEM_BUSY,
"[OVERLOAD]system busy, start flow control for a while");
response.setOpaque(opaque);
ctx.writeAndFlush(response);
}
}
} else {
String error = " request type " + cmd.getCode() + " not supported";
final RemotingCommand response =
RemotingCommand.createResponseCommand(RemotingSysResponseCode.REQUEST_CODE_NOT_SUPPORTED, error);
response.setOpaque(opaque);
ctx.writeAndFlush(response);
log.error(RemotingHelper.parseChannelRemoteAddr(ctx.channel()) + error);
}
}
请求协议中都指明该协议的具体类型,通过processorTable得到了协议的处理工具,Pair是一对的意思,他这里存放了协议处理类和一个线程池。声明了一个线程类,在实现线程方法中,通过pair中的object1执行processRequest方法,处理请求得到返回结果response(响应的内容),再判断请求协议是否是单向请求的,如果是需要返回,则response更新了opaue的值即为request中的opaue值,然后标记为响应协议,最后将协议写入到管道通信中。在实现一个协议处理线程后,将线程封装成RequestTask放入到线程池中。
b. 客户端处理响应协议
/**
* Process response from remote peer to the previous issued requests.
*
* @param ctx channel handler context.
* @param cmd response command instance.
*/
public void processResponseCommand(ChannelHandlerContext ctx, RemotingCommand cmd) {
final int opaque = cmd.getOpaque();
final ResponseFuture responseFuture = responseTable.get(opaque);
if (responseFuture != null) {
responseFuture.setResponseCommand(cmd);
responseTable.remove(opaque);
if (responseFuture.getInvokeCallback() != null) {
executeInvokeCallback(responseFuture);
} else {
responseFuture.putResponse(cmd);
responseFuture.release();
}
} else {
log.warn("receive response, but not matched any request, " + RemotingHelper.parseChannelRemoteAddr(ctx.channel()));
log.warn(cmd.toString());
}
}
/**
* Execute callback in callback executor. If callback executor is null, run directly in current thread
*/
private void executeInvokeCallback(final ResponseFuture responseFuture) {
boolean runInThisThread = false;
ExecutorService executor = this.getCallbackExecutor();
if (executor != null) {
try {
executor.submit(new Runnable() {
@Override
public void run() {
try {
responseFuture.executeInvokeCallback();
} catch (Throwable e) {
log.warn("execute callback in executor exception, and callback throw", e);
} finally {
responseFuture.release();
}
}
});
} catch (Exception e) {
runInThisThread = true;
log.warn("execute callback in executor exception, maybe executor busy", e);
}
} else {
runInThisThread = true;
}
if (runInThisThread) {
try {
responseFuture.executeInvokeCallback();
} catch (Throwable e) {
log.warn("executeInvokeCallback Exception", e);
} finally {
responseFuture.release();
}
}
}
先从cmd中获取到opaque值,然后从响应列别中responseTable获取到ResponseFuture,其实该类是客户端发起请求时临时创建的一个响应监听器,当响应协议到达客户端后,会进行唤醒等待线程或者也可以执行内部的回调方法。
1.等待模式设计
public RemotingCommand invokeSyncImpl(final Channel channel, final RemotingCommand request,
final long timeoutMillis)
throws InterruptedException, RemotingSendRequestException, RemotingTimeoutException {
final int opaque = request.getOpaque();
try {
final ResponseFuture responseFuture = new ResponseFuture(channel, opaque, timeoutMillis, null, null);
this.responseTable.put(opaque, responseFuture);
final SocketAddress addr = channel.remoteAddress();
channel.writeAndFlush(request).addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture f) throws Exception {
if (f.isSuccess()) {
responseFuture.setSendRequestOK(true);
return;
} else {
responseFuture.setSendRequestOK(false);
}
responseTable.remove(opaque);
responseFuture.setCause(f.cause());
responseFuture.putResponse(null);
log.warn("send a request command to channel <" + addr + "> failed.");
}
});
RemotingCommand responseCommand = responseFuture.waitResponse(timeoutMillis);
if (null == responseCommand) {
if (responseFuture.isSendRequestOK()) {
throw new RemotingTimeoutException(RemotingHelper.parseSocketAddressAddr(addr), timeoutMillis,
responseFuture.getCause());
} else {
throw new RemotingSendRequestException(RemotingHelper.parseSocketAddressAddr(addr), responseFuture.getCause());
}
}
return responseCommand;
} finally {
this.responseTable.remove(opaque);
}
}
同步方法,创建了ResponseFuture对象,并且将responseFuture放入到responseTable列表中,然后发送请求内容。其中responseFuture.waitResponse(timeoutMillis);方法就是一个等待过程,
public RemotingCommand waitResponse(final long timeoutMillis) throws InterruptedException {
this.countDownLatch.await(timeoutMillis, TimeUnit.MILLISECONDS);
return this.responseCommand;
}
就是监听倒计时锁进行监听的,所以到响应协议通过opaque值得到了ResponseFuture对象,然后只要将内部countDownLatch进行扣减,就能唤醒等待线程了。
public void putResponse(final RemotingCommand responseCommand) {
this.responseCommand = responseCommand;
this.countDownLatch.countDown();
}
这部分就是在ResponseFuture对象放入响应协议时并进行扣减计数。实现请求协议同步工作。
b. 异步模式设计
public void invokeAsyncImpl(final Channel channel, final RemotingCommand request, final long timeoutMillis,
final InvokeCallback invokeCallback)
throws InterruptedException, RemotingTooMuchRequestException, RemotingTimeoutException, RemotingSendRequestException {
long beginStartTime = System.currentTimeMillis();
final int opaque = request.getOpaque();
boolean acquired = this.semaphoreAsync.tryAcquire(timeoutMillis, TimeUnit.MILLISECONDS);
if (acquired) {
final SemaphoreReleaseOnlyOnce once = new SemaphoreReleaseOnlyOnce(this.semaphoreAsync);
long costTime = System.currentTimeMillis() - beginStartTime;
if (timeoutMillis < costTime) {
once.release();
throw new RemotingTimeoutException("invokeAsyncImpl call timeout");
}
final ResponseFuture responseFuture = new ResponseFuture(channel, opaque, timeoutMillis - costTime, invokeCallback, once);
this.responseTable.put(opaque, responseFuture);
try {
channel.writeAndFlush(request).addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture f) throws Exception {
if (f.isSuccess()) {
responseFuture.setSendRequestOK(true);
return;
}
requestFail(opaque);
log.warn("send a request command to channel <{}> failed.", RemotingHelper.parseChannelRemoteAddr(channel));
}
});
} catch (Exception e) {
responseFuture.release();
log.warn("send a request command to channel <" + RemotingHelper.parseChannelRemoteAddr(channel) + "> Exception", e);
throw new RemotingSendRequestException(RemotingHelper.parseChannelRemoteAddr(channel), e);
}
} else {
if (timeoutMillis <= 0) {
throw new RemotingTooMuchRequestException("invokeAsyncImpl invoke too fast");
} else {
String info =
String.format("invokeAsyncImpl tryAcquire semaphore timeout, %dms, waiting thread nums: %d semaphoreAsyncValue: %d",
timeoutMillis,
this.semaphoreAsync.getQueueLength(),
this.semaphoreAsync.availablePermits()
);
log.warn(info);
throw new RemotingTimeoutException(info);
}
}
}
在发起请求之前,需要通过semaphoreAsync资源控制器,尝试获取资源,主要目的时控制异步请求的数量。并且也是声明了ResponseFuture对象,但是该对象添加了invokeCallback回调函数,将responseFuture放入到responseTable,然后发送请求协议。当响应协议到达时,最终会执行回调函数的方法,当然此时的response的内容都已经放入到future中了。
public void executeInvokeCallback() {
if (invokeCallback != null) {
if (this.executeCallbackOnlyOnce.compareAndSet(false, true)) {
invokeCallback.operationComplete(this);
}
}
}
既然semaphoreAsync已经申请到了资源,使用完也要进行归还的,在finally块中都会调用responseFuture的释放资源方法
try {
responseFuture.executeInvokeCallback();
} catch (Throwable e) {
log.warn("executeInvokeCallback Exception", e);
} finally {
responseFuture.release();
}
}
考虑一个问题 responseTable 是记录了请求时的ResponseFuture数据,如果服务端长时间没有返回结果,responseTable中的数据必然会超时,超时了就需要将过期数据给清除掉。所以他就提供了扫描方法
public void scanResponseTable() {
final List<ResponseFuture> rfList = new LinkedList<ResponseFuture>();
Iterator<Entry<Integer, ResponseFuture>> it = this.responseTable.entrySet().iterator();
while (it.hasNext()) {
Entry<Integer, ResponseFuture> next = it.next();
ResponseFuture rep = next.getValue();
if ((rep.getBeginTimestamp() + rep.getTimeoutMillis() + 1000) <= System.currentTimeMillis()) {
rep.release();
it.remove();
rfList.add(rep);
log.warn("remove timeout request, " + rep);
}
}
for (ResponseFuture rf : rfList) {
try {
executeInvokeCallback(rf);
} catch (Throwable e) {
log.warn("scanResponseTable, operationComplete Exception", e);
}
}
}
扫描方法主要时通过请求的开始时间与超时时间之和与系统当前时间比较,将过期的future移除,并且进行异步处理。如果同步的话,本身有超时监听,所以不需要要异步处理,只要有异步处理的回调方法就可以。
c. 监听事件处理
在客户端与服务端构建连接时,会出现很多事件,例如连接成功,连接关闭,连接超时,连接异常等。但是这些连接的状态往往也非常重要,因为需要进行二次处理。在rocketMQ中,一个broker可能需要连接多个producer,多个consumer。一个producer或者consumer也会连接多个broker。所以他们之间的连接状态也可能会很频繁。在NettyRemotingAbstract中就引入了线程处理状态变更事件NettyEvent。
NettyEventExecutor
该线程实现ServiceThread,也就是一个线程类
private final LinkedBlockingQueue<NettyEvent> eventQueue = new LinkedBlockingQueue<NettyEvent>();
内部有个队列eventQueue,用于记录事件。
在接口实现方法中
public void run() {
log.info(this.getServiceName() + " service started");
final ChannelEventListener listener = NettyRemotingAbstract.this.getChannelEventListener();
while (!this.isStopped()) {
try {
NettyEvent event = this.eventQueue.poll(3000, TimeUnit.MILLISECONDS);
if (event != null && listener != null) {
switch (event.getType()) {
case IDLE:
listener.onChannelIdle(event.getRemoteAddr(), event.getChannel());
break;
case CLOSE:
listener.onChannelClose(event.getRemoteAddr(), event.getChannel());
break;
case CONNECT:
listener.onChannelConnect(event.getRemoteAddr(), event.getChannel());
break;
case EXCEPTION:
listener.onChannelException(event.getRemoteAddr(), event.getChannel());
break;
default:
break;
}
}
} catch (Exception e) {
log.warn(this.getServiceName() + " service has exception. ", e);
}
}
log.info(this.getServiceName() + " service end");
}
获取到ChannelEventListener对象,然后从while循环中,不停的从eventQueue队列中poll事件对象,通过事件类型处理对应的listener方法。
NettyRemotingClient
继承于NettyRemotingAbstract抽象类,主要实现客户端的连接方式。设置了通信的参数配置,设置了链路的过滤处理,包括协议的加密解密,协议的处理,连接状态的处理
Bootstrap handler = this.bootstrap.group(this.eventLoopGroupWorker).channel(NioSocketChannel.class)
.option(ChannelOption.TCP_NODELAY, true)
.option(ChannelOption.SO_KEEPALIVE, false)
.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, nettyClientConfig.getConnectTimeoutMillis())
.option(ChannelOption.SO_SNDBUF, nettyClientConfig.getClientSocketSndBufSize())
.option(ChannelOption.SO_RCVBUF, nettyClientConfig.getClientSocketRcvBufSize())
.handler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
if (nettyClientConfig.isUseTLS()) {
if (null != sslContext) {
pipeline.addFirst(defaultEventExecutorGroup, "sslHandler", sslContext.newHandler(ch.alloc()));
log.info("Prepend SSL handler");
} else {
log.warn("Connections are insecure as SSLContext is null!");
}
}
pipeline.addLast(
defaultEventExecutorGroup,
new NettyEncoder(),
new NettyDecoder(),
new IdleStateHandler(0, 0, nettyClientConfig.getClientChannelMaxIdleTimeSeconds()),
new NettyConnectManageHandler(),
new NettyClientHandler());
}
});
NettyEncoder,NettyDecoder是协议的加密解密处理类,就是将Command与byte之间的转换,NettyClientHandler是接受协议的处理类,但是需要通过NettyDecoder解密后才能处理,最终调用的是processMessageReceived方法。
class NettyClientHandler extends SimpleChannelInboundHandler<RemotingCommand> {
@Override
protected void channelRead0(ChannelHandlerContext ctx, RemotingCommand msg) throws Exception {
processMessageReceived(ctx, msg);
}
}
NettyConnectManageHandler是连接管理处理类。主要实现connect,disconnect,close,exceptionCaught等状态方法,将状态封装成NettyEvent放入到状态处理的线程中。
在发起服务端连接时,
private Channel createChannel(final String addr) throws InterruptedException {
// ...省略代码
if (this.lockChannelTables.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) {
try {
boolean createNewConnection;
cw = this.channelTables.get(addr);
if (cw != null) {
if (cw.isOK()) {
cw.getChannel().close();
this.channelTables.remove(addr);
createNewConnection = true;
} else if (!cw.getChannelFuture().isDone()) {
createNewConnection = false;
} else {
this.channelTables.remove(addr);
createNewConnection = true;
}
} else {
createNewConnection = true;
}
if (createNewConnection) {
ChannelFuture channelFuture = this.bootstrap.connect(RemotingHelper.string2SocketAddress(addr));
log.info("createChannel: begin to connect remote host[{}] asynchronously", addr);
cw = new ChannelWrapper(channelFuture);
this.channelTables.put(addr, cw);
}
} catch (Exception e) {
log.error("createChannel: create channel exception", e);
} finally {
this.lockChannelTables.unlock();
}
} else {
log.warn("createChannel: try to lock channel table, but timeout, {}ms", LOCK_TIMEOUT_MILLIS);
}
// ... 省略代码
return null;
}
channelTables是保存了地址对应的连接管道Channel, 在申请连接时,判断addr地址是否已经创建过连接,如果需要创建则调用bootstrap.connect连接方法,得到ChannelFuture对象并且封装成ChannelWrapper保存在channelTables。客户端是连接多个服务端的,在rocketMQ中,存在有namesrv集群服务,broker集群服务。客户端都需要对他们进行连接,所以会出现很多Channel。
NettyRemotingServer
通信服务端,他提供2中通信模式,select 和 epoll形式,其他的协议处理,状态监听都与客户端类似。
public void start() {
this.defaultEventExecutorGroup = new DefaultEventExecutorGroup(
nettyServerConfig.getServerWorkerThreads(),
new ThreadFactory() {
private AtomicInteger threadIndex = new AtomicInteger(0);
@Override
public Thread newThread(Runnable r) {
return new Thread(r, "NettyServerCodecThread_" + this.threadIndex.incrementAndGet());
}
});
prepareSharableHandlers();
ServerBootstrap childHandler =
this.serverBootstrap.group(this.eventLoopGroupBoss, this.eventLoopGroupSelector)
.channel(useEpoll() ? EpollServerSocketChannel.class : NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 1024)
.option(ChannelOption.SO_REUSEADDR, true)
.option(ChannelOption.SO_KEEPALIVE, false)
.childOption(ChannelOption.TCP_NODELAY, true)
.childOption(ChannelOption.SO_SNDBUF, nettyServerConfig.getServerSocketSndBufSize())
.childOption(ChannelOption.SO_RCVBUF, nettyServerConfig.getServerSocketRcvBufSize())
.localAddress(new InetSocketAddress(this.nettyServerConfig.getListenPort()))
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ch.pipeline()
.addLast(defaultEventExecutorGroup, HANDSHAKE_HANDLER_NAME, handshakeHandler)
.addLast(defaultEventExecutorGroup,
encoder,
new NettyDecoder(),
new IdleStateHandler(0, 0, nettyServerConfig.getServerChannelMaxIdleTimeSeconds()),
connectionManageHandler,
serverHandler
);
}
});
if (nettyServerConfig.isServerPooledByteBufAllocatorEnable()) {
childHandler.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
}
// 省略部分代码
}
这是服务端通信的启动代码,通过判断是否可用epoll模式,确定了ServerChannel。并且设置了其他的ServerBootstrap中属性。在管道初始话连接中,与客户端不同的是,这里增加了握手认证,当握手认证成功后,就会移除管道绑定的handler。其他协议的加密,解密,idle状态处理,连接状态管理和协议处理等都与客户端方式一致。