RabbitMQ(四) - 发布/订阅(Publish/Subscribe)

发布/订阅

上一个教程中我们创建了一个工作队列。如果说工作队列是将一个任务完全分发给一个消费者。在这部分,我们所做的完全不同 —— 我们将把一个消息交付给多个消费者。这种模式称之为发布/订阅(publish/subscribe)。

为了说明这种模式,我们创建一个简单的日志系统。它由两个程序组成 —— 第一个(生产者)将发出一个日志消息,第二个(消费者)将接收它并打印它。

在我们的日志系统中每个接收者程序都能得到消息的拷贝。这样我们就可以跑一个接收者并直接将日志记录到磁盘;与此同时能跑另一个接收者将日志打印在屏幕。

本质上,发布一个日志消息将被广播到所有的接收者。

交换机(exchange)

在前面的教程中,我们向队列发送消息,从队列中接收消息。现在是时候介绍Rabbit中完整的消息传递模型了。

让我们快速的回顾下之前的教程:

  • 生产者是一个用来发送消息的用户应用
  • 队列用来缓存存储消息
  • 消费者是一个用来接收消息的用户应用

RabbitMQ的消息模型的核心思想是生产者不会直接的向队列发送任何消息。实际上,生产者都不知道将消息交付给哪个队列。

相反的,生产者只将消息往exchange中发送,exchange是非常简单的。一边从生产者中接收消息,一边将消息推到队列。exchange能正确的知道如何处理接收到的消息。是添加到一个特定的队列?是添加到很多队列?还是会丢弃。这些规则都通过exchange类型来定义。

rabbitmq-exchange

exchange有以下几种类型:

  • direct
  • topic
  • headers
  • fanout

我们目前关注最后一个 —— fanout。让我们新建一个这样的exchange类型,并命名为logs

channel.exchangeDeclare("logs", "fanout");

fanout exchange非常简单。你可以尽可能的从它的名字中猜测,它只是将接收到的消息广播到它所知道的队列。这正是我们需要的日志。

exchange 列表

你想在服务中查看exchange的列表,你能使用命令 rabbitmqctl

sudo rabbitmqctl list_exchanges

在列表中有些默认的、格式为amq.*的exchange。这些都是默认创建的,但是你不太可能需要使用到它们。

匿名 exchange

在前面的教程中我们对exchange一无所知,但是我们还是能将消息发送到队列。之所以可以,因为我们使用了默认的exchange,就是我们用的空字符串("")。

回想我们之前发布的一个消息:

channel.basicPublish("", "hello", null, message.getBytes());

第一个参数就是exchange的名称。空字符表示默认的或者匿名的exchange。消息路由到指定routingKey的队列中,如果它存在。

现在,我们能发布我们命名的exchange了:

channel.basicPublish( "logs", "", null, message.getBytes());

临时队列

还记得前面教程中我们使用的队列都是指定名字的(hello、work.queue)。命名一个队列对我们来是至关重要的——我们需要指定工作者到相同的队列中。当你想在生产者和消费者之间共享队列时,给队列命名是很重要的。

但是在我们的日志中者不是我们要关心的。我们想得到所有的日志消息,而不是他们的子集。我们感兴趣的也只是当前活动的消息而不是老的那个。为了解决这个我们需要做两个步骤:

第一,无论何时我们连接Rabbit需要一个新的,空的队列。要达到这个我们需要在创建队列的时候给它随机一个名字,或者更好方案 —— 让服务选择一个随机队列名给我们。

第二,一旦我们消费完队列断开连接就自动的删除队列。

我们提供了一个无参方法queueDeclare()创建一个非持久化、独有的、自动删除的、随机生成名字的队列。

String queueName = channel.queueDeclare().getQueue();

队列名是随机的,它的格式类似于:amq.gen-JzTY20BRgKO-HjmUJj0wLg

绑定(bindings)

rabbitmq-binding

我们已经创建了一个fanout exchange和一个队列。现在我们需要告诉exchange发送消息到我们的队列。这种exchange和队列的关联关系称之为绑定binding

channel.queueBind(queueName, "logs", "");

现在在logs exchange中可以将消息附加到我们的队列

绑定列表

你能显示当前正在使用的bindings列表

rabbitmqctl list_bindings

信息汇总

rabbitmq-three-overall

这个发送日志消息的生产者程序,和之前的教程没有相差很多。最重要的改变是我们用名称为logs的exchange代替了匿名的写法。当我们发送消息的时候要提供一个routingKey,但是在类型为fanout的exchange中,我们忽略它。下面是EmitLog.java整个程序的代码。

package com.roachfu.tutorial.rabbitmq.website.fanout;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class EmitLog {

    private static final String EXCHANGE_NAME = "logs";

    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");

        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        // 定义 exchange
        channel.exchangeDeclare(EXCHANGE_NAME, "fanout");

        String message = "this is fanout exchange";
        channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes("UTF-8"));
        System.out.println(" [x] Sent '" + message + "'");

        channel.close();
        connection.close();
    }
}

如你所见,建立连接之后我们定义了exchange。这步是必要的,推送到一个不存在的exchange是不予许的。

如果没有队列绑定到exchange,消息将被丢失,但是对我们来说是可以的;如果没有消费者监听消息,我们能安全的丢弃消息。

官网使用命令行实现的。能很好的实现是显示在控制台还是记录到日志文件中。这里我们写两个程序,一个用来在console控制台打印消息,一个用来将消息记录到日志文件中。

ReceiveLogsToConsole.java

package com.roachfu.tutorial.rabbitmq.website.fanout;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * 打印消息到控制台
 */
public class ReceiveLogToConsole {

    private static final String EXCHANGE_NAME = "logs";

    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        // 定义exchange,消费者和生产者都要定义。因为并不知道exchange存不存在。
        channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
        // 获取队列名
        String queryName = channel.queueDeclare().getQueue();
        // 将队列名和exchange进行绑定
        channel.queueBind(queryName, EXCHANGE_NAME, "");

        System.out.println(" [*] Waiting for message and handle it to console . . . ");

        Consumer consumer = new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String message = new String(body, "UTF-8");
                System.out.println(" [x] Received message '" + message + "'");
            }
        };

        channel.basicConsume(queryName, true, consumer);
    }
}

ReceiveLogToFile.java

package com.roachfu.tutorial.rabbitmq.website.fanout;

import com.rabbitmq.client.*;

import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * 接收消息并打印到文件
 */
public class ReceiveLogToFile {

    private static final String EXCHANGE_NAME = "logs";

    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("localhost");
        Connection connection = connectionFactory.newConnection();
        Channel channel = connection.createChannel();

        channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
        String queueName = channel.queueDeclare().getQueue();
        channel.queueBind(queueName, EXCHANGE_NAME, "");

        System.out.println(" [*] Waiting for message and handle it to file. . . ");

        Consumer consumer = new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String message = new String(body, "UTF-8");

                File file = new File("/Users/temp/fanout.log");

                FileOutputStream out = new FileOutputStream(file, true);
                out.write(body);
                out.write(("\r\n").getBytes());

                out.flush();
                out.close();
            }
        };

        channel.basicConsume(queueName, true, consumer);
    }
}

测试结果

我们先启动两个消费者,然后再启动三次生产者。

ReceiveLogToConsole.java 控制台显示:

 [*] Waiting for message and handle it to console . . . 
 [x] Received message 'this is fanout exchange'
 [x] Received message 'this is fanout exchange'
 [x] Received message 'this is fanout exchange'

ReceiveLogToFile.java 文件显示

rabbitmq-logtofile
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 219,490评论 6 508
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 93,581评论 3 395
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 165,830评论 0 356
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,957评论 1 295
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,974评论 6 393
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,754评论 1 307
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,464评论 3 420
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 39,357评论 0 276
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,847评论 1 317
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,995评论 3 338
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 40,137评论 1 351
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,819评论 5 346
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,482评论 3 331
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 32,023评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 33,149评论 1 272
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 48,409评论 3 373
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 45,086评论 2 355

推荐阅读更多精彩内容

  • 来源 RabbitMQ是用Erlang实现的一个高并发高可靠AMQP消息队列服务器。支持消息的持久化、事务、拥塞控...
    jiangmo阅读 10,361评论 2 34
  • Spring Cloud为开发人员提供了快速构建分布式系统中一些常见模式的工具(例如配置管理,服务发现,断路器,智...
    卡卡罗2017阅读 134,672评论 18 139
  • 继续翻译第一次尝试进行这样模式的学习,感觉好难进行,不过还是要坚持住! 简介 在之前的教程中,我们创建了一个工作队...
    AceCream佳阅读 1,305评论 0 3
  • 1 RabbitMQ安装部署 这里是ErLang环境的下载地址http://www.erlang.org/down...
    Bobby0322阅读 2,239评论 0 11
  • > 这是开始读书的一丝想法。包括读前与后。 未雨绸缪 做任何事情都顺带着培养自己的习惯。 这些习惯更多的预示着在无...
    Songlairui阅读 382评论 0 0