1.RabbitMQ(基本理解)
1.它是一个消息中间件
2.主要作用: 给进程(微服务) 提供消息通信的桥梁
3.好处: 解决数据高峰的作用(有一个高可靠性的保证)
2.学前必读
1.Java的基本编码有一定的了解和使用,(比如封装、继承、多态等)
2.学习工具
2.1 IntelliJ IDEA 2019.3
2.2 Maven(3.6.3)
3.事宜人群
1.有一定的Java基础(上面讲过了)
2.想快速的学习一种消息中间件
3.有解决大量消息通信导致收发端程序崩溃的需求
4.学习顺序
1.RabbitMQ基础讲解
2.RabbitMQ整合SpringBoot
5.基本流程图
注意: RabbitMQ 此时为快递公司(都会在以下讲解)
也可以定义虚拟主机 (也就是接受不同的快递) (都会在以下讲解)
也可以定义死信交换机 (都会在以下讲解)
6.RabbitMQ环境安装(基于Linux)
6.1系统准备
VMWare 15.0
RHEL7.2/CentOS 7.2
6.2 安装软件-erlang
http://www.erlang.org/downloads
rpm –i er*
6.3 安装软件-socat
https://pkgs.org/download/socat
rpm –I so*
6.4 安装软件-RabbitMQ-SERVER
http://www.rabbitmq.com/download.html
rpm –I rabb*
6.5 设置开机启动
chkconfig rabbitmq-server on
6.6 启动关闭RabbbitMQ-Server
启动 rabbitmq-server start &
停止 rabbitmqctl app_stop
管理插件:rabbitmq-plugins enable rabbitmq_management
管理员帐号:guest/guest
lsof –i:5672 检查是否启动,可以看到rabbitmq使用的amqp协议
7.RabbitMQ环境安装(基于windows)
如有不懂,可以查看我写的其他的文章
8、RabbitMQ基础开发
1、 Maven添加依RabbitMQ的jar包依赖
配置Maven,添加依赖。把下面的依赖包加入到dependency
<!-- 添加rabbitmq的依赖 -->
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
</dependency>
2、创建生产者(Producer)用于测试RabbitMQ 用于发送数据
package com.study.rabbitmq.demo;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* @author 武帅
* @date 2020/5/22 15:46
* @description Producer 生产者
* 创建创建连接和信道
*/
public class Producer {
public static void main(String[] args) throws IOException, TimeoutException {
// 创建连接工厂,用来修建高速公路
ConnectionFactory connectionFactory = new ConnectionFactory();
//设置基本配置
connectionFactory.setHost("localhost");
connectionFactory.setPort(5672);
connectionFactory.setUsername("guest");
connectionFactory.setPassword("guest");
//修建一条高速公路
Connection connection = connectionFactory.newConnection();
//画定双向车道
Channel channel = connection.createChannel();
/**
* 声明一个队列(货架子)
*
* (这个属性是在第二个参数里设置)
* durable:表示是否持久化写入到磁盘中 如果为true则表示关闭了服务重启后依然有数据 如果为false则表 * 示关闭了服务就没有数据了
* 这个属性则表示如果有必要为持久化那就设置为true 没有必要就为false节省内存空间,提高吞吐 * 量
*
* (这个属性是在第三个参数里设置)
* exclusive:独占 如果设置为true那就表示为整个程序只能让它自己使用,别的程序那就不能使用这个货架子 * 主要特点: 同一时刻,只能有一个程序连接队列。独占队列必定是自动删除的队列,给自己 * 用的,无法实现进程间的通信。
*
* 如果设置为false 则表示所有的程序都可以使用这个货架子,一般我们会把它设置为false * 的
*/
channel.queueDeclare("queueXiaoMi",true,false,false,null);
/**
* channel.basicPublish("","queueXiaoMi",basicProperties,oneData.getBytes());
* 上面方法的作用是:发送数据
*
* (就是第三个参数)
* props:表示是否要传入什么数据 这里的数据是持久化的,是保存到本地磁盘的
* 如果为null则表示什么都不传入
* 也可以传入数据 比如:MessageProperties.PERSISTENT_BASIC 这是系统自带的 当然也可以自己new一个 * (保存一些自定义的一些数据)
* 如下代码:
* */
Map<String,Object> headers = new HashMap<>();
headers.put("姓名","老王");
headers.put("电话","15110012023");
headers.put("地址","江西");
AMQP.BasicProperties basicProperties = new AMQP.BasicProperties()
.builder()
//设置字符编码集
.contentEncoding("UTF-8")
//设置保存时间是多少 毫秒为单位,如果10秒中内数据没有被消费,那就自动删除了
.expiration("1000000")
//设置它的保存模式 有两种保存模式 1为保存,但不会保存到磁盘上,
// 2为保存,但会保存到磁盘上
.deliveryMode(2)
//自定义的一些属性详情 存放一个Map集合
.headers(headers)
.build();
String oneData = "hello rabbitMq";
//这是以前的,是没有传入持久化的
//channel.basicPublish("","queueXiaoMi",null,oneData.getBytes());
//这是现在的,传入持久化的
//其实最终发送的还是hello rabbitMq这句话,只不过在它的基础上增加了一些附带 //的信息。比如清单信息
channel.basicPublish("","queueXiaoMi",basicProperties,oneData.getBytes());
//关闭信道和连接 顺序从小到大
channel.close();
connection.close();
}
}
3、创建消费者(Consumer)用于测试RabbitMQ 用于接收数据
package com.study.rabbitmq.demo;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* @author 武帅
* @date 2020/5/22 16:33
* @description Consumer 消费者
*
* 最简单的消息接收
*/
public class Consumer {
public static void main(String[] args) throws IOException, TimeoutException {
// 创建连接工厂,用来修建高速公路
ConnectionFactory connectionFactory = new ConnectionFactory();
//设置基本配置
connectionFactory.setHost("localhost");
connectionFactory.setPort(5672);
connectionFactory.setUsername("guest");
connectionFactory.setPassword("guest");
//修建一条高速公路
Connection connection = connectionFactory.newConnection();
//画定双向车道
Channel channel = connection.createChannel();
//接收数据 最简单的消息接收
//不使用交换机,路由规则即交换机的方式
DefaultConsumer consumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag,
Envelope envelope,
AMQP.BasicProperties properties,
byte[] body)
throws IOException
{
String strRecv = "我收到了" + new String(body);
System.out.println(strRecv);
//获取快递清单数据
System.out.println("同时我还获取了快递的清单的详情信息");
Map<String, Object> headers = properties.getHeaders();
if(headers != null){
for (Object value : headers.entrySet()) {
System.out.println(value);
}
}
}
};
channel.basicConsume("queueXiaoMi",true,consumer);
}
}
9、交换机Exchange
交换机的好处是可以更灵活的配置把消息路由到队列
交换机的几种类型:直连型交换机,fanout交换机,topic交换机,header型交换机 (前三种用的比较多)
1、创建直连型交换机-绑定队列-发送数据 (发送了,还没有接收)
package com.study.rabbitmq.p02directdemo;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* @author 武帅
* @date 2020/5/25 09:36
* @description
*/
public class Producer {
public static void main(String[] args) throws IOException, TimeoutException {
// 创建连接工厂,用来修建高速公路
ConnectionFactory connectionFactory = new ConnectionFactory();
//设置基本配置
connectionFactory.setHost("localhost");
connectionFactory.setPort(5672);
connectionFactory.setUsername("guest");
connectionFactory.setPassword("guest");
//修建一条高速公路
Connection connection = connectionFactory.newConnection();
//画定双向车道
Channel channel = connection.createChannel();
//声明一个交换机
//第一个参数是它的名字(最好是英文,见名知意),
//第二个参数是它的交换机的类型,
//第三个参数是否要保存到本地磁盘中
//第四个参数是否要自动删除
//第五个参数是它要携带的头信息(这里暂时没有)
channel.exchangeDeclare("交换机direct","direct",true,false,null);
//有了交换机,再申明一个队列出来
//第一个参数是它的名字(最好是英文,见名知意),
//第二个参数是否要保存到本地磁盘中
//第三个参数是否要独占(如果是独占,那就只能让它自己使用了,这样不太好)
//第四个参数是否要自动删除
//第五个参数是它要携带的头信息(这里暂时没有)
channel.queueDeclare("队列direct",true,false,false,null);
//绑定队列
//第一个参数表示 要绑定的队列名称
//第二个参数表示 要把队列绑定到那个交换机上
//第三个参数表示 路由键 (因为这里是直连型交换机,暂时没有路由键,所以可以暂时随便写一个,比如:小兔子乖乖)
channel.queueBind("队列direct","交换机direct","小兔子乖乖");
//发送数据到交换机
//第一个参数表示 要绑定的交换机名称(要发送给哪个交换机)
//第二个参数表示 路由键 跟上面的路由键保持一致
//第三个参数表示 携带的头信息数据(这里暂时没有)
//第四个参数表示 要发送的数据 (这里为了演示,发送了一句话,正常情况下,我们会发送真实的数据)
String oneData = "这是新买的洗面奶";
channel.basicPublish("交换机direct","小兔子乖乖",null,oneData.getBytes());
//关闭信道和连接 顺序从小到大
channel.close();
connection.close();
}
}
2、直连型交换机 控制台观察队列和交换机的绑定关系-消费数据 (接收了)
package com.study.rabbitmq.p02directdemo;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* @author 武帅
* @date 2020/5/25 09:36
* @description
*/
public class Consumer {
public static void main(String[] args) throws IOException, TimeoutException {
// 创建连接工厂,用来修建高速公路
ConnectionFactory connectionFactory = new ConnectionFactory();
//设置基本配置
connectionFactory.setHost("localhost");
connectionFactory.setPort(5672);
connectionFactory.setUsername("guest");
connectionFactory.setPassword("guest");
//修建一条高速公路
Connection connection = connectionFactory.newConnection();
//画定双向车道
Channel channel = connection.createChannel();
DefaultConsumer consumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag,
Envelope envelope,
AMQP.BasicProperties properties,
byte[] body)
throws IOException
{
String strRecv = "我收到了" + new String(body);
System.out.println(strRecv);
}
};
//获取生产者生产出来的数据
//第一个参数表示 要从哪个队列里获取数据
//第二个参数表示 是否要自动确认
//第三个参数表示 主要通过它来获取数据
channel.basicConsume("队列direct",true, consumer);
//关闭信道和连接 顺序从小到大
/*channel.close();
connection.close();*/
}
}
3、直连型交换机总结
主要是处理跟生产者的关系,然后把数据按照一定的路由规则,放到相应的队列里 这样的一个作用
4、fanout型交换机(废掉路由规则) 创建-绑定-数据消费
不处理路由键。你只需要简单的将队列绑定到交换机上。一个发送到交换机的消息都会被转发到与该交换机绑定的所有队列上。很像子网广播,每台子网内的主机都获得了一份复制的消息。Fanout交换机转发消息是最快的
生产数据 fanout型交换机**
package com.study.rabbitmq.p02directdemo;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* @author 武帅
* @date 2020/5/25 09:36
* @description
*/
public class Producer {
public static void main(String[] args) throws IOException, TimeoutException {
// 创建连接工厂,用来修建高速公路
ConnectionFactory connectionFactory = new ConnectionFactory();
//设置基本配置
connectionFactory.setHost("localhost");
connectionFactory.setPort(5672);
connectionFactory.setUsername("guest");
connectionFactory.setPassword("guest");
//修建一条高速公路
Connection connection = connectionFactory.newConnection();
//画定双向车道
Channel channel = connection.createChannel();
//========================fanout型交换机====================================
//声明一个fanout型交换机
//第一个参数是它的名字(最好是英文,见名知意),
//第二个参数是它的交换机的类型,
//第三个参数是否要保存到本地磁盘中
//第四个参数是否要自动删除
//第五个参数是它要携带的头信息(这里暂时没有)
channel.exchangeDeclare("交换机fanout","fanout",true,false,null);
//有了交换机,再申明一个队列出来
//第一个参数是它的名字(最好是英文,见名知意),
//第二个参数是否要保存到本地磁盘中
//第三个参数是否要独占(如果是独占,那就只能让它自己使用了,这样不太好)
//第四个参数是否要自动删除
//第五个参数是它要携带的头信息(这里暂时没有)
channel.queueDeclare("队列fanout",true,false,false,null);
//绑定队列
//第一个参数表示 要绑定的队列名称
//第二个参数表示 要把队列绑定到那个交换机上
//第三个参数表示 路由键 这里使用的是fanout型交换机(它给废弃了,所以写和不写都一样)
channel.queueBind("队列fanout","交换机fanout","abc");
//发送数据到交换机
//第一个参数表示 要绑定的交换机名称(要发送给哪个交换机)
//第二个参数表示 路由键 这里使用的是fanout型交换机(它给废弃了,所以写和不写都一样)
//第三个参数表示 携带的头信息数据(这里暂时没有)
//第四个参数表示 要发送的数据 (这里为了演示,发送了一句话,正常情况下,我们会发送真实的数据)
String oneData2 = "这是新买的蛋糕";
channel.basicPublish("交换机fanout","efg",null,oneData2.getBytes());
//关闭信道和连接 顺序从小到大
channel.close();
connection.close();
}
}
消费数据 fanout型交换机
package com.study.rabbitmq.p02directdemo;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* @author 武帅
* @date 2020/5/25 09:36
* @description
*/
public class Consumer {
public static void main(String[] args) throws IOException, TimeoutException {
// 创建连接工厂,用来修建高速公路
ConnectionFactory connectionFactory = new ConnectionFactory();
//设置基本配置
connectionFactory.setHost("localhost");
connectionFactory.setPort(5672);
connectionFactory.setUsername("guest");
connectionFactory.setPassword("guest");
//修建一条高速公路
Connection connection = connectionFactory.newConnection();
//画定双向车道
Channel channel = connection.createChannel();
DefaultConsumer consumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag,
Envelope envelope,
AMQP.BasicProperties properties,
byte[] body)
throws IOException
{
String strRecv = "我收到了" + new String(body);
System.out.println(strRecv);
}
};
//========================fanout型交换机====================================
//获取生产者生产出来的数据
//第一个参数表示 要从哪个队列里获取数据
//第二个参数表示 是否要自动确认
//第三个参数表示 主要通过它来获取数据
channel.basicConsume("队列fanout",true, consumer);
//关闭信道和连接 顺序从小到大
/*channel.close();
connection.close();*/
}
}
5、topic主题型交换机(主题匹配)用的也是最多的
将路由键和某模式进行匹配。此时队列需要绑定要一个模式上。
“.”单词的分隔符(不是必须,可以使用其他分隔符)
“*” 可以匹配一个单词,也只可以是0个单词
“#” 可以匹配多个单词,或者是0个
当一个队列被绑定为routing key为”#”时,它将会接收所有的消息,此时等价于fanout类型交换机。当routing key不包含”*”和”#”时,等价于direct型交换机。
生产数据 topic主题型交换机 #匹配**
package com.study.rabbitmq.p02directdemo;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* @author 武帅
* @date 2020/5/25 09:36
* @description
*/
public class Producer {
public static void main(String[] args) throws IOException, TimeoutException {
// 创建连接工厂,用来修建高速公路
ConnectionFactory connectionFactory = new ConnectionFactory();
//设置基本配置
connectionFactory.setHost("localhost");
connectionFactory.setPort(5672);
connectionFactory.setUsername("guest");
connectionFactory.setPassword("guest");
//修建一条高速公路
Connection connection = connectionFactory.newConnection();
//画定双向车道
Channel channel = connection.createChannel();
//========================topic主题型交换机#匹配====================================
//声明一个topic主题型交换机
//第一个参数是它的名字(最好是英文,见名知意),
// “.”单词的分隔符(不是必须,可以使用其他分隔符)
// “*” 可以匹配一个单词,也只可以是0个单词
// “#” 可以匹配多个单词,或者是0个
//第二个参数是它的交换机的类型,
//第三个参数是否要保存到本地磁盘中
//第四个参数是否要自动删除
//第五个参数是它要携带的头信息(这里暂时没有)
channel.exchangeDeclare("交换机topic#","topic",true,false,null);
//有了交换机,再申明一个队列出来
//第一个参数是它的名字(最好是英文,见名知意)
// “.”单词的分隔符(不是必须,可以使用其他分隔符)
// “*” 可以匹配一个单词,也只可以是0个单词
// “#” 可以匹配多个单词,或者是0个
//第二个参数是否要保存到本地磁盘中
//第三个参数是否要独占(如果是独占,那就只能让它自己使用了,这样不太好)
//第四个参数是否要自动删除
//第五个参数是它要携带的头信息(这里暂时没有)
channel.queueDeclare("队列topic#",true,false,false,null);
//绑定队列
//第一个参数表示 要绑定的队列名称
//第二个参数表示 要把队列绑定到那个交换机上
//第三个参数表示 路由键 这里使用的是topic主体型交换机 可以使用通配符
// “.”单词的分隔符(不是必须,可以使用其他分隔符)
// “*” 可以匹配一个单词,也只可以是0个单词
// “#” 可以匹配多个单词,或者是0个
channel.queueBind("队列topic#","交换机topic#","#.乖乖");
//发送数据到交换机
//第一个参数表示 要绑定的交换机名称(要发送给哪个交换机)
//第二个参数表示 路由键 这里使用的是topic主体型交换机 可以使用通配符
// “.”单词的分隔符(不是必须,可以使用其他分隔符)
// “*” 可以匹配一个单词,也只可以是0个单词
// “#” 可以匹配多个单词,或者是0个
//第三个参数表示 携带的头信息数据(这里暂时没有)
//第四个参数表示 要发送的数据 (这里为了演示,发送了一句话,正常情况下,我们会发送真实的数据)
String oneData2 = "应该发送到(队列topic#里的数据)这是新买的iPad";
channel.basicPublish("交换机topic#","小兔子.乖乖",null,oneData2.getBytes());
//关闭信道和连接 顺序从小到大
channel.close();
connection.close();
}
}
消费数据 topic#主题型交换机 #匹配
package com.study.rabbitmq.p02directdemo;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* @author 武帅
* @date 2020/5/25 09:36
* @description
*/
public class Consumer {
public static void main(String[] args) throws IOException, TimeoutException {
// 创建连接工厂,用来修建高速公路
ConnectionFactory connectionFactory = new ConnectionFactory();
//设置基本配置
connectionFactory.setHost("localhost");
connectionFactory.setPort(5672);
connectionFactory.setUsername("guest");
connectionFactory.setPassword("guest");
//修建一条高速公路
Connection connection = connectionFactory.newConnection();
//画定双向车道
Channel channel = connection.createChannel();
DefaultConsumer consumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag,
Envelope envelope,
AMQP.BasicProperties properties,
byte[] body)
throws IOException
{
String strRecv = "我收到了" + new String(body);
System.out.println(strRecv);
}
};
//========================topic主题型交换机#匹配====================================
//获取生产者生产出来的数据
//第一个参数表示 要从哪个队列里获取数据
//第二个参数表示 是否要自动确认
//第三个参数表示 主要通过它来获取数据
channel.basicConsume("队列topic#",true, consumer);
//关闭信道和连接 顺序从小到大
/*channel.close();
connection.close();*/
}
}
*生产数据 topic主题型交换机 匹配
package com.study.rabbitmq.p02directdemo;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* @author 武帅
* @date 2020/5/25 09:36
* @description
*/
public class Producer {
public static void main(String[] args) throws IOException, TimeoutException {
// 创建连接工厂,用来修建高速公路
ConnectionFactory connectionFactory = new ConnectionFactory();
//设置基本配置
connectionFactory.setHost("localhost");
connectionFactory.setPort(5672);
connectionFactory.setUsername("guest");
connectionFactory.setPassword("guest");
//修建一条高速公路
Connection connection = connectionFactory.newConnection();
//画定双向车道
Channel channel = connection.createChannel();
//========================topic主题型交换机*匹配====================================
//声明一个topic主题型交换机
//第一个参数是它的名字(最好是英文,见名知意),
// “.”单词的分隔符(不是必须,可以使用其他分隔符)
// “*” 可以匹配一个单词,也只可以是0个单词
// “#” 可以匹配多个单词,或者是0个
//第二个参数是它的交换机的类型,
//第三个参数是否要保存到本地磁盘中
//第四个参数是否要自动删除
//第五个参数是它要携带的头信息(这里暂时没有)
channel.exchangeDeclare("交换机topic*","topic",true,false,null);
//有了交换机,再申明一个队列出来
//第一个参数是它的名字(最好是英文,见名知意)
// “.”单词的分隔符(不是必须,可以使用其他分隔符)
// “*” 可以匹配一个单词,也只可以是0个单词
// “#” 可以匹配多个单词,或者是0个
//第二个参数是否要保存到本地磁盘中
//第三个参数是否要独占(如果是独占,那就只能让它自己使用了,这样不太好)
//第四个参数是否要自动删除
//第五个参数是它要携带的头信息(这里暂时没有)
channel.queueDeclare("队列topic*",true,false,false,null);
//绑定队列
//第一个参数表示 要绑定的队列名称
//第二个参数表示 要把队列绑定到那个交换机上
//第三个参数表示 路由键 这里使用的是topic主体型交换机 可以使用通配符
// “.”单词的分隔符(不是必须,可以使用其他分隔符)
// “*” 可以匹配一个单词,也只可以是0个单词
// “#” 可以匹配多个单词,或者是0个
channel.queueBind("队列topic*","交换机topic*","*.乖乖");
//发送数据到交换机
//第一个参数表示 要绑定的交换机名称(要发送给哪个交换机)
//第二个参数表示 路由键 这里使用的是topic主体型交换机 可以使用通配符
// “.”单词的分隔符(不是必须,可以使用其他分隔符)
// “*” 可以匹配一个单词,也只可以是0个单词
// “#” 可以匹配多个单词,或者是0个
//第三个参数表示 携带的头信息数据(这里暂时没有)
//第四个参数表示 要发送的数据 (这里为了演示,发送了一句话,正常情况下,我们会发送真实的数据)
String oneData2 = "应该发送到(队列topic*里的数据)这是新买的iPad";
channel.basicPublish("交换机topic*","小兔子.乖乖",null,oneData2.getBytes());
//关闭信道和连接 顺序从小到大
channel.close();
connection.close();
}
}
*消费数据 topic#主题型交换机 匹配
package com.study.rabbitmq.p02directdemo;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* @author 武帅
* @date 2020/5/25 09:36
* @description
*/
public class Consumer {
public static void main(String[] args) throws IOException, TimeoutException {
// 创建连接工厂,用来修建高速公路
ConnectionFactory connectionFactory = new ConnectionFactory();
//设置基本配置
connectionFactory.setHost("localhost");
connectionFactory.setPort(5672);
connectionFactory.setUsername("guest");
connectionFactory.setPassword("guest");
//修建一条高速公路
Connection connection = connectionFactory.newConnection();
//画定双向车道
Channel channel = connection.createChannel();
DefaultConsumer consumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag,
Envelope envelope,
AMQP.BasicProperties properties,
byte[] body)
throws IOException
{
String strRecv = "我收到了" + new String(body);
System.out.println(strRecv);
}
};
//========================topic主题型交换机 *匹配====================================
//获取生产者生产出来的数据
//第一个参数表示 要从哪个队列里获取数据
//第二个参数表示 是否要自动确认
//第三个参数表示 主要通过它来获取数据
channel.basicConsume("队列topic*",true, consumer);
//关闭信道和连接 顺序从小到大
/*channel.close();
connection.close();*/
}
}
6、Header自定义属性型交换机
不处理路由键。而是根据发送的消息内容中的headers属性进行匹配。在绑定Queue与Exchange时指定一组键值对;当消息发送到RabbitMQ时会取到该消息的headers与Exchange绑定时指定的键值对进行匹配;如果完全匹配则消息会路由到该队列,否则不会路由到该队列。
匹配规则x-match有下列两种类型:
x-match = all :表示所有的键值对都匹配才能接受到消息(不包括x-match)
x-match = any :表示至少有一个键值对匹配就能接受到消息
详情见视频 http://www.sikiedu.com/course/629/task/51898/show
// header交换机
channel.exchangeDeclare("exchange交换机小米-header-mathc-all","headers",true,false,null);
channel.queueDeclare("Queue小米header-mathc-all", true, false, false, null);
//设置消息头键值对信息
Map<String, Object> headers = new Hashtable<String,Object>();
//这里x-match有两种类型
//all:表示所有的键值对都匹配才能接受到消息
//any:表示最少一个键值对匹配就能接受到消息
headers.put("x-match", "any");
headers.put("name", "jack");
headers.put("age" , 31);
channel.queueBind("Queue小米header-mathc-all","exchange交换机小米-header-mathc-all", "", headers);
10、设置大小限制的队列
package com.study.rabbitmq.p03MaxLengthQueue;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeoutException;
/**
* @author 武帅
* @date 2020/5/28 10:55
* @description
*/
public class ProducerMaxLengthQueue {
public static void main(String[] args) {
// 创建连接工厂,用来修建高速公路
ConnectionFactory connectionFactory = new ConnectionFactory();
//设置基本配置
connectionFactory.setHost("localhost");
connectionFactory.setPort(5672);
connectionFactory.setUsername("guest");
connectionFactory.setPassword("guest");
//修建一条高速公路
Connection connection = null;
try {
connection = connectionFactory.newConnection();
//画定双向车道
Channel channel = connection.createChannel();
Map<String,Object> map = new HashMap<>();
map.put("x-max-length",2); //最大保存2个消息(包括2个消息)
//第一个参数是它的名字(最好是英文,见名知意)
//第二个参数是否持久化到磁盘中
//第三个参数是否独占
//第四个参数是否要自动删除
//第五个参数是它要携带的头信息 比如设置大小限制的队列
channel.queueDeclare("队列-最多保存2个消息",true,false,false,map);
String str = "最大限制为2";
channel.basicPublish("","队列-最多保存2个消息", null, str.getBytes());
//关闭信道和连接 顺序从小到大
channel.close();
connection.close();
} catch (IOException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
}
}
}
11、 生产端确认消息是否发送到了MQ
生产者将信道设置成confirm模式,一旦消息被被MQ接收了,MQ就会发送一个确认给生产者(包含消息的唯一ID);否则就表示消息没有到达MQ,可能由于网络闪断等原因。
package com.study.rabbitmq.p04ConfirmListener;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeoutException;
/**
* @author 武帅
* @date 2020/5/28 10:55
* @description
*/
public class ProducerConfirmListener {
public static void main(String[] args) {
// 创建连接工厂,用来修建高速公路
ConnectionFactory connectionFactory = new ConnectionFactory();
//设置基本配置
connectionFactory.setHost("localhost");
connectionFactory.setPort(5672);
connectionFactory.setUsername("guest");
connectionFactory.setPassword("guest");
//修建一条高速公路
Connection connection = null;
try {
connection = connectionFactory.newConnection();
//画定双向车道
Channel channel = connection.createChannel();
//第一个参数是它的名字(最好是英文,见名知意)
//第二个参数是否持久化到磁盘中
//第三个参数是否独占
//第四个参数是否要自动删除
//第五个参数是它要携带的头信息 这里没有
channel.queueDeclare("确认消息是否收到",true,false,false,null);
String str = "确认消息是否收到MQ";
//打开生产者的确认模式
channel.confirmSelect();
//添加确认监听
channel.addConfirmListener((deliveryTag, multiple) -> {
System.out.println("数据成功到达MQ");
}, (deliveryTag, multiple)->{
System.out.println("数据出错,可能原因是网络闪断,请排错");
});
channel.basicPublish("","确认消息是否收到", null, str.getBytes());
//关闭信道和连接 顺序从小到大
//channel.close();
//connection.close();
} catch (IOException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
}
}
}
12、 Return 作用(确保路由规则设置正确)
在某种情况下,如果我们在发送消息的时候,当前的路由key错误,需要监听这种不可达的消息,就要使用return listener。
basicPublish 的参数Mandatory为false时,如果消息无法正确路由到队列后,会被MQ直接丢失
basicPublish 的参数mandatory设置true,消息无法正确路由到队列后,会返还给发送者
package com.study.rabbitmq.p05ReturnListener;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* @author 武帅
* @date 2020/5/29 09:51
* @description
*/
public class ProducerReturnListener {
public static void main(String[] args) {
// 创建连接工厂,用来修建高速公路
ConnectionFactory connectionFactory = new ConnectionFactory();
//设置基本配置
connectionFactory.setHost("localhost");
connectionFactory.setPort(5672);
connectionFactory.setUsername("guest");
connectionFactory.setPassword("guest");
//修建一条高速公路
Connection connection = null;
try {
connection = connectionFactory.newConnection();
//画定双向车道
Channel channel = connection.createChannel();
//创建一个交换机
//声明一个直连型交换机
//第一个参数是它的名字(最好是英文,见名知意),
//第二个参数是它的交换机的类型,
//第三个参数是否要保存到本地磁盘中
//第四个参数是否要自动删除
//第五个参数是它要携带的头信息(这里暂时没有)
channel.exchangeDeclare("交换机ReturnListener","direct",true,false,null);
//有了交换机就创建一个相对应的队列
//第一个参数是它的名字(最好是英文,见名知意)
//第二个参数是否持久化到磁盘中
//第三个参数是否独占
//第四个参数是否要自动删除
//第五个参数是它要携带的头信息 这里没有
channel.queueDeclare("队列ReturnListener",true,false,false,null);
//进行绑定
channel.queueBind("队列ReturnListener","交换机ReturnListener","有内鬼停止交易");
String str = "确认消息是否收到MQ";
//添加确认监听
channel.addReturnListener(
// 第一个参数 返回的数字码
// 第二个参数 返回的文本信息
// 第三个参数 是哪个交换机
// 第四个参数 路由键
// 第五个参数 附带的信息 (信息清单等)
// 第六个参数 真正传输的数据是什么
new ReturnListener() {
@Override
public void handleReturn(
int replyCode,
String replyText,
String exchange,
String routingKey,
AMQP.BasicProperties properties,
byte[] body) throws IOException {
System.out.println(replyCode);
System.out.println(replyText);
System.out.println(exchange);
System.out.println(routingKey);
System.out.println(properties);
System.out.println(body);
}
}
);
//basicPublish方法,里面的参数mandatory设置true,消息无法正确路由到队列后,会返还给发送者
// mandatory 则表示手动处理返回的信息 上面的ReturnListener才会得到错误的路由信息
channel.basicPublish("交换机ReturnListener","有内鬼停止交易000", true, null, str.getBytes());
//关闭信道和连接 顺序从小到大
//channel.close();
//connection.close();
} catch (IOException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
}
}
}
13、消费者限流QOS(削峰作用、重回队列)
//同一时刻服务器只会发送一条消息给消费者
channel.basicQos(10000);
//据说prefetchSize 和global这两项,rabbitmq没有实现,暂且不研究
channel.basicQos(0, 10000, false);
//消费者确认一条,再处理一条。
//最后一个false,是否为多条。
channel.basicAck(delivery.getEnvelope().getDeliveryTag(),false);
//消费时必须取消自动确认
channel.basicConsume("队列Qos限流",true,cosumer);
示例代码:
package com.study.rabbitmq.p06ConsumerQos;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.Map;
import java.util.concurrent.TimeoutException;
/**
* @author 武帅
* @date 2020/5/29 10:42
* @description 消费者
*/
public class ConsumerTomato {
public static void main(String[] args) {
// 创建连接工厂,用来修建高速公路
ConnectionFactory connectionFactory = new ConnectionFactory();
//设置基本配置
connectionFactory.setHost("localhost");
connectionFactory.setPort(5672);
connectionFactory.setUsername("guest");
connectionFactory.setPassword("guest");
//修建一条高速公路
Connection connection = null;
try {
connection = connectionFactory.newConnection();
//画定双向车道
Channel channel = connection.createChannel();
//消费之前,先限流
//第二个参数起到作用,表示每次最多处理一条消息
channel.basicQos(0,1,false);
DefaultConsumer consumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag,
Envelope envelope,
AMQP.BasicProperties properties,
byte[] body)
throws IOException
{
String strRecv = "我收到了" + new String(body);
System.out.println(strRecv);
//收到消息后,在通知消息中间件,在给我数据
// 第一个参数用于标识唯一包裹
// 第二个参数用于批处理 一般为false
channel.basicAck(envelope.getDeliveryTag(), false);
}
};
//autoAck 消费时必须取消自动确认,否则消息会直接都扔过来
channel.basicConsume("队列Qos限流",false,consumer);
//关闭信道和连接 顺序从小到大
// channel.close();
// connection.close();
} catch (IOException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
}
}
}
14、自定义消费者处理
自己写的工具类
package com.study.rabbitmq.p07CustomConsumerClass;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import java.io.IOException;
/**
* @author 武帅
* @date 2020/5/30 09:28
* @description 自己定义一个工具类
*/
public class ConsumerTomatoTool extends DefaultConsumer {
public ConsumerTomatoTool(Channel channel) {
super(channel);
}
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String strRecv = "我收到了" + new String(body);
System.out.println(strRecv);
//收到消息后,在通知消息中间件,在给我数据
// 第一个参数用于标识唯一包裹
// 第二个参数用于批处理 一般为false
getChannel().basicAck(envelope.getDeliveryTag(), false);
}
}
生产者代码 其实没变
package com.study.rabbitmq.p07CustomConsumerClass;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* @author 武帅
* @date 2020/5/29 10:36
* @description
*/
public class ProducerTomato {
public static void main(String[] args) {
// 创建连接工厂,用来修建高速公路
ConnectionFactory connectionFactory = new ConnectionFactory();
//设置基本配置
connectionFactory.setHost("localhost");
connectionFactory.setPort(5672);
connectionFactory.setUsername("guest");
connectionFactory.setPassword("guest");
//修建一条高速公路
Connection connection = null;
try {
connection = connectionFactory.newConnection();
//画定双向车道
Channel channel = connection.createChannel();
//创建一个交换机
//声明一个直连型交换机
//第一个参数是它的名字(最好是英文,见名知意),
//第二个参数是它的交换机的类型,
//第三个参数是否要保存到本地磁盘中
//第四个参数是否要自动删除
//第五个参数是它要携带的头信息(这里暂时没有)
channel.exchangeDeclare("交换机Qos限流","direct",true,false,null);
//有了交换机就创建一个相对应的队列
//第一个参数是它的名字(最好是英文,见名知意)
//第二个参数是否持久化到磁盘中
//第三个参数是否独占
//第四个参数是否要自动删除
//第五个参数是它要携带的头信息 这里没有
channel.queueDeclare("队列Qos限流",true,false,false,null);
//进行绑定
channel.queueBind("队列Qos限流","交换机Qos限流","西红柿好吃吗");
String str = "我爱西红柿";
for (int i = 1; i <=5 ; i++) {
channel.basicPublish("交换机Qos限流","西红柿好吃吗", false,null, str.getBytes());
}
//关闭信道和连接 顺序从小到大
channel.close();
connection.close();
} catch (IOException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
}
}
}
消费者代码
package com.study.rabbitmq.p07CustomConsumerClass;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* @author 武帅
* @date 2020/5/29 10:42
* @description 消费者
*/
public class ConsumerTomato {
public static void main(String[] args) {
// 创建连接工厂,用来修建高速公路
ConnectionFactory connectionFactory = new ConnectionFactory();
//设置基本配置
connectionFactory.setHost("localhost");
connectionFactory.setPort(5672);
connectionFactory.setUsername("guest");
connectionFactory.setPassword("guest");
//修建一条高速公路
Connection connection = null;
try {
connection = connectionFactory.newConnection();
//画定双向车道
Channel channel = connection.createChannel();
//消费之前,先限流
//第二个参数起到作用,表示每次最多处理一条消息
channel.basicQos(0,1,false);
//自己定义的工具类
ConsumerTomatoTool consumer = new ConsumerTomatoTool(channel);
//autoAck 消费时必须取消自动确认,否则消息会直接都扔过来
channel.basicConsume("队列Qos限流",false,consumer);
//关闭信道和连接 顺序从小到大
// channel.close();
// connection.close();
} catch (IOException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
}
}
}
16、 TTL(消息的最大生存时长)
TTL是Time To Live的缩写, 也就是生存时间
RabbitMQ支持消息的过期时间, 在消息发送时可以进行指定
RabbitMQ支持队列的过期时间, 从消息入队列开始计算, 只要超过了队列的超时时间配置, 那么消息会自动清除
队列里的所有消息最长的生存时间60秒
package com.study.rabbitmq.p08QueueTTL;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeoutException;
/**
* @author 武帅
* @date 2020/5/30 09:53
* @description
*/
public class ProducerBanana {
public static void main(String[] args) {
// 创建连接工厂,用来修建高速公路
ConnectionFactory connectionFactory = new ConnectionFactory();
//设置基本配置
connectionFactory.setHost("localhost");
connectionFactory.setPort(5672);
connectionFactory.setUsername("guest");
connectionFactory.setPassword("guest");
//修建一条高速公路
Connection connection = null;
try {
connection = connectionFactory.newConnection();
//画定双向车道
Channel channel = connection.createChannel();
//创建一个交换机
//声明一个直连型交换机
//第一个参数是它的名字(最好是英文,见名知意),
//第二个参数是它的交换机的类型,
//第三个参数是否要保存到本地磁盘中
//第四个参数是否要自动删除
//第五个参数是它要携带的头信息(这里暂时没有)
channel.exchangeDeclare("交换机TTL60秒","direct",true,false,null);
// 定义整个队列的所有的消息的TTL(最大存活时间)为20秒 当然通常情况下,时间控制的不会这么短
Map<String,Object> map = new HashMap<>();
// 20秒后消息还没有被消费,会被丢弃
map.put("x-message-ttl",20000);
//有了交换机就创建一个相对应的队列
//第一个参数是它的名字(最好是英文,见名知意)
//第二个参数是否持久化到磁盘中
//第三个参数是否独占
//第四个参数是否要自动删除
//第五个参数是它要携带的头信息 这里携带的是,数据最多的存活时间 如果使用则写在最后一个参数里,再把str注释打开
channel.queueDeclare("队列TTL60秒",true,false,false,null);
//进行绑定
channel.queueBind("队列TTL60秒","交换机TTL60秒","香蕉好吃吗");
//String str = "我爱吃香蕉";
//channel.basicPublish("交换机TTL60秒","香蕉好吃吗", false,null, str.getBytes());
//这个是设置某一条消息的最大存活时间 为7秒
String str222 = "我爱吃香蕉222";
AMQP.BasicProperties properties = new AMQP.BasicProperties()
.builder()
.expiration("7000")
.build();
channel.basicPublish("交换机TTL60秒","香蕉好吃吗", false,properties, str222.getBytes());
//关闭信道和连接 顺序从小到大
channel.close();
connection.close();
} catch (IOException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
}
}
}
17、 死信队列(问题产品(专用)货架)
测试时,让消息过期后,消息会被转移到死信队列上
出现死信的三种情况
1、消息被拒绝(basic.reject/ basic.nack)并且requeue=false
2、消息TTL过期(参考:RabbitMQ之TTL(Time-To-Live 过期时间))
3、队列达到最大长度
给一个正常队列A添加一个死信队列DLX,当A中出现死信时,会被自动路由到DLX交换机对应的队列,从而不去影响正常的队列处理数据
死信队列的原理和作用:
准备基本的生产者和消费者的代码
生产者
package com.study.rabbitmq.p09DeadLetter;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeoutException;
/**
* @author 武帅
* @date 2020/5/30 09:53
* @description
*/
public class ProducerBanana {
public static void main(String[] args) {
// 创建连接工厂,用来修建高速公路
ConnectionFactory connectionFactory = new ConnectionFactory();
//设置基本配置
connectionFactory.setHost("localhost");
connectionFactory.setPort(5672);
connectionFactory.setUsername("guest");
connectionFactory.setPassword("guest");
//修建一条高速公路
Connection connection = null;
try {
connection = connectionFactory.newConnection();
//画定双向车道
Channel channel = connection.createChannel();
//创建一个交换机
//声明一个直连型交换机
//第一个参数是它的名字(最好是英文,见名知意),
//第二个参数是它的交换机的类型,
//第三个参数是否要保存到本地磁盘中
//第四个参数是否要自动删除
//第五个参数是它要携带的头信息(这里暂时没有)
channel.exchangeDeclare("交换机正常生产20秒","direct",true,false,null);
// 定义整个队列的所有的消息的TTL(最大存活时间)为20秒 当然通常情况下,时间控制的不会这么短
Map<String,Object> map = new HashMap<>();
// 20秒后消息还没有被消费,会被丢弃
map.put("x-message-ttl",20000);
//有了交换机就创建一个相对应的队列
//第一个参数是它的名字(最好是英文,见名知意)
//第二个参数是否持久化到磁盘中
//第三个参数是否独占
//第四个参数是否要自动删除
//第五个参数是它要携带的头信息 这里携带的是,数据最多的存活时间 如果使用则写在最后一个参数里, 再把str注释打开
channel.queueDeclare("队列正常生产活20秒",true,false,false,map);
//进行绑定
channel.queueBind("队列正常生产活20秒","交换机正常生产20秒","香蕉好吃吗");
String str = "我爱吃香蕉";
channel.basicPublish("交换机正常生产20秒","香蕉好吃吗", false,null, str.getBytes());
//关闭信道和连接 顺序从小到大
channel.close();
connection.close();
} catch (IOException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
}
}
}
消费者
package com.study.rabbitmq.p09DeadLetter;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* @author 武帅
* @date 2020/5/30 11:00
* @description
*/
public class ConsumerBanana {
public static void main(String[] args) {
// 创建连接工厂,用来修建高速公路
ConnectionFactory connectionFactory = new ConnectionFactory();
//设置基本配置
connectionFactory.setHost("localhost");
connectionFactory.setPort(5672);
connectionFactory.setUsername("guest");
connectionFactory.setPassword("guest");
//修建一条高速公路
Connection connection = null;
try {
connection = connectionFactory.newConnection();
//画定双向车道
Channel channel = connection.createChannel();
DefaultConsumer consumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag,
Envelope envelope,
AMQP.BasicProperties properties,
byte[] body)
throws IOException
{
String strRecv = "我收到了" + new String(body);
System.out.println(strRecv);
//收到消息后,在通知消息中间件,在给我数据
// 第一个参数用于标识唯一包裹
// 第二个参数用于批处理 一般为false
channel.basicAck(envelope.getDeliveryTag(), false);
}
};
//autoAck 消费时必须取消自动确认,否则消息会直接都扔过来
channel.basicConsume("交换机正常生产",false,consumer);
//关闭信道和连接 顺序从小到大
// channel.close();
// connection.close();
} catch (IOException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
}
}
}
准备死信的生产者和死信的消费者的代码
死信的生产者 在这里死信的生产者和基本的生产者是写在一个类里面
package com.study.rabbitmq.p09DeadLetter;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeoutException;
/**
* @author 武帅
* @date 2020/5/30 09:53
* @description
*/
public class ProducerBanana {
public static void main(String[] args) {
// 创建连接工厂,用来修建高速公路
ConnectionFactory connectionFactory = new ConnectionFactory();
//设置基本配置
connectionFactory.setHost("localhost");
connectionFactory.setPort(5672);
connectionFactory.setUsername("guest");
connectionFactory.setPassword("guest");
//修建一条高速公路
Connection connection = null;
try {
connection = connectionFactory.newConnection();
//画定双向车道
Channel channel = connection.createChannel();
//创建一个死信交换机
channel.exchangeDeclare("交换机---处理死信","topic",true,false,null);
//创建一个死信队列
channel.queueDeclare("队列---处理死信",true,false,false,null);
//死信交换机和死信队列 进行绑定
channel.queueBind("队列---处理死信","交换机---处理死信","#");
//创建一个交换机
//声明一个直连型交换机
//第一个参数是它的名字(最好是英文,见名知意),
//第二个参数是它的交换机的类型,
//第三个参数是否要保存到本地磁盘中
//第四个参数是否要自动删除
//第五个参数是它要携带的头信息(这里暂时没有)
channel.exchangeDeclare("交换机正常生产20秒","direct",true,false,null);
// 定义整个队列的所有的消息的TTL(最大存活时间)为20秒 当然通常情况下,时间控制的不会这么短
Map<String,Object> map = new HashMap<>();
// 20秒后消息还没有被消费,会被丢弃
map.put("x-message-ttl",20000);
// 正常队列 和 死信交换机进行绑定
map.put("x-dead-letter-exchange","交换机---处理死信");
//有了交换机就创建一个相对应的队列
//第一个参数是它的名字(最好是英文,见名知意)
//第二个参数是否持久化到磁盘中
//第三个参数是否独占
//第四个参数是否要自动删除
//第五个参数是它要携带的头信息 这里携带的是,数据最多的存活时间 如果使用则写在最后一个参数里, 再把str注释打开
channel.queueDeclare("队列正常生产活20秒",true,false,false,map);
//进行绑定
channel.queueBind("队列正常生产活20秒","交换机正常生产20秒","香蕉好吃吗");
String str = "我爱吃香蕉";
channel.basicPublish("交换机正常生产20秒","香蕉好吃吗", false,null, str.getBytes());
//关闭信道和连接 顺序从小到大
channel.close();
connection.close();
} catch (IOException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
}
}
}
死信的消费者
package com.study.rabbitmq.p09DeadLetter;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* @author 武帅
* @date 2020/5/30 11:00
* @description
*/
public class ConsumerDeadLetter {
public static void main(String[] args) {
// 创建连接工厂,用来修建高速公路
ConnectionFactory connectionFactory = new ConnectionFactory();
//设置基本配置
connectionFactory.setHost("localhost");
connectionFactory.setPort(5672);
connectionFactory.setUsername("guest");
connectionFactory.setPassword("guest");
//修建一条高速公路
Connection connection = null;
try {
connection = connectionFactory.newConnection();
//画定双向车道
Channel channel = connection.createChannel();
DefaultConsumer consumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag,
Envelope envelope,
AMQP.BasicProperties properties,
byte[] body)
throws IOException
{
String strRecv = "我收到了" + new String(body);
System.out.println("确认了死信的信息:"+strRecv);
//收到消息后,在通知消息中间件,在给我数据
// 第一个参数用于标识唯一包裹
// 第二个参数用于批处理 一般为false
channel.basicAck(envelope.getDeliveryTag(), false);
}
};
//autoAck 消费时必须取消自动确认,否则消息会直接都扔过来
channel.basicConsume("队列---处理死信",false,consumer);
//关闭信道和连接 顺序从小到大
// channel.close();
// connection.close();
} catch (IOException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
}
}
}
拒绝消息队列的代码
public static void main(String[] args) {
// 创建连接工厂,用来修建高速公路
ConnectionFactory connectionFactory = new ConnectionFactory();
//设置基本配置
connectionFactory.setHost("localhost");
connectionFactory.setPort(5672);
connectionFactory.setUsername("guest");
connectionFactory.setPassword("guest");
//修建一条高速公路
Connection connection = null;
try {
connection = connectionFactory.newConnection();
//画定双向车道
Channel channel = connection.createChannel();
DefaultConsumer consumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag,
Envelope envelope,
AMQP.BasicProperties properties,
byte[] body)
throws IOException
{
String strRecv = "我收到了" + new String(body);
System.out.println("确认了死信的信息:"+strRecv+"拒绝消息");
// 拒绝 消费队列的消息
// 第一个参数用于标识唯一包裹
// 第二个参数用于批处理 一般为false
channel.basicReject(envelope.getDeliveryTag(), false);
}
};
//autoAck 消费时必须取消自动确认,否则消息会直接都扔过来
channel.basicConsume("队列---处理死信",false,consumer);
//关闭信道和连接 顺序从小到大
// channel.close();
// connection.close();
} catch (IOException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
}
}
死信处理 ---队列达到最大长度
public static void main(String[] args) {
// 创建连接工厂,用来修建高速公路
ConnectionFactory connectionFactory = new ConnectionFactory();
//设置基本配置
connectionFactory.setHost("localhost");
connectionFactory.setPort(5672);
connectionFactory.setUsername("guest");
connectionFactory.setPassword("guest");
//修建一条高速公路
Connection connection = null;
try {
connection = connectionFactory.newConnection();
//画定双向车道
Channel channel = connection.createChannel();
//创建一个交换机
//声明一个直连型交换机
//第一个参数是它的名字(最好是英文,见名知意),
//第二个参数是它的交换机的类型,
//第三个参数是否要保存到本地磁盘中
//第四个参数是否要自动删除
//第五个参数是它要携带的头信息(这里暂时没有)
channel.exchangeDeclare("交换机正常生产-队列最多保存2个","direct",true,false,null);
Map<String,Object> map = new HashMap<>();
map.put("x-max-length",2);
map.put("x-dead-letter-exchange","交换机---处理死信");
//有了交换机就创建一个相对应的队列
//第一个参数是它的名字(最好是英文,见名知意)
//第二个参数是否持久化到磁盘中
//第三个参数是否独占
//第四个参数是否要自动删除
//第五个参数是它要携带的头信息 这里携带的是,最多保存多少个数据 如果使用则写在最后一个参数里
channel.queueDeclare("队列正常生产-队列最多保存2个",true,false,false,map);
//进行绑定
channel.queueBind("队列正常生产-队列最多保存2个","交换机正常生产-队列最多保存2个","香蕉好吃吗");
String str = "我爱吃香蕉";
for (int i = 1; i <=3 ; i++) {
channel.basicPublish("交换机正常生产-队列最多保存2个","香蕉好吃吗", false,null, str.getBytes());
}
//关闭信道和连接 顺序从小到大
channel.close();
connection.close();
} catch (IOException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
}
}
其余的RabbitMQ和Spring的整合(详情见RabbitMQ(基本使用2))