四、RocketMQ-Producer的Send方法

一、概述

RocketMQ的producer默认有两个,一个是DefaultMQProducer,另一个是TransactionMQProducer,本文只对send方法做一个总结,其他的细节在其他章节介绍

二、DefaultMQProducer

一共定义了17种send方法,从4.x版本,事务消息被放到了TransactionMQProducer中,所以有15个send方法,这15个方法中,又有两个异步带超时时间的send方法被废弃了,所以有效的send方法有13个:

1、同步发送

/**
     * 同步发送模式. 只有消息被成功接收并且被固化完成后才会收到反馈。
     * 内置有重发机制, producer将会重试
     * {@link #retryTimesWhenSendFailed,default=2} 次 ,然后才会报错. 
     * 因此,有一定的概率向broker发送重复的消息
     * 使用者有责任去解决潜在的重复数据造成的影响
     * @param msg 待发送数据
     * @return {@link SendResult} 实体,来通知发送者发送状态等信息, 比如消息的ID
     * {@link SendStatus} 指明 broker 存储/复制 的状态, 发送到了哪个队列等等
     * @throws MQClientException 客户端异常
     * @throws RemotingException 网络连接异常
     * @throws MQBrokerException broker异常
     * @throws InterruptedException 发送线程中断异常
     */
    @Override
    public SendResult send(
        Message msg) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
        return this.defaultMQProducerImpl.send(msg);
    }

2、同步发送,带超时时间

/**
     * 与 {@link #send(Message)} 相同,只不过多了超时时间的指定.
     *
     * @param msg 待发送消息
     * @param timeout 发送超时时间
     * @return {@link SendResult} 同上
     * {@link SendStatus} 同上
     * @throws MQClientException 客户端异常
     * @throws RemotingException 网络连接异常
     * @throws MQBrokerException broker异常
     * @throws InterruptedException 发送线程中断异常
     */
    @Override
    public SendResult send(Message msg,long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
        return this.defaultMQProducerImpl.send(msg, timeout);
    }

3、异步发送

/**
     * 异步发送消息
     * 消息发送后,立即返回。broker处理返程后, 触发sendCallback回调方法
     * 与上面一样,在给出发送失败标志前,会尝试2次,所以开发者要处理重复发送带来的问题
     * @param msg 待发送消息
     * @param sendCallback 回调函数
     * @throws MQClientException 客户端异常
     * @throws RemotingException 网络异常
     * @throws InterruptedException 发送线程中断异常
     */
    @Override
    public void send(Message msg,SendCallback sendCallback) throws MQClientException, RemotingException, InterruptedException {
        this.defaultMQProducerImpl.send(msg, sendCallback);
    }

4、异步发送,带超时时间

@Override
    public void send(Message msg, SendCallback sendCallback, long timeout)
        throws MQClientException, RemotingException, InterruptedException {
        this.defaultMQProducerImpl.send(msg, sendCallback, timeout);
    }

5、单向发送,不等待broker回馈

/**
     * 发送方法不会等待broker的反馈,只会一直发
     * 所以有很高的吞吐量,但是有一定概率丢失消息
     *
     * @param msg 待发送消息
     * @throws MQClientException 客户端异常
     * @throws RemotingException 网络异常
     * @throws InterruptedException 发送线程中断异常
     */
    @Override
    public void sendOneway(Message msg) throws MQClientException, RemotingException, InterruptedException {
        this.defaultMQProducerImpl.sendOneway(msg);
    }

6、同步发送,指定队列

/**
     * 同步发送,指定队列
     * @param msg 待发送消息
     * @param mq 指定的消息队列
     * @return {@link SendResult} 同上
     * {@link SendStatus} 同上
     */
    @Override
    public SendResult send(Message msg, MessageQueue mq)
        throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
        return this.defaultMQProducerImpl.send(msg, mq);
    }

7、同步发送,指定队列,并附带超时时间

@Override
    public SendResult send(Message msg, MessageQueue mq, long timeout)
        throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
        return this.defaultMQProducerImpl.send(msg, mq, timeout);
    }

8、异步发送,指定队列

@Override
    public void send(Message msg, MessageQueue mq, SendCallback sendCallback)
        throws MQClientException, RemotingException, InterruptedException {
        this.defaultMQProducerImpl.send(msg, mq, sendCallback);
    }

9、异步发送,指定队列,附带超时时间

这个在4.4.0版本被设置为废弃,后续版本会给出

/**
  * 因为在处超时异常存在问题,所以废弃
 */
@Override
    public void send(Message msg, MessageQueue mq, SendCallback sendCallback, long timeout)
        throws MQClientException, RemotingException, InterruptedException {
        this.defaultMQProducerImpl.send(msg, mq, sendCallback, timeout);
    }

10、单向发送,指定队列

@Override
    public void sendOneway(Message msg,
        MessageQueue mq) throws MQClientException, RemotingException, InterruptedException {
        this.defaultMQProducerImpl.sendOneway(msg, mq);
    }

11、同步发送,指定队列选择策略

官方的有序消息的DEMO就是基于队列选择器做的,让一些列有序的消息(相同ID)发送到同一个队列

/**
     * 指定队列选择策略MessageQueueSelector 
     *
     * @param msg 待发送消息
     * @param selector 队列选择器
     * @param arg 配合队列选择器选择队列的参数,一般可以是业务参数(ID等)
     * @return {@link SendResult} 同上
     * {@link SendStatus} 同上
     */
@Override
    public SendResult send(Message msg, MessageQueueSelector selector, Object arg)
        throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
        return this.defaultMQProducerImpl.send(msg, selector, arg);
    }

12、同步发送消息,指定队列选择策略,并附带超时时间

@Override
    public SendResult send(Message msg, MessageQueueSelector selector, Object arg, long timeout)
        throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
        return this.defaultMQProducerImpl.send(msg, selector, arg, timeout);
    }

13、异步发送消息,指定队列选择策略

@Override
    public void send(Message msg, MessageQueueSelector selector, Object arg, SendCallback sendCallback)
        throws MQClientException, RemotingException, InterruptedException {
        this.defaultMQProducerImpl.send(msg, selector, arg, sendCallback);
    }

14、异步发送消息,指定队列选择策略,并附带超时时间

这个方法在4.4.0版本废弃,后续提供

@Override
    public void send(Message msg, MessageQueueSelector selector, Object arg, SendCallback sendCallback, long timeout)
        throws MQClientException, RemotingException, InterruptedException {
        this.defaultMQProducerImpl.send(msg, selector, arg, sendCallback, timeout);
    }

15、单向发送,指定队列选择策略

 @Override
    public void sendOneway(Message msg, MessageQueueSelector selector, Object arg)
        throws MQClientException, RemotingException, InterruptedException {
        this.defaultMQProducerImpl.sendOneway(msg, selector, arg);
    }

三、TransactionMQProducer

1、发送事务消息

@Override
    public TransactionSendResult sendMessageInTransaction(final Message msg,
        final Object arg) throws MQClientException {
        if (null == this.transactionListener) {
            throw new MQClientException("TransactionListener is null", null);
        }

        return this.defaultMQProducerImpl.sendMessageInTransaction(msg, null, arg);
    }
©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容

  • http://liuxing.info/2017/06/30/Spring%20AMQP%E4%B8%AD%E6%...
    sherlock_6981阅读 16,066评论 2 11
  • Swift1> Swift和OC的区别1.1> Swift没有地址/指针的概念1.2> 泛型1.3> 类型严谨 对...
    cosWriter阅读 11,165评论 1 32
  • 最全的iOS面试题及答案 iOS面试小贴士 ———————————————回答好下面的足够了-----------...
    zweic阅读 2,738评论 0 73
  • 每个人的想法不同 , RocketMQ 介绍的时候就说 是阿里从他们使用的上 解耦出来 近一步简化 便捷的 目...
    楼亭樵客阅读 427评论 0 0
  • 1资金分成若干分,不断地投项目,上线了涨不动就卖,破发也卖。时间耗不起。资金必须滚动,对项目要做到拔掉无情。 2熊...
    于海涛_290e阅读 189评论 0 0