序
本文主要研究一下rocketmq5的PushConsumer的消费逻辑
PushConsumerImpl
org/apache/rocketmq/client/java/impl/consumer/PushConsumerImpl.java
class PushConsumerImpl extends ConsumerImpl implements PushConsumer {
//......
protected void startUp() throws Exception {
try {
log.info("Begin to start the rocketmq push consumer, clientId={}", clientId);
GaugeObserver gaugeObserver = new ProcessQueueGaugeObserver(processQueueTable, clientId, consumerGroup);
this.clientMeterManager.setGaugeObserver(gaugeObserver);
super.startUp();
final ScheduledExecutorService scheduler = this.getClientManager().getScheduler();
this.consumeService = createConsumeService();
// Scan assignments periodically.
scanAssignmentsFuture = scheduler.scheduleWithFixedDelay(() -> {
try {
scanAssignments();
} catch (Throwable t) {
log.error("Exception raised while scanning the load assignments, clientId={}", clientId, t);
}
}, 1, 5, TimeUnit.SECONDS);
log.info("The rocketmq push consumer starts successfully, clientId={}", clientId);
} catch (Throwable t) {
log.error("Exception raised while starting the rocketmq push consumer, clientId={}", clientId, t);
shutDown();
throw t;
}
}
//......
}
PushConsumerImpl实现了AbstractIdleService的startUp方法,该方法会每隔5秒定时调度执行scanAssignments
scanAssignments
@VisibleForTesting
void scanAssignments() {
try {
log.debug("Start to scan assignments periodically, clientId={}", clientId);
for (Map.Entry<String, FilterExpression> entry : subscriptionExpressions.entrySet()) {
final String topic = entry.getKey();
final FilterExpression filterExpression = entry.getValue();
final Assignments existed = cacheAssignments.get(topic);
final ListenableFuture<Assignments> future = queryAssignment(topic);
Futures.addCallback(future, new FutureCallback<Assignments>() {
@Override
public void onSuccess(Assignments latest) {
if (latest.getAssignmentList().isEmpty()) {
if (null == existed || existed.getAssignmentList().isEmpty()) {
log.info("Acquired empty assignments from remote, would scan later, topic={}, "
+ "clientId={}", topic, clientId);
return;
}
log.info("Attention!!! acquired empty assignments from remote, but existed assignments"
+ " is not empty, topic={}, clientId={}", topic, clientId);
}
if (!latest.equals(existed)) {
log.info("Assignments of topic={} has changed, {} => {}, clientId={}", topic, existed,
latest, clientId);
syncProcessQueue(topic, latest, filterExpression);
cacheAssignments.put(topic, latest);
return;
}
log.debug("Assignments of topic={} remains the same, assignments={}, clientId={}", topic,
existed, clientId);
// Process queue may be dropped, need to be synchronized anyway.
syncProcessQueue(topic, latest, filterExpression);
}
@Override
public void onFailure(Throwable t) {
log.error("Exception raised while scanning the assignments, topic={}, clientId={}", topic,
clientId, t);
}
}, MoreExecutors.directExecutor());
}
} catch (Throwable t) {
log.error("Exception raised while scanning the assignments for all topics, clientId={}", clientId, t);
}
}
scanAssignments通过queryAssignment(topic)查询Assignments,然后执行syncProcessQueue
syncProcessQueue
void syncProcessQueue(String topic, Assignments assignments, FilterExpression filterExpression) {
Set<MessageQueueImpl> latest = new HashSet<>();
final List<Assignment> assignmentList = assignments.getAssignmentList();
for (Assignment assignment : assignmentList) {
latest.add(assignment.getMessageQueue());
}
Set<MessageQueueImpl> activeMqs = new HashSet<>();
for (Map.Entry<MessageQueueImpl, ProcessQueue> entry : processQueueTable.entrySet()) {
final MessageQueueImpl mq = entry.getKey();
final ProcessQueue pq = entry.getValue();
if (!topic.equals(mq.getTopic())) {
continue;
}
if (!latest.contains(mq)) {
log.info("Drop message queue according to the latest assignmentList, mq={}, clientId={}", mq,
clientId);
dropProcessQueue(mq);
continue;
}
if (pq.expired()) {
log.warn("Drop message queue because it is expired, mq={}, clientId={}", mq, clientId);
dropProcessQueue(mq);
continue;
}
activeMqs.add(mq);
}
for (MessageQueueImpl mq : latest) {
if (activeMqs.contains(mq)) {
continue;
}
final Optional<ProcessQueue> optionalProcessQueue = createProcessQueue(mq, filterExpression);
if (optionalProcessQueue.isPresent()) {
log.info("Start to fetch message from remote, mq={}, clientId={}", mq, clientId);
optionalProcessQueue.get().fetchMessageImmediately();
}
}
}
syncProcessQueue方法获取维护最新的以及已有的MessageQueueImpl,然后进行遍历,对于新的执行createProcessQueue及其fetchMessageImmediately方法
createProcessQueue
protected Optional<ProcessQueue> createProcessQueue(MessageQueueImpl mq, final FilterExpression filterExpression) {
final ProcessQueueImpl processQueue = new ProcessQueueImpl(this, mq, filterExpression);
final ProcessQueue previous = processQueueTable.putIfAbsent(mq, processQueue);
if (null != previous) {
return Optional.empty();
}
return Optional.of(processQueue);
}
createProcessQueue会创建ProcessQueueImpl,并维护到processQueueTable中
ProcessQueueImpl
org/apache/rocketmq/client/java/impl/consumer/ProcessQueueImpl.java
public void fetchMessageImmediately() {
receiveMessageImmediately();
}
private void receiveMessageImmediately() {
receiveMessageImmediately(this.generateAttemptId());
}
private void receiveMessageImmediately(String attemptId) {
final ClientId clientId = consumer.getClientId();
if (!consumer.isRunning()) {
log.info("Stop to receive message because consumer is not running, mq={}, clientId={}", mq, clientId);
return;
}
try {
final Endpoints endpoints = mq.getBroker().getEndpoints();
final int batchSize = this.getReceptionBatchSize();
final Duration longPollingTimeout = consumer.getPushConsumerSettings().getLongPollingTimeout();
final ReceiveMessageRequest request = consumer.wrapReceiveMessageRequest(batchSize, mq, filterExpression,
longPollingTimeout, attemptId);
activityNanoTime = System.nanoTime();
// Intercept before message reception.
final MessageInterceptorContextImpl context = new MessageInterceptorContextImpl(MessageHookPoints.RECEIVE);
consumer.doBefore(context, Collections.emptyList());
final ListenableFuture<ReceiveMessageResult> future = consumer.receiveMessage(request, mq,
longPollingTimeout);
Futures.addCallback(future, new FutureCallback<ReceiveMessageResult>() {
@Override
public void onSuccess(ReceiveMessageResult result) {
// Intercept after message reception.
final List<GeneralMessage> generalMessages = result.getMessageViewImpls().stream()
.map((Function<MessageView, GeneralMessage>) GeneralMessageImpl::new)
.collect(Collectors.toList());
final MessageInterceptorContextImpl context0 =
new MessageInterceptorContextImpl(context, MessageHookPointsStatus.OK);
consumer.doAfter(context0, generalMessages);
try {
onReceiveMessageResult(result);
} catch (Throwable t) {
// Should never reach here.
log.error("[Bug] Exception raised while handling receive result, mq={}, endpoints={}, "
+ "clientId={}", mq, endpoints, clientId, t);
onReceiveMessageException(t, attemptId);
}
}
@Override
public void onFailure(Throwable t) {
String nextAttemptId = null;
if (t instanceof StatusRuntimeException) {
StatusRuntimeException exception = (StatusRuntimeException) t;
if (org.apache.rocketmq.shaded.io.grpc.Status.DEADLINE_EXCEEDED.getCode() == exception.getStatus().getCode()) {
nextAttemptId = request.getAttemptId();
}
}
// Intercept after message reception.
final MessageInterceptorContextImpl context0 =
new MessageInterceptorContextImpl(context, MessageHookPointsStatus.ERROR);
consumer.doAfter(context0, Collections.emptyList());
log.error("Exception raised during message reception, mq={}, endpoints={}, attemptId={}, " +
"nextAttemptId={}, clientId={}", mq, endpoints, request.getAttemptId(), nextAttemptId,
clientId, t);
onReceiveMessageException(t, nextAttemptId);
}
}, MoreExecutors.directExecutor());
receptionTimes.getAndIncrement();
consumer.getReceptionTimes().getAndIncrement();
} catch (Throwable t) {
log.error("Exception raised during message reception, mq={}, clientId={}", mq, clientId, t);
onReceiveMessageException(t, attemptId);
}
}
receiveMessageImmediately方法执行consumer.receiveMessage,成功时执行onReceiveMessageResult,异常时执行onReceiveMessageException
onReceiveMessageResult
private void onReceiveMessageResult(ReceiveMessageResult result) {
final List<MessageViewImpl> messages = result.getMessageViewImpls();
if (!messages.isEmpty()) {
cacheMessages(messages);
receivedMessagesQuantity.getAndAdd(messages.size());
consumer.getReceivedMessagesQuantity().getAndAdd(messages.size());
consumer.getConsumeService().consume(this, messages);
}
receiveMessage();
}
public void receiveMessage() {
receiveMessage(this.generateAttemptId());
}
public void receiveMessage(String attemptId) {
final ClientId clientId = consumer.getClientId();
if (dropped) {
log.info("Process queue has been dropped, no longer receive message, mq={}, clientId={}", mq, clientId);
return;
}
if (this.isCacheFull()) {
log.warn("Process queue cache is full, would receive message later, mq={}, clientId={}", mq, clientId);
receiveMessageLater(RECEIVING_BACKOFF_DELAY_WHEN_CACHE_IS_FULL, attemptId);
return;
}
receiveMessageImmediately(attemptId);
}
onReceiveMessageResult执行consumer.getConsumeService().consume(this, messages),最后再执行receiveMessage,它会判断isCacheFull,为true则返回,否则再次出发receiveMessageImmediately
onReceiveMessageException
public void onReceiveMessageException(Throwable t, String attemptId) {
Duration delay = t instanceof TooManyRequestsException ? RECEIVING_FLOW_CONTROL_BACKOFF_DELAY :
RECEIVING_FAILURE_BACKOFF_DELAY;
receiveMessageLater(delay, attemptId);
}
private void receiveMessageLater(Duration delay, String attemptId) {
final ClientId clientId = consumer.getClientId();
final ScheduledExecutorService scheduler = consumer.getScheduler();
try {
log.info("Try to receive message later, mq={}, delay={}, clientId={}", mq, delay, clientId);
scheduler.schedule(() -> receiveMessage(attemptId), delay.toNanos(), TimeUnit.NANOSECONDS);
} catch (Throwable t) {
if (scheduler.isShutdown()) {
return;
}
// Should never reach here.
log.error("[Bug] Failed to schedule message receiving request, mq={}, clientId={}", mq, clientId, t);
onReceiveMessageException(t, attemptId);
}
}
onReceiveMessageException会判断是不是TooManyRequestsException异常,是则delay取RECEIVING_FLOW_CONTROL_BACKOFF_DELAY(
Duration.ofMillis(20)
),否则取RECEIVING_FAILURE_BACKOFF_DELAY(Duration.ofSeconds(1)
),最后执行receiveMessageLater,它会延时调度执行receiveMessage(attemptId)
ConsumeService
org/apache/rocketmq/client/java/impl/consumer/ConsumeService.java
public abstract class ConsumeService {
private static final Logger log = LoggerFactory.getLogger(ConsumeService.class);
protected final ClientId clientId;
private final MessageListener messageListener;
private final ThreadPoolExecutor consumptionExecutor;
private final MessageInterceptor messageInterceptor;
private final ScheduledExecutorService scheduler;
public ConsumeService(ClientId clientId, MessageListener messageListener, ThreadPoolExecutor consumptionExecutor,
MessageInterceptor messageInterceptor, ScheduledExecutorService scheduler) {
this.clientId = clientId;
this.messageListener = messageListener;
this.consumptionExecutor = consumptionExecutor;
this.messageInterceptor = messageInterceptor;
this.scheduler = scheduler;
}
public abstract void consume(ProcessQueue pq, List<MessageViewImpl> messageViews);
public ListenableFuture<ConsumeResult> consume(MessageViewImpl messageView) {
return consume(messageView, Duration.ZERO);
}
public ListenableFuture<ConsumeResult> consume(MessageViewImpl messageView, Duration delay) {
final ListeningExecutorService executorService = MoreExecutors.listeningDecorator(consumptionExecutor);
final ConsumeTask task = new ConsumeTask(clientId, messageListener, messageView, messageInterceptor);
// Consume message with no delay.
if (Duration.ZERO.compareTo(delay) >= 0) {
return executorService.submit(task);
}
final SettableFuture<ConsumeResult> future0 = SettableFuture.create();
scheduler.schedule(() -> {
final ListenableFuture<ConsumeResult> future = executorService.submit(task);
Futures.addCallback(future, new FutureCallback<ConsumeResult>() {
@Override
public void onSuccess(ConsumeResult consumeResult) {
future0.set(consumeResult);
}
@Override
public void onFailure(Throwable t) {
// Should never reach here.
log.error("[Bug] Exception raised while submitting scheduled consumption task, clientId={}",
clientId, t);
}
}, MoreExecutors.directExecutor());
}, delay.toNanos(), TimeUnit.NANOSECONDS);
return future0;
}
}
ConsumeService是个抽象类,它定义了consume方法接收ProcessQueue及批量messageViews,同时它还内置了consume单个messageView的方法,支持delay参数,该方法主要就是创建ConsumeTask,然后往executorService提交ConsumeTask
ConsumeTask
org/apache/rocketmq/client/java/impl/consumer/ConsumeTask.java
public class ConsumeTask implements Callable<ConsumeResult> {
private static final Logger log = LoggerFactory.getLogger(ConsumeTask.class);
private final ClientId clientId;
private final MessageListener messageListener;
private final MessageViewImpl messageView;
private final MessageInterceptor messageInterceptor;
public ConsumeTask(ClientId clientId, MessageListener messageListener, MessageViewImpl messageView,
MessageInterceptor messageInterceptor) {
this.clientId = clientId;
this.messageListener = messageListener;
this.messageView = messageView;
this.messageInterceptor = messageInterceptor;
}
/**
* Invoke {@link MessageListener} to consumer message.
*
* @return message(s) which is consumed successfully.
*/
@Override
public ConsumeResult call() {
ConsumeResult consumeResult;
final List<GeneralMessage> generalMessages = Collections.singletonList(new GeneralMessageImpl(messageView));
MessageInterceptorContextImpl context = new MessageInterceptorContextImpl(MessageHookPoints.CONSUME);
messageInterceptor.doBefore(context, generalMessages);
try {
consumeResult = messageListener.consume(messageView);
} catch (Throwable t) {
log.error("Message listener raised an exception while consuming messages, clientId={}, mq={}, " +
"messageId={}", clientId, messageView.getMessageQueue(), messageView.getMessageId(), t);
// If exception was thrown during the period of message consumption, mark it as failure.
consumeResult = ConsumeResult.FAILURE;
}
MessageHookPointsStatus status = ConsumeResult.SUCCESS.equals(consumeResult) ? MessageHookPointsStatus.OK :
MessageHookPointsStatus.ERROR;
context = new MessageInterceptorContextImpl(context, status);
messageInterceptor.doAfter(context, generalMessages);
// Make sure that the return value is the subset of messageViews.
return consumeResult;
}
}
ConsumeTask实现了Callable接口,其call方法主要是执行messageListener.consume(messageView),同时处理messageInterceptor的doBefore及doAfter
StandardConsumeService
org/apache/rocketmq/client/java/impl/consumer/StandardConsumeService.java
public class StandardConsumeService extends ConsumeService {
private static final Logger log = LoggerFactory.getLogger(StandardConsumeService.class);
public StandardConsumeService(ClientId clientId, MessageListener messageListener,
ThreadPoolExecutor consumptionExecutor, MessageInterceptor messageInterceptor,
ScheduledExecutorService scheduler) {
super(clientId, messageListener, consumptionExecutor, messageInterceptor, scheduler);
}
@Override
public void consume(ProcessQueue pq, List<MessageViewImpl> messageViews) {
for (MessageViewImpl messageView : messageViews) {
// Discard corrupted message.
if (messageView.isCorrupted()) {
log.error("Message is corrupted for standard consumption, prepare to discard it, mq={}, "
+ "messageId={}, clientId={}", pq.getMessageQueue(), messageView.getMessageId(), clientId);
pq.discardMessage(messageView);
continue;
}
final ListenableFuture<ConsumeResult> future = consume(messageView);
Futures.addCallback(future, new FutureCallback<ConsumeResult>() {
@Override
public void onSuccess(ConsumeResult consumeResult) {
pq.eraseMessage(messageView, consumeResult);
}
@Override
public void onFailure(Throwable t) {
// Should never reach here.
log.error("[Bug] Exception raised in consumption callback, clientId={}", clientId, t);
}
}, MoreExecutors.directExecutor());
}
}
}
StandardConsumeService继承了ConsumeService,其consume方法遍历messageViews,挨个执行父类的consume方法(创建ConsumeTask,然后提交到executorService执行),之后针对该future注册FutureCallback,onSuccess的时候执行pq.eraseMessage(messageView, consumeResult)
FifoConsumeService
org/apache/rocketmq/client/java/impl/consumer/FifoConsumeService.java
class FifoConsumeService extends ConsumeService {
private static final Logger log = LoggerFactory.getLogger(FifoConsumeService.class);
public FifoConsumeService(ClientId clientId, MessageListener messageListener,
ThreadPoolExecutor consumptionExecutor, MessageInterceptor messageInterceptor,
ScheduledExecutorService scheduler) {
super(clientId, messageListener, consumptionExecutor, messageInterceptor, scheduler);
}
@Override
public void consume(ProcessQueue pq, List<MessageViewImpl> messageViews) {
consumeIteratively(pq, messageViews.iterator());
}
public void consumeIteratively(ProcessQueue pq, Iterator<MessageViewImpl> iterator) {
if (!iterator.hasNext()) {
return;
}
final MessageViewImpl messageView = iterator.next();
if (messageView.isCorrupted()) {
// Discard corrupted message.
log.error("Message is corrupted for FIFO consumption, prepare to discard it, mq={}, messageId={}, "
+ "clientId={}", pq.getMessageQueue(), messageView.getMessageId(), clientId);
pq.discardFifoMessage(messageView);
consumeIteratively(pq, iterator);
return;
}
final ListenableFuture<ConsumeResult> future0 = consume(messageView);
ListenableFuture<Void> future = Futures.transformAsync(future0, result -> pq.eraseFifoMessage(messageView,
result), MoreExecutors.directExecutor());
future.addListener(() -> consumeIteratively(pq, iterator), MoreExecutors.directExecutor());
}
}
FifoConsumeService继承了ConsumeService,其consume方法获取messageViews的iterator,然后执行consumeIteratively,该方法通过iterator.next()获取一条记录,然后调用父类的consume方法(创建ConsumeTask,然后提交到executorService执行),之后针对该future0注册了一个function执行pq.eraseMessage(messageView, consumeResult),同时它对这个新future增加了listener在其执行完毕之后继续执行consumeIteratively处理下一个消息
小结
- rocketmq5的PushConsumerImpl实现了AbstractIdleService的startUp方法,该方法会每隔5秒定时调度执行scanAssignments;scanAssignments通过queryAssignment(topic)查询Assignments,然后执行syncProcessQueue;syncProcessQueue方法获取维护最新的以及已有的MessageQueueImpl,然后进行遍历,对于新的执行createProcessQueue及其fetchMessageImmediately方法
- receiveMessageImmediately方法执行consumer.receiveMessage,成功时执行onReceiveMessageResult,异常时执行onReceiveMessageException;onReceiveMessageResult执行consumer.getConsumeService().consume(this, messages),最后再执行receiveMessage,它会判断isCacheFull,为true则返回,否则再次出发receiveMessageImmediately;onReceiveMessageException会判断是不是TooManyRequestsException异常,是则delay取RECEIVING_FLOW_CONTROL_BACKOFF_DELAY(
Duration.ofMillis(20)
),否则取RECEIVING_FAILURE_BACKOFF_DELAY(Duration.ofSeconds(1)
),最后执行receiveMessageLater,它会延时调度执行receiveMessage(attemptId) - ConsumeService是个抽象类,它定义了consume方法接收ProcessQueue及批量messageViews,同时它还内置了consume单个messageView的方法,支持delay参数,该方法主要就是创建ConsumeTask,然后往executorService提交ConsumeTask;ConsumeTask实现了Callable接口,其call方法主要是执行messageListener.consume(messageView),同时处理messageInterceptor的doBefore及doAfter;StandardConsumeService是遍历messageViews,挨个执行父类的consume方法,同时对该future注册FutureCallback,onSuccess的时候执行pq.eraseMessage(messageView, consumeResult);FifoConsumeService则是利用iterator以及listener通过递归调用consumeIteratively实现消息的顺序消费