概述
消息在发送到消息队列RocketMQ版服务端后并不会立马投递,而是根据消息中的属性延迟固定时间后才投递给消费者。比如:常见的场景电商交易中超时未支付关闭订单的场景,通过延时消息在30分钟后投递给消息端进行关单。
开源 RocketMQ 针对目前只支持固定精度的定时消息。生产端发送消息,通过设delayTimeLevel时间级别后,可实现消息不立马被消费者消费到,而是按照18个级别 ("1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";)。比如支持5秒、10秒的Level,那么用户只能发送5秒延迟或者10秒延迟,不能发送8秒延迟的消息。
实现原理
Message转化
- Producer端设置定时级别,发送延时消息后。
- Broker端收到延时消息时,会提前将延迟消息的 topic 进行转化为进入 Topic 为 SCHEDULE_TOPIC_XXXX的消息,同时 延迟级别 与 消息队列编号 做固定映射:QueueId = DelayLevel - 1。
if (msg.getDelayTimeLevel() > 0) {
if (msg.getDelayTimeLevel() > this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel()) {
msg.setDelayTimeLevel(this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel());
}
topic = TopicValidator.RMQ_SYS_SCHEDULE_TOPIC;
queueId = ScheduleMessageService.delayLevel2QueueId(msg.getDelayTimeLevel());
MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_TOPIC, msg.getTopic());
MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_QUEUE_ID, String.valueOf(msg.getQueueId())); msg.setPropertiesString(MessageDecoder.messageProperties2String(msg.getProperties()));
msg.setTopic(topic);
msg.setQueueId(queueId);
}
任务检测
- 生成 ConsumeQueue 时,每条消息的 tagsCode 使用【消息计划消费时间】。ScheduleMessageService 在轮询 ConsumeQueue 时,可以使用 tagsCode 进行过滤。
- 对 SCHEDULE_TOPIC_XXXX 每条消费队列(一共18个) broker端都会启动了一个 timer和 timerTask的任务,默认1s执行一次 拉取数据,拉取过程 首先进行系统topic+delayLevel 查询ConsumeQueue ,然后对比每条消息的延时时间和当前时间对比,发送 到达投递时间【计划消费时间】 的消息。将消息进行转化并按照业务topic封装好Message写入到 commit_log 中。
DeliverDelayedMessageTimerTask#run
public void executeOnTimeup() {
ConsumeQueue cq =
ScheduleMessageService.this.defaultMessageStore.findConsumeQueue(TopicValidator.RMQ_SYS_SCHEDULE_TOPIC,
delayLevel2QueueId(delayLevel));
long failScheduleOffset = offset;
if (cq != null) {
SelectMappedBufferResult bufferCQ = cq.getIndexBuffer(this.offset);
if (bufferCQ != null) {
try {
long nextOffset = offset;
int i = 0;
ConsumeQueueExt.CqExtUnit cqExtUnit = new ConsumeQueueExt.CqExtUnit();
for (; i < bufferCQ.getSize(); i += ConsumeQueue.CQ_STORE_UNIT_SIZE) {
long offsetPy = bufferCQ.getByteBuffer().getLong();
int sizePy = bufferCQ.getByteBuffer().getInt();
long tagsCode = bufferCQ.getByteBuffer().getLong();
if (cq.isExtAddr(tagsCode)) {
if (cq.getExt(tagsCode, cqExtUnit)) {
tagsCode = cqExtUnit.getTagsCode();
}
}
long now = System.currentTimeMillis();
long deliverTimestamp = this.correctDeliverTimestamp(now, tagsCode);
nextOffset = offset + (i / ConsumeQueue.CQ_STORE_UNIT_SIZE);
long countdown = deliverTimestamp - now;
if (countdown <= 0) { // 消息到达可发送时间
MessageExt msgExt =
ScheduleMessageService.this.defaultMessageStore.lookMessageByOffset(
offsetPy, sizePy);
if (msgExt != null) { // 将Message消息转化并将其写入
try {
MessageExtBrokerInner msgInner = this.messageTimeup(msgExt);
PutMessageResult putMessageResult =
ScheduleMessageService.this.writeMessageStore
.putMessage(msgInner);
if (putMessageResult != null
&& putMessageResult.getPutMessageStatus() == PutMessageStatus.PUT_OK) {
continue;
}
} catch (Exception e) {
}
}
} else { // 安排下一次任务
ScheduleMessageService.this.timer.schedule(
new DeliverDelayedMessageTimerTask(this.delayLevel, nextOffset),
countdown);
ScheduleMessageService.this.updateOffset(this.delayLevel, nextOffset);
return;
}
}
nextOffset = offset + (i / ConsumeQueue.CQ_STORE_UNIT_SIZE);
ScheduleMessageService.this.timer.schedule(new DeliverDelayedMessageTimerTask(
this.delayLevel, nextOffset), DELAY_FOR_A_WHILE);
ScheduleMessageService.this.updateOffset(this.delayLevel, nextOffset);
return;
} finally {
bufferCQ.release();
}
}
ScheduleMessageService.this.timer.schedule(new DeliverDelayedMessageTimerTask(this.delayLevel,
failScheduleOffset), DELAY_FOR_A_WHILE);
}
- 当延时消息写成功后,先更新延时消息消费进度(内存中),同时定时消息发送进度存储在文件(../config/delayOffset.json)里,每 10s 定时持久化发送进度。
- 若发送延时消息失败后,MQ会安排下一次任务继续发送直到成功。
优缺点
优点:设计简单,把所有相同延迟时间的消息都先放到一个队列中,定时扫描,可以保证消息消费的有序性
缺点:定时器采用了timer,timer是单线程运行,如果延迟消息数量很大的情况下,可能单线程处理不过来,造成消息到期后也没有发送出去的情况
总结
图片.png