基于默认的direct(即路由)模式
yml的rabbitmq配置
image.png
引入maven依赖
image.png
1.自定义消息类
package com.zkmeiling.biz.rabbitmq;
import lombok.Data;
import lombok.NoArgsConstructor;
import java.io.Serializable;
import java.util.Date;
import java.util.List;
/**
* Box task business object: the custom message payload used in this RabbitMQ example.
*
* <p>Lombok's {@code @NoArgsConstructor} supplies the no-argument constructor and
* {@code @Data} supplies getters, setters, {@code equals}, {@code hashCode} and
* {@code toString} for the three fields below.
*
* @author WeiWei
* @date 2022/10/17
*/
@NoArgsConstructor
@Data
public class BoxTaskBo {
// Request content carried by the message; its format is not defined in this class.
private String request;
// Timestamp attached to the task; exact meaning is not shown here.
private Date time;
// NOTE(review): a Date-typed field named "data" looks like it may be a typo for "date" - confirm before renaming.
private Date data;
}
2.开启手动ack模式并准备相关的处理回调函数
2.1---ConfirmCallback 回调函数类( 发布端----send消息--> broker[由交换机exchange和队列queue组成 {只要进入exchange就触发ConfirmCallback }] )
package com.zkmeiling.biz.rabbitmq;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Component;
@Slf4j
@Component
public class ConfirmCallbackService implements RabbitTemplate.ConfirmCallback {
/**
* Confirmation callback.
*
* @param correlationData correlation data for the callback.
* @param ack true for ack, false for nack
* @param cause An optional cause, for nack, when available, otherwise null.
*/
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
if (!ack) {
log.error("bbk样本库消息发送异常!");
} else {
log.info("bbk样本库发送者爸爸已经收到确认,correlationData={} ,ack={}, cause={}", correlationData, ack, cause);
}
}
}
2.2---ReturnsCallback 回调函数类( 发布端----send消息--> broker[由交换机exchange和队列queue组成 {进入exchange 但未投递到queue就触发ReturnsCallback }] )
package com.zkmeiling.biz.rabbitmq;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.ReturnedMessage;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Component;
/**
 * Returns callback that logs the metadata of a message handed back to the publisher.
 */
@Slf4j
@Component
public class ReturnCallbackService implements RabbitTemplate.ReturnsCallback {

    /**
     * Logs the reply code, reply text, exchange and routing key of the returned message.
     *
     * @param returned the returned message and metadata
     */
    @Override
    public void returnedMessage(ReturnedMessage returned) {
        final int replyCode = returned.getReplyCode();
        final String replyText = returned.getReplyText();
        final String exchange = returned.getExchange();
        final String routingKey = returned.getRoutingKey();
        log.info("bbk样本库returnedMessage ===> replyCode={} ,replyText={} ,exchange={} ,routingKey={}",
                replyCode, replyText, exchange, routingKey);
    }
}
3.发布端配置
下图是发布端服务(简单)相关目录结构
image.png
3.1发布端(生产端配置代码)
package com.zkmeiling.biz.rabbitmq;
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
// Declares the exchange and the queue bound to it (this part could also be pulled out
// into a separate RabbitMQ configuration provider).
@Configuration
public class RabbitmqQueueConfig {

    /** Name of the durable queue declared by this configuration. */
    private static final String BBK_TO_IPC_QUEUE = "zkmeiling.bbk.to.ipc";

    /** Name of the durable direct exchange declared by this configuration. */
    private static final String DIRECT_EXCHANGE = "zkmeiling.direct";

    /** Routing key used to bind the queue to the direct exchange. */
    private static final String BBK_TO_IPC_ROUTING_KEY = "bbk2ipc_routing_key";

    /**
     * Declares the durable queue from the sample-library management system (bbk)
     * to the industrial-control system (ipc).
     */
    @Bean(name = "bbkToIpcQueue")
    public Queue BbkToIpcQueue() {
        return QueueBuilder
                .durable(BBK_TO_IPC_QUEUE)
                .build();
    }

    /**
     * Declares the durable direct exchange.
     */
    @Bean(name = "directExchange")
    public DirectExchange directExchange() {
        return ExchangeBuilder
                .directExchange(DIRECT_EXCHANGE)
                .durable(true)
                .build();
    }

    /**
     * Binds the queue to the direct exchange under the bbk-to-ipc routing key.
     */
    @Bean
    public Binding bbkToIpcBinding(
            @Qualifier("directExchange") DirectExchange directExchange,
            @Qualifier("bbkToIpcQueue") Queue bbkToIpcQueue) {
        return BindingBuilder
                .bind(bbkToIpcQueue)
                .to(directExchange)
                .with(BBK_TO_IPC_ROUTING_KEY);
    }
}
3.2---发布端具体发送消息代码
package com.zkmeiling.biz.rabbitmq;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.ClassMapper;
import org.springframework.amqp.support.converter.DefaultJackson2JavaTypeMapper;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.*;
import java.util.UUID;
@RestController
public class PublishController {
@Autowired
private RabbitTemplate rabbitTemplate;
@Autowired
private ConfirmCallbackService confirmCallbackService;
@Autowired
private ReturnCallbackService returnCallbackService;
@PostMapping("frozenBoxPutInStore")
public void frozenBoxPutInStore(@RequestBody BoxTaskBo boxs){
sendMessage("zkmeiling.direct","bbk2ipc_routing_key",boxs);
}
public void sendMessage(String exchange, String routingKey, BoxTaskBo msg) {
// /**
// * 确保消息发送失败后可以重新返回到队列中
// * 注意:yml需要配置 publisher-returns: true
// */
// rabbitTemplate.setMandatory(true);
/**
* 消费者确认收到消息后,手动ack回执回调处理
*/
rabbitTemplate.setConfirmCallback(confirmCallbackService);
/**
* 消息投递到队列失败回调处理
*/
rabbitTemplate.setReturnsCallback(returnCallbackService);
/**
* 开启Jackson2JsonMessageConverter的消息转换器(进行msg编码操作)
*/
rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter());
/**
* 发送消息
*/
rabbitTemplate.convertAndSend(exchange, routingKey, msg, message -> {
//这里是对msg的aop做一些前置处理操作
//下面是为了解决
<----原理说明--->
实体类转换异常
failed to resolve class name.
解决方案
1.依赖系统1的jar包,直接使用类A来接收
2.不依赖系统1的jar包,自己建一个和A一模一样的类,连名称,包路径都一样
3.负责监听 queue 的类实现 MessageListener 接口,直接接收 Message 类,再自己转
显然都不够好也不是自己想要的
在 JsonMessageConverter 的 fromMessage 方法中有这么一段:
f (getClassMapper() == null) {
JavaType targetJavaType = getJavaTypeMapper()
.toJavaType(message.getMessageProperties());
content = convertBytesToObject(message.getBody(), encoding, targetJavaType);
} else {
Class<?> targetClass = getClassMapper().toClass(
message.getMessageProperties());
content = convertBytesToObject(message.getBody(), encoding, targetClass);
}
就是说默认情况下,JsonMessageConverter 使用的 ClassMapper 是 DefaultJackson2JavaTypeMapper,在转换时通过 Message 的 Properties 来获取要转换的目标类的类型。通过 Debug 可以发现,目标类的类型是存储在 Message 的 Proterties 的 一个 headers 的 Map 中,Key 叫“__TypeId__”。所以只要想办法在传输消息时更改__TypeId__的值即可。
<----原理说明--->
message.getMessageProperties().getHeaders().put("__TypeId__","com.example.rabbitmqdemo.BoxTaskBo");
return message;
} ,new CorrelationData(UUID.randomUUID().toString()));
System.out.println("bbk样本库exchange = " + exchange);
}
}
4.消费端配置
下图是消费端服务(简单)相关目录结构
image.png
4.1---消费端配置代码
package com.example.rabbitmqdemo;
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.listener.RabbitListenerContainerFactory;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
 * Consumer-side listener configuration.
 */
@Configuration
public class RabbitMQConfig {

    /**
     * Creates the listener container factory with JSON message conversion and
     * manual acknowledgement.
     *
     * @param connectionFactory connection factory handed to the container factory
     * @return the configured container factory
     */
    @Bean
    public RabbitListenerContainerFactory<?> rabbitListenerContainerFactory(ConnectionFactory connectionFactory) {
        final SimpleRabbitListenerContainerFactory containerFactory = new SimpleRabbitListenerContainerFactory();
        final Jackson2JsonMessageConverter jsonConverter = new Jackson2JsonMessageConverter();

        // Listener methods acknowledge deliveries themselves.
        containerFactory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
        // Decode incoming message bodies from JSON.
        containerFactory.setMessageConverter(jsonConverter);
        containerFactory.setConnectionFactory(connectionFactory);
        return containerFactory;
    }
}
4.2---消费端具体接收消息消费业务代码
package com.example.rabbitmqdemo;
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Component;
import java.io.IOException;
@Slf4j
@Component
@RabbitListener(queues = "zkmeiling.bbk.to.ipc")
public class ReceiverMessage1 {
@RabbitHandler
public void processHandler(@Payload BoxTaskBo msg, Channel channel, Message message) throws IOException {
try {
log.info("ipc工业端小富收到消息:{}", msg);
//TODO 具体业务
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
} catch (Exception e) {
if (message.getMessageProperties().getRedelivered()) {
log.error("ipc工业端消息已重复处理失败,拒绝再次接收...");
channel.basicReject(message.getMessageProperties().getDeliveryTag(), false); // 拒绝消息
} else {
log.error("ipc工业端消息即将再次返回队列处理...");
channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
}
}
}
}
验证下
1.Postman 模拟发送请求
image.png
2.发布端:发送成功日志
image.png
3.消费端: 接收成功日志
image.png