RabbitMQ系列-2.快速入门

概述

在上篇文章中,我们讲述了RabbitMQ中的基本概念,这篇我们一起看一下如何安装RabbitMQ以及Java中如何使用RabbitMQ客户端

RabbitMQ安装

RabbitMQ的安装教程很多,本文说一下mac用户下RabbitMQ的安装
Mac用户,个人推荐使用HomeBrew来安装,如果你没有安装过HomeBrew,可以先安装HomeBrew

1.安装HomeBrew

打开HomeBrew官网:https://brew.sh/

安装HomeBrew

拷贝图中的命令在终端执行即可,命令执行的过程中会提示输入密码,此时输入登录mac系统的账号的密码。当命令执行结束后会出现一下提示:
HomeBrew安装完成页面

2.使用HomeBrew安装RabbitMQ

  • 安装
    brew指令可以安装、更新和卸载应用,通过brew指令安装RabbitMQ很简单。打开新的终端窗口,回到根目录之后输入 brew install rabbitmq指令即可进行rabbitmq服务的自动安装(自动给我们安装RabbitMQ相关依赖)。
    HomeBrew安装RabbitMQ

    安装完成后会提示我们安装的位置:/usr/local/Cellar/rabbitmq
    我们找到这个目录下的sbin目录
    sbin目录
  • 启动RabbitMQ
    双击上图中的rabbitmq-server执行文件或在终端输入./rabbitmq-server即可启动RabbitMQ服务

    服务启动成功

  • 后台启动
    如果想让 RabbitMQ 以守护程序的方式在后台运行,可以在启动的时候加上 -detached 参数

    ./rabbitmq-server -detached
    
  • 查询服务状态
    sbin 目录下有个特别重要的文件叫 rabbitmqctl ,它提供了 RabbitMQ 管理需要的几乎一站式解决方案,绝大部分的运维命令它都可以提供。
    查询 RabbitMQ 服务器的状态信息可以用参数 status

    ./rabbitmqctl status
    

该命令将输出服务器的很多信息,比如 RabbitMQ 和 Erlang 的版本、OS 名称、内存等等

  • 关闭 RabbitMQ 节点
    RabbitMQ是用Erlang语言写的,在Erlang中有两个概念:节点和应用程序。节点就是Erlang虚拟机的每个实例,而多个Erlang应用程序可以运行在同一个节点之上。节点之间可以进行本地通信(不管他们是不是运行在同一台服务器之上)。比如一个运行在节点A上的应用程序可以调用节点B上应用程序的方法,就好像调用本地函数一样。如果应用程序由于某些原因奔溃,Erlang 节点会自动尝试重启应用程序。
    如果要关闭整个 RabbitMQ 节点可以用参数 stop :
./rabbitmqctl stop

它会和本地节点通信并指示其干净的关闭,也可以指定关闭不同的节点,包括远程节点,只需要传入参数 -n :

./rabbitmqctl -n rabbit@server.example.com stop 

-n node 默认 node 名称是 rabbit@server ,如果你的主机名是 server.example.com ,那么 node 名称就是 rabbit@server.example.com

  • 关闭 RabbitMQ 应用程序
    如果只想关闭应用程序,同时保持 Erlang 节点运行则可以用 stop_app:
./rabbitmqctl stop_app

这个命令在后面要讲的集群模式中将会很有用。

  • 启动 RabbitMQ 应用程序
./rabbitmqctl start_app
  • 重置 RabbitMQ 节点
./rabbitmqctl reset

该命令将清除所有的队列。

  • 查看已声明的队列
./rabbitmqctl list_queues
  • 查看交换器
./rabbitmqctl list_exchanges

该命令还可以附加参数,比如列出交换器的名称、类型、是否持久化、是否自动删除:

./rabbitmqctl list_exchanges name type durable auto_delete
  • 查看绑定
./rabbitmqctl list_bindings
  • 添加账号
    例如我们向添加一个admin账号,密码也是admin,并赋予超级权限
## 添加账号
./rabbitmqctl add_user admin admin
## 添加访问权限
./rabbitmqctl set_permissions -p "/" admin ".*" ".*" ".*"
## 设置超级权限
./rabbitmqctl set_user_tags admin administrator
  • 添加虚拟主机
./rabbitmqctl add_vhost test_host_1
  • 启动后台管理系统
./rabbitmq-plugins enable rabbitmq_management

防火墙开放15672端口,访问http://localhost:15672即可进入管理页面,默认用户名guest,密码guest

RabbitMQ Management

Java客户端访问

  • maven工程在pom.xml中添加依赖
<!-- https://mvnrepository.com/artifact/com.rabbitmq/amqp-client -->
<dependency>
    <groupId>com.rabbitmq</groupId>
    <artifactId>amqp-client</artifactId>
    <version>5.6.0</version>
</dependency>
  • 消息生产者
package com.tp.pandora.mq;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.MessageProperties;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * FileName: MessageProducer
 * Author:   TP
 * Date:     2019-06-07 18:56
 * Description:消息生产者
 */
public class MessageProducer {
    //交换器名称
    private static final String EXCHANGE_NAME = "exchange_demo";
    //路由键
    private static final String ROUTING_KEY = "routingKey_demo";
    //队列名称
    private static final String QUEUE_NAME = "queue_demo";
    //IP地址
    private static final String IP_ADDRESS = "127.0.0.1";
    //端口(RabbitMQ服务端口默认为5672)
    private static final int PORT = 5672;
    //用户名
    private static final String USER_NAME = "admin";
    //密码
    private static final String PASS_WORD = "admin";

    public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
        //创建连接工厂
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost(IP_ADDRESS);
        connectionFactory.setPort(PORT);
        connectionFactory.setUsername(USER_NAME);
        connectionFactory.setPassword(PASS_WORD);
        //获取连接
        Connection connection = connectionFactory.newConnection();
        //创建信道
        Channel channel = connection.createChannel();
        //创建一个类型为direct、持久化的、非自动删除的交换器
        channel.exchangeDeclare(EXCHANGE_NAME, "direct", true, false, null);
        //创建一个持久化、非排他、非自动删除的队列
        channel.queueDeclare(QUEUE_NAME, true, false, false, null);
        //将交换机与队列通过路由键绑定(绑定key和路由key使用同一个)
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ROUTING_KEY);
        //发送一条持久化的消息:HELLO Rabbit MQ
        String message = "HELLO Rabbit MQ";
        channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());
        //关闭资源
        channel.close();
        connection.close();
        System.out.println("MQ消息发送成功");
    }
}

执行main函数,我们到后台查看可以发现队列里已经有了我们发送的消息

管理后台查看消息
  • 消息消费者
package com.tp.athena.mq;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Address;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;

import java.io.IOException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/**
 * FileName: MessageReceiver
 * Author:   TP
 * Date:     2019-06-07 18:56
 * Description:消息消费者
 */
public class MessageReceiver {
    //队列名称
    private static final String QUEUE_NAME = "queue_demo";
    //IP地址
    private static final String IP_ADDRESS = "127.0.0.1";
    //端口(RabbitMQ服务端口默认为5672)
    private static final int PORT = 5672;
    //用户名
    private static final String USER_NAME = "admin";
    //密码
    private static final String PASS_WORD = "admin";

    public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
        Address[] addresses = new Address[]{
                new Address(IP_ADDRESS, PORT)
        };
        //创建连接工厂
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setUsername(USER_NAME);
        connectionFactory.setPassword(PASS_WORD);
        //这里的创建连接连接方式与生产者的demo略有不同,注意辨别区别
        Connection connection = connectionFactory.newConnection(addresses);
        Channel channel = connection.createChannel();
        //设置客户端最多接收未被ack的消息的个数
        channel.basicQos(64);
        Consumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("接收到消息:" + new String(body));
                try {
                    TimeUnit.SECONDS.sleep(1);
                } catch (Exception e) {
                    e.printStackTrace();
                }
                //消费确认回执
                channel.basicAck(envelope.getDeliveryTag(), false);
            }
        };
        channel.basicConsume(QUEUE_NAME, consumer);
        //等待回调函数执行完毕之后 , 关闭资源
        TimeUnit.SECONDS.sleep(5);
        channel.close();
        connection.close();
    }
}

执行main函数发现控制台输出如下:

控制台输出
消息被消费

我们声明的交换器

我们声明的队列

绑定信息
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 215,923评论 6 498
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,154评论 3 392
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 161,775评论 0 351
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,960评论 1 290
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,976评论 6 388
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,972评论 1 295
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,893评论 3 416
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,709评论 0 271
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,159评论 1 308
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,400评论 2 331
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,552评论 1 346
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,265评论 5 341
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,876评论 3 325
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,528评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,701评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,552评论 2 368
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,451评论 2 352

推荐阅读更多精彩内容

  • 什么是消息中间件? 定义一:消息(Message)是指在应用间传送的数据。 定义二:消息队列中间件(Message...
    RainyBlossom阅读 395评论 0 1
  • 什么叫消息队列? 消息(Message)是指在应用间传送的数据。消息可以非常简单,比如只包含文本字符串,也可以更复...
    Agile_dev阅读 2,371评论 0 24
  • 关于消息队列,从前年开始断断续续看了些资料,想写很久了,但一直没腾出空,近来分别碰到几个朋友聊这块的技术选型,是时...
    Johnson_zx阅读 1,108评论 0 5
  • 关于消息队列,从前年开始断断续续看了些资料,想写很久了,但一直没腾出空,近来分别碰到几个朋友聊这块的技术选型,是时...
    预流阅读 584,640评论 51 786
  • 关于消息队列,从前年开始断断续续看了些资料,想写很久了,但一直没腾出空,近来分别碰到几个朋友聊这块的技术选型,是时...
    中v中阅读 1,968评论 0 20