一、使用rocketmq-spring-boot-starter组件
1、添加依赖
<!-- rocketmq依赖 -->
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.2.2</version>
</dependency>
2、添加配置
rocketmq:
  nameServer: xx.xx.xx.xx:9876
  producer:
    group: test-group # 必须指定group
    send-message-timeout: 3000 # 消息发送超时时长,默认3s
    retry-times-when-send-failed: 3 # 同步发送消息失败重试次数,默认2
    retry-times-when-send-async-failed: 3 # 异步发送消息失败重试次数,默认2
  consumer:
    group: test-group
    topic: test-topic
3、配置生产者
常用方法汇总,根据实际需要进行封装
import com.alibaba.fastjson.JSON;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;
@Component
public class MQProducerService {
@Value("${rocketmq.producer.send-message-timeout}")
private Integer messageTimeOut;
// 直接注入使用,用于发送消息到broker服务器
@Autowired
private RocketMQTemplate rocketMQTemplate;
/**
* 发送同步消息,异步消费(阻塞当前线程,等待broker响应发送结果,这样不太容易丢失消息)
* (msgBody也可以是对象,sendResult为返回的发送结果)
*/
public SendResult sendMsg(String topic,String msgBody) {
SendResult sendResult = rocketMQTemplate.syncSend(topic, MessageBuilder.withPayload(msgBody).build());
return sendResult;
}
public SendResult sendKeyMsg(String topic,String msgBody,String key){
SendResult sendResult = rocketMQTemplate.syncSend(topic, MessageBuilder.withPayload(msgBody).setHeader("KEYS",key).build());
return sendResult;
}
/**
* 发送异步消息(通过线程池执行发送到broker的消息任务,执行完后回调:在SendCallback中可处理相关成功失败时的逻辑)
* (适合对响应时间敏感的业务场景)
*/
public void sendAsyncMsg(String topic,String msgBody) {
rocketMQTemplate.asyncSend(topic, MessageBuilder.withPayload(msgBody).build(), new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
// 处理消息发送成功逻辑
}
@Override
public void onException(Throwable throwable) {
// 处理消息发送异常逻辑
}
});
}
public void sendAsyncKeyMsg(String topic,String msgBody,String key) {
rocketMQTemplate.asyncSend(topic, MessageBuilder.withPayload(msgBody).setHeader("KEYS",key).build(), new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
// 处理消息发送成功逻辑
}
@Override
public void onException(Throwable throwable) {
// 处理消息发送异常逻辑
}
});
}
/**
* 发送延时消息(上面的发送同步消息,delayLevel的值就为0,因为不延时)
* 在start版本中 延时消息一共分为18个等级分别为:1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
*/
public void sendDelayMsg(String topic,String msgBody, int delayLevel) {
rocketMQTemplate.syncSend(topic, MessageBuilder.withPayload(msgBody).build(), messageTimeOut, delayLevel);
}
public void sendDelayKeyMsg(String topic,String msgBody, int delayLevel,String key) {
rocketMQTemplate.syncSend(topic, MessageBuilder.withPayload(msgBody).setHeader("KEYS",key).build(), messageTimeOut, delayLevel);
}
/**
* 发送单向消息(只负责发送消息,不等待应答,不关心发送结果,如日志)
*/
public void sendOneWayMsg(String topic,String msgBody) {
rocketMQTemplate.sendOneWay(topic, MessageBuilder.withPayload(msgBody).build());
}
public void sendOneWayKeyMsg(String topic,String msgBody,String key) {
rocketMQTemplate.sendOneWay(topic, MessageBuilder.withPayload(msgBody).setHeader("KEYS",key).build());
}
/**
* 发送带tag的消息,直接在topic后面加上":tag"
*/
public SendResult sendTagMsg(String topic,String msgBody,String tag) {
return rocketMQTemplate.syncSend(topic + ":"+tag, MessageBuilder.withPayload(msgBody).build());
}
public SendResult sendTagKeyMsg(String topic,String msgBody,String tag,String key) {
return rocketMQTemplate.syncSend(topic + ":"+tag, MessageBuilder.withPayload(msgBody).setHeader("KEYS",key).build());
}
}
4、配置消费者
/**
 * Demo consumer: waits one second to simulate work, then prints the received payload.
 *
 * The topic placeholder points at {@code rocketmq.consumer.topic}, the key the
 * accompanying configuration actually defines next to {@code rocketmq.consumer.group}.
 */
@RocketMQMessageListener(
        topic = "${rocketmq.consumer.topic}",
        nameServer = "${rocketmq.nameServer}",
        consumerGroup = "${rocketmq.consumer.group}")
@Component
public class ComsumerListener implements RocketMQListener<String> {

    /**
     * Handles one received message.
     *
     * @param str message payload
     */
    @Override
    public void onMessage(String str) {
        try {
            Thread.sleep(1000);
            System.out.println("睡眠结束");
        } catch (InterruptedException e) {
            // Restore the interrupt flag so the owning thread can still observe the interruption.
            Thread.currentThread().interrupt();
            System.out.println("异常");
        }
        System.out.println("接收信息:" + str);
    }
}
5、测试推送/消费mq
/**
 * Test endpoint that pushes a fixed message to {@code test-topic}.
 *
 * The value of {@code @RestController} is a bean name, not a URL prefix, so the
 * {@code /mq} prefix is carried by the handler mapping instead.
 */
@RestController
public class RocketMqTestController {

    @Autowired
    private MQProducerService mqProducerService;

    /**
     * Sends the test message synchronously and reports success.
     *
     * @return always {@code true} once the send call has returned
     */
    @GetMapping(value = "/mq/sned")
    public Boolean snedMqMsg() {
        mqProducerService.sendMsg("test-topic", "发送mq消息:1111");
        System.out.println("发送成功");
        return true;
    }
}
测试结果:
二、使用rocketmq-client组件
1、添加依赖
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>4.9.1</version>
</dependency>
2、添加配置
rocketmq:
  config:
    topic: test-topic
    nameServerAddress: xx.xx.xxx.xxx:9876;xx.xx.xxx.xxx:9876
    consumerGroupId: test-group
添加配置类
/**
 * Exposes the RocketMQ settings through static getters so that code which is not
 * wired by Spring can read them.
 *
 * The values are injected into instance fields and copied into the static fields by
 * {@link #getConfig()} after construction; before that the getters return {@code null}.
 * All three placeholders read from the {@code rocketmq.config.*} keys of the
 * accompanying configuration.
 */
@Component
public class RocketMqProducerConfig {

    private static String nameServerAddress;
    private static String topic;
    private static String groupId;

    @Value("${rocketmq.config.nameServerAddress}")
    private String appAddress;

    @Value("${rocketmq.config.topic}")
    private String appTopic;

    @Value("${rocketmq.config.consumerGroupId}")
    private String appGroupId;

    /** Copies the injected values into the static fields read by the getters. */
    @PostConstruct
    public void getConfig() {
        topic = this.appTopic;
        nameServerAddress = this.appAddress;
        groupId = this.appGroupId;
    }

    /** @return the configured name server address list, or {@code null} before initialisation */
    public static String getNameServerAddress() {
        return nameServerAddress;
    }

    /** @return the configured topic, or {@code null} before initialisation */
    public static String getTopic() {
        return topic;
    }

    /** @return the configured group id, or {@code null} before initialisation */
    public static String getGroupId() {
        return groupId;
    }
}
添加消息dto
/**
 * Message payload exchanged through the queue; carries a single push identifier.
 */
public class MqMsgDto implements Serializable {

    /** Explicit version id so the serialized form does not depend on compiler-generated values. */
    private static final long serialVersionUID = 1L;

    private String pushId;

    /** @return the push identifier, or {@code null} if none has been set */
    public String getPushId() {
        return pushId;
    }

    /** @param pushId the push identifier to carry */
    public void setPushId(String pushId) {
        this.pushId = pushId;
    }
}
3、配置生产者
/**
 * Pushes {@link MqMsgDto} messages to RocketMQ through a shared {@code DefaultMQProducer}.
 *
 * The producer is created in a static initializer from the static getters of
 * {@code RocketMqProducerConfig}; the {@code @DependsOn} declaration asks Spring to
 * create that bean first.
 * NOTE(review): this relies on the class not being initialized before that bean's
 * values are populated - confirm, or move the setup into an instance lifecycle hook.
 */
@Service
@DependsOn("rocketMqProducerConfig")
public class CalendarPushMqServiceImpl implements ICalendarPushMqApi {

    protected static final Logger logger = LoggerFactory.getLogger(CalendarPushMqServiceImpl.class);

    private static DefaultMQProducer rocketProducer = null;

    static {
        try {
            // Create the producer for the configured group.
            rocketProducer = new DefaultMQProducer(RocketMqProducerConfig.getGroupId());
            // Point it at the configured name server(s).
            rocketProducer.setNamesrvAddr(RocketMqProducerConfig.getNameServerAddress());
            rocketProducer.start();
            // Successful start-up is informational, not an error.
            logger.info("rocketMq生产者初始化成功");
        } catch (Exception e) {
            logger.error("rocketMq生产者初始化失败:", e);
        }
    }

    /**
     * Serializes the DTO to JSON and sends it to the configured topic with delay level 2.
     * Failures are logged and not propagated to the caller.
     *
     * @param mqMsgDto message to push; its push id is used as the message key
     */
    public void pushMsg(MqMsgDto mqMsgDto) {
        try {
            Message message = new Message();
            message.setTopic(RocketMqProducerConfig.getTopic());
            // Encode explicitly as UTF-8: the consumer side decodes the body as UTF-8.
            message.setBody(JsonUtil.json(mqMsgDto).getBytes(java.nio.charset.StandardCharsets.UTF_8));
            message.setKeys(mqMsgDto.getPushId());
            // The tag is the current timestamp in milliseconds.
            message.setTags(Long.toString(System.currentTimeMillis()));
            // Delayed consumption: second delay level, 5s.
            message.setDelayTimeLevel(2);
            rocketProducer.send(message);
        } catch (Exception e) {
            logger.error("推送mq出错:", e);
        }
    }
}
4、配置消费者
@Configuration
public class RocketMqConfiguration {
protected static final Logger logger = LoggerFactory.getLogger(RocketMqConfiguration.class);
/**
* -推送开放搜索-普通消费队列
* */
@Bean
public DefaultMQPushConsumer testConsumer() {
try {
// 实例化消费者
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(RocketMqProducerConfig.getGroupId());
// 设置NameServer的地址
consumer.setNamesrvAddr(RocketMqProducerConfig.getNameServerAddress());
// 订阅一个或者多个Topic,以及Tag来过滤需要消费的消息
consumer.subscribe(RocketMqProducerConfig.getTopic(), "*");
consumer.registerMessageListener(new RocketMqMessageListener());
consumer.start();
logger.info("初始化消费者成功");
return consumer;
}catch (Exception e){
logger.error("初始化消费队列失败:",e);
}
return null;
}
}
监听器配置
/**
 * Concurrent message listener: decodes each message body as UTF-8 JSON into a
 * {@link MqMsgDto} and passes it to {@code Test.update}.
 *
 * Messages that cannot be processed are logged and skipped; the batch is always
 * acknowledged as consumed.
 */
@Component
public class RocketMqMessageListener implements MessageListenerConcurrently {

    private static final Logger logger = LoggerFactory.getLogger(RocketMqMessageListener.class);

    /** Looked up through SpringBeanUtil at construction time rather than injected. */
    private final Test test = SpringBeanUtil.getBean(Test.class);

    /**
     * Processes one batch of messages.
     *
     * @param list                       messages delivered in this batch
     * @param consumeConcurrentlyContext consume context supplied by the client (unused)
     * @return always {@code CONSUME_SUCCESS}
     */
    @Override
    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
        if (CollectionUtils.isEmpty(list)) {
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        }
        for (MessageExt message : list) {
            try {
                String body = new String(message.getBody(), StandardCharsets.UTF_8);
                MqMsgDto dto = JSON.parseObject(body, MqMsgDto.class);
                if (Objects.isNull(dto)) {
                    continue;
                }
                // Receiving a message is normal operation, so log it at info level.
                logger.info("接收信息:" + JSON.toJSONString(dto));
                test.update(dto);
            } catch (Exception e) {
                // Skip the message that failed and carry on with the rest of the batch.
                logger.error("接收无效消息,自动过滤", e);
            }
        }
        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    }
}