rocketmq支持定时消息和延时消息。
定时消息:Producer将消息发送到消息队列RocketMQ版服务端,但并不期望立马投递这条消息,而是推迟到在当前时间点之后的某一个时间投递到Consumer进行消费,该消息即定时消息。
延时消息:Producer将消息发送到消息队列RocketMQ版服务端,但并不期望立马投递这条消息,而是延迟一定时间后才投递到Consumer进行消费,该消息即延时消息。
定时消息与延时消息在代码配置上存在一些差异,但是最终达到的效果相同:消息在发送到消息队列RocketMQ版服务端后并不会立马投递,而是根据消息中的属性延迟固定时间后才投递给消费者。
定时消息和延时消息的使用在代码编写上存在略微的区别:
发送定时消息需要明确指定消息发送时间点之后的某一时间点作为消息投递的时间点。
发送延时消息时需要设定一个延时时间长度,消息将从当前发送时间点开始延迟固定时间之后才开始投递。
代码测试
在阿里云和apache的两个版本中,有一定的区别:
阿里云:
延时消息:当前时间+延时时间表示,转成时间戳,表示延时消息。
定时消息:指定时间转成时间戳,表示定时消息。
以上两种代码实现方式相同。
apache :
只有延时消息的概念,并且需要在配置文件中给出延时等级的定义,在broker.conf中指定以下配置:
messageDelayLevel=1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
测试代码:
控制器
/**
* 定时消息发送
*/
@RequestMapping("/send/delay")
public void delay(){
rocketMqProducer.sendDelay("test_delay","延时消息",3000L, 5);
}
生产者
/**
* 发送延时消息
*/
public void sendDelay(String topic, String msgBody, long timeout, Integer delayLevel) {
SendResult sendResult = rocketMQTemplate.syncSend(topic, MessageBuilder.withPayload(msgBody).build(), timeout, delayLevel);
if (ObjectUtils.isNotEmpty(sendResult)) {
//sendResult不空则表示消息发送成功
log.info("send success , send msg = {}, messageId = {}", msgBody, sendResult.getMsgId());
}
}
消费者
/**
* RocketMqProducer
* @date: 2020/11/26
* @author weirx
* @version 3.0
*/
@Slf4j
@Component
@RocketMQMessageListener(topic = "test_delay", selectorExpression = "*", consumerGroup = "test_delay")
public class SimpleDelayMessageListener implements RocketMQListener<MessageExt> {
@Override
public void onMessage(MessageExt messageExt) {
byte[] body = messageExt.getBody();
String msg = new String(body);
log.info("receive async message:{}", msg);
}
}
结果,对比上面的配置,延时等级5是一分钟,消息发送和消费正好一分钟。
2020-11-27 17:56:55.248 INFO 51704 --- [nio-8085-exec-1] c.c.b.m.r.producer.RocketMqProducer : send success , send msg = 延时消息, messageId = AC100208C9F818B4AAC289BF48250000
2020-11-27 17:57:55.280 INFO 51704 --- [MessageThread_1] c.c.b.m.r.c.SimpleDelayMessageListener : receive async message:延时消息