使用 spring-boot 快速搭建 RabbitMQ 通信,并能够发布/接受消息
- 创建交换器,队列,进行绑定
package com.lee.rabbitmqdemo;
import org.springframework.amqp.core.*;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
@SpringBootApplication
public class RabbitmqdemoApplication {
public static final String topicExchangeName = "spring-boot-exchange";
public static final String queueName = "spring-boot";
/**
* 创建队列
*/
@Bean
Queue queue() {
return new Queue(queueName, false);
}
/**
* 创建交换器
*/
@Bean
DirectExchange exchange() {
return new DirectExchange(topicExchangeName);
}
/**
* 进行绑定
* "foo.bar.#" 是路由键(routing key)
*/
@Bean
Binding binding(Queue queue, DirectExchange exchange) {
return BindingBuilder.bind(queue).to(exchange).with("foo.bar.#");
}
public static void main(String[] args) {
SpringApplication.run(RabbitmqdemoApplication.class, args);
}
}
- 编写发布消息
package com.lee.rabbitmqdemo.hello;
import com.lee.rabbitmqdemo.RabbitmqdemoApplication;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.boot.CommandLineRunner;
import org.springframework.stereotype.Component;
import java.util.concurrent.TimeUnit;
/**
* 发送消息
*/
@Component
public class Runner implements CommandLineRunner {
private final RabbitTemplate rabbitTemplate;
private final Receiver receiver;
public Runner(Receiver receiver, RabbitTemplate rabbitTemplate) {
this.receiver = receiver;
this.rabbitTemplate = rabbitTemplate;
}
@Override
public void run(String... args) throws Exception {
System.out.println("Sending message...");
rabbitTemplate.convertAndSend(RabbitmqdemoApplication.topicExchangeName, "foo.bar.baz", "Hello from RabbitMQ!");
receiver.getLatch().await(10000, TimeUnit.MILLISECONDS);
}
}
- 编写接受消息
package com.lee.rabbitmqdemo.hello;
import com.lee.rabbitmqdemo.RabbitmqdemoApplication;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.util.concurrent.CountDownLatch;
@Component
public class Receiver {
private CountDownLatch latch = new CountDownLatch(1);
/**
* 接受消息
*/
@RabbitListener(bindings = @QueueBinding(
value = @Queue(value = RabbitmqdemoApplication.queueName, durable = "false"),
exchange = @Exchange(value = RabbitmqdemoApplication.topicExchangeName),
key = "foo.bar.baz")
)
public void receiveMessage(String message) {
System.out.println("Received <" + message + ">");
latch.countDown();
}
public CountDownLatch getLatch() {
return latch;
}
}
- 运行
如果运行成功控制台(Console)会打印消息
Sending message...
Received <Hello from RabbitMQ!>
参考:
Messaging with RabbitMQ
tutorial-one-spring-amqp
Spring AMQP