消息中间件-RabbitMQ

一、简介

1.1 什么是AMQP

AMQP,即Advanced Message Queuing Protocol,高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计。基于此协议的客户端与消息中间件可传递消息,并不受客户端/中间件同产品,不同的开发语言等条件的限制。AMQP的主要特征是面向消息、队列、路由(包括点对点和发布/订阅)、可靠性、安全。

1.2 什么是RabbitMQ

RabbitMQ 是一个由 Erlang 语言开发的 AMQP 的开源实现,服务器端用Erlang语言编写,支持多种客户端,如:Python、Ruby、.NET、Java、JMS、C、PHP、ActionScript、XMPP、STOMP等,支持AJAX。主要是用来实现应用程序的异步和解耦,同时也能起到消息缓冲,消息分发的作用。

1.3 RabbitMQ 的特点

RabbitMQ 最初起源于金融系统,用于在分布式系统中存储转发消息,在易用性、扩展性、高可用性等方面表现不俗。具体特点包括:

  • 可靠性(Reliability):RabbitMQ 使用一些机制来保证可靠性,如持久化、传输确认、发布确认
  • 灵活的路由(Flexible Routing):在消息进入队列之前,通过 Exchange 来路由消息的。对于典型的路由功能,RabbitMQ 已经提供了一些内置的 Exchange 来实现。针对更复杂的路由功能,可以将多个 Exchange 绑定在一起,也通过插件机制实现自己的 Exchange
  • 消息集群(Clustering):多个 RabbitMQ 服务器可以组成一个集群,形成一个逻辑 Broker
  • 高可用(Highly Available Queues):队列可以在集群中的机器上进行镜像,使得在部分节点出问题的情况下队列仍然可用
  • 多种协议(Multi-protocol):RabbitMQ 支持多种消息队列协议,比如 STOMP、MQTT 等等
  • 多语言客户端(Many Clients):RabbitMQ 几乎支持所有常用语言,比如 Java、.NET、Ruby 等等
  • 管理界面(Management UI):RabbitMQ 提供了一个易用的用户界面,使得用户可以监控和管理消息 Broker 的许多方面
  • 跟踪机制(Tracing):如果消息异常,RabbitMQ 提供了消息跟踪机制,使用者可以找出发生了什么
  • 插件机制(Plugin System):RabbitMQ 提供了许多插件,来从多方面进行扩展,也可以编写自己的插件

1.4 RabbitMQ 的基本概念

下面展示了RabbitMQ 消息的过程,我们会围绕这张图学习一下 RabbitMQ 的一些基本概念


image

1.4.1 生产者与消费者

与其它的消息中间件一样,RabbitMQ中包含消息生产者和消息消费者,生产者创建消息发布到代理服务器,消费者从代理服务器获取消息。在实际应用中,生产者和消费者之间的角色是可以相互转换的。

1.4.2 消息

消息由有效载荷(payload)和标签(label)组成。有效载荷就是你想要传输的数据,可以是任何内容,一个数组,一个集合,甚至二进制数据都可以。而标签描述了有效载荷,由一系列的可选属性组成,这些属性包括routing-key(路由键)、priority(相对于其他消息的优先权)、delivery-mode(指出该消息可能需要持久性存储)等。

1.4.3 信道

多路复用连接中的一条独立的双向数据流通道。信道是建立在真实的TCP连接内地虚拟连接,AMQP 命令都是通过信道发出去的,不管是发布消息、订阅队列还是接收消息,这些动作都是通过信道完成。因为对于操作系统来说建立和销毁 TCP 都是非常昂贵的开销,所以引入了信道的概念,以复用一条 TCP 连接。

1.4.4 交换器与绑定(原文)

交换机的功能主要是接收消息并且转发到绑定的队列,交换机不存储消息,在启用ack模式后,交换机找不到队列会返回错误。交换机有四种类型:

  • fanout:把所有发送到该Exchange的消息路由到所有与它绑定的Queue中
  • direct:把消息路由到bindingKey与routingKey完全匹配的Queue中
  • topic:把消息路由到bindingKey与routingKey模糊匹配的Queue中
  • headers:headers类型的Exchange不依赖于routingKey与bindingKey的匹配规则来路由消息,而是根据发送的消息内容中的headers属性进行匹配
1.4.4.1 Fanout
fanout
  • 生产者(P)发送到Exchange(X)的所有消息都会路由到图中的两个Queue,并最终被两个消费者(C1与C2)消费
  • 如果配置了routing_key会被忽略
1.4.4.2 direct
direct
  • routingKey=”error”发送消息,则会同时路由到Queue1(amqp.gen-S9b…)和Queue2(amqp.gen-Agl…)
  • routingKey=”info”或routingKey=”warning”发送消息,则只会路由到Queue2
  • 以其它routingKey发送消息,则不会路由到这两个Queue中
1.4.4.3 topic
topic
  • routingKey=”quick.orange.rabbit”发送信息,则会同时路由到Q1与Q2
  • routingKey=”lazy.orange.fox”发送信息,则只会路由到Q1
  • routingKey=”lazy.brown.fox”发送消息,则只会路由到Q2
  • routingKey=”lazy.pink.rabbit”发送消息,则只会路由到Q2(只会投递给Q2一次,虽然这个routingKey与Q2的两个bindingKey都匹配)
  • routingKey=”quick.brown.fox”、routingKey=”orange”、routingKey=”quick.orange.male.rabbit”发送消息,则会被丢弃,它们并没有匹配任何bindingKey
1.4.4.4 header

headers类型的Exchange不依赖于routingKey与bindingKey的匹配规则来路由消息,而是根据发送的消息内容中的headers属性进行匹配。在绑定Queue与Exchange时指定一组键值对;当消息发送到Exchange时,RabbitMQ会取到该消息的headers(也是一个键值对的形式),对比其中的键值对是否完全匹配Queue与Exchange绑定时指定的键值对;如果完全匹配则消息会路由到该Queue,否则不会路由到该Queue。

1.5 虚拟主机

一个虚拟主机持有一组交换机、队列和绑定。为什么需要多个虚拟主机呢?很简单,RabbitMQ当中,用户只能在虚拟主机的粒度进行权限控制。 因此,如果需要禁止A组访问B组的交换机/队列/绑定,必须为A和B分别创建一个虚拟主机。每一个RabbitMQ服务器都有一个默认的虚拟主机“/”

二、Spring Boot 2.0 整合RabbitMQ

2.1 添加依赖

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

2.2 配置文件

spring.application.name=rabbitmq
#IP地址
spring.rabbitmq.host=127.0.0.1
#rabbitmq默认端口号
spring.rabbitmq.port=5672
#账户名和密码
spring.rabbitmq.username=simon
spring.rabbitmq.password=123456

2.3 rabbitmq 配置

@Configuration
public class RabbitConfig {
  //fanout 把所有发送到该Exchange的消息路由到所有与它绑定的Queue中
  public static final String FANOUT_QUEUE1 = "fanout.queue1";
  public static final String FANOUT_QUEUE2 = "fanout.queue2";
  public static final String FANOUT_EXCHANGE = "fanout.exchange";

  //redirect 把消息路由到bindingKey与routingKey完全匹配的Queue中
  public static final String DIRECT_QUEUE = "direct.queue";
  public static final String DIRECT_EXCHANGE = "direct.exchange";

  //topic 把消息路由到bindingKey与routingKey模糊匹配的Queue中
  public static final String TOPIC_QUEUE1 = "topic.queue1";
  public static final String TOPIC_QUEUE2 = "topic.queue2";
  public static final String TOPIC_EXCHANGE = "topic.exchange";

  /**
   * Fanout模式 队列1
   *
   * @return
   */
  @Bean
  public Queue fanoutQueue1() {
    return new Queue(FANOUT_QUEUE1);
  }

  /**
   * Fanout模式 队列2
   *
   * @return
   */
  @Bean
  public Queue fanoutQueue2() {
    return new Queue(FANOUT_QUEUE2);
  }

  /**
   * Fanout模式 交换器
   *
   * @return
   */
  @Bean
  public FanoutExchange fanoutExchange() {
    return new FanoutExchange(FANOUT_EXCHANGE);
  }

  /**
   * Fanout模式 队列1与交换器绑定
   *
   * @return
   */
  @Bean
  public Binding fanoutBinding1() {
    return BindingBuilder.bind(fanoutQueue1()).to(fanoutExchange());
  }

  /**
   * Fanout模式 队列2与交换器绑定
   *
   * @return
   */

  @Bean
  public Binding fanoutBinding2() {
    return BindingBuilder.bind(fanoutQueue2()).to(fanoutExchange());
  }

  /**
   * direct模式 队列
   *
   * @return
   */
  @Bean
  public Queue directQueue() {
    return new Queue(DIRECT_QUEUE);
  }

  /**
   * direct模式 队列与交换机绑定
   *
   * @return
   */
  @Bean
  public DirectExchange directExchange() {
    return new DirectExchange(DIRECT_EXCHANGE);
  }

  @Bean
  public Binding directBinding1() {
    return BindingBuilder.bind(directQueue()).to(directExchange()).with("rabbitmq.springboot");
  }

  /**
   * Topic模式 队列1
   *
   * @return
   */
  @Bean
  public Queue topicQueue1() {
    return new Queue(TOPIC_QUEUE1);
  }

  /**
   * Topic模式 队列2
   *
   * @return
   */
  @Bean
  public Queue topicQueue2() {
    return new Queue(TOPIC_QUEUE2);
  }

  /**
   * Topic模式 交换器
   *
   * @return
   */
  @Bean
  public TopicExchange topicExchange() {
    return new TopicExchange(TOPIC_EXCHANGE);
  }

  /**
   * Topic模式 队列1与交换器绑定
   *
   * @return
   */
  @Bean
  public Binding topicBinding1() {
    return BindingBuilder.bind(topicQueue1()).to(topicExchange()).with("rabbitmq.springboot");
  }

  /**
   * Topic模式 队列2与交换器绑定
   *
   * @return
   */
  @Bean
  public Binding topicBinding2() {
    return BindingBuilder.bind(topicQueue2()).to(topicExchange()).with("rabbitmq.#");
  }
}

2.4 消息实体

public class User implements Serializable {
  private String id;
  private String username;
  private String password;

  public User(String id, String username, String password) {
    this.id = id;
    this.username = username;
    this.password = password;
  }

  public String getId() {
    return id;
  }

  public void setId(String id) {
    this.id = id;
  }

  public String getUsername() {
    return username;
  }

  public void setUsername(String username) {
    this.username = username;
  }

  public String getPassword() {
    return password;
  }

  public void setPassword(String password) {
    this.password = password;
  }
}

2.5 消息发送

package priv.simon.springboot.rabbitmq.sender;

import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import priv.simon.springboot.rabbitmq.config.RabbitConfig;
import priv.simon.springboot.rabbitmq.entity.User;

/**
 * Created by simon on 2018/12/20.
 */
@Component
public class Sender {
  @Autowired
  private AmqpTemplate rabbitTemplate;

  /**
   * fanout 发送信息
   *
   * @param user 用户信息
   */
  public void fanoutSend(User user) {
    this.rabbitTemplate.convertAndSend(RabbitConfig.FANOUT_EXCHANGE, "", user);
  }

  /**
   * direct 发送信息
   *
   * @param user 用户信息
   */
  public void directSend(User user) {
    this.rabbitTemplate.convertAndSend(RabbitConfig.DIRECT_EXCHANGE, "rabbitmq.springboot", user);
  }

  /**
   * topic 发送信息
   *
   * @param user 用户信息
   */
  public void topicSend(User user) {
    this.rabbitTemplate.convertAndSend(RabbitConfig.TOPIC_EXCHANGE,"rabbitmq.springboot", user);
    this.rabbitTemplate.convertAndSend(RabbitConfig.TOPIC_EXCHANGE, "rabbitmq.springcloud", user);
  }
}

2.6 消息消费

@Component
public class Receiver {
  @RabbitListener(queues = RabbitConfig.FANOUT_QUEUE1)
  public void receiveFanout1(User user) {
    System.out.println("fanout_queue1接收消息" + user);
  }
  @RabbitListener(queues = RabbitConfig.FANOUT_QUEUE2)
  public void receiveFanout2(User user) {
    System.out.println("fanout_queue2接收消息" + user);
  }

  @RabbitListener(queues = RabbitConfig.DIRECT_QUEUE)
  public void receiveDirect1(User user) {
    System.out.println("direct_queue接收消息" + user);
  }

  @RabbitListener(queues = RabbitConfig.TOPIC_QUEUE1)
  public void receiveTopic1(User user) {
    System.out.println("topic_queue1接收消息" + user.toString());
  }
  @RabbitListener(queues = RabbitConfig.TOPIC_QUEUE2)
  public void receiveTopic2(User user) {
    System.out.println("topic_queue2接收消息" + user.toString());
  }
}

2.7 测试

@RunWith(SpringRunner.class)
@SpringBootTest
public class RabbitmqApplicationTests {

  @Autowired
  private Sender sender;

  @Test
  public void testFanout(){
    User user=new User("1","simon","fanout");
    sender.directSend(user);
  }

  @Test
  public void testDirect(){
    User user=new User("1","simon","direct");
    sender.directSend(user);
  }

  @Test
  public void testTopic(){
    User user=new User("1","simon","topic");
    sender.directSend(user);
  }
}
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 213,616评论 6 492
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,020评论 3 387
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 159,078评论 0 349
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,040评论 1 285
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,154评论 6 385
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,265评论 1 292
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,298评论 3 412
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,072评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,491评论 1 306
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,795评论 2 328
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,970评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,654评论 4 337
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,272评论 3 318
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,985评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,223评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,815评论 2 365
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,852评论 2 351

推荐阅读更多精彩内容

  • http://liuxing.info/2017/06/30/Spring%20AMQP%E4%B8%AD%E6%...
    sherlock_6981阅读 15,899评论 2 11
  • 文章摘要:本篇文章为RabbitMQ的入门文章,不像其他一些程序代码和应用实战性的文章会带着大家从一个“Hello...
    癫狂侠阅读 3,615评论 0 13
  • 来源 RabbitMQ是用Erlang实现的一个高并发高可靠AMQP消息队列服务器。支持消息的持久化、事务、拥塞控...
    jiangmo阅读 10,353评论 2 34
  • 1. 历史 RabbitMQ是一个由erlang开发的AMQP(Advanced Message Queue )的...
    高广超阅读 6,093评论 3 51
  • 我们都是受过伤害的人 想爱,却又怯于爱 那些情欲,曾 剧烈摇晃过我们的梦 成为了所谓的诗人,并且 什么也不再是,也...
    李知树阅读 267评论 0 2