RabbitMQ——基础知识与模拟体验

1、什么是RabbitMQ?
前面讲过一篇文件,关于ActiveMQ的,可以对照看一下,链接:https://www.jianshu.com/p/187d5c2a898d
那么,RabbitMQ又是什么呢?
RabbitMQ 是一个消息代理:它接收并转发消息。你可以将其视为邮局:当你将要发布的邮件放在邮箱中时,您可以确信 Postman 先生最终会将邮件发送给收件人。在这个比喻中,RabbitMQ 是一个邮箱,邮局和邮递员。
RabbitMQ 和邮局之间的主要区别在于它不处理纸张,而是接受,存储和转发二进制数据块的消息。

常见术语
1)生产者:一个发送消息的程序是一个生产者。
2)队列:队列类似于邮箱。虽然消息通过 RabbitMQ 在你的应用中传递,但是它们只能存储在队列中。队列只受主机的内存和磁盘限制的限制,它本质上是一个大的消息缓冲区。不同的生产者可以通过同一个队列发送消息,此外,不同的消费者也可以从同一个队列上接收消息。
3)消费者:一个等待接收消息的程序是一个消费者。


rabbitmq.png

上图所示:“P”是我们的生产者,“C”是我们的消费者。中间的框是队列 - RabbitMQ 代表消费者的消息缓冲区。

整个过程非常简单,生产者创建消息,消费者接收这些消息。你的应用程序既可以作为生产者向其他应用程序发送消息,也可以作为消费者,等待接收其他应用程序的消息。其中,存储消息的是消息队列,它类似于邮箱,消息通过消息队列进行投递。

2、下载与安装(windows环境)
参考链接:https://blog.csdn.net/weixin_39735923/article/details/79288578

3、初步体验
引入依赖:

<!-- 引入RabbitMQ 依赖 -->
        <dependency>
            <groupId>com.rabbitmq</groupId>
            <artifactId>amqp-client</artifactId>
            <version>3.6.3</version>
        </dependency>

创建生产者,生产者连接到 RabbitMQ,发送一条数据:

package com.guxf.demo.rabbit;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

public class Send {
    private final static String QUEUE_NAME = "hello";
 
    public static void main(String[] args) throws IOException, TimeoutException {
        // 创建连接
        ConnectionFactory factory = new ConnectionFactory();
        // 设置 RabbitMQ 的主机名
        factory.setHost("localhost");
        // 创建一个连接 
        Connection connection = factory.newConnection();
        // 创建一个通道 
        Channel channel = connection.createChannel();    
        // 指定一个队列
        // queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments)
        // 参数1 queue :队列名
        // 参数2 durable :是否持久化
        // 参数3 exclusive :仅创建者可以使用的私有队列,断开后自动删除
        // 参数4 autoDelete : 当所有消费客户端连接断开后,是否自动删除队列
        // 参数5 arguments
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        // 发送消息
        String message = "Hello World!";
        // basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body)
        // 参数1 exchange :交换器
        // 参数2 routingKey : 路由键
        // 参数3 props : 消息的其他参数
        // 参数4 body : 消息体
        channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
        System.out.println("生产者发送的消息是[" + message + "]");
        // 关闭频道和连接  
        channel.close();
        connection.close();
    }
}

消息接收者:

package com.guxf.demo.rabbit;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;

public class Receive {
    private final static String QUEUE_NAME = "hello";

    public static void main(String[] args) throws IOException, TimeoutException {

        ConnectionFactory factory = new ConnectionFactory();

        factory.setHost("localhost");

        Connection connection = factory.newConnection();

        Channel channel = connection.createChannel();

        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        System.out.println("我在等待接收消息——");
        // 创建队列消费者
        Consumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
                    byte[] body) throws IOException {
                String message = new String(body, "UTF-8");
                System.out.println("消费者收到的消息是[" + message + "]");
            }
        };
        // basicConsume(String queue, boolean autoAck, Consumer callback)
        // 参数1 queue :队列名
        // 参数2 autoAck : 是否自动ACK
        // 参数3 callback : 消费者对象的一个接口,用来配置回调
        channel.basicConsume(QUEUE_NAME, true, consumer);
    }
}

先运行Send,发出消息,再运行Receive接收消息,即可收到HelloWord,而且可以看到队列名(前提是得启动RabbitMQ):


看到了hello队列.png

4、任务队列
1)任务队列:饭店高峰期时,顾客单子不得不按照下单顺序一个个放在厨房,进行先后炒菜处理,这一堆的单子就是任务队列。
2)消息队列:消息队列(MQ)可以理解成两个应用程序间(生产者消费者间)的通信,例如短信发送模块,因为模块的发送速度跟不上,这时候需要有一个容器,暂存一下,缓解下压力,那么“消息队列”就是在消息的传输过程中保存消息的“容器”。然后短信模块就可以淡定的去消息队列取出要发出的短信内容,进行发送处理。
创建多消息的发送端:

package com.guxf.demo.rabbit;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

public class NewTask {
     private final static String QUEUE_NAME = "ningmo";
     
        public static void main(String[] args) throws IOException, TimeoutException {
            // 创建连接
            ConnectionFactory factory = new ConnectionFactory();
            // 设置 RabbitMQ 的主机名
            factory.setHost("localhost");
            // 创建一个连接 
            Connection connection = factory.newConnection();
            // 创建一个通道 
            Channel channel = connection.createChannel();    
            // 指定一个队列
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
            // 发送消息 
            for (int i = 0; i < 10; i++) {  
                String message = "Task:" + i;
                channel.basicPublish("", QUEUE_NAME, null, message.getBytes());  
                System.out.println("发送的消息为[" + message + "]");  
            }  
            // 关闭频道和连接  
            channel.close();
            connection.close();
        }
}

接收消息:

package com.guxf.demo.rabbit;

import java.io.IOException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;

public class Worker {
    private final static String QUEUE_NAME = "ningmo";
     
    public static void main(String[] args) throws IOException, TimeoutException {
        // 创建连接
        ConnectionFactory factory = new ConnectionFactory();
        // 设置 RabbitMQ 的主机名
        factory.setHost("localhost");
        // 创建一个连接
        Connection connection = factory.newConnection();
        // 创建一个通道
        Channel channel = connection.createChannel();
        // 指定一个队列
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        // 创建队列消费者
        final Consumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,
                                       AMQP.BasicProperties properties, byte[] body) throws IOException {
                String message = new String(body, "UTF-8");
 
                System.out.println("收到的消息为[" + message + "]");
                try {
                    doWork(message);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } 
            }
        };
        // acknowledgment is covered below
        boolean autoAck = true; 
        channel.basicConsume(QUEUE_NAME, autoAck, consumer);
    }
 
    private static void doWork(String task) throws InterruptedException {
        String[] taskArr = task.split(":");
        TimeUnit.SECONDS.sleep(Long.valueOf(taskArr[1]));
    }
}
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 216,258评论 6 498
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,335评论 3 392
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 162,225评论 0 353
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,126评论 1 292
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,140评论 6 388
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,098评论 1 295
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,018评论 3 417
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,857评论 0 273
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,298评论 1 310
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,518评论 2 332
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,678评论 1 348
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,400评论 5 343
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,993评论 3 325
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,638评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,801评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,661评论 2 368
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,558评论 2 352

推荐阅读更多精彩内容