序
本文主要研究一下rocketmq5的PushConsumer与SimpleConsumer拉取消息的区别
ProcessQueueImpl
org/apache/rocketmq/client/java/impl/consumer/ProcessQueueImpl.java
/**
 * Sends one asynchronous receive request for this process queue's message queue ({@code mq}) and
 * registers a callback that handles the outcome.
 *
 * <p>If the consumer is not running, the method only logs and returns. Any throwable raised while
 * preparing or sending the request is logged and forwarded to
 * {@code onReceiveMessageException(t, attemptId)}.
 *
 * @param attemptId attempt id handed to {@code consumer.wrapReceiveMessageRequest(...)}; it is also
 *                  passed to {@code onReceiveMessageException} when this attempt fails before or
 *                  while handling the result.
 */
private void receiveMessageImmediately(String attemptId) {
final ClientId clientId = consumer.getClientId();
if (!consumer.isRunning()) {
log.info("Stop to receive message because consumer is not running, mq={}, clientId={}", mq, clientId);
return;
}
try {
final Endpoints endpoints = mq.getBroker().getEndpoints();
final int batchSize = this.getReceptionBatchSize();
final Duration longPollingTimeout = consumer.getPushConsumerSettings().getLongPollingTimeout();
final ReceiveMessageRequest request = consumer.wrapReceiveMessageRequest(batchSize, mq, filterExpression,
longPollingTimeout, attemptId);
// Record the time of this reception attempt before the request is sent.
activityNanoTime = System.nanoTime();
// Intercept before message reception.
final MessageInterceptorContextImpl context = new MessageInterceptorContextImpl(MessageHookPoints.RECEIVE);
consumer.doBefore(context, Collections.emptyList());
final ListenableFuture<ReceiveMessageResult> future = consumer.receiveMessage(request, mq,
longPollingTimeout);
// The callback runs on MoreExecutors.directExecutor(), i.e. on whichever thread completes the future.
Futures.addCallback(future, new FutureCallback<ReceiveMessageResult>() {
@Override
public void onSuccess(ReceiveMessageResult result) {
// Intercept after message reception.
final List<GeneralMessage> generalMessages = result.getMessageViewImpls().stream()
.map((Function<MessageView, GeneralMessage>) GeneralMessageImpl::new)
.collect(Collectors.toList());
final MessageInterceptorContextImpl context0 =
new MessageInterceptorContextImpl(context, MessageHookPointsStatus.OK);
consumer.doAfter(context0, generalMessages);
try {
onReceiveMessageResult(result);
} catch (Throwable t) {
// Should never reach here.
log.error("[Bug] Exception raised while handling receive result, mq={}, endpoints={}, "
+ "clientId={}", mq, endpoints, clientId, t);
onReceiveMessageException(t, attemptId);
}
}
@Override
public void onFailure(Throwable t) {
// Only a gRPC DEADLINE_EXCEEDED failure carries the current request's attempt id forward;
// every other failure passes null.
// NOTE(review): presumably so the follow-up attempt reuses the same id - confirm in
// onReceiveMessageException.
String nextAttemptId = null;
if (t instanceof StatusRuntimeException) {
StatusRuntimeException exception = (StatusRuntimeException) t;
if (org.apache.rocketmq.shaded.io.grpc.Status.DEADLINE_EXCEEDED.getCode() == exception.getStatus().getCode()) {
nextAttemptId = request.getAttemptId();
}
}
// Intercept after message reception.
final MessageInterceptorContextImpl context0 =
new MessageInterceptorContextImpl(context, MessageHookPointsStatus.ERROR);
consumer.doAfter(context0, Collections.emptyList());
log.error("Exception raised during message reception, mq={}, endpoints={}, attemptId={}, " +
"nextAttemptId={}, clientId={}", mq, endpoints, request.getAttemptId(), nextAttemptId,
clientId, t);
onReceiveMessageException(t, nextAttemptId);
}
}, MoreExecutors.directExecutor());
// Count this reception on both the per-queue counter and the consumer-wide counter.
receptionTimes.getAndIncrement();
consumer.getReceptionTimes().getAndIncrement();
} catch (Throwable t) {
log.error("Exception raised during message reception, mq={}, clientId={}", mq, clientId, t);
onReceiveMessageException(t, attemptId);
}
}
PushConsumer通过ProcessQueueImpl的receiveMessageImmediately拉取消息,其内部是通过consumer.receiveMessage(request, mq, longPollingTimeout)来拉取的,request是通过consumer.wrapReceiveMessageRequest(batchSize, mq, filterExpression, longPollingTimeout, attemptId)构建的
SimpleConsumerImpl
org/apache/rocketmq/client/java/impl/consumer/SimpleConsumerImpl.java
/**
 * Asynchronously receives messages from one of the subscribed topics.
 *
 * <p>The topic is picked from a snapshot of the subscription expressions using
 * {@code topicIndex} modulo the number of topics. A message queue is then taken from that topic's
 * subscription load balancer and a receive request is sent to it with {@code awaitDuration} as
 * the long-polling timeout.
 *
 * @param maxMessageNum     maximum number of messages requested; must be greater than 0.
 * @param invisibleDuration invisible duration placed into the receive request.
 * @return a future of the received message views; it fails immediately with
 *         {@link IllegalStateException} if the consumer is not running, or with
 *         {@link IllegalArgumentException} if {@code maxMessageNum <= 0} or no topic is subscribed.
 */
public ListenableFuture<List<MessageView>> receive0(int maxMessageNum, Duration invisibleDuration) {
    if (!this.isRunning()) {
        log.error("Unable to receive message because simple consumer is not running, state={}, clientId={}",
            this.state(), clientId);
        return Futures.immediateFailedFuture(new IllegalStateException("Simple consumer is not running now"));
    }
    if (maxMessageNum <= 0) {
        return Futures.immediateFailedFuture(
            new IllegalArgumentException("maxMessageNum must be greater than 0"));
    }
    // Work on a snapshot so the topic list and the expression lookup see the same subscriptions.
    final HashMap<String, FilterExpression> snapshot = new HashMap<>(subscriptionExpressions);
    final ArrayList<String> candidates = new ArrayList<>(snapshot.keySet());
    // No topic is subscribed.
    if (candidates.isEmpty()) {
        return Futures.immediateFailedFuture(
            new IllegalArgumentException("There is no topic to receive message"));
    }
    // Pick the topic for this call from the shared index.
    final int cursor = topicIndex.getAndIncrement();
    final String topic = candidates.get(IntMath.mod(cursor, candidates.size()));
    final FilterExpression expression = snapshot.get(topic);
    final ListenableFuture<SubscriptionLoadBalancer> balancerFuture = getSubscriptionLoadBalancer(topic);
    final ListenableFuture<ReceiveMessageResult> received = Futures.transformAsync(balancerFuture,
        loadBalancer -> {
            final MessageQueueImpl mq = loadBalancer.takeMessageQueue();
            final ReceiveMessageRequest request = wrapReceiveMessageRequest(maxMessageNum, mq, expression,
                invisibleDuration, awaitDuration);
            return receiveMessage(request, mq, awaitDuration);
        }, MoreExecutors.directExecutor());
    // Unwrap the message views on the client callback executor.
    return Futures.transformAsync(received,
        receiveResult -> Futures.immediateFuture(receiveResult.getMessageViews()),
        clientCallbackExecutor);
}
SimpleConsumerImpl的receive0也是通过ConsumerImpl的receiveMessage(request, mq, awaitDuration)方法来拉取消息的,其request是通过wrapReceiveMessageRequest(maxMessageNum, mq, filterExpression, invisibleDuration, awaitDuration)来构建的
ConsumerImpl
org/apache/rocketmq/client/java/impl/consumer/ConsumerImpl.java
receiveMessage
/**
 * Sends a receive-message RPC to the broker endpoints of {@code mq} and converts the responses
 * into a {@code ReceiveMessageResult}.
 *
 * <p>The RPC timeout is {@code awaitDuration} plus the client's configured request timeout. Each
 * response contributes a status, a message, or a delivery timestamp. If no response carries a
 * status, the status stays {@code INTERNAL_SERVER_ERROR} with the message
 * "status was not set by server". The collected status is passed to {@code StatusChecker.check}
 * after the message views have been built.
 *
 * @param request       the receive request to send.
 * @param mq            the message queue whose broker endpoints receive the request.
 * @param awaitDuration the long-polling wait, used as the base of the RPC timeout.
 * @return a future of the receive result; an immediately failed future if sending itself throws.
 */
protected ListenableFuture<ReceiveMessageResult> receiveMessage(ReceiveMessageRequest request,
    MessageQueueImpl mq, Duration awaitDuration) {
    final List<MessageViewImpl> views = new ArrayList<>();
    try {
        final Endpoints endpoints = mq.getBroker().getEndpoints();
        // RPC deadline = long-polling wait + the regular request timeout as tolerance.
        final Duration rpcTimeout = awaitDuration.plus(clientConfiguration.getRequestTimeout());
        final RpcFuture<ReceiveMessageRequest, List<ReceiveMessageResponse>> rpcFuture =
            this.getClientManager().receiveMessage(endpoints, request, rpcTimeout);
        return Futures.transformAsync(rpcFuture, responses -> {
            // Fallback status used when no response carries one.
            Status status = Status.newBuilder()
                .setCode(Code.INTERNAL_SERVER_ERROR)
                .setMessage("status was not set by server")
                .build();
            Long deliveryMillis = null;
            final List<Message> protoMessages = new ArrayList<>();
            for (ReceiveMessageResponse response : responses) {
                switch (response.getContentCase()) {
                    case STATUS:
                        status = response.getStatus();
                        break;
                    case MESSAGE:
                        protoMessages.add(response.getMessage());
                        break;
                    case DELIVERY_TIMESTAMP:
                        deliveryMillis = Timestamps.toMillis(response.getDeliveryTimestamp());
                        break;
                    default:
                        log.warn("[Bug] Not recognized content for receive message response, mq={}, " +
                            "clientId={}, response={}", mq, clientId, response);
                }
            }
            // Convert only after every response has been scanned, so each view gets the final
            // delivery timestamp wherever it appeared among the responses.
            final Long transportDeliveryTimestamp = deliveryMillis;
            protoMessages.stream()
                .map(proto -> MessageViewImpl.fromProtobuf(proto, mq, transportDeliveryTimestamp))
                .forEach(views::add);
            StatusChecker.check(status, rpcFuture);
            return Futures.immediateFuture(new ReceiveMessageResult(endpoints, views));
        }, MoreExecutors.directExecutor());
    } catch (Throwable t) {
        // Should never reach here.
        log.error("[Bug] Exception raised during message receiving, mq={}, clientId={}", mq, clientId, t);
        return Futures.immediateFailedFuture(t);
    }
}
receiveMessage方法通过clientManager.receiveMessage(endpoints, request, timeout)来拉取消息,之后转换为ReceiveMessageResult
wrapReceiveMessageRequest(ProcessQueueImpl)
/**
 * Builds the receive request used for the given message queue, with auto-renew enabled and an
 * attempt id attached.
 *
 * @param batchSize          batch size set on the request.
 * @param mq                 message queue to receive from.
 * @param filterExpression   filter expression, converted via {@code wrapFilterExpression}.
 * @param longPollingTimeout long-polling timeout set on the request.
 * @param attemptId          attempt id to use; a random UUID string is generated when {@code null}.
 * @return the built request.
 */
ReceiveMessageRequest wrapReceiveMessageRequest(int batchSize, MessageQueueImpl mq,
    FilterExpression filterExpression, Duration longPollingTimeout, String attemptId) {
    final String effectiveAttemptId;
    if (attemptId != null) {
        effectiveAttemptId = attemptId;
    } else {
        effectiveAttemptId = UUID.randomUUID().toString();
    }
    final ReceiveMessageRequest.Builder builder = ReceiveMessageRequest.newBuilder();
    builder.setGroup(getProtobufGroup());
    builder.setMessageQueue(mq.toProtobuf());
    builder.setFilterExpression(wrapFilterExpression(filterExpression));
    builder.setLongPollingTimeout(Durations.fromNanos(longPollingTimeout.toNanos()));
    builder.setBatchSize(batchSize);
    builder.setAutoRenew(true);
    builder.setAttemptId(effectiveAttemptId);
    return builder.build();
}
ProcessQueueImpl调用的wrapReceiveMessageRequest只传递了batchSize、mq、filterExpression, longPollingTimeout, attemptId这几个参数
wrapReceiveMessageRequest(SimpleConsumerImpl)
/**
 * Builds the receive request used for the given message queue, with auto-renew disabled and an
 * explicit invisible duration.
 *
 * @param batchSize          batch size set on the request.
 * @param mq                 message queue to receive from.
 * @param filterExpression   filter expression, converted via {@code wrapFilterExpression}.
 * @param invisibleDuration  invisible duration set on the request.
 * @param longPollingTimeout long-polling timeout set on the request.
 * @return the built request.
 */
ReceiveMessageRequest wrapReceiveMessageRequest(int batchSize, MessageQueueImpl mq,
    FilterExpression filterExpression, Duration invisibleDuration, Duration longPollingTimeout) {
    // Convert the invisible duration first, as the original does, before touching the builder.
    final org.apache.rocketmq.shaded.com.google.protobuf.Duration invisible =
        Durations.fromNanos(invisibleDuration.toNanos());
    final ReceiveMessageRequest.Builder builder = ReceiveMessageRequest.newBuilder();
    builder.setGroup(getProtobufGroup());
    builder.setMessageQueue(mq.toProtobuf());
    builder.setFilterExpression(wrapFilterExpression(filterExpression));
    builder.setLongPollingTimeout(Durations.fromNanos(longPollingTimeout.toNanos()));
    builder.setBatchSize(batchSize);
    builder.setAutoRenew(false);
    builder.setInvisibleDuration(invisible);
    return builder.build();
}
SimpleConsumerImpl调用的wrapReceiveMessageRequest只传递了maxMessageNum(batchSize)、mq、filterExpression、invisibleDuration、awaitDuration(longPollingTimeout)这几个参数
区别在于:PushConsumer(ProcessQueueImpl)构建的请求setAutoRenew为true且设置了attemptId,而SimpleConsumer构建的请求setAutoRenew为false且设置了invisibleDuration
小结
rocketmq5的PushConsumer与SimpleConsumer拉取消息都是通过ConsumerImpl的receiveMessage方法来拉取的,区别在于构建的ReceiveMessageRequest参数不一样:PushConsumer的请求setAutoRenew为true且设置了attemptId,SimpleConsumer的请求setAutoRenew为false且设置了invisibleDuration。