RabbitMQ方法实例

交换机

1、exchangeDeclare

声明交换机,exchangeDeclare有多个重载方法,这些重载方法都是由下面这个方法中缺省的某些参数构成的。

 public Exchange.DeclareOk exchangeDeclare(String exchange, String type,
   boolean durable, boolean autoDelete, boolean internal, 
Map<String, Object> arguments);

这个方法的返回值是 Exchange.DeclareOk 用来标识成功声明了一个交换器。

各个参数详细说明如下所述。

  • exchange:交换器的名称。
  • type:交换器的类型。常见的如 fanout、direct、topic
  • durable: 设置是否持久化。 durable设置为 true 表示持久化, 反之是非持久化。持久化可以将交换器存盘,在服务器重启的时候不会丢失相关信息。
  • autoDelete: 设置是否自动删除。 autoDelete 设置为 true 表示自动删除。自动删除的前提是至少有一个队列或者交换器与这个交换器绑定 之后所有与这个交换器绑
    定的队列或者交换器都与之解绑。注意不能错误地将这个参数理解为 “当与此交换器连接的客户端都断开RabbitMQ会自动删除本交换器。
  • internal: 设置是否是内置的。如果设置为 true ,则表示是内置的交换器,客户端程序无法直接发送消息到这个交换器中,只能通过交换器路由到交换器这种方式。
  • arguments: 其他一些结构化参数

2、exchangeDeclarePassive

用来检测相应的交换器是否存在。如果存在则正常返回;如果不存在则抛出异常 404 channel exception ,同时 Channel 也会被关闭。

Exchange.DeclareOk exchangeDeclarePassive(String name); 

3、exchangeDelete

 Exchange.DeleteOk exchangeDelete(String exchange); 

 void exchangeDeleteNoWait(String exchange , boolean ifUnused); 

 Exchange.DeleteOk exchangeDelete(String exchange , 
boolean ifUnused);

其中 exchange 表示交换器的名称,ifUnused用来设置是否在交换器没有被使用的情况下删除。如果 ifUnused设置为 true ,则只有在此交换器没有被使用的情况下才会被删除

队列

1、queueDeclare

声明队列

   public Queue.DeclareOk queueDeclare();

   public Queue.DeclareOk queueDeclare(String queue, 
      boolean durable, boolean exclusive, boolean autoDelete,
       Map<String, Object> arguments);

不带任何参数的 queueDeclare 方法默认创建一个由 RabbitMQ 命名的(类似这种amq.gen-S9h... 的名称,这种队列也称之为匿名队列)、排他的、自动删除的、非持久化的队列。
参数说明:

  • queue:队列的名称。
  • durable:设置是否持久化。为 true 则设置队列为持久化。持久化的队列会存盘,在服务器重启的时候可以保证不丢失相关信息。
  • exclusive:设置是否排他。为 true 则设置队列为排他的。如果一个队列被声明为排他队列,该队列仅对首次声明它的连接可见,并在连接断开时自动删除。这里需要注意三点:排他队列是基于连接( Connection )可见的,同一个连接的不同信道(Channel)是可以同时访问同一连接创建的排他队列:“首次”是指如果一个连接己经声明了排他队列,其他连接是不允许建立同名的排他队列的,这一个与普通队列不同:即使该队列是持久化的,一旦连接关闭或者客户端退出,该排他队列都会被自动删除,这种队列适用于一个客户端同时发送和读取消息的应用场景。
  • autoDelete:设置是否自动删除。为 true 则设置队列为自动删除。自动删除的前提是:至少有一个消费者连接到这个队列,之后所有与这个队列连接的消费者都断开时,才会自动删除。
  • arguments:设置队列的其他一些参数,如 x-message-ttl、 x-dead-letter-exchange、x-dead-letter-routing-key等。

2、queueDeclarePassive

用来检测相应的队列是否存在。 如果存在正常返回 ,如果不存在则抛出异常: 404 channel exception ,同时Channel 也会被关闭。方法定义如下

 public Queue.DeclareOk queueDeclarePassive(String queue);

3、queueDelete

(1) Queue.DeleteOk queueDelete(String queue);
(2) Queue.DeleteOk queueDelete(String queue , boolean ifUnused, 
boolean ifEmpty) ;
(3) void queueDeleteNoWait(String queue, boolean ifUnused,
 boolean ifEmpty) ;

其中 queue 表示队列的名称 ,如果 ifUnused设置为 true ,则只有在此队列在没有被使用的情况下才会被删除, ifEmpty设置为true,表示在队列为空(队列里面没有任何消息堆积)的情况下才能够删除。

4、queuePurge

清除给定队列的暂存内容

Queue.PurgeOk queuePurge(String queue);

绑定

1、queueBind

将队列和交换器绑定

    public Queue.BindOk queueBind(String queue, String exchange, String routingKey);

    public Queue.BindOk queueBind(String queue, String exchange, String routingKey, Map<String, Object> arguments);

    public void queueBindNoWait(String queue, String exchange, String routingKey, Map<String, Object> arguments);

参数详解。

  • queue:队列名称。
  • exchange:交换器的名称。
  • routingKey: 用来绑定队列和交换器的路由键。
  • arguments:定义绑定的一些参数。

2、queueUnbind

将队列和交换器解除绑定

 Queue.UnbindOk queueUnbind(String queue, String exchange, String routingKey) throws IOException;

 Queue.UnbindOk queueUnbind(String queue, String exchange, String routingKey, Map<String, Object> arguments) throws IOException;

具体的参数解释可以参考前面的内容,这里不再赘述

3、exchangeBind

将交换器与交换器绑定

    Exchange.BindOk exchangeBind(String destination, String source, String routingKey);

    Exchange.BindOk exchangeBind(String destination, String source, String routingKey, Map<String, Object> arguments);

    void exchangeBindNoWait(String destination, String source, String routingKey, Map<String, Object> arguments);
  • destination:目标交换机名称
  • source:源交换机名称
  • routingKey:路由key
  • arguments:额外参数
//声明source交换机
channel.exchangeDeclare ("source","direct", false , true , null) ; 
//声明destination交换机
channel.exchangeDeclare ("destination","fanout", false , true , null); 
//交换器与交换器绑定
channel.exchangeBind ("destination" , "source","exKey"); 
//声明对象
channel.queueDeclare ("queue", false, false , true , null); 
//将队列和交换器绑定
channel.queueBind ("queue","destination","");
//发布消息
channel.basicPublish ("source","exKey", null,"exToExDemo". getBytes () ) ;

生产者发送消息至交换器 source 中,交换器 source 根据路由键找到与其匹配的另一个交换机destination,并把消息转发到destination中,进而存储在destination绑定的队列 queue中。

4、exchangeUnbind

将交换器与交换器解除绑定

    Exchange.UnbindOk exchangeUnbind(String destination, String source, String routingKey);

    Exchange.UnbindOk exchangeUnbind(String destination, String source, String routingKey, Map<String, Object> arguments);

    void exchangeUnbindNoWait(String destination, String source, String routingKey, Map<String, Object> arguments);

具体的参数解释可以参考前面的内容,这里不再赘述

发送消息

basicPublish

    void basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body);

    void basicPublish(String exchange, String routingKey, boolean mandatory, BasicProperties props, byte[] body);

    void basicPublish(String exchange, String routingKey, boolean mandatory, boolean immediate, BasicProperties props, byte[] body);

参数解释:

  • exchange:交换器的名称,指明消息需要发送到哪个交换器中,如果设置为空字符串,则消息会被发送到RabbitMQ 默认的交换器中。
  • routingKey:路由键,交换器根据路由键将消息存储到相应的队列之中
  • props: 消息的基本属性集,其包含 14 个属性成员
public static class BasicProperties extends com.rabbitmq.client.impl.AMQBasicProperties {
        private String contentType; //消息类型如(text/plain)
        private String contentEncoding; //字符编码
        private Map<String,Object> headers; //请求头信息
        private Integer deliveryMode;  //消息的投递模式
        private Integer priority;  //优先级
        private String correlationId; //用来关联RPC的请求和响应。
        private String replyTo;  //一般用来命名一个回调queue。
        private String expiration;  //过期时间
        private String messageId;//消息表示符 用于标示消息
        private Date timestamp; //消息发送的时间戳
        private String type;  //消息类型
        private String userId;  //连接到mq的用户名
        private String appId;  //消息的应用程序的表示符  比如你的计算机名称
        private String clusterId;

参考大佬博文:https://www.pianshen.com/article/19451397954/

  • byte[] body: 消息体( payload ),真正需要发送的消息
  • mandatory:当mandatory 参数设为 true 时,交换器无法根据自身的类型和路由键找到一个符合条件的队列时,那么 RabbitMQ 会调用 Basic.Return 命令将消息返回给生产者。当 mandatory
    参数设置为 false 时,出现上述情形,则消息直接被丢弃。
  • immediate:当immediate 参数设为 true 时,如果交换器在将消息路由到队列时发现队列上并不存在任何消费者,那么这条消息将不会存入队列中。当与路由键匹配的所有队列都没有消费者时,该消息会通过 Basic .Return 返回至生产者。
    概括来说, mandatory 参数告诉服务器至少将该消息路由到一个队列中,否则将消息返回给生产者。immediate 参数告诉服务器,如果该消息关联的队列上有消费者,则立刻投递;如果所有匹配的队列上都没有消费者,则直接将消息返还给生产者,不用将消息存入队列而等待消费者了。 RabbitMQ 3.0 版本开始去掉了对 immediate 参数的支持,对此 RabbitMQ 官方解释是: immediate 参数会影响镜像队列的性能,增加了代码复杂性,建议采用 TTL、DLX 的方法替代。

实例

投递模式(delivery mode)设置为2 ,即消息会被持久化(存入磁盘)在服务器中。同时这条消息的优先级(priority)设置为 1, contentType
为 text/plain,userId为当前登录用户名。

     channel.basicPublish("exchangeName" , "routingKey",
                new AMQP.BasicProperties.Builder()
                        .contentType ("text/plain")
                        .deliveryMode(2)
                        .priority(1)
                        .userId("test")
                        .build(),
                "ddf".getBytes());

发送一条带有 headers 的消息

Map<String, Object> headers = new HashMap<String, Object>() ;
        headers.put("token","782");
                channel.basicPublish("exchangeName", "routingKey",
        new AMQP.BasicProperties.Builder()
                .headers(headers)
                .build(),
        "ddf".getBytes()) ;

发送一条带有过期时间(expiration)的消息

 channel.basicPublish("exchangeName", "routingKey",
                new AMQP.BasicProperties.Builder()
                        . expiration ("6000")
                        .build(),
                "9999".getBytes());

消费消息

RabbitMQ 的消费模式分两种:推( Push )模式和拉( Pull )模式。推模式采用 Basic.Consume进行消费,而拉模式则是调用 Basic.Get 进行消费。

推模式

Channel 类中 basicConsume 方法太多,这里介绍几个常用的

String basicConsume(String queue , Consumer callback); 
String basicConsume(String queue , boolean autoAck, Consumer callback);

String basicConsume(String queue , boolean autoAck , Map<String , Object> 
arguments, Consumer callback); 

String basicConsume(String queue , boolean autoAck , String consumerTag, 
Consumer callback); 

String basicConsume(String queue , boolean autoAck , String consumerTag, 
boolean noLocal , boolean exclusive , Map<Str ng Object> arguments, 
Consumer callback);

对应的参数说明:

  • queue: 队列的名称
  • autoAck: 设置是否自动确认,如果设置 autoAck为false ,那么需要调用channel.basicAck 来确认消息己被成功接收。
  • consumerTag:消费者标签,用来区分多个消费者
  • noLocal:设置为 true, 则表示不能将同一个 Connection中生产者发送的消息传送给这个 Connection 中的消费者
  • exclusive:设置是否排他
  • arguments:设置消费者的其他参数
  • callback:设置消费者的回调函数。用来处理 RabbitMQ 推送过来的消息。比如DefaultConsumer,使用时需要客户端重写其中的方法。

拉模式

通过 channel.basicGet方法可以单条地获取消息,其返回值是 GetResponse。Channel 类的 basicGet 方法没有其他重载方法,只有

 GetResponse basicGet(String queue, boolean autoAck);

其中 queue 代表队列的名称,如果设置 autoAck为false ,那么需要调用
channel.basicAck 来确认消息己被成功接收。

消费端的确认与拒绝

1、basicReject

    void basicReject(long deliveryTag, boolean requeue);
  • deliveryTag: 消息的编号
  • requeue: 参数设置为 true,则 RabbitMQ 会重新将这条消息存入
    队列,以便可以发送给下一个订阅的消费者;如果 requeue 参数设置为 false,则 RabbitMQ会立即把消息从队列中移除。
    注:Basic.Reject命令一次只能拒绝一条消息,如果想要批量拒绝消息,则可以使用Basic.Nack 这个命令。

2、basicNack

void basicNack(long deliveryTag, boolean multiple, boolean requeue);
  • deliveryTag:消息的编号
  • multiple:设置为 false时,则表示拒绝编号为 deliveryTag的这一条消息,这时候 basicNack方法和basicReject 方法一样, multiple 参数设置为 true 则表示拒绝 小于deliveryTag编号之前所有未被当前消费者确认的消息。
  • requeue:参数设置为 true,则 RabbitMQ 会重新将这条消息存入
    队列,以便可以发送给下一个订阅的消费者;如果 requeue 参数设置为 false,则 RabbitMQ会立即把消息从队列中移除。

3、basicRecover

是否恢复消息到队列

     Basic.RecoverOk basicRecover() throws IOException;

    Basic.RecoverOk basicRecover(boolean requeue) throws IOException;

这个 channel.basicRecover方法用来请求 RabbitMQ 重新发送还未被确认的消息,requeue参数设置为 true ,则未被确认的消息会被重新加入到队列中,并且尽可能的将消息投递给其他消费者消费,而不是自己再次消费如果,requeue参数设置为 false ,消息会被重新投递给自己。默认情况下,如果不设置requeue这个参数默认为 true。

4、basicAck

    void basicAck(long deliveryTag, boolean multiple) throws IOException;
  • deliveryTag :消息的编号
  • multiple:设置为 false时,则表示确认消费编号为 deliveryTag的这一条消息,该参数为 true 时,则可以一次性确认消费小于等于deliveryTag值的所有消息。

以上知识均来自于朱忠华大佬《RabbitMQ实战指南》一书

image.png

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 214,029评论 6 493
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,238评论 3 388
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 159,576评论 0 349
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,214评论 1 287
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,324评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,392评论 1 292
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,416评论 3 412
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,196评论 0 269
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,631评论 1 306
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,919评论 2 328
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,090评论 1 342
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,767评论 4 337
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,410评论 3 322
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,090评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,328评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,952评论 2 365
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,979评论 2 351

推荐阅读更多精彩内容