RabbitMQ使用
你好!欢迎来到Java成长笔记,主要是用于相互交流,相互学习,也希望分享能帮到大家,如有错误之处,希望指正,谢谢!
RabbitMQ简介
RabbitMQ是实现了高级消息队列协议(AMQP)的开源消息代理软件(亦称面向消息的中间件)。RabbitMQ服务器是用Erlang语言编写的,而群集和故障转移是构建在开放电信平台框架上的。所有主要的编程语言均有与代理接口通讯的客户端库。
Erlang语言最初用于交换机领域的架构模式,这样使得RabbitMQ在Broker之间进行数据交互的性能是非常优秀的,Erlang有着和原生Socket一样的延迟,这也是RabbitMQ高性能的原因。
Rabbit科技有限公司开发了RabbitMQ,并提供对其的支持。起初,Rabbit科技是LSHIFT和CohesiveFT在2007年成立的合资企业,2010年4月被VMware旗下的SpringSource收购。RabbitMQ在2013年5月成为GoPivotal的一部分。
常用消息中间件协议
AMQP协议
AMQP即Advanced Message Queuing Protocol,一个提供统一消息服务的应用层标准高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计。基于此协议的客户端与消息中间件可传递消息,并不受客户端/中间件不同产品,不同开发语言等条件的限制。
优点:可靠、通用。
MQTT协议
MQTT(Message Queuing Telemetry Transport,消息队列遥测传输)是IBM开发的一个即时通讯协议,有可能成为物联网的重要组成部分。该协议支持所有平台,几乎可以把所有联网物品和外部连接起来,被用来当做传感器和致动器(比如通过Twitter让房屋联网)的通信协议。
优点:格式简洁、占用带宽小、移动端通信、PUSH、嵌入式系统。
STOMP协议
STOMP(Streaming Text Orientated Message Protocol)是流文本定向消息协议,是一种为MOM(Message Oriented Middleware,面向消息的中间件)设计的简单文本协议。STOMP提供一个可互操作的连接格式,允许客户端与任意STOMP消息代理(Broker)进行交互。
优点:命令模式(非topic\queue模式)。
XMPP协议
XMPP(可扩展消息处理现场协议,Extensible Messaging and Presence Protocol)是基于可扩展标记语言(XML)的协议,多用于即时消息(IM)以及在线现场探测。适用于服务器之间的准即时操作。核心是基于XML流传输,这个协议可能最终允许因特网用户向因特网上的其他任何人发送即时消息,即使其操作系统和浏览器不同。
优点:通用公开、兼容性强、可扩展、安全性高,但XML编码格式占用带宽大。
其他基于TCP/IP自定义的协议
一些特殊框架(如:redis、kafka、zeroMq等)根据自身需要未严格遵循MQ规范,而是基于TCP/IP自行封装了一套协议,通过网络socket接口进行传输,实现了MQ的功能。
消息中间件优点
1、异步解耦
通过上下游业务的松耦合涉及,即使下游子系统出现不可用或者宕机,都不会影响核心交易系统的正产运转。
2、削峰填谷
在诸如抢红包、秒杀等活动带来的流量脉冲,或因没做相应的保护而导致系统超负荷甚至崩溃,或因限制太过导致请求大量失败而影响用户体验,削峰填谷是解决该问题的最佳方式,
3、支持分布式部署,能够保证消息传递的高效和可靠,实现高并发、高可用、高性能,合理使用突破性能瓶颈。
常用消息中间件介绍
RabbitMQ安装
友情提示,RabbitMQ需要和Erlang版本相对应,本文使用的RabbitMQ版本是rabbitmq-server-generic-unix-3.8.3.tar.xz、Erlang版本是otp_src_22.2.tar
RabbitMQ下载地址
1、RabbitMQ官网地址
2、Erlang下载地址
3、RabbitMQ中文文档
RabbitMQ安装与常用命令
1、通过wget来安装
$ wget https://packages.erlang-solutions.com/erlang-solutions-1.0-1.noarch.rpm
$ sudo rpm -Uvh erlang-solutions-1.0-1.noarch.rpm
2、通过zip文件安装
$ sudo yum install epel-release
$ sudo yum install erlang
3、rabbitmq启动
rabbitmqctl start_app
4、服务的停止
rabbitmqctl stop_app
5、查看节点状态
rabbitmqctl status
6、插件管理
rabbitmq- plugins enable rabbitmq_ management
7、管理后台地址
htp:/127.0.0.1:15672
8、添加用户
rabbitmqctl add_user username password
9、列出所有用户
rabbitmqctl list_users
10、删除用户
rabbitmqctl delete_user username
11、清除用户权限
rabbitmqctl clear_permissions -p vhostpath username
12、列出用户权限
rabbitmgctl list_user_permissions username
13、修改密码
rabbitmqctl change_password username newpassword
14、设置用户权限
rabbitmqctl set_permissions-p vhostpath username “.*”".*"".*"
15、创建虚拟主机
rabbitmactl add_vhost vhostpath
16、列出所有虚拟主机
rabbitmqctl list_vhosts
17、列出虚拟主机上所有权限
rabbitmqctl list_permissions -p vhostpath
18、删除虚拟主机
rabbitmqctl delete_vhost vhostpath
19、查看所有队列信息
rabbitmqctl list_queues
20、清除队列里的消息
rabbitmqctl -o vhostpath purge_queue blue
21、移除所有数据,要在rabbitmqctl stop_app之后使用
rabbitmqctl reset
22、组成集群命令
rabbitmqctl join_cluster< clusternode>[--ram]
23、查看集群状态
rabbitmqctl cluster_status
24、修改集群节点的存储形式
rabbitmqctl change_cluster_node_type disc ram
25、忘记节点(摘除节点)
rabbitmactl forget_cluster_node[--offline]
26、修改节点名称
rabbitmqctl rename_cluster_node oldnode1 newnode1 [oldnode2][newnode2...]
RabbitMQ常用名词解释
1、Server:又称 Broker,接受客户端的连接,实现AMQP实体服务
2、Connection:连接,应用程序与Broker的网络连接
3、Channel:网络信道,几乎所有的操作都在 Channels中进行,Channel是进行消息读写的通道。客户端可建立多个 Channel,每个Channel代表一个会话任务。
4、Message:消息、服务器和应用程序之间传送的数据,由Properties和Body组成。Properties可以对消息进行修饰,比如消息的优先级、延迟等高级特性;Body则就是消息体内容。
5、Virtual host:虚拟地址,用于进行逻辑隔离,最上层的消息路由。一个Virtual Host里面可以有若干个Exchange和Queue,同一个VirtualHost里面不能有相同名称的Exchange或Queue
6、Exchange:交换机,接收消息,根据路由鍵转发消息到绑定的队列
7、Binding:Exchange和Queue之间的虚拟连接,binding中可以包含 routing key
8、Routing key:一个路由规则,虚拟机可用它来确定如何路由一个特定消息
9、Queue:也称为Message Queue,消息队列,保存消息并将它们转发给消费者
10、Connectionfactory:获取连接工厂
RabbitMQ基本使用
交换机基本属性
Name:交换机名称
Type:交换机类型direct、topic、fanout、headers
Durability:是否需要持久化,true为持久化 false不持久化
Auto Delete:当最后一个绑定到 Exchange上的队列删除后,自动删除该 Exchange
Internal:当前Exchange是否用于Rabbitmqp内部使用,默认为 False
Arguments:扩展参数,用于扩展AMQP协议自制定化使用
引入依赖
1、需要引入的maven依赖
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
2、基本使用
// 消费端代码
import com.rabbitmq.client.*;
import lombok.extern.slf4j.Slf4j;
import java.io.IOException;
import static com.show.rabbit.RabbitMqUtil.rabbitVo;
@Slf4j
public class Consumer {
public static void main(String[] args) throws Exception {
// 1、创建一个Connectionfactory,并进行配置
final ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost(rabbitVo.getHost());
connectionFactory.setPort(rabbitVo.getPort());
connectionFactory.setUsername(rabbitVo.getUserName());
connectionFactory.setPassword(rabbitVo.getPassWord());
connectionFactory.setVirtualHost(rabbitVo.getVirtualHost());
// 2、通过连接工厂的建连接
final Connection connection = connectionFactory.newConnection();
// 3、通过 connectioni的建一个 Channe1
final Channel channel = connection.createChannel();
// 4、申明创建一个队列 持久化服务器重启队列不会消失
final String queueName = "e-rabbitmq";
// queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments)
channel.queueDeclare(queueName, true, false, false, null);
// 5、创建消费者
final DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
log.error("消息内容:" + new String(body));
}
};
// 6、设置Channel
channel.basicConsume(queueName, true, defaultConsumer);
log.error(" 接收消息成功 ---> " + System.currentTimeMillis());
}
}
// 生产端代码
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import lombok.extern.slf4j.Slf4j;
import static com.show.rabbit.RabbitMqUtil.rabbitVo;
@Slf4j
public class Procuder {
public static void main(String[] args) throws Exception {
// 1、创建一个Connectionfactory,并进行配置
final ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost(rabbitVo.getHost());
connectionFactory.setPort(rabbitVo.getPort());
connectionFactory.setUsername(rabbitVo.getUserName());
connectionFactory.setPassword(rabbitVo.getPassWord());
connectionFactory.setVirtualHost(rabbitVo.getVirtualHost());
// 2、通过连接工厂的建连接
final Connection connection = connectionFactory.newConnection();
// 3、通过 connectioni的建一个 Channe1
final Channel channel = connection.createChannel();
// 4、通过channe1发送数据
try {
for(int i=0;i<5;i++){
final String msg = "Hello RabbitMq";
// basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body)
channel.basicPublish("", "e-rabbitmq", null, msg.getBytes());
}
} finally {
// 5、记得要关闭连接
channel.close();
connection.close();
}
log.error(" 消息发送成功 ---> " + System.currentTimeMillis());
}
}
上面模式使用需要注意:消息生产者并没有指定Exchange,只指定了队列名称,消息消费端会根据AMQP default模式来进行处理,也就是根据与Routingkey名称相同的队列来进行处理。如下图所示:
Exchange三种类型使用
主要API介绍
// exchange:消息 Exchange 名称
// type:消息 Exchange 类型
// durable:是否持久化 true:是 false:否
// autoDelete:true表示当最后一个绑定到Exchange上的队列删除后,自动删除该Exchange
// internal:当前Exchange是否用于RabbitMQ内部使用,默认为False
// arguments:扩展参数,用于扩展AMQP协议自制定化使用
// 申明一个交换机
DeclareOk exchangeDeclare(String exchange, BuiltinExchangeType type, boolean durable, boolean autoDelete, boolean internal, Map<String, Object> arguments)
// queue:队列名称
// durable:是否持久化 true:是 false:否
// exclusive:设置为独占、只有一个队列可以使用
// 创建一个队列
DeclareOk queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments)
// queue:队列名称
// exchagne:路由名称
// routingKey:路由key
// 建立路由、routingKey、队列绑定关系
BindOk queueBind(String queue, String exchange, String routingKey)
// BasicProperties 对象属性
Map<String,Object> map = ImmutableMap.of("msg1","msg1","msg2","msg2");
AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder()
.deliveryMode(2) // 1、不是持久化 2、持久化
.contentEncoding("UTF-8") // 设置字符集
.expiration("10000") // 10s没有消费会自动移除
.headers(map) //
.build();
// 获取消息信息
void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)
log.error("消息内容:" + new String(body) + "BasicProperties属性:" + JSON.toJSONString(properties));
Direct Exchange使用
Direct模式:所有发送到Direct类型的Exchange消息被转发到Routekey中指定的Queue,注意:Direct模式可以使用RabbitMQ自带的Exchange:default
Exchange,所以不需要将Exchange进行任何绑定( binding)操作,消息传递时,Routekey必须完全匹配才会被队列接收,否则该消息会被抛弃。代码展示如下:
// 消费端代码
import com.rabbitmq.client.*;
import com.show.enums.MqTypeEnum;
import com.show.service.MqConfig;
import lombok.extern.slf4j.Slf4j;
import java.io.IOException;
import static com.show.util.MqConnectionUtil.mqFactory;
@Slf4j
public class ConsumerDirect {
public static void main(String[] args) throws Exception {
// 1、通过连接工厂的建连接
final Connection connection = mqFactory.newConnection();
// 2、通过 connection 建一个 Channe1
final Channel channel = connection.createChannel();
// 申明一个交换机
channel.exchangeDeclare(MqConfig.directExChange, MqTypeEnum.MQ_DIRECT.getType(), true, false, false, null);
// 是否持久化
// queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments)
channel.queueDeclare(MqConfig.directQueue, true, false,false, null);
// 建立一个绑定关系
channel.queueBind(MqConfig.directQueue, MqConfig.directExChange, MqConfig.directRoutingKey);
// 3、创建消费者
final DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
log.error("消息内容:" + new String(body));
}
};
// 4、设置Channel
channel.basicConsume(MqConfig.directQueue, true, defaultConsumer);
log.error(" 接收消息成功 ---> " + System.currentTimeMillis());
}
}
// 生产端代码
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.show.service.MqConfig;
import lombok.extern.slf4j.Slf4j;
import static com.show.rabbit.RabbitMqUtil.rabbitVo;
import static com.show.util.MqConnectionUtil.mqFactory;
@Slf4j
public class ProcuderDirect {
public static void main(String[] args) throws Exception {
// 1、通过连接工厂的建连接
final Connection connection = mqFactory.newConnection();
// 2、通过 connectioni的建一个 Channe1
final Channel channel = connection.createChannel();
// 3、通过channe1发送数据
try {
for(int i=0;i<5;i++){
final String msg = "Hello RabbitMq";
// basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body)
channel.basicPublish(MqConfig.directExChange, MqConfig.directRoutingKey, null, msg.getBytes());
}
} finally {
// 4、关闭连接
channel.close();
connection.close();
}
log.error(" 消息发送成功 ---> " + System.currentTimeMillis());
}
}
Topic Exchange使用
Topic模式:生产端发送的消息将Exchange、RouteKey和某Topic进行模糊匹配。例如:RouteKey中匹配符号,符号“#”匹配一个或多个词,符号“”匹配不多不少一个词。示例:”10g.#“能够匹配到"log.info.aa”、"log."只会匹配到"log.error“。代码展示如下:
// 消费端代码
import com.rabbitmq.client.*;
import com.show.enums.MqTypeEnum;
import com.show.service.MqConfig;
import lombok.extern.slf4j.Slf4j;
import java.io.IOException;
import java.util.Optional;
import static com.show.rabbit.RabbitMqUtil.rabbitVo;
import static com.show.util.MqConnectionUtil.mqFactory;
@Slf4j
public class ConsumerTopic {
public static void main(String[] args) throws Exception {
// 1、通过连接工厂的建连接
final Connection connection = mqFactory.newConnection();
// 2、通过 connection 建一个 Channe1
final Channel channel = connection.createChannel();
try {
Optional.ofNullable(channel).ifPresent(x->{
// 申明一个交换机
x.exchangeDeclare(MqConfig.topicExChange, MqTypeEnum.MQ_TOPIC.getType(), true, false, false, null);
// 是否持久化
// queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments)
x.queueDeclare(MqConfig.topicQueue, true, false,false, null);
// 建立一个绑定关系
x.queueBind(MqConfig.topicQueue, MqConfig.topicExChange, MqConfig.topicRoutingKey);
});
} catch (Exception e) {
throw new RuntimeException("channel连接失败!");
}
// 3、创建消费者
final DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
log.error("消息内容:" + new String(body));
}
};
// 4、设置Channel
channel.basicConsume(MqConfig.topicQueue, true, defaultConsumer);
log.error(" 接收消息成功 ---> " + System.currentTimeMillis());
}
}
// 生产端代码
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.show.service.MqConfig;
import lombok.extern.slf4j.Slf4j;
import java.util.UUID;
import static com.show.util.MqConnectionUtil.mqFactory;
@Slf4j
public class ProcuderTopic {
public static void main(String[] args) throws Exception {
// 1、通过连接工厂的建连接
final Connection connection = mqFactory.newConnection();
// 2、通过 connectioni的建一个 Channe1
final Channel channel = connection.createChannel();
// 3、通过channe1发送数据
try {
for(int i=0;i<5;i++){
final String msg = "Hello RabbitMq";
// basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body)
final String routingkey = MqConfig.topicProcuderRoutingKey + UUID.randomUUID().toString().replaceAll("-","");
channel.basicPublish(MqConfig.topicExChange, routingkey, null, msg.getBytes());
}
} finally {
// 4、关闭连接
channel.close();
connection.close();
}
log.error(" 消息发送成功 ---> " + System.currentTimeMillis());
}
}
Fanout Exchange使用
Fanout模式:不处理路由键,只需要简单的将队列绑定到交换机上。发送到交换机的消息都会被转发到与该交换机绑定的所有队列上,Fanout交换机转发消息是最快的。代码展示如下:
// 消费端代码
import com.rabbitmq.client.*;
import com.show.enums.MqTypeEnum;
import com.show.service.MqConfig;
import lombok.extern.slf4j.Slf4j;
import java.io.IOException;
import java.util.Optional;
import static com.show.rabbit.RabbitMqUtil.rabbitVo;
import static com.show.util.MqConnectionUtil.mqFactory;
@Slf4j
public class ConsumerFanout {
public static void main(String[] args) throws Exception {
// 1、通过连接工厂的建连接
final Connection connection = mqFactory.newConnection();
// 2、通过 connection 建一个 Channe1
final Channel channel = connection.createChannel();
Optional.ofNullable(channel).ifPresent(x->{
try {
// 申明一个交换机
x.exchangeDeclare(MqConfig.fanoutExChange, MqTypeEnum.MQ_FANOUT.getType(), true, false, false, null);
// 是否持久化
// queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments)
x.queueDeclare(MqConfig.fanoutQueue, true, false,false, null);
// 建立一个绑定关系
x.queueBind(MqConfig.fanoutQueue, MqConfig.fanoutExChange, MqConfig.fanoutRoutingKey);
} catch (IOException e) {
e.printStackTrace();
}
});
// 3、创建消费者
final DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
log.error("消息内容:" + new String(body));
}
};
// 4、设置Channel
channel.basicConsume(MqConfig.fanoutQueue, true, defaultConsumer);
log.error(" 接收消息成功 ---> " + System.currentTimeMillis());
}
}
// 生产端代码展示
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.show.service.MqConfig;
import lombok.extern.slf4j.Slf4j;
import java.util.UUID;
import static com.show.rabbit.RabbitMqUtil.rabbitVo;
import static com.show.util.MqConnectionUtil.mqFactory;
@Slf4j
public class ProcuderFanout {
public static void main(String[] args) throws Exception {
// 1、通过连接工厂的建连接
final Connection connection = mqFactory.newConnection();
// 2、通过 connectioni的建一个 Channe1
final Channel channel = connection.createChannel();
// 3、通过channe1发送数据
try {
for(int i=0;i<5;i++){
final String msg = "Hello RabbitMq";
// basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body)
channel.basicPublish(MqConfig.fanoutExChange, MqConfig.fanoutRoutingKey, null, msg.getBytes());
}
} finally {
// 4、关闭连接
channel.close();
connection.close();
}
log.error(" 消息发送成功 ---> " + System.currentTimeMillis());
}
}
消息可靠性投递
基本概念
RabbitMQ的消息确认机制
消息的确认,是指生产者投递消息后,如果Brokerl收到消息,则会给我们生产者一个应答。生产者进行接收应答,用来确定这条消息是否正常的发送到Broker。
Return消息机制
Return Listener用于处理一些不可路由的消息,在某些情況下,我们在发送消息的时候,当前的Exchange不存在或者指定的路由key路由不到,这个时候如果我们需要监听这种不可达的消息,就要使用Return Listener。
消息可靠性投递(一)
1、利用RabbitMQ的消息的Confirm确认消息。
2、在发送消息之前,对消息进行落库,对消息状态进行打标。例如:投递状态、更新时间等。
3、通过消息状态和定时任务对投递失败的消息进行二次投送、或者通过内管后台进行单条数据的重试。PD如下图所示。
消息可靠性投递(二)
1、消息投递之前进行入库操作,第一条消息进入MQ Broker,进入消费端,如果消费成功,会发送一条Confirm消息到Callback服务,收到消息进行MSG DB Change。
2、第二条延时消息进行发送,进入MQ Broker,进入Callback服务,查询消息是否消费成功、如果成功不进行操作,如果当前消息状态没有变更消息消费失败,会通知当条消息进入ReSend操作。
3、这样处理的好处是节省服务器的资源,整个Callback作为单独服务统一管理,方便维护。具体流程图如下图所示。
消费端限流
RabbitMQ提供了一种qos(服务质量保证)功能,即在非自动确认消息的前提下,如果一定数目的消息(通过基于consumer或者channel设置Qos的值)未被确认前,不进行消费新的消息。
// prefetchsize:设置为0表示不限制消息大小 一般我们设置为0
// prefetchCount:不要同时给一个消费者推送多于N个消息,即一旦有N个消息还没有ack,则该consumer将block掉,直到有消息ack,才进行后续的消费
// global:限流是channel级别的还是consumers级别
void Basicqos(uint prefetchsize, ushort prefetchcount, bool global)
// 消费端代码
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.show.custom.CustomConsumer;
import com.show.enums.MqTypeEnum;
import com.show.service.MqConfig;
import lombok.extern.slf4j.Slf4j;
import java.io.IOException;
import java.util.Optional;
import static com.show.util.MqConnectionUtil.mqFactory;
@Slf4j
public class ConsumerAck {
public static void main(String[] args) throws Exception {
// 1、通过连接工厂的建连接
final Connection connection = mqFactory.newConnection();
// 2、通过 connection 建一个 Channe1
final Channel channel = connection.createChannel();
Optional.ofNullable(channel).ifPresent(x->{
try {
// 申明一个交换机
x.exchangeDeclare(MqConfig.ackExChange, MqTypeEnum.MQ_TOPIC.getType(), true, false, false, null);
// 是否持久化
x.queueDeclare(MqConfig.ackQueue, true, false,false, null);
// 建立一个绑定关系
x.queueBind(MqConfig.ackQueue, MqConfig.ackExChange, MqConfig.ackRoutingKey);
} catch (IOException e) {
e.printStackTrace();
}
});
// 3、限流处理
// int prefetchSize, int prefetchCount, boolean global
channel.basicQos(0, 1, false);
// 4、设置Channel String queue, boolean autoAck, Consumer callback
// autoAck true 自动签收 false 手动签收
channel.basicConsume(MqConfig.ackQueue, false, new AckConsumer(channel));
log.error(" 消息消费成功 ---> " + System.currentTimeMillis());
}
}
// 生产端代码
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.show.service.MqConfig;
import lombok.extern.slf4j.Slf4j;
import java.util.UUID;
import static com.show.util.MqConnectionUtil.mqFactory;
@Slf4j
public class ProcuderAck {
public static void main(String[] args) throws Exception {
// 1、通过连接工厂的建连接
final Connection connection = mqFactory.newConnection();
// 2、通过 connectioni的建一个 Channe1
final Channel channel = connection.createChannel();
// 3、通过channe1发送数据
for(int i=0;i<5;i++){
final String msg = "Hello RabbitMq";
final AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder()
.deliveryMode(2)
.contentEncoding("UTF-8")
.expiration("10000")
.build();
final String routingkey = MqConfig.ackProcuderRoutingKey + UUID.randomUUID().toString().replaceAll("-","");
// basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body)
channel.basicPublish(MqConfig.ackExChange, routingkey, true, properties, msg.getBytes());
}
log.error(" 消息生产成功 ---> " + System.currentTimeMillis());
}
}
死信队列
死信队列:DLX,Dead-Letter-Exchange介绍<font>
当消息在一个队列中变成死信(dead message)之后,它能被重新publish到另一个Exchange,这个Exchange就是DLX。
DLX也是一个正常的Exchange,和一般的Exchange没有区别,它能在任何的队列上被指定,实际上就>是设置某个队列的属性。当这个队列中有死信时,RabbitMQ就会自动的将这个消息重新发布到设置的Exchange上去,进而被路由到另一个队列。<font>
TTL队列/消息介绍<font>
TTL是Time To Livel的缩写,也就是指生存时间、RabbitMQ支持设置消息的过期时间,在发送消息发送时可以进行指定,从消息进入队列开始计算,只要超过了队列的超时时间配置,那么消息会自动被清除。<font>
消息变成死信有以下几种情况<font>
1、消息被拒绝(basic.reject/basic.nack)并且requeue=false
2、消息TTL过期
3、队列达到最大长度下面**
// 消费端代码
import com.google.common.collect.Maps;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.show.enums.MqTypeEnum;
import com.show.service.MqConfig;
import lombok.extern.slf4j.Slf4j;
import java.util.HashMap;
import java.util.Map;
import static com.show.util.MqConnectionUtil.mqFactory;
@Slf4j
public class ConsumerDlx {
public static void main(String[] args) throws Exception {
// 1、通过连接工厂的建连接
final Connection connection = mqFactory.newConnection();
// 2、通过 connection 建一个 Channe1
final Channel channel = connection.createChannel();
Map<String, Object> agruments = Maps.newHashMap();
agruments.put("x-dead-letter-exchange", MqConfig.dlxExChange);
// dlx test
channel.exchangeDeclare(MqConfig.dlxTestExChange, MqTypeEnum.MQ_TOPIC.getType(), true, false, false, null);
// 是否持久化
channel.queueDeclare(MqConfig.dlxTestQueue, true, false,false, agruments);
// 建立一个绑定关系
channel.queueBind(MqConfig.dlxTestQueue, MqConfig.dlxTestExChange, MqConfig.dlxTestRoutingKey);
// 申明 dlx
channel.exchangeDeclare(MqConfig.dlxExChange, MqTypeEnum.MQ_TOPIC.getType(), true, false, false, null);
// dlx 是否持久化
channel.queueDeclare(MqConfig.dlxQueue, true, false,false, null);
// dlx 建立一个绑定关系
channel.queueBind(MqConfig.dlxQueue, MqConfig.dlxExChange, MqConfig.dlxRoutingKey);
channel.basicConsume(MqConfig.dlxTestQueue, false, new DlxConsumer(channel));
log.error(" 接收消息成功 ---> " + System.currentTimeMillis());
}
}
// 生产端代码
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.show.service.MqConfig;
import lombok.extern.slf4j.Slf4j;
import static com.show.util.MqConnectionUtil.mqFactory;
@Slf4j
public class ProcuderDlx {
public static void main(String[] args) throws Exception {
// 1、通过连接工厂的建连接
final Connection connection = mqFactory.newConnection();
// 2、通过connectioni的建一个 Channe1
final Channel channel = connection.createChannel();
// 3、通过channe1发送数据
for(int i=0;i<5;i++){
final String msg = "Hello RabbitMq";
final AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder()
.deliveryMode(2)
.contentEncoding("UTF-8")
.expiration("10000")
.build();
channel.basicPublish(MqConfig.dlxTestExChange, MqConfig.dlxTestProcuderRoutingKey, true, properties, msg.getBytes());
}
log.error(" 消息发送成功 ---> " + System.currentTimeMillis());
}
}
本章完结,后续还会持续更新,分享Java成长笔记,希望我们能一起成长。如果你觉得我的分享有用,记得点赞和关注哦!这对我是最好的鼓励。谢谢!
PS:转载请注明出处!