SpringBoot整合阿里RocketMQ

什么是RocketMQ

阿里消息队列 RocketMQ版既可为分布式应用系统提供异步解耦和削峰填谷的能力,同时也具备互联网应用所需的海量消息堆积、高吞吐、可靠重试等特性,同时是收费的产品。

应用场景

削峰填谷

诸如秒杀、抢红包、企业开门红等大型活动时皆会带来较高的流量脉冲,或因没做相应的保护而导致系统超负荷甚至崩溃,或因限制太过导致请求大量失败而影响用户体验,消息队列 RocketMQ 版可提供削峰填谷的服务来解决该问题。

异步解耦

交易系统作为淘宝/天猫主站最核心的系统,每笔交易订单数据的产生会引起几百个下游业务系统的关注,包括物流、购物车、积分、流计算分析等等,整体业务系统庞大而且复杂,消息队列 RocketMQ 版可实现异步通信和应用解耦,确保主站业务的连续性。

顺序收发

细数日常中需要保证顺序的应用场景非常多,例如证券交易过程时间优先原则,交易系统中的订单创建、支付、退款等流程,航班中的旅客登机消息处理等等。与先进先出(First In First Out,缩写 FIFO)原理类似,消息队列 RocketMQ 版提供的顺序消息即保证消息 FIFO。

分布式事务一致性

交易系统、支付红包等场景需要确保数据的最终一致性,大量引入消息队列 RocketMQ 版的分布式事务,既可以实现系统之间的解耦,又可以保证最终的数据一致性。

大数据分析

数据在“流动”中产生价值,传统数据分析大多是基于批量计算模型,而无法做到实时的数据分析,利用阿里云消息队列 RocketMQ 版与流式计算引擎相结合,可以很方便的实现将业务数据进行实时分析。

分布式缓存同步

天猫双 11 大促,各个分会场琳琅满目的商品需要实时感知价格变化,大量并发访问数据库导致会场页面响应时间长,集中式缓存因为带宽瓶颈限制商品变更的访问流量,通过消息队列 RocketMQ 版构建分布式缓存,实时通知商品数据的变化。

1、配置pom.xml

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-web</artifactId>
</dependency>

<!--lombok-->
<dependency>
    <groupId>org.projectlombok</groupId>
    <artifactId>lombok</artifactId>
    <optional>true</optional>
</dependency>

<!--阿里RocketMQ-->
<dependency>
    <groupId>com.aliyun.openservices</groupId>
    <artifactId>ons-client</artifactId>
    <version>1.8.0.Final</version>
</dependency>

2、配置application.properties

server.port=8888
#rocketmq配置
#鉴权用AccessKeyId在阿里云服务器管理控制台创建
rocketmq.accessKey=accessKey
#鉴权用AccessKeySecret在阿里云服务器管理控制台创建
rocketmq.secretKey=secretKey
#tcp长连接,设置TCP接入域名,进入控制台的实例管理页面,在页面上方选择实例后,在实例信息中的“获取接入点信息”区域查看
rocketmq.namesrvAddr=http://MQ_INST_15namesrvAddr7I.cn-hangzhou.mq-internal.aliyuncs.com:8080
#mq主题,,您在控制台创建的topic
rocketmq.topic=topic
#mq组名,您在控制台创建的 Group ID
rocketmq.groupId=groupId

以上参数均可在阿里控制台中找到

3、配置类

package com.ifilldream.rocketmq_lean.mq;
import com.aliyun.openservices.ons.api.PropertyKeyConst;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Configuration;
import javax.annotation.PostConstruct;
import java.util.Properties;

/**
 * RocketMQ配置
 * @author RickSun && iFillDream
 * @date 2020/01/10 15:58
 * @Copyright "轻梦致新"即"iFillDream"微信公众号所有
 */
@Configuration
public class RocketMQConfig {

    @Value("${rocketmq.accessKey}")
    public String accessKey;
    public static String ACCESS_KEY;

    @Value("${rocketmq.secretKey}")
    public String secretKey;
    public static String SECRET_KEY;

    @Value("${rocketmq.namesrvAddr}")
    public String namesrvAddr;
    public static String NAMESRV_ADDR;

    @Value("${rocketmq.groupId}")
    public String groupId;
    public static String GROUP_ID;

    @Value("${rocketmq.topic}")
    public String topic;
    public static String TOPIC;

    /**
     * 配置RocketMq参数
     * @return Properties
     */
    public Properties getProperties() {
        Properties properties = new Properties();
        //您在控制台创建的GroupID
        properties.put(PropertyKeyConst.GROUP_ID, groupId);
        // 鉴权用AccessKeyId在阿里云服务器管理控制台创建
        properties.setProperty(PropertyKeyConst.AccessKey, accessKey);
        // 鉴权用AccessKeySecret在阿里云服务器管理控制台创建
        properties.setProperty(PropertyKeyConst.SecretKey, secretKey);
        //延时时间
        properties.setProperty(PropertyKeyConst.SendMsgTimeoutMillis, "3000");
        // 顺序消息消费失败进行重试前的等待时间单位(毫秒)
        properties.put(PropertyKeyConst.SuspendTimeMillis, "100");
        // 消息消费失败时的最大重试次数
        properties.put(PropertyKeyConst.MaxReconsumeTimes, "20");
        // 设置TCP接入域名,进入控制台的实例管理页面,在页面上方选择实例后,在实例信息中的“获取接入点信息”区域查看
        properties.put(PropertyKeyConst.NAMESRV_ADDR, namesrvAddr);
        return properties;
    }

    /**
     * 初始化静态常量
     */
    @PostConstruct
    public void init(){
        ACCESS_KEY = this.accessKey;
        SECRET_KEY = this.secretKey;
        NAMESRV_ADDR = this.namesrvAddr;
        GROUP_ID = this.groupId;
        TOPIC = this.topic;
    }
}

4、RocketMQ工具

package com.ifilldream.rocketmq_lean.mq;
import com.aliyun.openservices.ons.api.*;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.util.Properties;

/**
 * RocketMQ工具
 * @author RickSun && iFillDream
 * @date 2020/01/10 16:07
 * @Copyright "轻梦致新"即"iFillDream"微信公众号所有
 */
@Component
@Slf4j
public class MQUtil {

    @Autowired
    private RocketMQConfig rocketMQConfig;

    /**
     * 发送普通消息
     * @param content 内容
     * @param tag 可理解为Gmail中的标签,对消息进行再归类,方便Consumer指定过滤条件在消息队列
     */
    public void sendMessage(String content,String tag){
        Message message = new Message();
        message.setBody(content.getBytes());
        message.setTopic(RocketMQConfig.TOPIC);
        message.setTag(tag);
        this.sendCustomerMessage(message);
    }

    /**
     * 发送定时任务
     * @param content   内容
     * @param tag   标签
     * @param delayTime 定时时间
     */
    public void sendDelayMessage(String content,String tag,long delayTime){
        Message message = new Message();
        message.setBody(content.getBytes());
        message.setTopic(RocketMQConfig.TOPIC);
        message.setTag(tag);
        /**
         * 单位毫秒(ms)
         * 在指定时间戳(当前时间之后)进行投递
         * 例如 2016-03-07 16:21:00 投递
         * 如果被设置成当前时间戳之前的某个时刻,消息将立刻投递给消费者
         */
        message.setStartDeliverTime(System.currentTimeMillis()+delayTime);
        this.sendCustomerMessage(message);
    }

    /**
     * 发送消息
     * @param message
     */
    private void sendCustomerMessage(Message message) {
        Properties properties=rocketMQConfig.getProperties();
        Producer producer = ONSFactory.createProducer(properties);
        //在发送消息前,必须调用start方法来启动Producer,只需调用一次即可
        producer.start();
        try {
            SendResult sendResult = producer.send(message);
            // 同步发送消息,只要不抛异常就是成功
            if (sendResult != null) {
                log.info("消息发送成功:messageID:"+sendResult.getMessageId());
            }
        } catch (Exception e) {
            // 消息发送失败,需要进行重试处理,可重新发送这条消息或持久化这条数据进行补偿处理
            e.printStackTrace();
        }
        //在应用退出前,销毁Producer对象
        producer.shutdown();
    }
}

5、标签

package com.ifilldream.rocketmq_lean.mq;

/**
 * RocketMQ Tag业务标签
 * @author RickSun && iFillDream
 * @date 2020/01/10 16:32
 * @Copyright "轻梦致新"即"iFillDream"微信公众号所有
 */
public class MqTag {
    /**
     * 根据业务老创建标签
     */
    //测试1
    public final static String ROCKETMQTEST1 = "ROCKETMQ_TEST1";
    //测试2
    public final static String ROCKETMQTEST2 = "ROCKETMQ_TEST2";
}

6、消费者

package com.ifilldream.rocketmq_lean.mq;
import com.aliyun.openservices.ons.api.*;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.util.Properties;

/**
 * RocketMQ消费者
 * @author RickSun && iFillDream
 * @date 2020/01/10 16:29
 * @Copyright "轻梦致新"即"iFillDream"微信公众号所有
 */
@Component
@Slf4j
public class RocketMQConsumer {

    @Autowired
    private RocketMQConfig rocketMQConfig;

    /**
     * 订阅消息,处理业务
     */
    public void normalSubscribe() {
        Properties properties = rocketMQConfig.getProperties();
        Consumer consumer = ONSFactory.createConsumer(properties);
        consumer.subscribe(RocketMQConfig.TOPIC, "", new MessageListener() {
            @Override
            public Action consume(Message message, ConsumeContext context) {
                try {
                    //接收到的消息内容
                    String msg = new String(message.getBody(), "UTF-8");
                    String tag = message.getTag();
                    switch (tag) {
                        case MqTag.ROCKETMQTEST1:
                            log.info("收到消息messageID:" + message.getMsgID() + " msg:" + msg);
                            //TODO do something
                            break;
                        case  MqTag.ROCKETMQTEST2:
                            log.info("收到消息messageID:" + message.getMsgID() + " msg:" + msg);
                            //TODO do something
                            break;
                    }
                    return Action.CommitMessage;
                } catch (Exception e) {
                    log.info("消费失败:messageID:" + message.getMsgID());
                    e.printStackTrace();
                    return Action.ReconsumeLater;
                }
            }
        });
        consumer.start();
    }
}

7、消费者启动监听

package com.ifilldream.rocketmq_lean.mq;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.stereotype.Component;

/**
 * RocketMQ启动监听
 * @author RickSun && iFillDream
 * @date 2020/01/10 16:07
 * @Copyright "轻梦致新"即"iFillDream"微信公众号所有
 */
@Component
public class RocketConsumerListener implements CommandLineRunner {

    @Autowired
    private RocketMQConsumer rocketMQConsumer;

    @Override
    public void run(String... args) {
        System.out.println("========rocketMQ消费者启动==========");
        rocketMQConsumer.normalSubscribe();
    }
}

8、接口

package com.ifilldream.rocketmq_lean.controller;
import com.ifilldream.rocketmq_lean.mq.MQUtil;
import com.ifilldream.rocketmq_lean.mq.MqTag;
import org.springframework.web.bind.annotation.*;
import javax.annotation.Resource;

/**
 * @ClassName RocketController
 * @Author RickSun && iFillDream
 * @Date 2020/1/10 15:18
 * @Version 1.0
 */
@RestController
@RequestMapping("/ifilldream/rocketmq")
public class RocketController {

    @Resource
    private MQUtil mqUtil;

    @GetMapping("/test")
    public String test(String content) {
        return content;
    }

    @GetMapping("/test1")
    public String test1(String content) {
        mqUtil.sendMessage(content, MqTag.ROCKETMQTEST1);
        mqUtil.sendDelayMessage("测试", MqTag.ROCKETMQTEST1, 1000L);
        return "success";
    }

    @GetMapping("/test2")
    public String test2(String content) {
        mqUtil.sendMessage(content, MqTag.ROCKETMQTEST2);
        mqUtil.sendDelayMessage("测试", MqTag.ROCKETMQTEST2,3000L);
        return "success";
    }

}

此时代码完毕,在Linux服务器上运行项目Jar包,浏览器中输入:xx.xx.xx.xx:8888/ifilldream/rocketmq/test1?content=nihao即可看到效果;

file

xx.xx.xx.xx为服务器的IP或域名,运行效果如下:
file

以上代码亲测可用,更多详情请关注阿里官方文档https://help.aliyun.com/product/29530.html?spm=a2c4g.11186623.6.540.1a4b7e805ygc75

统一首发平台为微信公众号"轻梦致新",搜索关注公众号,第一时间阅读最新内容。

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 212,884评论 6 492
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,755评论 3 385
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 158,369评论 0 348
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,799评论 1 285
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 65,910评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,096评论 1 291
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,159评论 3 411
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,917评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,360评论 1 303
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,673评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,814评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,509评论 4 334
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,156评论 3 317
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,882评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,123评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,641评论 2 362
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,728评论 2 351

推荐阅读更多精彩内容