【消息中间件】01-Rabbit入门及整合Java客户端

0. 概述

0.1 什么是MQ?

Message Queue(MQ),消息队列中间件。消息队列就是在消息传输过程中保存消息的容器

0.2 为什么要使用MQ?

  • 提高系统响应速度
  • 提高系统稳定性
  • 降低系统耦合性

0.3 RabbitMQ概念

  • 生成者、消费者

Producer:消息的生产者
Consumer:消息的消费者

  • Queue
    消息队列,提供了 FIFO 的处理机制,具有缓存消息的能力。RabbitMQ中,队列消息可以设置为持久化,临时或者自动删除。

  • ExChange
    Exchange 类似于数据通信网络中的交换机,提供消息路由策略。

RabbitMQ 中,Producer 不是通过信道直接将消息发送给 Queue,而是先发送给 ExChange。一个 ExChange 可以和多个 Queue 进行绑定,Producer 在传递消息的时候,会传递一个 ROUTING_KEY,ExChange 会根据这个 ROUTING_KEY 按照特定的路由算法,将消息路由给指定的 Queue。

0.4 Exchange类型

  • direct(默认):直接交换器,工作方式类似于单播,ExChange 会将消息发送完全匹配 ROUTING_KEY 的 Queue(key 就等于 queue)

    direct.jpeg

  • fanout:广播是式交换器,不管消息的 ROUTING_KEY 设置为什么,ExChange 都会将消息转发给所有绑定的 Queue(无视 key,给所有的 queue 都来一份)

    fanout.jpeg

  • topic:主题交换器,工作方式类似于组播,ExChange 会将消息转发和 ROUTING_KEY 匹配模式相同的所有队列(key 可以用“宽字符”模糊匹配 queue),比如,ROUTING_KEY 为 user.stock 的 Message 会转发给绑定匹配模式为 * .stock,user.stock, * . * 和 #.user.stock.# 的队列。( * 表是匹配一个任意词组,# 表示匹配 0 个或多个词组)


    topic.jpeg
  • headers(不推荐):消息体的 header 匹配,无视 key,通过查看消息的头部元数据来决定发给那个 queue(AMQP 头部元数据非常丰富而且可以自定义)。headers 类型的交换器性能会很差,而且也不实用,基本上不会看到它的存在

1. RabbitMQ安装及使用

Rabbit官方文档:https://www.rabbitmq.com/documentation.html

1.1 使用Docker安装RabbitMQ

Rabbit是基于Erlang开发的,因此直接安装需要先安装Erlang环境,比较繁琐(具体安装步骤可以看官方文档)
这里建议使用Docker安装

# 安装命令
docker run -it --rm --name rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:3-management

# docker-compose
version: '3.1'
services:
  rabbitmq:
    restart: always
    image: rabbitmq:management
    container_name: rabbitmq
    ports:
      - 5672:5672
      - 15672:15672
    environment:
      TZ: Asia/Shanghai
      RABBITMQ_DEFAULT_USER: rabbit
      RABBITMQ_DEFAULT_PASS: 123456
    volumes:
      - ./data:/var/lib/rabbitmq

# 启动后访问:http://ip:15627

1.2 Topic exchange 实例(Java)

下面是安装官方教程整理的实例:
https://www.rabbitmq.com/tutorials/tutorial-five-java.html

  • 导入依赖
# pom.xml 依赖
<dependency>
    <groupId>com.rabbitmq</groupId>
    <artifactId>amqp-client</artifactId>
    <version>5.7.1</version>
</dependency>
  • 生产者客户端
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

public class EmitLogTopic {

  private static final String EXCHANGE_NAME = "topic_logs";

  public static void main(String[] argv) throws Exception {
    ConnectionFactory factory = new ConnectionFactory();
    // 连接参数
    factory.setHost("localhost");
    factory.setUsername("rabbit");
    factory.setPassword("123456");
    
    try (Connection connection = factory.newConnection();
         Channel channel = connection.createChannel()) {
        
        // 声明 exchange 类型为Topic
        channel.exchangeDeclare(EXCHANGE_NAME, "topic");

        String routingKey = getRouting(argv);
        String message = getMessage(argv);

        // 向exchange发送带有routingKey的消息
        channel.basicPublish(EXCHANGE_NAME, routingKey, null, message.getBytes("UTF-8"));
        System.out.println(" [x] Sent '" + routingKey + "':'" + message + "'");
    }
  }
  //..
}
  • 消费者客户端
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;

public class ReceiveLogsTopic {

  private static final String EXCHANGE_NAME = "topic_logs";

  public static void main(String[] argv) throws Exception {
    ConnectionFactory factory = new ConnectionFactory();
    // 连接参数
    factory.setHost("localhost");
    factory.setUsername("rabbit");
    factory.setPassword("123456");
    
    Connection connection = factory.newConnection();
    Channel channel = connection.createChannel();

    // 声明 exchange 类型为Topic
    channel.exchangeDeclare(EXCHANGE_NAME, "topic");
    // 创建一个临时queue
    String queueName = channel.queueDeclare().getQueue();

    if (argv.length < 1) {
        System.err.println("Usage: ReceiveLogsTopic [binding_key]...");
        System.exit(1);
    }

    for (String bindingKey : argv) {
        // 使用bindingKey绑定exchange和queue
        channel.queueBind(queueName, EXCHANGE_NAME, bindingKey);
    }

    System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

    // 定义消息回调
    DeliverCallback deliverCallback = (consumerTag, delivery) -> {
        String message = new String(delivery.getBody(), "UTF-8");
        System.out.println(" [x] Received '" +
            delivery.getEnvelope().getRoutingKey() + "':'" + message + "'");
    };
    channel.basicConsume(queueName, true, deliverCallback, consumerTag -> { });
  }
}
  • 运行结果
# 订阅 logs.* 消息日志
java -cp $CP ReceiveLogsTopic "logs.*"

# 订阅 logs.error 消息日志
java -cp $CP ReceiveLogsTopic "logs.error"

# 发送日志
java -cp $CP EmitLogTopic "logs.error" "Runtime error"  

# 两个订阅者都能收到 "Runtime error" 消息

1.3 Spring-Rabbit

  • 导入依赖

选择连接器:
PooledChannelConnectionFactory:
ThreadChannelConnectionFactory
CachingConnectionFactory

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容