rocketMQ技术架构笔记

rocketMQ架构图:


image.png

组成介绍

  • Producer:消息发布的角色,支持分布式集群方式部署。发送消息时,Producer 会随机选择一个nameserver建立长连接,定期获取topic路由信息(包括所有队列列表)。Producer会轮询队列列表,与队列所在的broker建立长连接从而向broker发送消息,投递的过程支持快速失败并且低延迟。
  • Nameserver:提供broker的动态注册与发现。主要包含两个功能:
  1. Broker管理,NameServer接受Broker集群的注册信息并且保存下来作为路由信息的基本数据。然后提供心跳检测机制,检查Broker是否还存活;
  2. 路由信息管理,每个NameServer将保存关于Broker集群的整个路由信息和用于客户端查询的队列信息。然后Producer和Conumser通过NameServer就可以知道整个Broker集群的路由信息,从而进行消息的投递和消费。
    NameServer通常也是集群的方式部署,各实例间相互不进行信息通讯。
  • BrokerServer: Broker主要负责消息的存储、投递和查询以及服务高可用保证。broker架构:
  1. Remoting Module:整个Broker的实体,负责处理来自clients端的请求。
    Client Manager:负责管理客户端(Producer/Consumer)和维护Consumer的Topic订阅信息
  2. Store Service:提供方便简单的API接口处理消息存储到物理硬盘和查询功能。
  3. HA Service:高可用服务,提供Master Broker 和 Slave Broker之间的数据同步功能。
  4. Index Service:根据特定的Message key对投递到Broker的消息进行索引服务,以提供消息的快速查询。


    rocketmq_architecture_2.png
  • ConsumerServer: 消息消费的角色,支持分布式集群方式部署。支持以push推(特殊的pull模式),pull拉两种模式对消息进行消费。同时也支持集群方式和广播方式的消费。

功能与特性

  • 基于Message Id, Message Key消息查询
  • 消息轨迹查询
  • 集群消费和广播消费
  • topic 级别权限控制
  • 重置消费位点,在消费者组级别重置消费者队列位点
  • 重试队列和死信队列

使用样例

  • 生产者同步发送
Message message = new Message("topic1", "inOrder", "messageId_" + i,
                    ("hello world" + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
SendResult sendResult = producer.send(message);
  • 生产者异步发送
Message msg = new Message("topic1", "async","uniqueId123123123",
                    "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
            // SendCallback接收异步返回结果的回调
            producer.send(msg, new SendCallback() {
                @Override
                public void onSuccess(SendResult sendResult) {
                    System.out.printf("%-10d OK %s %n", index,
                            sendResult.getMsgId());
                }
                @Override
                public void onException(Throwable e) {
                    System.out.printf("%-10d Exception %s %n", index, e);
                    e.printStackTrace();
                }
            });
  • 生产者sendOneway
Message message = new Message("topic1", "sendOneWay", "messageId_" + i,
                    ("hello world" + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
producer.sendOneway(message);
  • 顺序消息
    同一个订单的不同业务消息需要保证顺序,根据业务唯一标识订单id对队列集合数量取模,保证每个订单的消息只会发送到一个队列中。
//生产者
Message msg = new Message("TopicTest", tags[i % tags.length], "KEY" + i, body.getBytes());

SendResult sendResult = producer.send(msg, new MessageQueueSelector() {
               @Override
               public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
                   Long id = (Long) arg;  //根据订单id选择发送queue
                   long index = id % mqs.size();
                   return mqs.get((int) index);
               }
           }, orderList.get(i).getOrderId());//订单id
//消费者
consumer.registerMessageListener(new MessageListenerOrderly() {

           Random random = new Random();

           @Override
           public ConsumeOrderlyStatus consumeMessage(
                List<MessageExt> msgs, ConsumeOrderlyContext context) {
               context.setAutoCommit(true);
               for (MessageExt msg : msgs) {
                   // 可以看到每个queue有唯一的consume线程来消费, 订单对每个queue(分区)有序
                   System.out.println("consumeThread=" + Thread.currentThread().getName() +
 "queueId=" + msg.getQueueId() + ", content:" + new String(msg.getBody()));
               }
               try {
                   //模拟业务逻辑处理中...
                   TimeUnit.SECONDS.sleep(random.nextInt(10));
               } catch (Exception e) {
                   e.printStackTrace();
               }
               return ConsumeOrderlyStatus.SUCCESS;
           }
});

  • 延时消息
//生产者
Message message = new Message("TestTopic", ("Hello scheduled message " + i).getBytes());
          // 设置延时等级3,这个消息将在10s之后发送(现在只支持固定的几个时间,详看delayTimeLevel)
message.setDelayTimeLevel(3);
          // 发送消息
producer.send(message);
//消费者
consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> messages,
                ConsumeConcurrentlyContext context) {
                // 延迟消息 消息生成 和 消息store 之间延迟
                for (MessageExt message : messages) {
                    // Print approximate delay time period
                     System.out.println("Receive message[msgId=" + 
                     message.getMsgId() + "] " + (message.getStoreTimestamp() - 
                     message.getBornTimestamp()) + "ms later");
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
  • 重试队列和死信队列
consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(
                  List<MessageExt> msgs, 
                 ConsumeConcurrentlyContext context) {
                // 返回此状态会判断是否存在重试topic 没有就创建
                // 默认重试16次。16次后进入死信队列,等待后续订阅处理
               //三种导致重试的情况
                return ConsumeConcurrentlyStatus.RECONSUME_LATER;
//                return null;
//                throw new RuntimeException();
            }
        });
  • 消息过滤方式
//by tag
consumer.subscribe("topic1", "TagA || TagC || TagD");
// by SQL
consumer.subscribe("topic1", MessageSelector.bySql("a between 0 and 3"));

消息持久化和消费逻辑

消息持久化

消息发送到broker时会在broker节点生成三类文件。windows存储目录在C:\Users\YOURUSERNAME\store

  • commitlog
    存储消息的主体
  • consumeQueue
    单个文件由30W个条目组成,单个条目由8字节的commitlog物理偏移量、4字节的消息长度、8字节tag hashcode组成,用于快速定位消息实体的索引。每个topic中的每个队列都有对应的一个consumeQueue文件。


    rocketmq_design_1.png
  • indexFile
    提供了一种可以通过key或时间区间来查询消息的方法底层实现为hashMap

消息生产和消费逻辑

  1. 生产者通过负载均衡策略从队列列表中选择一个队列,再与队列所在的broker建立连接,从而发送消息。broker会持久化消息到commitLog文件,同时在consumeQueue文件中生成一个条目,条目中记录消息在commitLog中的偏移量、消息长度和tag的hashcode。消费者可以高效的通过条目找到对应的消息。
  2. 消费者以消费者组为单位订阅Topic,每个消费者组在topic所有队列中的Offset都不同,代表每个消费者组消费是隔离的。

最佳实践

  • topic和tag使用场景
    不同的消息类型(普通消息,事务消息,定时(延时消息),顺序消息)使用不同topic,不能使用tag区分。
    业务相关联的消息使用tag区分,业务不直接相关的用topic区分
    消息优先级和数据量级不同的用topic区分,可能导致优先级高的消息不能及时被消费或者等待现象。
  • 消息幂等
    可以将业务唯一标识设置为message的key,再通过代码实现幂等。
  • 订阅关系一致
    同一个消费者组下consumer实例订阅的topic,tag必须保持一致。
    正确的示例:


    p169720.png

    错误的示例:


    p169724.png

常见问题

  1. 消息轨迹中消息详情报错?
    配置traceTopicEnable=true,生成生产者和消费者示例时设置enableMsgTrace=true
  2. broker中消息存储文件commitlog保存时间?3天 (fileReservedTime) 每天4am删除
  3. consumeQueue indexFile构建?
  4. 不同的消息类型不共用Topic。
  5. 并发消费 顺序消费 实现
    并发消费: 一个队列有多个消费者线程消费
    顺序消费:一个队列只有一个消费者线程消费
  6. 生产者 消费者 重试机制
    生产者同步模式下 默认重试两次,异步,oneway模式不会重试
    消费者消费失败后最多重试16次,如果再失败会被投递到死信队列中。
  7. messageId 生成策略
    由broker IP, port 加上offset根据一定算法生成, 所以根据messageId查询效率较高。
  8. 延时消息的实现
    消息发送到broker后,由broker实现。延迟存储到commitLog库。
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 213,047评论 6 492
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,807评论 3 386
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 158,501评论 0 348
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,839评论 1 285
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 65,951评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,117评论 1 291
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,188评论 3 412
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,929评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,372评论 1 303
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,679评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,837评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,536评论 4 335
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,168评论 3 317
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,886评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,129评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,665评论 2 362
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,739评论 2 351