一、简介安装
先安装Erlang,再安装RabbitMQ
启动:RabbitMQ Command Prompt(以管理员身份运行)
命令:
rabbitmq-service.bat remove
set RABBITMQ_BASE=D:\android_tool\java\erl_rabbit_mq\rabbitmq_server-3.9.15\data
rabbitmq-service.bat install
rabbitmq-plugins enable rabbitmq_management
rabbitmq-service start
http://127.0.0.1:15672/
账号密码:guest
二、普通消息队列使用
1、pom引入
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
2、yml配置
# Connection settings read by Spring Boot under the spring.rabbitmq prefix.
spring:
  rabbitmq:
    host: localhost
    port: 5672
    virtual-host: /
    username: guest
    password: guest
3、代码实现
new Queue("simple_queue")创建消息队列
RabbitTemplate调用convertAndSend("simple_queue","消息体")方法发送消息
通过方法注解@RabbitListener(queues = "simple_queue")接收消息
三、WorkQueue
1、WorkQueue模型配置
# Listener settings live under spring.rabbitmq, next to the connection settings.
spring:
  rabbitmq:
    listener:
      simple:
        prefetch: 1 # number of messages to prefetch
2、创建消息队列
/**
 * Declares the queue named "simple_queue" as a bean.
 *
 * @return a new Queue constructed with the name "simple_queue"
 */
@Bean
public Queue directQueue() {
    final String queueName = "simple_queue";
    return new Queue(queueName);
}
3、发送消息
/**
 * Sends the same text message 50 times to "simple_queue", sleeping 20 ms after each send.
 *
 * @throws InterruptedException if the thread is interrupted while sleeping between sends
 */
@Test
public void testRabbitTemplate() throws InterruptedException {
    final String body = "hello spring amqp";
    final String target = "simple_queue";
    int remaining = 50;
    while (remaining-- > 0) {
        rabbitTemplate.convertAndSend(target, body);
        Thread.sleep(20);
    }
}
4、接收消息
/**
 * Listener on "simple_queue": prints the payload followed by the current local time,
 * then sleeps 20 ms.
 *
 * @param msg message payload
 * @throws InterruptedException if the thread is interrupted while sleeping
 */
@RabbitListener(queues = "simple_queue")
public void listenerSimpleQueue1(String msg) throws InterruptedException {
    StringBuilder line = new StringBuilder("消息1:").append(msg).append(LocalTime.now());
    System.out.println(line);
    Thread.sleep(20);
}
/**
 * Second listener on "simple_queue": prints the payload followed by the current local time,
 * then sleeps 200 ms (ten times longer than listenerSimpleQueue1).
 *
 * @param msg message payload
 * @throws InterruptedException if the thread is interrupted while sleeping
 */
@RabbitListener(queues = "simple_queue")
public void listenerSimpleQueue2(String msg) throws InterruptedException {
    StringBuilder line = new StringBuilder("消息2:").append(msg).append(LocalTime.now());
    System.out.println(line);
    Thread.sleep(200);
}
四、FanoutExchange
1、发布订阅FanoutExchange配置:将消息发送给所有绑定的消息队列
@Configuration
public class FanoutConfig {
/**
* @return 声明FanoutExchange交换机
*/
@Bean
public FanoutExchange fanoutExchange() {
return new FanoutExchange("li.fanout");
}
/**
* @return 声明Queue消息队列
*/
@Bean
public Queue queue1() {
return new Queue("fanout_queue1");
}
@Bean
public Queue queue2() {
return new Queue("fanout_queue2");
}
/**
* 绑定队列和交换机
*
* @param queue1 消息队列
* @param fanoutExchange 交换机
* @return 绑定
*/
@Bean
public Binding bindingQueue1(Queue queue1, FanoutExchange fanoutExchange) {
return BindingBuilder.bind(queue1).to(fanoutExchange);
}
/**
* 绑定队列和交换机
*
* @param queue2 消息队列
* @param fanoutExchange 交换机
* @return 绑定
*/
@Bean
public Binding bindingQueue2(Queue queue2, FanoutExchange fanoutExchange) {
return BindingBuilder.bind(queue2).to(fanoutExchange);
2、发送交换机消息
/**
 * Sends one text message to the "li.fanout" exchange with an empty routing key.
 */
@Test
public void testSendFanoutExchange() {
    // Message body
    final String payload = "hello FanoutExchange";
    // Target exchange name
    final String exchange = "li.fanout";
    final String routingKey = "";
    rabbitTemplate.convertAndSend(exchange, routingKey, payload);
}
3、接收交换机消息
/**
 * Listener on "fanout_queue1": prints the payload followed by the current local time.
 *
 * @param msg message payload
 */
@RabbitListener(queues = "fanout_queue1")
public void listenerLiQueue1(String msg) {
    StringBuilder line = new StringBuilder("消息1:").append(msg).append(LocalTime.now());
    System.out.println(line);
}
/**
 * Listener on "fanout_queue2": prints the payload followed by the current local time.
 *
 * @param msg message payload
 */
@RabbitListener(queues = "fanout_queue2")
public void listenerLiQueue2(String msg) {
    StringBuilder line = new StringBuilder("消息2:").append(msg).append(LocalTime.now());
    System.out.println(line);
}
五、DirectExchange:将接收到的消息根据路由规则(routingKey)路由到指定的queue
1、发送消息
/**
 * Sends one text message to the "li.direct" exchange with routing key "red".
 */
@Test
public void testSendDirectExchange() {
    // Message body
    final String payload = "hello DirectExchange";
    // Target exchange name
    final String exchange = "li.direct";
    final String routingKey = "red";
    rabbitTemplate.convertAndSend(exchange, routingKey, payload);
}
2、接收消息
/**
 * Listener for the "li.direct" exchange: binds "direct_queue1" with keys "red" and "blue"
 * and prints each payload followed by the current local time.
 *
 * @param msg message payload
 */
@RabbitListener(bindings = @QueueBinding(
        value = @Queue(name = "direct_queue1"),
        exchange = @Exchange(name = "li.direct", type = ExchangeTypes.DIRECT),
        key = {"red", "blue"}
))
public void listenerDirectExchange1(String msg) {
    StringBuilder line = new StringBuilder("listenerDirectExchange1:")
            .append(msg)
            .append(LocalTime.now());
    System.out.println(line);
}
/**
 * Listener for the "li.direct" exchange: binds "direct_queue2" with keys "red" and "yellow"
 * and prints each payload followed by the current local time.
 *
 * @param msg message payload
 */
@RabbitListener(bindings = @QueueBinding(
        value = @Queue(name = "direct_queue2"),
        exchange = @Exchange(name = "li.direct", type = ExchangeTypes.DIRECT),
        key = {"red", "yellow"}
))
public void listenerDirectExchange2(String msg) {
    StringBuilder line = new StringBuilder("listenerDirectExchange2:")
            .append(msg)
            .append(LocalTime.now());
    System.out.println(line);
}
六、TopicExchange:与DirectExchange的区别在于routingKey必须是由多个单词组成的列表,并以.分割;绑定队列时可使用通配符:#匹配0个或多个单词,*匹配恰好1个单词
1、发送消息
/**
 * Sends one text message to the "li.topic" exchange with routing key "china.news".
 *
 * <p>The routing key of a published message is a concrete dot-separated word list;
 * the wildcards {@code #} and {@code *} are only interpreted in binding keys.
 * "china.news" matches both binding patterns used by the listeners in these notes
 * ("china.#" and "#.news").
 */
@Test
public void testSendTopicExchange() {
    // Target exchange name
    String exchangeName = "li.topic";
    // Message body
    String message = "hello topic exchange";
    // Concrete routing key, not a wildcard pattern
    String routingKey = "china.news";
    rabbitTemplate.convertAndSend(exchangeName, routingKey, message);
}
2、接收消息
/**
 * Listener for the "li.topic" exchange: binds "topic_queue1" with the pattern "china.#"
 * and prints each payload followed by the current local time.
 *
 * @param msg message payload
 */
@RabbitListener(bindings = @QueueBinding(
        value = @Queue(name = "topic_queue1"),
        exchange = @Exchange(name = "li.topic", type = ExchangeTypes.TOPIC),
        key = "china.#"
))
public void listenerTopicExchange1(String msg) {
    // Log prefix names this method so its output is not confused with the direct-exchange listeners.
    System.out.println("listenerTopicExchange1:" + msg + LocalTime.now());
}
/**
 * Listener for the "li.topic" exchange: binds "topic_queue2" with the pattern "#.news"
 * and prints each payload followed by the current local time.
 *
 * @param msg message payload
 */
@RabbitListener(bindings = @QueueBinding(
        value = @Queue(name = "topic_queue2"),
        exchange = @Exchange(name = "li.topic", type = ExchangeTypes.TOPIC),
        key = "#.news"
))
public void listenerTopicExchange2(String msg) {
    // Log prefix names this method so its output is not confused with the direct-exchange listeners.
    System.out.println("listenerTopicExchange2:" + msg + LocalTime.now());
}
七、消息转换
1、pom引入
<dependency>
<groupId>com.fasterxml.jackson.dataformat</groupId>
<artifactId>jackson-dataformat-xml</artifactId>
</dependency>
2、转换器替换
/**
 * Registers a Jackson2JsonMessageConverter as the MessageConverter bean.
 *
 * @return a new Jackson2JsonMessageConverter
 */
@Bean
public MessageConverter jsonMessageConverter() {
    MessageConverter converter = new Jackson2JsonMessageConverter();
    return converter;
}