微服务 Spring Cloud Alibaba 项目搭建(七、RocketMQ 集成)

RocketMQ介绍

RocketMQ 是一个 队列模型 的消息中间件,具有高性能、高可靠、高实时、分布式 的特点。它是一个采用 Java 语言开发的分布式的消息系统,由阿里巴巴团队开发,在2016年底贡献给 Apache,成为了 Apache 的一个顶级项目。 在阿里内部,RocketMQ 很好地服务了集团大大小小上千个应用,在每年的双十一当天,更有不可思议的万亿级消息通过 RocketMQ 流转。

RocketMQ 特点

  • 是一个队列模型的消息中间件,具有高性能、高可靠、高实时、分布式等特点
  • Producer、Consumer、队列都可以分布式
  • Producer 向一些队列轮流发送消息,队列集合称为 Topic,Consumer 如果做广播消费,则一个 Consumer 实例消费这个 Topic 对应的所有队列,如果做集群消费,则多个 Consumer 实例平均消费这个 Topic 对应的队列集合
  • 能够保证严格的消息顺序
  • 支持拉(pull)和推(push)两种消息模式
  • 高效的订阅者水平扩展能力
  • 实时的消息订阅机制
  • 亿级消息堆积能力
  • 支持多种消息协议,如 JMS、OpenMessaging 等
  • 较少的依赖

kafka 、RocketMQ 、RabbitMQ 对比

RocketMQ安装

RocketMQ下载: rocketmq-all-4.8.0-bin-release.zip
1.RocketMQ zip包传入linux服务器

[root@localhost ]# cd usr/local/
[root@localhost local]# rz

2.解压缩

[root@localhost local]# unzip rocketmq-all-4.8.0-bin-release.zip

3.调整启动参数(修改默认启动参数,默认启动的最大内存为4G,比较大,修改小一点,否则如果服务器内存不够会启动失败)

[root@localhost local]# cd rocketmq-all-4.8.0-bin-release/bin
[root@localhost bin]# vim runserver.sh
  • -Xms4g -Xmx4g -Xmn2g 改为 -Xms256m -Xmx256m -Xmn128m

4.调整broker

[root@localhost bin]# vim runbroker.sh
  • -Xms8g -Xmx8g -Xmn4g 改为 -Xms256m -Xmx256m -Xmn128m

5.启动namesrv

[root@localhost bin]# nohup sh mqnamesrv &

6.启动broker,注意ip为公网ip,端口为navmesrv的默认端口9876

[root@localhost bin]# nohup ./mqbroker -n localhost:9876 &

7.检查是否启动成功

[root@localhost bin]# jps -l
  • 如果发现报错bash: jps: 未找到命令... 请更新以下命令
[root@localhost bin]# sudo yum install java-1.8.0-openjdk-devel.x86_64
  • 输入命令 jps -l
  • 关闭 RocketMQ 命令 (此处无需关闭,只用于了解)

./mqshutdown broker
./mqshutdown namesrv

RocketMQ 控制台安装

1.克隆rocketmq项目

[root@localhost local]# cd /usr/local/
[root@localhost local]# git clone  https://github.com/apache/rocketmq-externals.git
  • 进入\rocketmq-externals\rocketmq-console\src\main\resources\ 下修改 application.properties 配置文件
  • 配置文件修改如下图

github提供了 Docker 和 非Docker 两种安装方法供其选择,这里使用非Docker方式进行安装

  • 在 \rocketmq-externals\rocketmq-console\ 文件夹下打开控制台,输入以下命令进行maven打包

mvn clean package -Dmaven.test.skip=true

  • 进入 \rocketmq-externals\rocketmq-console\target\ 文件夹下打开控制台,输入以下命令进行 jar包启动

java -jar rocketmq-console-ng-2.0.0.jar

  • 打开浏览器访问 localhost:9877,如果报错
  • 开放 10909 01911 9876 端口

firewall-cmd --zone=public --add-port=10909/tcp --permanent
firewall-cmd --zone=public --add-port=10911/tcp --permanent
firewall-cmd --zone=public --add-port=9876/tcp --permanent
systemctl restart firewalld.service
firewall-cmd --reload

  • 验证RocketMQ功能是够正常

1.验证生产消息正常,输入命令

[root@localhost rocketmq-all-4.8.0-bin-release]# export NAMESRV_ADDR=localhost:9876
[root@localhost rocketmq-all-4.8.0-bin-release]# sh bin/tools.sh org.apache.rocketmq.example.quickstart.Producer

正常的情况下,会看到一堆的类似于如下的输出,这是生产消息后成功的result:

SendResult [sendStatus=SEND_OK, msgId=7F000001372329453F44466341350068, offsetMsgId=C0A8017600002A9F000000000000674E, messageQueue=MessageQueue [topic=TopicTest, brokerName=localhost.localdomain, queueId=3], queueOffset=33]

2.验证消费消息正常,执行如下命令:

[root@localhost rocketmq-all-4.8.0-bin-release]# sh bin/tools.sh org.apache.rocketmq.example.quickstart.Consumer

正常的情况下,会看到一堆的类似于如下的输出,这是消费的消息内容:

ConsumeMessageThread_1 Receive New Messages: [MessageExt [brokerName=localhost.localdomain, queueId=0, storeSize=201, queueOffset=0, sysFlag=0, bornTimestamp=1618387294736, bornHost=/192.168.1.118:34722, storeTimestamp=1618387294743, storeHost=/192.168.1.118:10911, msgId=C0A8017600002A9F0000000000000192, commitLogOffset=402, bodyCRC=1250039395, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='TopicTest', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=34, CONSUME_START_TIME=1618387666005, UNIQ_KEY=7F00000136FE29453F44466306100001, CLUSTER=DefaultCluster, WAIT=true, TAGS=TagA}, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 32, 50], transactionId='null'}]]

RocketMQ 集成 - 生产者

  • gateway下pom.xml文件添加依赖


<!-- rocketmq -->
        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-spring-boot-starter</artifactId>
            <version>2.1.1</version>
        </dependency>
  • nacos 配置 RocketMQ
rocketmq:
  name-server: 192.168.190.129:9876
  producer:
    # 小坑:必须指定group
    group: test-group
  • common 下创建实体类 MyMessage.class


package com.bi.cloud.pojo;

import lombok.Data;

import java.io.Serializable;
import java.util.Date;

@Data
public class MyMessage implements Serializable {
    private Integer id;
    private String name;
    private String status;
    private Date createTime;
}

  • gateway下创建 TestProducerController.class


package com.bi.cloud.controller;

import com.bi.cloud.pojo.MyMessage;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import javax.annotation.Resource;
import java.util.Date;

/**
 * 生产者
 **/
@RestController
@RequestMapping("/api/testRocketMQ")
public class TestProducerController {

    /**
     * 用于发送消息到 RocketMQ 的api
     */
    @Resource
    public RocketMQTemplate rocketMQTemplate;

    @GetMapping("/sendMsg")
    public String testSendMsg() {
        String topic = "test-topic";
        MyMessage message = new MyMessage();
        message.setId(1);
        message.setName("王霄");
        message.setStatus("default");
        message.setCreateTime(new Date());
        // 发送消息
        rocketMQTemplate.convertAndSend(topic, message);

        return "send message success";
    }
}
  • Postman 调用接口
  • 如果报错 请关闭linux防火墙
systemctl stop firewalld
  • 消息发送成功后,可以到RocketMQ的控制台中进行查看:

RocketMQ 集成 - 消费者

  • engine下pom.xml文件添加依赖
<!-- rocketmq -->
        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-spring-boot-starter</artifactId>
            <version>2.1.1</version>
        </dependency>
  • nacos 配置 RocketMQ
rocketmq:
  name-server: 192.168.190.129:9876
  producer:
    # 小坑:必须指定group
    group: test-group
  • engine 下创建消费者监听器 TestConsumerListener.class
package com.bi.cloud.service.Impl;

import com.alibaba.fastjson.JSON;
import com.bi.cloud.pojo.MyMessage;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;

/**
 * 消费者监听器
 **/
@Slf4j
@Component
// topic需要和生产者的topic一致,consumerGroup属性是必须指定的,内容可以随意
@RocketMQMessageListener(topic = "test-topic", consumerGroup = "consumer-group")
public class TestConsumerListener implements RocketMQListener<MyMessage> {

    /**
     * 监听到消息的时候就会调用该方法
     */
    @Override
    public void onMessage(MyMessage message) {
        log.info("从test-topic中监听到消息");
        log.info(JSON.toJSONString(message));
    }
}
  • 编写完成后启动项目,由于之前我们已经往队列里发送了消息,所以此时消费者项目一启动,就可以监听到消息并消费,控制台就会输出如下日志:

第八章 Oauth2.0 安全认证子模块集成 https://www.jianshu.com/p/4fd45fb565eb

参考文献:
https://github.com/apache/rocketmq-externals.git
https://blog.csdn.net/qq_40280582/article/details/111785355
https://zhuhuix.blog.csdn.net/article/details/108866638
https://blog.51cto.com/zero01/2426303

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 204,793评论 6 478
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 87,567评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 151,342评论 0 338
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,825评论 1 277
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,814评论 5 368
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,680评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,033评论 3 399
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,687评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 42,175评论 1 300
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,668评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,775评论 1 332
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,419评论 4 321
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,020评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,978评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,206评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 45,092评论 2 351
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,510评论 2 343

推荐阅读更多精彩内容