Apache RocketMQ 原生Api使用

创建maven工程,引入RocketMQ依赖

<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-client</artifactId>
    <version>4.5.2</version>
</dependency>

创建Producer发送同步消息,异步消息,单向消息

public class Producer {

    /**
     * RocketMQ 将消息发送到指定队列
     */
    public static void asyncMsgCustomQueue() throws Exception {
        DefaultMQProducer producer = new DefaultMQProducer("test_quick_producer_name");
        producer.setNamesrvAddr("192.168.218.131:9876;192.168.218.132:9876");

        producer.start();
        producer.setSendMsgTimeout(10000);
        //1、创建消息
        Message message = new Message("test_quick_topic",  //主题
                "TagA",          //标签
                "keyA",          //用户自定义的key,唯一的标识
                ("hello RocketMq").getBytes());//消息体
        //2、将消息发送到指定队列
        producer.send(message, new MessageQueueSelector() {
            @Override
            public MessageQueue select(List<MessageQueue> list, Message message, Object arg) {
                Integer queueNumber = (Integer) arg;
                int size = list.size();
                int index = queueNumber % size;
                return list.get(index);
            }
        },1);


        producer.shutdown();
    }


    /**
     * RocketMQ 发送单向消息
     */
    public static void syncOneWay() throws Exception {
        DefaultMQProducer producer = new DefaultMQProducer("test_quick_producer_name");
        //同步发送消息,如果5秒内没有发送成功,则重试3次
        producer.setRetryTimesWhenSendFailed(3);
        producer.setNamesrvAddr("192.168.218.131:9876;192.168.218.132:9876");

        producer.start();
        //设置消息发送超时时间,默认为3000ms
        producer.setSendMsgTimeout(5000);
        //1、创建消息
        Message message = new Message("test_quick_topic",  //主题
                "TagA",          //标签
                "keyA",          //用户自定义的key,唯一的标识
                ("hello RocketMq").getBytes());//消息体

        //2、发送单向消息
        producer.sendOneway(message);
        System.out.println("单向消息已发出");

        producer.shutdown();
    }


    /**
     * RocketMQ 异步消息发送
     */
    public static void asyncMsg() throws Exception {
        DefaultMQProducer producer = new DefaultMQProducer("test_quick_producer_name");
        producer.setNamesrvAddr("192.168.218.131:9876;192.168.218.132:9876");

        producer.setRetryTimesWhenSendAsyncFailed(0);
        producer.start();
        //设置消息发送超时时间,默认为3000ms
        producer.setSendMsgTimeout(5000);
        //1、创建消息
        Message message = new Message("test_quick_topic",  //主题
                "TagA",          //标签
                "keyA",          //用户自定义的key,唯一的标识
                ("hello RocketMq").getBytes());//消息体
        //2、发送异步消息
        producer.send(message, new SendCallback() {
            @Override
            public void onSuccess(SendResult sendResult) {
                System.out.println("msgId:" + sendResult.getMsgId()+",status:"+sendResult.getSendStatus());
                //通过status可以知道此异步消息是否发送成功
                //如果要保障100%可靠性投递的消息的话,在获取到status的时候对消息数据做变更操作
            }

            @Override
            public void onException(Throwable throwable) {
                //消息发送失败的话,进行补偿或者重发
                System.out.println("消息发送失败");
            }
        });


        producer.shutdown();
    }

    /**
     * RocketMQ 同步消息发送
     */
    public static void syncMsg() throws Exception {
        DefaultMQProducer producer = new DefaultMQProducer("test_quick_producer_name");
        //同步发送消息,如果5秒内没有发送成功,则重试3次
        producer.setRetryTimesWhenSendFailed(3);
        producer.setNamesrvAddr("192.168.218.131:9876;192.168.218.132:9876");

        producer.start();
        //设置消息发送超时时间,默认为3000ms
        producer.setSendMsgTimeout(5000);
        //1、创建消息
        Message message = new Message("test_quick_topic",  //主题
                "TagA",          //标签
                "keyA",          //用户自定义的key,唯一的标识
                ("hello RocketMq").getBytes());//消息体
        //2、发送同步消息
        SendResult sendResult = producer.send(message,5000L);
        System.out.println("消息发出:" + sendResult);

        producer.shutdown();
    }
}

创建Consumer消费消息

Consumer集群消费模式

/**
 * @Description: Clustering集群模式
 */

public class Consumer1 {

    public Consumer1()  {
        try {
            String group_name = "test_model_producer_name";
            DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(group_name);
            consumer.setNamesrvAddr("192.168.218.131:9876;192.168.218.132:9876");
            //consumer.subscribe("test_model_topic","*");
            consumer.subscribe("test_model_topic","TagA || TagB");
            consumer.setMessageModel(MessageModel.CLUSTERING);//集群模式
            consumer.registerMessageListener(new ConsumerListener());
            consumer.start();
        } catch (MQClientException e) {
            e.printStackTrace();
        }
    }

    public static void main(String[] args) {
        Consumer1 consumer1 = new Consumer1();
        System.out.println("consumer1 start...");
    }
}

public class ConsumerListener implements MessageListenerConcurrently {

    @Override
    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
        MessageExt messageExt = null;
        try {
            for (MessageExt message : list) {
                messageExt = message;
                String topic = messageExt.getTopic();
                String tags = messageExt.getTags();
                String keys = messageExt.getKeys();
                String body = new String(messageExt.getBody(), RemotingHelper.DEFAULT_CHARSET);
                System.out.println("topic:" + topic + ",tags:" + tags + ",keys:" + keys + ",body:" + body);
            }
        } catch (Exception e) {
            //获取当前消息被重发了多少次
            int reconsumeTimes = messageExt.getReconsumeTimes();
            System.out.println("reconsumeTimes:" + reconsumeTimes);
            if (reconsumeTimes == 3) {
                //如果一条消息被重发了3次,且还是没有消费成功,那么就记录日志
                //记录日志目的就是为了做补偿处理
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
            //消息消费失败会自动重试
            return ConsumeConcurrentlyStatus.RECONSUME_LATER;
        }
        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    }
}

Consumer广播消费模式

/**
 * @Description: Broadcasting广播模式
 *
 * Broadcasting广播模式下
 * 同一group_name下的Consumer通过consumer.subscribe("test_model_topic","TagA");监听同一topic,用tags来区分消费消息是不行的
 * 这种效果必须要在Listener里面手动实现
 *
 * 不同group_name下的Consumer可以通过监听同一topic,用tags来区分消费消息
 *
 */

public class Consumer1 {

    public Consumer1()  {
        try {
            String group_name = "test_model_producer_name";
            DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(group_name);
            consumer.setNamesrvAddr("192.168.218.131:9876;192.168.218.132:9876");
            consumer.subscribe("test_model_topic","*");
            consumer.setMessageModel(MessageModel.BROADCASTING);//广播模式
            consumer.registerMessageListener(new Consumer1Listener());
            consumer.start();
        } catch (MQClientException e) {
            e.printStackTrace();
        }
    }

    public static void main(String[] args) {
        Consumer1 consumer1 = new Consumer1();
        System.out.println("consumer1 start...");
    }
}

public class Consumer1Listener implements MessageListenerConcurrently {

    @Override
    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
        MessageExt messageExt = null;
        try {
            for (MessageExt message : list) {
                messageExt = message;
                String topic = messageExt.getTopic();
                String tags = messageExt.getTags();
                //广播模式下,当Tags的值为"TagA"时,consumer1才进行消费
                if("TagA".equals(tags)){
                    String keys = messageExt.getKeys();
                    String body = new String(messageExt.getBody(), RemotingHelper.DEFAULT_CHARSET);
                    System.out.println("topic:" + topic + ",tags:" + tags + ",keys:" + keys + ",body:" + body);
                }
            }
        } catch (Exception e) {
            //获取当前消息被重发了多少次
            int reconsumeTimes = messageExt.getReconsumeTimes();
            System.out.println("reconsumeTimes:" + reconsumeTimes);
            if (reconsumeTimes == 3) {
                //如果一条消息被重发了3次,且还是没有消费成功,那么就记录日志
                //记录日志目的就是为了做补偿处理
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
            //消息消费失败会自动重试
            return ConsumeConcurrentlyStatus.RECONSUME_LATER;
        }
        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    }
}

顺序消息

Rocketmq的顺序消息需要满足2点:

  • 1、Producer端保证发送消息有序,且发送到同一个队列。
  • 2、consumer端保证消费同一个队列。

生产端:

RocketMQ可以严格的保证消息有序。但这个顺序,不是全局顺序,只是局部(queue)顺序。同一个queue里面,RocketMQ的确是能保证FIFO的。

确保消息放到同一个queue中,需要使用 MessageQueueSelector。

Producer向指定messageQueue发送消息

/**
 * @author: huangyibo
 * @Date: 2019/11/30 1:57
 * @Description: 顺序消息 保障同一个topic下的同一组消息发往同一个MessageQueue即可
 */
public class OrderProducer {

    public static void main(String[] args) {
        try {
            DefaultMQProducer producer = new DefaultMQProducer("test_order_producer_group_name");
            producer.setNamesrvAddr(Const.NAMESER_ADDR);
            producer.setSendMsgTimeout(10000);
            producer.start();

            Date date = new Date();
            SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
            String dateStr = sdf.format(date);
            //这5条消息是一个大的业务操作
            for (int i = 0; i < 5; i++) {
                //时间戳
                String body = dateStr + "Hello RocketMQ " + i;
                //参数: topic tag message
                Message message = new Message("test_order_topic","TagA","keyA"+i,body.getBytes(RemotingHelper.DEFAULT_CHARSET));
                //发送数据:如果使用顺序消费,则需自己实现MessageQueueSelector 保证消息进入同一个队列

                SendResult sendResult = producer.send(message, new MessageQueueSelector() {
                    /**
                     *
                     * @param list 队列集合
                     * @param message 消息对象
                     * @param arg 业务标识的参数
                     * @return
                     */
                    @Override
                    public MessageQueue select(List<MessageQueue> list, Message message, Object arg) {
                        Integer id = (Integer) arg;
                        System.out.println("id : " + id);
                        int queueIndex = id % list.size();
                        return list.get(queueIndex);
                    }
                }, 1);//1是队列的下标
                System.out.println(sendResult + ",body:"+body);
            }
        } catch (RemotingException e) {
            e.printStackTrace();
        } catch (MQBrokerException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }catch (MQClientException e) {
            e.printStackTrace();
        } catch (UnsupportedEncodingException e) {
            e.printStackTrace();
        }

    }
}

消费端:

需要使用 MessageListenerOrderly 来消费数据。

MessageListenerOrderly与MessageListenerConcurrently区别:

  • MessageListenerOrderly:有序消费,同一队列的消息同一时刻只能一个线程消费,可保证消息在同一队列严格有序消费。
  • MessageListenerConcurrently:并发消费。

Consumer消费者,编写OrderListener实现MessageListenerOrderly接口

public class OrderConsumer {

    public static void main(String[] args) throws MQClientException {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("test_order_producer_group_name");
        consumer.setNamesrvAddr(Const.NAMESER_ADDR);

        /**
         * 设置consumer第一次启动是从队列头部开始消费还是从队列尾部开始消费
         * 如果非第一次启动,那么按照上一次消费的位置继续消费
         */
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
        //订阅的主题,以及过滤的标签内容
        consumer.subscribe("test_order_topic","TagA");
        //注册监听
        consumer.registerMessageListener(new OrderListener());

        consumer.start();
        System.out.println("consumer started......");
    }
}

public class OrderListener implements MessageListenerOrderly {

    @Override
    public ConsumeOrderlyStatus consumeMessage(List<MessageExt> list, ConsumeOrderlyContext context) {
        MessageExt messageExt = null;
        try {
            for (MessageExt message : list) {
                messageExt = message;
                String topic = messageExt.getTopic();
                String tags = messageExt.getTags();
                //广播模式下,当Tags的值为"TagB"时,consumer2才进行消费
                String keys = messageExt.getKeys();
                String body = new String(messageExt.getBody(), RemotingHelper.DEFAULT_CHARSET);
                System.out.println("topic:" + topic + ",tags:" + tags + ",keys:" + keys + ",body:" + body);
            }
        } catch (Exception e) {
            //获取当前消息被重发了多少次
            int reconsumeTimes = messageExt.getReconsumeTimes();
            System.out.println("reconsumeTimes:" + reconsumeTimes);
            if (reconsumeTimes == 3) {
                //如果一条消息被重发了3次,且还是没有消费成功,那么就记录日志
                //记录日志目的就是为了做补偿处理
                return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
            }
            //消息消费失败会自动重试
            return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
        }
        return ConsumeOrderlyStatus.SUCCESS;
    }
}

事物消息

创建事物消息Producer和事务监听器

/**
 * @Description: 事务监听器
 */

public class TransactionListenerImpl implements TransactionListener {

    /**
     * 执行本地事务
     * @param message
     * @param arg
     * @return
     */
    @Override
    public LocalTransactionState executeLocalTransaction(Message message, Object arg) {
        System.out.println("------执行本地事物----");
        String callArg = (String) arg;
        System.out.println("callArg:" + callArg);
        System.out.println("message:" + message);
        //begin tx
        //数据库的落库操作

        //tx.commit
        return LocalTransactionState.COMMIT_MESSAGE;
    }

    /**
     * 未决事务,Rocket服务器端回查客户端
     * @param messageExt
     * @return
     */
    @Override
    public LocalTransactionState checkLocalTransaction(MessageExt messageExt) {
        System.out.println("------回调消息检查,查询数据库是否存在该条消息----"+messageExt);

        return LocalTransactionState.COMMIT_MESSAGE;
    }
}

public class TransactionProducer {

    public static void main(String[] args) throws Exception {
        TransactionMQProducer producer = new TransactionMQProducer("tx_producer_group_name");
        ExecutorService executorService = new ThreadPoolExecutor(2, 5, 100, TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(2000), new ThreadFactory() {
            @Override
            public Thread newThread(Runnable r) {
                Thread thread = new Thread(r);
                thread.setName("tx_producer_group_name" + "-check-thread");
                return thread;
            }
        });
        producer.setNamesrvAddr(Const.NAMESER_ADDR);
        producer.setExecutorService(executorService);
        //这个对象主要做两件事情,1、异步执行本地事物,2、回查
        TransactionListener transactionListener = new TransactionListenerImpl();
        producer.setTransactionListener(transactionListener);
        producer.setSendMsgTimeout(10000);
        producer.start();

        Message message = new Message("tx_topic","TagA","Key",
                ("hello rocketmq tx").getBytes(RemotingHelper.DEFAULT_CHARSET));
        producer.sendMessageInTransaction(message,"我是回调的参数");

        Thread.sleep(Integer.MAX_VALUE);
        producer.shutdown();
    }
}

创建消费者

public class TransactionConsumer {

    public static void main(String[] args) throws MQClientException {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("tx_producer_group_name");
        consumer.setConsumeThreadMin(10);
        consumer.setConsumeThreadMax(20);
        consumer.setNamesrvAddr(Const.NAMESER_ADDR);
        /**
         * 设置consumer第一次启动是从队列头部开始消费还是从队列尾部开始消费
         * 如果非第一次启动,那么按照上一次消费的位置继续消费
         */
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
        //订阅的主题,以及过滤的标签内容
        consumer.subscribe("tx_topic","*");
        //注册监听
        consumer.registerMessageListener(new Listener());
        consumer.start();
        System.out.println("tx consumer started...");
    }

}

class Listener implements MessageListenerConcurrently{

    @Override
    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
        MessageExt messageExt = null;
        try {
            for (MessageExt message : list) {
                messageExt = message;
                String topic = messageExt.getTopic();
                String tags = messageExt.getTags();
                //广播模式下,当Tags的值为"TagB"时,consumer2才进行消费
                String keys = messageExt.getKeys();
                String body = new String(messageExt.getBody(), RemotingHelper.DEFAULT_CHARSET);
                System.out.println("topic:" + topic + ",tags:" + tags + ",keys:" + keys + ",body:" + body);
            }
        } catch (Exception e) {
            //获取当前消息被重发了多少次
            int reconsumeTimes = messageExt.getReconsumeTimes();
            System.out.println("reconsumeTimes:" + reconsumeTimes);
            if (reconsumeTimes == 3) {
                //如果一条消息被重发了3次,且还是没有消费成功,那么就记录日志
                //记录日志目的就是为了做补偿处理
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
            //消息消费失败会自动重试
            return ConsumeConcurrentlyStatus.RECONSUME_LATER;
        }
        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    }
}

参考:
https://www.cnblogs.com/fqybzhangji/p/11044119.html

https://www.cnblogs.com/hzmark/p/orderly_message.html

https://www.cnblogs.com/hzmark/p/rocket_txn.html

https://baijiahao.baidu.com/s?id=1638994745278332160&wfr=spider&for=pc

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 216,142评论 6 498
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,298评论 3 392
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 162,068评论 0 351
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,081评论 1 291
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,099评论 6 388
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,071评论 1 295
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,990评论 3 417
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,832评论 0 273
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,274评论 1 310
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,488评论 2 331
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,649评论 1 347
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,378评论 5 343
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,979评论 3 325
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,625评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,796评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,643评论 2 368
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,545评论 2 352

推荐阅读更多精彩内容

  • 什么是rocketmq RocketMQ 是阿里巴巴开源的消息队列中间件。具有下列特点: 能够保证严格的消息顺序 ...
    millions_chan阅读 11,021评论 2 10
  • instanceName Topic 一个Topic是一个主题。一个系统中,我们可以将消息划成Topic,这样,将...
    jackcooper阅读 15,030评论 1 9
  • RocketMQ使用场景 RocketMQ使用场景场景介绍重要功能实例用户注册传统处理异步解耦分布式事务的数据一致...
    夜寒灯暖阅读 10,587评论 0 12
  • 本文所介绍环境为win7环境下运行, 从官方github中(https://github.com/alibaba/...
    alterem阅读 537评论 2 0
  • 简介 RocketMQ 特点 RocketMQ 是阿里巴巴在2012年开源的分布式消息中间件,目前已经捐赠给 Ap...
    预流阅读 39,180评论 7 55