Spring Cloud Alibaba RocketMQ - 构建异步通信的微服务

MQ的选择

消息队列对比参照表:


image.png

RocketMQ vs. ActiveMQ vs. Kafka:


image.png

参考至:


CentOS7上搭建RocketMQ

环境要求:

  • CentOS 7.2
  • 64位JDK1.8+
  • 4G+的可用磁盘空间

1、下载RocketMQ的二进制包,我这里使用的是4.5.1版本,下载地址如下:

http://rocketmq.apache.org/release_notes/release-notes-4.5.1/

使用wget命令下载:

[root@study-01 ~]# cd /usr/local/src
[root@study-01 /usr/local/src]# wget http://mirror.bit.edu.cn/apache/rocketmq/4.5.1/rocketmq-all-4.5.1-bin-release.zip

2、解压下载好的压缩包,并移动到合适的目录下:

[root@study-01 /usr/local/src]# unzip rocketmq-all-4.5.1-bin-release.zip
[root@study-01 /usr/local/src]# mv rocketmq-all-4.5.1-bin-release /usr/local/rocketmq-4.5.1

注:若没有安装unzip命令则使用如下命令安装:
yum install -y unzip

3、进入rocketmq的根目录并查看是否包含如下目录及文件:

[root@study-01 /usr/local/src]# cd /usr/local/rocketmq-4.5.1
[root@study-01 /usr/local/rocketmq-4.5.1]# ls
benchmark  bin  conf  lib  LICENSE  NOTICE  README.md

4、没问题后,使用如下命令启动Name Server:

[root@study-01 /usr/local/rocketmq-4.5.1]# nohup sh bin/mqnamesrv &
[1] 2448
[root@study-01 /usr/local/rocketmq-4.5.1]# 

5、查看默认的9876端口是否被监听,以验证Name Server是否启动成功:

[root@study-01 /usr/local/rocketmq-4.5.1]# netstat -lntp |grep java
tcp6       0      0 :::9876                 :::*                    LISTEN      2454/java           
[root@study-01 /usr/local/rocketmq-4.5.1]#

6、启动Broker:

[root@study-01 /usr/local/rocketmq-4.5.1]# nohup sh bin/mqbroker -n localhost:9876 &
[2] 2485
[root@study-01 /usr/local/rocketmq-4.5.1]# 

7、验证Broker是否启动成功,如果启动成功,能看到类似如下的日志::

[root@study-01 /usr/local/rocketmq-4.5.1]# cat ~/logs/rocketmqlogs/broker.log |grep "boot success"
2019-08-04 01:27:38 INFO main - The broker[study-01, 192.168.190.129:10911] boot success. serializeType=JSON and name server is localhost:9876
[root@study-01 /usr/local/rocketmq-4.5.1]# 

若想停止Name Server和Broker,则依次执行以下两条命令即可:

[root@study-01 /usr/local/rocketmq-4.5.1]# sh bin/mqshutdown broker
The mqbroker(2492) is running...
Send shutdown request to mqbroker(2492) OK  # 输出该信息说明停止成功
[root@study-01 /usr/local/rocketmq-4.5.1]# sh bin/mqshutdown namesrv
The mqnamesrv(2454) is running...
Send shutdown request to mqnamesrv(2454) OK  # 输出该信息说明停止成功
[2]+  退出 143              nohup sh bin/mqbroker -n localhost:9876
[root@study-01 /usr/local/rocketmq-4.5.1]#

验证RocketMQ功能是否正常

1、验证生产消息正常,执行如下命令:

[root@study-01 /usr/local/rocketmq-4.5.1]# export NAMESRV_ADDR=localhost:9876
[root@study-01 /usr/local/rocketmq-4.5.1]# sh bin/tools.sh org.apache.rocketmq.example.quickstart.Consumer

正常的情况下,会看到一堆的类似于如下的输出,这是生产消息后成功的result:

SendResult [sendStatus=SEND_OK, msgId=C0A8BE810A690D7163610FCC253B03E7, offsetMsgId=C0A8BE8100002A9F000000000002BDFE, messageQueue=MessageQueue [topic=TopicTest, brokerName=study-01, queueId=3], queueOffset=249]

2、验证消费消息正常,执行如下命令:

[root@study-01 /usr/local/rocketmq-4.5.1]# sh bin/tools.sh org.apache.rocketmq.example.quickstart.Consumer

正常的情况下,会看到一堆的类似于如下的输出,这是消费的消息内容:

ConsumeMessageThread_6 Receive New Messages: [MessageExt [queueId=3, storeSize=180, queueOffset=242, sysFlag=0, bornTimestamp=1564853837073, bornHost=/192.168.190.129:34708, storeTimestamp=1564853837074, storeHost=/192.168.190.129:10911, msgId=C0A8BE8100002A9F000000000002AA4E, commitLogOffset=174670, bodyCRC=911284903, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='TopicTest', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=250, CONSUME_START_TIME=1564854006859, UNIQ_KEY=C0A8BE810A690D7163610FCC251103CB, WAIT=true, TAGS=TagA}, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 32, 57, 55, 49], transactionId='null'}]]

搭建RocketMQ控制台

RocketMQ官方提供了一个基于Spring Boot开发的可视化控制台,可以方便我们查看RocketMQ的运行情况以及提升运维效率。所以本小节将介绍一下如何搭建搭建RocketMQ的控制台,由于我们使用的RocketMQ版本是4.5.1,所以需要对控制台的源码进行一些改动以适配RocketMQ的4.5.1版本。

1、首先需要下载源码,有两种方式,一是使用git克隆代码仓库,二是直接下载rocketmq-externals的zip包,我这里使用git方式,执行如下命令:

git clone https://github.com/apache/rocketmq-externals.git

2、修改控制台代码,使用IDE打开rocketmq-console项目,如下图所示:

image.png

2.1、修改项目中的application.properties配置文件,我这里主要是修改了监听端口和Name Server的连接地址,至于其他配置项有需要的话可按照说明自行修改:

# console的监听端口,默认是8080
server.port=8011
# Name Server的连接地址;非必须,可以在启动了console后,在控制台导航栏 - 运维 - NameSvrAddrList一栏设置
rocketmq.config.namesrvAddr=192.168.190.129:9876

2.2、修改依赖,由于console项目默认使用的rocketmq版本是4.4.0,与我们这里使用的是4.5.1不完全兼容,所以需要修改一下依赖版本,找到这一行:

<rocketmq.version>4.4.0</rocketmq.version>

修改为:

<rocketmq.version>4.5.1</rocketmq.version>

2.3、修改代码,由于修改了rocketmq的版本,会导致org.apache.rocketmq.console.service.impl.MessageServiceImpl#queryMessageByTopic方法编译报错,所以需要改动一下此处代码 ,将:

@Override
public List<MessageView> queryMessageByTopic(String topic, final long begin, final long end) {
    DefaultMQPullConsumer consumer = new DefaultMQPullConsumer(MixAll.TOOLS_CONSUMER_GROUP, null);
    ...

修改为:

@Override
public List<MessageView> queryMessageByTopic(String topic, final long begin, final long end) {
    RPCHook rpcHook = null;
    DefaultMQPullConsumer consumer = new DefaultMQPullConsumer(MixAll.TOOLS_CONSUMER_GROUP, rpcHook);
    ...

3、打包构建并启动,打开idea的terminal,执行如下命令:

# 在rocketmq-console目录下执行
mvn clean package -DskipTests

# 进入jar包存放目录
cd target

# 启动rocketmq console
java -jar rocketmq-console-ng-1.0.1.jar

4、使用浏览器访问控制台,我这里由于修改了端口,所以访问地址是:http://localhost:8011,正常的情况下能看到如下界面:

image.png

不习惯英文的话可以在右上角切换语言:


image.png

由于控制台是可视化界面并且支持中文,这里就不过多介绍了,可以参考官方的控制台使用说明文档:


RocketMQ术语与概念

我这里将基本的术语与概念简单总结成了思维导图:


官方文档:


Spring消息编程模型 - 编写生产者

在以上小节搭建完RocketMQ之后,我们来使用Spring的消息编程模型,编写一个简单的示例。首先需要在项目中添加相关依赖如下:

<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-spring-boot-starter</artifactId>
    <version>2.0.3</version>
</dependency>

在配置文件中添加rocketmq相关的配置如下:

rocketmq:
  name-server: 192.168.190.129:9876
  producer:
    # 小坑:必须指定group
    group: test-group

编写生产者的代码,这里以Controller做示例,具体代码如下:

package com.zj.node.contentcenter.controller.content;

import lombok.Data;
import lombok.RequiredArgsConstructor;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

import java.util.Date;

/**
 * 生产者
 *
 * @author 01
 * @date 2019-08-03
 **/
@RestController
@RequiredArgsConstructor
public class TestProducerController {

    /**
     * 用于发送消息到 RocketMQ 的api
     */
    private final RocketMQTemplate rocketMQTemplate;

    @GetMapping("/test-rocketmq/sendMsg")
    public String testSendMsg() {
        String topic = "test-topic";
        // 发送消息
        rocketMQTemplate.convertAndSend(topic, Message.getInstance());

        return "send message success";
    }
}

@Data
class Message {
    private Integer id;
    private String name;
    private String status;
    private Date createTime;

    static Message getInstance() {
        Message message = new Message();
        message.id = 1;
        message.name = "小明";
        message.status = "default";
        message.createTime = new Date();

        return message;
    }
}

编写完成后,启动项目,访问该接口:


image.png

消息发送成功后,可以到RocketMQ的控制台中进行查看:


image.png

消息体可以在消息详情中查看,如下:


image.png

从生产者的代码来看,可以说是十分的简单了,只需要使用一个RocketMQTemplate就可以实现将对象转换成消息体并发送消息。实际上除了RocketMQ外,其他的MQ也有对应的Template,如下:

  • RocketMQ:RocketMQTemplate
  • ActiveMQ/Artemis:JmsTemplate
  • RabbitMQ:AmqpTemplate
  • Kafka:KafkaTemplate

Spring消息编程模型 - 编写消费者

在消费者项目中,也需要添加rocketmq的依赖:

<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-spring-boot-starter</artifactId>
    <version>2.0.3</version>
</dependency>

同样需要配置Name Server的连接地址:

rocketmq:
  name-server: 192.168.190.129:9876

编写消费者的代码,具体代码如下:

package com.zj.node.usercenter.rocketmq;

import com.alibaba.fastjson.JSON;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;

import java.util.Date;

/**
 * 消费者监听器
 *
 * @author 01
 * @date 2019-08-03
 **/
@Slf4j
@Component
// topic需要和生产者的topic一致,consumerGroup属性是必须指定的,内容可以随意
@RocketMQMessageListener(topic = "test-topic", consumerGroup = "consumer-group")
public class TestConsumerListener implements RocketMQListener<Message> {

    /**
     * 监听到消息的时候就会调用该方法
     *
     * @param message 消息体
     */
    @Override
    public void onMessage(Message message) {
        log.info("从test-topic中监听到消息");
        log.info(JSON.toJSONString(message));
    }
}

/**
 * 消息体结构需要一致
 */
@Data
class Message {
    private Integer id;
    private String name;
    private String status;
    private Date createTime;
}

编写完成后启动项目,由于之前我们已经往队列里发送了消息,所以此时消费者项目一启动,就可以监听到消息并消费,控制台就会输出如下日志:


image.png
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 204,684评论 6 478
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 87,143评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 151,214评论 0 337
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,788评论 1 277
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,796评论 5 368
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,665评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,027评论 3 399
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,679评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 41,346评论 1 299
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,664评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,766评论 1 331
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,412评论 4 321
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,015评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,974评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,203评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 45,073评论 2 350
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,501评论 2 343

推荐阅读更多精彩内容