RocketMQ 生产者 Producer 发送消息三种方式分析

概述

Producer 发送消息,RocketMQ 提供了三种模式。

  • 同步发送
  • 异步发送
  • OneWay 发送

示例代码如下:

// 1、同步发送
SendResult sendResult = producer.send(msg);

//2、异步发送
producer.send(msg, new SendCallback() {
    @Override
    public void onSuccess(SendResult sendResult) {
    }
    @Override
    public void onException(Throwable e) {
    }
});

//3、 Oneway发送
producer.sendOneway(msg);
  • 1、同步发送
    Producer 向 broker 发送消息,阻塞当前线程等待 broker 响应 发送结果。
  • 2、异步发送
    Producer 首先构建一个向 broker 发送消息的任务,把该任务提交给线程池,等执行完该任务时,回调用户自定义的回调函数,执行处理结果。
  • 3、Oneway 发送
    Oneway 方式只负责发送请求,不等待应答,Producer 只负责把请求发出去,而不处理响应结果。

一、同步发送

org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl

private SendResult sendDefaultImpl(Message msg,
    final CommunicationMode communicationMode,
    final SendCallback sendCallback, final long timeout
  ) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
    //省略代码
    ......

    TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());
    if (topicPublishInfo != null && topicPublishInfo.ok()) {
        //省略代码
        ......
        // 1、计算重发次数
        int timesTotal = communicationMode == CommunicationMode.SYNC ? 1 + this.defaultMQProducer.getRetryTimesWhenSendFailed() : 1;
        int times = 0;
        String[] brokersSent = new String[timesTotal];

        // 2、循环执行发送消息
        for (; times < timesTotal; times++) {
            String lastBrokerName = null == mq ? null : mq.getBrokerName();
            // 3、选择要发送的 MessageQueue
            MessageQueue mqSelected = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName);
            if (mqSelected != null) {
                mq = mqSelected;
                brokersSent[times] = mq.getBrokerName();
                try {
                    beginTimestampPrev = System.currentTimeMillis();
                    long costTime = beginTimestampPrev - beginTimestampFirst;
                    if (timeout < costTime) {
                        callTimeout = true;
                        break;
                    }
                    // 4、调用 sendKernelImpl 方法进行发送消息
                    sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, topicPublishInfo, timeout - costTime);
                    endTimestamp = System.currentTimeMillis();
                    this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false);
                    switch (communicationMode) {
                        case ASYNC:
                            return null;
                        case ONEWAY:
                            return null;
                        case SYNC:
                            // 5、如果发送失败,则continue,继续循环发送,发送成功则直接 return 返回
                            if (sendResult.getSendStatus() != SendStatus.SEND_OK) {
                                if (this.defaultMQProducer.isRetryAnotherBrokerWhenNotStoreOK()) {
                                    continue;
                                }
                            }
                            
                            return sendResult;
                        default:
                            break;
                    }
                } 
                //省略代码
                ......
            } else {
                break;
            }
        }

        if (sendResult != null) {
            return sendResult;
        }
        //省略代码
        ......
}

1、计算消息最多发送的次数。

同步发送:org.apache.rocketmq.client.producer.DefaultMQProducer#retryTimesWhenSendFailed + 1,默认retryTimesWhenSendFailed是2,所以除了正常调用一次外,发送消息如果失败了会重试2次。总共会发送3次,如果3次都失败则返回发送失败的消息。
异步发送:不会重试(调用总次数等于1)

2、循环执行发送消息

如果发送的消息未成功发送,则循环继续发送,直到发送的次数达到 timesTotal 。

3、选择要发送的 MessageQueue

选择消息要发送的 MessageQueue。

4、调用 sendKernelImpl 方法进行发送消息
5、如果发送失败,则continue,继续循环发送,发送成功则直接 return 返回

同步发送原理

RocketMQ 通讯是使用 Netty 进行发送的,Netty 通讯默认都是异步的,那么同步是怎么实现的呢?

public RemotingCommand invokeSyncImpl(final Channel channel, final RemotingCommand request,
    final long timeoutMillis)
    throws InterruptedException, RemotingSendRequestException, RemotingTimeoutException {
    final int opaque = request.getOpaque();

    try {
        // 1、定义一个 ResponseFuture 来处理响应结果
        final ResponseFuture responseFuture = new ResponseFuture(channel, opaque, timeoutMillis, null, null);
        this.responseTable.put(opaque, responseFuture);
        final SocketAddress addr = channel.remoteAddress();
        channel.writeAndFlush(request).addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture f) throws Exception {
                if (f.isSuccess()) {
                    // 2、通过 Netty 执行完成后回调处理请求的结果
                    responseFuture.setSendRequestOK(true);
                    return;
                } else {
                    responseFuture.setSendRequestOK(false);
                }

                responseTable.remove(opaque);
                responseFuture.setCause(f.cause());
                // 唤醒阻塞的线程
                responseFuture.putResponse(null);
                log.warn("send a request command to channel <" + addr + "> failed.");
            }
        });
        // 3、请求结果默认等待请求3秒钟,如果超过3秒则抛出异常。
        RemotingCommand responseCommand = responseFuture.waitResponse(timeoutMillis);
        if (null == responseCommand) {
            if (responseFuture.isSendRequestOK()) {
                throw new RemotingTimeoutException(RemotingHelper.parseSocketAddressAddr(addr), timeoutMillis,
                    responseFuture.getCause());
            } else {
                throw new RemotingSendRequestException(RemotingHelper.parseSocketAddressAddr(addr), responseFuture.getCause());
            }
        }

        return responseCommand;
    } finally {
        this.responseTable.remove(opaque);
    }
}
1、定义一个 ResponseFuture 来处理响应结果
public class ResponseFuture {
    //省略其它代码
    ......
    private final CountDownLatch countDownLatch = new CountDownLatch(1);

    public RemotingCommand waitResponse(final long timeoutMillis) throws InterruptedException {
        this.countDownLatch.await(timeoutMillis, TimeUnit.MILLISECONDS);
        return this.responseCommand;
    }

    public void putResponse(final RemotingCommand responseCommand) {
        this.responseCommand = responseCommand;
        this.countDownLatch.countDown();
    }
}

ResponseFuture 内部使用了 CountDownLatch 来实现的。当调用waitResponse() 方法时阻塞当前线程,当返回结果时调用 putResponse() 方法存放结果,然后执行this.countDownLatch.countDown() 唤醒阻塞的线程。

2、通过 Netty 执行完成后回调处理请求的结果

使用 Netty 进行发送消息,当 Netty 收到结果后会执行自定义的 ChannelFutureListener.operationComplete()方法。
如果执行完成,调用responseFuture.putResponse(null);立即唤醒阻塞的线程,处理请求结果。

3、默认最长等待请求3秒钟,如果超过3秒则抛出异常。

调用 responseFuture.waitResponse(timeoutMillis)方法阻塞等待 Netty 返回结果。默认最长等待时间为3秒,如果超过3秒则认为调用超时,抛出 RemotingSendRequestException 异常信息。

二、异步发送

public void send(final Message msg, final SendCallback sendCallback, final long timeout)
    throws MQClientException, RemotingException, InterruptedException {
    final long beginStartTime = System.currentTimeMillis();
    //获取执行异步请求的线程池
    ExecutorService executor = this.getCallbackExecutor();
    try {
        executor.submit(new Runnable() {
            @Override
            public void run() {
                long costTime = System.currentTimeMillis() - beginStartTime;
                if (timeout > costTime) {
                    try {
                        // 执行发送消息
                        sendDefaultImpl(msg, CommunicationMode.ASYNC, sendCallback, timeout - costTime);
                    } catch (Exception e) {
                        sendCallback.onException(e);
                    }
                } else {
                    sendCallback.onException(
                        new RemotingTooMuchRequestException("DEFAULT ASYNC send call timeout"));
                }
            }

        });
    } catch (RejectedExecutionException e) {
        throw new MQClientException("executor rejected ", e);
    }

}

异步发送的消息是把发送的消息提交给线程池来进行调度执行的。因为不需要同步返回结果。

异步发送原理

public void invokeAsyncImpl(final Channel channel, final RemotingCommand request, final long timeoutMillis,
    final InvokeCallback invokeCallback)
    throws InterruptedException, RemotingTooMuchRequestException, RemotingTimeoutException, RemotingSendRequestException {

    long beginStartTime = System.currentTimeMillis();
    final int opaque = request.getOpaque();
    //1、尝试获得 semaphore 信号量,semaphore 默认为65535。
    boolean acquired = this.semaphoreAsync.tryAcquire(timeoutMillis, TimeUnit.MILLISECONDS);
    if (acquired) {
        final SemaphoreReleaseOnlyOnce once = new SemaphoreReleaseOnlyOnce(this.semaphoreAsync);
        long costTime = System.currentTimeMillis() - beginStartTime;
        if (timeoutMillis < costTime) {
            throw new RemotingTooMuchRequestException("invokeAsyncImpl call timeout");
        }
        // 2、定义 ResponseFuture 来处理响应的结果
        final ResponseFuture responseFuture = new ResponseFuture(channel, opaque, timeoutMillis - costTime, invokeCallback, once);
        // 存储到把 ResponseFuture  存储到 responseTable 中。
        this.responseTable.put(opaque, responseFuture);
        try {
            // 3、调用 Netty 发送数据
            channel.writeAndFlush(request).addListener(new ChannelFutureListener() {
                @Override
                public void operationComplete(ChannelFuture f) throws Exception {
                    // 3.1、成功则设置 responseFuture 的状态为发送成功
                    if (f.isSuccess()) {
                        responseFuture.setSendRequestOK(true);
                        return;
                    }
                    // 3.2、发送失败则快速处理失败请求
                    requestFail(opaque);
                    log.warn("send a request command to channel <{}> failed.", RemotingHelper.parseChannelRemoteAddr(channel));
                }
            });
        } catch (Exception e) {
            responseFuture.release();
            log.warn("send a request command to channel <" + RemotingHelper.parseChannelRemoteAddr(channel) + "> Exception", e);
            throw new RemotingSendRequestException(RemotingHelper.parseChannelRemoteAddr(channel), e);
        }
    } else {
        if (timeoutMillis <= 0) {
            throw new RemotingTooMuchRequestException("invokeAsyncImpl invoke too fast");
        } else {
            String info =
                String.format("invokeAsyncImpl tryAcquire semaphore timeout, %dms, waiting thread nums: %d semaphoreAsyncValue: %d",
                    timeoutMillis,
                    this.semaphoreAsync.getQueueLength(),
                    this.semaphoreAsync.availablePermits()
                );
            log.warn(info);
            throw new RemotingTimeoutException(info);
        }
    }
}
1、尝试获得 semaphore 信号量,semaphore 默认为65535。
   public static final int CLIENT_ASYNC_SEMAPHORE_VALUE 
=Integer.parseInt(System.getProperty(COM_ROCKETMQ_REMOTING_CLIENT_ASYNC_SEMAPHORE_VALUE, "65535"));

异步发送请求的并发量,默认最大为65535。

2、定义 ResponseFuture 来处理响应的结果

ResponseFuture responseFuture 定义了发送数据响应的结果,同上面介绍的同步发送。
把responseFuture 存储到 responseTable 中。

protected final ConcurrentMap<Integer /* opaque */, ResponseFuture> responseTable =
        new ConcurrentHashMap<Integer, ResponseFuture>(256);

opaque 为请求的唯一标识,每次请求创建一个新的(AtomicInteger 自增的)。

3、调用 Netty 发送数据

调用 Netty 异步发送数据。

  • 3.1、 Netty 发送成功则设置 responseFuture 的状态为发送成功
  • 3.2、发送失败则快速处理失败请求
    快速失败是相对下面的定时任务扫描处理响应结果的

处理异步的响应结果

//1、 定时间隔3秒钟扫描一次 responseTable
this.timer.scheduleAtFixedRate(new TimerTask() {
    @Override
        public void run() {
            try {
                NettyRemotingServer.this.scanResponseTable();
            } catch (Throwable e) {
                log.error("scanResponseTable exception", e);
            }
        }
    }, 1000 * 3, 1000);


public void scanResponseTable() {
    final List<ResponseFuture> rfList = new LinkedList<ResponseFuture>();
    Iterator<Entry<Integer, ResponseFuture>> it = this.responseTable.entrySet().iterator();
    while (it.hasNext()) {
        Entry<Integer, ResponseFuture> next = it.next();
        ResponseFuture rep = next.getValue();
        // 2、扫描开始执行时间大于 执行超时+1s 的 ResponseFuture 数据,并存放到 rfList 中
        if ((rep.getBeginTimestamp() + rep.getTimeoutMillis() + 1000) <= System.currentTimeMillis()) {
            rep.release();
            it.remove();
            rfList.add(rep);
            log.warn("remove timeout request, " + rep);
        }
    }
    
    // 3、执行处理完成或超时的请求
    for (ResponseFuture rf : rfList) {
        try {
            executeInvokeCallback(rf);
        } catch (Throwable e) {
            log.warn("scanResponseTable, operationComplete Exception", e);
        }
    }
}
1、 定时间隔3秒钟扫描一次 responseTable

当 Netty 客户端和服务端启动的时候,都会自动这个定时任务。定时的扫描 responseTable 的请求数据。每隔3秒扫描一次。

2、扫描开始执行时间大于 执行超时+1s 的 ResponseFuture 数据

Netty 请求的时候默认会有等待的执行超时时间(或可自己设置),超过超时时间的则认为任务超时,需要通过定时任务处理超时的任务。
异步执行完成的请求也会在定时任务中回调执行处理结果。

三、Oneway 发送

public void invokeOnewayImpl(final Channel channel, final RemotingCommand request, final long timeoutMillis)
    throws InterruptedException, RemotingTooMuchRequestException, RemotingTimeoutException, RemotingSendRequestException {

    request.markOnewayRPC();
    // 尝试获得 semaphore 信号量,semaphore 默认为65535。
    boolean acquired = this.semaphoreOneway.tryAcquire(timeoutMillis, TimeUnit.MILLISECONDS);

    if (acquired) {
        final SemaphoreReleaseOnlyOnce once = new SemaphoreReleaseOnlyOnce(this.semaphoreOneway);
        try {
            // 2、调用 Netty 发送请求
            channel.writeAndFlush(request).addListener(new ChannelFutureListener() {
                @Override
                public void operationComplete(ChannelFuture f) throws Exception {
                    once.release();
                    if (!f.isSuccess()) {
                        log.warn("send a request command to channel <" + channel.remoteAddress() + "> failed.");
                    }
                }
            });
        } catch (Exception e) {
            once.release();
            log.warn("write send a request command to channel <" + channel.remoteAddress() + "> failed.");
            throw new RemotingSendRequestException(RemotingHelper.parseChannelRemoteAddr(channel), e);
        }
    } else {
        if (timeoutMillis <= 0) {
            throw new RemotingTooMuchRequestException("invokeOnewayImpl invoke too fast");
        } else {
            String info = String.format(
                "invokeOnewayImpl tryAcquire semaphore timeout, %dms, waiting thread nums: %d semaphoreAsyncValue: %d",
                timeoutMillis,
                this.semaphoreOneway.getQueueLength(),
                this.semaphoreOneway.availablePermits()
            );
            log.warn(info);
            throw new RemotingTimeoutException(info);
        }
    }
}
1、尝试获得 semaphore 信号量,semaphore 默认为65535。
    public static final int CLIENT_ONEWAY_SEMAPHORE_VALUE 
= Integer.parseInt(System.getProperty(COM_ROCKETMQ_REMOTING_CLIENT_ONEWAY_SEMAPHORE_VALUE, "65535"));

Oneway 方式发送请求的并发量,默认最大为65535。

2、调用 Netty 发送请求

这里只是发送数据,而没有处理响应结果。
这种方式发送数据,吞吐量更高,但不管数据是否发送成功。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 220,639评论 6 513
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 94,093评论 3 396
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 167,079评论 0 357
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 59,329评论 1 295
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 68,343评论 6 397
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 52,047评论 1 308
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,645评论 3 421
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 39,565评论 0 276
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 46,095评论 1 319
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 38,201评论 3 340
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 40,338评论 1 352
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 36,014评论 5 347
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,701评论 3 332
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 32,194评论 0 23
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 33,320评论 1 272
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 48,685评论 3 375
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 45,345评论 2 358