不废话直接上代码测试过的可以直接用
依赖
<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-rabbit-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
生产者
@Component
public class Publisher {
@Autowired
private RabbitTemplate rabbitTemplate;
public static final String Exchange="sc_exchange";
public static final String RoutingKey="sc_routingKey";
public String snderMsg(String msg){
rabbitTemplate.setReturnCallback((Message message, int replyCode, String replyText, String exchange, String routingKey)-> {
try {
System.out.println("GG的消息是:"+new String(message.getBody(),"utf-8"));
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
}
});
rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
if (ack){
System.out.println("到交换机了");
}else {
System.out.println("没到交换机");
}
}
});
CorrelationData correlationData = new CorrelationData();
correlationData.setId(UUID.randomUUID().toString());
rabbitTemplate.convertAndSend(Exchange, RoutingKey, msg,(message)->{
MessageProperties properties = message.getMessageProperties();
// properties.setExpiration("5000");
properties.setContentEncoding("utf-8");
return message;
},correlationData);
return "已发送";
}
}
消费者
@Component
public class Consume {
public static final String QUEUE = "sc_queue";
public static final String RoutingKey = "sc_routingKey";
public static final String Exchange = "sc_exchange";
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = QUEUE, durable = "true", exclusive = "false", autoDelete = "false"),
exchange = @Exchange(name = Exchange),
key = RoutingKey
))
@RabbitHandler
public void HandlerMsg(Message msg, Channel channel) throws IOException {
System.out.println("收到的消息是:" + new String(msg.getBody()));
channel.basicAck(msg.getMessageProperties().getDeliveryTag(), false);
//业务处理
}
}
配置文件
spring.rabbitmq.host=127.0.0.1
spring.rabbitmq.port=5672
spring.rabbitmq.virtual-host=/
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
spring.rabbitmq.listener.simple.concurrency=1
spring.rabbitmq.listener.simple.max-concurrency=5
spring.rabbitmq.listener.simple.acknowledge-mode=manual
spring.rabbitmq.listener.simple.default-requeue-rejected=false
spring.rabbitmq.publisher-returns=true
spring.rabbitmq.publisher-confirms=true
spring.rabbitmq.template.mandatory=true
启动类
@SpringBootApplication
@EnableRabbit
public class CommunicateApplication {
public static void main(String[] args) {
SpringApplication.run(CommunicateApplication.class, args);
}
}