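I. Background
In an earlier post we covered isolating different message types with separate MQ queues. As I see it, isolation can be applied at three levels, from finest to coarsest: queue -> vhost -> node.
This post describes how a WebSocket (ws) channel service can isolate all of the applications that integrate with it, so that they do not affect one another. Here we use multiple vhosts, one per business application, to keep their messages apart.
II. Goals
1. Each business application that integrates with the channel service gets its own application identifier, and message forwarding for one application does not affect the others.
2. A single application defines multiple vhosts, rather than multiple MQ instances.
III. Design
IV. Source code examples
1. RabbitMQConfig.java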
import org.springframework.amqp.rabbit.annotation.EnableRabbit;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.autoconfigure.amqp.RabbitProperties;
import org.springframework.boot.autoconfigure.amqp.SimpleRabbitListenerContainerFactoryConfigurer;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;

@EnableRabbit
@Configuration
@EnableConfigurationProperties(RabbitProperties.class)
public class RabbitMQConfig {

    /**
     * Picked up by
     * {@link org.springframework.boot.autoconfigure.amqp.RabbitAutoConfiguration}:
     * objects are sent as JSON, and JSON is converted back into the target object on consumption.
     *
     * @return the MQ message converter
     */
    @Bean
    public MessageConverter jsonMessageConverter() {
        return new Jackson2JsonMessageConverter();
    }

    // One RabbitAdmin per connection factory, so queues, exchanges and bindings
    // are declared in each vhost.
    @Bean("rabbitAdmin")
    public RabbitAdmin rabbitAdmin(@Qualifier("defaultConnectionFactory") ConnectionFactory connectionFactory) {
        RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory);
        rabbitAdmin.setAutoStartup(true);
        return rabbitAdmin;
    }

    @Bean("firstRabbitAdmin")
    public RabbitAdmin firstRabbitAdmin(@Qualifier("firstConnectionFactory") ConnectionFactory connectionFactory) {
        RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory);
        rabbitAdmin.setAutoStartup(true);
        return rabbitAdmin;
    }

    @Bean("secondRabbitAdmin")
    public RabbitAdmin secondRabbitAdmin(@Qualifier("secondConnectionFactory") ConnectionFactory connectionFactory) {
        RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory);
        rabbitAdmin.setAutoStartup(true);
        return rabbitAdmin;
    }

    // TODO: the vhost names are hard-coded here; they could be injected with @Value
    // or bound from properties instead.
    private static final String DEFAULT_VHOST = "channel-service";
    private static final String FIRST_VHOST = "channel-service-1";
    private static final String SECOND_VHOST = "channel-service-2";

    // ---------- default vhost ----------

    @Bean("defaultConnectionFactory")
    @Primary
    public ConnectionFactory defaultConnectionFactory(RabbitProperties rabbitProperties) {
        CachingConnectionFactory cachingConnectionFactory = new CachingConnectionFactory();
        cachingConnectionFactory.setHost(rabbitProperties.getHost());
        cachingConnectionFactory.setPort(rabbitProperties.getPort());
        cachingConnectionFactory.setUsername(rabbitProperties.getUsername());
        cachingConnectionFactory.setPassword(rabbitProperties.getPassword());
        cachingConnectionFactory.setVirtualHost(DEFAULT_VHOST);
        return cachingConnectionFactory;
    }

    @Bean("rabbitTemplate")
    @Primary
    public RabbitTemplate rabbitTemplate(
            @Qualifier("defaultConnectionFactory") ConnectionFactory connectionFactory) {
        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
        rabbitTemplate.setMessageConverter(jsonMessageConverter());
        return rabbitTemplate;
    }

    @Bean(name = "defaultRabbitListenerContainer")
    @Primary
    public SimpleRabbitListenerContainerFactory defaultRabbitListenerContainer(
            SimpleRabbitListenerContainerFactoryConfigurer configurer,
            @Qualifier("defaultConnectionFactory") ConnectionFactory connectionFactory) {
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        configurer.configure(factory, connectionFactory);
        return factory;
    }

    // ---------- first vhost ----------

    @Bean("firstConnectionFactory")
    public ConnectionFactory firstConnectionFactory(RabbitProperties rabbitProperties) {
        CachingConnectionFactory cachingConnectionFactory = new CachingConnectionFactory();
        cachingConnectionFactory.setHost(rabbitProperties.getHost());
        cachingConnectionFactory.setPort(rabbitProperties.getPort());
        cachingConnectionFactory.setUsername(rabbitProperties.getUsername());
        cachingConnectionFactory.setPassword(rabbitProperties.getPassword());
        cachingConnectionFactory.setVirtualHost(FIRST_VHOST);
        return cachingConnectionFactory;
    }

    @Bean("firstRabbitTemplate")
    public RabbitTemplate firstJsonRabbitTemplate(
            @Qualifier("firstConnectionFactory") ConnectionFactory connectionFactory) {
        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
        rabbitTemplate.setMessageConverter(jsonMessageConverter());
        return rabbitTemplate;
    }

    @Bean("firstRabbitListenerContainer")
    public SimpleRabbitListenerContainerFactory firstRabbitListenerContainer(
            SimpleRabbitListenerContainerFactoryConfigurer configurer,
            @Qualifier("firstConnectionFactory") ConnectionFactory connectionFactory) {
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        configurer.configure(factory, connectionFactory);
        return factory;
    }

    // ---------- second vhost ----------

    @Bean("secondConnectionFactory")
    public ConnectionFactory secondConnectionFactory(RabbitProperties rabbitProperties) {
        CachingConnectionFactory cachingConnectionFactory = new CachingConnectionFactory();
        cachingConnectionFactory.setHost(rabbitProperties.getHost());
        cachingConnectionFactory.setPort(rabbitProperties.getPort());
        cachingConnectionFactory.setUsername(rabbitProperties.getUsername());
        cachingConnectionFactory.setPassword(rabbitProperties.getPassword());
        cachingConnectionFactory.setVirtualHost(SECOND_VHOST);
        return cachingConnectionFactory;
    }

    @Bean("secondRabbitTemplate")
    public RabbitTemplate secondJsonRabbitTemplate(
            @Qualifier("secondConnectionFactory") ConnectionFactory connectionFactory) {
        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
        rabbitTemplate.setMessageConverter(jsonMessageConverter());
        return rabbitTemplate;
    }

    @Bean("secondRabbitListenerContainer")
    public SimpleRabbitListenerContainerFactory secondRabbitListenerContainer(
            SimpleRabbitListenerContainerFactoryConfigurer configurer,
            @Qualifier("secondConnectionFactory") ConnectionFactory connectionFactory) {
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        configurer.configure(factory, connectionFactory);
        return factory;
    }
}
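The three connection factories above share host, port and credentials and differ only in the virtual host, which is what "multiple vhosts, not multiple MQ instances" comes down to. As a rough illustration that is not part of the original code, the sketch below builds the AMQP URI for each vhost using only the JDK. The class and method names are made up for this example, and credentials are deliberately left out.

```java
import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.List;

/** Illustrative only: VhostUriSketch and its methods are hypothetical names. */
class VhostUriSketch {

    /** Builds amqp://host:port/vhost from its parts. */
    static URI vhostUri(String host, int port, String vhost) {
        try {
            // Arguments: scheme, userInfo, host, port, path, query, fragment
            return new URI("amqp", null, host, port, "/" + vhost, null, null);
        } catch (URISyntaxException e) {
            throw new IllegalArgumentException("Invalid vhost URI parts", e);
        }
    }

    /** Same broker address for every vhost; only the path segment changes. */
    static List<URI> allVhostUris(String host, int port, String... vhosts) {
        List<URI> uris = new ArrayList<>();
        for (String vhost : vhosts) {
            uris.add(vhostUri(host, port, vhost));
        }
        return uris;
    }

    public static void main(String[] args) {
        for (URI uri : allVhostUris("192.168.8.24", 5672,
                "channel-service", "channel-service-1", "channel-service-2")) {
            System.out.println(uri);
        }
    }
}
```

Each printed URI points at the same broker address, with the vhost as the only varying part.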
2. Configuration file
spring:
  rabbitmq:
    host: 192.168.8.24
    port: 5672
    username: admin
    password: xxx
    listener:
      simple:
        retry:
          enabled: false
        concurrency: 8
        max-concurrency: 16
        acknowledge-mode: auto
3. MQConfig.java
import lombok.Data;
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * RabbitMQ configuration.
 *
 * The getters generated by @Data are what SpEL expressions such as
 * #{mqConfig.firstChannelQueue} resolve against on the consumer side.
 * HostUtils is a project helper that is not shown here.
 */
@Configuration("mqConfig")
@Data
public class MQConfig {

    /** Name of the first channel queue. */
    private String firstChannelQueue = MqConstants.TOPIC_PREFIX_ARRAY[0] + ".channel.queue." + HostUtils.getMac();

    /** Name of the second channel queue. */
    private String secondChannelQueue = MqConstants.TOPIC_PREFIX_ARRAY[1] + ".channel.queue." + HostUtils.getMac();

    /** Name of the third channel queue. */
    private String thirdChannelQueue = MqConstants.TOPIC_PREFIX_ARRAY[2] + ".channel.queue." + HostUtils.getMac();

    /** Name of the fourth channel queue. */
    private String fourChannelQueue = MqConstants.TOPIC_PREFIX_ARRAY[3] + ".channel.queue." + HostUtils.getMac();

    /** Dead-letter queue name. */
    private String deadChannelQueue = "channel.dead.queue";

    /** Dead-letter exchange name. */
    private String deadExchange = "channel.dead.letter.exchange";

    /** Dead-letter routing key. */
    private String deadRoutingKey = "channel.dead.letter.routeKey";

    /**
     * Dead-letter exchange.
     *
     * @return the dead-letter exchange (fanout)
     */
    @Bean
    public Exchange deadLetterExchange() {
        return ExchangeBuilder.fanoutExchange(deadExchange).build();
    }

    /**
     * Dead-letter queue.
     *
     * @return the dead-letter queue
     */
    @Bean
    public Queue deadLetterQueue() {
        return new Queue(deadChannelQueue);
    }

    /**
     * Binds the dead-letter queue to the dead-letter exchange.
     * A fanout exchange ignores the routing key, so the key here is informational only.
     *
     * @return the dead-letter binding
     */
    @Bean
    public Binding deadLetterBinding() {
        return BindingBuilder.bind(deadLetterQueue()).to(deadLetterExchange()).with(deadRoutingKey).noargs();
    }
}
4. MqConstants.java
public class MqConstants {

    /** Prefixes used both for the channel queue names and as routing keys. */
    public static final String[] TOPIC_PREFIX_ARRAY = {"first", "second", "third", "four"};

    /** Channel exchange name. */
    public static final String channelExchange = "channel.exchange.new";
}
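The prefixes in TOPIC_PREFIX_ARRAY are meant to spread rooms across four queues. One way to do that, sketched here in isolation with only the JDK, is to hash the room ID and take the remainder by the array length. This is a minimal sketch under assumptions: md5Hex stands in for the project's own MD5 helper, which the article does not show, and the class and method names are hypothetical. The remainder is taken before the absolute value because Math.abs(Integer.MIN_VALUE) is still negative.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

/** Illustrative only: RoutingKeySketch and its methods are hypothetical names. */
class RoutingKeySketch {

    static final String[] TOPIC_PREFIX_ARRAY = {"first", "second", "third", "four"};

    /** Lower-case hex MD5 of the input (assumed stand-in for the project's MD5 helper). */
    static String md5Hex(String input) {
        try {
            byte[] digest = MessageDigest.getInstance("MD5")
                    .digest(input.getBytes(StandardCharsets.UTF_8));
            StringBuilder hex = new StringBuilder(digest.length * 2);
            for (byte b : digest) {
                hex.append(String.format("%02x", b));
            }
            return hex.toString();
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException("MD5 is not available", e);
        }
    }

    /** Maps any int hash to an index in [0, buckets), including Integer.MIN_VALUE. */
    static int bucketForHash(int hash, int buckets) {
        return Math.abs(hash % buckets);
    }

    /** Picks the routing key for a room; empty or missing room IDs use the first prefix. */
    static String routingKeyFor(String roomId) {
        if (roomId == null || roomId.isEmpty()) {
            return TOPIC_PREFIX_ARRAY[0];
        }
        int index = bucketForHash(md5Hex(roomId).hashCode(), TOPIC_PREFIX_ARRAY.length);
        return TOPIC_PREFIX_ARRAY[index];
    }

    public static void main(String[] args) {
        // The same room ID always maps to the same prefix.
        System.out.println(routingKeyFor("xx#room-1"));
        System.out.println(routingKeyFor("xx#room-1"));
    }
}
```

Because the mapping is deterministic, all messages for one room end up with the same routing key and therefore in the same queue.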
5. Sending MQ messages
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.stereotype.Component;

// MQSender, OutMessage, MD5Utils and the *_BUSINESS_NAME constants are project
// classes/constants that are not shown in this article.
@Component
@Slf4j
public class MQSenderImpl implements MQSender {

    @Autowired
    @Qualifier("rabbitTemplate")
    private RabbitTemplate defaultRabbitTemplate;

    @Autowired
    @Qualifier("firstRabbitTemplate")
    private RabbitTemplate firstRabbitTemplate;

    @Autowired
    @Qualifier("secondRabbitTemplate")
    private RabbitTemplate secondRabbitTemplate;

    public int getHashByRoomId(String roomId) {
        // Take the remainder before the absolute value: Math.abs(Integer.MIN_VALUE)
        // is still negative and could otherwise yield a negative index.
        return Math.abs(MD5Utils.getMd5(roomId).hashCode() % MqConstants.TOPIC_PREFIX_ARRAY.length);
    }

    @Override
    public void send(OutMessage message) {
        // Messages without a room ID go to the default vhost with the first prefix.
        RabbitTemplate rabbitTemplate = defaultRabbitTemplate;
        String routingKey = MqConstants.TOPIC_PREFIX_ARRAY[0];
        if (StringUtils.isNotEmpty(message.getRoomId())) {
            int iHash = getHashByRoomId(message.getRoomId());
            routingKey = MqConstants.TOPIC_PREFIX_ARRAY[iHash];
            rabbitTemplate = getRabbitTemplate(message);
        }
        rabbitTemplate.convertAndSend(MqConstants.channelExchange, routingKey, message);
    }

    // The room ID has the form "<appId>#..."; the app ID selects the vhost.
    private RabbitTemplate getRabbitTemplate(OutMessage message) {
        try {
            String appId = message.getRoomId().split("#")[0];
            switch (appId) {
                case YY_BUSINESS_NAME:
                    return defaultRabbitTemplate;
                case XX_BUSINESS_NAME:
                    return firstRabbitTemplate;
                case ZZ_BUSINESS_NAME:
                    return secondRabbitTemplate;
                default:
                    return defaultRabbitTemplate;
            }
        } catch (Exception e) {
            // Fall back to the default vhost if the room ID cannot be parsed.
            return defaultRabbitTemplate;
        }
    }
}
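The switch in getRabbitTemplate grows by one case for every new application. A possible alternative, shown here only as a sketch, is a small lookup table keyed by application ID with a default fallback. The class TemplateRegistry is hypothetical, it is generic so that it runs without Spring (plain strings stand in for the RabbitTemplate beans), and the app IDs "xx" and "zz" are placeholders because the real constant values are not given in the article.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

/** Illustrative only: a lookup table from application ID to a per-vhost sender. */
class TemplateRegistry<T> {

    private final Map<String, T> byAppId = new HashMap<>();
    private final T fallback;

    TemplateRegistry(T fallback) {
        this.fallback = Objects.requireNonNull(fallback, "fallback");
    }

    /** Registers the sender to use for one application ID. */
    TemplateRegistry<T> register(String appId, T template) {
        byAppId.put(appId, Objects.requireNonNull(template, "template"));
        return this;
    }

    /** Room IDs are assumed to look like "<appId>#<rest>"; unknown apps get the fallback. */
    T resolve(String roomId) {
        if (roomId == null || roomId.isEmpty()) {
            return fallback;
        }
        int sep = roomId.indexOf('#');
        String appId = sep < 0 ? roomId : roomId.substring(0, sep);
        return byAppId.getOrDefault(appId, fallback);
    }

    public static void main(String[] args) {
        // Strings stand in for the three RabbitTemplate beans.
        TemplateRegistry<String> registry = new TemplateRegistry<>("defaultRabbitTemplate")
                .register("xx", "firstRabbitTemplate")
                .register("zz", "secondRabbitTemplate");
        System.out.println(registry.resolve("xx#room-1"));
        System.out.println(registry.resolve("unknown#room-2"));
    }
}
```

With a table like this, adding an application means adding one registration rather than editing the routing logic.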
6. MQ consumer side
@RabbitHandler
@RabbitListener(containerFactory = "defaultRabbitListenerContainer",
        bindings = @QueueBinding(
                value = @Queue(value = "#{mqConfig.firstChannelQueue}", durable = "false",
                        arguments = {
                                @Argument(name = "x-message-ttl", value = "5000", type = "java.lang.Long"),
                                @Argument(name = "x-dead-letter-exchange", value = "#{mqConfig.deadExchange}"),
                                @Argument(name = "x-dead-letter-routing-key", value = "#{mqConfig.deadRoutingKey}")
                        }),
                exchange = @Exchange(value = MqConstants.channelExchange, type = ExchangeTypes.TOPIC),
                key = "first.#"
        ))
public void process1(OutMessage outMessage) {
    log.info("Queue 1 received message: {}", JSON.toJSONString(outMessage));
    dealMessage(outMessage);
}
@RabbitHandler
@RabbitListener(containerFactory = "firstRabbitListenerContainer",
        bindings = @QueueBinding(
                value = @Queue(value = "#{mqConfig.firstChannelQueue}", durable = "false",
                        arguments = {
                                @Argument(name = "x-message-ttl", value = "5000", type = "java.lang.Long"),
                                @Argument(name = "x-dead-letter-exchange", value = "#{mqConfig.deadExchange}"),
                                @Argument(name = "x-dead-letter-routing-key", value = "#{mqConfig.deadRoutingKey}")
                        }),
                exchange = @Exchange(value = MqConstants.channelExchange, type = ExchangeTypes.TOPIC),
                key = "first.#"
        ))
public void process11(OutMessage outMessage) {
    log.info("Queue 11 received message: {}", JSON.toJSONString(outMessage));
    dealMessage(outMessage);
}
@RabbitHandler
@RabbitListener(containerFactory = "secondRabbitListenerContainer",
        bindings = @QueueBinding(
                value = @Queue(value = "#{mqConfig.firstChannelQueue}", durable = "false",
                        arguments = {
                                @Argument(name = "x-message-ttl", value = "5000", type = "java.lang.Long"),
                                @Argument(name = "x-dead-letter-exchange", value = "#{mqConfig.deadExchange}"),
                                @Argument(name = "x-dead-letter-routing-key", value = "#{mqConfig.deadRoutingKey}")
                        }),
                exchange = @Exchange(value = MqConstants.channelExchange, type = ExchangeTypes.TOPIC),
                key = "first.#"
        ))
public void process111(OutMessage outMessage) {
    log.info("Queue 111 received message: {}", JSON.toJSONString(outMessage));
    dealMessage(outMessage);
}
@RabbitHandler
@RabbitListener(containerFactory = "defaultRabbitListenerContainer",
        bindings = @QueueBinding(
                value = @Queue(value = "#{mqConfig.secondChannelQueue}", durable = "false",
                        arguments = {
                                @Argument(name = "x-message-ttl", value = "5000", type = "java.lang.Long"),
                                @Argument(name = "x-dead-letter-exchange", value = "#{mqConfig.deadExchange}"),
                                @Argument(name = "x-dead-letter-routing-key", value = "#{mqConfig.deadRoutingKey}")
                        }),
                exchange = @Exchange(value = MqConstants.channelExchange, type = ExchangeTypes.TOPIC),
                key = "second.#"
        ))
public void process2(OutMessage outMessage) {
    log.info("Queue 2 received message: {}", JSON.toJSONString(outMessage));
    dealMessage(outMessage);
}
@RabbitHandler
@RabbitListener(containerFactory = "firstRabbitListenerContainer",
        bindings = @QueueBinding(
                value = @Queue(value = "#{mqConfig.secondChannelQueue}", durable = "false",
                        arguments = {
                                @Argument(name = "x-message-ttl", value = "5000", type = "java.lang.Long"),
                                @Argument(name = "x-dead-letter-exchange", value = "#{mqConfig.deadExchange}"),
                                @Argument(name = "x-dead-letter-routing-key", value = "#{mqConfig.deadRoutingKey}")
                        }),
                exchange = @Exchange(value = MqConstants.channelExchange, type = ExchangeTypes.TOPIC),
                key = "second.#"
        ))
public void process22(OutMessage outMessage) {
    log.info("Queue 22 received message: {}", JSON.toJSONString(outMessage));
    dealMessage(outMessage);
}
@RabbitHandler
@RabbitListener(containerFactory = "secondRabbitListenerContainer",
        bindings = @QueueBinding(
                value = @Queue(value = "#{mqConfig.secondChannelQueue}", durable = "false",
                        arguments = {
                                @Argument(name = "x-message-ttl", value = "5000", type = "java.lang.Long"),
                                @Argument(name = "x-dead-letter-exchange", value = "#{mqConfig.deadExchange}"),
                                @Argument(name = "x-dead-letter-routing-key", value = "#{mqConfig.deadRoutingKey}")
                        }),
                exchange = @Exchange(value = MqConstants.channelExchange, type = ExchangeTypes.TOPIC),
                key = "second.#"
        ))
public void process222(OutMessage outMessage) {
    log.info("Queue 222 received message: {}", JSON.toJSONString(outMessage));
    dealMessage(outMessage);
}
@RabbitHandler
@RabbitListener(containerFactory = "defaultRabbitListenerContainer",
        bindings = @QueueBinding(
                value = @Queue(value = "#{mqConfig.thirdChannelQueue}", durable = "false",
                        arguments = {
                                @Argument(name = "x-message-ttl", value = "5000", type = "java.lang.Long"),
                                @Argument(name = "x-dead-letter-exchange", value = "#{mqConfig.deadExchange}"),
                                @Argument(name = "x-dead-letter-routing-key", value = "#{mqConfig.deadRoutingKey}")
                        }),
                exchange = @Exchange(value = MqConstants.channelExchange, type = ExchangeTypes.TOPIC),
                key = "third.#"
        ))
public void process3(OutMessage outMessage) {
    log.info("Queue 3 received message: {}", JSON.toJSONString(outMessage));
    dealMessage(outMessage);
}
@RabbitHandler
@RabbitListener(containerFactory = "firstRabbitListenerContainer",
        bindings = @QueueBinding(
                value = @Queue(value = "#{mqConfig.thirdChannelQueue}", durable = "false",
                        arguments = {
                                @Argument(name = "x-message-ttl", value = "5000", type = "java.lang.Long"),
                                @Argument(name = "x-dead-letter-exchange", value = "#{mqConfig.deadExchange}"),
                                @Argument(name = "x-dead-letter-routing-key", value = "#{mqConfig.deadRoutingKey}")
                        }),
                exchange = @Exchange(value = MqConstants.channelExchange, type = ExchangeTypes.TOPIC),
                key = "third.#"
        ))
public void process33(OutMessage outMessage) {
    log.info("Queue 33 received message: {}", JSON.toJSONString(outMessage));
    dealMessage(outMessage);
}
@RabbitHandler
@RabbitListener(containerFactory = "secondRabbitListenerContainer",
        bindings = @QueueBinding(
                value = @Queue(value = "#{mqConfig.thirdChannelQueue}", durable = "false",
                        arguments = {
                                @Argument(name = "x-message-ttl", value = "5000", type = "java.lang.Long"),
                                @Argument(name = "x-dead-letter-exchange", value = "#{mqConfig.deadExchange}"),
                                @Argument(name = "x-dead-letter-routing-key", value = "#{mqConfig.deadRoutingKey}")
                        }),
                exchange = @Exchange(value = MqConstants.channelExchange, type = ExchangeTypes.TOPIC),
                key = "third.#"
        ))
public void process333(OutMessage outMessage) {
    log.info("Queue 333 received message: {}", JSON.toJSONString(outMessage));
    dealMessage(outMessage);
}
@RabbitHandler
@RabbitListener(containerFactory = "defaultRabbitListenerContainer",
        bindings = @QueueBinding(
                value = @Queue(value = "#{mqConfig.fourChannelQueue}", durable = "false",
                        arguments = {
                                @Argument(name = "x-message-ttl", value = "5000", type = "java.lang.Long"),
                                @Argument(name = "x-dead-letter-exchange", value = "#{mqConfig.deadExchange}"),
                                @Argument(name = "x-dead-letter-routing-key", value = "#{mqConfig.deadRoutingKey}")
                        }),
                exchange = @Exchange(value = MqConstants.channelExchange, type = ExchangeTypes.TOPIC),
                key = "four.#"
        ))
public void process4(OutMessage outMessage) {
    log.info("Queue 4 received message: {}", JSON.toJSONString(outMessage));
    dealMessage(outMessage);
}
@RabbitHandler
@RabbitListener(containerFactory = "firstRabbitListenerContainer",
        bindings = @QueueBinding(
                value = @Queue(value = "#{mqConfig.fourChannelQueue}", durable = "false",
                        arguments = {
                                @Argument(name = "x-message-ttl", value = "5000", type = "java.lang.Long"),
                                @Argument(name = "x-dead-letter-exchange", value = "#{mqConfig.deadExchange}"),
                                @Argument(name = "x-dead-letter-routing-key", value = "#{mqConfig.deadRoutingKey}")
                        }),
                exchange = @Exchange(value = MqConstants.channelExchange, type = ExchangeTypes.TOPIC),
                key = "four.#"
        ))
public void process44(OutMessage outMessage) {
    log.info("Queue 44 received message: {}", JSON.toJSONString(outMessage));
    dealMessage(outMessage);
}
@RabbitHandler
@RabbitListener(containerFactory = "secondRabbitListenerContainer",
        bindings = @QueueBinding(
                value = @Queue(value = "#{mqConfig.fourChannelQueue}", durable = "false",
                        arguments = {
                                @Argument(name = "x-message-ttl", value = "5000", type = "java.lang.Long"),
                                @Argument(name = "x-dead-letter-exchange", value = "#{mqConfig.deadExchange}"),
                                @Argument(name = "x-dead-letter-routing-key", value = "#{mqConfig.deadRoutingKey}")
                        }),
                exchange = @Exchange(value = MqConstants.channelExchange, type = ExchangeTypes.TOPIC),
                key = "four.#"
        ))
public void process444(OutMessage outMessage) {
    log.info("Queue 444 received message: {}", JSON.toJSONString(outMessage));
    dealMessage(outMessage);
}
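The listeners above bind with keys such as "first.#", while the sender publishes with the bare prefix ("first") as the routing key. This works because, on a topic exchange, "#" stands for zero or more dot-separated words and "*" for exactly one. The following is a minimal sketch of that matching rule for illustration; it is not RabbitMQ's implementation, and the class name is made up.

```java
/** Illustrative only: a small re-implementation of topic binding-key matching. */
class TopicMatchSketch {

    /** Returns true if the routing key matches the binding key pattern. */
    static boolean matches(String bindingKey, String routingKey) {
        String[] pattern = bindingKey.split("\\.");
        String[] words = routingKey.isEmpty() ? new String[0] : routingKey.split("\\.");
        return match(pattern, 0, words, 0);
    }

    private static boolean match(String[] pattern, int i, String[] words, int j) {
        if (i == pattern.length) {
            return j == words.length; // pattern used up: all words must be consumed too
        }
        if (pattern[i].equals("#")) {
            // '#' may absorb zero or more words: try every possible split point.
            for (int k = j; k <= words.length; k++) {
                if (match(pattern, i + 1, words, k)) {
                    return true;
                }
            }
            return false;
        }
        if (j == words.length) {
            return false; // pattern still expects a word, but none are left
        }
        if (pattern[i].equals("*") || pattern[i].equals(words[j])) {
            return match(pattern, i + 1, words, j + 1);
        }
        return false;
    }

    public static void main(String[] args) {
        System.out.println(matches("first.#", "first"));         // true
        System.out.println(matches("first.#", "first.room.42")); // true
        System.out.println(matches("first.#", "second"));        // false
    }
}
```

Since each prefix only matches its own binding key, a message published with "second" never reaches the queues bound with "first.#", in any vhost.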
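V. Summary
1. Message TTL before dead-lettering
@Argument(name = "x-message-ttl", value = "5000", type = "java.lang.Long")
Note that the type of the Argument must be specified here; it defaults to "java.lang.String".
2. If the MQ queues are not created automatically, add a RabbitAdmin bean like the one below (the configuration above declares one per connection factory)
@Bean("rabbitAdmin")
public RabbitAdmin rabbitAdmin(@Qualifier("defaultConnectionFactory") ConnectionFactory connectionFactory) {
    RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory);
    rabbitAdmin.setAutoStartup(true);
    return rabbitAdmin;
}
3. A RabbitTemplate factory chooses, based on the business application's identifier, which vhost a message is sent to.
@Autowired
@Qualifier("rabbitTemplate")
private RabbitTemplate defaultRabbitTemplate;
// TODO: the factory is currently just a simple switch ... case.
4. To subscribe to a particular vhost, the listener must specify the containerFactory value.
@RabbitListener(containerFactory = "secondRabbitListenerContainer")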
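Point 1 may be easier to see in programmatic form. Queue arguments are ultimately a map of names to values, and x-message-ttl has to be a number rather than the string "5000", which is why the annotation needs type = "java.lang.Long". The sketch below builds such a map with only the JDK, using the values from this article; the class and method names are hypothetical.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Illustrative only: QueueArgumentsSketch and its method are hypothetical names. */
class QueueArgumentsSketch {

    /** Builds the argument map for a channel queue with a TTL and a dead-letter target. */
    static Map<String, Object> channelQueueArguments(long ttlMillis, String deadExchange, String deadRoutingKey) {
        Map<String, Object> arguments = new LinkedHashMap<>();
        arguments.put("x-message-ttl", ttlMillis); // autoboxed to Long: a numeric value, not a String
        arguments.put("x-dead-letter-exchange", deadExchange);
        arguments.put("x-dead-letter-routing-key", deadRoutingKey);
        return arguments;
    }

    public static void main(String[] args) {
        Map<String, Object> arguments = channelQueueArguments(
                5000L, "channel.dead.letter.exchange", "channel.dead.letter.routeKey");
        // Print each argument together with the Java type of its value.
        arguments.forEach((name, value) ->
                System.out.println(name + " = " + value + " (" + value.getClass().getSimpleName() + ")"));
    }
}
```

A map of this shape is the kind of value that can be supplied as the arguments of a queue when it is declared in code instead of through annotations.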