消息中间件:RabbitMQ

(1)理论基础

Erlang语言

最初用于交换机领域的架构模式,这样使得RabbitMQ在Broker之间进行数据交互的性能非常优秀(Erlang有着和原生Socket一样的延迟)。

RabbitMQ

RabbitMQ是一个开源的消息代理和队列服务器,用来通过普通协议在不同的应用之间共享数据(跨平台跨语言)。RabbitMQ是使用Erlang语言编写,并且基于AMQP协议实现。

RabbitMQ是一个消息中间件:它接受并转发消息。你可以把它当做一个快递站点,当你要发送一个包裹时,你把你的包裹放到快递站,快递员最终会把你的快递送到收件人那里,按照这种逻辑RabbitMQ是一个快递站,一个快递员帮你传递快件。RabbitMQ与快递站的主要区别在于,它不处理快件而是接收,存储和转发消息数据。

(2)配置

(1)安装otp、并配置系统环境变量PATH

(2)安装 RabbitMQ、并sbin目录下运行

rabbitmq-plugins enable rabbitmq_management

(3)rabbitmq重新启动、并访问管理页面

http://127.0.0.1:15672

账号:guest

密码:guest

(4)Vue访问参数

export const PROTOCOL = 'ws'    //连接协议

export const IP = 'localhost'  //ip

export const PORT = '15674'      //端口

export const ADDRESS = 'ws' 

export const ACCOUNT = 'guest'  //账号

export const PASSWORD = 'guest'  //密码

Rabbit主要的端口说明:

4369 – erlang发现口

5672 --client端通信口

15672 – 管理界面ui端口

25672 – server间内部通信口

(3)生产者消费者模型

生产者消费者模型:添加了一个队列,并创建了两个消费者用于监听队列消息,我们发现,当有消息到达时,两个消费者会交替收到消息。这一过程虽然不用创建交换机,但会使用默认的交换机,并用默认的直连(default-direct)策略连接队列

//生产者消费者模式的配置,包括一个队列和两个对应的消费者

@Configuration

public class ProducerConsumerConfig {

@Bean

    public Queue myQueue() {

      Queue queue=new Queue("myqueue");

      return queue;

    }


}

@Component

public class QueueListener1 {

@RabbitListener(queues = "myqueue")

public void displayMail(Mail mail) throws Exception {

System.out.println("队列监听器1号收到消息"+mail.toString());

}

}

@Component

public class QueueListener2 {

@RabbitListener(queues = "myqueue")

public void displayMail(Mail mail) throws Exception {

System.out.println("队列监听器2号收到消息"+mail.toString());

}

}

(4)发布订阅模型

发布订阅模型,添加两个队列,分别各用一个消费者监听,设置一个交换机,类型为广播(fanout),交换机会将收到的消息广播给所有相连的队列

//发布订阅模式的配置,包括两个队列和对应的订阅者,发布者的交换机类型使用fanout(子网广播),两根网线binding用来绑定队列到交换机

@Configuration

public class PublishSubscribeConfig {

@Bean

    public Queue myQueue1() {

      Queue queue=new Queue("queue1");

      return queue;

    }

@Bean

    public Queue myQueue2() {

      Queue queue=new Queue("queue2");

      return queue;

    }

@Bean

public FanoutExchange fanoutExchange(){

FanoutExchange fanoutExchange=new FanoutExchange("fanout");

return fanoutExchange;

}

@Bean

public Binding binding1(){

Binding binding=BindingBuilder.bind(myQueue1()).to(fanoutExchange());

return binding;

}

@Bean

public Binding binding2(){

Binding binding=BindingBuilder.bind(myQueue2()).to(fanoutExchange());

return binding;

}

}

@Component

public class SubscribeListener1 {

@RabbitListener(queues = "queue1")

public void subscribe(Mail mail) throws IOException {

System.out.println("订阅者1收到消息"+mail.toString());

}

}

@Component

public class SubscribeListener2 {

@RabbitListener(queues = "queue2")

public void subscribe(Mail mail) throws IOException {

System.out.println("订阅者2收到消息"+mail.toString());

}

}

(5)direct直连交换机通信模型

direct直连交换机通信模型,包括一个direct交换机,三个binding,两个队列,两个消费者监听器,消息只会被投入到routingkey一致的队列中

//direct直连模式的交换机配置,包括一个direct交换机,两个队列,三根网线binding

@Configuration

public class DirectExchangeConfig {

@Bean

public DirectExchange directExchange(){

DirectExchange directExchange=new DirectExchange("direct");

return directExchange;

}

@Bean

    public Queue directQueue1() {

      Queue queue=new Queue("directqueue1");

      return queue;

    }

@Bean

    public Queue directQueue2() {

      Queue queue=new Queue("directqueue2");

      return queue;

    }

//3个binding将交换机和相应队列连起来

@Bean

public Binding bindingorange(){

Binding binding=BindingBuilder.bind(directQueue1()).to(directExchange()).with("orange");

return binding;

}

@Bean

public Binding bindingblack(){

Binding binding=BindingBuilder.bind(directQueue2()).to(directExchange()).with("black");

return binding;

}

@Bean

public Binding bindinggreen(){

Binding binding=BindingBuilder.bind(directQueue2()).to(directExchange()).with("green");

return binding;

}

}

@Component

public class DirectListener1 {

@RabbitListener(queues = "directqueue1")

public void displayMail(Mail mail) throws Exception {

System.out.println("directqueue1队列监听器1号收到消息"+mail.toString());

}

}

@Component

public class DirectListener2 {

@RabbitListener(queues = "directqueue2")

public void displayMail(Mail mail) throws Exception {

System.out.println("directqueue2队列监听器2号收到消息"+mail.toString());

}

}

(6)topic主题交换机通信

topic主题交换机通信,包括一个topic交换机,三个binding,两个队列,两个消费者监听器,消息只会被投入到routingkey能够匹配的队列中,#表示0个或若干个关键字,*表示一个关键字

//topic交换机模型,需要一个topic交换机,两个队列和三个binding

@Configuration

public class TopicExchangeConfig {

@Bean

public TopicExchange topicExchange(){

TopicExchange topicExchange=new TopicExchange("mytopic");

return topicExchange;

}

@Bean

    public Queue topicQueue1() {

      Queue queue=new Queue("topicqueue1");

      return queue;

    }

@Bean

    public Queue topicQueue2() {

      Queue queue=new Queue("topicqueue2");

      return queue;

    }

//3个binding将交换机和相应队列连起来

@Bean

public Binding bindingtopic1(){

Binding binding=BindingBuilder.bind(topicQueue1()).to(topicExchange()).with("*.orange.*");//binding key

return binding;

}

@Bean

public Binding bindingtopic2(){

Binding binding=BindingBuilder.bind(topicQueue2()).to(topicExchange()).with("*.*.rabbit");

return binding;

}

@Bean

public Binding bindingtopic3(){

Binding binding=BindingBuilder.bind(topicQueue2()).to(topicExchange()).with("lazy.#");//#表示0个或若干个关键字,*表示一个关键字

return binding;

}

}

@Component

public class TopicListener1 {

@RabbitListener(queues = "topicqueue1")

public void displayTopic(Mail mail) throws IOException {

System.out.println("从topicqueue1取出消息"+mail.toString());

}

}

@Component

public class TopicListener2 {

@RabbitListener(queues = "topicqueue2")

public void displayTopic(Mail mail) throws IOException {

System.out.println("从topicqueue2取出消息"+mail.toString());

}

}

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容