误区一
对于queue和routingkey的绑定,一个队列可以绑定多个routingkey,这个绑定一般在生产者中做 ,如果在消费者中做,那么 也只是队列绑定功能 而不是指定获取哪个routingkey的消息 这个功能我反复试验过 .如果队列绑定了多个routingkey,那么消费者端并不能指定routingkey去消费,会接收这个队列的所有消息
简单使用
@Configuration
public class AmqpConfiguration {
//向spring容器中 注入一个名为demoQueue队列 下同
@Bean
public Queue demoQueue(){
return new Queue("demoQueue");
}
@Bean
public Queue demoQueue2(){
return new Queue("demoQueue2");
}
//向spring容器中注入名为directExchange的Direct类型的交换机 下同
@Bean
DirectExchange directExchange(){
return new DirectExchange("directExchange");
}
@Bean
TopicExchange topicExchange() {
return new TopicExchange("topicExchange");
}
// Fanout是路由广播的形式,将会把消息发给绑定它的全部队列,即便设置了key,也会被忽略.
@Bean
FanoutExchange fanoutExchange() {
return new FanoutExchange("fanoutExchange");
}
//将demoQueue队列绑定在direct类型的名为directExchange的交换机上,routingkey为demo
@Bean
public Binding bindingDemo() {
Binding binding = BindingBuilder.bind(demoQueue2()).to(directExchange()).with("demo");
return binding;
}
//将demoQueue队列绑定在direct类型的名为directExchange的交换机上,routingkey为demo2
@Bean
public Binding bindingDemo1() {
Binding binding = BindingBuilder.bind(demoQueue()).to(directExchange()).with("demo2");
return binding;
}
//将demoQueue队列绑定在direct类型的名为directExchange的交换机上,routingkey为demo2
@Bean
public Binding bindingDemo2() {
Binding binding = BindingBuilder.bind(demoQueue2()).to(directExchange()).with("demo2");
return binding;
}
}
然后就可以发送消息了
@Slf4j
@Service
public class AmqbSender {
@Autowired
private AmqpTemplate rabbitTemplate;
@Scheduled(fixedDelay = 5000)
public void send(){
AliSMSModel aliSMS=new AliSMSModel();
aliSMS.setPhoneNumber(String.valueOf(Math.random()));
aliSMS.setItem(String.valueOf(Math.random()));
//发送了两种类型的消息,所有的都用默认的即可
//AliSMSModel 必须实现序列化接口 否则传输出错
rabbitTemplate.convertAndSend( "directExchange","demo2","msg"+System.currentTimeMillis());
rabbitTemplate.convertAndSend("directExchange","demo",aliSMS);
log.info("已经发送消息....");
}
}
消息监听
/**
* 这种情况下 发送什么类型 就用什么类型去接收 如果接收到未指明类型 则报错
*/
@Component
@Slf4j
@RabbitListener(queues = "demoQueue2")
public class AmqbListen {
@RabbitHandler
public void receive(String msg) {
log.info("接收到消息{}",msg);
}
@RabbitHandler
public void receive(Double msg) {
log.info("接收到Double{}",msg);
}
@RabbitHandler
public void receive(AliSMSModel msg) {
log.info("接收到AliSMSModel{}",msg.toString());
}
}
生产者确认机制
配置template
配置文件
spring:
rabbitmq:
host: *****
port: *****
virtual-host: ****
username: ******
password: ****
listener:
simple:
concurrency: 5
max-concurrency: 20
prefetch: 5
使用@Bean显示注入
@Configuration
@Slf4j
public class RabbitmqConfig {
@Value("${spring.rabbitmq.listener.simple.concurrency}")
public int concurrency;
@Value("${spring.rabbitmq.listener.simple.max-concurrency}")
public int maxConcurrency;
@Value("${spring.rabbitmq.listener.simple.prefetch}")
public int prefetch;
@Autowired
private CachingConnectionFactory connectionFactory;
@Autowired
private SimpleRabbitListenerContainerFactoryConfigurer factoryConfigurer;
/**
* 单一消费者
*
* @return
*/
@Bean(name = "singleListenerContainer")
public SimpleRabbitListenerContainerFactory listenerContainer() {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(connectionFactory);
//必须注释掉 用默认 这样可以使用自定义的对象进行传输和解析
// factory.setMessageConverter(new Jackson2JsonMessageConverter());
factory.setConcurrentConsumers(1);
factory.setMaxConcurrentConsumers(1);
factory.setPrefetchCount(1);
factory.setTxSize(1);
factory.setAcknowledgeMode(AcknowledgeMode.AUTO);
return factory;
}
/**
* 多个消费者
*
* @return
*/
@Bean(name = "multiListenerContainer")
public SimpleRabbitListenerContainerFactory multiListenerContainer() {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factoryConfigurer.configure(factory, connectionFactory);
//必须注释掉 用默认 这样可以使用自定义的对象进行传输和解析
// factory.setMessageConverter(new Jackson2JsonMessageConverter());
factory.setAcknowledgeMode(AcknowledgeMode.NONE);
factory.setConcurrentConsumers(concurrency);
factory.setMaxConcurrentConsumers(maxConcurrency);
factory.setPrefetchCount(prefetch);
return factory;
}
@Bean(name = "rabbitTemplate2")
public RabbitTemplate rabbitTemplate() {
/**
若使用confirm-callback必须要配置publisherConfirms为true
若使用return-callback必须要配置publisherReturns为true
每个rabbitTemplate只能有一个confirm-callback和return-callback,如果这里配置了,那么写生产者的时候不能再写confirm-callback和return-callback
使用return-callback时必须设置mandatory为true,或者在配置中设置mandatory-expression的值为true
*/
connectionFactory.setPublisherConfirms(true);
connectionFactory.setPublisherReturns(true);
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
rabbitTemplate.setMandatory(true);
rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
log.info("消息发送成功:correlationData({}),ack({}),cause({})",correlationData,ack,cause);
}
/**
* 如果消息没有到exchange,则confirm回调,ack=false
* 如果消息到达exchange,则confirm回调,ack=true
* exchange到queue成功,则不回调return
* exchange到queue失败,则回调return(需设置mandatory=true,否则不回回调,消息就丢了)
*/
});
rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {
@Override
public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
log.info("消息丢失:exchange({}),route({}),replyCode({}),replyText({}),message:{}", exchange, routingKey, replyCode, replyText, message);
}
});
return rabbitTemplate;
}
}
这样 使用注入的rabbitTemplate就可以实现确认了
@Service
public class AmqbSender {
@Resource(name = "rabbitTemplate2")
private RabbitTemplate rabbitTemplate2;
@Scheduled(fixedDelay = 5000)
public void send(){
AliSMSModel aliSMS=new AliSMSModel();
aliSMS.setPhoneNumber(String.valueOf(Math.random()));
aliSMS.setItem(String.valueOf(Math.random()));
CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
rabbitTemplate2.convertAndSend("directExchange","demo2","msg"+System.currentTimeMillis(),correlationData);
rabbitTemplate2.convertAndSend("directExchange","demo",aliSMS);
}
消费者启用多个线程
在上面注入的类中已经配置好了 只需要在消费者中设置containerFactory 即可,这是 RabbitMQ监听器listener 的容器工厂
@Component
@Slf4j
@RabbitListener(queues = "demoQueue2",containerFactory = "multiListenerContainer")
public class AmqbListen {
@RabbitHandler
public void receive(String msg) {
log.info("接收到消息{}",msg);
}
@RabbitHandler
public void receive(Double msg) {
log.info("接收到Double{}",msg);
}
@RabbitHandler
public void receive(AliSMSModel msg) {
log.info("接收到AliSMSModel{}",msg.toString());
}
}
消费确认机制
配置
@Bean(name = "multiListenerContainerConfirm")
public SimpleRabbitListenerContainerFactory listenerContainerConfirm() {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(connectionFactory);
factory.setConcurrentConsumers(5);
factory.setMaxConcurrentConsumers(10);
factory.setPrefetchCount(1);
factory.setTxSize(1);
factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
return factory;
}
监听
@Component
@Slf4j
@RabbitListener(queues = "demoQueue2",containerFactory = "multiListenerContainerConfirm")
public class AmqbListen {
@RabbitHandler
public void receive(String msg, Message message, Channel channel) throws IOException {
Long deliveryTag = message.getMessageProperties().getDeliveryTag();
log.info("接收到消息{},{}",msg,deliveryTag);
//第一个deliveryTag参数为每条信息带有的tag值,第二个multiple参数为布尔类型
//为true时会将小于等于此次tag的所有消息都确认掉,如果为false则只确认当前tag的信息,可根据实际情况进行选择。
channel.basicAck(deliveryTag,false);
}
@RabbitHandler
public void receive(Double msg,Message message, Channel channel) {
log.info("接收到Double{}",msg);
}
@RabbitHandler
public void receive(AliSMSModel msg, Message message, Channel channel) throws IOException {
log.info("接收到AliSMSModel{},channel",msg.toString());
Long deliveryTag = message.getMessageProperties().getDeliveryTag();
channel.basicAck(deliveryTag,false);
}
}