聊聊PushConsumer与SimpleConsumer拉取消息的区别

本文主要研究一下rocketmq5的PushConsumer与SimpleConsumer拉取消息的区别

ProcessQueueImpl

org/apache/rocketmq/client/java/impl/consumer/ProcessQueueImpl.java

    private void receiveMessageImmediately(String attemptId) {
        final ClientId clientId = consumer.getClientId();
        if (!consumer.isRunning()) {
            log.info("Stop to receive message because consumer is not running, mq={}, clientId={}", mq, clientId);
            return;
        }
        try {
            final Endpoints endpoints = mq.getBroker().getEndpoints();
            final int batchSize = this.getReceptionBatchSize();
            final Duration longPollingTimeout = consumer.getPushConsumerSettings().getLongPollingTimeout();
            final ReceiveMessageRequest request = consumer.wrapReceiveMessageRequest(batchSize, mq, filterExpression,
                longPollingTimeout, attemptId);
            activityNanoTime = System.nanoTime();

            // Intercept before message reception.
            final MessageInterceptorContextImpl context = new MessageInterceptorContextImpl(MessageHookPoints.RECEIVE);
            consumer.doBefore(context, Collections.emptyList());

            final ListenableFuture<ReceiveMessageResult> future = consumer.receiveMessage(request, mq,
                longPollingTimeout);
            Futures.addCallback(future, new FutureCallback<ReceiveMessageResult>() {
                @Override
                public void onSuccess(ReceiveMessageResult result) {
                    // Intercept after message reception.
                    final List<GeneralMessage> generalMessages = result.getMessageViewImpls().stream()
                        .map((Function<MessageView, GeneralMessage>) GeneralMessageImpl::new)
                        .collect(Collectors.toList());
                    final MessageInterceptorContextImpl context0 =
                        new MessageInterceptorContextImpl(context, MessageHookPointsStatus.OK);
                    consumer.doAfter(context0, generalMessages);

                    try {
                        onReceiveMessageResult(result);
                    } catch (Throwable t) {
                        // Should never reach here.
                        log.error("[Bug] Exception raised while handling receive result, mq={}, endpoints={}, "
                            + "clientId={}", mq, endpoints, clientId, t);
                        onReceiveMessageException(t, attemptId);
                    }
                }

                @Override
                public void onFailure(Throwable t) {
                    String nextAttemptId = null;
                    if (t instanceof StatusRuntimeException) {
                        StatusRuntimeException exception = (StatusRuntimeException) t;
                        if (org.apache.rocketmq.shaded.io.grpc.Status.DEADLINE_EXCEEDED.getCode() == exception.getStatus().getCode()) {
                            nextAttemptId = request.getAttemptId();
                        }
                    }
                    // Intercept after message reception.
                    final MessageInterceptorContextImpl context0 =
                        new MessageInterceptorContextImpl(context, MessageHookPointsStatus.ERROR);
                    consumer.doAfter(context0, Collections.emptyList());

                    log.error("Exception raised during message reception, mq={}, endpoints={}, attemptId={}, " +
                            "nextAttemptId={}, clientId={}", mq, endpoints, request.getAttemptId(), nextAttemptId,
                        clientId, t);
                    onReceiveMessageException(t, nextAttemptId);
                }
            }, MoreExecutors.directExecutor());
            receptionTimes.getAndIncrement();
            consumer.getReceptionTimes().getAndIncrement();
        } catch (Throwable t) {
            log.error("Exception raised during message reception, mq={}, clientId={}", mq, clientId, t);
            onReceiveMessageException(t, attemptId);
        }
    }

PushConsumer通过ProcessQueueImpl的receiveMessageImmediately拉取消息,其内部是通过consumer.receiveMessage(request, mq, longPollingTimeout)来拉取的,request是通过consumer.wrapReceiveMessageRequest(batchSize, mq, filterExpression, longPollingTimeout, attemptId)构建的

SimpleConsumerImpl

org/apache/rocketmq/client/java/impl/consumer/SimpleConsumerImpl.java

    public ListenableFuture<List<MessageView>> receive0(int maxMessageNum, Duration invisibleDuration) {
        if (!this.isRunning()) {
            log.error("Unable to receive message because simple consumer is not running, state={}, clientId={}",
                this.state(), clientId);
            final IllegalStateException e = new IllegalStateException("Simple consumer is not running now");
            return Futures.immediateFailedFuture(e);
        }
        if (maxMessageNum <= 0) {
            final IllegalArgumentException e = new IllegalArgumentException("maxMessageNum must be greater than 0");
            return Futures.immediateFailedFuture(e);
        }
        final HashMap<String, FilterExpression> copy = new HashMap<>(subscriptionExpressions);
        final ArrayList<String> topics = new ArrayList<>(copy.keySet());
        // All topic is subscribed.
        if (topics.isEmpty()) {
            final IllegalArgumentException e = new IllegalArgumentException("There is no topic to receive message");
            return Futures.immediateFailedFuture(e);
        }
        final String topic = topics.get(IntMath.mod(topicIndex.getAndIncrement(), topics.size()));
        final FilterExpression filterExpression = copy.get(topic);
        final ListenableFuture<SubscriptionLoadBalancer> routeFuture = getSubscriptionLoadBalancer(topic);
        final ListenableFuture<ReceiveMessageResult> future0 = Futures.transformAsync(routeFuture, result -> {
            final MessageQueueImpl mq = result.takeMessageQueue();
            final ReceiveMessageRequest request = wrapReceiveMessageRequest(maxMessageNum, mq, filterExpression,
                invisibleDuration, awaitDuration);
            return receiveMessage(request, mq, awaitDuration);
        }, MoreExecutors.directExecutor());
        return Futures.transformAsync(future0, result -> Futures.immediateFuture(result.getMessageViews()),
            clientCallbackExecutor);
    }

SimpleConsumerImpl的receive0也是通过ConsumerImpl的receiveMessage(request, mq, awaitDuration)方法来拉取消息的,其request是通过wrapReceiveMessageRequest(maxMessageNum, mq, filterExpression, invisibleDuration, awaitDuration)来构建的

ConsumerImpl

org/apache/rocketmq/client/java/impl/consumer/ConsumerImpl.java

receiveMessage

    protected ListenableFuture<ReceiveMessageResult> receiveMessage(ReceiveMessageRequest request,
        MessageQueueImpl mq, Duration awaitDuration) {
        List<MessageViewImpl> messages = new ArrayList<>();
        try {
            final Endpoints endpoints = mq.getBroker().getEndpoints();
            final Duration tolerance = clientConfiguration.getRequestTimeout();
            final Duration timeout = awaitDuration.plus(tolerance);
            final ClientManager clientManager = this.getClientManager();
            final RpcFuture<ReceiveMessageRequest, List<ReceiveMessageResponse>> future =
                clientManager.receiveMessage(endpoints, request, timeout);
            return Futures.transformAsync(future, responses -> {
                Status status = Status.newBuilder().setCode(Code.INTERNAL_SERVER_ERROR)
                    .setMessage("status was not set by server")
                    .build();
                Long transportDeliveryTimestamp = null;
                List<Message> messageList = new ArrayList<>();
                for (ReceiveMessageResponse response : responses) {
                    switch (response.getContentCase()) {
                        case STATUS:
                            status = response.getStatus();
                            break;
                        case MESSAGE:
                            messageList.add(response.getMessage());
                            break;
                        case DELIVERY_TIMESTAMP:
                            final Timestamp deliveryTimestamp = response.getDeliveryTimestamp();
                            transportDeliveryTimestamp = Timestamps.toMillis(deliveryTimestamp);
                            break;
                        default:
                            log.warn("[Bug] Not recognized content for receive message response, mq={}, " +
                                "clientId={}, response={}", mq, clientId, response);
                    }
                }
                for (Message message : messageList) {
                    final MessageViewImpl view = MessageViewImpl.fromProtobuf(message, mq, transportDeliveryTimestamp);
                    messages.add(view);
                }
                StatusChecker.check(status, future);
                final ReceiveMessageResult receiveMessageResult = new ReceiveMessageResult(endpoints, messages);
                return Futures.immediateFuture(receiveMessageResult);
            }, MoreExecutors.directExecutor());
        } catch (Throwable t) {
            // Should never reach here.
            log.error("[Bug] Exception raised during message receiving, mq={}, clientId={}", mq, clientId, t);
            return Futures.immediateFailedFuture(t);
        }
    }

receiveMessage方法通过clientManager.receiveMessage(endpoints, request, timeout)来拉取消息,之后转换为ReceiveMessageResult

wrapReceiveMessageRequest(ProcessQueueImpl)

    ReceiveMessageRequest wrapReceiveMessageRequest(int batchSize, MessageQueueImpl mq,
        FilterExpression filterExpression, Duration longPollingTimeout, String attemptId) {
        attemptId = null == attemptId ? UUID.randomUUID().toString() : attemptId;
        return ReceiveMessageRequest.newBuilder().setGroup(getProtobufGroup())
            .setMessageQueue(mq.toProtobuf()).setFilterExpression(wrapFilterExpression(filterExpression))
            .setLongPollingTimeout(Durations.fromNanos(longPollingTimeout.toNanos()))
            .setBatchSize(batchSize).setAutoRenew(true).setAttemptId(attemptId).build();
    }

ProcessQueueImpl调用的wrapReceiveMessageRequest只传递了batchSize、mq、filterExpression, longPollingTimeout, attemptId这几个参数

wrapReceiveMessageRequest(SimpleConsumerImpl)

    ReceiveMessageRequest wrapReceiveMessageRequest(int batchSize, MessageQueueImpl mq,
        FilterExpression filterExpression, Duration invisibleDuration, Duration longPollingTimeout) {
        final org.apache.rocketmq.shaded.com.google.protobuf.Duration duration = Durations.fromNanos(invisibleDuration.toNanos());
        return ReceiveMessageRequest.newBuilder().setGroup(getProtobufGroup())
            .setMessageQueue(mq.toProtobuf()).setFilterExpression(wrapFilterExpression(filterExpression))
            .setLongPollingTimeout(Durations.fromNanos(longPollingTimeout.toNanos()))
            .setBatchSize(batchSize).setAutoRenew(false).setInvisibleDuration(duration).build();
    }

SimpleConsumerImpl调用的wrapReceiveMessageRequest只传递了maxMessageNum(batchSize), mq, filterExpression, invisibleDuration, awaitDuration(longPollingTimeout)这几个参数
区别在于一个是setAutoRenew为true且设置了attemptId,一个是setAutoRenew为false且设置了invisibleDuration

小结

rocketmq5的PushConsumer与SimpleConsumer拉取消息都是通过ConsumerImpl的receiveMessage方法来拉取的,区别在于构建的ReceiveMessageRequest参数不一样,一个是setAutoRenew为true且设置了attemptId,一个是setAutoRenew为false且设置了invisibleDuration。

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 203,271评论 5 476
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,275评论 2 380
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 150,151评论 0 336
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,550评论 1 273
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,553评论 5 365
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,559评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,924评论 3 395
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,580评论 0 257
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,826评论 1 297
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,578评论 2 320
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,661评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,363评论 4 318
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,940评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,926评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,156评论 1 259
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 42,872评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,391评论 2 342

推荐阅读更多精彩内容