3.Publish/Subscribe#前山翻译

注:这是RabbitMQ-java版Client的指导教程翻译系列文章,欢迎大家批评指正
第一篇Hello Word了解RabbitMQ的基本用法
第二篇Work Queues介绍队列的使用
第三篇Publish/Subscribe介绍转换器以及其中fanout类型
第四篇Routing介绍direct类型转换器
第五篇Topics介绍topic类型转换器
第六篇RPC介绍远程调用

发布/订阅(Publish/Subscribe)

在上篇指导教程中我们创建了一个工作队列。在一个工作队列中,每个任务都被精确的分配给一个工作者。但在这篇指导中,我们做一些不同的事情,就是把一条消息分发给多个消费者。这就是著名的“发布/订阅”模型。

为了阐释这个模型。我们打算建立一个日志系统,它有两个应用组成,一个将会发出日志消息,另一个将会接受并打印出日志消息。

在我们的日志系统中运行相同的消费者应用都会接受到消息,按照这种方式,我们可以运行一个接受者将日志消息持久到磁盘上,与此同时我们可以再运行一个接受者在屏幕上查看的日志消息。

本质上,被发布的日志消息会广播给所有的接受者。

转换器(Exchanges)

在上篇指导教程中我们发送和接受消息都是来自一个队列,而现在来介绍RabbitMQ完整消息模型。

我们快速的复习一下我们上篇指导教程中学习的东西:

1.一个生产者的应用可以发送消息

2.一个队列可以缓存许多消息

3.一个消费者的应用可以接受消息

而RabbitMQ中消息模型的核心思想是生产者不会直接发送消息给一个队列。实际上,经常是生产者都不知道生产的消息是会被分发到哪一个队列中。

实际上,生产者只会发送消息给转换器(exchanges)。转换器的原理非常简单,一方面从生产者接受消息,另一方面将消息推送到队列中。但转换器必须明确的知道接收到的消息要做什么。是被添加到一个特殊的队列中,还是被添加到多个队列中,亦或者被销毁。这些规则都是由转换器的类型决定的。

exchanges.png

这里有四种转换器的类型可使用:direct,topic,headers和fanout。我们先学习最后一个:fanout,创建一个fanout类型的转换器,叫着logs;

channel.exchangeDeclare("logs", "fanout");  //前者参数为名称,后者参数为转换器类型

fanout转换器非常简单,或许你从名字就可以猜出来,它把自己所有消息广播给绑定它的所有队列。这确切是我们日志所需要的。

匿名的转换器

在上篇指导教程中我们并不知道转换器,但是我们依然能都发送消息给队列。那是因为我们使用了一种默认的转换器,定义的一个空字符串。

回想以下我们是如何发布一条消息的:

channel.basicPublish("", "hello", null, message.getBytes());

第一个参数就是转换器的名字,空的字符串表示默认的或者说匿名的转换器:如果是routingkey存在的话,那么消息将会路由到指定routingkey的队列中。

现在,我们使用我们命名的转换器发送消息:

channel.basicPublish( "logs", "", null, message.getBytes());

临时的队列(Temporary queues)

可能你还记得以前我们使用队列的时候都会有一个具体的名字(像hello 和 task_queue),对我们来说命名一个队列实在太重要了,我们需要把多个工作这指向同一个队列。当你想在生产者和消费者之间共享一个队列,那给一个队列命名就太重要。

但是这个例子不适合我们的日志系统。我们想知道所有的日志消息,而并不是它们中的一部分。我们同样只想获取到当下的消息而不是以前的消息,为了解决这个问题我们需要确定两件事情。

第一,无论什么时候连接上RabbitMQ,我们有一个新的空的队列。为了这样,我们可以创建一个随机命名的队列,或者更好的方式是:让服务器为我们选择一个随机命名的队列。

第二,一旦消费者失去连接,这个队列会被自动删除。

在java的客户端,提供了一个没有参数的queueDeclare()方法能够创建一个临时,唯一,会自动删除且自动命名的队列:

 String queueName = channel.queueDeclare().getQueue();

这样的话,这个队列就是一个随机命名的。举例来说它有点像:amq.gen-JzTY20BRgKO-HjmUJj0wLg

绑定(Bindings)

bindings.png

我们已经创建了一个fanout转换器和一个队列。现在我们需要告诉转换器发送消息给我们的队列。转换器和队列之间的关系就叫着绑定(binding):

channel.queueBind(queueName, "logs", "");

从现在开始,logs转换器将会添加消息到队列。

展示所有的绑定

可以展示已经存在的绑定关系,命令如下:

 rabbitmqctl list_bindings

综合

python-three-overall.png

发送消息的生产者应用,跟之前指导教程中的相差并不大。但是,最大的不同是我们现在需要将消息发布给名叫logs的转换器,而不是匿名的转换器。发送消息时本应该提供一个routingkey的值,但是对于fanout类型的转换器,routingkey属性就没有意义。
下面是EmitLog.java类的源代码,这里下载

     import com.rabbitmq.client.ConnectionFactory;
     import com.rabbitmq.client.Connection;
     import com.rabbitmq.client.Channel;
     public class EmitLog {

     private static final String EXCHANGE_NAME = "logs";

     public static void main(String[] argv)  throws java.io.IOException {

        ConnectionFactory factory = new ConnectionFactory();

        factory.setHost("localhost");

        Connection connection = factory.newConnection();

        Channel channel = connection.createChannel();

        channel.exchangeDeclare(EXCHANGE_NAME, "fanout");

        String message = getMessage(argv);

        channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes());

        System.out.println(" [x] Sent '" + message + "'");

        channel.close();

        connection.close();

        }

        //...

     }

正如你所看到的,建立连接后,就声明了转换器,这一步是非常重要的。如果发布到一个不存在的转换器上是被禁止的。(预计会报错吧)
如果没有队列绑定这个转换器,消息将会被丢失。但对我们来说是没问题的,如果没有消费者监听,那我们可以安全的删除这个消息。
下面是ReceiveLogs.java类的源代码,这里下载

import com.rabbitmq.client.*;

import java.io.IOException;

public class ReceiveLogs {

    private static final String EXCHANGE_NAME = "logs";

    public static void main(String[] argv) throws Exception {

    ConnectionFactory factory = new ConnectionFactory();

    factory.setHost("localhost");

    Connection connection = factory.newConnection();

    Channel channel = connection.createChannel();

    channel.exchangeDeclare(EXCHANGE_NAME, "fanout");

    String queueName = channel.queueDeclare().getQueue();

    channel.queueBind(queueName, EXCHANGE_NAME, "");

    System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

    Consumer consumer = new DefaultConsumer(channel) {

    @Override

    public void handleDelivery(String consumerTag, Envelope envelope,

    AMQP.BasicProperties properties, byte[] body) throws IOException {

    String message = new String(body, "UTF-8");

    System.out.println(" [x] Received '" + message + "'");

     }

    };

     channel.basicConsume(queueName, true, consumer);

  }

}

先编译,再完成:

javac -cp $CP EmitLog.java ReceiveLogs.java

如果你想保留日志到文件中,只需要打开一个窗口和记录:

java -cp $CP ReceiveLogs > logs_from_rabbit.log

如果你想在你的屏幕上看到日志,运行一个终端:

java -cp $CP ReceiveLogs

当然,发送日志类型:

java -cp $CP EmitLog 

使用rabbitmqctl list_bindings命令行,你可以核实是否是我们想要的绑定和队列。有两个ReceiveLogs.java应用在运行,应该就可以看到下面的情况:

sudo rabbitmqctl list_bindings

# => Listing bindings ...

# => logs    exchange        amq.gen-JzTY20BRgKO-HjmUJj0wLg  queue  []

# => logs    exchange        amq.gen-vso0PVvyiRIL2WoV3i48Yg  queue []

# => ...done.

结果是明确的:数据从logs转换器发送至两个服务端命名的队列中,这确切来说就是我们的意图。

第三节的内容大致翻译完了,这里是原文链接。接着进入下一节:Routing

终篇是我对RabbitMQ使用理解的总结文章,欢迎讨教。
--谢谢--

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 212,884评论 6 492
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,755评论 3 385
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 158,369评论 0 348
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,799评论 1 285
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 65,910评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,096评论 1 291
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,159评论 3 411
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,917评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,360评论 1 303
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,673评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,814评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,509评论 4 334
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,156评论 3 317
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,882评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,123评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,641评论 2 362
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,728评论 2 351

推荐阅读更多精彩内容

  • Spring Cloud为开发人员提供了快速构建分布式系统中一些常见模式的工具(例如配置管理,服务发现,断路器,智...
    卡卡罗2017阅读 134,638评论 18 139
  • 来源 RabbitMQ是用Erlang实现的一个高并发高可靠AMQP消息队列服务器。支持消息的持久化、事务、拥塞控...
    jiangmo阅读 10,353评论 2 34
  • RabbitMQ 原理介绍及安装部署 标签:RabbitMQ 安装 简介 RabbitMQ 是一个用 Erlang...
    神仙CGod阅读 8,564评论 0 60
  • RabbitMQ笔记 本文参考资料:http://blog.csdn.net/chwshuang/article/...
    wangxiaoda阅读 2,819评论 0 11
  • 关于消息队列,从前年开始断断续续看了些资料,想写很久了,但一直没腾出空,近来分别碰到几个朋友聊这块的技术选型,是时...
    预流阅读 584,579评论 51 786