RocketMq总结

架构图

架构图.png

基本概念

Producer

消息生产者,负责产生消息,一般由业务系统负责产生消息

Consumer

消息消费者,负责消费消息,一般是后台系统负责异步消费

Broker

消息中转角色,负责存储消息,转収消息,一般也称为 Server。

NameServer

类似于zookeeper


概念模型

image.png

即消息是根据主题(即我们图里面的Topic)进行订阅,而每个Topic下面又可以有多个队列,只是这里的队列并不真正存储消息,而是起到类似索引的作用,消息真正存储在CommitLog里面,如下图


RocketMq消息实际存储结构.png

所有数据单独存储到一个 Commit Log,完全顺序写,随机读。对最终用户展现的队列实际只存储消息在 Commit Log 的位置信息

Message Queue

在RocketMQ中,所有消息队列都是持久化,长度无限的数据结构,所谓长度无限是指队列中的每个存储单元都是定长,访问其中的存储单元使用Offset来访问,offset为java long类型,64位,理论上在100年内不会溢出,所以认为是长度无限,另外队列中只保存最近几天的数据,之前的数据会按照过期时间来删除。也可以认为Message Queue是一个长度无限的数组,offset就是下标。

这样做的好处
(1)队列轻量化,单个队列数据量非常少
(2)对磁盘的访问串行化,避免磁盘竟争,丌会因为队列增加导致 IOWAIT 增高

这样做的缺点
(1)写虽然完全是顺序写,但是读却发成了完全的随机读。
(2)读一条消息,会先读 Consume Queue,再读 Commit Log,增加了开销。
(3)要保证 Commit Log 不 Consume Queue 完全的一致,增加了编程的复杂度

RocketMq的解决方案

随机读(主要是指磁盘随机读),尽可能让读命中 PAGECACHE,减少 IO 读操作,所以内存越大越好。同时由于缓存的局部性原理,可以很快的在内存上读取到消息


RocketMq里面的消息类型

顺序消息

消费消息的顺序要同収送消息的顺序一致,在 RocketMQ 中,主要指的是局部顺序,即一类消息为满足顺序性,必须 Producer 单线程顺序发送到同一个队列,返样 Consumer 就可以按照 Producer 发送的顺序去消费消息。

普通顺序消息

顺序消息的一种,正常情况下可以保证完全的顺序消息,但是一旦发生通信异常,Broker重启,由于队列总数发生发化,哈希取模后定位的队列会发化,产生短暂的消息顺序不一致。如果业务能容忍在集群异常情况(如某个Broker宕机或者重启)下,消息短暂的乱序,使用普通顺序方式比较合适。

严格顺序消息

严格顺序消息顺序消息的一种,无论正常异常情况都能保证顺序,但是牺牲了分布式Failover特性,即Broker集群中只要有一台机器不可用,则整个集群都不可用,服务可用性大大降低。如果服务器部署为同步双写模式,此缺陷可通过备机自动切换为主避免,不过仍然会存在几分钟的服务不可用。(依赖同步双写,主备自动切换,自动切换功能目前还未实现)目前已知的应用只有数据库binlog同步强依赖严格顺序消息,其他应用绝大部分都可以容忍短暂乱序,推荐使用普通的顺序消息。


获取消息的方式

  • Broker主动进行推送至消费者
    缺点:消费者可能消费过慢造成堆积,同时如果有很多消费者对于Broker也是一件很繁重的事情

  • 长轮询
    即消费者会主动去拉取,缺点是可能获取不及时,但长轮询指的是我会多等一会,类似于长连接


RocketMq里面消息的几种消费方式


涉及到磁盘,就会有零拷贝,RocketMq也不例外,常用的零拷贝有如下两种方式

内存映射

image.png

对应的java代码

File file = new File("data.zip");
        RandomAccessFile raf = new RandomAccessFile(file, "rw");
        FileChannel fileChannel = raf.getChannel();
        MappedByteBuffer buffer = fileChannel.map(FileChannel.MapMode.READ_ONLY, 0, fileChannel.size());

真正的零拷贝


image.png

对应的java代码

 File file = new File("test.zip");
        RandomAccessFile raf = new RandomAccessFile(file, "rw");
        FileChannel fileChannel = raf.getChannel();
        SocketChannel socketChannel = SocketChannel.open(new InetSocketAddress("", 1234));
        // 直接使用了transferTo()进行通道间的数据传输
        fileChannel.transferTo(0, fileChannel.size(), socketChannel);

这两种方式的比较

使用mmap + write方式
优点:即使频繁调用,使用小块文件传输,效率也很高
缺点:不能很好的利用DMA方式,会比sendfile多消耗CPU,内存安全性控制复杂,需要避免JVM Crash问题。

使用sendfile方式
优点:可以利用DMA方式,消耗CPU较少,大块文件传输效率高,无内存安全新问题。
缺点:小块文件效率低于mmap方式,只能是BIO方式传输,不能使用NIO。

RocketMq采用的是基于内存映射的方式,因为小块数据传输的更为频繁


消息的持久化

同步刷盘与异步刷盘.png

异步刷盘
写入到Page Cache后就立马返回了,然后再调用fsync函数异步的去将数据刷到磁盘

优点

效率高又快

缺点

断点或者重启,内存里面的数据还没来得及刷入到磁盘就没有了,所以会有丢消息的概率


同步刷盘

当然就是写入到Page Cache后就立马调用fsync函数立马刷入到磁盘

优点

可以做到不丢消息

缺点

当然就是牺牲性能了


接着再来分析下 几种消费消息的方式

At least Once

是指每个消息必须投递一次,RocketMQConsumer先pull消息到本地,消费完成后,才吐服务器返回ack,如果没有消费一定不会ack消息,所以RocketMQ可以很好的支持此特性

Exactly Only Once

(1).发送消息阶段,不允许収送重复的消息。
(2).消费消息阶段,不允许消费重复的消息。

只有以上两个条件都满足情况下,才能称为消息是“Exactly Only Once”,而要实现以上两点,在分布式系统环境下,不可避免要产生巨大的开销。所以RocketMQ为了追求高性能,并不保证此特性,要求在业务上进行去重,也就是说消费消息要做到幂等性。RocketMQ虽然步能严格保证不重复,但是正常情冴下很少会出现重复収送、消费情况,只有网络异常,Consumer启停等异常情况下会出现消息重复。此问题的本质原因是网络调用存在不确定性,即不成功也不失败的第三种状态,所以才产生了消息重复性问题。

定时消息

定时消息是指消息发到Broker后,不能立刻被Consumer消费,要到特定的时间点或者等待特定的时间后才能被消费。如果要支持任意的时间精度,在Broker局面,必须要做消息排序,如果再涉及到持久化,那消息排序要不可避免的产生巨大性能开销。RocketMQ支持定时消息,但是不支持任意时间精度,支持特定的level,例如定时5s,10s,1m等。


消息过滤

RocketMQ的消息过滤方式有别于其他消息中间件,是在订阅时,再做过滤,先来看下Consume Queue的存储结构


ConsumeQueue单个存储单元结构.png

(1)在Broker端迕行Message Tag比对,先遍历Consume Queue,如果存储的Message Tag与订阅的Message Tag不符合,则跳过,继续比对下一个,符合则传输给Consumer。注意:Message Tag是字符串形式,Consume Queue中存储的是其对应的hashcode,比对时也是比对hashcode。(2).Consumer收到过滤后的消息后,同样也要执行在Broker端的操作,但是比对的是真实的Message Tag字符串,而不是Hashcode

这么做的原因?
(1)Message Tag存储Hashcode,是为了在Consume Queue定长方式存储,节约空间
(2)过滤过程中不会访问Commit Log数据,可以保证堆积情况下也能高效过滤
(3)即使存在Hash冲突,也可以在Consumer端进行修正,保证万无一失


高可用

谈到高可用,自然就想到集群,那么多台机器间消息的同步方式就有同步双写和异步复制两种

异步复制

异步复制的实现思路非常简单,Slave启劢一个线程,不断从Master拉取Commit Log中的数据,然后在异步build出Consume Queue数据结构。整个实现过程基本同Mysql主从同步类似。

同步双写

也类似于Mysql的半同步复制,即主上写完,其中一台从也要写完才统一返回给客户端ok.整体思想是类似的


上面我们谈到RocketMq没有使用Zookeeper而是自己实现了NameServer

 public boolean initialize() {
                .....        
    this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

            @Override
            public void run() {
                NamesrvController.this.routeInfoManager.scanNotActiveBroker();
            }
        }, 5, 10, TimeUnit.SECONDS);

        this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

            @Override
            public void run() {
                NamesrvController.this.kvConfigManager.printAllPeriodically();
            }
        }, 1, 10, TimeUnit.MINUTES);

        .....
    }
  • 定时任务1:NameServer每隔10s扫描一次Broker,移除处于不激活状态的Broker
  • 定时任务2:NameServer每隔10分钟打印一次KV配置

我们可以看到,集群其实就是维护心跳,这里面其实还有很多细节,还没看完,看完再更新吧


Producer最佳实践
发送消息注意事项
(1)一个应用尽可能用一个Topic,消息子类型用tags来标识,tags可以由应用自由设置。只有发送消息设置了tags,消费方在订阅消息时,才可以利用tags在broker做消息过滤。message.setTags("TagA");

(2)每个消息在业务局面的唯一标识码,要设置到keys字段,方便将来定位消息丢失问题。服务器会为每个消息创建索引(哈希索引),应用可以通过topic,key来查询返条消息内容,以及消息被谁消费。由于是哈希索引,请务必保证key尽可能唯一,返样可以避免潜在的哈希冲突。// 订单IdString orderId = "20034568923546";message.setKeys(orderId);

(3)消息发送成功或者失败,要打印消息日志,务必要打印sendresult和key字段。

(4)send消息方法,只要不抛异常,就代表发送成功。但是发送成功会有多个状态,在sendResult里定义
SEND_OK消息发送成功
FLUSH_DISK_TIMEOUT:消息发送成功,但是服务器刷盘超时,消息已经进入服务器队列,只有此时服务器宕机,消息才会丢失
FLUSH_SLAVE_TIMEOUT:消息发送成功,但是服务器同步到Slave时超时,消息已经进入服务器队列,只有此时服务器宕机,消息才会丢失
SLAVE_NOT_AVAILABLE:消息发送成功,但是此时slave不可用,消息已经进入服务器队列,只有此时服务器宕机,消息才会丢失对与精确发送顺序消息的应用,由亍顺序消息的局限性,可能会涉及到主备自动切换问题,所以如果sendresult中的status字段不等于SEND_OK,就应该尝试重试。对于其他应用,则没有必要这样。
(5)对于消息不可丢失应用,务必要有消息重发机制
例如如果消息发送失败,存储到数据库,能有定时程序尝试重发,或者人工触发重发。

Consumer最佳实践
(1)将消息的唯一键,可以是msgId,也可以是消息内容中的唯一标识字段,例如订单Id等,消费之前判断是否在Db或(全局KV存储)中存在,如果不存在则插入,并消费,否则跳过。(实际过程要考虑原子性问题,判断是否存在可以尝试插入入,如果报主键冲突,则插入失败,直接跳过)msgId一定是全局唯一标识符,但是可能会存在同样的消息有两个不同msgId的情冴(有多种原因),返种情况可能会使业务上重复消费,建议最好使用消息内容中的唯一标识字段去重。
2.使用业务局面的状态机去重

具体可见幂等总结


负载均衡策略

AllocateMessageQueueAveragely

平均算法: 算出平均值,将连续的队列按平均值分配给每个消费者。 如果能够整除,则按顺序将平均值个Queue分配,如果不能整除,则将多余出的Queue按照Consumer顺序逐个分配


image.png

AllocateMessageQueueAveragelyByCircle

环形平均算法:将消费者按顺序形成一个环形,然后按照这个环形顺序逐个给消费者分配一个Queue


image.png

AllocateMessageQueueConsistentHash

一致性hash算法:先将消费端的hash值放于环上,同时计算队列的hash值,以顺时针方向,分配给离队列hash值最近的一个消费者节点


image.png

代码

/*
   1.初始化:默认使用AllocateMessageQueueAveragely算法分配Queue
 */
public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsumer {
    public DefaultMQPushConsumer(final String consumerGroup) {
        this(null, consumerGroup, null, new AllocateMessageQueueAveragely());
    }
}
/*
   2.开启一个RebalanceService任务执行分配策略
 */
public class MQClientInstance {
    public void start() throws MQClientException {
        synchronized (this) {
            switch (this.serviceState) {
                case CREATE_JUST:
                    ...
                    // Start rebalance service
                    this.rebalanceService.start();
                    ...
                default:
                    break;
            }
        }
    }
}
/*
   3.RebalanceImpl.rebalanceByTopic执行具体的分配逻辑
 */
public abstract class RebalanceImpl {
    private boolean rebalanceByTopic(final String topic, final boolean isOrder) {
        boolean balanced = true;
        switch (messageModel) {
            ...
            case CLUSTERING: {
                // 拿到所有的Queue
                Set<MessageQueue> mqSet = this.topicSubscribeInfoTable.get(topic);
                // 拿到所有的消费者ID
                List<String> cidAll = this.mQClientFactory.findConsumerIdList(topic, consumerGroup);
                ...
                if (mqSet != null && cidAll != null) {
                    List<MessageQueue> mqAll = new ArrayList<>();
                    mqAll.addAll(mqSet);

                    Collections.sort(mqAll);
                    Collections.sort(cidAll);

                    // 初始化设置的分配算法:即AllocateMessageQueueAveragely平均分配算法
                    AllocateMessageQueueStrategy strategy = this.allocateMessageQueueStrategy;

                    List<MessageQueue> allocateResult = null;
                    try {
                        // 调用分配算法的具体实现
                        allocateResult = strategy.allocate(...mqAll, cidAll);
                    } catch (Throwable e) {
                        log.error("allocate message queue exception. strategy name: {}, ex: {}", strategy.getName(), e);
                        return false;
                    }
                    ...
                }
                break;
            }
            default:
                break;
        }
        return balanced;
    }
}
/*
   4.执行AllocateMessageQueueAveragely平均分配算法的具体实现
 */
public class AllocateMessageQueueAveragely extends AbstractAllocateMessageQueueStrategy {
    @Override
    public List<MessageQueue> allocate(String consumerGroup, String currentCID, List<MessageQueue> mqAll, List<String> cidAll) {

        List<MessageQueue> result = new ArrayList<>();
        if (!check(consumerGroup, currentCID, mqAll, cidAll)) {
            return result;
        }

        int index = cidAll.indexOf(currentCID);
        int mod = mqAll.size() % cidAll.size();
        int averageSize =
                mqAll.size() <= cidAll.size() ? 1 : (mod > 0 && index < mod ? mqAll.size() / cidAll.size()
                        + 1 : mqAll.size() / cidAll.size());
        int startIndex = (mod > 0 && index < mod) ? index * averageSize : index * averageSize + mod;
        int range = Math.min(averageSize, mqAll.size() - startIndex);
        for (int i = 0; i < range; i++) {
            result.add(mqAll.get((startIndex + i) % mqAll.size()));
        }
        return result;
    }
}
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容