[用官方文档学习RabbitMQ]——3.RabbitMQ的发布订阅模式——Publish/Subscribe

继续翻译第一次尝试进行这样模式的学习,感觉好难进行,不过还是要坚持住

简介

在之前的教程中,我们创建了一个工作队列,工作队列使用情况的假设是:每个人物都交付给一个Worker,也就是消费者。在这部分中,我们将做一些完全不同的事情——我们将向多个消费者传递消息。这样的模式被称为“发布/订阅"模式,检查P/S模式。
为了说明这个模式,我们将会构建一个简单的日志记录系统。它将由两个程序组成:1.第一个程序发送日志消息。2.第二个程序将接受打印这些日志。
在我们的日志系统中,有接收功能的程序都将得到消息。所以,我们就可以运行一个接收器,将这些日志引导到磁盘。同时我们再运行另一个接收器,功能是让我们在屏幕上看到日志。
本质是:生产者发送的消息,会被传播到所有消费者那里去。

Exchange——交换器

在之前的教程之中,我们只是通过队列发送和接受消息。接下来我们需要了解一下完整的消息传递模型。
让我们快速的回顾一下前面教程中介绍的内容:

  • 生产者是一个应用程序,它的任务是发送消息
  • 队列是存储消息的缓冲区
  • 消费者是一个应用程序,它的任务是接收消息

完整的RabbitMQ消息传递模型的核心思想是——生产者不会直接向队列发送任何消息!甚至消费者都不知道消息是否会被传递到哪些队列。
那么这些消息发送给谁了呢?
生产者只能将消息发送到Exchange,也就是交换器里面,交换是一个很简单的事情。一方面,它接受来自生产者的消息,另一方面则把消息推送到队列里面。交换器必须知道如何处理它收到的消息——它是否应该推送到特定的队列中?它是否应该被推送到N个队列里面?获取它应该被抛弃?
答:这个规则是由交换类型定义的。

Exchange交换器

有一些可用的交换类型:directtopicheadersfanout。我们这次主要关注点在fanout上,我们将会创建这种类型的交换,并调用它的日志。

channel.exchangeDeclare("logs","fanout");

fanout类型很简单,它会将接收到的所有消息,传播到它知道的所有队列中去。对于我们的系统来说,这正是我们需要的。
简单说一下这几个交换类型

  • direct : 所有发送到direct类型的交换器中的消息都会被转发到”RouteKey“中指定的队列。

  • fanout : 所有发送到fanout类型的交换器中的消息都会被转发到所有与该交换器绑定的队列。

  • topic : 所有发送到topic类型的交换器中的消息都会被转发到所有关心RouteKey中指定的话题的队列。

  • header : 这个用的比较少,忽略了RouteKey的路由方式,使用Headers来匹配。Headers是个键值对。

(我自己也是在学习中,不是很熟悉,以后我研究研究,明白了单写一个番外)

交换器列表
我们可以通过rabbitmqctl语句列出我们可以运行的可用的交换器列表:

rabbitmqctl list_exchanges
listing Exchanges

出来这些东西,莫方!很多带着amq做开头的交换器和没有命名的交换器,这些都是默认创建的,而且我们目前用不上他们。

没有名字的交换器
教程的前面几个部分里面,我们对交换器一无所知,但是仍然能够将消息发送到队列里面去。这是为啥?
我们使用的是默认的交换器,我们用空字符串("")去识别它。
回忆一下前面我们是如何发送消息的:

channel.basicPublish("","hello",null,message.getBytes());

第一个参数是空字符串,这个就是我们使用的交换器的名称。空字符串表示默认或者匿名的交换器。如果消息存在,那么则使用RoutingKey指定的名称将消息放到队列中去。

现在,我们可以发布到我们自己命名的交换器啦:

channel.basicPublish("logs","",null,message.getBytes());

临时队列——Temporyary Queues

你可能记得我们使用过有指定名称的队列(还记得"hello"和task_queue"吗?)。对于我们来说,能够给队列命名,是至关重要的,因为我们需要把Worker(消费者)指向相同的队列。当我们想要在消费者和生产者之间共享队列的时候,给队列命名就会尤为重要。(我也不知道为啥官方把这句话说了两遍,可能很重要吧ㄟ( ▔, ▔ )ㄏ...)
但是!!!对于我们的Log来说,情况就不一样啦。我们希望拿到所有关于日志的消息,而不只是它们中的一部分。我们也只对当前流动的消息感兴趣,而不是旧消息。想要解决这个问题,我们需要理清两件事:
首先,每当我们连接到RabbitMQ,我们都需要一个新的,空的队列。要做到这一点,我们可以创建一个带有随机名称的队列,或者,我们选择更好的方式——让服务器为我们选择一个随机的队列名称。其次,一旦我们断开了消费者的连接,应该自动删除队列。在Java客户端,当我们没有向queueDeclare()提供参数的时候,我们会创建一个非持久的,独占的,自动删除的队列,并会生成一个名称。

String queueName = channel.queueDeclare().getQueue();

此时,queueName包含一个随机队列名称。比如,他可能看起来像amq.gen-jzty20brgko-hjmuj0wlg (总之是乱七八糟的)

绑定——Bindings

Bindings

我们已经创建了一个fanout类型的交换器,和一个队列,现在我们需要告诉交换器,让他将消息发送到我们的队列中区。交换器和队列之间的关系叫做绑定。

channel.queueBind(queueName,"logs","")

从现在开始,我们的log交换器将会向我们队列添加消息了。

完整示例

这次我没有像之前,先贴例子。这次我选择先写小部分,最后写完整的,这样就和官方基本上一模一样了,会在某些地方加上自己的理解。还有注释!我还是会写的很完整的!


p/s

发布日志消息的生产者程序与前一期没啥太大的不同,不过有些变化比较重要。我们现在想要将消息发布到日志交换器中,而不是无名的交换器。我们需要在发送的时候提供一个路由键(RoutingKey),但是我们忽略了它的value,因为我们的交换类型是fanout。

EmitLog.java

public class EmitLog {
    //设置交换器的名字
    private static final String EXCHANGE_NAME = "logs";

    public static void main(String[] args) throws IOException {

        //获取连接
        Connection connection = ConnectionUtil.getConnection();
        //创建通道
        Channel channel = connection.createChannel();
        //声明交换器,给它名字,设置交换类型为fanout
        channel.exchangeDeclare(EXCHANGE_NAME,"fanout");
        //待传递的消息内容
        String message = getMessage(args);
        //传消息
        channel.basicPublish(EXCHANGE_NAME,"",null,message.getBytes());
        System.out.println("[x] Sent '"+message+"'");
        //关闭连接通道
        channel.close();
        connection.close();
    }

    private static String getMessage(String[] strings) {
        if (strings.length<1){
            return "hello world";
        }
        return joinStrings(strings," ");
    }

    private static String joinStrings(String[] strings, String delimiter) {
        int length = strings.length;
        if (length == 0){
            return "";
        }
        StringBuilder words = new StringBuilder(strings[0]);
        for (int i = 1;i < length; i++){
            words.append(delimiter).append(strings[i]);
        }
        return words.toString();
    }
}

(我贴心的把官方没给你们的工具函数也写上了!快夸我(☆▽☆))
正如你们看到的,建立连接之后,我们声明了交换器,这个步骤是必要的,因为把消息发布到一个不存在的交换器上是禁止的!
如果没有队列绑定到交换中,消息将消失,但是对于我们是允许的,因为如果没有消费者在收集消息,我们可以安全的丢弃信息。
ReceiveLogs.java

public class ReceiveLogs {
    //设置交换器的名字
    private static final String EXCHANGE_NAME = "logs";

    public static void main(String[] args) throws IOException {
        //获取连接
        Connection connection = ConnectionUtil.getConnection();
        //创建通道
        Channel channel = connection.createChannel();
        //声明交换器,给它名字,设置交换类型为fanout
        channel.exchangeDeclare(EXCHANGE_NAME,"fanout");
        //得到队列的名字
        String queueName = channel.queueDeclare().getQueue();
        //队列和交换器进行绑定
        channel.queueBind(queueName,EXCHANGE_NAME,"");
        System.out.println("[*] Waiting for message.To exit press CTRL+C");
        //接收
        Consumer consumer =new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String message = new String(body,"UTF-8");
                System.out.println("[x]Receive '"+ message+"'");
            }
        };
        boolean autoAck = true;
        channel.basicConsume(queueName,autoAck,consumer);
    }
}

这里可以自己试试,打开两个或者三个ReceiveLogs,然后尝试看看发送一条信息,会发生什么?

结果

本来想你们自己尝试来着,不过我还是回来把结果图贴上。具体怎么做请看第二期工作队列模式里面教的方式。
生产者:

生产者

打开的三个消费者:

生产者1
生产者2

第三个反正长得一样就不贴了,╭(╯^╰)╮

试验成功,一个发送,三个同时接受成功!

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 203,362评论 5 477
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,330评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 150,247评论 0 337
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,560评论 1 273
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,580评论 5 365
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,569评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,929评论 3 395
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,587评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,840评论 1 297
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,596评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,678评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,366评论 4 318
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,945评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,929评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,165评论 1 259
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 43,271评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,403评论 2 342

推荐阅读更多精彩内容

  • Spring Cloud为开发人员提供了快速构建分布式系统中一些常见模式的工具(例如配置管理,服务发现,断路器,智...
    卡卡罗2017阅读 134,580评论 18 139
  • 在前面的教程里,我们构建了一个简单的日志记录系统。我们已经能够向许多消费者传送日志消息啦。在本期,我们将会做一些修...
    AceCream佳阅读 526评论 0 2
  • 来源 RabbitMQ是用Erlang实现的一个高并发高可靠AMQP消息队列服务器。支持消息的持久化、事务、拥塞控...
    jiangmo阅读 10,343评论 2 34
  • RabbitMQ笔记 本文参考资料:http://blog.csdn.net/chwshuang/article/...
    wangxiaoda阅读 2,811评论 0 11
  • RabbitMQ 原理介绍及安装部署 标签:RabbitMQ 安装 简介 RabbitMQ 是一个用 Erlang...
    神仙CGod阅读 8,550评论 0 60