消息中间件--阿里云RocketMQ(六)

一、Maven引入

<!--消息队列 RocketMQ-->
<dependency>
    <groupId>com.aliyun.openservices</groupId>
    <artifactId>ons-client</artifactId>
    <version>1.7.9.Final</version>
</dependency>

二、登录阿里云创建主题与组

1、创建主题

(1)创建主题

(2)创建主题

(3)创建主题

2、主题授权

(1)主题授权

(2)主题授权

2、创建组

(1)创建组

(2)创建组

(3)创建组

3、获取用户Key

(1)获取用户Key

(2)获取用户Key

二、代码编写

1、配置代码

@Configuration
public class RocketMQConfig {
    public Properties getProperties(){
        Properties properties=new Properties();
        /**
         * 键的首字母必须大写
         */
        //
        properties.setProperty("AccessKey","xxxxxxxxx");
        //
        properties.setProperty("SecretKey","xxxxxxxxxxxxxxxxxx");
        //
        properties.setProperty(PropertyKeyConst.SendMsgTimeoutMillis, "3000");
        // 顺序消息消费失败进行重试前的等待时间,单位(毫秒)
        properties.put(PropertyKeyConst.SuspendTimeMillis, "100");
        // 消息消费失败时的最大重试次数
        properties.put(PropertyKeyConst.MaxReconsumeTimes, "20");
        //
        properties.put(PropertyKeyConst.NAMESRV_ADDR, "http://onsaddr.mq-internet-access.mq-internet.aliyuncs.com:80");
        return  properties;
    }
}

2、生产消息

import com.aliyun.openservices.ons.api.*;
import com.aliyun.openservices.ons.api.order.OrderProducer;
import com.aliyun.openservices.ons.api.transaction.LocalTransactionExecuter;
import com.aliyun.openservices.ons.api.transaction.TransactionProducer;
import com.aliyun.openservices.ons.api.transaction.TransactionStatus;
import com.hrz.common.utils.HashUtil;
import com.hrz.push.config.RocketMQConfig;
import com.hrz.push.service.MessageService;
import com.hrz.push.service.impl.LocalTransactionCheckerImpl;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Properties;
import java.util.UUID;

@Component
@Slf4j
public class ProducerClient {
    @Autowired
    private RocketMQConfig rocketMQConfig;

    @Autowired
    private MessageService messageService;

    /**
     * 1、发送普通消息
     *
     * @param message
     * @return
     */
    public boolean sendNormalMessage(Message message,String groupId) {
        Properties properties=rocketMQConfig.getProperties();
        properties.setProperty(PropertyKeyConst.GROUP_ID,groupId);
        Producer producer = ONSFactory.createProducer(properties);
        // 在发送消息前,必须调用 start 方法来启动 Producer,只需调用一次即可
        producer.start();
        try {
            SendResult sendResult = producer.send(message);
            // 同步发送消息,只要不抛异常就是成功
            if (sendResult != null) {
                log.info(new Date() + " 发送普通消息成功,消息主题为:" + message.getTopic() + " msgId is: " + sendResult.getMessageId());
                return true;
            }
        } catch (Exception e) {
            // 消息发送失败,需要进行重试处理,可重新发送这条消息或持久化这条数据进行补偿处理
            log.error(new Date() + " 发送普通消息失败,消息主题为:" + message.getTopic());
            e.printStackTrace();
        }
        return false;
    }

    /**
     * 2、发送顺序消息
     *
     * @param message
     * @return
     */
    public boolean sendOrderMessage(Message message,String groupId,Integer shardingNo) {
        Properties properties=rocketMQConfig.getProperties();
        properties.setProperty(PropertyKeyConst.GROUP_ID,groupId);
        OrderProducer producer = ONSFactory.createOrderProducer(properties);
        // 在发送消息前,必须调用 start 方法来启动 Producer,只需调用一次即可
        producer.start();
        try {
            // 设置代表消息的业务关键属性,请尽可能全局唯一。
            // 以方便您在无法正常收到消息情况下,可通过控制台查询消息并补发。
            // 注意:不设置也不会影响消息正常收发
            message.setKey(UUID.randomUUID().toString());
            // 分区顺序消息中区分不同分区的关键字段,sharding key 于普通消息的 key 是完全不同的概念。
            // 全局顺序消息,该字段可以设置为任意非空字符串。
            String shardingKey = "Hrz_" + shardingNo;
            SendResult sendResult = producer.send(message, shardingKey);
            // 同步发送消息,只要不抛异常就是成功
            if (sendResult != null) {
                log.info(new Date() + " 发送顺序消息成功,消息主题为:" + message.getTopic() + " 消息 is: " + sendResult.getMessageId());
                return true;
            }
        } catch (Exception e) {
            // 消息发送失败,需要进行重试处理,可重新发送这条消息或持久化这条数据进行补偿处理
            log.error(new Date() + " 发送顺序消息成功,消息主题为:" + message.getTopic());
            e.printStackTrace();
        }
        return false;
    }

    /**
     * 2、发送事务消息
     *
     * @param message
     * @return
     */
    public boolean sendTransactionMessage(Message message) {
        TransactionProducer producer = ONSFactory.createTransactionProducer(rocketMQConfig.getProperties(),
                new LocalTransactionCheckerImpl());

        try {
            SendResult sendResult = producer.send(message, new LocalTransactionExecuter() {
                @Override
                public TransactionStatus execute(Message msg, Object arg) {
                    // 消息 ID(有可能消息体一样,但消息 ID 不一样,当前消息 ID 在控制台无法查询)
                    String msgId = msg.getMsgID();
                    // 消息体内容进行 crc32,也可以使用其它的如 MD5
                    long crc32Id = HashUtil.crc32Code(msg.getBody());
                    // 消息 ID 和 crc32id 主要是用来防止消息重复
                    // 如果业务本身是幂等的,可以忽略,否则需要利用 msgId 或 crc32Id 来做幂等
                    // 如果要求消息绝对不重复,推荐做法是对消息体 body 使用 crc32或 md5来防止重复消息
                    //业务自己的参数对象,这里只是一个示例,需要您根据实际情况来处理
                    Object businessServiceArgs = new Object();
                    TransactionStatus transactionStatus = TransactionStatus.Unknow;
                    try {
                        //要求方法返回为boolean
                        boolean isCommit = messageService.test("测试");
                        if (isCommit) {
                            // 本地事务成功则提交消息
                            transactionStatus = TransactionStatus.CommitTransaction;
                        } else {
                            // 本地事务失败则回滚消息
                            transactionStatus = TransactionStatus.RollbackTransaction;
                        }
                    } catch (Exception e) {
                        log.error("消息 Id:{}", msgId, e);
                    }
                    System.out.println(msg.getMsgID());
                    log.warn("消息 Id:{}事务状态:{}", msgId, transactionStatus.name());
                    return transactionStatus;
                }
            }, null);
        } catch (Exception e) {
            // 消息发送失败,需要进行重试处理,可重新发送这条消息或持久化这条数据进行补偿处理
            System.out.println(new Date() + " 发送消息失败. 主题为:" + message.getTopic());
            e.printStackTrace();
        }
        // demo example 防止进程退出(实际使用不需要这样)
        // TimeUnit.MILLISECONDS.sleep(Integer.MAX_VALUE);
        return true;
    }

    /**
     * 3、发送延时消息
     *
     * @param message
     * @param delay
     * @return
     */
    public boolean sendDelayMessage(Message message, Integer delay) {
        Producer producer = ONSFactory.createProducer(rocketMQConfig.getProperties());
        // 在发送消息前,必须调用 start 方法来启动 Producer,只需调用一次即可
        producer.start();

        // 注意:不设置也不会影响消息正常收发
        message.setKey("ORDERID_100");
        try {
            // 延时消息,单位毫秒(ms),在指定延迟时间(当前时间之后)进行投递,例如消息在 3 秒后投递
//            long delayTime = System.currentTimeMillis() + 3000;
            long delayTime = System.currentTimeMillis() + delay;
            // 设置消息需要被投递的时间
            message.setStartDeliverTime(delayTime);

            SendResult sendResult = producer.send(message);
            // 同步发送消息,只要不抛异常就是成功
            if (sendResult != null) {
                log.info(new Date() + " 发送普通消息成功,消息主题为:" + message.getTopic() + " msgId is: " + sendResult.getMessageId());
                return true;
            }
        } catch (Exception e) {
            // 消息发送失败,需要进行重试处理,可重新发送这条消息或持久化这条数据进行补偿处理
            log.error(new Date() + " 发送普通消息失败,消息主题为:" + message.getTopic());
            e.printStackTrace();
        }
        return false;
    }

    /**
     * 4、收发定时消息
     *
     * @param message
     * @param datetime
     * @return
     */
    public boolean sendFixedDelayMessage(Message message, String datetime) {
        Producer producer = ONSFactory.createProducer(rocketMQConfig.getProperties());
        // 在发送消息前,必须调用 start 方法来启动 Producer,只需调用一次即可
        producer.start();

        // 注意:不设置也不会影响消息正常收发
        message.setKey("ORDERID_100");
        try {
            // 定时消息,单位毫秒(ms),在指定时间戳(当前时间之后)进行投递,例如 2016-03-07 16:21:00 投递。如果被设置成当前时间戳之前的某个时刻,消息将立刻投递给消费者。
//            long timeStamp = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").parse("2016-03-07 16:21:00").getTime();
            long timeStamp = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").parse(datetime).getTime();
            // 设置消息需要被投递的时间
            message.setStartDeliverTime(timeStamp);

            SendResult sendResult = producer.send(message);
            // 同步发送消息,只要不抛异常就是成功
            if (sendResult != null) {
                log.info(new Date() + " 发送普通消息成功,消息主题为:" + message.getTopic() + " msgId is: " + sendResult.getMessageId());
                return true;
            }
        } catch (Exception e) {
            // 消息发送失败,需要进行重试处理,可重新发送这条消息或持久化这条数据进行补偿处理
            log.error(new Date() + " 发送普通消息失败,消息主题为:" + message.getTopic());
            e.printStackTrace();
        }
        return false;
    }
}

3、消费消息

import com.aliyun.openservices.ons.api.*;
import com.aliyun.openservices.ons.api.order.ConsumeOrderContext;
import com.aliyun.openservices.ons.api.order.MessageOrderListener;
import com.aliyun.openservices.ons.api.order.OrderAction;
import com.aliyun.openservices.ons.api.order.OrderConsumer;
import com.hrz.common.utils.StringUtils;
import com.hrz.push.config.RocketMQConfig;
import com.hrz.push.entity.ConsumerMessage;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.util.Properties;

@Component
@Slf4j
public class ConsumerClient {
    @Autowired
    private RocketMQConfig rocketMQConfig;


    /**
     * 1、普通订阅
     *
     * @param consumerMessage
     */
    public void normalSubscribe(ConsumerMessage consumerMessage) {
        Properties properties = rocketMQConfig.getProperties();
        //以 "GID_" 或者 "GID-" 开头
        properties.put(PropertyKeyConst.GROUP_ID, consumerMessage.getGroupId());
        // 集群订阅方式 (默认)
        if (consumerMessage.isModel()) {
            // 广播订阅方式
            properties.put(PropertyKeyConst.MessageModel, PropertyValueConst.BROADCASTING);
        }
        // 在订阅消息前,必须调用 start 方法来启动 Consumer,只需调用一次即可。
        Consumer consumer = ONSFactory.createConsumer(properties);
        // 1. * 表示订阅所有消息
        // 2. TagA || TagB || TagC 表示订阅 TagA 或 TagB 或 TagC 的消息
        if(StringUtils.isNullOrEmpty(consumerMessage.getTag())){
            consumerMessage.setTag("*");
        }
        //消息监听
        consumer.subscribe(consumerMessage.getTopic(), consumerMessage.getTag(), new MessageListener() {
            //订阅多个 Tag
            public Action consume(Message message, ConsumeContext context) {
                log.info("订阅消息内容->: " + message.getTopic() + "->" + message.getKey() + "->" + new String(message.getBody()));
                log.info("订阅消息ID->: "+message.getMsgID());
                //提交订阅,消息依然存在
                return Action.CommitMessage;
            }
        });
        consumer.start();
        log.info("消息订阅成功执行。");
    }

    /**
     * 2、顺序订阅
     * @param consumerMessage
     */
    public void orderSubscribe(ConsumerMessage consumerMessage) {
        Properties properties = rocketMQConfig.getProperties();
        //以 "GID_" 或者 "GID-" 开头
        properties.put(PropertyKeyConst.GROUP_ID, consumerMessage.getGroupId());
        // 集群订阅方式 (默认)
        if (consumerMessage.isModel()) {
            // 广播订阅方式
            properties.put(PropertyKeyConst.MessageModel, PropertyValueConst.BROADCASTING);
        }
        // 在订阅消息前,必须调用 start 方法来启动 Consumer,只需调用一次即可。
        OrderConsumer consumer = ONSFactory.createOrderedConsumer(properties);
        // 1. * 表示订阅所有消息
        // 2. TagA || TagB || TagC 表示订阅 TagA 或 TagB 或 TagC 的消息
        if(StringUtils.isNullOrEmpty(consumerMessage.getTag())){
            consumerMessage.setTag("*");
        }
        //消息监听
        consumer.subscribe(consumerMessage.getTopic(), consumerMessage.getTag(), new MessageOrderListener() {
            @Override
            public OrderAction consume(Message message, ConsumeOrderContext consumeOrderContext) {
                log.info("订阅消息内容->: " + message.getTopic() + "->" + message.getKey() + "->" + new String(message.getBody()));
                log.info("订阅消息ID->: "+message.getMsgID());
                //提交订阅,消息依然存在
                return OrderAction.Success;
            }
        });
        consumer.start();
        log.info("消息订阅成功执行。");
    }
}

四、测试

1、测试代码

@RestController
@RequestMapping(value = "/push", produces = MediaType.APPLICATION_JSON_UTF8_VALUE)
@Slf4j
@Api(value = "1、消息队列服务", tags = {"1、消息队列服务"})
public class RocketMQController {
    @Autowired
    private ConsumerClient consumerClient;
    @Autowired
    private ProducerClient producerClient;

    /**
     * 1、订阅普通消息
     *
     * @return
     */
    @RequestMapping(value = "/consumer", method = RequestMethod.GET)
    public String consumer() {
        ConsumerMessage consumerMessage = new ConsumerMessage();
        consumerMessage.setTopic("TOPIC_HRZ_TEST");
        consumerMessage.setGroupId("GID_HRZ_TEST");
        consumerClient.normalSubscribe(consumerMessage);
        return "消费成功";
    }

    /**
     * 1、订阅普通消息
     *
     * @return
     */
    @RequestMapping(value = "/orderConsumer", method = RequestMethod.GET)
    public String orderConsumer() {
        ConsumerMessage consumerMessage = new ConsumerMessage();
        consumerMessage.setTopic("TOPIC_HRZ_ORDER_TEST");
        consumerMessage.setGroupId("GID_HRZ_TEST");
        consumerClient.orderSubscribe(consumerMessage);
        return "消费成功";
    }

    /**
     * 1、发送普通消息
     *
     * @return
     */
    @RequestMapping(value = "/sendNormalMessage", method = RequestMethod.GET)
    public String sendNormalMessage() {
        try {
            Message message = new Message();
            //设置主题
            message.setTopic("TOPIC_HRZ_TEST");
            message.setBody("测试".getBytes());
            // 设置代表消息的业务关键属性,全局唯一。
            message.setKey("OrderId_" + Snowflake.getId());
            //C0A80137276C18B4AAC20261E0600000
            producerClient.sendNormalMessage(message, "GID_HRZ_TEST");
        } catch (Exception e) {
            e.printStackTrace();
        }
        return "发送普通消息成功";
    }

    /**
     * 2、发送顺序消息
     *
     * @return
     */
    @RequestMapping(value = "/sendOrderMessage", method = RequestMethod.GET)
    public String sendOrderMessage() {
        try {
            Message message = new Message();
            //设置主题
            message.setTopic("TOPIC_HRZ_ORDER_TEST");
            message.setBody("顺序消息".getBytes());
            // 设置代表消息的业务关键属性,全局唯一。
            message.setKey("OrderId_" + Snowflake.getId());
            //C0A80137276C18B4AAC20261E0600000
            producerClient.sendOrderMessage(message, "GID_HRZ_TEST", 1);
        } catch (Exception e) {
            e.printStackTrace();
        }
        return "发送顺序消息成功";
    }
}

2、测试方法

(1)生产消息

(2)消费消息

3、平台查看

订阅成功后查看。


(1)阿里平台

(2)订阅结果

(3)数据统计

(4)消费堆积

注意事项

如果使用 SpringBoot请不要使用单元测试来测试订阅,要使用控制器来测试。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 213,558评论 6 492
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,002评论 3 387
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 159,036评论 0 349
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,024评论 1 285
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,144评论 6 385
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,255评论 1 292
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,295评论 3 412
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,068评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,478评论 1 305
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,789评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,965评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,649评论 4 336
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,267评论 3 318
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,982评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,223评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,800评论 2 365
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,847评论 2 351

推荐阅读更多精彩内容