5.发送和消费信息

文章参考:Rabbit实战指南

发送消息

​ 如果要发送一个消息,可以使用Channel类的basicPublish方法,比如发送一条内容为“Hello World”的消息,参考如下:

byte[] messageBodyBytes = "Hello,world!".getBytes();
channel.basicPublish(exchangeName,routingKey,null,messageBodyBytes);

为了更好地可控制发送,可以使用mandatory这个参数,或者可以发送一些特定属性的信息:

channel.basicPublish(exchangeName,routingKey,mandatory,
                     MessageProperties.PERSISTENT_TEXT_PLAIN,
                     messageBodyBytes);

下面这行代码发送了一条消息,这条消息的投递模式(delivery mode)设置为2,即消息会被持久化(存入磁盘中)在服务器中。同时这条消息的优先级(priority)设置为1,content-type为“text/plain”。可以自己设定消息的属性:

channel.basicPublish(exchangeName,routingKey,
                     new AMQP.BasicProperties.Builder()
                     .contentType("text/plain")
                     .deliveryMode(2)
                     .priority(1)
                     .userId("hidden")
                     .build(),
                     messageBodyBytes
                    );

也可以发送一条带有headers的消息:

Map<String,Object> headers = new HashMap<>();
headers.put("localtion","here");
headers.put("time","today");
channel.basicPublish(exchangeName,routingKey,
                    new AMQP.BasicProperties.Builder()
                    .headers(headers)
                     .build(),
                     messageBodyBytes
                    );

也可以发送一条带有过期时间(expiration)的消息

channel.basicPublish(exchangeName,routingKey,
                     new AMQP.BasicProperties.Builder()
                     .expiration("6000")
                     .build(),
                     messageBodyBytes
                    );

basicPublish的重载方法:

void basicPublish(String exchange,String routingKey,
                  BasicProperties props,
                  byte[] body) throws IOException;

void basicPublish(String exchange,String routingKey,
                  boolean mandatory,
                  BasicProperties props,
                  byte[] body) throws IOException;

void basicPublish(String exchange,String routingKey,
                  boolean mandatory,boolean imediate,
                  BasicProperties props,
                  byte[] body) throws IOException;

具体参数解释如下:

  • exchange:交换器的名称,指明消息需要发送到哪个交换器中。如果设置为空字符串,则消息会发送到RabbitMQ默认的交换器中。
  • routingKey:路由键,交换器根据路由键将消息存储到相应的队列。
  • props:消息的基本属性集,其包含14个属性成员,分别有contentType、contentEncoding、header(Map<Stirng,Object>)、deliveryMode、priority。correlationId、replyTo、expiration、messageId、timestamp、type、urserId、appId、clusterId。
  • byte[] body:消息体(payload),真正需要发送的消息。
  • mandatory和immediate

消费消息

RabbitMQ的消费模式分为两种:推(Push)模式和拉(Pull)模式。推模式采用Basic.Consume进行消费,而拉模式则调用Basic.Get进行消费。

推模式

在推模式中,可以通过持续订阅的方式来消费信息,使用到的相关类有:

import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DeafultConsumer;

接收消息一般通过实现Consumer接口或者继承DefaultConsumer类来实现。当调用与Consumer类来实现。当调用与Consumer相关的API方法,不同的订阅采用不同的消费者标签来区分彼此,在同一个Channel中的消费者也需要通过唯一的消费者标签以作区别。代码如下:

boolean autoAck = false;
channel.basicQos(64);
channel.basicConsume(queueName,autoAck,"myConsumerTag",
        new DefaultConsumer(channel){
            @Override
            public void handleDevlivery(String consumerTag,
                                       Envelop envelope,
                                  AMQP.BasicProperties properties,
                                        byte[] bode
                                       )throws IOException
            {
                String routingKey = envelope.getRoutingKey();
                String contentType = properties.getContentType();
                long deliveryTag = envelope.getDeliveryTag();
                //(process the message components here ...)
                channel.basicAck(deliveryTag,false);
            }
                    }
                    )

上面代码显式地设置autoAck为false,然后在接受到消息之后进行显式ack操作(channel.basicAck),对应消费者来说这个设置是非常必要的,可以防止消息不必要的丢失。

​ Channel类中basicConsume方法有如下几种形式:

  1. String basicConsume(String queue,Consumer callback) throws IOException

  2. String basicConsume(String queue,boolean autoAck,Consumer callback) throws IOException

  3. String basicConsume(String queue,boolean autoAck,Map<String,Object> arguments,Consumer callback) throws IOException

  4. String basicConsume(String queue,boolean autoAck,String consumerTag,Consumer callback) throws IOException

  5. String basicConsume(String queue,boolean autoAck,String consumerTag,boolean noLocal,boolean exclusive,Map<String,Object> arguments,Consumer callback) throws IOException

    对应参数说明:

    • queue:队列的名称
    • autoAck:设置是否自动确认。建议设置为false
    • consumerTag:消费者标签,用来区分多个消费者;
    • noLocal:设置为true则表示不能将同一个Connection中生产者发送的消息传送给这个Connection中的消费者
    • exclusive:设置是否排他;
    • arguments:设置消费者的其他参数
    • callback:设置消费者的回调函数。用来处理RabbitMQ推送过来的消息,比如DefaultConsumer,使用时需要客户端重写其中的方法。

    对于消费者客户端来说,重写handleDelivery方法是十分方便的。更复杂的消费者客户端会重写更多的方法,具体如下:

    void handleConsumeOk(String consumerTag);
    void handleCancelOk(String consumerTag);
    void handleCancel(String consumerTag) throws IOException;
    void handleShutdownSignal(String consumerTag,ShutdowSignalException sig);
    void handleRecoverOk(String consumerTag);
    

    比如handleShutdownSignal方法,当Channel或者Connection关闭的时候回调用。再者,handleConsumeOk方法会在其他方法之前调用,返回消费者标签。

    重写handleCancelOk和handleCancel方法,这样消费端可以再显示地或者隐式地取消订阅的时候调用。也可以通过channel.basicCancel方法来显式地取消一个消费者的订阅:

    channel.basicCancel(consumerTag)

    注意上面这行代码会首先触发handleConsumerOk方法,之后触发handleDelivery方法,最后才触发handleCanceOk方法。

和生产者一样,消费者客户端同样需要考虑线程安全的问题。消费者客户端的这些callback会被分配到与Channel不同的线程池上,这意味着消费者客户端可以安全地调用这些阻塞方法,如channe.queueDeclare、channel.basicCancel等。

每个Channel都拥有自己独立的线程。最常用的做法是一个Channel对应一个消费者。意味着消费者彼此之间没有任何关联。也可以在一个Channel中维持多个消费者,但是要注意一个问题,如果Channel中的一个消费者一直在运行,那么其他消费者的callback会被“耽搁”。

拉模式

通过channel.basicGet方法可以单条的获取消息,其返回值是GetResponse。Channel类的basicGet方法没有其他重载方法,只有:

 GetResponse basicGet(String queue,boolean autoAck) throws IOException;

其中queue代表队列名称,如果设置autoAck为false,那么同样需要调用channel.basicAck来确认消息被成功接收。

拉模式的关键代码如下所示:

GetResponse response = channel.basicGet(QUEUE_NAME,false);
System.out.println(new String(response.getBody()));
channel.basicAck(response.getEnvelope().getDeliveryTag(),false);
注意:

Basic.Consume将信道(Channel)置为接收模式,知道取消队列的订阅为止。在接收模式期间,RabbitMQ会不断地推送消息给消费者,当然推送消息的个数还是会受到Basic.Qos的限制。

如果只想从队列获取单挑消息而不是持续订阅,建议使用Basic.Get进行消费。但不能将Basic.Get放在一个循环里来代替Basic.Consume,这样做会严重影响RabbitMQ的性能。消费者理应使用Basic.Consume方法。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 214,504评论 6 496
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,434评论 3 389
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 160,089评论 0 349
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,378评论 1 288
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,472评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,506评论 1 292
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,519评论 3 413
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,292评论 0 270
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,738评论 1 307
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,022评论 2 329
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,194评论 1 342
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,873评论 5 338
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,536评论 3 322
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,162评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,413评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,075评论 2 365
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,080评论 2 352