Rabbitmq概念及HelloWorld

相关概念:

  • Producer:消息产生者

  • Consumer: 消息消费者

  • Broker:消息中间件的服务节点,对于Rabbitmq来说,一个Rabbitmq broker可以认为是一个Rabbitmq的服务实例。

  • Connection,与Broker的tcp长连接,Producer和Consumer都需要建立连接之后才可以使用

  • Channel,建立在Connection基础上,每个线程分配一个channel,类似于NIO的多路复用,节省连接资源。大部分RabbitMQ的操作和核心概念都是基于Channel的,需要特别注意。

  • Queue:队列,RabbitMQ中用于存储消息的容器。而且RabbitMQ中的消息只能存储在Queue中,这点跟kafka不同,kafka只能将消息放在topic中,而kafka中的queue只是topic实际存储文件中的位移标识。

    多个consumer可以消费同一个queue中的消息,这时候消息的处理是互斥的,即一个消息只能被一个consumer处理。

  • Exchange:还是不翻译成中文了,太怪。在producer将消息发到Broker中时,是通过exchange按照一定规则转发到不同的queue中,而不是直接放入queue中。

  • RoutingKey:producer在发送消息给exchange时,一般会指定一个RoutingKey,用来指定这个消息的路由规则。

  • BindingKey:RoutingKey和BindingKey匹配时(注意不是相同,可能是模糊匹配),exchange才会把消息发送到对应的queue中

Snip20180606_6.png
exchange类型
  • fanout,会把消息路由到所有绑定的queue中,无视RoutingKey和BindingKey
  • direct,只能将消息路由到RoutingKey和BindingKey完全匹配的queue,queue可以有多个,只要匹配就行
  • topic,可以支持# * 等通配符匹配RoutingKey和BindingKey
  • headers,不常用,不会依赖RoutingKey,而是根据消息内容中的headers属性跟exchange绑定的内容进行匹配,性能较低,不过非常灵活。

一些关键点

大部分情况下,按照最简单的方式使用就好了,作为工具书去查询《RabbitMQ实战详解》里面的配置。

  • channel.basicQos是设置一个channel中consumer所能保持的最大未确认消息,也就是说,如果一个channel中的qos值已经到了最大,那么rabbitmq就不会继续往这个channel中push对应的消息。
  • rabbitmq的顺序一致性其实是无法保证的,
    • 比如事务消息或者发送消息确认,当发送失败需要重试时,这一条(批)数据跟其之后的数据在producer端就不一致了
    • 如果producer发送的时候设置了不同的超时时间,并且也设置了死信队列,那么消费者在处理死信队列的时候,也会出现数据顺序与发送顺序不同的情况。
    • 设置优先级,也会导致顺序一致性收到影响。
  • 死信队列、延迟队列、消费优先级、持久化等,查询工具书即可。

web端管理

  • 使用rabbitmq-plugins enable rabbitmq_management 来启用web管理插件,重启才会生效.
  • 使用rabbitmq-plugins list来查看正在使用的插件。
  • 访问 server_ip:15672可以访问,guest用户无法登陆远程服务器,需要使用上面创建的root:root123 用户/密码来登陆
HelloWorld

加入maven依赖:

        <dependency>
            <groupId>com.rabbitmq</groupId>
            <artifactId>amqp-client</artifactId>
            <version>5.2.0</version>
        </dependency>
producer代码:
package rabbitmq.server;


import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.MessageProperties;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class RabbitMQProducer {

    private static final String exchange_name = "exchange_siyu";

    private static final String routing_key = "routing_key_siyu";

    private static final String queue_name = "queue_siyu";

    private static final String rabbitmq_server_ip_addr = "10.199.189.30";

    private static final int rabbitmq_server_port = 5672;

    public static void main(String[] args) throws IOException, TimeoutException {

        // 连接工厂类
        ConnectionFactory connectionFactory = new ConnectionFactory();

        // 设置连接属性及用户名密码,用户、密码要通过rabbitmqctl设置过权限
        connectionFactory.setHost(rabbitmq_server_ip_addr);
        connectionFactory.setPort(rabbitmq_server_port);
        connectionFactory.setUsername("root");
        connectionFactory.setPassword("root123"); // 如果用户名密码不匹配,会连接失败

        // 建立连接,一个tcp长连接
        Connection connection = connectionFactory.newConnection();

        // 创建信道,主要操作通过channel执行,可以认为channel是虚拟化出来的一个Connection,用于复用
        Channel channel = connection.createChannel();

        // 定义路由,direct是point-2-piont的,直接到对应的单个queue中
        channel.exchangeDeclare(exchange_name,"direct",true,false,null);

        // 定义queue
        channel.queueDeclare(queue_name,true,false,false,null);

        // 通过routingkey 绑定queue和exchange
        channel.queueBind(queue_name,exchange_name,routing_key);



        // 开始发送消息
        String message = "Hello World!!";

        /* MessageProperties中预置了一部分消息的参数,比如PERSIST_TEXT_PLAIN,其中的定义如下:
        *
        *
        public static final BasicProperties PERSISTENT_TEXT_PLAIN =
        new BasicProperties("text/plain",
                            null,
                            null,
                            2,
                            0, null, null, null,
                            null, null, null, null,
                            null, null);
        * */
        channel.basicPublish(exchange_name,routing_key, MessageProperties.PERSISTENT_TEXT_PLAIN,message.getBytes());

        // 关闭channel和connection
        channel.close();
        connection.close();

    }

}

Consumer
package rabbitmq.client;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class RabbitMQConsumer {

    private static final String queue_name = "queue_siyu";
    private static final String rabbitmq_server_ip_address = "10.199.189.30";
    private static final int port = 5672;


    public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {

        Address[] addresses = new Address[]{
                new Address(rabbitmq_server_ip_address,port)
        };

        // 长连接工厂
        ConnectionFactory connectionFactory = new ConnectionFactory();

        connectionFactory.setUsername("root");
        connectionFactory.setPassword("root123");

        // 这里创建连接跟server端不同,传入了address
        Connection connection = connectionFactory.newConnection(addresses);

        // 创建channel
        final Channel channel = connection.createChannel();

        channel.basicQos(64);// ?? 设置客户端最多接收未被ack的消息个数

        // 创建消费
        final Consumer consumer = new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("receive msg:" + new String(body));

                try {
                    TimeUnit.SECONDS.sleep(1);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                channel.basicAck(envelope.getDeliveryTag(),false); // 发送ack之后,消息会在queue中被删除
            }
        };

        channel.basicConsume(queue_name,consumer);

        TimeUnit.SECONDS.sleep(5);

        // 如果先关闭connection,再关闭channel,就会抛出异常:
        // com.rabbitmq.client.AlreadyClosedException: connection is already closed due to clean connection shutdown;
        // 所以这里一定要注意关闭的顺序
        channel.close();
        connection.close();

    }

}
©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容