前言
主要学习RocketMQ生产者的基本API使用,以及各种投递消息模式,消息的返回状态等等。
RocketMQ生产者的参数
- productGroup: 组名
- createTopicKey:
- defaultTopicQueueNums (默认为4)
- sendMsgTimeout(单位:ms)
- compressMsgBodyOverHowmuch
- retryTimesWhenSendFaild (可配置)
- retryAnotherBrokerWhenNotStoreOK(默认视false)
- maxMessageSize (默认128K)
注:这些通过配置文件去配置,不需要在代码层次去配置
主从同步机制基本了解
元数据同步:
通过启动Broker的时候,会去判断是否为从节点,如果是为Slave则启动同步任务(定时任务,固定的时间,每隔一分钟去执行)
元数据信息有,topic配置信息,同步消费者的偏移量,延迟偏移量,订阅组配置信息等。底层是netty去做的。
同步消费者偏量移
主服务器Master启动,监听从服务器Slave的监听;从服务器Slave启动,主动向主服务器建立连接,获取从服务器Slave的commitlog偏移量,以此偏移量向主服务器Master主动拉取消息,主服务器根据偏移量,与自身commitlog文件的最大偏移量进行比较,如果大于Slave的最大偏移量,主服务器Master将向从服务器Slave返回一定数量的消息。
消息同步:HAService、HAconnection、WaitNotifyObject(只做基本的了解)
HAconnection
Master节点:
AcceptSocketService: 接收Slave节点连接
HAConnection:
- ReadSocketService: 读自Slave节点的数据
- WriteSocketService:写往Slave节点的数据
HAService
实时同步,用的socket
Slave 节点: HAServie -》HAClinet:对Master节点连接,读写数据。
通信协议
Master节点与Slave节点通信协议
Slave - 》 Master : 上报CommitLog已同步的位置(最大的位置)
Master -》 Slave : 传输新的CommitLog数据(开始的位置)
RocketMQ生产者同步消息发送
- 消息的同步发送: producer.send(msg)
- 同步发送消息核心实现:DefaultMQProducerImpl
SendResult sendResult = producer.send(message);
同步重试次数:当前1次+设置的重试次数
RocketMQ生产者异步消息发送
- 异步发送:producer(Message msg, SendCallback sendCallback)
- 异步发送消息核心实现: DefaultMQoridycerImpl
异步重试次数:只有1次
RocketMQ生产者消息返回状态
- SEND_OK
- FLUSH_DISK_TIMEOUT
- FLUSH_SLAVE_TIMEOUT
- SLAVE_NOT_AVAILABLE
最后三个状态(虽然RocketMQ已经是有高可用了,但要做得比较可靠性可以做补偿机制)
// 状态
SendStatus status = sr.getSendStatus();
RocketMQ生产者延迟消息
- 延迟消息:消息发到Broker后,要特定是时间才会北Consumer消费
- 目前只支持固定精度的定时消息
- MessageStoreConfig配置类&ScheduleMessageService任务类
- setDelayTimeLevel方法设置
Message message = new Message("test_topic", // 主题
"TagA", // 标签
"key" + i, // 用户自定义的key ,唯一的标识
("Hello RocketMQ" + i).getBytes()); // 消息内容实体(byte[])
// 2. 发送消息 topic 默认4个队列
SendResult sendResult = producer.send(message);
System.out.println("消息发出:" + sendResult);
// 3. 设置延迟队列时间
message.setDelayTimeLevel(3);
RocketMQ生产者自定义消息发送规则
- 如何把消息发送到指定的队列
- 使用MessageQueueSelector
- 使用方法producer.send(Msg,selector, obj)
SendResult sr = producer.send(message, new MessageQueueSelector() {
@Override
public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
Integer queueNumber = (Integer)arg;
return mqs.get(queueNumber);
}
}, 2);
System.err.println(sr);
打印日志,指定的队列看queueId=2
SendResult [sendStatus=SEND_OK, msgId=C0A801039FEC18B4AAC20CA120060000, offsetMsgId=C0A8010300002A9F000000000005AF9C, messageQueue=MessageQueue [topic=test_topic, brokerName=SC-202111172212, queueId=2], queueOffset=9]
SendResult [sendStatus=SEND_OK, msgId=C0A801039FEC18B4AAC20CA1201E0001, offsetMsgId=C0A8010300002A9F000000000005B05E, messageQueue=MessageQueue [topic=test_topic, brokerName=SC-202111172212, queueId=2], queueOffset=10]
SendResult [sendStatus=SEND_OK, msgId=C0A801039FEC18B4AAC20CA120240002, offsetMsgId=C0A8010300002A9F000000000005B120, messageQueue=MessageQueue [topic=test_topic, brokerName=SC-202111172212, queueId=2], queueOffset=11]
SendResult [sendStatus=SEND_OK, msgId=C0A801039FEC18B4AAC20CA120270003, offsetMsgId=C0A8010300002A9F000000000005B1E2, messageQueue=MessageQueue [topic=test_topic, brokerName=SC-202111172212, queueId=2], queueOffset=12]
SendResult [sendStatus=SEND_OK, msgId=C0A801039FEC18B4AAC20CA1202B0004, offsetMsgId=C0A8010300002A9F000000000005B2A4, messageQueue=MessageQueue [topic=test_topic, brokerName=SC-202111172212, queueId=2], queueOffset=13]
结尾
了解生产者的同步,异步发消息,自定义发送队列API以及状态返回等。
对于RocketMQ不能任意精度配置延迟时间,从网上了解到原因
- 延迟消息是基于普通消息队列+timer实现的
broker接收到延迟消息以后会把topic加个前缀投递到该topic对应的队列中去,然后开启定时器去扫描这些消息,等消息需要触发再投递到正常的topic队列中 - mq的核心io宗旨:顺序读写
mq是使用commit log来存储文件的,为了提升效率充分利用pagecache所以采用顺序读写的设计思想,队列中的消息基本都是先执行的排在前面
需要对比排序 - 为了消息顺序问题每条消息都要与之前的消息进行对比那对性能的损耗,所以mq最终决定只支持18个延迟等级,mq把每个topic对应18个存储队列,其中一个等级对应一个队列, 每个等级的消息后面发的肯定比前面发的后执行,所以可以保证顺序问题