交换机
1、exchangeDeclare
声明交换机,exchangeDeclare有多个重载方法,这些重载方法都是由下面这个方法中缺省的某些参数构成的。
public Exchange.DeclareOk exchangeDeclare(String exchange, String type,
boolean durable, boolean autoDelete, boolean internal,
Map<String, Object> arguments);
这个方法的返回值是 Exchange.DeclareOk 用来标识成功声明了一个交换器。
各个参数详细说明如下所述。
- exchange:交换器的名称。
- type:交换器的类型。常见的如 fanout、direct、topic
- durable: 设置是否持久化。 durable设置为 true 表示持久化, 反之是非持久化。持久化可以将交换器存盘,在服务器重启的时候不会丢失相关信息。
- autoDelete: 设置是否自动删除。 autoDelete 设置为 true 表示自动删除。自动删除的前提是至少有一个队列或者交换器与这个交换器绑定 之后所有与这个交换器绑
定的队列或者交换器都与之解绑。注意不能错误地将这个参数理解为 “当与此交换器连接的客户端都断开RabbitMQ会自动删除本交换器。 - internal: 设置是否是内置的。如果设置为 true ,则表示是内置的交换器,客户端程序无法直接发送消息到这个交换器中,只能通过交换器路由到交换器这种方式。
- arguments: 其他一些结构化参数
2、exchangeDeclarePassive
用来检测相应的交换器是否存在。如果存在则正常返回;如果不存在则抛出异常 404 channel exception ,同时 Channel 也会被关闭。
Exchange.DeclareOk exchangeDeclarePassive(String name);
3、exchangeDelete
Exchange.DeleteOk exchangeDelete(String exchange);
void exchangeDeleteNoWait(String exchange , boolean ifUnused);
Exchange.DeleteOk exchangeDelete(String exchange ,
boolean ifUnused);
其中 exchange 表示交换器的名称,ifUnused用来设置是否在交换器没有被使用的情况下删除。如果 ifUnused设置为 true ,则只有在此交换器没有被使用的情况下才会被删除
队列
1、queueDeclare
声明队列
public Queue.DeclareOk queueDeclare();
public Queue.DeclareOk queueDeclare(String queue,
boolean durable, boolean exclusive, boolean autoDelete,
Map<String, Object> arguments);
不带任何参数的 queueDeclare 方法默认创建一个由 RabbitMQ 命名的(类似这种amq.gen-S9h... 的名称,这种队列也称之为匿名队列)、排他的、自动删除的、非持久化的队列。
参数说明:
- queue:队列的名称。
- durable:设置是否持久化。为 true 则设置队列为持久化。持久化的队列会存盘,在服务器重启的时候可以保证不丢失相关信息。
- exclusive:设置是否排他。为 true 则设置队列为排他的。如果一个队列被声明为排他队列,该队列仅对首次声明它的连接可见,并在连接断开时自动删除。这里需要注意三点:排他队列是基于连接( Connection )可见的,同一个连接的不同信道(Channel)是可以同时访问同一连接创建的排他队列:“首次”是指如果一个连接己经声明了排他队列,其他连接是不允许建立同名的排他队列的,这一个与普通队列不同:即使该队列是持久化的,一旦连接关闭或者客户端退出,该排他队列都会被自动删除,这种队列适用于一个客户端同时发送和读取消息的应用场景。
- autoDelete:设置是否自动删除。为 true 则设置队列为自动删除。自动删除的前提是:至少有一个消费者连接到这个队列,之后所有与这个队列连接的消费者都断开时,才会自动删除。
- arguments:设置队列的其他一些参数,如 x-message-ttl、 x-dead-letter-exchange、x-dead-letter-routing-key等。
2、queueDeclarePassive
用来检测相应的队列是否存在。 如果存在正常返回 ,如果不存在则抛出异常: 404 channel exception ,同时Channel 也会被关闭。方法定义如下
public Queue.DeclareOk queueDeclarePassive(String queue);
3、queueDelete
(1) Queue.DeleteOk queueDelete(String queue);
(2) Queue.DeleteOk queueDelete(String queue , boolean ifUnused,
boolean ifEmpty) ;
(3) void queueDeleteNoWait(String queue, boolean ifUnused,
boolean ifEmpty) ;
其中 queue 表示队列的名称 ,如果 ifUnused设置为 true ,则只有在此队列在没有被使用的情况下才会被删除, ifEmpty设置为true,表示在队列为空(队列里面没有任何消息堆积)的情况下才能够删除。
4、queuePurge
清除给定队列的暂存内容
Queue.PurgeOk queuePurge(String queue);
绑定
1、queueBind
将队列和交换器绑定
public Queue.BindOk queueBind(String queue, String exchange, String routingKey);
public Queue.BindOk queueBind(String queue, String exchange, String routingKey, Map<String, Object> arguments);
public void queueBindNoWait(String queue, String exchange, String routingKey, Map<String, Object> arguments);
参数详解。
- queue:队列名称。
- exchange:交换器的名称。
- routingKey: 用来绑定队列和交换器的路由键。
- arguments:定义绑定的一些参数。
2、queueUnbind
将队列和交换器解除绑定
Queue.UnbindOk queueUnbind(String queue, String exchange, String routingKey) throws IOException;
Queue.UnbindOk queueUnbind(String queue, String exchange, String routingKey, Map<String, Object> arguments) throws IOException;
具体的参数解释可以参考前面的内容,这里不再赘述
3、exchangeBind
将交换器与交换器绑定
Exchange.BindOk exchangeBind(String destination, String source, String routingKey);
Exchange.BindOk exchangeBind(String destination, String source, String routingKey, Map<String, Object> arguments);
void exchangeBindNoWait(String destination, String source, String routingKey, Map<String, Object> arguments);
- destination:目标交换机名称
- source:源交换机名称
- routingKey:路由key
- arguments:额外参数
//声明source交换机
channel.exchangeDeclare ("source","direct", false , true , null) ;
//声明destination交换机
channel.exchangeDeclare ("destination","fanout", false , true , null);
//交换器与交换器绑定
channel.exchangeBind ("destination" , "source","exKey");
//声明对象
channel.queueDeclare ("queue", false, false , true , null);
//将队列和交换器绑定
channel.queueBind ("queue","destination","");
//发布消息
channel.basicPublish ("source","exKey", null,"exToExDemo". getBytes () ) ;
生产者发送消息至交换器 source 中,交换器 source 根据路由键找到与其匹配的另一个交换机destination,并把消息转发到destination中,进而存储在destination绑定的队列 queue中。
4、exchangeUnbind
将交换器与交换器解除绑定
Exchange.UnbindOk exchangeUnbind(String destination, String source, String routingKey);
Exchange.UnbindOk exchangeUnbind(String destination, String source, String routingKey, Map<String, Object> arguments);
void exchangeUnbindNoWait(String destination, String source, String routingKey, Map<String, Object> arguments);
具体的参数解释可以参考前面的内容,这里不再赘述
发送消息
basicPublish
void basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body);
void basicPublish(String exchange, String routingKey, boolean mandatory, BasicProperties props, byte[] body);
void basicPublish(String exchange, String routingKey, boolean mandatory, boolean immediate, BasicProperties props, byte[] body);
参数解释:
- exchange:交换器的名称,指明消息需要发送到哪个交换器中,如果设置为空字符串,则消息会被发送到RabbitMQ 默认的交换器中。
- routingKey:路由键,交换器根据路由键将消息存储到相应的队列之中
- props: 消息的基本属性集,其包含 14 个属性成员
public static class BasicProperties extends com.rabbitmq.client.impl.AMQBasicProperties {
private String contentType; //消息类型如(text/plain)
private String contentEncoding; //字符编码
private Map<String,Object> headers; //请求头信息
private Integer deliveryMode; //消息的投递模式
private Integer priority; //优先级
private String correlationId; //用来关联RPC的请求和响应。
private String replyTo; //一般用来命名一个回调queue。
private String expiration; //过期时间
private String messageId;//消息表示符 用于标示消息
private Date timestamp; //消息发送的时间戳
private String type; //消息类型
private String userId; //连接到mq的用户名
private String appId; //消息的应用程序的表示符 比如你的计算机名称
private String clusterId;
参考大佬博文:https://www.pianshen.com/article/19451397954/
- byte[] body: 消息体( payload ),真正需要发送的消息
- mandatory:当mandatory 参数设为 true 时,交换器无法根据自身的类型和路由键找到一个符合条件的队列时,那么 RabbitMQ 会调用 Basic.Return 命令将消息返回给生产者。当 mandatory
参数设置为 false 时,出现上述情形,则消息直接被丢弃。 - immediate:当immediate 参数设为 true 时,如果交换器在将消息路由到队列时发现队列上并不存在任何消费者,那么这条消息将不会存入队列中。当与路由键匹配的所有队列都没有消费者时,该消息会通过 Basic .Return 返回至生产者。
概括来说, mandatory 参数告诉服务器至少将该消息路由到一个队列中,否则将消息返回给生产者。immediate 参数告诉服务器,如果该消息关联的队列上有消费者,则立刻投递;如果所有匹配的队列上都没有消费者,则直接将消息返还给生产者,不用将消息存入队列而等待消费者了。 RabbitMQ 3.0 版本开始去掉了对 immediate 参数的支持,对此 RabbitMQ 官方解释是: immediate 参数会影响镜像队列的性能,增加了代码复杂性,建议采用 TTL、DLX 的方法替代。
实例
投递模式(delivery mode)设置为2 ,即消息会被持久化(存入磁盘)在服务器中。同时这条消息的优先级(priority)设置为 1, contentType
为 text/plain,userId为当前登录用户名。
channel.basicPublish("exchangeName" , "routingKey",
new AMQP.BasicProperties.Builder()
.contentType ("text/plain")
.deliveryMode(2)
.priority(1)
.userId("test")
.build(),
"ddf".getBytes());
发送一条带有 headers 的消息
Map<String, Object> headers = new HashMap<String, Object>() ;
headers.put("token","782");
channel.basicPublish("exchangeName", "routingKey",
new AMQP.BasicProperties.Builder()
.headers(headers)
.build(),
"ddf".getBytes()) ;
发送一条带有过期时间(expiration)的消息
channel.basicPublish("exchangeName", "routingKey",
new AMQP.BasicProperties.Builder()
. expiration ("6000")
.build(),
"9999".getBytes());
消费消息
RabbitMQ 的消费模式分两种:推( Push )模式和拉( Pull )模式。推模式采用 Basic.Consume进行消费,而拉模式则是调用 Basic.Get 进行消费。
推模式
Channel 类中 basicConsume 方法太多,这里介绍几个常用的
String basicConsume(String queue , Consumer callback);
String basicConsume(String queue , boolean autoAck, Consumer callback);
String basicConsume(String queue , boolean autoAck , Map<String , Object>
arguments, Consumer callback);
String basicConsume(String queue , boolean autoAck , String consumerTag,
Consumer callback);
String basicConsume(String queue , boolean autoAck , String consumerTag,
boolean noLocal , boolean exclusive , Map<Str ng Object> arguments,
Consumer callback);
对应的参数说明:
- queue: 队列的名称
- autoAck: 设置是否自动确认,如果设置 autoAck为false ,那么需要调用channel.basicAck 来确认消息己被成功接收。
- consumerTag:消费者标签,用来区分多个消费者
- noLocal:设置为 true, 则表示不能将同一个 Connection中生产者发送的消息传送给这个 Connection 中的消费者
- exclusive:设置是否排他
- arguments:设置消费者的其他参数
- callback:设置消费者的回调函数。用来处理 RabbitMQ 推送过来的消息。比如DefaultConsumer,使用时需要客户端重写其中的方法。
拉模式
通过 channel.basicGet方法可以单条地获取消息,其返回值是 GetResponse。Channel 类的 basicGet 方法没有其他重载方法,只有
GetResponse basicGet(String queue, boolean autoAck);
其中 queue 代表队列的名称,如果设置 autoAck为false ,那么需要调用
channel.basicAck 来确认消息己被成功接收。
消费端的确认与拒绝
1、basicReject
void basicReject(long deliveryTag, boolean requeue);
- deliveryTag: 消息的编号
- requeue: 参数设置为 true,则 RabbitMQ 会重新将这条消息存入
队列,以便可以发送给下一个订阅的消费者;如果 requeue 参数设置为 false,则 RabbitMQ会立即把消息从队列中移除。
注:Basic.Reject命令一次只能拒绝一条消息,如果想要批量拒绝消息,则可以使用Basic.Nack 这个命令。
2、basicNack
void basicNack(long deliveryTag, boolean multiple, boolean requeue);
- deliveryTag:消息的编号
- multiple:设置为 false时,则表示拒绝编号为 deliveryTag的这一条消息,这时候 basicNack方法和basicReject 方法一样, multiple 参数设置为 true 则表示拒绝 小于deliveryTag编号之前所有未被当前消费者确认的消息。
- requeue:参数设置为 true,则 RabbitMQ 会重新将这条消息存入
队列,以便可以发送给下一个订阅的消费者;如果 requeue 参数设置为 false,则 RabbitMQ会立即把消息从队列中移除。
3、basicRecover
是否恢复消息到队列
Basic.RecoverOk basicRecover() throws IOException;
Basic.RecoverOk basicRecover(boolean requeue) throws IOException;
这个 channel.basicRecover方法用来请求 RabbitMQ 重新发送还未被确认的消息,requeue参数设置为 true ,则未被确认的消息会被重新加入到队列中,并且尽可能的将消息投递给其他消费者消费,而不是自己再次消费如果,requeue参数设置为 false ,消息会被重新投递给自己。默认情况下,如果不设置requeue这个参数默认为 true。
4、basicAck
void basicAck(long deliveryTag, boolean multiple) throws IOException;
- deliveryTag :消息的编号
- multiple:设置为 false时,则表示确认消费编号为 deliveryTag的这一条消息,该参数为 true 时,则可以一次性确认消费小于等于deliveryTag值的所有消息。
以上知识均来自于朱忠华大佬
《RabbitMQ实战指南》一书