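This article is for readers who want to understand how RocketMQ works under the hood. It shows how to quickly set up a local environment with Docker.
Deploying the services
Deploying the NameServer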
docker pull rocketmqinc/rocketmq:4.4.0
docker run --name rmqnamesrv -d -p 9876:9876 rocketmqinc/rocketmq:4.4.0 sh mqnamesrv
Deploying the Broker
- Start command
docker run --name rmqbroker -d -p 10911:10911 -p 10909:10909 -v "$(pwd)/conf/broker.conf":/opt/rocketmq-4.4.0/conf/broker.conf --link rmqnamesrv:namesrv -e "NAMESRV_ADDR=namesrv:9876" rocketmqinc/rocketmq:4.4.0 sh mqbroker -c /opt/rocketmq-4.4.0/conf/broker.conf
Contents of broker.conf
brokerName = broker-a
brokerId = 0
deleteWhen = 04
fileReservedTime = 48
brokerRole = ASYNC_MASTER
flushDiskType = ASYNC_FLUSH
brokerIP1 = 30.203.94.18
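Note: be sure to change brokerIP1 to the real IP address of the host machine. The broker registers this address with the NameServer, and clients connect to the broker through it, so an address that is only valid inside the container will not be reachable from the host.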
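To find a suitable value for brokerIP1, the host's own network addresses can be listed. The following is a minimal sketch using only the Java standard library; the class name HostIpFinder is a hypothetical helper and not part of RocketMQ. It may also print addresses of virtual interfaces (for example Docker bridges), so the LAN address still has to be picked by hand.

```java
import java.net.Inet4Address;
import java.net.InetAddress;
import java.net.NetworkInterface;
import java.net.SocketException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Enumeration;
import java.util.List;

// Hypothetical helper (not part of RocketMQ): lists candidate values for brokerIP1.
final class HostIpFinder {

    // Returns the IPv4 addresses of all interfaces that are up and not loopback.
    static List<String> candidateAddresses() throws SocketException {
        List<String> result = new ArrayList<>();
        Enumeration<NetworkInterface> nics = NetworkInterface.getNetworkInterfaces();
        if (nics == null) {
            return result; // no network interfaces available
        }
        for (NetworkInterface nic : Collections.list(nics)) {
            if (!nic.isUp() || nic.isLoopback()) {
                continue;
            }
            for (InetAddress addr : Collections.list(nic.getInetAddresses())) {
                if (addr instanceof Inet4Address && !addr.isLoopbackAddress()) {
                    result.add(addr.getHostAddress());
                }
            }
        }
        return result;
    }

    public static void main(String[] args) throws SocketException {
        // Print each candidate in the same form as the broker.conf entry.
        for (String ip : candidateAddresses()) {
            System.out.println("brokerIP1 = " + ip);
        }
    }
}
```

Copy the address that other machines (or the host itself) use to reach this machine into broker.conf before starting the broker container.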
Deploying the RocketMQ web console
- Pull the image
docker pull pangliang/rocketmq-console-ng
- Start the container
docker run --name rmqconsole -d -p 8080:8080 --link rmqnamesrv:namesrv -e "JAVA_OPTS=-Drocketmq.namesrv.addr=namesrv:9876" pangliang/rocketmq-console-ng
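Once the three containers are running, a quick way to confirm that the published ports accept connections is a plain TCP connect. This is a rough sketch, assuming the containers were started on the local machine with the port mappings shown above; PortCheck is a hypothetical helper name. An open port only shows that the port is published, not that the service behind it is healthy.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

// Hypothetical helper: checks whether a TCP port accepts connections.
final class PortCheck {

    // Returns true if a TCP connection to host:port succeeds within the timeout.
    static boolean isOpen(String host, int port, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return true;
        } catch (IOException e) {
            // Connection refused, timed out, or host unreachable.
            return false;
        }
    }

    public static void main(String[] args) {
        // Ports published by the docker run commands: NameServer, Broker (two ports), console.
        int[] ports = {9876, 10911, 10909, 8080};
        for (int port : ports) {
            System.out.println(port + " open: " + isOpen("127.0.0.1", port, 1000));
        }
    }
}
```

If a port is reported as closed, docker ps and docker logs for the corresponding container are the natural next place to look.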
Test program
Producer side
- Generate a standard Spring Boot project; any Spring Boot project setup guide will do (omitted here)
- Add the dependency
<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-spring-boot-starter</artifactId>
    <version>2.1.1</version>
</dependency>
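- Configure application.properties (the console container above already maps host port 8080, which is also Spring Boot's default port, so set server.port to a free port if both run on the same machine)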
rocketmq.name-server=127.0.0.1:9876
rocketmq.producer.group=boot-group1
rocketmq.producer.sendMessageTimeout=300000
- Define the message body
package com.example.producer;

import lombok.Getter;
import lombok.Setter;

@Getter
@Setter
public class Demo01Message {
    // Topic that this message type is sent to
    public static final String TOPIC = "DEMO_01";
    private Integer id;
}
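- Define the sender
package com.example.producer;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
@RestController
public class SendController { // hypothetical class name
    @Autowired
    private RocketMQTemplate rocketMQTemplate;
    @GetMapping("/send")
    public String send() {
        Demo01Message message = new Demo01Message();
        message.setId(88899);
        rocketMQTemplate.syncSend(Demo01Message.TOPIC, message); // blocks until the broker acknowledges
        return "send";
    }
}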
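With the producer application running, the /send endpoint can be triggered from a browser, or from a small client such as the sketch below. It uses the JDK's built-in HttpClient (Java 11 or later). SendEndpointClient is a hypothetical name, and the base URL is an assumption: it defaults to Spring Boot's port 8080, so pass the real port as an argument if the producer listens elsewhere (for example because the console container occupies 8080).

```java
import java.io.IOException;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

// Hypothetical client: calls the producer's /send endpoint and returns the response body.
final class SendEndpointClient {

    static String callSend(String baseUrl) throws IOException, InterruptedException {
        HttpClient client = HttpClient.newHttpClient();
        HttpRequest request = HttpRequest.newBuilder(URI.create(baseUrl + "/send"))
                .GET()
                .build();
        HttpResponse<String> response =
                client.send(request, HttpResponse.BodyHandlers.ofString());
        if (response.statusCode() != 200) {
            throw new IOException("unexpected status " + response.statusCode());
        }
        return response.body();
    }

    public static void main(String[] args) throws Exception {
        // Assumed default; override with the producer's actual address.
        String baseUrl = args.length > 0 ? args[0] : "http://localhost:8080";
        System.out.println(callSend(baseUrl));
    }
}
```

A response body of "send" only means the controller method returned; whether the message reached the topic is confirmed on the consumer side or in the console.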
Consumer side
- Generate a standard Spring Boot project; any Spring Boot project setup guide will do (omitted here)
- Add the dependency
<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-spring-boot-starter</artifactId>
    <version>2.1.1</version>
</dependency>
- Configure application.properties
rocketmq.name-server=127.0.0.1:9876
- Define the message body
package com.example.producer;

import lombok.Getter;
import lombok.Setter;

@Getter
@Setter
public class Demo01Message {
    // Topic that this message type is received from
    public static final String TOPIC = "DEMO_01";
    private Integer id;
}
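- Implement the message receiver
package com.example.producer;

import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Service;

@Service
@RocketMQMessageListener(
    topic = Demo01Message.TOPIC,
    consumerGroup = "my-consumer_test-topic-1"
)
public class MyConsumer implements RocketMQListener<Demo01Message> {
    @Override
    public void onMessage(Demo01Message demo01Message) {
        // Demo01Message has no toString(), so print the field explicitly
        System.out.println("Received message, id=" + demo01Message.getId());
    }
}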