《RocketMQ实战与原理解析》——杨开元
有些场景,需要应用程序处理几种类型的消息,不同消息的优先级不同。RocketMQ是个先入先出的队列,不支持消息级别或者Topic级别的优先级。业务中简单的优先级需求,可以通过间接的方式解决,下面列举三种优先级相关需求的具体处理方法。
第一种是比较简单的情况,如果当前Topic里有多种相似类型的消息,比如类型AA、AB、AC,当AB、AC的消息量很大,但是处理速度比较慢的时候,队列里会有很多AB、AC类型的消息在等候处理,这个时候如果有少量AA类型的消息加入,就会排在AB、AC类型消息后面,需要等候很长时间才能被处理。
如果业务需要AA类型的消息被及时处理,可以把这三种相似类型的消息分拆到两个Topic里,比如AA类型的消息在一个单独的Topic,AB、AC类型的消息在另外一个Topic。把消息分到两个Topic中以后,应用程序创建两个Consumer,分别订阅不同的Topic,这样消息AA在单独的Topic里,不会因为AB、AC类型的消息太多而被长时间延时处理。
第二种情况和第一种情况类似,但是不用创建大量的Topic。举个实际应用场景:一个订单处理系统,接收从100家快递门店过来的请求,把这些请求通过Producer写入RocketMQ;订单处理程序通过Consumer从队列里读取消息并处理,每天最多处理1万单。如果这100个快递门店中某几个门店订单量大增,比如门店一接了个大客户,一个上午就发出2万单消息请求,这样其他的99家门店可能被迫等待门店一的2万单处理完,也就是两天后订单才能被处理,显然很不公平。
这时可以创建一个Topic,设置Topic的MessageQueue数量超过100个,Producer根据订单的门店号,把每个门店的订单写入一个MessageQueue。DefaultMQPushConsumer默认是采用循环的方式逐个读取一个Topic的所有MessageQueue,这样如果某家门店订单量大增,这家门店对应的MessageQueue消息数增多,等待时间增长,但不会造成其他家门店等待时间增长。
DefaultMQPushConsumer默认的pullBatchSize是32,也就是每次从某个MessageQueue读取消息的时候,最多可以读32个。在上面的场景中,为了更加公平,可以把pullBatchSize设置成1。
第三种情况是强优先级需求,上两种情况对消息的“优先级”要求不高,更像一个保证公平处理的机制,避免某类消息的增多阻塞其他类型的消息。现在有一个应用程序同时处理TypeA、TypeB、TypeC三类消息。TypeA处于第一优先级,要确保只要有TypeA消息,必须优先处理;TypeB处于第二优先级;TypeC处于第三优先级。对这种要求,或者逻辑更复杂的要求,就要用户自己编码实现优先级控制,如果上述的三类消息在一个Topic里,可以使用PullConsumer,自主控制MessageQueue的遍历,以及消息的读取;如果上述三类消息在三个Topic下,需要启动三个Consumer,实现逻辑控制三个Consumer的消费。