(1)理论基础
Erlang语言
最初用于交换机领域的架构模式,这样使得RabbitMQ在Broker之间进行数据交互的性能非常优秀(Erlang有着和原生Socket一样的延迟)。
RabbitMQ
RabbitMQ是一个开源的消息代理和队列服务器,用来通过普通协议在不同的应用之间共享数据(跨平台跨语言)。RabbitMQ是使用Erlang语言编写,并且基于AMQP协议实现。
RabbitMQ是一个消息中间件:它接受并转发消息。你可以把它当做一个快递站点,当你要发送一个包裹时,你把你的包裹放到快递站,快递员最终会把你的快递送到收件人那里,按照这种逻辑RabbitMQ是一个快递站,一个快递员帮你传递快件。RabbitMQ与快递站的主要区别在于,它不处理快件而是接收,存储和转发消息数据。
(2)配置
(1)安装otp、并配置系统环境变量PATH
(2)安装 RabbitMQ、并sbin目录下运行
rabbitmq-plugins enable rabbitmq_management
(3)rabbitmq重新启动、并访问管理页面
http://127.0.0.1:15672
账号:guest
密码:guest
(4)Vue访问参数
export const PROTOCOL = 'ws' //连接协议
export const IP = 'localhost' //ip
export const PORT = '15674' //端口
export const ADDRESS = 'ws'
export const ACCOUNT = 'guest' //账号
export const PASSWORD = 'guest' //密码
Rabbit主要的端口说明:
4369 – erlang发现口
5672 --client端通信口
15672 – 管理界面ui端口
25672 – server间内部通信口
(3)生产者消费者模型
生产者消费者模型:添加了一个队列,并创建了两个消费者用于监听队列消息,我们发现,当有消息到达时,两个消费者会交替收到消息。这一过程虽然不用创建交换机,但会使用默认的交换机,并用默认的直连(default-direct)策略连接队列
//生产者消费者模式的配置,包括一个队列和两个对应的消费者
@Configuration
public class ProducerConsumerConfig {
@Bean
public Queue myQueue() {
Queue queue=new Queue("myqueue");
return queue;
}
}
@Component
public class QueueListener1 {
@RabbitListener(queues = "myqueue")
public void displayMail(Mail mail) throws Exception {
System.out.println("队列监听器1号收到消息"+mail.toString());
}
}
@Component
public class QueueListener2 {
@RabbitListener(queues = "myqueue")
public void displayMail(Mail mail) throws Exception {
System.out.println("队列监听器2号收到消息"+mail.toString());
}
}
(4)发布订阅模型
发布订阅模型,添加两个队列,分别各用一个消费者监听,设置一个交换机,类型为广播(fanout),交换机会将收到的消息广播给所有相连的队列
//发布订阅模式的配置,包括两个队列和对应的订阅者,发布者的交换机类型使用fanout(子网广播),两根网线binding用来绑定队列到交换机
@Configuration
public class PublishSubscribeConfig {
@Bean
public Queue myQueue1() {
Queue queue=new Queue("queue1");
return queue;
}
@Bean
public Queue myQueue2() {
Queue queue=new Queue("queue2");
return queue;
}
@Bean
public FanoutExchange fanoutExchange(){
FanoutExchange fanoutExchange=new FanoutExchange("fanout");
return fanoutExchange;
}
@Bean
public Binding binding1(){
Binding binding=BindingBuilder.bind(myQueue1()).to(fanoutExchange());
return binding;
}
@Bean
public Binding binding2(){
Binding binding=BindingBuilder.bind(myQueue2()).to(fanoutExchange());
return binding;
}
}
@Component
public class SubscribeListener1 {
@RabbitListener(queues = "queue1")
public void subscribe(Mail mail) throws IOException {
System.out.println("订阅者1收到消息"+mail.toString());
}
}
@Component
public class SubscribeListener2 {
@RabbitListener(queues = "queue2")
public void subscribe(Mail mail) throws IOException {
System.out.println("订阅者2收到消息"+mail.toString());
}
}
(5)direct直连交换机通信模型
direct直连交换机通信模型,包括一个direct交换机,三个binding,两个队列,两个消费者监听器,消息只会被投入到routingkey一致的队列中
//direct直连模式的交换机配置,包括一个direct交换机,两个队列,三根网线binding
@Configuration
public class DirectExchangeConfig {
@Bean
public DirectExchange directExchange(){
DirectExchange directExchange=new DirectExchange("direct");
return directExchange;
}
@Bean
public Queue directQueue1() {
Queue queue=new Queue("directqueue1");
return queue;
}
@Bean
public Queue directQueue2() {
Queue queue=new Queue("directqueue2");
return queue;
}
//3个binding将交换机和相应队列连起来
@Bean
public Binding bindingorange(){
Binding binding=BindingBuilder.bind(directQueue1()).to(directExchange()).with("orange");
return binding;
}
@Bean
public Binding bindingblack(){
Binding binding=BindingBuilder.bind(directQueue2()).to(directExchange()).with("black");
return binding;
}
@Bean
public Binding bindinggreen(){
Binding binding=BindingBuilder.bind(directQueue2()).to(directExchange()).with("green");
return binding;
}
}
@Component
public class DirectListener1 {
@RabbitListener(queues = "directqueue1")
public void displayMail(Mail mail) throws Exception {
System.out.println("directqueue1队列监听器1号收到消息"+mail.toString());
}
}
@Component
public class DirectListener2 {
@RabbitListener(queues = "directqueue2")
public void displayMail(Mail mail) throws Exception {
System.out.println("directqueue2队列监听器2号收到消息"+mail.toString());
}
}
(6)topic主题交换机通信
topic主题交换机通信,包括一个topic交换机,三个binding,两个队列,两个消费者监听器,消息只会被投入到routingkey能够匹配的队列中,#表示0个或若干个关键字,*表示一个关键字
//topic交换机模型,需要一个topic交换机,两个队列和三个binding
@Configuration
public class TopicExchangeConfig {
@Bean
public TopicExchange topicExchange(){
TopicExchange topicExchange=new TopicExchange("mytopic");
return topicExchange;
}
@Bean
public Queue topicQueue1() {
Queue queue=new Queue("topicqueue1");
return queue;
}
@Bean
public Queue topicQueue2() {
Queue queue=new Queue("topicqueue2");
return queue;
}
//3个binding将交换机和相应队列连起来
@Bean
public Binding bindingtopic1(){
Binding binding=BindingBuilder.bind(topicQueue1()).to(topicExchange()).with("*.orange.*");//binding key
return binding;
}
@Bean
public Binding bindingtopic2(){
Binding binding=BindingBuilder.bind(topicQueue2()).to(topicExchange()).with("*.*.rabbit");
return binding;
}
@Bean
public Binding bindingtopic3(){
Binding binding=BindingBuilder.bind(topicQueue2()).to(topicExchange()).with("lazy.#");//#表示0个或若干个关键字,*表示一个关键字
return binding;
}
}
@Component
public class TopicListener1 {
@RabbitListener(queues = "topicqueue1")
public void displayTopic(Mail mail) throws IOException {
System.out.println("从topicqueue1取出消息"+mail.toString());
}
}
@Component
public class TopicListener2 {
@RabbitListener(queues = "topicqueue2")
public void displayTopic(Mail mail) throws IOException {
System.out.println("从topicqueue2取出消息"+mail.toString());
}
}