本章的重点是可靠性,解决如何让消息队列满足业务逻辑需求,同时稳定、可靠地长期运行。
6.1 顺序消息
顺序消息是指消息的消费顺序和产生顺序相同,在有些业务逻辑下,必须保证顺序。比如订单的生成、付款、发货,这3个消息必须按顺序处理才行。顺序消息分为全局顺序消息和部分顺序消息,全局顺序消息指某个Topic下的所有消息都要保证顺序;部分顺序消息只要保证每一组消息被顺序消费即可,比如上面订单消息的例子,只要保证同一个订单ID 的三个消息能按顺序消费即可。
6.1.1 全局顺序消息
RocketMQ 在默认情况下不保证顺序,比如创建一个Topic,默认八个写队列,八个读队列。这时候一条消息可能被写入任意一个队列里;在数据的读取过程中,可能有多个Consumer,每个Consumer 也可能启动多个线程并行处理,所以消息被哪个Consumer 消费,被消费的顺序和写人的顺序是否一致是不确定的。
要保证全局顺序消息, 需要先把Topic 的读写队列数设置为一,然后Producer 和Consumer 的并发设置也要是一。简单来说,为了保证整个Topic 的全局消息有序,只能消除所有的并发处理,各部分都设置成单线程处理。这时高并发、高吞吐量的功能完全用不上了。
在实际应用中,更多的是像订单类消息那样,只需要部分有序即可。在这种情况下,我们经过合适的配置,依然可以利用RocketMQ 高并发、高吞吐量的能力。
6.1.2 部分顺序消息
要保证部分消息有序,需要发送端和消费端配合处理。在发送端,要做到把同一业务ID 的消息发送到同一个Message Queue ;在消费过程中,要做到从同一个Message Queue 读取的消息不被并发处理,这样才能达到部分有序。
发送端使用MessageQueueSelector 类来控制把消息发往哪个MessageQueue。消费端通过使用MessageListenerOrderly 类来解决单Message Queue 的消息被并发处理的问题。
6.2 消息重复问题
对分布式消息队列来说,同时做到确保一定投递和不重复投递是很难的,也就是所谓的“有且仅有一次” 。在鱼和熊掌不可兼得的情况下, RocketMQ 选择了确保一定投递,保证消息不丢失,但有可能造成消息重复。
消息重复一般情况下不会发生,但是如果消息量大,网络有波动,消息重复就是个大概率事件。比如Producer 有个函数setRetryTimesWhenSendFailed,设置在同步方式下自动重试的次数,默认值是2 ,这样当第一次发送消息时,Broker 端接收到了消息但是没有正确返回发送成功的状态,就造成了消息重复。
解决消息重复有两种方法:第一种方法是保证消费逻辑的幂等性(多次调用和一次调用效果相同);另一种方法是维护一个已消费消息的记录,消费前查询这个消息是否被消费过。这两种方法都需要使用者自己实现。
6.3 动态增减机器
一个消息队列集群由多台机器组成,持续稳定地提供服务,因为业务需求或硬件故障,经常需要增加或减少各个角色的机器,本节介绍如何在不影响服务稳定性的情况下动态地增减机器。
6.3.1 动态增减NameServer
NameServer 是RocketMQ 集群的协调者,集群的各个组件是通过NameServer 获取各种属性和地址信息的。主要功能包括两部分:一个是各个Broker 定期上报自己的状态信息到NameServer ;另一个是各个客户端,包括Producer 、Consumer ,以及命令行工具,通过NameServer 获取最新的状态信息。所以,在启动Broker 、生产者和消费者之前,必须告诉它们NameServer的地址,为了提高可靠性,建议启动多个NameServer 。NameServer 占用资源不多,可以和Broker 部署在同一台机器。有多个NameServer 后,减少某个NameServer 不会对其他组件产生影响。
有四种种方式可设置NameServer 的地址, ’ 下面按优先级由高到低依次介绍:
- 通过代码设置,比如在Producer 中,通过Producer.setNamesrv Addr(”name-server1-ip:port;name-server2-ip:port ”)来设置。在mqadmin 命令行工具中,是通过-n name-server-ip1:port;name-server- ip2:port 参数来设置的。如果自定义了命令行工具,也可以通过defaultMQAdminExt.setNamesrvAddr ("name-server-ip1:port;name-server- ip2:port")来设置。
- 使用Java 启动参数设置,对应的option 是rocketmq.namesrv.addr 。
- 通过Linux 环境变量设置,在启动前设置变量: NAMESRV_ADDR 。
- 通过HTTP 服务来设置,当上述方法都没有使用,程序会向一个HTTP地址发送请求来获取NameServer 地址,默认的URL 是http://jmenv.tbsite.net:8080/rocketmq/nsaddr(淘宝的测试地址),通过rocketmq.namesrv.domain 参数来覆盖jmenv.tbsite.net;通过rocketmq.namesrv.domain.subgroup 参数来覆盖nsaddr 。
第4 种方式看似繁琐,但它是唯一支持动态增加NameServer ,无须重启其他组件的方式。使用这种方式后其他组件会每隔2 分钟请求一次该URL ,获取最新的NameServer 地址。
6.3 .2 动态、增减Broker
由于业务增长,需要对集群进行扩容的时候,可以动态增加Broker 角色的机器。只增加Broker 不会对原有的Topic 产生影响,原来创建好的Topic中数据的读写依然在原来的那些Broker 上进行。
集群扩容后, 一是可以把新建的Topic 指定到新的Broker 机器上,均衡利用资源;另一种方式是通过updateTopic 命令更改现有的Topic 配置,在新加的Broker 上创建新的队列。比如TestTopic 是现有的一个Topic ,因为数据量增大需要扩容,新增的一个Broker 机器地址是192.168.0.1:10911 ,这个时候执行下面的命令: sh ./bin/mqadmin updateTopic -b 192.168.0.1:10911 -t TestTopic -n 192.168.0.100:9876 ,结果是在新增的Broker 机器上,为TestTopic 新创建了8个读写队列。
如果因为业务变动或者置换机器需要减少Broker ,此时该如何操作呢?减少Broker 要看是否有持续运行的Producer ,当一个Topic 只有一个MasterBroker ,停掉这个Broker 后,消息的发送肯定会受到影响,需要在停止这个Broker 前,停止发送消息。
当某个Topic 有多个Master Broker ,停了其中一个,这时候是否会丢失消息呢?答案和Producer 使用的发送消息的方式有关,如果使用同步方式send ( msg )发送,在DefaultMQ Producer 内部有个自动重试逻辑,其中一个Broker 停了,会自动向另一个Broker 发消息,不会发生丢消息现象。如果使用异步方式发送send (msg, callback ),或者用sendOneWay 方式,会丢失切换过程中的消息。因为在异步和sendOneWay 这两种发送方式下,Producer.setRetryTimesWhensendFailed 设置不起作用,发送失败不会重试。DefaultMQProducer 默认每30 秒到NameServer 请求最新的路由消息, Producer如果获取不到已停止的Broker 下的队列信息,后续就自动不再向这些队列发送消息。
如果Producer 程序能够暂停,在有一个Master 和一个Slave 的情况下也可以顺利切换。可以关闭Producer 后关闭Master Broker ,这个时候所有的读取都会被定向到Slave 机器,消费消息不受影响。把Master Broker 机器置换完后,基于原来的数据启动这个Master Broker ,然后再启动Producer 程序正常发送消息。
用Linux 的kill pid 命令就可以正确地关闭Broker, BrokerController 下有个shutdown 函数,这个函数被加到了ShutdownHook 里,当用Linux 的kill 命令时(不能用kill -9 ), shutdown 函数会先被执行。也可以通过RocketMQ 提供的工具(mqshutdown broker)来关闭Broker ,它们的原理是一样的。
6.4 各种故障对消息的影响
我们期望消息队列集群一直可靠稳定地运行,但有时候故障是难免的,本节我们列出可能的故障情况,看看如何处理:
- Broker 正常关闭,启动;
- Broker 异常Crash ,然后启动;
- OS Crash ,重启;
- 机器断电,但能马上恢复供电;
- 磁盘损坏;
- CPU 、主板、内存等关键设备损坏。
假设现有的RocketMQ 集群,每个Topic 都配有多Master 角色的Broker 供写人,并且每个Master 都至少有一个Slave 机器(用两台物理机就可以实现上述配置),我们来看看在上述情况下消息的可靠性情况。
第1 种情况属于可控的软件问题,内存中的数据不会丢失。如果重启过程中有持续运行的Consumer, Master 机器出故障后, Consumer 会自动重连到对应的Slave 机器,不会有消息丢失和偏差。当Master 角色的机器重启以后, Consumer 又会重新连接到Master 机器( 注意在启动Master 机器的时候,如果Consumer 正在从Slave 消费消息,不要停止Consumer 。假如此时先停止Consumer 后再启动Master 机器,然后再启动Consumer ,这个时候Consumer就会去读Master 机器上已经滞后的offset 值,造成消息大量重复) 。
如果第1 种情况出现时有持续运行的Producer , 一台Master 出故障后,Producer 只能向Topic 下其他的Master 机器发送消息,如果Producer 采用同步发送方式,不会有消息丢失。
第2、3 、4 种情况属于软件故障,内存的数据可能丢失,所以刷盘策略不同,造成的影响也不同,如果Master 、Slave 都配置成SYNC_FLUSH ,可以达到和第1种情况相同的效果。
第5 、6 种情况属于硬件故障,发生第5 、6 种情况的故障,原有机器的磁盘数据可能会丢失。如果Master 和Slave 机器间配置成同步复制方式,某一台机器发生5 或6 的故障,也可以达到消息不丢失的效果。如果Master 和Slave机器间是异步复制,两次Sync间的消息会丢失。
总的来说,当设置成:
- 多Master ,每个Master 带有Slave
- 主从之间设置成SYNC_MASTER;
- Producer 用同步方式写;
- 刷盘策略设置成SYNC FLUSH 。
就可以消除单点依赖,即使某台机器出现极端故障也不会丢消息。
6.5 消息优先级
有些场景,需要应用程序处理几种类型的消息,不同消息的优先级不同。RocketMQ 是个先入先出的队列,不支持消息级别或者Topic 级别的优先级。业务中简单的优先级需求,可以通过间接的方式解决,下面列举三种优先级相关需求的具体处理方法。
第一种是比较简单的情况,如果当前Topic 里有多种相似类型的消息,比如类型AA 、AB 、AC ,当AB 、AC 的消息量很大,但是处理速度比较慢的时候,队列里会有很多AB 、AC 类型的消息在等候处理,这个时候如果有少量AA 类型的消息加人,就会排在AB 、AC 类型消息后面,需要等候很长时间才能被处理。
如果业务需要AA 类型的消息被及时处理,可以把这三种相似类型的消息分拆到两个Topic 里,比如AA 类型的消息在一个单独的Topic, AB 、AC 类型的消息在另外一个Topic 。把消息分到两个Topic 中以后,应用程序创建两个Consumer ,分别订阅不同的Topic ,这样消息AA 在单独的Topic 里,不会因为AB 、AC 类型的消息太多而被长时间延时处理。
第二种情况和第一种情况类似,但是不用创建大量的Topic 。举个实际应用场景:一个订单处理系统,接收从100 家快递门店过来的请求,把这些请求通过Producer 写人RocketMQ ;订单处理程序通过Consumer 从队列里读取消息并处理,每天最多处理1 万单。如果这100 个快递门店中某几个门店订单量大增,比如门店一接了个大客户,一个上午就发出2 万单消息请求,这样其他的99 家门店可能被迫等待门店一的2 万单处理完,也就是两天后订单才能被处理,显然很不公平。
这时可以创建一个Topic , 设置Topic 的MessageQueue 数量超过100 个,Producer 根据订单的门店号,把每个门店的订单写人一个MessageQueue 。DefaultMQPushConsumer 默认是采用循环的方式逐个读取一个Topic 的所有MessageQueue ,这样如果某家门店订单量大增,这家门店对应的MessageQueue 消息数增多,等待时间增长,但不会造成其他家门店等待时间增长。
DefaultMQPushConsumer 默认的pullBatchSize 是32 ,也就是每次从某个MessageQueue 读取消息的时候,最多可以读32 个。在上面的场景中,为了更加公平,可以把pullBatchSize 设置成1。
第三种情况是强优先级需求,上两种情况对消息的“优先级”要求不高,更像一个保证公平处理的机制,避免某类消息的增多阻塞其他类型的消息。现在有一个应用程序同时处理TypeA 、TypeB 、TypeC 三类消息。TypeA 处于第一优先级,要确保只要有TypeA 消息,必须优先处理; TypeB 处于第二优先级; TypeC 处于第三优先级。对这种要求,或者逻辑更复杂的要求,就要用户自己编码实现优先级控制,如果上述的三类消息在一个Topic 里,可以使用PullConsumer ,自主控制MessageQueue 的遍历,以及消息的读取;如果上述三类消息在三个Topic 下,需要启动三个Consumer , 实现逻辑控制三个Consumer 的消费。
6.6 本章小结
本章根据使用场景,讨论如何“可靠”地收发消息。即在要求消息顺序的场景下,如何既能并发执行,又能保证消息顺序;然后分析在可能的故障场景下,如何应对以保证不丢消息、不中断服务。RocketMQ 在设计上,有重试机制来保证消息不丢,造成的结果是可能存在消息重复,这一点需要用户根据具体业务场景来处理。下一章将讨论处理大数据量消息的方法。