怎么保证消息的可靠性
生产者丢失
生产者丢失消息的可能点在于程序发送失败抛异常了没有重试处理,或者发送的过程成功但是过程中网络闪断MQ没收到,消息就丢失了。
1.同步发送
2.异步发送,根据回调来确认是否发送成功
MQ丢失
RocketMQ分为同步刷盘和异步刷盘两种方式,默认的是异步刷盘,就有可能导致消息还未刷到硬盘上就丢失了,可以通过设置为同步刷盘的方式来保证消息可靠性,这样即使MQ挂了,恢复的时候也可以从磁盘中去恢复消息。
消费者丢失
消费者丢失消息的场景:消费者刚收到消息,此时服务器宕机,MQ认为消费者已经消费,不会重复发送消息,消息丢失。
RocketMQ默认是需要消费者回复ack确认,消费方不返回ack确认,重发的机制根据MQ类型的不同发送时间间隔、次数都不尽相同,如果重试超过次数之后会进入死信队列,需要手工来处理了。
Broker的Master和Slave之间是怎么同步数据
1.在broker收到消息后,会被标记为uncommitted状态,然后会把消息发送给所有的slave
2.slave在收到消息之后返回ack响应给master
3.master在收到超过半数的ack之后,把消息标记为committed
4.发送committed消息给所有slave,slave也修改状态为committed
消息积压达到磁盘上限,怎么办
1.先确认是不是consume端出现代码错误,导致不能正常消费
2.是的话,先停掉consume,另外新建一个临时的。
3.如果还是堆积,可以新建一个临时consume,然后把消息转发到不同的临时topic和queue,采用多个consume来同时消费,降低堆积。
RocketMQ为什么速度快?
1.我们在写入commitlog的时候是顺序写入的,这样比随机写入的性能就会提高很多
2.写入commitlog的时候并不是直接写入磁盘,而是先写入操作系统的PageCache
3.最后由操作系统异步将缓存中的数据刷到磁盘
broker是如何保存消息的?
RocketMQ主要的存储文件包括commitlog文件、consumequeue文件、indexfile文件。
Broker在收到消息之后,会把消息保存到commitlog的文件当中,而同时在分布式的存储当中,每个broker都会保存一部分topic的数据,同时,每个topic对应的messagequeue下都会生成consumequeue文件用于保存commitlog的物理位置偏移量offset,indexfile中会保存key和offset的对应关系。
MQ的架构
NameServer
是个无状态的服务器,角色类似ZooKeeper注册中心,但是比ZooKeeper更加轻量。
1.每个节点互相独立,通过部署多个节点来达到标识成一个伪集群
2.producer发送消息前,会通过nameserver来获取指定topic的路由地址,也就是发送到哪个broker,consume也会定时与nameserver通信来获取topic的路由信息。broker在启动的时候会把topic路由信息注册到nameserver,并定时保持心跳连接,维持topic的路由信息。
broker
消息存储和中转角色,负责存储和转发消息。
Broker内部维护着一个个Message Queue,用来存储消息的索引,真正存储消息的地方是CommitLog(日志文件)。
单个Broker与所有的Nameserver保持着长连接和心跳,并会定时将Topic信息同步到NameServer,和NameServer的通信底层是通过Netty实现的。
分布式事务
半消息:Producer成功发送到Broker端,但是暂时不能被Consume消费的消息。
1.producer发送半消息,发送成功,但是consume不能消费。
2.执行本地事务。
3.本地事务执行成功,Producer向Broker端发送commit,此时消息变成正常消息,Consume可以消费。本地事务执行失败,Producer发送Rollback,Borker端丢弃改消息。
4.异常情况,Broker端迟迟等不到二次确认。在一定时间后,会查询所有的半消息,然后到Producer端查询半消息的执行情况。
5.Producer端查询本地事务的状态。
6.重复3流程。
prducer的负载均衡
这个方法就是随机选择一个要发送消息的broker来达到负载均衡的效果,选择的标准:尽量不选刚刚选过的broker,尽量不选发送上条消息延迟过高或没有响应的broker,也就是找到一个可用的broker。
public MessageQueue selectOneMessageQueue(final String lastBrokerName) {
if (lastBrokerName == null) {
return selectOneMessageQueue();
} else {
int index = this.sendWhichQueue.getAndIncrement();
for (int i = 0; i < this.messageQueueList.size(); i++) {
int pos = Math.abs(index++) % this.messageQueueList.size();
if (pos < 0)
pos = 0;
MessageQueue mq = this.messageQueueList.get(pos);
if (!mq.getBrokerName().equals(lastBrokerName)) {
return mq;
}
}
return selectOneMessageQueue();
}
}