参考资料
阿里云官方英文、最新的Demo和Guidence
http://rocketmq.apache.org/docs/transaction-example/
阿里云的帮助文档啊,超级详细而且有Demo
https://help.aliyun.com/document_detail/29551.html
阿里云在github上的Demos(包括整合Spring 和简单TCP的形式)
https://github.com/AliwareMQ/mq-demo
消息的类型
http://rocketmq.apache.org/docs/simple-example/
按照发送的特点分:
https://help.aliyun.com/document_detail/29547.html?spm=a2c4g.11186623.2.13.496e2379jnaAlt
- 同步消息
- 异步消息
- 单向消息
按照使用功能特点分:
- 普通消息(订阅)
- 顺序消息
- 广播消息
- 延时消息
- 批量消息
- 事务消息
同步发送(可靠)
- 同步发送,线程阻塞,投递completes阻塞结束
- 如果发送失败,会在默认的超时时间3秒内进行重试,最多重试2次
- 投递completes不代表投递成功,要check SendResult.sendStatus来判断是否投递成功
//Send message in synchronous mode. This method returns only when the sending procedure totally completes.
SendResult sendResult = producer.send(msg);
- SendResult里面有发送状态的枚举:SendStatus,同步的消息投递有一个状态返回值的
public enum SendStatus {
SEND_OK,
FLUSH_DISK_TIMEOUT,
FLUSH_SLAVE_TIMEOUT,
SLAVE_NOT_AVAILABLE,
}
- retry的实现原理:只有ack的SendStatus=SEND_OK才会停止retry
int timesTotal = communicationMode == CommunicationMode.SYNC ? 1 + this.defaultMQProducer.getRetryTimesWhenSendFailed() : 1;
for (; times < timesTotal; times++) {
trySend();
sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, topicPublishInfo, timeout - costTime);
switch (communicationMode) {
case ASYNC:
return null;
case ONEWAY:
return null;
case SYNC:
if (sendResult.getSendStatus() != SendStatus.SEND_OK) {
if (this.defaultMQProducer.isRetryAnotherBrokerWhenNotStoreOK()) {
continue;
}
}
return sendResult;
default:
break;
}
}
注意事项:发送同步消息且Ack为SEND_OK,只代表该消息成功的写入了MQ当中,并不代表该消息成功的被Consumer消费了
异步发送(可靠)
- 异步调用的话,当前线程一定要等待异步线程回调结束再关闭producer啊,因为是异步的,不会阻塞,提前关闭producer会导致未回调链接就断开了
- 异步消息不retry,投递失败回调onException()方法,只有同步消息才会retry,源码参考 DefaultMQProducerImpl.class
int timesTotal = communicationMode == CommunicationMode.SYNC ? 1 + this.defaultMQProducer.getRetryTimesWhenSendFailed() : 1;
- 异步发送一般用于链路耗时较长,对 RT 响应时间较为敏感的业务场景,例如用户视频上传后通知启动转码服务,转码完成后通知推送转码结果等。
producer.send(msg, new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
System.out.printf("%-10d OK %s %n", index,sendResult.getMsgId());
}
@Override
public void onException(Throwable e) {
System.out.printf("%-10d Exception %s %n", index, e);
e.printStackTrace();
}
});
Oneway(不可靠,类似于UPD)
- 消息不可靠,性能高,只负责往服务器发送一条消息,不会重试也不关心是否发送成功
- 此方式发送消息的过程耗时非常短,一般在微秒级别
for (int i = 0; i < 50; i++) {
Message msg = new Message("TopicTest2" ,"TagA" ,("Hello RocketMQ " +i).getBytes(RemotingHelper.DEFAULT_CHARSET));
producer.sendOneway(msg);
}
延迟发送
艿艿的博客
https://blog.csdn.net/github_38592071/article/details/72230984
延迟的机制是在 服务端实现的,也就是Broker收到了消息,但是经过一段时间以后才发送
服务器按照1-N定义了如下级别: “1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h”;若要发送定时消息,在应用层初始化Message消息对象之后,调用Message.setDelayTimeLevel(int level)方法来设置延迟级别,按照序列取相应的延迟级别,例如level=2,则延迟为5s
msg.setDelayTimeLevel(2);
SendResult sendResult = producer.send(msg);
实现原理:
- 发送消息的时候如果消息设置了DelayTimeLevel,那么该消息会被丢到ScheduleMessageService.SCHEDULE_TOPIC这个Topic里面
- 根据DelayTimeLevel选择对应的queue
- 再把真实的topic和queue信息封装起来,set到msg里面
- 然后每个SCHEDULE_TOPIC_XXXX的每个DelayTimeLevelQueue,有定时任务去刷新,是否有待投递的消息
- 每 10s 定时持久化发送进度
源码参考 :rocketmq-store.4.4.0.jar
CommitLog.putMessage()
- 存储延迟消息
if (msg.getDelayTimeLevel() > 0) {
topic = ScheduleMessageService.SCHEDULE_TOPIC;
queueId = ScheduleMessageService.delayLevel2QueueId(msg.getDelayTimeLevel());
// Backup real topic, queueId
MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_TOPIC, msg.getTopic());
MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_QUEUE_ID, String.valueOf(msg.getQueueId()));
msg.setPropertiesString(MessageDecoder.messageProperties2String(msg.getProperties()));
msg.setTopic(topic);
msg.setQueueId(queueId);
}
result = mappedFile.appendMessage(msg, this.appendMessageCallback);
- MQ对SCHEDULE_TOPIC_XXXX 每条消费队列对应单独一个定时任务进行轮询,发送 到达投递时间【计划消费时间】 的消息。
ScheduleMessageService.java类里面有个内部类 DeliverDelayedMessageTimerTask,定时执行check待执行消息的功能,每个类有delayLevel、offset2个变量
class DeliverDelayedMessageTimerTask extends TimerTask {
private final int delayLevel;
private final long offset;
public DeliverDelayedMessageTimerTask(int delayLevel, long offset) {
this.delayLevel = delayLevel;
this.offset = offset;
}
@Override
public void run() {
this.executeOnTimeup();
}
}
批量发送
- 需要rocketmq的版本要高,4.2以上是支持的,4.1不知道,4.0不支持
- 提升性能,建议一次消息的大小不超过1M,超大的话,官网有方法Split
- Messages of the same batch should have: same topic, same waitStoreMsgOK and no schedule support.
String topic = "TopicTest";
List<Message> messages = new ArrayList<>();
messages.add(new Message(topic, "TagA", "Order1", "Hello world 0".getBytes()));
messages.add(new Message(topic, "TagA", "Order2", "Hello world 1".getBytes()));
messages.add(new Message(topic, "TagA", "Order3", "Hello world 2".getBytes()));
producer.send(messages);
广播、集群订阅(只需要对接收端做控制即可)
通过在Consumer做配置,实现广播消息,或者集群订阅消息(默认)
consumer.setMessageModel(MessageModel.CLUSTERING); //默认
consumer.setMessageModel(MessageModel.BROADCASTING);//广播模式