关于rabbitmq的用法参考:https://www.cnblogs.com/linkenpark/p/5393666.html
一、 配置
1. Exchange
Exchange(交换机)相当于一个路由器:生产者先把消息发送到交换机,再由交换机按照路由规则将消息投递到对应的队列中。
/**
 * Declares the message exchanges used by the application; more can be added here.
 *
 * <p>The broker identifies an exchange by its name, and re-declaring an existing name
 * with a different type is rejected. The direct exchange keeps the name
 * {@link RabbitMqConfig#EXCHANGE}; the fanout and topic exchanges use their own names
 * derived from it so that all three declarations can coexist.
 *
 * @author administrator
 * @date 2019/4/5
 */
@Configuration
public class ExchangeConfig {

    /** Name of the fanout exchange, kept distinct from the direct exchange. */
    private static final String FANOUT_EXCHANGE = RabbitMqConfig.EXCHANGE + ".fanout";

    /** Name of the topic exchange, kept distinct from the direct exchange. */
    private static final String TOPIC_EXCHANGE = RabbitMqConfig.EXCHANGE + ".topic";

    /**
     * Direct exchange: a message is forwarded only when the routing key matches exactly.
     *
     * @return a durable, non-auto-delete direct exchange named {@link RabbitMqConfig#EXCHANGE}
     */
    @Bean
    public DirectExchange directExchange() {
        return new DirectExchange(RabbitMqConfig.EXCHANGE, true, false);
    }

    /**
     * Fanout exchange: ignores the routing key and broadcasts to every bound queue.
     *
     * @return a durable, non-auto-delete fanout exchange
     */
    @Bean
    public FanoutExchange fanoutExchange() {
        return new FanoutExchange(FANOUT_EXCHANGE, true, false);
    }

    /**
     * Topic exchange: routes by pattern-matching the routing key.
     *
     * @return a durable, non-auto-delete topic exchange
     */
    @Bean
    public TopicExchange topicExchange() {
        return new TopicExchange(TOPIC_EXCHANGE, true, false);
    }
}
2. Queue
队列,存放消息的载体。
/**
 * Declares the queue that carries the application's messages.
 */
@Configuration
public class QueueConfig {

    /** Name of the queue declared by {@link #queue()}. */
    private static final String QUEUE_NAME = "whc";

    /**
     * Creates the {@code whc} queue using the single-argument {@code Queue} constructor.
     *
     * @return the queue bean
     */
    @Bean
    public Queue queue() {
        return new Queue(QUEUE_NAME);
    }
}
3. RouteKey
映射规则:通过routing key(路由键)将队列与交换机绑定,交换机据此决定把消息投递到哪个队列。
package com.whc.config;
import com.whc.callback.MsgSendConfirmCallBack;
import org.springframework.amqp.core.AcknowledgeMode;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
 * Central RabbitMQ wiring: the exchange name and routing keys, the queue-to-exchange
 * binding, a listener container for the queue, and a {@code RabbitTemplate} with a
 * publisher-confirm callback.
 */
@Configuration
public class RabbitMqConfig {

    /** Name of the message exchange. */
    public static final String EXCHANGE = "exchange_test";

    /** Routing key 1. */
    public static final String ROUTE_KEY_1 = "query_one_key1";

    /** Routing key 2. */
    public static final String ROUTE_KEY_2 = "query_one_key2";

    @Autowired
    private QueueConfig queueConfig;

    @Autowired
    private ExchangeConfig exchangeConfig;

    /** Connection factory shared by the listener container and the template. */
    @Autowired
    private ConnectionFactory factory;

    /**
     * Binds the queue to the direct exchange using {@link #ROUTE_KEY_1}.
     *
     * @return the binding bean
     */
    @Bean
    public Binding bindingOne() {
        return BindingBuilder
                .bind(queueConfig.queue())
                .to(exchangeConfig.directExchange())
                .with(ROUTE_KEY_1);
    }

    /**
     * Listener container for the queue (observer pattern): objects listening on the
     * queue are notified when a message arrives.
     *
     * <p>NOTE(review): no message listener is set on this container in the visible
     * code - confirm whether one is attached elsewhere.
     *
     * @return the configured container
     */
    @Bean
    public SimpleMessageListenerContainer simpleMessageListenerContainer() {
        SimpleMessageListenerContainer listenerContainer = new SimpleMessageListenerContainer(factory);
        // Messages must be acknowledged explicitly by the consumer.
        listenerContainer.setAcknowledgeMode(AcknowledgeMode.MANUAL);
        listenerContainer.setExposeListenerChannel(true);
        listenerContainer.addQueues(queueConfig.queue());
        // Start with one consumer and allow up to five.
        listenerContainer.setMaxConcurrentConsumers(5);
        listenerContainer.setConcurrentConsumers(1);
        return listenerContainer;
    }

    /**
     * Template used for publishing, with the confirm callback registered.
     *
     * <p>Confirms give the client a lightweight way to track which messages the broker
     * has handled and which may need to be republished after a broker crash or network
     * failure. Delivery can be guaranteed in two ways, publisher confirms or
     * transactions, and the two cannot be combined: a transactional channel cannot use
     * confirm mode, and a channel in confirm mode cannot use transactions.
     *
     * @return the template bean
     */
    @Bean
    public RabbitTemplate rabbitTemplate() {
        RabbitTemplate template = new RabbitTemplate(factory);
        template.setConfirmCallback(msgSendConfirmCallBack());
        return template;
    }

    /**
     * Callback that receives publisher confirms for messages sent through the template.
     *
     * @return a new callback instance
     */
    @Bean
    public MsgSendConfirmCallBack msgSendConfirmCallBack() {
        return new MsgSendConfirmCallBack();
    }
}
4. pom依赖和配置文件
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.1.1.RELEASE</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<groupId>com.whc</groupId>
<artifactId>springboot-rabbitmq</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>springboot-rabbitmq</name>
<description>rabbitmq for springboot</description>
<properties>
<java.version>1.8</java.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-devtools</artifactId>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<!--rabbit-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>
application.properties
# RabbitMQ broker connection
spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
spring.rabbitmq.virtual-host=/
# Enable publisher confirms; without this the ConfirmCallback set on RabbitTemplate is never invoked
spring.rabbitmq.publisher-confirms=true
二、 生产者和消费者
FirstConsumer
package com.whc.consumer;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
/**
 * Consumer of the {@code whc} queue.
 *
 * <p>Two consumers, {@code FirstConsumer} and {@code HelloReceiver}, subscribe to the
 * {@code whc} queue; each message is consumed by only one of them.
 *
 * @author Administrator
 * @date 2019/4/8
 */
@Component
public class FirstConsumer {

    /**
     * Prints every message received from the {@code whc} queue to standard output.
     *
     * @param message the message payload
     * @throws Exception declared in the signature; this implementation does not throw it
     */
    @RabbitListener(queues = {"whc"}, containerFactory = "rabbitListenerContainerFactory")
    public void handleMessage(String message) throws Exception {
        // println does no "{}" substitution, so the former logger-style placeholder
        // was printed literally; it has been removed from the message.
        System.out.println("FirstConsumer handle message..." + message);
    }
}
package com.whc.consumer;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
/**
 * Receiver subscribed to the {@code whc} queue through a class-level listener annotation.
 *
 * @author Administrator
 * @date 2018/12/8 19:48
 * @version 1.0
 */
@Component
@RabbitListener(queues = "whc")
public class HelloReceiver {

    /**
     * Writes the received text to standard error.
     *
     * @param hello the message payload
     */
    @RabbitHandler
    public void process(String hello) {
        final String line = "Receiver : " + hello;
        System.err.println(line);
    }
}
package com.whc.sender;
import com.whc.config.RabbitMqConfig;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
/**
 * Publishes messages to {@link RabbitMqConfig#EXCHANGE} with routing key
 * {@link RabbitMqConfig#ROUTE_KEY_1}.
 *
 * @author Administrator
 * @date 2019/4/8
 */
@Slf4j
@Component
public class FirstSender {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    /**
     * Sends one message.
     *
     * @param uuid identifier wrapped in the {@code CorrelationData} passed with the message
     * @param msg  payload handed to the template for conversion and sending
     */
    public void send(String uuid, Object msg) {
        final CorrelationData correlation = new CorrelationData(uuid);
        rabbitTemplate.convertAndSend(
                RabbitMqConfig.EXCHANGE,
                RabbitMqConfig.ROUTE_KEY_1,
                msg,
                correlation);
    }
}
package com.whc.sender;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
import java.text.SimpleDateFormat;
import java.util.Date;
/**
 * REST endpoints that publish timestamped text messages using {@code whc} as the routing key.
 *
 * @author Administrator
 * @date 2018/12/8 19:47
 * @version 1.0
 */
@RestController
public class HelloSender {

    @Autowired
    private AmqpTemplate rabbitTemplate;

    /**
     * Sends a single message.
     */
    @GetMapping("/send")
    public void send() {
        SimpleDateFormat formatter = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        publish("hello " + formatter.format(new Date()));
    }

    /**
     * Sends 1000 messages in a loop, each stamped with its index and the current time.
     */
    @GetMapping("/multiSend")
    public void multiSend() {
        SimpleDateFormat formatter = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        int index = 0;
        while (index < 1000) {
            publish("你好,这是我的第 " + index + "条消息,当前时间为:" + formatter.format(new Date()));
            index++;
        }
    }

    /**
     * Echoes the text to standard output, then sends it with routing key {@code whc}.
     *
     * @param context the message text
     */
    private void publish(String context) {
        System.out.println("Sender : " + context);
        this.rabbitTemplate.convertAndSend("whc", context);
    }
}
三、 测试用例
package com.whc;
import com.whc.sender.FirstSender;
import com.whc.sender.HelloSender;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
import java.util.UUID;
/**
 * Spring Boot tests that exercise the two senders.
 */
@RunWith(SpringRunner.class)
@SpringBootTest
public class ApplicationTests {

    @Autowired
    private HelloSender helloSender;

    @Autowired
    private FirstSender firstSender;

    /** Passes when the application context starts. */
    @Test
    public void contextLoads() {
    }

    /** Sends a single message through {@code HelloSender}. */
    @Test
    public void hello() throws Exception {
        helloSender.send();
    }

    /**
     * Sends 100 messages through {@code FirstSender}, pausing 100 ms after each one.
     *
     * @throws InterruptedException if the pause between messages is interrupted
     */
    @Test
    public void send() throws InterruptedException {
        for (int i = 1; i <= 100; i++) {
            // A fresh id per message, so each confirm can be matched to the message it
            // belongs to; a single shared id made all 100 indistinguishable.
            String uuid = UUID.randomUUID().toString();
            firstSender.send(uuid, "hello world, this id send by the " + i + " one...");
            System.out.println("I am sending you " + i + " message.");
            Thread.sleep(100);
        }
    }
}