RabbitMQ Federation

Federation 插件的目标是使 RabbitMQ 在不同的 Broker 节点之间进行消息传递而无须建立集群,该功能在很多场景下都非常有用:

  • Federation 插件能够在不同管理域(可能设置了不同的用户和 vhost ,也可能运行在不同版本的 RabbitMQ 和Erlang 上)中的Broker 或者集群之间传递消息。

  • Federation 插件基于AMQP 0-9-1 协议在不同的Broker 之间进行通信,并设计成能够容忍不稳定的网络连接情况。

  • 一个Broker 节点中可以同时存在联邦交换器(或队列)或者本地交换器(或队列),只需要对特定的交换器(或队列)创建Federation 连接CFederation link ) 。

  • Federation 不需要在 N 个Broker 节点之间创建O(N2)个连接(尽管这是最简单的使用方式) ,这也就意味着Federation 在使用时更容易扩展。

一个联邦交换器(federated exchange)或者一个联邦队列(federated queue) 接收上游(upstream) 的消息,这里的上游是指位于其他 Broker 上的交换器或者队列。联邦交换器能够将原本发送给上游交换器(upstream exchange)的消息路由到本地的某个队列中;联邦队列则允许一个本地消费者接收到来自上游
队列C(upstream queue)的消息。

联邦交换器

联邦交换器.PNG

场景描述:broker1 部署在北京,broker2 部署在上海,broker3 部署在广州。

在 broker3 中为交换器 exchangeA (broker3 中的队列 queueA 通过 "rkA" 与 exchangeA 进行了绑定)与北京的 broker1 之间建立一条单向的 Federation link。此时 Federation 插件会在 broker1 上会建立一个同名的交换器 xchangeA (这个名称可以配置,默认同名),同时建立一个内部的交换器 "exchangeA→ broker3 B ",并通过路由键 "rkA" 将这两个交换器绑定起来。

部署在北京的业务 ClientB 可以连接 broker1 并向 exchangeA 发送消息,这样 ClientB 可以迅速发完消息并收到确认消息,而之后消息会通过 Federation link 转发到 broker3 的交换器 exchangA 中。最终消息会存入与 exchangeA 绑定的队列 queueA 中,消费者可以消费队列 queueA 中的消息。

经过 Federation link 转发的消息会带有特殊的 headers 属性标记。

联邦队列

联邦队列可以在多个 Broker 节点(或集群)之间为单个队列提供负载均衡的功能。一个联邦队列可以连接一个或者多个上游队列(upstream queue),并从这些上游队列中获取消息以满足本地消费者消费消息的需求。

联邦队列.PNG

上图队列 queue1 和 queue2 原本在 broker2 中,由于某种需求将其配置为 federated queue 并将broker1 作为 upstream。Federation 插件会在 broker1 上创建同名的队列 queue1 和 queue2 ,与 broker2 中的队列queue1和 queue2 分别建立两条单向独立的 Federation link。

此时的消费方式如下:

  • 当有消费者 ClinetA 连接 broker2并通过 Basic.Consume 消费队列 queue1 (或queue2) 中的消息时,如果队列queue1 (或queue2)中本身有若干消息堆积,那么ClientA 直接消费这些消息,此时 broker2 中的queue1 (或queue2 ) 并不会拉取 broker1中的queue1 (或queue2 ) 的消息。

  • 如果队列 queue1(或 queue2)中没有消息堆积或者消息被消费完了,那么它会通过 Federation link 拉取在 broker1 中的上游队列 queue1(或者 queue2)中的消息(如果有消息),然后存储到本地,之后再被消费者 ClientA 进行消费。

联邦队列的传递性.PNG

上图描述,如果队列 queue1 中有消息堆积,消费者连接 broker3 消费消息 queue3 中的消息,无论 queue3 处于何种状态,这些消费者都消费不到 queue1 中的消息,除非 queue2 有消费者。

上图中的 broker2 的队列 queue 没有消息堆积或者消息被消费完之后并不能通过 Basic.Get 来获取 broker1 中队列 queue 的消息。因为 Basic.Get 是一个异步的方法,如果要从 broker1 中队列 queue 拉取消息,必须要阻塞等待通过 Federation link 拉取消息存入 broker2 中的队列 queue 之后再消费消息,所以对于 federation queue 而言只能使用 Basic.Consume 进行消费。

federation queue 并不具备传递性。

Federation 的使用

为了能够使用 Federation 功能,需要配置以下 2 个内容:

(1)需要配置一个或多个 upstream ,每个 upstream 均定义了到其他节点的 Federation link。这个配置可以通过设置运行时的参数( Runtime Parameter ) 来完成,也可以通过 federation management 插件来完成。

(2)需要定义匹配交换器或者队列的一种/多种策略(Policy)。

rabbitmq-plugins enable rabbitmq_federation 命令可以开启 Federation 功能。Federation 内部基 AMQP 协议拉取数据,所以在开启 rabbitmq federation 插件的时候,默认会开启 amqp_client 插件。

rabbitmq-plugins enable rabbitmq_federation_management 开启 Federation 的管理插件。开启成功后可以在 RabbitMQ 的管理界面中 "Admin" 中看到关于 Federation 的 Tab 页。

rabbitmq_federation_management 插件依附于 rabbitmq_management 插件,所以开启rabbitmq_federation_management 插件的同时默认也会开启rabbitmq_management 插件。

注意:当需要在集群中使用 Federation 功能的时候,集群中所以的节点都应该开启 Federation 插件。

在 Federation 中存在 3 种级别的配置。

(1)Upstreams:每个 upstream 用于定义与其他 Broker 建立连接的信息。

(2)Upstreams Sets:每个 upstream set 用于对一系列使用 Federation 功能的 upstream 进行分组。

(3)Policies:每一个 Policy 会选定出一组交换器,或者队列,亦或者两者皆有而进行限定,进而作用于一个单独的 upstream 或者 upstream set 之上。

建立 federated exchange 示例:

(1)需要在上下游的 broker 中开启 rabbitmq federation 插件,最好同时开启 rabbitmq federation managemen 插件。

(2)在 broker 中定义一个 upstream (注意:该 broker 作为下游接收消息)。

add_federation_upstream.PNG
  • Name:定义这个 upstream 的名称。

  • URI:定义 upstream 的 AMQP 连接,如amqp://root:root123@192.168.0.2:5672

  • Prefetch count:定义 Federation 内部缓存的消息条数,即在收到上游消息之后且在发送到下游之前缓存的消息条数。

  • Reconnect delay

  • Acknowledgement Mode:定义 Federation link 的消息确认方式。

  • Trust User-ID:设定 Federation 是否使用 “Validated User-ID” 这个功能。

(3)定义一个 Policy 用于匹配交换器,并使用第二步中所创建的 upstream。

add_federation_policy.PNG

(4)效果图观察

  • 上游
上游.PNG

标注的 exchange 自动创建。

  • 下游
下游.PNG

当消息发送到上游的交换器后,上游交换器接收消息并转发到下游交换器中。

测试代码:

  • RabbitMQSender
public class RabbitMQSender {
    private final static String EXCHANGE_NAME = "federation.exchange1";
    private final static String DUHFMQ02 = "10.225.20.237";

    public static void main(String[] args) {
        // 创建连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        factory.setPort(5672);
        factory.setUsername("jiaflu");
        factory.setPassword("123456");
        factory.setVirtualHost("/vhost_jiaflu");

        // 创建连接
        try {
            Address[] address = new Address[]{new Address(DUHFMQ02)};
            Connection connection = factory.newConnection(address);
            // 创建信息管道
            Channel channel = connection.createChannel();
            // exchange
            channel.exchangeDeclare(EXCHANGE_NAME, "topic", true, false, false, null);
            String message;

            for (int i = 0; i < 10; i++) {
                message = System.currentTimeMillis() + " hello " + i;
                System.out.println(i + " send " + message);
                channel.basicPublish( EXCHANGE_NAME, "federation.exchange.test", MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());
                Thread.sleep(30);
            }
            channel.close();
            connection.close();
        } catch (IOException e) {
            e.printStackTrace();
        } catch (TimeoutException e){
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}
  • RabbitMQReceiver
public class RabbitMQReceiver {
    private final static String QUEUE_NAME = "federation.exchange.queue1";
    private final static String EXCHANGE_NAME = "federation.exchange1";

    private final static String DUHFMQ = "10.224.162.189";

    public static void main(String[] args) {
        // 创建连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        factory.setPort(5672);
        factory.setUsername("jiaflu");
        factory.setPassword("123456");
        factory.setVirtualHost("/vhost_jiaflu");

        factory.setAutomaticRecoveryEnabled(true);
        factory.setNetworkRecoveryInterval(2);

        // 创建连接
        try {
            Address[] address = new Address[]{new Address(DUHFMQ)};
            Connection connection = factory.newConnection(address);
            // 创建信息管道
            Channel channel = connection.createChannel();
            channel.queueDeclare(QUEUE_NAME, true, false, false, null);
            // exchange
            channel.exchangeDeclare(EXCHANGE_NAME, "topic", true, false, false, null);
            // bind
            channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "federation.exchange.test");
            System.out.println("Queue Receiver Start!");

            // 定义一个消费者
            Consumer consumer = new DefaultConsumer(channel) {
                // 消息到达 触发方法
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    String msg = new String(body, "utf-8");
                    System.out.println("Recv msg: " + msg);
                    try {
                        Thread.sleep(50);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            };
            boolean autoAck = true;
            while (true) {
                channel.basicConsume(QUEUE_NAME, autoAck, consumer);
                break;
            }
        } catch (IOException e) {
            e.printStackTrace();
        } catch (TimeoutException e){
            e.printStackTrace();
        }
    }
}

小结

本来由于公司需要需要在两个 clutser 之间通过 Federation 插件来实现 HA(高可用),采取了双向的 federation exchange 来实现两个 cluster 之间的消息复制,实现了消息的同步复制,但没有实现两个 cluster 之间消息消费的同步。

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 217,907评论 6 506
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,987评论 3 395
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 164,298评论 0 354
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,586评论 1 293
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,633评论 6 392
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,488评论 1 302
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,275评论 3 418
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 39,176评论 0 276
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,619评论 1 314
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,819评论 3 336
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,932评论 1 348
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,655评论 5 346
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,265评论 3 329
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,871评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,994评论 1 269
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 48,095评论 3 370
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,884评论 2 354

推荐阅读更多精彩内容

  • 利用RabbitMQ集群横向扩展能力,均衡流量压力,让消息集群的秒级服务能力达到百万,Google曾做过此类实验;...
    有货技术阅读 3,466评论 0 1
  • Federated Exchanges:联盟交换模式图下图 Federation插件,使得不同集群的节点之间可以传...
    凌雲木阅读 2,965评论 1 3
  • http://liuxing.info/2017/06/30/Spring%20AMQP%E4%B8%AD%E6%...
    sherlock_6981阅读 15,911评论 2 11
  • 应用场景 异步处理 场景说明:用户注册后,需要发注册邮件和注册短信,传统的做法有两种: 1.串行的方式 2.并行的...
    lijun_m阅读 1,827评论 0 3
  • 今天,我的妈妈带我出去玩。 我们今天去了我们村儿的一个小树林里玩。我们带了好多好吃的,都装在了一个小背包里面儿,那...
    张晨曦_3582阅读 195评论 0 0