RabbitMQ的基本概念

有问题请联系我QQ:273206491

RabbitMQ是基于Erlang语言(俗称:二郎神)对AMQP协议的实现。

1、各个模块之间的一览图

image.png

2、连接

这里以Java的客户端进行说明,客户端与RabbitMQ服务器之间是基于TCP连接的,而TCP连接的创建和销毁都非常耗费资源,因此RabbitMQ使用连接复用模式,也就是我们常用的Channel,一个TCP连接可以创建多个Channel,不同Channel之间是互相独立的,一个线程使用一个Channel是安全的,不会出现多线程共享同一个连接的问题(线程共享一个资源很容易出现线程安全的问题)。

2.1、连接创建示例

//连接工厂的初始化
ConnectionFactory connectionFactory=new ConnectionFactory();
connectionFactory.setHost("rabbitmq服务器的地址");
connectionFactory.setUsername("用户名");
connectionFactory.setPassword("密码");
//通过连接工厂与rabbitmq之间建立一个tcp连接
Connection connection =connectionFactory.newConnection();
//通过连接创建信道,多个线程之间不要共享同一个信道
Channel channel = connection.createChannel();

2.2、资源释放问题

在连接不使用的时候及时关闭连接是非常重要的步骤,可能你在本地开发时不释放连接不会有什么问题,但一旦程序上线后,连接不释放很有很多导致RabbitMQ的连接因耗尽而无法接受新的连接请求或者其他什么问题。
释放连接的方式也非常简单,直接调用Connection对象的close方法可以释放一个连接,同时也会释放这个连接下的所有Channel资源。

3、生产者

用于生产消息,使用basicPublish向RabbitMQ发送消息

upChannel.basicPublish("msgExchange", "message", MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes("utf-8"));
  • exchange 交换器的名称
  • routingKey 路由key(当交换器的类型是fanout时,路由key是不生效的)
  • props 用来设置消息的相关属性
///下面这两种方式是等价的
MessageProperties.PERSISTENT_TEXT_PLAIN
或者
AMQP.BasicProperties.Builder properties = new AMQP.BasicProperties().builder();
properties.deliveryMode(2);
properties.contentType("text/plain");
properties.headers(null);//用于设置自己的header属性,交换器的类型中就有一种为headers(但不推荐使用),当我们设置的headers属性值和交换器绑定的值一致是就能够路由到响应队列中
  • body 消息体
  • mandatory

4、交换器

交换器可以认为是一个消息中转站,他通过和队列进行绑定,我们把消息发送到交换器中,交换器根据路由key再决定将消息投递给哪个队列(交换器的类型为fanout时,路由key是无效的)。

4.1、类型

  • fanout 会把消息给绑定到这个交换器的所有队列都发生一遍
  • direct 只将消息发生给路由匹配的队列
  • topic 将消息发生获取路由匹配的对象,但是这里的匹配支持模糊匹配,rabbitmq的路由key使用点"."来分隔一个单词,"*"匹配一个单词,"#"匹配一个或者多个单词。举个例子:路由key:com.pingwazi.rabbitmq,绑定key1:com.#,绑定key2:com.*.*是匹配的。
  • headers 发生消息的headers属性完全匹配是则认为匹配,这个模式不常用,并且性能也很差。

4.2、创建

//声明一个交换器,如果存在就不创建,如果存在的交换器参数与声明交换器参数不匹配就会报错,如果不存在就会创建
//这方式什么是同步的
upChannel.exchangeDeclare("msgExchange",BuiltinExchangeType.DIRECT,false);
//这种方式声明式异步的,但是不推荐使用,因为可能你在调用了之后就去使用的时候RabbitMQ服务器还没有创建好。
upChannel.exchangeDeclareNoWait();
  • exchange 交换器的名字
  • type 交换器的类型
  • durable 指定这个交换器是持久化的
  • autoDelete 是否自动删除,true则表示自动删除,但是有一个前提条件,那就是曾经至少要有一个队列或者交换器与之绑定,但是现在都已经接触绑定了。这时RabbitMQ服务器才会自动的吧这个交换器给删除掉,也就是说如果这个交换器创建出来后,没有任何队列或者交换器与之绑定的话,RabbitMQ是不会自动删除的,即使我们设置了自动删除。
  • internal 设置是否是内置的交换器,如果设置为true,则表示是内置的,那么客户端程序将无法直接向这个交换器发送消息,只能通过交换器路由到交换器的方式。
  • argument 其他可选参数

4.3、判断交换器是否存在

//判断指定交换器是否存在,如果不存在就会报404异常。
upChannel.exchangeDeclarePassive();

4.4、删除交换器

// 同步删除
upChannel.exchangeDelete("",true);
//异步删除(不需要等待删除完成)
upChannel.exchangeDeleteNoWait("",true);
  • exchange 交换器的名称
  • ifUnused 是否只删除没有被使用的交换器

5、绑定路由key

//将交换器与队列进行绑定通过message进行绑定
upChannel.queueBind("msgQueue","msgExchange","message");
//将交换器与队列进行解绑
upChannel.queueUnbind("","","")
//将交换器与交换器绑定在一起
upChannel.exchangeBind( "","","")
//将交换器与交换器解绑
upChannel.exchangeUnbind( "","","")

绑定路由key是在绑定交换器和队列时指定的一个key,其中message就是masQueue队列与msgExchange交换器的绑定路由key。

路由key是发送消息的时候指定的key,交换器类型为direct或者topic时,路由key与绑定路由key后才会将消息发送到对应的队列中。

6、队列

队列是RabbitMQ实际存储消息的地方

//声明一个名字为msgQueue、持久化、不排他、不自动删除的队列,如果此队列已经存在就不再创建
upChannel.queueDeclare("msgQueue", true, false, false, null);
//异步声明一个队列,不推荐使用,原因同交换器一样
upChannel.queueDeclareNoWait();
  • queue 队列的名称
  • durable 设置队列是否为持久化队列,如果为true这表示是持久化队列
  • exclusive 是否为排它队列,值为true则表示为排他队列,如果一个队列是排它队列,那么除了创建他的连接(注意:这里说的是连接,不是channel)能够使用之外,其他连接都不能使用,并且在创建它的连接断开时,这个队列会自动删除,即使设置为持久化队列也是如此。
  • autoDelete 是否自动删除,值为true则表示自动删除,自动删除有一个前提条件,那就是曾经至少有一个消费者使用了这个队列,并且现在已经没有任何消费与这个队列建立了连接,这时候RabbitMQ才会自动删除这个队列。也就是说如果这个队列还没有任何消费与建立过连接,那么RabbitMQ是不会自动删除的。
  • arguments 其他的一些参数设置

6.1、判断队列是否存在

//判断队列是否存在
upChannel.queueDeclarePassive();

6.2、删除队列

//同步删除
upChannel.queueDelete("",true,true);
//异步删除
upChannel.queueDeleteNoWait("",true,true);
  • queue 队列名称
  • ifUnused 是否只删除没有被使用过的队列
  • ifEmpty 是否只删除空的队列

7、消费者

消息的消费者有两种常用的模式,即推模式和拉模式。两种模式的实现方式完全不同,推模式是只要队列中有消息了就会推送给消费者(当然了也要受未确认消息数的限制),而拉模式则是消费者需要的时候再去RabbitMQ中获取,而他一次也只能获取获取一条消息。

7.1、拉模式

image.png

从图中可以看出,在单线程的情况下,消息的处理速度是比较慢的,当然了这里也可以使用多线程不断的从Rabbitmq中去获取,但这样就需要手动实现获取算法了。不废话,先上代码!

private void receiveGetMessage()
    {
        try
        {
            Channel downChannel=connection.createChannel();
            //交换器类型:fanout、direct、topic
            //声明一个名字为msgExchange、类型为direct并且持久化的交换器,如果交换器已经存在就不再创建
            downChannel.exchangeDeclare("msgExchange",BuiltinExchangeType.DIRECT,false);
            //声明一个名字为msgQueue、持久化、不排他、不自动删除的队列,如果此队列已经存在就不再创建
            downChannel.queueDeclare("msgQueue", true, false, false, null);
            //将交换器与队列进行绑定通过message进行绑定
            downChannel.queueBind("msgQueue","msgExchange","message");
            //消息未确认消息的数量
            downChannel.basicQos(1);//在非自动确认的模式下,限制最多允许未确认的消息数量
            boolean isBreak=false;
            while (!isBreak)
            {
                //消费消息
                GetResponse msgData = downChannel.basicGet("", false);
                String msgBody=new String(msgData.getBody(), "utf-8");
                System.out.println(Thread.currentThread().getId()+"RabbitMQ拉模式消费者收到消息: " + msgBody);
                //回复确认消息
                downChannel.basicAck(msgData.getEnvelope().getDeliveryTag(),false);
                if(StringUtils.isEmpty(msgBody))
                    isBreak=true;
            }
            downChannel.close();
        }
        catch (ShutdownSignalException ex)
        {
            //连接异常关闭了,这里要进行检查,并尝试重新建立连接
            ex.printStackTrace();
        }
        catch (IOException ex)
        {
            //发生io异常需要进行处理,对应channel可能关闭了
            ex.printStackTrace();
        } catch (TimeoutException e) {
            //信道资源释放超时,可能对应的channel关闭了
            e.printStackTrace();
        }
    }

以上代码我是通过但线程循环的方式从RabbitMQ中拉取代码,这种模式处理速度较慢,在不是用多线程进行处理的情况下,这中模式适合用于处理单个消息比较耗时的场景。

7.2、推模式

image.png

通过运行如下代码可以看得出,推模式实际上是使用了多线程的在进行处理的。但是他的吞吐量是默认拉模式的好几倍,这中模式适合于处理每个消息的时间比较短的场景。

private void receivePushMessage()
    {
        try
        {
            Channel downChannel=connection.createChannel();
            //交换器类型:fanout、direct、topic
            //声明一个名字为msgExchange、类型为direct并且持久化的交换器,如果交换器已经存在就不再创建
            downChannel.exchangeDeclare("msgExchange",BuiltinExchangeType.DIRECT,false);
            //声明一个名字为msgQueue、持久化、不排他、不自动删除的队列,如果此队列已经存在就不再创建
            downChannel.queueDeclare("msgQueue", true, false, false, null);
            //将交换器与队列进行绑定通过message进行绑定
            downChannel.queueBind("msgQueue","msgExchange","message");
            //消息未确认消息的数量
            downChannel.basicQos(10000);//在非自动确认的模式下,限制最多允许未确认的消息数量
            //消费消息
            downChannel.basicConsume("msgQueue",createConsumer(downChannel));
            System.out.println("RabbitMQ消费者正在运行中...");
            //不能释放信道资源!!!
            //因为这里的消费者是用的推模式,如果关闭了信道,后面在进行消息消费的时候会报错
            //downChannel.close();
        }
        catch (ShutdownSignalException ex)
        {
            //连接异常关闭了,这里要进行检查,并尝试重新建立连接
            ex.printStackTrace();
        }
        catch (IOException ex)
        {
            //发生io异常需要进行处理,对应channel可能关闭了
            ex.printStackTrace();
        }
    }

    /**
     * 创建消费对象
     * @param channel
     * @return
     */
    private Consumer createConsumer(Channel channel)
    {
        Consumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) {
                String message = new String(body);
                System.out.println(Thread.currentThread().getId()+"RabbitMQ推模式消费者收到消息: " + message);
                // 消息确认
                try {
                    channel.basicAck(envelope.getDeliveryTag(), false);//手动确认消息
                } catch (IOException e) {
                    //发生io异常需要进行处理,对应channel可能关闭了
                    e.printStackTrace();
                }
            }
        };
        return consumer;
    }
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 212,718评论 6 492
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,683评论 3 385
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 158,207评论 0 348
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,755评论 1 284
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 65,862评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,050评论 1 291
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,136评论 3 410
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,882评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,330评论 1 303
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,651评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,789评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,477评论 4 333
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,135评论 3 317
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,864评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,099评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,598评论 2 362
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,697评论 2 351