RabbitMQ基本使用

本节详细讲述RabbitMQ的几个基本API,围绕Connection和channel两个AMQP协议接口.详解以下几点:连接、交换器、队列的创建和绑定、消息的发送和消费、消息确认和关闭连接

一、连接RabbitMQ

不贴代码了,参考上一节。这里要说一下,channel是非线程安全的,在有些情况下可能会导致网络上出现错误的通信帧交错,同时也会影响发送方确认机制的运行。

二、使用Exchange和Queue

交换器和队列是AMQP中high-level层面的构建模块,应用程序应确保使用前已经存在了。使用前需要声明。
生产者和消费者都可以声明一个交换器和队列.如果尝试声明一个已存在的交换器或队列,只要声明的参数完全匹配,RabbitMQ就会什么都不做.否则会抛出异常.

  1. exchange的声明和删除
  • 声明: exchangeDeclare方法详解
    exchangeDeclare方法有多个重载方法,都是通过下面这个方法缺省某些参数构成的
 /**
     * Declare an exchange, via an interface that allows the complete set of
     * arguments.
     * @see com.rabbitmq.client.AMQP.Exchange.Declare
     * @see com.rabbitmq.client.AMQP.Exchange.DeclareOk
     * @param exchange the name of the exchange
     * @param type the exchange type
     * @param durable true if we are declaring a durable exchange (the exchange will survive a server restart)
     * @param autoDelete true if the server should delete the exchange when it is no longer in use
     * @param internal true if the exchange is internal, i.e. can't be directly
     * published to by a client.
     * @param arguments other properties (construction arguments) for the exchange
     * @return a declaration-confirm method to indicate the exchange was successfully declared
     * @throws java.io.IOException if an error is encountered
     */
    Exchange.DeclareOk exchangeDeclare(String exchange,
                                              String type,
                                              boolean durable,
                                              boolean autoDelete,
                                              boolean internal,
                                              Map<String, Object> arguments) throws IOException;

详解一下几个参数:

  • exchange: 交换器名称

  • type: 交换器类型,上节提到的direct fanout headers和topic

  • durable: 是否持久化 持久话将会将交换器存盘,服务器重启时不会丢失

  • autoDelete: 是否自动删除. 自动删除的前提是至少有一个队列或者交换器与这个交换器绑定.之后所有的交换器和队列与这个交换器解绑后,该交换器会自动删除.不能错误的理解为连接断开后交换器会删除

  • internal: 是否内置的 这个参数为true时表示这个交换器只能由其他交换器发送消息.不能与客户端直连

  • arguments: 其他一些结构化参数

  • 删除交换器: exchangeDelete

 /**
     * Delete an exchange
     * @see com.rabbitmq.client.AMQP.Exchange.Delete
     * @see com.rabbitmq.client.AMQP.Exchange.DeleteOk
     * @param exchange the name of the exchange
     * @param ifUnused true to indicate that the exchange is only to be deleted if it is unused
     * @return a deletion-confirm method to indicate the exchange was successfully deleted
     * @throws java.io.IOException if an error is encountered
     */
    Exchange.DeleteOk exchangeDelete(String exchange, boolean ifUnused) throws IOException;
  1. queue的声明和删除
    声明队列:
/**
     * Like {@link Channel#queueDeclare(String, boolean, boolean, boolean, java.util.Map)} but sets nowait
     * flag to true and returns no result (as there will be no response from the server).
     * @param queue the name of the queue
     * @param durable true if we are declaring a durable queue (the queue will survive a server restart)
     * @param exclusive true if we are declaring an exclusive queue (restricted to this connection)
     * @param autoDelete true if we are declaring an autodelete queue (server will delete it when no longer in use)
     * @param arguments other properties (construction arguments) for the queue
     * @throws java.io.IOException if an error is encountered
     */
    void queueDeclareNoWait(String queue, boolean durable, boolean exclusive, boolean autoDelete,
                            Map<String, Object> arguments) throws IOException;

其他几个参数含义跟declareExchange相同,说一下exclusive:设置是否排他.为true队列是排他的.如果一个队列是排他的,该队列仅对首次声明他的连接可见.并在连接断开时断连.同一个连接的不同channel可以访问该连接创建的排他队列.
删除队列:

/**
     * Delete a queue
     * @see com.rabbitmq.client.AMQP.Queue.Delete
     * @see com.rabbitmq.client.AMQP.Queue.DeleteOk
     * @param queue the name of the queue
     * @param ifUnused true if the queue should be deleted only if not in use
     * @param ifEmpty true if the queue should be deleted only if empty
     * @return a deletion-confirm method to indicate the queue was successfully deleted
     * @throws java.io.IOException if an error is encountered
     */
    Queue.DeleteOk queueDelete(String queue, boolean ifUnused, boolean ifEmpty) throws IOException;

删除队列中的消息:

/**
     * Purges the contents of the given queue.
     * @see com.rabbitmq.client.AMQP.Queue.Purge
     * @see com.rabbitmq.client.AMQP.Queue.PurgeOk
     * @param queue the name of the queue
     * @return a purge-confirm method if the purge was executed successfully
     * @throws java.io.IOException if an error is encountered
     */
    Queue.PurgeOk queuePurge(String queue) throws IOException;
  1. queueBind exchangeBind queueUnbind exchangeUnbind
  • queueBind方法详解
/**
     * Bind a queue to an exchange.
     * @see com.rabbitmq.client.AMQP.Queue.Bind
     * @see com.rabbitmq.client.AMQP.Queue.BindOk
     * @param queue the name of the queue 要绑定的队列名
     * @param exchange the name of the exchange 要绑定的交换器名
     * @param routingKey the routing key to use for the binding 绑定时的routingBind
     * @param arguments other properties (binding parameters) 绑定参数
     * @return a binding-confirm method if the binding was successfully created
     * @throws java.io.IOException if an error is encountered
     */
    Queue.BindOk queueBind(String queue, String exchange, String routingKey, Map<String, Object> arguments) throws IOException;
  • queueUnbind 可以解绑已经绑定的队列和交换器
  1. 何时创建
  • 由生产者和消费者声明
    RabbitMQ 官方建议,生产者和消费者都应该尝试创建队列.
  • 程序上线前在服务器上创建好,比如通过页面管理、RabbitMQ命令或更好的配置中心下发
    预先创建的好处:
    免去声明过程;
    确保交换器和队列正确绑定;
    配合mandatory参数或者备份交换器提高程序的健壮性

三、发送消息


    /**
     * Publish a message.
     *
     * Publishing to a non-existent exchange will result in a channel-level
     * protocol exception, which closes the channel.
     *
     * Invocations of <code>Channel#basicPublish</code> will eventually block if a
     * <a href="http://www.rabbitmq.com/alarms.html">resource-driven alarm</a> is in effect.
     *
     * @see com.rabbitmq.client.AMQP.Basic.Publish
     * @see <a href="http://www.rabbitmq.com/alarms.html">Resource-driven alarms</a>
     * @param exchange the exchange to publish the message to
     * @param routingKey the routing key
     * @param mandatory true if the 'mandatory' flag is to be set
     * @param immediate true if the 'immediate' flag is to be
     * set. Note that the RabbitMQ server does not support this flag.
     * @param props other properties for the message - routing headers etc
     * @param body the message body
     * @throws java.io.IOException if an error is encountered
     */
    void basicPublish(String exchange, String routingKey, boolean mandatory, boolean immediate, BasicProperties props, byte[] body)
            throws IOException;

  • exchange: 交换器名称
  • RoutingKey: 路由键
  • props: 消息的基本属性集 包括14个属性
        private String contentType;
        private String contentEncoding;
        private Map<String,Object> headers;
        private Integer deliveryMode;
        private Integer priority;
        private String correlationId;
        private String replyTo;
        private String expiration;
        private String messageId;
        private Date timestamp;
        private String type;
        private String userId;
        private String appId;
        private String clusterId;
  • byte[] 消息体
  • mandatory和immediate下节讲述

四、消费消息

 /**
     * Start a non-nolocal, non-exclusive consumer.
     * @param queue the name of the queue 队列名称
     * @param autoAck true if the server should consider messages 是否自动确认.建议设置成false
     * acknowledged once delivered; false if the server should expect
     * explicit acknowledgements
     * @param consumerTag a client-generated consumer tag to establish context 消费者标签,用来区分不同消费者
     * @param callback an interface to the consumer object 回调函数,用来处理推送来的消息
     * @return the consumerTag associated with the new consumer
     * @throws java.io.IOException if an error is encountered
     * @see com.rabbitmq.client.AMQP.Basic.Consume
     * @see com.rabbitmq.client.AMQP.Basic.ConsumeOk
     * @see #basicConsume(String, boolean, String, boolean, boolean, Map, Consumer)
     */
    String basicConsume(String queue, boolean autoAck, String consumerTag, Consumer callback) throws IOException;

五、消费端的确认和拒绝

为了保证消息从队列可靠地到达消费者,RabbitMQ提供了消息确认机制.当自动确认autoack参数为false时,消费者客户端需要显示的调用basicAck确认消费.RabbitMQ不会为未确认的消息设置过期时间.它判断此消息是否需要重新投递的唯一依据是该消费者的连接是否已断开.
RabbitMQ提供了basicReject方法来告诉RabbitMQ拒绝一个消息.定义如下:

/**
     * Reject a message. Supply the deliveryTag from the {@link com.rabbitmq.client.AMQP.Basic.GetOk}
     * or {@link com.rabbitmq.client.AMQP.Basic.Deliver} method
     * containing the received message being rejected.
     * @see com.rabbitmq.client.AMQP.Basic.Reject
     * @param deliveryTag the tag from the received {@link com.rabbitmq.client.AMQP.Basic.GetOk} or {@link com.rabbitmq.client.AMQP.Basic.Deliver}
     * @param requeue true if the rejected message should be requeued rather than discarded/dead-lettered
     * @throws java.io.IOException if an error is encountered
     */
    void basicReject(long deliveryTag, boolean requeue) throws IOException;

当requeue为true时,拒绝的消息将重新存入队列,发送给下一个订阅的消费者.
这里提供一个demo:

public void consumeMessage() throws IOException {
        Connection connection = ConnectionCreator.getConnection();
        Channel channel = connection.createChannel();
        boolean autoAck = false;
        channel.basicConsume("test_queue", autoAck, "testConsumerTag", new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println(envelope);
                System.out.println(new String(body));
                long tag = envelope.getDeliveryTag();
                if (tag / 2 == 0) {
                    channel.basicReject(tag, true);
                } else {
                    channel.basicAck(envelope.getDeliveryTag(), true);
                }
            }
        });
         RabbitMQUtils.close(connection, channel);
    }

rabbitMQ还提供了basicNack方法.可以批量拒绝消息
注意:
basicReject或者basicNack中的requeue设置为false时,可以启用"死信队列"功能.死信队列可以通过检测被拒绝或者未送达的消息来追踪问题.
channel.basicReject

六、关闭连接

RabbitMQ为channel和connection提供了addShutdownListener方法对连接的关闭进行回调.如下:

public void closeChannel() throws IOException {
        Connection connection = ConnectionCreator.getConnection();
        Channel channel = connection.createChannel();
        channel.addShutdownListener(new ShutdownListener() {
            @Override
            public void shutdownCompleted(ShutdownSignalException cause) {
                System.out.println(cause);
            }
        });
        RabbitMQUtils.close(channel);
    }

附:
两个测试时省代码的封装类

  • 创建连接:
package com.pctf.basic;

import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class ConnectionCreator {
    private static final String HOST = "127.0.0.1";
    private static final int PORT = 5672;
    private static final String USER_NAME = "root";
    private static final String PASSWORD = "123456";

    private static final ConnectionFactory connectionFactory;

    static {
        connectionFactory = new ConnectionFactory();
        connectionFactory.setPassword(PASSWORD);
        connectionFactory.setUsername(USER_NAME);
        connectionFactory.setHost(HOST);
        connectionFactory.setPort(PORT);
    }

    public static Connection getConnection() {
        try {
            return connectionFactory.newConnection();
        } catch (IOException e) {
            e.printStackTrace();
        } catch (TimeoutException e) {
            e.printStackTrace();
        }
        return null;
    }
}
  • 关闭连接:
package com.pctf.utils;

import java.io.Closeable;
import java.io.IOException;

public class RabbitMQUtils {

    public static void close(AutoCloseable... closeables) {
        if (closeables != null) {
            for (AutoCloseable c : closeables) {
                if (c != null) {
                    try {
                        c.close();
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                }
            }
        }
    }
}
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 215,723评论 6 498
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,003评论 3 391
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 161,512评论 0 351
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,825评论 1 290
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,874评论 6 388
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,841评论 1 295
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,812评论 3 416
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,582评论 0 271
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,033评论 1 308
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,309评论 2 331
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,450评论 1 345
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,158评论 5 341
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,789评论 3 325
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,409评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,609评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,440评论 2 368
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,357评论 2 352

推荐阅读更多精彩内容

  • http://liuxing.info/2017/06/30/Spring%20AMQP%E4%B8%AD%E6%...
    sherlock_6981阅读 15,904评论 2 11
  • 本文章翻译自http://www.rabbitmq.com/api-guide.html,并没有及时更新。 术语对...
    joyenlee阅读 7,651评论 0 3
  • 什么叫消息队列? 消息(Message)是指在应用间传送的数据。消息可以非常简单,比如只包含文本字符串,也可以更复...
    Agile_dev阅读 2,371评论 0 24
  • 我一直在鼓励家长拥抱自己的孩子,因为我是做家庭教育的,我所能要求的只有家长,相信很多读我文字的人都会明白。 可是在...
    王书朋阅读 757评论 1 1
  • 我会为你唱情歌到声沙,静静看着你闹的笑话,给你说一辈子情话,酸,甜,苦和辣,你不会害怕,不会害怕,让我做孩子的...
    怒涛阅读 523评论 0 2