概述
在上篇文章中,我们讲述了RabbitMQ中的基本概念,这篇我们一起看一下如何安装RabbitMQ以及Java中如何使用RabbitMQ客户端
RabbitMQ安装
RabbitMQ的安装教程很多,本文说一下mac用户下RabbitMQ的安装
Mac用户,个人推荐使用HomeBrew来安装,如果你没有安装过HomeBrew,可以先安装HomeBrew
1.安装HomeBrew
打开HomeBrew官网:https://brew.sh/
拷贝图中的命令在终端执行即可,命令执行的过程中会提示输入密码,此时输入登录mac系统的账号的密码。当命令执行结束后会出现一下提示:
2.使用HomeBrew安装RabbitMQ
-
安装
brew指令可以安装、更新和卸载应用,通过brew指令安装RabbitMQ很简单。打开新的终端窗口,回到根目录之后输入 brew install rabbitmq指令即可进行rabbitmq服务的自动安装(自动给我们安装RabbitMQ相关依赖)。
安装完成后会提示我们安装的位置:/usr/local/Cellar/rabbitmq
我们找到这个目录下的sbin目录
-
启动RabbitMQ
双击上图中的rabbitmq-server执行文件或在终端输入./rabbitmq-server即可启动RabbitMQ服务
-
后台启动
如果想让 RabbitMQ 以守护程序的方式在后台运行,可以在启动的时候加上 -detached 参数./rabbitmq-server -detached
-
查询服务状态
sbin 目录下有个特别重要的文件叫 rabbitmqctl ,它提供了 RabbitMQ 管理需要的几乎一站式解决方案,绝大部分的运维命令它都可以提供。
查询 RabbitMQ 服务器的状态信息可以用参数 status./rabbitmqctl status
该命令将输出服务器的很多信息,比如 RabbitMQ 和 Erlang 的版本、OS 名称、内存等等
-
关闭 RabbitMQ 节点
RabbitMQ是用Erlang语言写的,在Erlang中有两个概念:节点和应用程序。节点就是Erlang虚拟机的每个实例,而多个Erlang应用程序可以运行在同一个节点之上。节点之间可以进行本地通信(不管他们是不是运行在同一台服务器之上)。比如一个运行在节点A上的应用程序可以调用节点B上应用程序的方法,就好像调用本地函数一样。如果应用程序由于某些原因奔溃,Erlang 节点会自动尝试重启应用程序。
如果要关闭整个 RabbitMQ 节点可以用参数 stop :
./rabbitmqctl stop
它会和本地节点通信并指示其干净的关闭,也可以指定关闭不同的节点,包括远程节点,只需要传入参数 -n :
./rabbitmqctl -n rabbit@server.example.com stop
-n node 默认 node 名称是 rabbit@server ,如果你的主机名是 server.example.com ,那么 node 名称就是 rabbit@server.example.com 。
-
关闭 RabbitMQ 应用程序
如果只想关闭应用程序,同时保持 Erlang 节点运行则可以用 stop_app:
./rabbitmqctl stop_app
这个命令在后面要讲的集群模式中将会很有用。
- 启动 RabbitMQ 应用程序
./rabbitmqctl start_app
- 重置 RabbitMQ 节点
./rabbitmqctl reset
该命令将清除所有的队列。
- 查看已声明的队列
./rabbitmqctl list_queues
- 查看交换器
./rabbitmqctl list_exchanges
该命令还可以附加参数,比如列出交换器的名称、类型、是否持久化、是否自动删除:
./rabbitmqctl list_exchanges name type durable auto_delete
- 查看绑定
./rabbitmqctl list_bindings
-
添加账号
例如我们向添加一个admin账号,密码也是admin,并赋予超级权限
## 添加账号
./rabbitmqctl add_user admin admin
## 添加访问权限
./rabbitmqctl set_permissions -p "/" admin ".*" ".*" ".*"
## 设置超级权限
./rabbitmqctl set_user_tags admin administrator
- 添加虚拟主机
./rabbitmqctl add_vhost test_host_1
- 启动后台管理系统
./rabbitmq-plugins enable rabbitmq_management
防火墙开放15672端口,访问http://localhost:15672即可进入管理页面,默认用户名guest,密码guest
Java客户端访问
- maven工程在pom.xml中添加依赖
<!-- https://mvnrepository.com/artifact/com.rabbitmq/amqp-client -->
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.6.0</version>
</dependency>
- 消息生产者
package com.tp.pandora.mq;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.MessageProperties;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* FileName: MessageProducer
* Author: TP
* Date: 2019-06-07 18:56
* Description:消息生产者
*/
public class MessageProducer {
//交换器名称
private static final String EXCHANGE_NAME = "exchange_demo";
//路由键
private static final String ROUTING_KEY = "routingKey_demo";
//队列名称
private static final String QUEUE_NAME = "queue_demo";
//IP地址
private static final String IP_ADDRESS = "127.0.0.1";
//端口(RabbitMQ服务端口默认为5672)
private static final int PORT = 5672;
//用户名
private static final String USER_NAME = "admin";
//密码
private static final String PASS_WORD = "admin";
public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
//创建连接工厂
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost(IP_ADDRESS);
connectionFactory.setPort(PORT);
connectionFactory.setUsername(USER_NAME);
connectionFactory.setPassword(PASS_WORD);
//获取连接
Connection connection = connectionFactory.newConnection();
//创建信道
Channel channel = connection.createChannel();
//创建一个类型为direct、持久化的、非自动删除的交换器
channel.exchangeDeclare(EXCHANGE_NAME, "direct", true, false, null);
//创建一个持久化、非排他、非自动删除的队列
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
//将交换机与队列通过路由键绑定(绑定key和路由key使用同一个)
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ROUTING_KEY);
//发送一条持久化的消息:HELLO Rabbit MQ
String message = "HELLO Rabbit MQ";
channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());
//关闭资源
channel.close();
connection.close();
System.out.println("MQ消息发送成功");
}
}
执行main函数,我们到后台查看可以发现队列里已经有了我们发送的消息
- 消息消费者
package com.tp.athena.mq;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Address;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import java.io.IOException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
/**
* FileName: MessageReceiver
* Author: TP
* Date: 2019-06-07 18:56
* Description:消息消费者
*/
public class MessageReceiver {
//队列名称
private static final String QUEUE_NAME = "queue_demo";
//IP地址
private static final String IP_ADDRESS = "127.0.0.1";
//端口(RabbitMQ服务端口默认为5672)
private static final int PORT = 5672;
//用户名
private static final String USER_NAME = "admin";
//密码
private static final String PASS_WORD = "admin";
public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
Address[] addresses = new Address[]{
new Address(IP_ADDRESS, PORT)
};
//创建连接工厂
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setUsername(USER_NAME);
connectionFactory.setPassword(PASS_WORD);
//这里的创建连接连接方式与生产者的demo略有不同,注意辨别区别
Connection connection = connectionFactory.newConnection(addresses);
Channel channel = connection.createChannel();
//设置客户端最多接收未被ack的消息的个数
channel.basicQos(64);
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("接收到消息:" + new String(body));
try {
TimeUnit.SECONDS.sleep(1);
} catch (Exception e) {
e.printStackTrace();
}
//消费确认回执
channel.basicAck(envelope.getDeliveryTag(), false);
}
};
channel.basicConsume(QUEUE_NAME, consumer);
//等待回调函数执行完毕之后 , 关闭资源
TimeUnit.SECONDS.sleep(5);
channel.close();
connection.close();
}
}
执行main函数发现控制台输出如下: