架构图
基本概念
Producer
消息生产者,负责产生消息,一般由业务系统负责产生消息
Consumer
消息消费者,负责消费消息,一般是后台系统负责异步消费
Broker
消息中转角色,负责存储消息,转収消息,一般也称为 Server。
NameServer
类似于zookeeper
概念模型
即消息是根据主题(即我们图里面的Topic)进行订阅,而每个Topic下面又可以有多个队列,只是这里的队列并不真正存储消息,而是起到类似索引的作用,消息真正存储在CommitLog里面,如下图
所有数据单独存储到一个 Commit Log,完全顺序写,随机读。对最终用户展现的队列实际只存储消息在 Commit Log 的位置信息
Message Queue
在RocketMQ中,所有消息队列都是持久化,长度无限的数据结构,所谓长度无限是指队列中的每个存储单元都是定长,访问其中的存储单元使用Offset来访问,offset为java long类型,64位,理论上在100年内不会溢出,所以认为是长度无限,另外队列中只保存最近几天的数据,之前的数据会按照过期时间来删除。也可以认为Message Queue是一个长度无限的数组,offset就是下标。
这样做的好处
(1)队列轻量化,单个队列数据量非常少
(2)对磁盘的访问串行化,避免磁盘竟争,丌会因为队列增加导致 IOWAIT 增高
这样做的缺点
(1)写虽然完全是顺序写,但是读却发成了完全的随机读。
(2)读一条消息,会先读 Consume Queue,再读 Commit Log,增加了开销。
(3)要保证 Commit Log 不 Consume Queue 完全的一致,增加了编程的复杂度
RocketMq的解决方案
随机读(主要是指磁盘随机读),尽可能让读命中 PAGECACHE,减少 IO 读操作,所以内存越大越好。同时由于缓存的局部性原理,可以很快的在内存上读取到消息
RocketMq里面的消息类型
顺序消息
消费消息的顺序要同収送消息的顺序一致,在 RocketMQ 中,主要指的是局部顺序,即一类消息为满足顺序性,必须 Producer 单线程顺序发送到同一个队列,返样 Consumer 就可以按照 Producer 发送的顺序去消费消息。
普通顺序消息
顺序消息的一种,正常情况下可以保证完全的顺序消息,但是一旦发生通信异常,Broker重启,由于队列总数发生发化,哈希取模后定位的队列会发化,产生短暂的消息顺序不一致。如果业务能容忍在集群异常情况(如某个Broker宕机或者重启)下,消息短暂的乱序,使用普通顺序方式比较合适。
严格顺序消息
严格顺序消息顺序消息的一种,无论正常异常情况都能保证顺序,但是牺牲了分布式Failover特性,即Broker集群中只要有一台机器不可用,则整个集群都不可用,服务可用性大大降低。如果服务器部署为同步双写模式,此缺陷可通过备机自动切换为主避免,不过仍然会存在几分钟的服务不可用。(依赖同步双写,主备自动切换,自动切换功能目前还未实现)目前已知的应用只有数据库binlog同步强依赖严格顺序消息,其他应用绝大部分都可以容忍短暂乱序,推荐使用普通的顺序消息。
获取消息的方式
Broker主动进行推送至消费者
缺点:消费者可能消费过慢造成堆积,同时如果有很多消费者对于Broker也是一件很繁重的事情长轮询
即消费者会主动去拉取,缺点是可能获取不及时,但长轮询指的是我会多等一会,类似于长连接
RocketMq里面消息的几种消费方式
涉及到磁盘,就会有零拷贝,RocketMq也不例外,常用的零拷贝有如下两种方式
内存映射
对应的java代码
File file = new File("data.zip");
RandomAccessFile raf = new RandomAccessFile(file, "rw");
FileChannel fileChannel = raf.getChannel();
MappedByteBuffer buffer = fileChannel.map(FileChannel.MapMode.READ_ONLY, 0, fileChannel.size());
真正的零拷贝
对应的java代码
File file = new File("test.zip");
RandomAccessFile raf = new RandomAccessFile(file, "rw");
FileChannel fileChannel = raf.getChannel();
SocketChannel socketChannel = SocketChannel.open(new InetSocketAddress("", 1234));
// 直接使用了transferTo()进行通道间的数据传输
fileChannel.transferTo(0, fileChannel.size(), socketChannel);
这两种方式的比较
使用mmap + write方式
优点:即使频繁调用,使用小块文件传输,效率也很高
缺点:不能很好的利用DMA方式,会比sendfile多消耗CPU,内存安全性控制复杂,需要避免JVM Crash问题。
使用sendfile方式
优点:可以利用DMA方式,消耗CPU较少,大块文件传输效率高,无内存安全新问题。
缺点:小块文件效率低于mmap方式,只能是BIO方式传输,不能使用NIO。
RocketMq采用的是基于内存映射的方式,因为小块数据传输的更为频繁
消息的持久化
异步刷盘
写入到Page Cache后就立马返回了,然后再调用fsync函数异步的去将数据刷到磁盘
优点
效率高又快
缺点
断点或者重启,内存里面的数据还没来得及刷入到磁盘就没有了,所以会有丢消息的概率
同步刷盘
当然就是写入到Page Cache后就立马调用fsync函数立马刷入到磁盘
优点
可以做到不丢消息
缺点
当然就是牺牲性能了
接着再来分析下 几种消费消息的方式
At least Once
是指每个消息必须投递一次,RocketMQConsumer先pull消息到本地,消费完成后,才吐服务器返回ack,如果没有消费一定不会ack消息,所以RocketMQ可以很好的支持此特性
Exactly Only Once
(1).发送消息阶段,不允许収送重复的消息。
(2).消费消息阶段,不允许消费重复的消息。
只有以上两个条件都满足情况下,才能称为消息是“Exactly Only Once”,而要实现以上两点,在分布式系统环境下,不可避免要产生巨大的开销。所以RocketMQ为了追求高性能,并不保证此特性,要求在业务上进行去重,也就是说消费消息要做到幂等性。RocketMQ虽然步能严格保证不重复,但是正常情冴下很少会出现重复収送、消费情况,只有网络异常,Consumer启停等异常情况下会出现消息重复。此问题的本质原因是网络调用存在不确定性,即不成功也不失败的第三种状态,所以才产生了消息重复性问题。
定时消息
定时消息是指消息发到Broker后,不能立刻被Consumer消费,要到特定的时间点或者等待特定的时间后才能被消费。如果要支持任意的时间精度,在Broker局面,必须要做消息排序,如果再涉及到持久化,那消息排序要不可避免的产生巨大性能开销。RocketMQ支持定时消息,但是不支持任意时间精度,支持特定的level,例如定时5s,10s,1m等。
消息过滤
RocketMQ的消息过滤方式有别于其他消息中间件,是在订阅时,再做过滤,先来看下Consume Queue的存储结构
(1)在Broker端迕行Message Tag比对,先遍历Consume Queue,如果存储的Message Tag与订阅的Message Tag不符合,则跳过,继续比对下一个,符合则传输给Consumer。注意:Message Tag是字符串形式,Consume Queue中存储的是其对应的hashcode,比对时也是比对hashcode。(2).Consumer收到过滤后的消息后,同样也要执行在Broker端的操作,但是比对的是真实的Message Tag字符串,而不是Hashcode
这么做的原因?
(1)Message Tag存储Hashcode,是为了在Consume Queue定长方式存储,节约空间
(2)过滤过程中不会访问Commit Log数据,可以保证堆积情况下也能高效过滤
(3)即使存在Hash冲突,也可以在Consumer端进行修正,保证万无一失
高可用
谈到高可用,自然就想到集群,那么多台机器间消息的同步方式就有同步双写和异步复制两种
异步复制
异步复制的实现思路非常简单,Slave启劢一个线程,不断从Master拉取Commit Log中的数据,然后在异步build出Consume Queue数据结构。整个实现过程基本同Mysql主从同步类似。
同步双写
也类似于Mysql的半同步复制,即主上写完,其中一台从也要写完才统一返回给客户端ok.整体思想是类似的
上面我们谈到RocketMq没有使用Zookeeper而是自己实现了NameServer
public boolean initialize() {
.....
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
NamesrvController.this.routeInfoManager.scanNotActiveBroker();
}
}, 5, 10, TimeUnit.SECONDS);
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
NamesrvController.this.kvConfigManager.printAllPeriodically();
}
}, 1, 10, TimeUnit.MINUTES);
.....
}
- 定时任务1:NameServer每隔10s扫描一次Broker,移除处于不激活状态的Broker
- 定时任务2:NameServer每隔10分钟打印一次KV配置
我们可以看到,集群其实就是维护心跳,这里面其实还有很多细节,还没看完,看完再更新吧
Producer最佳实践
发送消息注意事项
(1)一个应用尽可能用一个Topic,消息子类型用tags来标识,tags可以由应用自由设置。只有发送消息设置了tags,消费方在订阅消息时,才可以利用tags在broker做消息过滤。message.setTags("TagA");
(2)每个消息在业务局面的唯一标识码,要设置到keys字段,方便将来定位消息丢失问题。服务器会为每个消息创建索引(哈希索引),应用可以通过topic,key来查询返条消息内容,以及消息被谁消费。由于是哈希索引,请务必保证key尽可能唯一,返样可以避免潜在的哈希冲突。// 订单IdString orderId = "20034568923546";message.setKeys(orderId);
(3)消息发送成功或者失败,要打印消息日志,务必要打印sendresult和key字段。
(4)send消息方法,只要不抛异常,就代表发送成功。但是发送成功会有多个状态,在sendResult里定义
SEND_OK消息发送成功
FLUSH_DISK_TIMEOUT:消息发送成功,但是服务器刷盘超时,消息已经进入服务器队列,只有此时服务器宕机,消息才会丢失
FLUSH_SLAVE_TIMEOUT:消息发送成功,但是服务器同步到Slave时超时,消息已经进入服务器队列,只有此时服务器宕机,消息才会丢失
SLAVE_NOT_AVAILABLE:消息发送成功,但是此时slave不可用,消息已经进入服务器队列,只有此时服务器宕机,消息才会丢失对与精确发送顺序消息的应用,由亍顺序消息的局限性,可能会涉及到主备自动切换问题,所以如果sendresult中的status字段不等于SEND_OK,就应该尝试重试。对于其他应用,则没有必要这样。
(5)对于消息不可丢失应用,务必要有消息重发机制
例如如果消息发送失败,存储到数据库,能有定时程序尝试重发,或者人工触发重发。
Consumer最佳实践
(1)将消息的唯一键,可以是msgId,也可以是消息内容中的唯一标识字段,例如订单Id等,消费之前判断是否在Db或(全局KV存储)中存在,如果不存在则插入,并消费,否则跳过。(实际过程要考虑原子性问题,判断是否存在可以尝试插入入,如果报主键冲突,则插入失败,直接跳过)msgId一定是全局唯一标识符,但是可能会存在同样的消息有两个不同msgId的情冴(有多种原因),返种情况可能会使业务上重复消费,建议最好使用消息内容中的唯一标识字段去重。
2.使用业务局面的状态机去重
具体可见幂等总结
负载均衡策略
AllocateMessageQueueAveragely
平均算法: 算出平均值,将连续的队列按平均值分配给每个消费者。 如果能够整除,则按顺序将平均值个Queue分配,如果不能整除,则将多余出的Queue按照Consumer顺序逐个分配
AllocateMessageQueueAveragelyByCircle
环形平均算法:将消费者按顺序形成一个环形,然后按照这个环形顺序逐个给消费者分配一个Queue
AllocateMessageQueueConsistentHash
一致性hash算法:先将消费端的hash值放于环上,同时计算队列的hash值,以顺时针方向,分配给离队列hash值最近的一个消费者节点
代码
/*
1.初始化:默认使用AllocateMessageQueueAveragely算法分配Queue
*/
public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsumer {
public DefaultMQPushConsumer(final String consumerGroup) {
this(null, consumerGroup, null, new AllocateMessageQueueAveragely());
}
}
/*
2.开启一个RebalanceService任务执行分配策略
*/
public class MQClientInstance {
public void start() throws MQClientException {
synchronized (this) {
switch (this.serviceState) {
case CREATE_JUST:
...
// Start rebalance service
this.rebalanceService.start();
...
default:
break;
}
}
}
}
/*
3.RebalanceImpl.rebalanceByTopic执行具体的分配逻辑
*/
public abstract class RebalanceImpl {
private boolean rebalanceByTopic(final String topic, final boolean isOrder) {
boolean balanced = true;
switch (messageModel) {
...
case CLUSTERING: {
// 拿到所有的Queue
Set<MessageQueue> mqSet = this.topicSubscribeInfoTable.get(topic);
// 拿到所有的消费者ID
List<String> cidAll = this.mQClientFactory.findConsumerIdList(topic, consumerGroup);
...
if (mqSet != null && cidAll != null) {
List<MessageQueue> mqAll = new ArrayList<>();
mqAll.addAll(mqSet);
Collections.sort(mqAll);
Collections.sort(cidAll);
// 初始化设置的分配算法:即AllocateMessageQueueAveragely平均分配算法
AllocateMessageQueueStrategy strategy = this.allocateMessageQueueStrategy;
List<MessageQueue> allocateResult = null;
try {
// 调用分配算法的具体实现
allocateResult = strategy.allocate(...mqAll, cidAll);
} catch (Throwable e) {
log.error("allocate message queue exception. strategy name: {}, ex: {}", strategy.getName(), e);
return false;
}
...
}
break;
}
default:
break;
}
return balanced;
}
}
/*
4.执行AllocateMessageQueueAveragely平均分配算法的具体实现
*/
public class AllocateMessageQueueAveragely extends AbstractAllocateMessageQueueStrategy {
@Override
public List<MessageQueue> allocate(String consumerGroup, String currentCID, List<MessageQueue> mqAll, List<String> cidAll) {
List<MessageQueue> result = new ArrayList<>();
if (!check(consumerGroup, currentCID, mqAll, cidAll)) {
return result;
}
int index = cidAll.indexOf(currentCID);
int mod = mqAll.size() % cidAll.size();
int averageSize =
mqAll.size() <= cidAll.size() ? 1 : (mod > 0 && index < mod ? mqAll.size() / cidAll.size()
+ 1 : mqAll.size() / cidAll.size());
int startIndex = (mod > 0 && index < mod) ? index * averageSize : index * averageSize + mod;
int range = Math.min(averageSize, mqAll.size() - startIndex);
for (int i = 0; i < range; i++) {
result.add(mqAll.get((startIndex + i) % mqAll.size()));
}
return result;
}
}