一、搭建初始环境
1.引入依赖
<!-- Spring Boot starter for AMQP; the RabbitTemplate and @RabbitListener code in these notes relies on it. -->
<!-- NOTE(review): no <version> is given here; presumably it is managed by a Spring Boot parent POM/BOM; confirm. -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<!-- Core Spring Boot starter. -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
2. 配置
# Application settings for the RabbitMQ samples.
# Indentation is significant in YAML: the keys below must be nested under
# `spring` so that they resolve to spring.application.* and spring.rabbitmq.*.
spring:
  application:
    name: rabbitmq-springboot
  rabbitmq:
    # Connection settings for the RabbitMQ broker.
    host: 127.0.0.1
    port: 5672
    username: test
    password: test
    virtual-host: test
二、Hello World
1.生产者
/** Template used to publish messages; injected by Spring. */
@Autowired
private RabbitTemplate rabbitTemplate;

/**
 * Publishes one text message with the two-argument {@code convertAndSend}
 * call, using {@code "hello"} as the routing key.
 */
@Test
public void testHello() {
    final String routingKey = "hello";
    final String payload = "hello world";
    rabbitTemplate.convertAndSend(routingKey, payload);
}
2.消费者
/**
 * Consumer for the {@code "hello"} queue.
 *
 * <p>The class-level {@code @RabbitListener} names the queue through
 * {@code queuesToDeclare}; the {@code @RabbitHandler} method receives the
 * message payload.
 */
@RabbitListener(queuesToDeclare = @Queue("hello"))
@Component
public class HelloMq {

    /**
     * Prints the received text to standard output, prefixed with {@code message:}.
     *
     * @param message the message body handed to this handler
     */
    @RabbitHandler
    public void receive(final String message) {
        final String line = "message:" + message;
        System.out.println(line);
    }
}
三、Work queues
1.生产者
/** Template used to publish messages; injected by Spring. */
@Autowired
private RabbitTemplate rabbitTemplate;

/**
 * Publishes ten numbered messages ({@code "hello work!0"} through
 * {@code "hello work!9"}) with {@code "work"} as the routing key.
 */
@Test
public void testWork() {
    final String routingKey = "work";
    int seq = 0;
    while (seq < 10) {
        rabbitTemplate.convertAndSend(routingKey, "hello work!" + seq);
        seq++;
    }
}
2.消费者
/**
 * Two listener methods attached to the same {@code "work"} queue, so that
 * messages published to that queue are shared between them.
 */
@Component
public class Work {

    /**
     * First consumer of the {@code "work"} queue.
     *
     * @param message the received message body
     */
    @RabbitListener(queuesToDeclare = @Queue("work"))
    public void receive1(String message) {
        print("message1:", message);
    }

    /**
     * Second consumer of the {@code "work"} queue.
     *
     * @param message the received message body
     */
    @RabbitListener(queuesToDeclare = @Queue("work"))
    public void receive2(String message) {
        print("message2:", message);
    }

    /** Writes {@code label} immediately followed by {@code body} as one line on standard output. */
    private static void print(String label, String body) {
        System.out.println(label + body);
    }
}
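说明:默认情况下,Spring AMQP 的 Work 模型会把队列中的消息按轮询方式平均分配给各个消费者(每个消费者收到的消息数量大致相同,与其处理速度无关);如果需要实现"能者多劳",需要额外配置(例如将 spring.rabbitmq.listener.simple.prefetch 设为 1)。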
四、Publish/Subscribe
1.生产者
/** Template used to publish messages; injected by Spring. */
@Autowired
private RabbitTemplate rabbitTemplate;

/**
 * Publishes one message to the {@code "logs"} exchange with an empty routing key.
 *
 * <p>No {@code throws} clause is declared: nothing in the body throws a
 * checked exception.
 */
@Test
public void testFanout() {
    // NOTE(review): the literal below contains Kangxi-radical code points
    // (U+2F47, U+2F34) instead of the ordinary CJK characters; this looks like a
    // copy/paste artifact. It is kept unchanged here; confirm before altering it.
    rabbitTemplate.convertAndSend("logs", "", "这是⽇志⼴播");
}
2.消费者
/**
 * Two listeners, each bound through its own unnamed {@code @Queue} to the
 * {@code "logs"} exchange declared with type {@code "fanout"}.
 */
@Component
public class Fanout {

    /**
     * First subscriber of the {@code "logs"} exchange.
     *
     * @param message the received message body
     */
    @RabbitListener(bindings = @QueueBinding(
            value = @Queue,
            exchange = @Exchange(name = "logs", type = "fanout")))
    public void receive1(String message) {
        print("message1=", message);
    }

    /**
     * Second subscriber of the {@code "logs"} exchange.
     *
     * @param message the received message body
     */
    @RabbitListener(bindings = @QueueBinding(
            value = @Queue,
            // "name" and "value" are aliases on @Exchange; "name" is used for both listeners.
            exchange = @Exchange(name = "logs", type = "fanout")))
    public void receive2(String message) {
        print("message2=", message);
    }

    /** Writes {@code label} immediately followed by {@code body} as one line on standard output. */
    private static void print(String label, String body) {
        System.out.println(label + body);
    }
}
五、Routing
1.生产者
/** Template used to publish messages; injected by Spring. */
@Autowired
private RabbitTemplate rabbitTemplate;

/**
 * Publishes one message per routing key ({@code info}, {@code warn},
 * {@code error}, in that order) to the {@code "directs"} exchange. Each body
 * is the routing key followed by a fixed suffix.
 */
@Test
public void testDirect() {
    // NOTE(review): the suffix contains a Kangxi-radical code point (U+2F47)
    // instead of the ordinary CJK character; this looks like a copy/paste artifact.
    // It is kept unchanged here; confirm before altering it.
    final String suffix = "⽇志信息";
    for (String level : new String[] {"info", "warn", "error"}) {
        rabbitTemplate.convertAndSend("directs", level, level + suffix);
    }
}
2.消费者
/**
 * Two listeners bound to the {@code "directs"} exchange (type {@code "direct"})
 * with different routing keys: the first with {@code info}, {@code warn} and
 * {@code error}, the second with {@code error} only.
 */
@Component
public class RoutingKey {

    /**
     * Listener bound with the keys {@code info}, {@code warn} and {@code error}.
     *
     * @param message the received message body
     */
    @RabbitListener(bindings = @QueueBinding(
            value = @Queue,
            key = {"info", "warn", "error"},
            exchange = @Exchange(name = "directs", type = "direct")))
    public void receive1(String message) {
        print("message1 = ", message);
    }

    /**
     * Listener bound with the key {@code error} only.
     *
     * @param message the received message body
     */
    @RabbitListener(bindings = @QueueBinding(
            value = @Queue,
            key = {"error"},
            exchange = @Exchange(name = "directs", type = "direct")))
    public void receive2(String message) {
        print("message2 = ", message);
    }

    /** Writes {@code label} immediately followed by {@code body} as one line on standard output. */
    private static void print(String label, String body) {
        System.out.println(label + body);
    }
}
六、Topics
1.生产者
/** Template used to publish messages; injected by Spring. */
@Autowired
private RabbitTemplate rabbitTemplate;

/**
 * Publishes two messages to the {@code "topics"} exchange, one with routing
 * key {@code user.save} and one with {@code user.save.findAll}. Each body is
 * the routing key followed by a fixed suffix.
 */
@Test
public void testTopic() {
    final String suffix = "消息";
    for (String routingKey : new String[] {"user.save", "user.save.findAll"}) {
        rabbitTemplate.convertAndSend("topics", routingKey, routingKey + suffix);
    }
}
2.消费者
/**
 * Two listeners bound to the {@code "topics"} exchange (type {@code "topic"})
 * with wildcard routing patterns. In AMQP topic patterns {@code *} stands for
 * exactly one word and {@code #} for zero or more words.
 */
@Component
public class Topic {

    /**
     * Listener bound with the pattern {@code user.*}.
     *
     * @param message the received message body
     */
    @RabbitListener(bindings = @QueueBinding(
            value = @Queue,
            key = "user.*",
            exchange = @Exchange(name = "topics", type = "topic")))
    public void receive1(String message) {
        print("message1 = ", message);
    }

    /**
     * Listener bound with the pattern {@code user.#}.
     *
     * @param message the received message body
     */
    @RabbitListener(bindings = @QueueBinding(
            value = @Queue,
            key = "user.#",
            exchange = @Exchange(name = "topics", type = "topic")))
    public void receive2(String message) {
        print("message2 = ", message);
    }

    /** Writes {@code label} immediately followed by {@code body} as one line on standard output. */
    private static void print(String label, String body) {
        System.out.println(label + body);
    }
}