SpringBoot 与 RabbitMQ

简介

RabbitMQ是一款开源的消息队列中间件, 使用Advanced Message Queuing Protocol (AMQP) 为协议来处理队列消息。由于当下分布式系统越来越普及,消息队列中间件也越来越被得到使用。

和RabbitMQ同类型的消息中间件也有

  • ActiveMQ
  • RocketMQ
  • Kafka

这篇文章暂时不分析几种MQ的比较,主要是写一些SpringBoot与RabbitMQ的使用

Ubuntu 上简单安装 RabbitMQ

首先找一台Linux服务器.

这里我用的是Ubuntu 16.04(ubuntu-xenial)

添加下载信息

> echo "deb http://www.rabbitmq.com/debian/ testing main" >> /etc/apt/sources.list
> curl http://www.rabbitmq.com/rabbitmq-signing-key-public.asc | sudo apt-key add -
> apt-get update

安装RabbitMQ

> apt-get install rabbitmq-server

安装完后咋们看一下RabbitMQ 有没有跑起来

> service rabbitmq-server status

如果跑起来的话会显示RabbitMQ的基本信息,没有的话运行

> service rabbitmq-server start

接下来我们需要一个后台的控制页面来管理我们的RabbitMQ服务, RabbitMQ其实自带了一个管理后台,所以直接激活一下就行了

> rabbitmq-plugins enable rabbitmq_management

接下来我们需要创建一个可以登录的用户,刚开始可以创建一个admin管理员,并给这个用户最高权限

> rabbitmqctl add_user [username] [password]
> rabbitmqctl set_user_tags [username] administrator
> rabbitmqctl set_permissions -p "/" [username] ".*" ".*" ".*"

当用户创建完毕后访问 http://[你的服务器IP]:15672 , 然后会出现登录画面,用刚才创建的用户登录即可

RabbitMQ 的简单概念与使用

首先看一下面的模型,简单了来讲就是Publisher将消息发送到Exchange上,然后Exchange通过Routes的健值来发布到Queue里,然后Consumer去Queue里消费消息.

RabbitMQ消息队列模型图

这边有三种常用的Exchange的方式

  • Direct Exchange
  • Topic Exchange
  • Fanout Exchange

下面就对上面三种方式我们来进行一下代码的实现

Direct Exchange

Direct Exchange 是RabbitMQ 默认的交换形式,从图上可以看出, 消息发送方直接将带有Routing Key: green 这个参数发送到Exchange中,然后Exchange再将消息分配到对应的Queue中

Direct Exchange Type

Spring 代码实现如下

Producer 部分

@Component
public class DirectProducer {

    @Autowired
    private AmqpTemplate rabbitTemplate;

    public void send() {
        String context = "This direct exchange message";
        this.rabbitTemplate.convertAndSend("testqueue" , context);
    }
}

代码中将“testqueue” 做为routing key, exchange会将消息发送到名为“testqueue”的Queue里面

Consumer部分

@Component
@RabbitListener(queues = "testqueue")
public class DirectConsumer {

    @RabbitHandler
    public void consume(String context) {
        System.out.println("Direct Exchange Consumer  : " + context);
    }
}

Consumer对“testqueue” 进行监听然后消费消息。

Topic Exchange

Topic Exchange的模式跟Direct 相似,也需要传routing key,但是转发消息是通过通配符来做的。比如:

  • 路由键必须是一串字符,用句号(.) 隔开,比如说 main.queue

  • 路由模式必须包含一个 星号(*),主要用于匹配路由键指定位置的一个单词,比如说,一个路由模式是这样子:main.subqueue.*,那么就只能匹配路由键是这样子的:第一个单词是 main,第二个单词是 subqueue。

  • 井号(#)就表示相当于一个或者多个单词,例如一个匹配模式是main.subqueue.test.#,那么,以main.subqueue.test.one开头的路由键都是可以的。

Topic Exchange type

首先我们先造3个Consumer分别监听不同的queue,但是这里面监听的queue有绑定不同的通配符规则, 其中Annotation里的key包含了通配符规则

@Component
@RabbitListener(
        bindings = @QueueBinding(
                value = @Queue(value = "topicQueueA",durable = "true"),
                exchange = @Exchange(value = "testTopicExchange",type = ExchangeTypes.TOPIC),
                key = "*.topic.A")
)
public class TopicConsumerA {
    @RabbitHandler
    public void consume(String message) {
        System.out.println("Topic Consumer A  : " + message);
    }
}
@Component
@RabbitListener(
        bindings = @QueueBinding(
                value = @Queue(value = "topicQueueB",durable = "true"),
                exchange = @Exchange(value = "testTopicExchange",type = ExchangeTypes.TOPIC),
                key = "*.topic.*" )
)
public class TopicConsumerB {
    @RabbitHandler
    public void consume(String message) {
        System.out.println("Topic Consumer B  : " + message);
    }
}
@Component
@RabbitListener(
        bindings = @QueueBinding(
                value = @Queue(value = "topicQueueC",durable = "true"),
                exchange = @Exchange(value = "testTopicExchange",type = ExchangeTypes.TOPIC),
                key = "main.topic.*")
)
public class TopicConsumerC {
    @RabbitHandler
    public void consume(String message) {
        System.out.println("Topic Consumer C  : " + message);
    }
}

然后我们通过发送不同routing key来控制那些Consumer能收到消息

this.rabbitTemplate.convertAndSend("testTopicExchange", "main.topic.A", context);

main.topic.A 这条routing key 应该是满足上面三个通配符规则的,所有三个Consumer 都能收到发送的消息

this.rabbitTemplate.convertAndSend("testTopicExchange", "test.topic.A", context);

test.topic.A 只能满足A和B的通配符规则,即只有ConsumerA和ConsumerB能收到消息

this.rabbitTemplate.convertAndSend("testTopicExchange", "main.topic.KK", context);

main.topic.KK 只有ConsumerB和C能收到

Fanout Exchange

Fanout Exchange模式是,不管routing key 只要这个queue绑定到fanout exchange 这些queues 都会收到消息,有点像广播的形式.

Fanout Exchange type

接下来我看一下Spring代码的实现部分

@Component
public class FanoutProducer {
    @Autowired
    private AmqpTemplate rabbitTemplate;

    public void send() {
        String context = "This fanout exchange message";
        rabbitTemplate.convertAndSend("testFanoutExchange","", context);
    }
}

Producer将消息发送到"testFanoutExchange"的exchange中,第二个参数因为是Fanout模式所以我们不需要传routing key。

@Component
@RabbitListener(
        bindings = @QueueBinding(
                value = @Queue(value = "fanoutqueueA",durable = "true"),
                exchange = @Exchange(value = "testFanoutExchange",type = ExchangeTypes.FANOUT))
)
public class FanoutConsumerA {

    @RabbitHandler
    public void consume(String message) {
        System.out.println("Fanout Exchange Consumer A  : " + message);
    }
}
@Component
@RabbitListener(
        bindings = @QueueBinding(
                value = @Queue(value = "fanoutqueueB",durable = "true"),
                exchange = @Exchange(value = "testFanoutExchange",type = ExchangeTypes.FANOUT))
)
public class FanoutConsumerB {

    @RabbitHandler
    public void consume(String message) {
        System.out.println("Fanout Exchange Consumer B  : " + message);
    }
}

这边consumer中只要将监听的queue绑定到接收消息的"testFanoutExchange" 即可.

这样在fanout这种模式下只要是绑定到指定的exchange上所有的queues都能收到消息

演示代码

以上演示代码可以到 https://github.com/dreamcatchernick/spring-boot-samples 的spring-boot-rabbitmq 目录下载并运行

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容

  • Spring Cloud为开发人员提供了快速构建分布式系统中一些常见模式的工具(例如配置管理,服务发现,断路器,智...
    卡卡罗2017阅读 135,186评论 19 139
  • 来源 RabbitMQ是用Erlang实现的一个高并发高可靠AMQP消息队列服务器。支持消息的持久化、事务、拥塞控...
    jiangmo阅读 10,426评论 2 34
  • 1. 历史 RabbitMQ是一个由erlang开发的AMQP(Advanced Message Queue )的...
    高广超阅读 6,140评论 3 51
  • 1 RabbitMQ安装部署 这里是ErLang环境的下载地址http://www.erlang.org/down...
    Bobby0322阅读 2,291评论 0 11
  • 什么叫消息队列 消息(Message)是指在应用间传送的数据。消息可以非常简单,比如只包含文本字符串,也可以更复杂...
    lijun_m阅读 1,376评论 0 1