DeFiBus的Rpc调用
整个调用过程包含了两个消息的产生和消费过程。
1.请求方产生请求消息,服务响应方消费这条请求消息。请求方根据服务提供方的协议将请求内容设置到消息体中,并将消息发送到Broker上。服务响应方订阅相应的Topic,从Broker上获取到请求消息并消费。
2.服务响应方产生响应消息,请求方接收这条响应消息。服务响应方收到请求消息后,执行相应的处理,并将请求结果设置到响应消息的消息体中,将响应消息发送到Broker上。
3.Broker接收响应消息的方式采用的是Broker推送的形式,而不是由Producer订阅的方式,从而使得响应消息能够精准回到发出请求消息的实例上。
4.DeFiBus在每条请求消息中增加REPLY_TO属性来唯一标识每一个请求方实例。在创建响应消息时将REPLY_TO属性透传到响应消息中。Broker收到响应消息后根据REPLY_TO属性查找出对应的请求方实例的连接,将响应消息推送给该请求方实例。
DeFiBus的Producer
public class DeFiBusProducerImpl {
public Message request(Message requestMsg, final SendCallback sendCallback, RRCallback rrCallback, long timeout)
throws InterruptedException, RemotingException, MQClientException, MQBrokerException {
boolean isAsyncRR = (rrCallback != null);
final String uniqueRequestId = DeFiBusRequestIDUtil.createUniqueName("w");
DefaultMQProducer producer = deFiBusProducer.getDefaultMQProducer();
requestMsg.putUserProperty(DeFiBusConstant.KEY, DeFiBusConstant.PERSISTENT);
requestMsg.putUserProperty(DeFiBusConstant.PROPERTY_RR_REQUEST_ID, uniqueRequestId);
// 在请求的消息中增加了PROPERTY_MESSAGE_REPLY_TO属性
requestMsg.putUserProperty(DeFiBusConstant.PROPERTY_MESSAGE_REPLY_TO, producer.buildMQClientId());
requestMsg.putUserProperty(DeFiBusConstant.PROPERTY_MESSAGE_TTL, String.valueOf(timeout));
final RRResponseFuture responseFurture = new RRResponseFuture(rrCallback, timeout);
String topic = requestMsg.getTopic()
ResponseTable.getRrResponseFurtureConcurrentHashMap().put(uniqueRequestId, responseFurture);
if (isAsyncRR) {
// 省略代码
} else {
publish(requestMsg, new SendCallback() {
// 省略相关代码
}, timeout);
Message retMessage = responseFurture.waitResponse(timeout);
ResponseTable.getRrResponseFurtureConcurrentHashMap().remove(uniqueRequestId);
return retMessage;
}
}
public String buildMQClientId() {
StringBuilder sb = new StringBuilder();
// 携带producer所在的IP地址
sb.append(this.getClientIP());
sb.append("@");
sb.append(this.getInstanceName());
if (!UtilAll.isBlank(this.unitName)) {
sb.append("@");
sb.append(this.unitName);
}
return sb.toString();
}
}
- DeFiBus的Producer请求消息中增加PROPERTY_MESSAGE_REPLY_TO属性来标识每一个请求方实例。
- PROPERTY_MESSAGE_REPLY_TO包含请求方的IP地址和实例名字。
DeFiBus的Consumer
public class DeFiBusClientUtil {
public static final Logger LOGGER = LoggerFactory.getLogger(DeFiBusClientUtil.class);
public static Message createReplyMessage(MessageExt sourceMsg, byte[] content) {
String cluster = sourceMsg.getUserProperty(DeFiBusConstant.PROPERTY_MESSAGE_CLUSTER);
String replyTopic = DeFiBusConstant.RR_REPLY_TOPIC;
if (!StringUtils.isEmpty(cluster)) {
replyTopic = cluster + "-" + replyTopic;
}
Message msg = new Message();
msg.setTopic(replyTopic);//回程topic
msg.setBody(content);//body
msg.putUserProperty(DeFiBusConstant.PROPERTY_MESSAGE_REPLY_TO,
sourceMsg.getUserProperty(DeFiBusConstant.PROPERTY_MESSAGE_REPLY_TO));//回给谁
msg.putUserProperty(DeFiBusConstant.PROPERTY_RR_REQUEST_ID,
sourceMsg.getUserProperty(DeFiBusConstant.PROPERTY_RR_REQUEST_ID));//原uniqueId
String sourceBroker = sourceMsg.getUserProperty(DeFiBusConstant.PROPERTY_MESSAGE_BROKER);
if (!StringUtils.isEmpty(sourceBroker)) {
msg.putUserProperty(DeFiBusConstant.PROPERTY_MESSAGE_BROKER, sourceBroker);//消息从哪个broker来
}
return msg;
}
}
- DeFiBus的Consumer在响应的报文体中同样携带PROPERTY_MESSAGE_REPLY_TO属性。
- DeFiBus的Broker在收到Consumer的响应消息后会进行特殊处理。
DeFiBus的Broker
public class DeFiReplyMessageProcessor extends AbstractSendMessageProcessor implements NettyRequestProcessor {
private RemotingCommand processReplyMessageRequest(final ChannelHandlerContext ctx, //
final RemotingCommand request, //
final SendMessageContext sendMessageContext, //
final SendMessageRequestHeader requestHeader) throws RemotingCommandException {
if (msgInner.getProperties() != null && DeFiBusConstant.REPLY.equals(msgInner.getProperties().get(DeFiBusConstant.KEY))) {
// 获取发送者的消息Id
String senderId = msgInner.getProperties().get(DeFiBusConstant.PROPERTY_MESSAGE_REPLY_TO);
if (senderId == null) {
// 省略相关代码
} else {
// 查找senderId对应的Chennel信息
ClientChannelInfo clientChannelInfo = this.deFiBrokerController.getProducerManager().getClientChannel(senderId);
if (clientChannelInfo == null || clientChannelInfo.getChannel() == null || !clientChannelInfo.getChannel().isActive()) {
// 省略相关代码
} else {
Map<String, String> map = MessageDecoder.string2messageProperties(replyMessageRequestHeader.getProperties());
map.put(DeFiBusMessageConst.LEAVE_TIME, String.valueOf(System.currentTimeMillis()));
replyMessageRequestHeader.setProperties(MessageDecoder.messageProperties2String(map));
try {
this.deFiBrokerController.getPushReplyMessageExecutor().submit(new Runnable() {
@Override public void run() {
boolean isPushSuccess = deFiBrokerController.getDeFiBusBroker2Client()
.pushRRReplyMessageToClient(clientChannelInfo.getChannel(), replyMessageRequestHeader, msgInner);
}
}
});
} catch (RejectedExecutionException e) {
}
}
}
}
}
}
- DeFiBus的Broker在收到Rpc的响应消息后通过线程池异步执行pushRRReplyMessageToClient发送Rpc消息。