RocketMq消息监听程序消除大量的if..else
承接上一篇文章,如果消费端订阅了多个topic和tag,则需要在消息监听器类中添加if..else,根据topic和tag处理不同的业务逻辑,使得消息监听类职责过重。
大概思路
消息监听器类只负责监听消息,获取到消息后通过topic和tag路由到需要调用的服务,消费者只需要编写对应的topic和tag的服务。
为了监听器类可以通过topic和tag路由到需要调用的服务,自定义一个消费者注解MQConsumeService(该注解包含topic和tag的定义),消费者实现类上添加该注解,然后就可以在监听器类中通过反射获取到实现类上面注解的topic和tag,和监听到的topic和tag比较,如果相同则调用该服务。
定义MQConsumeService注解
package com.clouds.common.rocketmq.annotation;
import java.lang.annotation.Documented;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import org.springframework.stereotype.Service;
import com.clouds.common.rocketmq.TopicEnum;
/**
* 此注解用于标注消费者服务
* .<br/>
*
* Copyright: Copyright (c) 2017 zteits
*
* @ClassName: MQConsumeService
* @Description:
* @version: v1.0.0
* @author: zhaowg
* @date: 2018年3月2日 下午1:15:52
* Modification History:
* Date Author Version Description
*---------------------------------------------------------*
* 2018年3月2日 zhaowg v1.0.0 创建
*/
@Documented
@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.TYPE})
@Service
public @interface MQConsumeService {
/**
* 消息主题
*/
TopicEnum topic();
/**
* 消息标签,如果是该主题下所有的标签,使用“*”
*/
String[] tags();
}
定义消费者返回消息Bean
package com.clouds.common.rocketmq.consumer.processor;
import java.io.Serializable;
/**
* 消费结果
* .<br/>
*
* Copyright: Copyright (c) 2017 zteits
*
* @ClassName: MQConsumeResult
* @Description:
* @version: v1.0.0
* @author: zhaowg
* @date: 2018年3月1日 上午11:12:55
* Modification History:
* Date Author Version Description
*---------------------------------------------------------*
* 2018年3月1日 zhaowg v1.0.0 创建
*/
public class MQConsumeResult implements Serializable{
private static final long serialVersionUID = 1L;
/**
* 是否处理成功
*/
private boolean isSuccess;
/**
* 如果处理失败,是否允许消息队列继续调用,直到处理成功,默认true
*/
private boolean isReconsumeLater = true;
/**
* 是否需要记录消费日志,默认不记录
*/
private boolean isSaveConsumeLog = false;
/**
* 错误Code
*/
private String errCode;
/**
* 错误消息
*/
private String errMsg;
/**
* 错误堆栈
*/
private Throwable e;
//省略set和get方法
}
定义统一的消息处理接口
package com.clouds.common.rocketmq.consumer.processor;
import java.util.List;
import com.alibaba.rocketmq.common.message.MessageExt;
/**
* 消息队列-消息消费处理接口
* .<br/>
*
* Copyright: Copyright (c) 2017 zteits
*
* @ClassName: MQMsgProcessorService
* @Description:
* @version: v1.0.0
* @author: zhaowg
* @date: 2018年3月1日 上午9:57:57
* Modification History:
* Date Author Version Description
*---------------------------------------------------------*
* 2018年3月1日 zhaowg v1.0.0 创建
*/
public interface MQMsgProcessor {
/**
* 消息处理<br/>
* 如果没有return true ,consumer会重新消费该消息,直到return true<br/>
* consumer可能重复消费该消息,请在业务端自己做是否重复调用处理,该接口设计为幂等接口
* @param topic 消息主题
* @param tag 消息标签
* @param msgs 消息
* @return
* 2018年3月1日 zhaowg
*/
MQConsumeResult handle(String topic, String tag, List<MessageExt> msgs);
}
定义抽象类实现上面的MQMsgProcessor接口
package com.clouds.common.rocketmq.consumer.processor;
import java.util.Arrays;
import java.util.List;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.alibaba.rocketmq.common.message.MessageConst;
import com.alibaba.rocketmq.common.message.MessageExt;
/**
* 所有消息处理继承该类
* .<br/>
*
* Copyright: Copyright (c) 2017 zteits
*
* @ClassName: AbstractMQMsgProcessorService
* @Description:
* @version: v1.0.0
* @author: zhaowg
* @date: 2018年3月1日 上午11:14:25
* Modification History:
* Date Author Version Description
*---------------------------------------------------------*
* 2018年3月1日 zhaowg v1.0.0 创建
*/
public abstract class AbstractMQMsgProcessor implements MQMsgProcessor{
protected static final Logger logger = LoggerFactory.getLogger(AbstractMQMsgProcessor.class);
@Override
public MQConsumeResult handle(String topic, String tag, List<MessageExt> msgs) {
MQConsumeResult mqConsumeResult = new MQConsumeResult();
/**可以增加一些其他逻辑*/
for (MessageExt messageExt : msgs) {
//消费具体的消息,抛出钩子供真正消费该消息的服务调用
mqConsumeResult = this.consumeMessage(tag,messageExt.getKeys()==null?null:Arrays.asList(messageExt.getKeys().split(MessageConst.KEY_SEPARATOR)),messageExt);
}
/**可以增加一些其他逻辑*/
return mqConsumeResult;
}
/**
* 消息某条消息
* @param tag 标签
* @param keys 消息关键字
* @param messageExt
* @return
* 2018年3月1日 zhaowg
*/
protected abstract MQConsumeResult consumeMessage(String tag,List<String> keys, MessageExt messageExt);
}
修改后的消费者监听器类
package com.clouds.common.rocketmq.consumer.processor;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;
import com.alibaba.fastjson.JSON;
import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import com.alibaba.rocketmq.common.message.MessageExt;
import com.clouds.common.rocketmq.annotation.MQConsumeService;
import com.clouds.common.rocketmq.constants.RocketMQErrorEnum;
import com.clouds.common.rocketmq.exception.AppException;
import com.clouds.common.rocketmq.exception.RocketMQException;
/**
* 消费者消费消息路由
* .<br/>
*
* Copyright: Copyright (c) 2017 zteits
*
* @ClassName: RocketMQMessageListenerConcurrentlyProcessor
* @Description:
* @version: v1.0.0
* @author: zhaowg
* @date: 2018年2月28日 上午11:12:32
* Modification History:
* Date Author Version Description
*---------------------------------------------------------*
* 2018年2月28日 zhaowg v1.0.0 创建
*/
@Component
public class MQConsumeMsgListenerProcessor implements MessageListenerConcurrently{
private static final Logger logger = LoggerFactory.getLogger(MQConsumeMsgListenerProcessor.class);
@Autowired
private Map<String,MQMsgProcessor> mqMsgProcessorServiceMap;
/**
* 默认msgs里只有一条消息,可以通过设置consumeMessageBatchMaxSize参数来批量接收消息<br/>
* 不要抛异常,如果没有return CONSUME_SUCCESS ,consumer会重新消费该消息,直到return CONSUME_SUCCESS
*/
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
if(CollectionUtils.isEmpty(msgs)){
logger.info("接受到的消息为空,不处理,直接返回成功");
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
ConsumeConcurrentlyStatus concurrentlyStatus = ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
try{
//根据Topic分组
Map<String, List<MessageExt>> topicGroups = msgs.stream().collect(Collectors.groupingBy(MessageExt::getTopic));
for (Entry<String, List<MessageExt>> topicEntry : topicGroups.entrySet()) {
String topic = topicEntry.getKey();
//根据tags分组
Map<String, List<MessageExt>> tagGroups = topicEntry.getValue().stream().collect(Collectors.groupingBy(MessageExt::getTags));
for (Entry<String, List<MessageExt>> tagEntry : tagGroups.entrySet()) {
String tag = tagEntry.getKey();
//消费某个主题下,tag的消息
this.consumeMsgForTag(topic,tag,tagEntry.getValue());
}
}
}catch(Exception e){
logger.error("处理消息失败",e);
concurrentlyStatus = ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
// 如果没有return success ,consumer会重新消费该消息,直到return success
return concurrentlyStatus;
}
/**
* 根据topic 和 tags路由,查找消费消息服务
* @param topic
* @param tag
* @param value
* 2018年3月1日 zhaowg
*/
private void consumeMsgForTag(String topic, String tag, List<MessageExt> value) {
//根据topic 和 tag查询具体的消费服务
MQMsgProcessor imqMsgProcessor = selectConsumeService(topic, tag);
try{
if(imqMsgProcessor==null){
logger.error(String.format("根据Topic:%s和Tag:%s 没有找到对应的处理消息的服务",topic,tag));
throw new RocketMQException(RocketMQErrorEnum.NOT_FOUND_CONSUMESERVICE);
}
logger.info(String.format("根据Topic:%s和Tag:%s 路由到的服务为:%s,开始调用处理消息",topic,tag,imqMsgProcessor.getClass().getName()));
//调用该类的方法,处理消息
MQConsumeResult mqConsumeResult = imqMsgProcessor.handle(topic,tag,value);
if(mqConsumeResult==null){
throw new RocketMQException(RocketMQErrorEnum.HANDLE_RESULT_NULL);
}
if(mqConsumeResult.isSuccess()){
logger.info("消息处理成功:"+JSON.toJSONString(mqConsumeResult));
}else{
throw new RocketMQException(RocketMQErrorEnum.CONSUME_FAIL,JSON.toJSONString(mqConsumeResult),false);
}
if(mqConsumeResult.isSaveConsumeLog()){
logger.debug("开始记录消费日志");
//TODO 记录消费日志
}
}catch(Exception e){
if(e instanceof AppException){
AppException mqe = (AppException)e;
//TODO 记录消费失败日志
throw new AppException(mqe.getErrCode(),mqe.getErrMsg(),false);
}else{
//TODO 记录消费失败日志
throw e;
}
}
}
/**
* 根据topic和tag查询对应的具体的消费服务
* @param topic
* @param tag
* @return
* 2018年3月3日 zhaowg
*/
private MQMsgProcessor selectConsumeService(String topic, String tag) {
MQMsgProcessor imqMsgProcessor = null;
for (Entry<String, MQMsgProcessor> entry : mqMsgProcessorServiceMap.entrySet()) {
//获取service实现类上注解的topic和tags
MQConsumeService consumeService = entry.getValue().getClass().getAnnotation(MQConsumeService.class);
if(consumeService == null){
logger.error("消费者服务:"+entry.getValue().getClass().getName()+"上没有添加MQConsumeService注解");
continue;
}
String annotationTopic = consumeService.topic().getCode();
if(!annotationTopic.equals(topic)){
continue;
}
String[] tagsArr = consumeService.tags();
//"*"号表示订阅该主题下所有的tag
if(tagsArr[0].equals("*")){
//获取该实例
imqMsgProcessor = entry.getValue();
break;
}
boolean isContains = Arrays.asList(tagsArr).contains(tag);
if(isContains){
//获取该实例
imqMsgProcessor = entry.getValue();
break;
}
}
return imqMsgProcessor;
}
}
至此,通过topic和tag路由服务已经写完了,现在以新增订阅一个DemoTopic主题为例,编写消息接受方。
新建消费者类,继承AbstractMQMsgProcessor类
注意:该类必须继承AbstractMQMsgProcessor,且添加MQConsumeService注解,用于定义该类是针对那个topic和tag的消费服务。新增订阅主题,还需要在application.propertis中修改rocketmq.consumer.topics。
package com.clouds.common.demo;
import java.util.List;
import com.alibaba.rocketmq.common.message.MessageExt;
import com.clouds.common.rocketmq.TopicEnum;
import com.clouds.common.rocketmq.annotation.MQConsumeService;
import com.clouds.common.rocketmq.consumer.processor.AbstractMQMsgProcessor;
import com.clouds.common.rocketmq.consumer.processor.MQConsumeResult;
@MQConsumeService(topic=TopicEnum.DemoTopic,tags={"*"})
public class DemoNewTopicConsumerMsgProcessImpl extends AbstractMQMsgProcessor{
@Override
protected MQConsumeResult consumeMessage(String tag, List<String> keys, MessageExt messageExt) {
String msg = new String(messageExt.getBody());
logger.info("获取到的消息为:"+msg);
//TODO 判断该消息是否重复消费(RocketMQ不保证消息不重复,如果你的业务需要保证严格的不重复消息,需要你自己在业务端去重)
//如果注解中tags数据中包含多个tag或者是全部的tag(*),则需要根据tag判断是那个业务,
//如果注解中tags为具体的某个tag,则该服务就是单独针对tag处理的
if(tag.equals("某个tag")){
//做某个操作
}
//TODO 获取该消息重试次数
int reconsume = messageExt.getReconsumeTimes();
//根据消息重试次数判断是否需要继续消费
if(reconsume ==3){//消息已经重试了3次,如果不需要再次消费,则返回成功
}
MQConsumeResult result = new MQConsumeResult();
result.setSuccess(true);
return result;
}
}
rocketmq.consumer.topics=DemoTopic~*;
运行测试类观察日志
. ____ _ __ _ _
/\\ / ___'_ __ _ _(_)_ __ __ _ \ \ \ \
( ( )\___ | '_ | '_| | '_ \/ _` | \ \ \ \
\\/ ___)| |_)| | | | | || (_| | ) ) ) )
' |____| .__|_| |_|_| |_\__, | / / / /
=========|_|==============|___/=/_/_/_/
:: Spring Boot :: (v1.5.2.RELEASE)
2018-03-03 16:54:25.079 INFO 12496 --- [ main] c.c.c.SpringBootRocketMqApplication : Starting SpringBootRocketMqApplication on DESKTOP-HS6GR38 with PID 12496 (D:\wsforjava\springboot-rocketmq\target\classes started by suolo in D:\wsforjava\springboot-rocketmq)
2018-03-03 16:54:25.086 INFO 12496 --- [ main] c.c.c.SpringBootRocketMqApplication : No active profile set, falling back to default profiles: default
2018-03-03 16:54:25.201 INFO 12496 --- [ main] s.c.a.AnnotationConfigApplicationContext : Refreshing org.springframework.context.annotation.AnnotationConfigApplicationContext@236e3f4e: startup date [Sat Mar 03 16:54:25 CST 2018]; root of context hierarchy
2018-03-03 16:54:27.073 INFO 12496 --- [ main] c.c.c.r.p.MQProducerConfiguration : producer is start ! groupName:[springboot-rocketmq],namesrvAddr:[47.97.8.22:9876]
2018-03-03 16:54:27.430 INFO 12496 --- [ main] c.c.c.r.c.MQConsumerConfiguration : consumer is start !!! groupName:springboot-rocketmq,topics:DemoTopic~*;,namesrvAddr:47.97.8.22:9876
2018-03-03 16:54:27.747 INFO 12496 --- [ main] o.s.j.e.a.AnnotationMBeanExporter : Registering beans for JMX exposure on startup
2018-03-03 16:54:27.764 INFO 12496 --- [ main] c.c.c.SpringBootRocketMqApplication : Started SpringBootRocketMqApplication in 3.213 seconds (JVM running for 3.814)
2018-03-03 16:54:27.784 INFO 12496 --- [MessageThread_1] .c.c.r.c.p.MQConsumeMsgListenerProcessor : 根据Topic:DemoTopic和Tag:DemoTag 路由到的服务为:com.clouds.common.demo.DemoNewTopicConsumerMsgProcessImpl,开始调用处理消息
2018-03-03 16:54:30.162 INFO 12496 --- [MessageThread_1] c.c.c.r.c.p.AbstractMQMsgProcessor : 获取到的消息为:demo msg test
2018-03-03 16:54:30.194 INFO 12496 --- [MessageThread_1] .c.c.r.c.p.MQConsumeMsgListenerProcessor : 消息处理成功:{"reconsumeLater":true,"saveConsumeLog":false,"success":true}
项目源码
https://gitee.com/zhaowg3/springboot-rocketmq/tree/master/
完成。