Rabbitmq打怪升级之路(六)生产者与消费者模型

简书:亚武de小文 【原创:转载请注明出处】

生产者与消费者模型

LengToo上学.png

RabbitMQ 整体上是一个生产者与消费者模型,主要负责接收、存储和转发消息。可以把消息传递的过程想象成:当你将一个包裹送到邮局,邮局会暂存并最终将邮件通过邮递员送到收件人的手上, RabbitMQ 就好比由邮局、邮箱和邮递员组成的一个系统。


一、基本模型图
[亚武de小文]生产者消费者模型图.png
二、工作流程
  • 发送端
    1)创建连接 2)创建通道 3)声明队列 4)发送消息
  • 接收端
    1)创建连接 2)创建通道 3)声明队列 4)监听队列 5)接收消息 6)ack回复
三、代码
生产者-发件人
  • MsgProducer.java

    package com.yawu.xiaowen.pc;
    
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    
    import java.io.IOException;
    import java.util.concurrent.TimeoutException;
    
    /**
     * 生产者-发件人
     * @date 2019.06.25
     * @author yawu
     */
    public class MsgProducer {
        private static final String QUEUE_NAME = "mq_pc_hello";
        private static final Logger LOGGER = LoggerFactory.getLogger(MsgProducer.class);
    
        public static void main(String[] args) throws IOException, TimeoutException {
    
            Connection connection = null;
            Channel channel = null;
            try {
                // 连接管理器:应用程序与RabbitMQ建立连接的管理器。
                ConnectionFactory factory = new ConnectionFactory();
                // 服务器地址
                factory.setHost("127.0.0.1");
                // 帐号密码:默认为guest/guest,可省略
                factory.setUsername("guest");
                factory.setPassword("guest");
                // 新建连接
                connection = factory.newConnection();
                // 再创建一个信道
                channel = connection.createChannel();
    
                //1、在信道中声明一个队列
                /**
                 * 参数详解
                 * queue:要创建的队列名
                 * durable:是否持久化。如果为true,可以在RabbitMQ崩溃后恢复消息
                 * exclusive:true表示一个队列是否独占连接,
                 * autoDelete:true表示服务器不在使用这个队列是会自动删除它
                 * arguments:其它属性参数
                 */
                channel.queueDeclare(QUEUE_NAME, true, false, false, null);
                //2、创建一条消息
                String message = "Hello,亚武de小文!";
                // 3、采用二进制流的方式传输
                byte[] msg = message.getBytes("UTF-8");
                // 4、channel是一个信道,它接收到msg数据,并将纳入到QUEUE_NAME队列中
                /**
                 * 消息发布方法参数详解:
                 * exchange:如果没有指定,则使用Default Exchange
                 * routingKey:消息的路由Key,是用于Exchange(交换机)将消息转发到指定的消息队列
                 * props:消息包含的属性
                 * body:消息体
                 */
                channel.basicPublish("", QUEUE_NAME, null, msg);
    
                LOGGER.info("发件人---发送消息:{}", message);
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                if (channel != null) {
                    channel.close();
                }
                if (connection != null) {
                    connection.close();
                }
            }
    
        }
    }
    
    
消费者-收件人
  • MsgConsumer.java

    package com.yawu.xiaowen.pc;
    
    import com.rabbitmq.client.*;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    
    import java.io.IOException;
    
    /**
     * 消费者-收件人
     * @date 2019.06.25
     * @author yawu
     */
    public class MsgConsumer {
        private static final String QUEUE_NAME = "mq_pc_hello";
        private static final Logger LOGGER = LoggerFactory.getLogger(MsgConsumer.class);
    
        public static void main(String[] args) {
            try {
                // 应用程序与RabbitMQ建立连接的管理器。
                ConnectionFactory factory = new ConnectionFactory();
                // 服务器地址
                factory.setHost("127.0.0.1");
    
                // 新建一个连接
                Connection connection = factory.newConnection();
                // 创建一个信道
                Channel channel = connection.createChannel();
    
                //1、首先在通道中申明一个队列
                /**
                 * 参数详解
                 * queue:要创建的队列名
                 * durable:是否持久化。如果为true,可以在RabbitMQ崩溃后恢复消息
                 * exclusive:true表示一个队列只能被一个消费者占有并消费
                 * autoDelete:true表示服务器不在使用这个队列是会自动删除它
                 * arguments:其它参数
                 */
                channel.queueDeclare(QUEUE_NAME, true, false, false, null);
    
                //2、创建消费者,并重写如何消费的方法,eg:输出消息
                //3、首先从信道里面获取数据
                Consumer consumer = new DefaultConsumer(channel) {
                    /**
                     * 消费者接收消息调用此方法
                     * @param consumerTag   消费者的标签,在channel.basicConsume()去指定
                     * @param envelope  消息包的内容,可从中获取消息id,消息routingkey,交换机,消息和重传标志
                     * (收到消息失败后是否需要重新发送)
                     * @param properties
                     * @param body
                     * @throws IOException
                     */
                    @Override
                    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)
                            throws IOException {
                        String message = new String(body, "UTF-8");
                        LOGGER.info("收件人---收到消息:{}", message);
                    }
                };
    
                /**
                 * 4、收到了消息后,提示信道已经收到消息了。可以继续发送其它消息
                 * 【注】第二个参数autoAck如果为false,那么消息会一直保存在RabbitMQ服务器中,Unacked
                 * 消费者没有确认消息被消费,消息一直留在队列中,只有当从有新的消费者加入时,消息被分发到新的消费者。
                 */
                channel.basicConsume(QUEUE_NAME, true, consumer);
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }
    
    
四、几种情况运行与分析
1、分别启动生产者服务和消费者服务
PC同启1.png
PC同启2.png
PC同启3.png
2、关闭生产者服务,开启消费者服务
P关C启1.png
P关C启2.png
3、关闭消费者服务,开启生产者服务
P启C关1.png

该信息处于队列中等待状态,等待消费者消费

P启C关2.png

P启C关3.png
4、服务都保持启动
  • 设置autoAck参数为false

    /**
     * 4、收到了消息后,提示信道已经收到消息了。可以继续发送其它消息
     * 【注】第二个参数autoAck如果为false,那么消息会一直保存在RabbitMQ服务器中
     * 消费者没有确认消息被消费,消息一直留在队列中,只有当从有新的消费者加入时,消息被分发到新的消费者。
     */
    channel.basicConsume(QUEUE_NAME, false, consumer);
    
    PC同启_autoAck1.png
  • 生产者发送多条信息(此处我发出五条消息)


    PC同启_autoAck2.png
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
禁止转载,如需转载请通过简信或评论联系作者。
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 216,001评论 6 498
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,210评论 3 392
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 161,874评论 0 351
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,001评论 1 291
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,022评论 6 388
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,005评论 1 295
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,929评论 3 416
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,742评论 0 271
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,193评论 1 309
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,427评论 2 331
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,583评论 1 346
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,305评论 5 342
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,911评论 3 325
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,564评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,731评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,581评论 2 368
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,478评论 2 352

推荐阅读更多精彩内容