RocketMQ4.X

RocketMQ4.X

JMS

Java消息服务(Java Message Service),Java平台中关于面向消息中间件的接口

  • JMS是一种与厂商无关的 API,用来访问消息收发系统消息,它类似于JDBC(Java Database Connectivity)。这里,JDBC 是可以用来访问许多不同关系数据库的 API
  • 使用场景:
    • 核心应用
      • 解耦:订单系统-》物流系统
      • 异步:用户注册-》发送邮件,初始化信息
      • 削峰:秒杀、日志处理
    • 跨平台 、多语言
    • 分布式事务、最终一致性
    • RPC调用上下游对接,数据源变动->通知下属

消息中间件常见概念和编程模型

  • 常见概念
    • JMS提供者:连接面向消息中间件的,JMS接口的一个实现,RocketMQ,ActiveMQ,Kafka等等
    • JMS生产者(Message Producer):生产消息的服务
    • JMS消费者(Message Consumer):消费消息的服务
    • JMS消息:数据对象
    • JMS队列:存储待消费消息的区域
    • JMS主题:一种支持发送消息给多个订阅者的机制
    • JMS消息通常有两种类型:点对点(Point-to-Point)、发布/订阅(Publish/Subscribe)
  • 基础编程模型
    • MQ中需要用的一些类
    • ConnectionFactory :连接工厂,JMS 用它创建连接
    • Connection :JMS 客户端到JMS Provider 的连接
    • Session: 一个发送或接收消息的线程
    • Destination :消息的目的地;消息发送给谁.
    • MessageConsumer / MessageProducer: 消息消费者,消息生产者

主流消息队列和技术选型讲解

  • ActiveMQ:http://activemq.apache.org/

    • Apache出品,历史悠久,支持多种语言的客户端和协议,支持多种语言Java, .NET, C++ 等,基于JMS Provider的实现

    缺点:吞吐量不高,多队列的时候性能下降,存在消息丢失的情况,比较少大规模使用

  • Kafka:http://kafka.apache.org/

    • 是由Apache软件基金会开发的一个开源流处理平台,由Scala和Java编写。Kafka是一种高吞吐量的分布式发布订阅消息系统,它可以处理大规模的网站中的所有动作流数据(网页浏览,搜索和其他用户的行动),副本集机制,实现数据冗余,保障数据尽量不丢失;支持多个生产者和消费者

    缺点:不支持批量和广播消息,运维难度大,文档比较少, 需要掌握Scala

  • RabbitMQ:http://www.rabbitmq.com/

    • 是一个开源的AMQP实现,服务器端用Erlang语言编写,支持多种客户端,如:Python、Ruby、.NET、Java、JMS、C、用于在分布式系统中存储转发消息,在易用性、扩展性、高可用性等方面表现不错

    缺点:使用Erlang开发,阅读和修改源码难度大

  • RocketMQ:http://rocketmq.apache.org/

    • 阿里开源的一款的消息中间件, 纯Java开发,具有高吞吐量、高可用性、适合大规模分布式系统应用的特点, 性能强劲(零拷贝技术),支持海量堆积, 支持指定次数和时间间隔的失败消息重发,支持consumer端tag过滤、延迟消息等,在阿里内部进行大规模使用,适合在电商,互联网金融等领域使用

RocketMQ4.x消息队列介绍

[官网]http://rocketmq.apache.org/

  • Apache RocketMQ作为阿里开源的一款高性能、高吞吐量的分布式消息中间件

  • 特点

    • 支持Broker和Consumer端消息过滤
    • 支持发布订阅模型,和点对点,
    • 支持拉pull和推push两种消息模式
    • 单一队列百万消息、亿级消息堆积
    • 支持单master节点,多master节点,多master多slave节点
    • 任意一点都是高可用,水平拓展,Producer、Consumer、队列都可以分布式
    • 消息失败重试机制、支持特定level的定时消息
    • 新版本底层采用Netty
    • 4.3.x支持分布式事务
    • 适合金融类业务,高可用性跟踪和审计功能。
  • 概念

    • Producer:消息生产者

    • Producer Group:消息生产者组,发送同类消息的一个消息生产组

    • Consumer:消费者

    • Consumer Group:消费同类消息的多个实例

    • Tag:标签,子主题(二级分类)对topic的进一步细化,用于区分同一个主题下的不同业务的消息

    • Topic:主题, 如订单类消息,queue是消息的物理管理单位,而topic是逻辑管理单位。一个topic下可以有多个queue,

      默认自动创建是4个,手动创建是8个

    • Message:消息,每个message必须指定一个topic

    • Broker:MQ程序,接收生产的消息,提供给消费者消费的程序

    • Name Server:给生产和消费者提供路由信息,提供轻量级的服务发现、路由、元数据信息,可以多个部署,互相独立(比zookeeper更轻量)

    • Offset: 偏移量,可以理解为消息进度

    • commit log: 消息存储会写在Commit log文件里面

Springboot2.X整合RocketMQ4.X

  • 导入依赖

    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    
    <!-- https://mvnrepository.com/artifact/org.apache.rocketmq/rocketmq-client -->
    <dependency>
      <groupId>org.apache.rocketmq</groupId>
      <artifactId>rocketmq-client</artifactId>
      <version>4.4.0</version>
    </dependency>
    
  • 实例

    配置信息

    public class PayConfig {
        // nameServer地址,多个可以用;分开
        public static final String NAME_SERVER_ADDR = "139.224.101.91:9876";
    
        // 指定topic
        // 通过命令查看 ./bin/mqbroker -m
        // autoCreateTopicEnable=true 则自动创建topic
        public static final String PAY_TOPIC = "pay_test_topic";
    }
    

    生成者

    import org.apache.rocketmq.client.exception.MQClientException;
    import org.apache.rocketmq.client.producer.DefaultMQProducer;
    import org.springframework.stereotype.Component;
    
    @Component
    public class PayProducer {
        private String producerGroup = "pay_producer_group";
        private DefaultMQProducer producer;
    
        /**
         * 获取DefaultMQProducer
         */
        public DefaultMQProducer getProducer() {
            return this.producer;
        }
    
        /**
         * 无参构造
         */
        public PayProducer() {
            producer = new DefaultMQProducer(producerGroup);
            // 指定nameServer地址
            producer.setNamesrvAddr(PayConfig.NAME_SERVER_ADDR);
    
            start();
        }
    
        /**
         * 对象在使用之前必须要调用一次,只能初始化一次
         */
        public void start() {
            try {
                this.producer.start();
            } catch (MQClientException e) {
                e.printStackTrace();
            }
        }
    
        /**
         * 一般在应用上下文,使用上下文监听器进行关闭
         */
        public void shutdown() {
            this.producer.shutdown();
        }
    }
    

    消费者

    import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
    import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
    import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
    import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
    import org.apache.rocketmq.client.exception.MQClientException;
    import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
    import org.apache.rocketmq.common.message.Message;
    import org.apache.rocketmq.common.message.MessageExt;
    import org.springframework.stereotype.Component;
    
    import java.io.UnsupportedEncodingException;
    import java.util.List;
    
    @Component
    public class PayConsumer {
        private String consumerGroup = "pay_consumer_group";
    
        private DefaultMQPushConsumer consumer;
    
        public PayConsumer() throws MQClientException {
            consumer = new DefaultMQPushConsumer(consumerGroup);
            consumer.setNamesrvAddr(PayConfig.NAME_SERVER_ADDR);
            consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
    
            consumer.subscribe(PayConfig.PAY_TOPIC, "*");
            // 注册监听
            consumer.registerMessageListener(new MessageListenerConcurrently() {
                @Override
                public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                    try {
                        Message msg = msgs.get(0);
                        System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), new String(msgs.get(0).getBody()));
                        String topic = msg.getTopic();
                        String body = new String(msg.getBody(), "utf-8");
                        String tags = msg.getTags();
                        String keys = msg.getKeys();
                        System.out.println("topic=" + topic + ", tags=" + tags + ", keys=" + keys + ", msg=" + body);
                        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                    }catch (UnsupportedEncodingException e) {
                        e.printStackTrace();
                        return ConsumeConcurrentlyStatus.RECONSUME_LATER;
                    }
                }
            });
    
            consumer.start();
            System.out.println("consumer start ...");
        }
    }
    

    访问入口

    import cn.net.rocketmq.jms.PayConfig;
    import cn.net.rocketmq.jms.PayProducer;
    import org.apache.rocketmq.client.exception.MQBrokerException;
    import org.apache.rocketmq.client.exception.MQClientException;
    import org.apache.rocketmq.client.producer.SendResult;
    import org.apache.rocketmq.common.message.Message;
    import org.apache.rocketmq.remoting.exception.RemotingException;
    import org.springframework.web.bind.annotation.GetMapping;
    import org.springframework.web.bind.annotation.RestController;
    
    import javax.annotation.Resource;
    
    @RestController
    public class PayController {
        @Resource
        PayProducer payProducer;
    
        @GetMapping("/payMess")
        public String  payMessage(String text) throws InterruptedException, RemotingException, MQClientException, MQBrokerException {
            Message message = new Message(PayConfig.PAY_TOPIC, "taga", ("rocketMq" + text).getBytes());
            SendResult send = payProducer.getProducer().send(message);
            System.out.println("SendResult==" + send);
            return "ok";
        }
    }
    

    结果

    SendResult==SendResult [sendStatus=SEND_OK, msgId=C0A80B6E292018B4AAC27FB005A20000, offsetMsgId=8BE0655B00002A9F0000000000057F7B, messageQueue=MessageQueue [topic=pay_test_topic, brokerName=broker-a, queueId=2], queueOffset=0]
    ConsumeMessageThread_4 Receive New Messages: rocketMq5555 
    topic=pay_test_topic, tags=taga, keys=null, msg=rocketMq5555
    

生产者核心配置及核心知识

核心配置

type desc
compressMsgBodyOverHowmuch 消息超过默认字节4096后进行压缩
retryTimesWhenSendFailed 失败重发次数
maxMessageSize 最大消息配置,默认128k
topicQueueNums 主题下面的队列数量,默认是4
autoCreateTopicEnable 是否自动创建主题Topic, 开发建议为true,生产要为false
defaultTopicQueueNums 自动创建服务器不存在的topic,默认创建的队列数
autoCreateSubscriptionGroup 是否允许 Broker 自动创建订阅组,建议线下开发开启,线上关闭
brokerClusterName 集群名称
brokerId 0表示Master主节点 大于0表示从节点
brokerIP1 Broker服务地址
brokerRole broker角色 ASYNC_MASTER/ SYNC_MASTER/ SLAVE
deleteWhen 每天执行删除过期文件的时间,默认每天凌晨4点
flushDiskType 刷盘策略, 默认为 ASYNC_FLUSH(异步刷盘), 另外是SYNC_FLUSH(同步刷盘)
listenPort Broker监听的端口号
mapedFileSizeCommitLog 单个conmmitlog文件大小,默认是1GB
mapedFileSizeConsumeQueue ConsumeQueue每个文件默认存30W条,可以根据项目调整
storePathRootDir 存储消息以及一些配置信息的根目录 默认为用户的 ${HOME}/store
storePathCommitLog commitlog存储目录默认为${storePathRootDir}/commitlog
storePathIndex 消息索引存储路径
syncFlushTimeout 同步刷盘超时时间
diskMaxUsedSpaceRatio 检测可用的磁盘空间大小,超过后会写入报错

核心知识

消息发送状态

status desc
FLUSH_DISK_TIMEOUT 没有在规定时间内完成刷盘 (刷盘策略需要为SYNC_FLUSH 才会出这个错误)
FLUSH_SLAVE_TIMEOUT 主从模式下,broker是SYNC_MASTER, 没有在规定时间内完成主从同步
SLAVE_NOT_AVAILABLE 主从模式下,broker是SYNC_MASTER, 但是没有找到被配置成Slave的Broker
SEND_OK 发送成功,没有发生上面的三种问题
// 源码
package org.apache.rocketmq.client.producer;
public enum SendStatus {
    SEND_OK,
    FLUSH_DISK_TIMEOUT,
    FLUSH_SLAVE_TIMEOUT,
    SLAVE_NOT_AVAILABLE,
}

消息发送模式

发送方式 发送 TPS 发送结果反馈 可靠性
同步发送 不丢失
异步发送 不丢失
单向发送 最快 可能丢失
  • 同步发送

    场景:重要通知邮件、报名短信通知、营销短信系统等

    官网:http://rocketmq.apache.org/docs/simple-example/

    样例:

        public String sendPayMsg(String text) throws InterruptedException, RemotingException, MQClientException, MQBrokerException {
            // 同步发送
            DefaultMQProducer mqProducer = payProducer.getProducer();
            Message msg = new Message(jmsConfig.PAY_TOPIC, "taga", "pay_key", ("hello rocketMq " + text).getBytes());
            SendResult sendResult = mqProducer.send(msg);
    
            System.out.println(sendResult);
            System.out.println(mqProducer.toString());
            return "Success !!!";
        }
    
  • 异步发送

    场景:对RT时间敏感,可以支持更高的并发,回调成功触发相对应的业务,比如注册成功后通知积分系统发放优惠券

    官网:http://rocketmq.apache.org/docs/simple-example/

    样例:

    public String sendPayMsg(String text) throws InterruptedException, RemotingException, MQClientException, MQBrokerException {
            // 异步发送
            DefaultMQProducer mqProducer = payProducer.getProducer();
            Message msg = new Message(jmsConfig.PAY_TOPIC, "taga", "pay_key", ("hello rocketMq " + text).getBytes());
            mqProducer.send(msg, new SendCallback() {
                // 成功回调
                @Override
                public void onSuccess(SendResult sendResult) {
                    System.out.println(sendResult);
                }
    
                // 异常处理
                @Override
                public void onException(Throwable e) {
                    // 人为补偿机制
                    e.printStackTrace();
                }
            });
            return "Success !!!";
        }
    
  • 单向发送(OneWay)

    场景:主要是日志收集,适用于某些耗时非常短,但对可靠性要求并不高的场景, 也就是LogServer, 只负责发送消息,不等待服务器回应且没有回调函数触发,即只发送请求不等待应答

    官网:https://rocketmq.apache.org/docs/simple-example/

    样例:

    public String sendPayMsg(String text) throws InterruptedException, RemotingException, MQClientException, MQBrokerException {
            // 单向发送
            DefaultMQProducer mqProducer = payProducer.getProducer();
            Message msg = new Message(jmsConfig.PAY_TOPIC, "taga", "pay_key", ("hello rocketMq " + text).getBytes());
            mqProducer.sendOneway(msg);
            return "Success !!!";
        }
    

消息重试及处理

  • 生产者Producer重试(异步单向下配置无效)

    • 消息重投(保证数据的高可靠性),本身内部支持重试,默认次数是2,
    • 如果网络情况比较差,或者跨集群则建改多几次
    public PayProducer() {
        this.producer = new DefaultMQProducer(this.payProducerGroup);
        // 也可以在发送具体消息时设置者重试次数
        // DefaultMQProducer mqProducer = payProducer.getProducer();
        // mqProducer.setRetryTimesWhenSendFailed(3);
        this.producer.setRetryTimesWhenSendFailed(3);
        this.producer.setNamesrvAddr(jmsConfig.NAME_SERVER);
    
        this.start();
    }
    
  • 消费端重试

    • 原因:消息处理异常、broker端到consumer端各种问题,如网络原因闪断,消费处理失败,ACK返回失败等等问题

    • 注意:

      • 重试间隔时间配置 ,默认每条消息最多重试 16 次

        messageDelayLevel=1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
        
      • 超过重试次数人工补偿

      • 消费端去重

      • 消费重试只针对集群消费方式生效;广播方式不提供失败重试特性,即消费失败后,失败消息不再重试,继续消费新的消息,

      • 一条消息无论重试多少次,这些重试消息的 Message ID,key 不会改变。

    @Component
    public class PayConsumer {
        private String payConsumerGroup = "pay_consumer_group";
        private DefaultMQPushConsumer consumer;
      
          public PayConsumer() throws MQClientException {
            consumer = new DefaultMQPushConsumer(payConsumerGroup);
            consumer.setNamesrvAddr(jmsConfig.NAME_SERVER);
            consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
            consumer.subscribe(jmsConfig.PAY_TOPIC, "*");
    
            // 默认是集群模式(MessageModel.CLUSTERING),广播模式(MessageModel.BROADCASTING)不支持消息重试机制
            // consumer.setMessageModel(MessageModel.BROADCASTING);
    
            consumer.registerMessageListener(new MessageListenerConcurrently() {
                @Override
                public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                    int times = 0;
                    try {
                        MessageExt msg = msgs.get(0);
                        System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), new String(msgs.get(0).getBody()));
                        String topic = msg.getTopic();
                        String body = new String(msg.getBody(), "utf-8");
                        String tags = msg.getTags();
                        String keys = msg.getKeys();
                        // 重试次数
                        times = msg.getReconsumeTimes();
                        System.out.println("重试次数= " + times);
    
                        // 测试消费端消息重试机制
                        if(keys.equals("pay_key")) {
                            throw new Exception();
                        }
    
                        System.out.println("topic=" + topic + ", tags=" + tags + ", keys=" + keys + ", msg=" + body);
                        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                    } catch (Exception e) {
                        if(times >= 2) {
                            System.out.println("消息重试测试大于2次,记录数据库");
                            // 通知broker,消息消费成功
                            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                        }
                        e.printStackTrace();
                        return ConsumeConcurrentlyStatus.RECONSUME_LATER;
                    }
                }
            });
    
            consumer.start();
            System.out.println("consumer start ...");
          }
    }
    

延迟消息

Producer 将消息发送到消息队列 RocketMQ 服务端,但并不期望这条消息立马投递,而是推迟到在当前时间点之后的某一个时间投递到 Consumer 进行消费,该消息即定时消息,目前支持固定精度的消息

源代码:rocketmq-store > MessageStoreConfig.java 属性 messageDelayLevel

"1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";

使用场景:通过消息触发一些定时任务,比如在某一固定时间点向用户发送提醒消息

public String sendPayMsg(String text) throws InterruptedException, RemotingException, MQClientException, MQBrokerException {
        // 同步发送
        DefaultMQProducer mqProducer = payProducer.getProducer();
        Message msg = new Message(jmsConfig.PAY_TOPIC, "taga", "pay_key", ("hello rocketMq " + text).getBytes());
        // 设置消息延迟级别,从1开始
        msg.setDelayTimeLevel(2);
        SendResult sendResult = mqProducer.send(msg);
        System.out.println(sendResult);
        System.out.println(mqProducer.toString());
        return "Success !!!";
}

顺序消息

  • 顺序消息:消息的生产和消费顺序一致

    • 全局顺序:topic下面全部消息都要有序(少用)

      性能要求不高,所有的消息严格按照 FIFO 原则进行消息发布和消费的场景,并行度成为消息系统的瓶颈, 吞吐量不够

    • 局部顺序:只要保证一组消息被顺序消费即可(RocketMQ使用)

      性能要求高 - 电商的订单创建,同一个订单相关的创建订单消息、订单支付消息、订单退款消息、订单物流消息、订单交易成功消息 都会按照先后顺序来发布和消费

  • 顺序发布:对于指定的一个 Topic,客户端将按照一定的先后顺序发送消息

  • 顺序消费:对于指定的一个 Topic,按照一定的先后顺序接收消息,即先发送的消息一定会先被客户端接收到。

  • 注意:

    • 顺序消息暂不支持广播模式
    • 顺序消息不支持异步发送方式,否则将无法严格保证顺序

生产者-使用MessageQueueSelector将消息发送到topic下的指定queue

DefaultMQProducer mqProducer = payProducer.getProducer();
Message msg = new Message(jmsConfig.PAY_TOPIC, "taga", "pay_key", ("hello rocketMq " + text).getBytes());
SendResult sendResult = mqProducer.send(msg, new MessageQueueSelector() {
    @Override
    public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
        // 如果arg是订单号是字符串,则进行hash,得到一个hash值
        Long id = (Long) arg;
        long index = id % mqs.size();
        return mqs.get((int)index);
    }
}, 0);

消费者-使用MessageListenerOrderly监听消息,自带单线程消费消息

consumer.registerMessageListener(new MessageListenerOrderly() {
            @Override
            public ConsumeOrderlyStatus consumeMessage(List<MessageExt> list, ConsumeOrderlyContext consumeOrderlyContext) {
                MessageExt msg = list.get(0);
                try {
                    int queueId = msg.getQueueId();
                    String msgBody = new String(msg.getBody(), "UTF-8");
                    String msgId = msg.getMsgId();
                    
                    System.out.printf("consumer queueId=%s --- msgBody=%s --- msgId=%s \r\n", queueId, msgBody, msgId);
                    return ConsumeOrderlyStatus.SUCCESS;
                } catch (UnsupportedEncodingException e) {
                    e.printStackTrace();
                    return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
                }
            }
        });
  • MessageQueueSelector

    生产消息使用MessageQueueSelector投递到Topic下指定的queue

    • 应用场景:顺序消息,分摊负载
    • 默认topic下的queue数量是4,可以配置
    • 支持同步,异步发送指定的MessageQueue
    • 选择的queue数量必须小于配置的,否则会出错
public String sendPayMsg(String text) throws InterruptedException, RemotingException, MQClientException, MQBrokerException {
        // MessageQueueSelector发送到topic下的指定queue
        DefaultMQProducer mqProducer = payProducer.getProducer();
        Message msg = new Message(jmsConfig.PAY_TOPIC, "taga", "pay_key", ("hello rocketMq " + text).getBytes());
        SendResult sendResult = mqProducer.send(msg, new MessageQueueSelector() {
            @Override
            public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
                Integer queueNum = (Integer) arg;
                return mqs.get(queueNum);
            }
        }, 0);
        System.out.println(sendResult);
        return "Success !!!";
    }
  • MessageListenerOrderly
    • Consumer会平均分配queue的数量
    • 并不是简单禁止并发处理,而是为每个Consumer Quene加个锁,消费每个消息前,需要获得这个消息所在的Queue的锁,这样同个时间,同个Queue的消息不被并发消费,但是不同Queue的消息可以并发处理

消费者核心配置及核心知识

核心配置

  • consumeFromWhere

    consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
    
    • CONSUME_FROM_FIRST_OFFSET:初次从消息队列头部开始消费,即历史消息(还储存在broker的)全部消费一遍,后续再启动接着上次消费的进度开始消费
    • CONSUME_FROM_LAST_OFFSET: 默认策略,初次从该队列最尾开始消费,即跳过历史消息,后续再启动接着上次消费的进度开始消费
    • CONSUME_FROM_TIMESTAMP(极少用) : 从某个时间点开始消费,默认是半个小时以前,后续再启动接着上次消费的进度开始消费
  • allocateMessageQueueStrategy

    • 负载均衡策略算法,即消费者分配到queue的算法,默认值是AllocateMessageQueueAveragely即取模平均分配
  • offsetStore:消息消费进度存储器 offsetStore 有两个策略:

    • LocalFileOffsetStore 和 RemoteBrokerOffsetStor 广播模式默认使用LocalFileOffsetStore 集群模式默认使用RemoteBrokerOffsetStore
  • consumeThreadMin 最小消费线程池数量

  • consumeThreadMax 最大消费线程池数量

  • pullBatchSize: 消费者去broker拉取消息时,一次拉取多少条。可选配置

  • consumeMessageBatchMaxSize: 单次消费时一次性消费多少条消息,批量消费接口才有用,可选配置

  • messageModel : 消费者消费模式,集群模式CLUSTERING(默认) ,广播模式BROADCASTING

    consumer.setMessageModel(MessageModel.BROADCASTING);
    

核心知识

集群和广播模式

  • 集群模式(默认):
    • Consumer实例平均分摊消费生产者发送的消息
    • 例子:订单消息,一般是只被消费一次
  • 广播模式:
    • 广播模式下消费消息:投递到Broker的消息会被每个Consumer进行消费,一条消息被多个Consumer消费,广播消费中ConsumerGroup暂时无用
    • 例子:群公告,每个人都需要消费这个消息
  • queue与消费者
    • 如果是4个队列,8个消息,4个节点则会各消费2条,如果不对等,则负载均衡会分配不均,
    • 如果consumer实例的数量比message queue的总数量还多的话,多出来的consumer实例将无法分到queue,也就无法消费到消息,也就无法起到分摊负载的作用,所以需要控制让queue的总数量大于等于consumer的数量

tag标签

一个Message只有一个Tag,tag是二级分类, 用于消息过滤

  • tag过滤位置

    • Broker端过滤,减少了无用的消息的进行网络传输,增加了broker的负担
    • Consumer端过滤,完全可以根据业务需求进行实习,但是增加了很多无用的消息传输
  • tag过滤原理

    • Broker:遍历message queue存储的 message tag和 订阅传递的tag 的hashcode做对比,符合的则传输给Consumer,不一样则跳过,在consume queue :(message queue在CommitLog的标记)存储的是对应的hashcode, 对比也是通过hashcode对比;
    • Consumer:收到过滤消息后也会进行匹配操作,但是是对比真实的message tag而不是hashcode;
    • 总结:
      • consume queue存储使用hashcode定长,节约空间
      • 过滤中不访问commit log,可以高效过滤
      • 如果存在hash冲突,Consumer端可以进行再次确认
  • tag过滤表达式

    一般是监听 * ,或者指定 tag,|| 运算 , SLQ92 , FilterServer等;

    • tag性能高,逻辑简单

      consumer.subscribe(jmsConfig.ORDER_TOPIC, "order_create || order_finished");
      
    • SQL92 性能差点,支持复杂逻辑(只支持PushConsumer中使用),如果使用SQL92语法,配置文件broker.conf中添加配置:enablePropertyFilter=true

      • 语法:> , < = ,IS NULL, AND, OR, NOT 等,sql where后续的语法即可(大部分)
      // 发送端
      Message msg = new Message(jmsConfig.ORDER_TOPIC, "order_create", "pay_key", ("hello rocketMq " + text).getBytes());
              msg.putUserProperty("amount", "60");
      
      // 消费者
      consumer.subscribe(jmsConfig.ORDER_TOPIC, MessageSelector.bySql("amount > 50"));
      
  • 注意事项:

    • 订阅关系一致:订阅关系由 Topic和 Tag 组成,同一个 group name,订阅的 topic和tag 必须是一样;如果订阅关系要一致,会造成消费混乱,甚至会造成消息丢失
    • 如果想使用多个Tag,可以使用sql表达式,但是不建议
    • 建议:单一职责,多个队列

PushConsumer/PullConsumer

  • Push和Pull优缺点分析
    • Push:实时性高;但增加服务端负载,消费端能力不同,如果Push推送过快,消费端会出现很多问题
    • Pull:消费者从Server端拉取消息,主动权在消费者端,可控性好;但 间隔时间不好设置,间隔太短,则空请求,浪费资源;间隔时间太长,则消息不能及时处理
    • 长轮询: Client请求Server端也就是Broker的时候, Broker会保持当前连接一段时间 默认是15s,如果这段时间内有消息到达,则立刻返回给Consumer,没消息的话 超过15s,则返回空,再进行重新请求;主动权在Consumer中,Broker即使有大量的消息 也不会主动提送Consumer, 缺点:服务端需要保持Consumer的请求,会占用资源,需要客户端连接数可控 否则会一堆连接
  • PushConsumer本质是长轮训
    • 系统收到消息后自动处理消息和offset,如果有新的Consumer加入会自动做负载均衡,
    • 在broker端可以通过longPollingEnable=true来开启长轮询
    • 消费端代码:DefaultMQPushConsumerImpl->pullMessage->PullCallback
    • 服务端代码:broker.longpolling
    • 虽然是push,但是代码里面大量使用了pull,是因为使用长轮训方式达到Push效果,既有pull有的,又有Push的实时性
    • 优雅关闭:主要是释放资源和保存Offset, 调用shutdown()即可 ,参考 @PostConstruct、@PreDestroy
  • PullConsumer需要自己维护Offset(参考官方例子)
    • 官方例子路径:org.apache.rocketmq.example.simple.PullConsumer
    • 获取MessageQueue遍历
    • 客户维护Offset,需用用户本地存储Offset,存储内存、磁盘、数据库等
    • 处理不同状态的消息 FOUND、NO_NEW_MSG、OFFSET_ILLRGL、NO_MATCHED_MSG、4种状态
    • 灵活性高可控性强,但是编码复杂度会高
    • 优雅关闭:主要是释放资源和保存Offset,需用程序自己保存好Offset,特别是异常处理的时候

Offset

消息偏移量

  • offset
    • message queue是无限长的数组,一条消息进来下标就会涨1,下标就是offset,消息在某个Message Queue里的位置,通过offset的值可以定位到这条消息,或者指示Consumer从这条消息开始向后处理;
    • message queue中的maxOffset表示消息的最大offset, maxOffset并不是最新的那条消息的offset,而是最新消息的offset+1,minOffset则是现存的最小offset。
    • fileReserveTime=48 (配置文件中)默认消息存储48小时后,消费会被物理地从磁盘删除,message queue的min offset也就对应增长;所以比minOffset还要小的那些消息已经不在broker上了,就无法被消费;
  • 类型(父类是OffsetStore)
    • 本地文件类型:DefaultMQPushConsumer的BROADCASTING模式,各个Consumer没有互相干扰,使用LoclaFileOffsetStore,把Offset存储在本地;
    • Broker代存储类型:DefaultMQPushConsumer的CLUSTERING模式,由Broker端存储和控制Offset的值,使用RemoteBrokerOffsetStore
  • 作用:主要是记录消息的偏移量,以供多个消费者进行准确消费
  • 建议:采用pushConsumer,RocketMQ自动维护OffsetStore;如果使用pullConsumer,需要自己进行维护OffsetStore

CommitLog

RocketMq中消息存储是由ConsumeQueue和CommitLog配合完成

  • ConsumeQueue: 是逻辑队列, CommitLog是真正存储消息文件的,ConsumeQueue存储的是queue指向物理存储的地址;Topic下的每个message queue都有对应的ConsumeQueue文件,内容也会被持久化到磁盘;

    默认地址:store/consumequeue/{topicName}/{queueid}/fileName

  • CommitLog:

    CommitLog是消息文件的存储地址

    • 生成规则:每个文件的默认1G =1024 * 1024 * 1024,commitlog的文件名fileName,名字长度为20位,左边补零,剩余为起始偏移量;比如00000000000000000000代表了第一个文件,起始偏移量为0,文件大小为1G=1 073 741 824Byte;当这个文件满了,第二个文件名字为00000000001073741824,起始偏移量为1073741824, 消息存储的时候会顺序写入文件,当文件满了则写入下一个文件
    • 定位消息位置:例如 1073742827 为物理偏移量,则其对应的相对偏移量为 1003 = 1073742827 - 1073741824,并且该偏移量位于第二个 CommitLog。

ZeroCopy

零拷贝

  • 请求kernel直接把disk的data传输给socket,而不是通过应用程序传输。Zero copy大大提高了应用程序的性能,减少不必要的内核缓冲区跟用户缓冲区间的拷贝,从而减少CPU的开销和减少了kernel和user模式的上下文切换,达到性能的提升

  • 对应零拷贝技术有mmap及sendfile

    • mmap:小文件传输快(RocketMQ 选择这种方式,mmap+write 方式,小块数据传输,效果会比 sendfile 更好)
    • sendfile:大文件传输比mmap快
  • 应用:Kafka、Netty、RocketMQ等都采用了零拷贝技术;Java中的TransferTo()实现了Zero-Copy

  • RocketMq高效原因分析

    • CommitLog顺序写, 存储了MessagBody、message key、tag等信息
    • ConsumeQueue随机读 + 操作系统的PageCache + 零拷贝技术ZeroCopy
  • 传统拷贝

copy01.png
  • ZeroCopy

    zeroCopy01.png

分布式事务消息

分布式事务

  • 来源:单体应用—>拆分为分布式应用;
  • 一个接口需要调用多个服务,且操作不同的数据库,数据一致性难保障;

RokcetMQ分布式事务消息

  • RocketMQ事务消息:

    • RocketMQ 提供分布事务功能,通过 RocketMQ 事务消息能达到分布式事务的最终一致
  • 半消息Half Message:

    • 暂不能消费,Producer已经将消息成功发送到了Broker端,但是服务端未收到生产者对该消息的二次确认,此时该消息被标记成“暂不能投递”状态,处于该种状态下的消息即半消息
  • 消息回查:

    • 由于网络闪断、生产者应用重启等原因,导致某条事务消息的二次确认丢失,消息队列 RocketMQ 服务端通过扫描发现某条消息长期处于“半消息”时,需要主动向消息生产者询问该消息的最终状态(Commit 或是 Rollback),该过程即消息回查。
  • 整体交互流程

    jiaohu.png
  • Producer向broker端发送消息。

  • 服务端将消息持久化成功之后,向发送方 ACK 确认消息已经发送成功,此时消息为半消息。

  • 发送方开始执行本地事务逻辑。

  • 发送方根据本地事务执行结果向服务端提交二次确认(Commit 或是 Rollback),服务端收到 Commit 状态则将半消息标记为可投递,订阅方最终将收到该消息;服务端收到 Rollback 状态则删除半消息,订阅方将不会接受该消息

  • 在断网或者是应用重启的特殊情况下,上述步骤 4 提交的二次确认最终未到达服务端,经过固定时间后服务端将对该消息发起消息回查

  • 发送方收到消息回查后,需要检查对应消息的本地事务执行的最终结果

  • 发送方根据检查得到的本地事务的最终状态再次提交二次确认,服务端仍按照步骤 4 对半消息进行操作

  • RocketMQ事务消息的状态

    • COMMIT_MESSAGE: 提交事务消息,消费者可以消费此消息
    • ROLLBACK_MESSAGE:回滚事务消息,消息会在broker中删除,消费者不能消费
    • UNKNOW:Broker需要回查确认消息的状态
  • 关于事务消息的消费

    • 事务消息consumer端的消费方式和普通消息是一样的,RocketMQ能保证消息能被consumer收到(消息重试等机制,最后也存在consumer消费失败的情况,这种情况出现的概率极低)。
    • 注意点:TransactionMQProducer 的groupName要唯一,不能和普通的producer一样

执行本地事务-回查本地事务

import org.apache.rocketmq.client.producer.LocalTransactionState;
import org.apache.rocketmq.client.producer.TransactionListener;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;

public class TransactionListenerImpl implements TransactionListener {
    /**
     * 执行本地事务
     */
    @Override
    public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
        System.out.println("----> executeLocalTransaction <----");
        String msgBody = new String(msg.getBody());
        String key = msg.getKeys();
        String transactionId = msg.getTransactionId();
        System.out.printf("----> executeLocalTransaction transactionId=%s, key=%s, msgBody=%s \r\n", transactionId, key, msgBody);

        // TODO 执行本地事务 begin
        // TODO 执行本地事务 end

        // 测试-二次确认消息
        int status = Integer.valueOf(arg.toString());
        // COMMIT_MESSAGE 消费者可以消费消息
        if(1 == status) {
            return LocalTransactionState.COMMIT_MESSAGE;
        }

        // ROLLBACK_MESSAGE 消息回滚,broker会删除半消息
        if(2 == status) {
            return LocalTransactionState.ROLLBACK_MESSAGE;
        }

        // UNKNOW broker会进行回查消息
        if(3 == status) {
            return LocalTransactionState.UNKNOW;
        }
        return null;
    }

    /**
     *  回查本地事务
     */
    @Override
    public LocalTransactionState checkLocalTransaction(MessageExt msg) {
        System.out.println("----> checkLocalTransaction <----");
        String msgBody = new String(msg.getBody());
        String key = msg.getKeys();
        String transactionId = msg.getTransactionId();
        System.out.printf("----> checkLocalTransaction transactionId=%s, key=%s, msgBody=%s", transactionId, key, msgBody);
        // 要么提交,要么回滚,根据业务需求检查本地事务状态
        return LocalTransactionState.COMMIT_MESSAGE;
    }
}

事务消息生产者

import cn.net.rocketmq.config.jmsConfig;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.TransactionListener;
import org.apache.rocketmq.client.producer.TransactionMQProducer;
import org.springframework.stereotype.Component;
import java.util.concurrent.*;

@Component
public class TransactionProducer {
    private String transProducerGroup = "trans_producer_group";
    private TransactionMQProducer producer;
    private TransactionListener transactionListener;

    // 自定义线程池给定线程池名称
    //创建自定义线程池
    //@param corePoolSize    池中所保存的核心线程数
    //@param maximumPoolSize    池中允许的最大线程数
    //@param keepActiveTime    非核心线程空闲等待新任务的最长时间
    //@param timeunit    keepActiveTime参数的时间单位
    //@param blockingqueue    任务队列
    private ExecutorService executorService = new ThreadPoolExecutor(2, 5, 100, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(2000), new ThreadFactory() {
        @Override
        public Thread newThread(Runnable r) {
            Thread thread = new Thread(r);
            thread.setName("client-transaction-msg-check-thread");
            return thread;
        }
    });

    // 构造器初始化
    public TransactionProducer() {
        transactionListener = new TransactionListenerImpl();

        producer = new TransactionMQProducer(transProducerGroup);
        producer.setTransactionListener(transactionListener);
        producer.setExecutorService(executorService);
        producer.setNamesrvAddr(jmsConfig.NAME_SERVER);

        start();
    }

    // 获取TransactionMQProducer
    public TransactionMQProducer getProducer() {
        return this.producer;
    }

    // TransactionMQProducer开始入口
    public void start(){
        try {
            this.producer.start();
        } catch (MQClientException e) {
            e.printStackTrace();
        }
    }

    // TransactionMQProducer结束入口
    public void shutDown() {
        this.producer.shutdown();
    }
}

事务消息消费者

import cn.net.rocketmq.config.jmsConfig;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.MessageSelector;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;
import org.springframework.stereotype.Component;
import java.util.List;

@Component
public class TransactionConsumer {
    private String payConsumerGroup = "trans_consumer_group";
    private DefaultMQPushConsumer consumer;

    public TransactionConsumer() throws MQClientException {
        consumer = new DefaultMQPushConsumer(payConsumerGroup);
        consumer.setNamesrvAddr(jmsConfig.NAME_SERVER);
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
        consumer.subscribe(jmsConfig.TRANSACTION_TOPIC, "*");

        // 并行消费
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                try {
                    MessageExt msg = msgs.get(0);
                    System.out.printf("----> TransactionConsumer %s Receive New Messages: %s %n", Thread.currentThread().getName(), new String(msgs.get(0).getBody()));
                    String topic = msg.getTopic();
                    String body = new String(msg.getBody(), "utf-8");
                    String tags = msg.getTags();
                    String keys = msg.getKeys();
                    System.out.println("----> TransactionConsumer topic=" + topic + ", tags=" + tags + ", keys=" + keys + ", msg=" + body);
                    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                } catch (Exception e) {
                    e.printStackTrace();
                    return ConsumeConcurrentlyStatus.RECONSUME_LATER;
                }
            }
        });
        consumer.start();
        System.out.println("TransactionConsumer consumer start ...");
    }
}

访问入口

import cn.net.rocketmq.config.jmsConfig;
import cn.net.rocketmq.jms.TransactionProducer;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.TransactionSendResult;
import org.apache.rocketmq.common.message.Message;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
import javax.annotation.Resource;

@RestController
public class TransactionProducerController {

    @Resource
    private TransactionProducer transactionProducer;

    @GetMapping("/sendTransMsg")
    public String sendTransMsg(String text, String otherParam) throws MQClientException {
        Message msg = new Message(jmsConfig.TRANSACTION_TOPIC, "trans_tags", "trans_keys", text.getBytes());
        TransactionSendResult sendResult = transactionProducer.getProducer().sendMessageInTransaction(msg, otherParam);
        System.out.println("sendResult=" + sendResult);
        return "Success !!!";
    }
}
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 212,222评论 6 493
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,455评论 3 385
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 157,720评论 0 348
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,568评论 1 284
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 65,696评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 49,879评论 1 290
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,028评论 3 409
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,773评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,220评论 1 303
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,550评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,697评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,360评论 4 332
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,002评论 3 315
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,782评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,010评论 1 266
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,433评论 2 360
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,587评论 2 350

推荐阅读更多精彩内容