一 MQClientManager
/** The eagerly created singleton; declared final so it can never be reassigned. */
private static final MQClientManager instance = new MQClientManager();

/**
 * Returns the shared {@code MQClientManager}.
 *
 * @return the singleton held in {@code instance}
 */
public static MQClientManager getInstance() {
    return instance;
}
- 根据id,创建并缓存对应的MQClientInstance
ConcurrentMap<String/* clientId */, MQClientInstance> factoryTable
/**
 * Returns the client instance cached under the id built from {@code clientConfig},
 * creating and caching a new one when the table has no entry for that id.
 *
 * @param clientConfig configuration used to build the client id; a clone of it is handed to a new instance
 * @param rpcHook      hook forwarded to a newly created instance
 * @return the instance stored in {@code factoryTable} for the client id
 */
public MQClientInstance getAndCreateMQClientInstance(final ClientConfig clientConfig, RPCHook rpcHook) {
    final String clientId = clientConfig.buildMQClientId();

    final MQClientInstance cached = this.factoryTable.get(clientId);
    if (cached != null) {
        return cached;
    }

    final MQClientInstance created = new MQClientInstance(clientConfig.cloneClientConfig(),
        this.factoryIndexGenerator.getAndIncrement(), clientId, rpcHook);

    // putIfAbsent decides the winner when two threads create an instance for the same id.
    final MQClientInstance winner = this.factoryTable.putIfAbsent(clientId, created);
    if (winner == null) {
        log.info("Created new MQClientInstance for clientId:[{}]", clientId);
        return created;
    }

    log.warn("Returned Previous MQClientInstance for clientId:[{}]", clientId);
    return winner;
}
二 MQClientInstance属性及实例化
- 使用当前MQClientInstance的生产者,消费者,管理者的记录信息
// Producers registered with this client instance, keyed by group.
private final ConcurrentMap<String/* group */, MQProducerInner> producerTable = new ConcurrentHashMap<String, MQProducerInner>();
// Consumers registered with this client instance, keyed by group.
private final ConcurrentMap<String/* group */, MQConsumerInner> consumerTable = new ConcurrentHashMap<String, MQConsumerInner>();
// Admin tools registered with this client instance, keyed by group.
private final ConcurrentMap<String/* group */, MQAdminExtInner> adminExtTable = new ConcurrentHashMap<String, MQAdminExtInner>();
// Route data per topic.
ConcurrentMap<String/* Topic */, TopicRouteData> topicRouteTable = new ConcurrentHashMap<String, TopicRouteData>();
// Route information of a topic (only the fields are shown in these notes).
public class TopicRouteData extends RemotingSerializable {
private String orderTopicConf;// ordered-consumption configuration
private List<QueueData> queueDatas;// queue configuration of the topic on the brokers
private List<BrokerData> brokerDatas;// brokers that host the topic
private HashMap<String/* brokerAddr */, List<String>/* Filter Server */> filterServerTable;// filter server information
}
// Queue settings of a topic on one broker (only the fields are shown in these notes).
public class QueueData implements Comparable<QueueData> {
private String brokerName; // broker that hosts the topic
private int readQueueNums;// number of read queues
private int writeQueueNums;// number of write queues
private int perm;// read/write permission setting
private int topicSynFlag;// sync setting (as described in the original notes)
}
// Addresses of one named broker (only the fields are shown in these notes).
public class BrokerData implements Comparable<BrokerData> {
private String cluster;// name of the cluster the broker belongs to
private String brokerName;// broker name
private HashMap<Long/* brokerId */, String/* broker address */> brokerAddrs;// master and slave node addresses of the broker
}
// Broker master/slave node address table
ConcurrentMap<String/* Broker Name */, HashMap<Long/* brokerId */, String/* address */>> brokerAddrTable
// Broker version table
ConcurrentMap<String/* Broker Name */, HashMap<String/* address */, Integer>> brokerVersionTable
/**
 * Builds a client instance and wires up its collaborators; nothing is started here.
 *
 * @param clientConfig  client configuration stored on this instance
 * @param instanceIndex index stored on this instance and reported in the creation log line
 * @param clientId      id stored on this instance
 * @param rpcHook       hook forwarded to {@code MQClientAPIImpl}
 */
public MQClientInstance(ClientConfig clientConfig, int instanceIndex, String clientId, RPCHook rpcHook) {
    // Configuration
    this.clientConfig = clientConfig;
    this.instanceIndex = instanceIndex;

    // Remoting (netty) client configuration, derived from the client configuration
    this.nettyClientConfig = new NettyClientConfig();
    this.nettyClientConfig.setClientCallbackExecutorThreads(clientConfig.getClientCallbackExecutorThreads());
    this.nettyClientConfig.setUseTLS(clientConfig.isUseTLS());

    // Handler for request codes sent to this client
    this.clientRemotingProcessor = new ClientRemotingProcessor(this);

    // Builds requests and proxies the remoting client's communication interface
    this.mQClientAPIImpl = new MQClientAPIImpl(this.nettyClientConfig, this.clientRemotingProcessor, rpcHook, clientConfig);

    if (this.clientConfig.getNamesrvAddr() != null) {
        // Apply the user-specified name server address list. This call had been
        // absorbed into the preceding comment and was therefore never executed.
        this.mQClientAPIImpl.updateNameServerAddressList(this.clientConfig.getNamesrvAddr());
        log.info("user specified name server address: {}", this.clientConfig.getNamesrvAddr());
    }

    this.clientId = clientId;

    // Topic, queue and message management interface
    this.mQAdminImpl = new MQAdminImpl(this);

    // Service that processes pull requests asynchronously (used by push-mode consumers)
    this.pullMessageService = new PullMessageService(this);

    // Service that periodically triggers consumer rebalancing
    this.rebalanceService = new RebalanceService(this);

    // Internal producer. NOTE(review): the original notes say it is used by
    // sendMessageBack to return failed or timed-out messages to the broker's
    // retry topic; that usage is not visible in this excerpt.
    this.defaultMQProducer = new DefaultMQProducer(MixAll.CLIENT_INNER_PRODUCER_GROUP);
    this.defaultMQProducer.resetClientConfig(clientConfig);

    // Consumer statistics
    this.consumerStatsManager = new ConsumerStatsManager(this.scheduledExecutorService);

    log.info("Created a new client Instance, InstanceIndex:{}, ClientID:{}, ClientConfig:{}, ClientVersion:{}, SerializerType:{}",
        this.instanceIndex,
        this.clientId,
        this.clientConfig,
        MQVersion.getVersionDesc(MQVersion.CURRENT_VERSION), RemotingCommand.getSerializeTypeConfigInThisServer());
}
三 MQClientInstance初始化
/**
 * Starts this client instance once; later calls while it is running or shut down do nothing.
 *
 * @throws MQClientException if an earlier start attempt left the instance in {@code START_FAILED}
 */
public void start() throws MQClientException {
    synchronized (this) {
        switch (this.serviceState) {
            case CREATE_JUST:
                // Mark as failed up front: if any step below throws, the state stays
                // START_FAILED and the next start() call reports it.
                this.serviceState = ServiceState.START_FAILED;
                // If not specified,looking address from name server
                if (null == this.clientConfig.getNamesrvAddr()) {
                    this.mQClientAPIImpl.fetchNameServerAddr();
                }
                // Start request-response channel
                this.mQClientAPIImpl.start();
                // Start various schedule tasks
                this.startScheduledTask();
                // Start pull service
                this.pullMessageService.start();
                // Start rebalance service
                this.rebalanceService.start();
                // Start push service. This call had been absorbed into the preceding
                // comment, so the internal producer was never started.
                this.defaultMQProducer.getDefaultMQProducerImpl().start(false);
                log.info("the client factory [{}] start OK", this.clientId);
                this.serviceState = ServiceState.RUNNING;
                break;
            case RUNNING:
                break;
            case SHUTDOWN_ALREADY:
                break;
            case START_FAILED:
                throw new MQClientException("The Factory object[" + this.getClientId() + "] has been created before, and failed.", null);
            default:
                break;
        }
    }
}
四 RebalanceService
/**
 * Service loop: until stopped, waits for {@code waitInterval} and then asks the
 * client factory to rebalance.
 */
@Override
public void run() {
    log.info(this.getServiceName() + " service started");

    for (;;) {
        if (this.isStopped()) {
            break;
        }
        // NOTE(review): the original notes say the interval defaults to 20s and the
        // wait is implemented with CountDownLatch2; neither is visible here.
        this.waitForRunning(waitInterval);
        this.mqClientFactory.doRebalance();
    }

    log.info(this.getServiceName() + " service end");
}
/**
 * Calls {@code doRebalance()} on every consumer in {@code consumerTable}.
 * A failure in one consumer is logged and does not stop the others.
 */
public void doRebalance() {
    for (MQConsumerInner consumer : this.consumerTable.values()) {
        if (consumer == null) {
            continue;
        }
        try {
            consumer.doRebalance();
        } catch (Throwable e) {
            log.error("doRebalance exception", e);
        }
    }
}
五 PullMessageService
- push方式消费端使用
- 存储拉消息的请求,异步处理请求拉取消息
LinkedBlockingQueue<PullRequest> pullRequestQueue
- pullRequestQueue生产者1,消费端负载均衡更新完成后,发送PullRequest到pullRequestQueue队列中
// Excerpt (parts of the method are elided with "..."): for every assigned message queue
// that has no entry in processQueueTable yet, registers a new ProcessQueue, builds a
// PullRequest for it, and finally dispatches all collected requests.
private boolean updateProcessQueueTableInRebalance(final String topic, final Set<MessageQueue> mqSet,
final boolean isOrder) {
...
List<PullRequest> pullRequestList = new ArrayList<PullRequest>();
// Walk the message queues assigned for the topic
for (MessageQueue mq : mqSet) {
if (!this.processQueueTable.containsKey(mq)) {
// No process queue registered for this mq yet, so a PullRequest has to be built for it
if (isOrder && !this.lock(mq)) {
// Ordered topics must obtain the lock first; skip this queue when locking fails
log.warn("doRebalance, {}, add a new mq failed, {}, because lock failed", consumerGroup, mq);
continue;
}
this.removeDirtyOffset(mq);
ProcessQueue pq = new ProcessQueue();
// Compute the offset to start pulling from
long nextOffset = this.computePullFromWhere(mq);
if (nextOffset >= 0) {
ProcessQueue pre = this.processQueueTable.putIfAbsent(mq, pq);
if (pre != null) {// an entry was registered concurrently; do not create a second request
log.info("doRebalance, {}, mq already exists, {}", consumerGroup, mq);
} else {
log.info("doRebalance, {}, add a new mq, {}", consumerGroup, mq);
PullRequest pullRequest = new PullRequest();
// Consumer group
pullRequest.setConsumerGroup(consumerGroup);
// Offset to pull from
pullRequest.setNextOffset(nextOffset);
// Message queue
pullRequest.setMessageQueue(mq);
// Process queue; per the original notes, pulled messages are cached in it until the application consumes them
pullRequest.setProcessQueue(pq);
pullRequestList.add(pullRequest);
changed = true;
}
} else {
log.warn("doRebalance, {}, add new mq failed, {}", consumerGroup, mq);
}
}
}
this.dispatchPullRequest(pullRequestList);
...
}
/**
 * Hands each pull request to the push consumer for immediate execution and logs it.
 *
 * @param pullRequestList requests to dispatch
 */
public void dispatchPullRequest(List<PullRequest> pullRequestList) {
    for (PullRequest pullRequest : pullRequestList) {
        // This call had been absorbed into the preceding comment, so nothing was
        // dispatched. NOTE(review): per the original notes it places the request on
        // the pull service's blocking queue.
        this.defaultMQPushConsumerImpl.executePullRequestImmediately(pullRequest);
        log.info("doRebalance, {}, add a new pull request {}", consumerGroup, pullRequest);
    }
}
/**
 * Service loop: until stopped, blocks on {@code pullRequestQueue} and hands each
 * request taken from it to {@code pullMessage}.
 */
@Override
public void run() {
    log.info(this.getServiceName() + " service started");

    while (!this.isStopped()) {
        try {
            // Blocks until a pull request is available
            final PullRequest next = this.pullRequestQueue.take();
            if (next == null) {
                continue;
            }
            this.pullMessage(next);
        } catch (InterruptedException ignored) {
            // Intentionally ignored: the loop condition re-checks isStopped().
        } catch (Exception e) {
            log.error("Pull Message Service Run Method exception", e);
        }
    }

    log.info(this.getServiceName() + " service end");
}
/**
 * Looks up the consumer registered for the request's consumer group and lets it
 * pull messages; the request is dropped with a warning when no consumer matches.
 *
 * @param pullRequest request to execute
 */
private void pullMessage(final PullRequest pullRequest) {
    final MQConsumerInner owner = this.mQClientFactory.selectConsumer(pullRequest.getConsumerGroup());
    if (owner == null) {
        log.warn("No matched consumer for the PullRequest {}, drop it", pullRequest);
        return;
    }
    ((DefaultMQPushConsumerImpl) owner).pullMessage(pullRequest);
}
六 MQAdminImpl
/**
 * Creates {@code newTopic} on the master node of every broker found in the route
 * data of {@code key}. Creation counts as successful if at least one broker accepts it.
 *
 * @param key          topic whose route data is used to locate the brokers
 * @param newTopic     name of the topic to create
 * @param queueNum     number of read queues and of write queues
 * @param topicSysFlag system flag stored in the topic configuration
 * @throws MQClientException if no broker is found, if creation fails on every broker
 *                           that was tried, or if any other error occurs; failures are
 *                           wrapped in a "create new topic failed" exception
 */
public void createTopic(String key, String newTopic, int queueNum, int topicSysFlag) throws MQClientException {
    // Attempts made against one broker before giving up on it.
    final int maxAttemptsPerBroker = 5;
    try {
        // Fetch the route data of the key topic from the name server
        TopicRouteData topicRouteData = this.mQClientFactory.getMQClientAPIImpl().getTopicRouteInfoFromNameServer(key, timeoutMillis);
        List<BrokerData> brokerDataList = topicRouteData.getBrokerDatas();
        if (brokerDataList != null && !brokerDataList.isEmpty()) {
            Collections.sort(brokerDataList);

            boolean createOKAtLeastOnce = false;
            MQClientException exception = null;

            for (BrokerData brokerData : brokerDataList) {
                // Only the master node of each broker is contacted
                String addr = brokerData.getBrokerAddrs().get(MixAll.MASTER_ID);
                if (addr != null) {
                    TopicConfig topicConfig = new TopicConfig(newTopic);
                    topicConfig.setReadQueueNums(queueNum);
                    topicConfig.setWriteQueueNums(queueNum);
                    topicConfig.setTopicSysFlag(topicSysFlag);

                    for (int i = 0; i < maxAttemptsPerBroker; i++) {
                        try {
                            this.mQClientFactory.getMQClientAPIImpl()
                                .createTopic(addr, key, topicConfig, timeoutMillis);
                            createOKAtLeastOnce = true;
                            break;
                        } catch (Exception e) {
                            // Only the failure of the last attempt is remembered
                            if (i == maxAttemptsPerBroker - 1) {
                                exception = new MQClientException("create topic to broker exception", e);
                            }
                        }
                    }
                }
            }

            // One successful broker is enough; otherwise report the remembered failure
            if (exception != null && !createOKAtLeastOnce) {
                throw exception;
            }
        } else {
            throw new MQClientException("Not found broker, maybe key is wrong", null);
        }
    } catch (Exception e) {
        throw new MQClientException("create new topic failed", e);
    }
}
/**
 * Fetches the route data of {@code topic} from the name server and returns the
 * message queues of the publish info derived from it.
 *
 * @param topic topic to look up
 * @return the message queue list of the derived publish info
 * @throws MQClientException if the lookup fails or yields no usable publish info
 */
public List<MessageQueue> fetchPublishMessageQueues(String topic) throws MQClientException {
    try {
        final TopicRouteData route =
            this.mQClientFactory.getMQClientAPIImpl().getTopicRouteInfoFromNameServer(topic, timeoutMillis);
        // NOTE(review): per the original notes the conversion uses the order-topic
        // configuration when present, otherwise the writable queues on master nodes;
        // the conversion itself is not visible here.
        final TopicPublishInfo publishInfo = (route == null)
            ? null
            : MQClientInstance.topicRouteData2TopicPublishInfo(topic, route);
        if (publishInfo != null && publishInfo.ok()) {
            return publishInfo.getMessageQueueList();
        }
    } catch (Exception e) {
        throw new MQClientException("Can not find Message Queue for this topic, " + topic, e);
    }

    throw new MQClientException("Unknow why, Can not find Message Queue for this topic, " + topic, null);
}
/**
 * Fetches the route data of {@code topic} from the name server and returns the
 * message queues of the subscribe info derived from it.
 *
 * @param topic topic to look up
 * @return the non-empty set of message queues for the topic
 * @throws MQClientException if the lookup fails or no queue is found; the
 *                           "Namesrv return empty" failure is raised inside the try
 *                           block and is therefore wrapped by the catch below
 */
public Set<MessageQueue> fetchSubscribeMessageQueues(String topic) throws MQClientException {
    try {
        final TopicRouteData route =
            this.mQClientFactory.getMQClientAPIImpl().getTopicRouteInfoFromNameServer(topic, timeoutMillis);
        if (route != null) {
            // NOTE(review): per the original notes this keeps the queues of readable brokers.
            final Set<MessageQueue> queues = MQClientInstance.topicRouteData2TopicSubscribeInfo(topic, route);
            if (queues.isEmpty()) {
                throw new MQClientException("Can not find Message Queue for this topic, " + topic + " Namesrv return empty", null);
            }
            return queues;
        }
    } catch (Exception e) {
        throw new MQClientException(
            "Can not find Message Queue for this topic, " + topic + FAQUrl.suggestTodo(FAQUrl.MQLIST_NOT_EXIST),
            e);
    }

    throw new MQClientException("Unknow why, Can not find Message Queue for this topic, " + topic, null);
}
/**
 * Asks the broker that owns {@code mq} for the offset matching {@code timestamp}.
 *
 * @param mq        queue to search
 * @param timestamp timestamp forwarded to the broker
 * @return the offset returned by the broker
 * @throws MQClientException if the broker address cannot be resolved or the call fails
 */
public long searchOffset(MessageQueue mq, long timestamp) throws MQClientException {
    String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName());
    if (brokerAddr == null) {
        // Unknown broker: refresh the topic route once and look it up again
        this.mQClientFactory.updateTopicRouteInfoFromNameServer(mq.getTopic());
        brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName());
    }

    if (brokerAddr == null) {
        throw new MQClientException("The broker[" + mq.getBrokerName() + "] not exist", null);
    }

    try {
        return this.mQClientFactory.getMQClientAPIImpl().searchOffset(brokerAddr, mq.getTopic(), mq.getQueueId(), timestamp,
            timeoutMillis);
    } catch (Exception e) {
        throw new MQClientException("Invoke Broker[" + brokerAddr + "] exception", e);
    }
}
/**
 * Queries every broker that hosts {@code topic} for messages matching {@code key}
 * in the given time range and merges the answers.
 *
 * <p>Requests are sent asynchronously; the method waits up to {@code timeoutMillis * 4}
 * for all callbacks and then works on a snapshot of the results collected so far.
 *
 * @param topic     topic to query
 * @param key       message key, or the unique message id when {@code isUniqKey} is true
 * @param maxNum    maximum number of messages, forwarded to the brokers
 * @param begin     begin timestamp, forwarded to the brokers
 * @param end       end timestamp, forwarded to the brokers
 * @param isUniqKey whether {@code key} is a unique message id
 * @return the merged query result
 * @throws MQClientException    if no route is found for the topic or no message matched
 * @throws InterruptedException if interrupted while waiting for the brokers
 */
protected QueryResult queryMessage(String topic, String key, int maxNum, long begin, long end,
    boolean isUniqKey) throws MQClientException,
    InterruptedException {
    // Resolve the topic route, refreshing it from the name server once if unknown
    TopicRouteData topicRouteData = this.mQClientFactory.getAnExistTopicRouteData(topic);
    if (null == topicRouteData) {
        this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic);
        topicRouteData = this.mQClientFactory.getAnExistTopicRouteData(topic);
    }

    if (topicRouteData != null) {
        // Collect one address per broker that hosts the topic
        List<String> brokerAddrs = new LinkedList<String>();
        for (BrokerData brokerData : topicRouteData.getBrokerDatas()) {
            String addr = brokerData.selectBrokerAddr();
            if (addr != null) {
                brokerAddrs.add(addr);
            }
        }

        if (!brokerAddrs.isEmpty()) {
            // Counts down once per completed callback
            final CountDownLatch countDownLatch = new CountDownLatch(brokerAddrs.size());
            // Results delivered by the callbacks
            final List<QueryResult> queryResultList = new LinkedList<QueryResult>();
            // Guards queryResultList: callbacks write, this thread reads
            final ReadWriteLock lock = new ReentrantReadWriteLock(false);

            for (String addr : brokerAddrs) {
                try {
                    QueryMessageRequestHeader requestHeader = new QueryMessageRequestHeader();
                    requestHeader.setTopic(topic);
                    requestHeader.setKey(key);
                    requestHeader.setMaxNum(maxNum);
                    requestHeader.setBeginTimestamp(begin);
                    requestHeader.setEndTimestamp(end);

                    this.mQClientFactory.getMQClientAPIImpl().queryMessage(addr, requestHeader, timeoutMillis * 3,
                        new InvokeCallback() {
                            @Override
                            public void operationComplete(ResponseFuture responseFuture) {
                                try {
                                    RemotingCommand response = responseFuture.getResponseCommand();
                                    if (response != null) {
                                        switch (response.getCode()) {
                                            case ResponseCode.SUCCESS: {
                                                QueryMessageResponseHeader responseHeader = null;
                                                try {
                                                    responseHeader =
                                                        (QueryMessageResponseHeader) response
                                                            .decodeCommandCustomHeader(QueryMessageResponseHeader.class);
                                                } catch (RemotingCommandException e) {
                                                    log.error("decodeCommandCustomHeader exception", e);
                                                    return;
                                                }

                                                // Decode the returned message list
                                                List<MessageExt> wrappers =
                                                    MessageDecoder.decodes(ByteBuffer.wrap(response.getBody()), true);

                                                QueryResult qr = new QueryResult(responseHeader.getIndexLastUpdateTimestamp(), wrappers);
                                                // Acquire before try so unlock() only runs when the lock is held
                                                lock.writeLock().lock();
                                                try {
                                                    queryResultList.add(qr);
                                                } finally {
                                                    lock.writeLock().unlock();
                                                }
                                                break;
                                            }
                                            default:
                                                log.warn("getResponseCommand failed, {} {}", response.getCode(), response.getRemark());
                                                break;
                                        }
                                    } else {
                                        log.warn("getResponseCommand return null");
                                    }
                                } finally {
                                    // Signal that this broker's query is finished
                                    countDownLatch.countDown();
                                }
                            }
                        }, isUniqKey);
                } catch (Exception e) {
                    log.warn("queryMessage exception", e);
                }
            }

            // Wait for the outstanding callbacks
            boolean ok = countDownLatch.await(timeoutMillis * 4, TimeUnit.MILLISECONDS);
            if (!ok) {
                log.warn("queryMessage, maybe some broker failed");
            }

            // After a timed-out await, late callbacks may still append to queryResultList.
            // Copy it under the read lock so the merge below never iterates a list that
            // is being modified concurrently.
            final List<QueryResult> completedResults;
            lock.readLock().lock();
            try {
                completedResults = new LinkedList<QueryResult>(queryResultList);
            } finally {
                lock.readLock().unlock();
            }

            long indexLastUpdateTimestamp = 0;
            List<MessageExt> messageList = new LinkedList<MessageExt>();
            for (QueryResult qr : completedResults) {
                if (qr.getIndexLastUpdateTimestamp() > indexLastUpdateTimestamp) {
                    indexLastUpdateTimestamp = qr.getIndexLastUpdateTimestamp();
                }

                for (MessageExt msgExt : qr.getMessageList()) {
                    if (isUniqKey) {
                        // Unique key: keep a single message, the one with the EARLIEST
                        // store timestamp among all brokers' matches.
                        if (msgExt.getMsgId().equals(key)) {
                            if (messageList.size() > 0) {
                                if (messageList.get(0).getStoreTimestamp() > msgExt.getStoreTimestamp()) {
                                    messageList.clear();
                                    messageList.add(msgExt);
                                }
                            } else {
                                messageList.add(msgExt);
                            }
                        } else {
                            log.warn("queryMessage by uniqKey, find message key not matched, maybe hash duplicate {}", msgExt.toString());
                        }
                    } else {
                        // Regular key: keep every message whose key list contains the key
                        String keys = msgExt.getKeys();
                        if (keys != null) {
                            boolean matched = false;
                            String[] keyArray = keys.split(MessageConst.KEY_SEPARATOR);
                            if (keyArray != null) {
                                for (String k : keyArray) {
                                    if (key.equals(k)) {
                                        matched = true;
                                        break;
                                    }
                                }
                            }

                            if (matched) {
                                messageList.add(msgExt);
                            } else {
                                log.warn("queryMessage, find message key not matched, maybe hash duplicate {}", msgExt.toString());
                            }
                        }
                    }
                }
            }

            if (!messageList.isEmpty()) {
                return new QueryResult(indexLastUpdateTimestamp, messageList);
            } else {
                throw new MQClientException(ResponseCode.NO_MESSAGE, "query message by key finished, but no message.");
            }
        }
    }

    throw new MQClientException(ResponseCode.TOPIC_NOT_EXIST, "The topic[" + topic + "] not matched route info");
}
七 MQClientAPIImpl
7.1 请求码处理函数ClientRemotingProcessor
7.1.1 请求码处理分发函数
/**
 * Dispatches an incoming request to the handler for its request code.
 *
 * @param ctx     channel context, forwarded to the handler
 * @param request incoming request
 * @return the handler's response, or {@code null} for an unrecognized request code
 * @throws RemotingCommandException declared for the handlers
 */
public RemotingCommand processRequest(ChannelHandlerContext ctx,
    RemotingCommand request) throws RemotingCommandException {
    switch (request.getCode()) {
        case RequestCode.CHECK_TRANSACTION_STATE:
            // NOTE(review): per the original notes this checks the producer's local
            // transaction state asynchronously and reports the outcome to the broker.
            return this.checkTransactionState(ctx, request);

        case RequestCode.NOTIFY_CONSUMER_IDS_CHANGED:
            return this.notifyConsumerIdsChanged(ctx, request);

        case RequestCode.RESET_CONSUMER_CLIENT_OFFSET:
            return this.resetOffset(ctx, request);

        case RequestCode.GET_CONSUMER_STATUS_FROM_CLIENT:
            return this.getConsumeStatus(ctx, request);

        case RequestCode.GET_CONSUMER_RUNNING_INFO:
            return this.getConsumerRunningInfo(ctx, request);

        case RequestCode.CONSUME_MESSAGE_DIRECTLY:
            return this.consumeMessageDirectly(ctx, request);

        default:
            // Unknown code: there is no response to send
            return null;
    }
}
7.2 代理接口
- 和namesrv及broker通信的接口都从这里代理.
7.2.1 发送消息
// Excerpt from the body of the send-message method (its signature is not shown here).
// Build the request: the V2 header is used for "smart" messages and for batches.
RemotingCommand request = null;
if (sendSmartMsg || msg instanceof MessageBatch) {
SendMessageRequestHeaderV2 requestHeaderV2 = SendMessageRequestHeaderV2.createSendMessageRequestHeaderV2(requestHeader);
request = RemotingCommand.createRequestCommand(msg instanceof MessageBatch ? RequestCode.SEND_BATCH_MESSAGE : RequestCode.SEND_MESSAGE_V2, requestHeaderV2);
} else {
request = RemotingCommand.createRequestCommand(RequestCode.SEND_MESSAGE, requestHeader);
}
request.setBody(msg.getBody());
// Dispatch according to the communication mode; only SYNC returns a result here.
switch (communicationMode) {
case ONEWAY:
this.remotingClient.invokeOneway(addr, request, timeoutMillis);
return null;
case ASYNC:
// Shared attempt counter handed to sendMessageAsync for its retry bookkeeping
final AtomicInteger times = new AtomicInteger();
this.sendMessageAsync(addr, brokerName, msg, timeoutMillis, request, sendCallback, topicPublishInfo, instance,
retryTimesWhenSendFailed, times, context, producer);
return null;
case SYNC:
return this.sendMessageSync(addr, brokerName, msg, timeoutMillis, request);
default:
assert false;
break;
}
/**
 * Sends {@code request} asynchronously and handles the outcome in a callback:
 * success goes to {@code sendCallback.onSuccess}; failures go to
 * {@code onExceptionImpl}, which may retry.
 *
 * <p>Retries triggered from the callback receive the part of {@code timeoutMillis}
 * not yet used by this attempt (never negative) instead of a fixed timeout of 0.
 *
 * @param addr                     broker address to send to
 * @param brokerName               broker name, used for fault bookkeeping
 * @param msg                      message being sent
 * @param timeoutMillis            timeout for this attempt
 * @param request                  request to send
 * @param sendCallback             user callback; may be {@code null}
 * @param topicPublishInfo         publish info, forwarded for retry queue selection
 * @param instance                 client instance, forwarded for retry address lookup
 * @param retryTimesWhenSendFailed maximum number of retries
 * @param times                    shared attempt counter
 * @param context                  send hook context; may be {@code null}
 * @param producer                 producer used for fault bookkeeping
 * @throws InterruptedException propagated from the remoting client
 * @throws RemotingException    propagated from the remoting client
 */
private void sendMessageAsync(
    final String addr,
    final String brokerName,
    final Message msg,
    final long timeoutMillis,
    final RemotingCommand request,
    final SendCallback sendCallback,
    final TopicPublishInfo topicPublishInfo,
    final MQClientInstance instance,
    final int retryTimesWhenSendFailed,
    final AtomicInteger times,
    final SendMessageContext context,
    final DefaultMQProducerImpl producer
) throws InterruptedException, RemotingException {
    final long beginStartTime = System.currentTimeMillis();
    this.remotingClient.invokeAsync(addr, request, timeoutMillis, new InvokeCallback() {
        @Override
        public void operationComplete(ResponseFuture responseFuture) {
            // Time budget left for a retry; clamped so it never drops below the previous value of 0.
            final long remainingMillis =
                Math.max(0L, timeoutMillis - (System.currentTimeMillis() - beginStartTime));

            RemotingCommand response = responseFuture.getResponseCommand();
            if (null == sendCallback && response != null) {
                // No user callback but a response arrived: only run the send hook
                try {
                    SendResult sendResult = MQClientAPIImpl.this.processSendResponse(brokerName, msg, response);
                    if (context != null && sendResult != null) {
                        context.setSendResult(sendResult);
                        context.getProducer().executeSendMessageHookAfter(context);
                    }
                } catch (Throwable ignored) {
                    // Best effort: there is no user callback to report the failure to.
                }

                // Record the broker latency as a non-isolated sample
                producer.updateFaultItem(brokerName, System.currentTimeMillis() - responseFuture.getBeginTimestamp(), false);
                return;
            }

            if (response != null) {
                try {
                    SendResult sendResult = MQClientAPIImpl.this.processSendResponse(brokerName, msg, response);
                    assert sendResult != null;
                    if (context != null) {
                        context.setSendResult(sendResult);
                        context.getProducer().executeSendMessageHookAfter(context);
                    }

                    try {
                        sendCallback.onSuccess(sendResult);
                    } catch (Throwable ignored) {
                        // A failing user callback must not be treated as a send failure.
                    }

                    producer.updateFaultItem(brokerName, System.currentTimeMillis() - responseFuture.getBeginTimestamp(), false);
                } catch (Exception e) {
                    producer.updateFaultItem(brokerName, System.currentTimeMillis() - responseFuture.getBeginTimestamp(), true);
                    // The response could not be processed: report without retrying
                    onExceptionImpl(brokerName, msg, remainingMillis, request, sendCallback, topicPublishInfo, instance,
                        retryTimesWhenSendFailed, times, e, context, false, producer);
                }
            } else {
                producer.updateFaultItem(brokerName, System.currentTimeMillis() - responseFuture.getBeginTimestamp(), true);

                // No response: classify the failure, then let onExceptionImpl retry
                final MQClientException ex;
                if (!responseFuture.isSendRequestOK()) {
                    ex = new MQClientException("send request failed", responseFuture.getCause());
                } else if (responseFuture.isTimeout()) {
                    ex = new MQClientException("wait response timeout " + responseFuture.getTimeoutMillis() + "ms",
                        responseFuture.getCause());
                } else {
                    ex = new MQClientException("unknow reseaon", responseFuture.getCause());
                }
                onExceptionImpl(brokerName, msg, remainingMillis, request, sendCallback, topicPublishInfo, instance,
                    retryTimesWhenSendFailed, times, ex, context, true, producer);
            }
        }
    });
}
/**
 * Handles a failed asynchronous send: retries while retries are allowed and the
 * attempt counter has not exceeded {@code timesTotal}; otherwise reports the
 * failure to the send hook and to {@code sendCallback.onException}.
 *
 * @param brokerName       broker the failed attempt was sent to
 * @param msg              message being sent
 * @param timeoutMillis    timeout handed to the retry attempt
 * @param request          request to resend; it receives a fresh request id before each retry
 * @param sendCallback     user callback notified when no retry is made
 * @param topicPublishInfo publish info used to choose the retry queue; may be {@code null}
 * @param instance         client instance used to resolve the retry broker address
 * @param timesTotal       maximum number of retries
 * @param curTimes         shared attempt counter, incremented on every call
 * @param e                failure being handled
 * @param context          send hook context; may be {@code null}
 * @param needRetry        whether this failure may be retried
 * @param producer         producer used for queue selection and fault bookkeeping
 */
private void onExceptionImpl(final String brokerName,
    final Message msg,
    final long timeoutMillis,
    final RemotingCommand request,
    final SendCallback sendCallback,
    final TopicPublishInfo topicPublishInfo,
    final MQClientInstance instance,
    final int timesTotal,
    final AtomicInteger curTimes,
    final Exception e,
    final SendMessageContext context,
    final boolean needRetry,
    final DefaultMQProducerImpl producer
) {
    int tmp = curTimes.incrementAndGet();
    if (needRetry && tmp <= timesTotal) {
        String retryBrokerName = brokerName;//by default, it will send to the same broker
        if (topicPublishInfo != null) { //select one message queue accordingly, in order to determine which broker to send
            MessageQueue mqChosen = producer.selectOneMessageQueue(topicPublishInfo, brokerName);
            retryBrokerName = mqChosen.getBrokerName();
        }
        String addr = instance.findBrokerAddressInPublish(retryBrokerName);
        log.info("async send msg by retry {} times. topic={}, brokerAddr={}, brokerName={}", tmp, msg.getTopic(), addr,
            retryBrokerName);
        try {
            // Give the retry its own request id. This call had been absorbed into
            // the preceding comment, so retries reused the previous id.
            request.setOpaque(RemotingCommand.createNewRequestId());
            // Send the message again asynchronously
            sendMessageAsync(addr, retryBrokerName, msg, timeoutMillis, request, sendCallback, topicPublishInfo, instance,
                timesTotal, curTimes, context, producer);
        } catch (InterruptedException e1) {
            onExceptionImpl(retryBrokerName, msg, timeoutMillis, request, sendCallback, topicPublishInfo, instance, timesTotal, curTimes, e1,
                context, false, producer);
        } catch (RemotingConnectException e1) {
            producer.updateFaultItem(brokerName, 3000, true);
            onExceptionImpl(retryBrokerName, msg, timeoutMillis, request, sendCallback, topicPublishInfo, instance, timesTotal, curTimes, e1,
                context, true, producer);
        } catch (RemotingTooMuchRequestException e1) {
            onExceptionImpl(retryBrokerName, msg, timeoutMillis, request, sendCallback, topicPublishInfo, instance, timesTotal, curTimes, e1,
                context, false, producer);
        } catch (RemotingException e1) {
            producer.updateFaultItem(brokerName, 3000, true);
            onExceptionImpl(retryBrokerName, msg, timeoutMillis, request, sendCallback, topicPublishInfo, instance, timesTotal, curTimes, e1,
                context, true, producer);
        }
    } else {
        // No retry: publish the failure to the hook and the user callback
        if (context != null) {
            context.setException(e);
            context.getProducer().executeSendMessageHookAfter(context);
        }

        try {
            sendCallback.onException(e);
        } catch (Exception ignored) {
        }
    }
}
7.2.2 拉取消息
/**
 * Sends a pull request synchronously and converts the broker's response into a
 * {@code PullResult}.
 *
 * @param addr          broker address
 * @param request       pull request
 * @param timeoutMillis timeout for the synchronous call
 * @return the result produced by {@code processPullResponse}
 * @throws RemotingException    propagated from the remoting client
 * @throws InterruptedException propagated from the remoting client
 * @throws MQBrokerException    declared for the response processing
 */
private PullResult pullMessageSync(
    final String addr,
    final RemotingCommand request,
    final long timeoutMillis
) throws RemotingException, InterruptedException, MQBrokerException {
    final RemotingCommand reply = this.remotingClient.invokeSync(addr, request, timeoutMillis);
    assert reply != null;

    final PullResult pulled = this.processPullResponse(reply);
    return pulled;
}
/**
 * Sends a pull request asynchronously and reports the outcome to {@code pullCallback}:
 * a processed response goes to {@code onSuccess}; a processing error or a missing
 * response goes to {@code onException}.
 *
 * @param addr          broker address
 * @param request       pull request
 * @param timeoutMillis timeout for the asynchronous call
 * @param pullCallback  callback notified of the outcome
 * @throws RemotingException    propagated from the remoting client
 * @throws InterruptedException propagated from the remoting client
 */
private void pullMessageAsync(
    final String addr,
    final RemotingCommand request,
    final long timeoutMillis,
    final PullCallback pullCallback
) throws RemotingException, InterruptedException {
    this.remotingClient.invokeAsync(addr, request, timeoutMillis, new InvokeCallback() {
        @Override
        public void operationComplete(ResponseFuture responseFuture) {
            final RemotingCommand reply = responseFuture.getResponseCommand();

            if (reply == null) {
                // No response: classify the failure and report it
                final MQClientException failure;
                if (!responseFuture.isSendRequestOK()) {
                    failure = new MQClientException("send request failed to " + addr + ". Request: " + request, responseFuture.getCause());
                } else if (responseFuture.isTimeout()) {
                    failure = new MQClientException("wait response from " + addr + " timeout :" + responseFuture.getTimeoutMillis() + "ms" + ". Request: " + request,
                        responseFuture.getCause());
                } else {
                    failure = new MQClientException("unknown reason. addr: " + addr + ", timeoutMillis: " + timeoutMillis + ". Request: " + request, responseFuture.getCause());
                }
                pullCallback.onException(failure);
                return;
            }

            try {
                final PullResult pulled = MQClientAPIImpl.this.processPullResponse(reply);
                assert pulled != null;
                // Hand the result to the caller's pull callback
                pullCallback.onSuccess(pulled);
            } catch (Exception e) {
                pullCallback.onException(e);
            }
        }
    });
}