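RabbitMQ Message Listener Containers
Hello, and welcome to Java Growth Notes. These notes are meant for exchanging ideas and learning from each other, and I hope they are useful to you. If you spot a mistake, please point it out. Thank you!
By configuring a message listener container you can listen on one queue or on several queues at once and plug in your own business logic, so that receiving and sending data is handled the way your application needs.
SimpleMessageListenerContainer
1. SimpleMessageListenerContainer exposes many consumer-side settings. It can listen on one or more queues, start automatically, and declare queues automatically.
2. Transactions and concurrency on the simple message listener container:
1) Transaction settings: the transaction manager, transaction attributes, transaction size, whether the channel is transacted, and how messages are rolled back.
2) Consumer settings: the number of consumers, the minimum and maximum number of concurrent consumers, and batch consumption.
3. Acknowledgement settings: the acknowledge mode (including automatic acknowledgement), whether rejected messages are requeued, and the error handler that catches exceptions.
4. Consumer settings: the consumer tag generation strategy, exclusive mode, consumer arguments, and so on.
5. The concrete listener, the message converter, and so on.
6. SimpleMessageListenerContainer can be reconfigured at runtime. For example, a running application can change the number of consumers or the way messages are received.
The basic settings are loaded as follows: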
/**
* Loads the basic settings
*/
public void init (final SimpleMessageListenerContainer container) {
// Listen on several queues at once
container.setQueues(queue001(), queue002(), queue003(), queue_image(), queue_pdf());
// Set the initial and the maximum number of concurrent consumers
container.setConcurrentConsumers(1);
container.setMaxConcurrentConsumers(5);
// Do not requeue messages that the listener rejects
container.setDefaultRequeueRejected(false);
// Let the container acknowledge messages automatically
container.setAcknowledgeMode(AcknowledgeMode.AUTO);
// Expose the listener's channel to a ChannelAwareMessageListener
container.setExposeListenerChannel(true);
// Consumer tag strategy: queue name plus a random UUID
container.setConsumerTagStrategy(new ConsumerTagStrategy() {
@Override
public String createConsumerTag(String queue) {
return queue + "_" + UUID.randomUUID();
}
});
}
The listener side is configured as follows:
@Bean
public SimpleMessageListenerContainer messageContainer(ConnectionFactory connectionFactory) {
final SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
this.init(container);
// Register the message listener
container.setMessageListener(new ChannelAwareMessageListener() {
@Override
public void onMessage(Message message, Channel channel) throws Exception {
// Decode the body explicitly as UTF-8 rather than with the platform default charset
final String msg = new String(message.getBody(), StandardCharsets.UTF_8);
log.info("Consumer --->>> {}", msg);
}
});
return container;
}
MessageListenerAdapter
Adapter approach (1)
Specify a message converter that turns the byte array into a String.
The code is as follows:
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.support.converter.MessageConversionException;
import org.springframework.amqp.support.converter.MessageConverter;
import java.nio.charset.StandardCharsets;
@Slf4j
public class TextMessageConverter implements MessageConverter {
/**
* Converts an Object to a Message.
*/
@Override
public Message toMessage(Object object, MessageProperties messageProperties) throws MessageConversionException {
return new Message(object.toString().getBytes(StandardCharsets.UTF_8), messageProperties);
}
/**
* Converts a Message to a Java object.
*/
@Override
public Object fromMessage(Message message) throws MessageConversionException {
final String contentType = message.getMessageProperties().getContentType();
// Text content types (for example text/plain) are decoded to a String
if (StringUtils.isNotBlank(contentType) && contentType.contains("text")) {
return new String(message.getBody(), StandardCharsets.UTF_8);
}
// Anything else is handed to the listener as the raw byte array
return message.getBody();
}
}
// Listener container configuration
@Bean
public SimpleMessageListenerContainer messageContainer(ConnectionFactory connectionFactory) {
final SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
this.init(container);
final MessageListenerAdapter adapter = new MessageListenerAdapter(new MessageDelegate());
// Delegate to a custom method on MessageDelegate
adapter.setDefaultListenerMethod("consumeMessage");
adapter.setMessageConverter(new TextMessageConverter());
container.setMessageListener(adapter);
return container;
}
// Listener method in MessageDelegate: receives the String produced by TextMessageConverter
public void consumeMessage(String messageBody) {
log.info("String method, message body: {}", messageBody);
}
// Test
@Test
public void testSendMessage4Text() throws Exception {
// Create the message
MessageProperties messageProperties = new MessageProperties();
messageProperties.setContentType("text/plain");
Message message = new Message("TextMessageConverter test message".getBytes(StandardCharsets.UTF_8), messageProperties);
// Send the message
rabbitTemplate.send("topic001", "spring.abc", message);
rabbitTemplate.send("topic002", "rabbit.abc", message);
}
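If setDefaultListenerMethod is not called, the adapter falls back to its default listener method name, handleMessage, so the delegate class needs a handleMessage method whose parameter type matches the converted message.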
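To make the name-based dispatch easier to picture, here is a minimal plain-Java sketch. It is not Spring's implementation: `MiniListenerAdapter` and `DemoDelegate` are hypothetical names, and the `calls` list exists only so the result can be inspected. It shows the idea of looking up a delegate method by name and by the type of the converted payload, starting from the default name `handleMessage`.

```java
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for the adapter: it only illustrates name-based dispatch.
final class MiniListenerAdapter {
    static final String DEFAULT_METHOD = "handleMessage";

    private final Object delegate;
    private String listenerMethod = DEFAULT_METHOD;

    MiniListenerAdapter(Object delegate) {
        this.delegate = delegate;
    }

    void setDefaultListenerMethod(String name) {
        this.listenerMethod = name;
    }

    // Finds a public one-argument method with the configured name that accepts the payload.
    void onMessage(Object payload) {
        for (Method m : delegate.getClass().getMethods()) {
            if (m.getName().equals(listenerMethod)
                    && m.getParameterCount() == 1
                    && m.getParameterTypes()[0].isInstance(payload)) {
                try {
                    m.invoke(delegate, payload);
                    return;
                } catch (ReflectiveOperationException e) {
                    throw new IllegalStateException("Listener method failed: " + listenerMethod, e);
                }
            }
        }
        throw new IllegalArgumentException(
                "No method " + listenerMethod + "(" + payload.getClass().getSimpleName() + ")");
    }
}

// Hypothetical delegate with the default-named method and a custom one.
final class DemoDelegate {
    final List<String> calls = new ArrayList<>();

    public void handleMessage(String body) {
        calls.add("handleMessage:" + body);
    }

    public void consumeMessage(String body) {
        calls.add("consumeMessage:" + body);
    }
}
```

The sketch also hints at why the converter and the delegate have to agree: if the converter produces a String but the delegate only offers a byte[] parameter, no method matches and the call fails.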
Adapter approach (2)
Map each queue name one-to-one to the method that handles its messages.
The code is as follows:
@Bean
public SimpleMessageListenerContainer messageContainer(ConnectionFactory connectionFactory) {
final SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
this.init(container);
final MessageListenerAdapter adapter = new MessageListenerAdapter(new MessageDelegate());
adapter.setMessageConverter(new TextMessageConverter());
Map<String, String> queueOrTagToMethodName = Maps.newHashMap();
queueOrTagToMethodName.put("queue001", "method1");
queueOrTagToMethodName.put("queue002", "method2");
adapter.setQueueOrTagToMethodName(queueOrTagToMethodName);
container.setMessageListener(adapter);
return container;
}
// Listener methods in MessageDelegate
public void method1(String messageBody) {
log.info("method1 received message body: {}", messageBody);
}
public void method2(String messageBody) {
log.info("method2 received message body: {}", messageBody);
}
// Test
@Test
public void testSendMessage4Text() throws Exception {
// Create the message
MessageProperties messageProperties = new MessageProperties();
messageProperties.setContentType("text/plain");
Message message = new Message("TextMessageConverter test message".getBytes(StandardCharsets.UTF_8), messageProperties);
// Send the message
rabbitTemplate.send("topic001", "spring.abc", message);
rabbitTemplate.send("topic002", "rabbit.abc", message);
}
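The lookup behind this mapping can be sketched in plain Java. This is my understanding of the behaviour, not the library's code: `ListenerMethodResolver` is a hypothetical class, and the assumed order is consumer queue first, then consumer tag, then the default method name.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper that mirrors the queue-or-tag lookup described above.
final class ListenerMethodResolver {
    private final Map<String, String> queueOrTagToMethodName = new HashMap<>();
    private final String defaultMethod;

    ListenerMethodResolver(String defaultMethod) {
        this.defaultMethod = defaultMethod;
    }

    void map(String queueOrTag, String methodName) {
        queueOrTagToMethodName.put(queueOrTag, methodName);
    }

    // Try the queue name, then the consumer tag, and fall back to the default method.
    String resolve(String consumerQueue, String consumerTag) {
        String name = queueOrTagToMethodName.get(consumerQueue);
        if (name == null) {
            name = queueOrTagToMethodName.get(consumerTag);
        }
        return name != null ? name : defaultMethod;
    }
}
```

Under that assumption, messages from queue001 and queue002 reach method1 and method2, while the other queues registered on the container have no entry in the map and would go to the default method, so the delegate should provide that method as well.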
MessageConverter
JSON converter
Configure a JSON converter.
The code is as follows:
// Add support for JSON conversion
@Bean
public SimpleMessageListenerContainer messageContainer(ConnectionFactory connectionFactory) {
final SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
this.init(container);
final MessageListenerAdapter adapter = new MessageListenerAdapter(new MessageDelegate());
adapter.setDefaultListenerMethod("consumeMessage");
Jackson2JsonMessageConverter jackson2JsonMessageConverter = new Jackson2JsonMessageConverter();
adapter.setMessageConverter(jackson2JsonMessageConverter);
container.setMessageListener(adapter);
return container;
}
// Test
@Test
public void testSendJsonMessage() throws Exception {
final Order order =
new Order(UUID.randomUUID().toString().replaceAll("-",""),
"Order details", "Order description");
final String json = new ObjectMapper().writeValueAsString(order);
log.info("testSendJsonMessage json: {}", json);
final MessageProperties messageProperties = new MessageProperties();
// The contentType must be changed to application/json
messageProperties.setContentType("application/json");
final Message message = new Message(json.getBytes(StandardCharsets.UTF_8), messageProperties);
rabbitTemplate.send("topic001", "spring.order", message);
}
// Listener method in MessageDelegate: without a type header the JSON arrives as a Map
public void consumeMessage(Map<String, Object> messageBody) {
log.info("consumeMessage method, message body: {}", messageBody);
}
Java object conversion
Configure conversion to a Java object.
The code is as follows:
@Bean
public SimpleMessageListenerContainer messageContainer(ConnectionFactory connectionFactory) {
final SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
this.init(container);
MessageListenerAdapter adapter = new MessageListenerAdapter(new MessageDelegate());
adapter.setDefaultListenerMethod("consumeMessage");
Jackson2JsonMessageConverter jackson2JsonMessageConverter = new Jackson2JsonMessageConverter();
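DefaultJackson2JavaTypeMapper javaTypeMapper = new DefaultJackson2JavaTypeMapper();
// Recent Spring AMQP versions trust only java.util and java.lang by default,
// so the model package named in the __TypeId__ header is trusted explicitly here
javaTypeMapper.setTrustedPackages("com.show.model");
jackson2JsonMessageConverter.setJavaTypeMapper(javaTypeMapper);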
adapter.setMessageConverter(jackson2JsonMessageConverter);
container.setMessageListener(adapter);
return container;
}
// Listener method in MessageDelegate
public void consumeMessage(Order order) {
log.info("consumeMessage with Order, id: {}, name: {}, content: {}", order.getId(), order.getName(), order.getContent());
}
// Test
@Test
public void testSendJavaMessage() throws Exception {
final Order order =
new Order(UUID.randomUUID().toString().replaceAll("-",""),
"Order details", "Order description");
final String json = new ObjectMapper().writeValueAsString(order);
log.info("testSendJavaMessage json: {}", json);
final MessageProperties messageProperties = new MessageProperties();
// Note: the contentType must be changed to application/json
messageProperties.setContentType("application/json");
// The __TypeId__ header tells the converter which class to deserialize into
messageProperties.getHeaders().put("__TypeId__", "com.show.model.Order");
final Message message = new Message(json.getBytes(StandardCharsets.UTF_8), messageProperties);
rabbitTemplate.send("topic001", "spring.order", message);
}
Mapping several Java types
Configure a converter that maps type ids to several Java classes.
The code is as follows:
@Bean
public SimpleMessageListenerContainer messageContainer(ConnectionFactory connectionFactory) {
final SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
this.init(container);
final MessageListenerAdapter adapter = new MessageListenerAdapter(new MessageDelegate());
adapter.setDefaultListenerMethod("consumeMessage");
final Jackson2JsonMessageConverter jackson2JsonMessageConverter = new Jackson2JsonMessageConverter();
final DefaultJackson2JavaTypeMapper javaTypeMapper = new DefaultJackson2JavaTypeMapper();
final Map<String, Class<?>> idClassMapping = Maps.newHashMap();
idClassMapping.put("order", com.show.model.Order.class);
idClassMapping.put("packaged", com.show.model.Packaged.class);
javaTypeMapper.setIdClassMapping(idClassMapping);
jackson2JsonMessageConverter.setJavaTypeMapper(javaTypeMapper);
adapter.setMessageConverter(jackson2JsonMessageConverter);
container.setMessageListener(adapter);
return container;
}
// Listener methods in MessageDelegate
public void consumeMessage(Order order) {
log.info("consumeMessage with Order, id: {}, name: {}, content: {}", order.getId(), order.getName(), order.getContent());
}
public void consumeMessage(Packaged pack) {
log.info("consumeMessage with Packaged, id: {}, name: {}, description: {}", pack.getId(), pack.getName(), pack.getDescription());
}
// Test
@Test
public void testSendMappingMessage() throws Exception {
final ObjectMapper mapper = new ObjectMapper();
// Send an Order
final Order order =
new Order(UUID.randomUUID().toString().replaceAll("-",""),
"Order details", "Order description");
final String json1 = mapper.writeValueAsString(order);
log.info("testSendMappingMessage json1: {}", json1);
final MessageProperties messageProperties1 = new MessageProperties();
messageProperties1.setContentType("application/json");
// "order" is the id registered in idClassMapping
messageProperties1.getHeaders().put("__TypeId__", "order");
final Message message1 = new Message(json1.getBytes(StandardCharsets.UTF_8), messageProperties1);
rabbitTemplate.send("topic001", "spring.order", message1);
// Send a Packaged
final Packaged pack =
new Packaged(UUID.randomUUID().toString().replaceAll("-",""), "Package details", "Package description");
final String json2 = mapper.writeValueAsString(pack);
log.info("testSendMappingMessage json2: {}", json2);
final MessageProperties messageProperties2 = new MessageProperties();
messageProperties2.setContentType("application/json");
// "packaged" is the id registered in idClassMapping
messageProperties2.getHeaders().put("__TypeId__", "packaged");
final Message message2 = new Message(json2.getBytes(StandardCharsets.UTF_8), messageProperties2);
rabbitTemplate.send("topic001", "spring.pack", message2);
}
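How a short id such as "order" ends up as a class can be sketched without any library. The following is a simplified plain-Java illustration, not the code of DefaultJackson2JavaTypeMapper: `TypeIdResolver` and `DemoOrder` are hypothetical, and the trusted-package check is reduced to an exact package match.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model class standing in for com.show.model.Order.
final class DemoOrder {
}

// Hypothetical resolver: explicit id mapping first, then a guarded class-name lookup.
final class TypeIdResolver {
    private final Map<String, Class<?>> idClassMapping = new HashMap<>();
    private final List<String> trustedPackages;

    TypeIdResolver(List<String> trustedPackages) {
        this.trustedPackages = trustedPackages;
    }

    void register(String typeId, Class<?> type) {
        idClassMapping.put(typeId, type);
    }

    Class<?> resolve(String typeId) {
        // 1. An explicitly registered id wins.
        Class<?> mapped = idClassMapping.get(typeId);
        if (mapped != null) {
            return mapped;
        }
        // 2. Otherwise treat the id as a class name, but only load it from a trusted package.
        int dot = typeId.lastIndexOf('.');
        String pkg = dot < 0 ? "" : typeId.substring(0, dot);
        if (!trustedPackages.contains(pkg)) {
            throw new IllegalArgumentException("Untrusted type id: " + typeId);
        }
        try {
            return Class.forName(typeId);
        } catch (ClassNotFoundException e) {
            throw new IllegalArgumentException("Unknown type id: " + typeId, e);
        }
    }
}
```

One practical benefit of the short ids is that the sender no longer needs to know the consumer's package names, only the agreed id.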
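Converting several payload types, including images and PDFs
Configure one converter that handles text, JSON, image and PDF payloads.
The code is as follows: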
@Bean
public SimpleMessageListenerContainer messageContainer(ConnectionFactory connectionFactory) {
final SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
this.init(container);
final MessageListenerAdapter adapter = new MessageListenerAdapter(new MessageDelegate());
adapter.setDefaultListenerMethod("consumeMessage");
// Global converter that delegates to another converter based on the content type
ContentTypeDelegatingMessageConverter convert = new ContentTypeDelegatingMessageConverter();
final TextMessageConverter textConvert = new TextMessageConverter();
convert.addDelegate("text", textConvert);
convert.addDelegate("text/html", textConvert);
convert.addDelegate("text/xml", textConvert);
convert.addDelegate("text/plain", textConvert);
final Jackson2JsonMessageConverter jsonConvert = new Jackson2JsonMessageConverter();
convert.addDelegate("json", jsonConvert);
convert.addDelegate("application/json", jsonConvert);
final ImageMessageConverter imageConverter = new ImageMessageConverter();
convert.addDelegate("image/jpg", imageConverter);
convert.addDelegate("image", imageConverter);
final PDFMessageConverter pdfConverter = new PDFMessageConverter();
convert.addDelegate("application/pdf", pdfConverter);
adapter.setMessageConverter(convert);
container.setMessageListener(adapter);
return container;
}
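The delegation idea itself is small enough to sketch in plain Java. This is only an illustration under my own assumptions, not the Spring class: `ContentTypeDispatcher` is a hypothetical name, converters are modelled as simple functions, and the lookup is assumed to be an exact match on the content type with a fallback for everything else.

```java
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

// Hypothetical dispatcher: picks a converter by the exact content type string.
final class ContentTypeDispatcher {
    private final Map<String, Function<byte[], Object>> delegates = new HashMap<>();
    private final Function<byte[], Object> fallback;

    ContentTypeDispatcher(Function<byte[], Object> fallback) {
        this.fallback = fallback;
    }

    void addDelegate(String contentType, Function<byte[], Object> converter) {
        delegates.put(contentType, converter);
    }

    // Unknown or missing content types go to the fallback converter.
    Object fromMessage(String contentType, byte[] body) {
        return delegates.getOrDefault(contentType, fallback).apply(body);
    }

    // Small demo: text is decoded, anything else stays a byte array.
    public static void main(String[] args) {
        ContentTypeDispatcher dispatcher = new ContentTypeDispatcher(body -> body);
        dispatcher.addDelegate("text/plain", body -> new String(body, StandardCharsets.UTF_8));
        byte[] raw = "hello".getBytes(StandardCharsets.UTF_8);
        System.out.println(dispatcher.fromMessage("text/plain", raw));
        System.out.println(dispatcher.fromMessage("application/pdf", raw) instanceof byte[]);
    }
}
```

Because the match is exact in this sketch, every content type a producer may send has to be registered, which is why the configuration above lists several text variants separately.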
// PDF converter class
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.support.converter.MessageConversionException;
import org.springframework.amqp.support.converter.MessageConverter;
import java.io.ByteArrayInputStream;
import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.util.UUID;
@Slf4j
public class PDFMessageConverter implements MessageConverter {
@Override
public Message toMessage(Object object, MessageProperties messageProperties) throws MessageConversionException {
throw new MessageConversionException(" convert error ! ");
}
@Override
public Object fromMessage(Message message) throws MessageConversionException {
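log.info("PDF MessageConverter: {}", System.currentTimeMillis());
final byte[] body = message.getBody();
final String fileName = UUID.randomUUID().toString().replaceAll("-","");
final String path = "H:/rebbitmq/" + fileName + ".pdf";
final File f = new File(path);
try {
// Write the message body to disk; the target directory must already exist
Files.copy(new ByteArrayInputStream(body), f.toPath());
} catch (IOException e) {
// Fail the conversion instead of returning a File that was never written
throw new MessageConversionException("Failed to write PDF file " + path, e);
}
return f;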
}
}
// Image converter class
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.support.converter.MessageConversionException;
import org.springframework.amqp.support.converter.MessageConverter;
import java.io.ByteArrayInputStream;
import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.util.UUID;
@Slf4j
public class ImageMessageConverter implements MessageConverter {
@Override
public Message toMessage(Object object, MessageProperties messageProperties) throws MessageConversionException {
throw new MessageConversionException(" convert error ! ");
}
@Override
public Object fromMessage(Message message) throws MessageConversionException {
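log.info("Image MessageConverter: {}", System.currentTimeMillis());
// The optional extName header carries the file extension; default to jpg
final Object _extName = message.getMessageProperties().getHeaders().get("extName");
final String extName = _extName == null ? "jpg" : _extName.toString();
final byte[] body = message.getBody();
final String fileName = UUID.randomUUID().toString().replaceAll("-","");
final String path = "H:/rebbitmq/" + fileName + "." + extName;
final File f = new File(path);
try {
// Write the message body to disk; the target directory must already exist
Files.copy(new ByteArrayInputStream(body), f.toPath());
} catch (IOException e) {
// Fail the conversion instead of returning a File that was never written
throw new MessageConversionException("Failed to write image file " + path, e);
}
return f;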
}
}
// Image test
@Test
public void testSendImageExtConverterMessage() throws Exception {
byte[] body = Files.readAllBytes(Paths.get("H:/rebbitmq/", "aw.jpg"));
MessageProperties messageProperties = new MessageProperties();
messageProperties.setContentType("image/jpg");
messageProperties.getHeaders().put("extName", "jpg");
Message message = new Message(body, messageProperties);
rabbitTemplate.send("", "image_queue", message);
}
// PDF test
@Test
public void testSendPdfExtConverterMessage() throws Exception {
byte[] body = Files.readAllBytes(Paths.get("H:/rebbitmq/", "dell.pdf"));
MessageProperties messageProperties = new MessageProperties();
messageProperties.setContentType("application/pdf");
Message message = new Message(body, messageProperties);
rabbitTemplate.send("", "pdf_queue", message);
}
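The MessageDelegate class is never shown in full, so here is one possible shape for it as a rough sketch. The overloads correspond to the payload types the converters in this post produce (String, Map, File, byte[]); the `handled` list is a hypothetical addition that only makes the behaviour easy to check, and the real class would log instead. The methods return void on purpose, on the assumption that a non-null return value would be treated by the adapter as a reply to send.

```java
import java.io.File;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Sketch of a delegate with one overload per converted payload type.
class MessageDelegate {
    // Hypothetical: records what was handled so the result can be inspected.
    final List<String> handled = new ArrayList<>();

    // Text payloads decoded by the text converter.
    public void consumeMessage(String messageBody) {
        handled.add("text:" + messageBody);
    }

    // JSON payloads without a type header.
    public void consumeMessage(Map<String, Object> messageBody) {
        handled.add("json:" + messageBody);
    }

    // Files written by the image and PDF converters.
    public void consumeMessage(File file) {
        handled.add("file:" + file.getName());
    }

    // Raw payloads that no converter decoded.
    public void consumeMessage(byte[] messageBody) {
        handled.add("bytes:" + messageBody.length);
    }
}
```

With all overloads sharing the name consumeMessage, a single setDefaultListenerMethod("consumeMessage") call is enough, and the type returned by the converter decides which overload runs.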
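That wraps up this chapter. More Java Growth Notes will follow, and I hope we can keep growing together. If you found this post useful, a like and a follow are the best encouragement for me. Thank you!
PS: Please credit the source when reposting.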