一、搭建项目引入依赖
访问https://mvnrepository.com/获取rocketmq的starter依赖。
在pom中添加下依赖:
<!-- https://mvnrepository.com/artifact/org.apache.rocketmq/rocketmq-spring-boot-starter -->
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.1.1</version>
</dependency>
刷新maven依赖后,在配置文件增加以下配置:
rocketmq:
name-server: http://101.200.36.168:9876
producer:
#指定消息发送者的组,在控制台查询时会用到
group: test
二、测试代码
2.1 producer消息发送者
package com.cloud.bssp.message;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;
/**
* RocketMqProducer
* @date: 2020/11/26
* @author weirx
* @version 3.0
*/
@Component
public class RocketMqProducer {
@Autowired
private RocketMQTemplate rocketMQTemplate;
/**
* 发送普通消息
*/
public void sendMsg(String msgBody) {
rocketMQTemplate.syncSend("queue_test_topic", MessageBuilder.withPayload(msgBody).build());
}
}
2.2 consumer 消息接收者
package com.cloud.bssp.message;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
/**
* RocketMqListener
* @date: 2020/11/26
* @author weirx
* @version 3.0
*/
@Slf4j
@Component
@RocketMQMessageListener(topic = "queue_test_topic", selectorExpression = "*", consumerGroup = "queue_group_test")
public class RocketMqListener implements RocketMQListener<MessageExt> {
@Override
public void onMessage(MessageExt messageExt) {
byte[] body = messageExt.getBody();
String msg = new String(body);
log.info("接收到消息:{}", msg);
}
}
2.3 测试代码
这里直接创建一个Controller:
package com.cloud.bssp.message;
import com.cloud.bssp.util.R;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
/**
* RocketMqTest
* @date: 2020/11/26
* @author weirx
* @version 3.0
*/
@Slf4j
@Component
@RestController
@RequestMapping("/message")
public class RocketMqTest {
@Autowired
private RocketMqProducer rocketMqProducer;
@RequestMapping("/send")
public void sendMsg(){
rocketMqProducer.sendMsg("我来测试一下rocketmq");
}
}
2.4 发送消息实验
启动项目并调用上一步中的接口:http://localhost:8085/message/send
收到以下结果,至此我们的代码与rocketmq成功建立了连接: