一、概述
1.1 核心概念
1.1.1 JMS
JMS:Java Message Service,java消息服务,是一个消息服务的标准或者说是规范,允许应用程序组件基于JavaEE平台创建、发送、接收和读取消息。它使分布式通信耦合度更低,消息服务更加可靠以及异步性。
JMS是java的消息服务,JMS的客户端之间可以通过JMS服务进行异步的消息传输。
1.1.2 P2P
p2p:点对点发送,一个消息只能被消费一次
涉及:
消息队列(Queue)
发送者(Sender)
接收者(Receiver)
每个消息都被发送到一个特定的队列,接收者从队列中获取消息。队列保留着信息,直到它们被消费或超时。
示意图:p2p示意图
特点:
- 不用同时在线
- 一个消息只能被消费1次
1.1.3 Pub/Sub
Pub/Sub:发布订阅,一个消息可以被消费多次
涉及角色:
主题(Topic)
发布者(Publisher)
订阅者(Subscriber)
客户端将消息发送到主题。多个发布者将消息发送到Topic,系统将这些消息传递给多个订阅者。
示意图:Pub/Sub示意图
特点:
- 发布者和订阅者同时在线
- 一个消息可以被多个订阅者消费
1.1.4 MQ
MQ:消息中间件(MOM:Message Orient middleware),消息队列
作为系统间通信的必备技术,低耦合、可靠传输、流量控制、最终一致性
实现异步消息通信
1.2 MQ的优缺
- 解耦
降低系统模块的耦合度 - 提高系统响应时间
- 异步消息
- 过载保护
基于MQ实现削峰填谷
1.3 主流MQ
1.3.1 ActiveMQ
Apache下
完全支持Java的JMS协议
消息模式:1、点对点 2、发布订阅
1.3.2 RabbitMQ
Erlang语言实现的开源的MQ中间件,支持多种协议
主要的通信协议是AMQP,即Advanced Message Queuing Protocol,高级消息队列协议,是应用协议的一个开放标准,为面向消息的中间件设计。
1.3.3 Kafka
Apache下开源项目
高性能分布式消息队列,一般海量数据传输,大数据部门用
单机吞吐量:10w/s
1.3.4 RocketMQ
阿里 贡献给了Apache
参考了Kafka实现基于Java 消息中间件
1.3.5 ZeroMQ
消息传输最快
对比
- 从社区活跃度
- 持久化消息比较
ZeroMQ不支持,ActiveMQ和RabbitMQ都支持。持久化消息主要是值我们机器在不可抗李因素等情况挂掉了,消息不会丢失的机制。 - 综合技术实现
可靠性、灵活的路由、集群、高可用的队列、消息排序、问题追踪、可视化管理工具、插件系统等等。
RabbitMQ/Kafka最好,ActiceMQ次之,ZeroMQ最差。当然ZeroMQ也可以做到,不过自己不过自己必须手动写代码实
现,代码量不小。尤其是可靠性中的:持久性、投递确认、发布者证实和高可用性。 - 高并发
毋庸置疑,RabbitMQ最高,原因是它的实现语言是天生具备高并发可用的erlang语言。
比较示意图:各大消息中间件对比
二、RabbitMQ
2.1 简介
RabbitMQ是一个开源的AMQP实现,服务端用Erlang语言编写,支持多种客户端,如:Python、Ruby、.NET、Java、JMS、C、PHP、ActionScript、XMPP、STOMP等,支持AJAX。用于在分布式系统中存储转发消息,在易用性、扩展性、高可用性等方面表现不俗。
消息中间件主要用于组件之间的解耦,消息的发送者无需知道消息使用者的存在。AMQP的主要特征是面向消息、队列、路由(包括点对点和发布/订阅)、可靠性、安全。
2.2 通信模块
- 发送端
将消息发送到对应的交换器中,可以指定路由关键字 - MQ服务器
将交换器获取到的消息,根据路由规则,转发到对应的消息队列中 - 消费端
监听MQ服务器的消息队列,如果由消息变化,就会从消息队列中获取消息
2.3 核心类说明
- ConnectionFactory
为Connection的制造工厂,可以设置服务器和端口号和账号密码等信息 - Connection
是RabbitMQ的socket链接,它封装了socket协议相关部分逻辑 - Channel
与RabbitMQ打交道的最重要的一个接口,我们大部分业务操作是在Channel这个接口中完成的,包括定义Queue、定义Exchange、绑定Queue与Exchange、发布消息等。 - Queue
RabbitMQ的作用是存储消息,队列的特性是先进先出。即生产者生成消息被送到RabbitMQ的内部对象Queue中去,而消费者则是从Queue队列中取出数据 - Exchange(交换机、交换器)
根据绑定的匹配规则,对消息进行匹配处理
三、RabbitMQ初体验
涉及角色:
3.1 MQ服务器
可以基于Docker安装RabbitMQ,记住其端口:
15672:网页版可视化服务器数据
5672:客户端连接的端口号
3.1.1 Docker安装RabbitMQ
- 搜索
docker search rabbitmq:management - 下载
docker pull rabbitmq:management - 创建
docker create --name rabbitmq -p 15671:15671 -p 15672:15672 -p 5671:5671 -p 5672:5672 rabbitmq:management
docker run -d --name rabbitmq -p 15671:15671 -p 15672:15672 -p 5671:5671 -p 5672:5672 rabbitmq:management - 启动
docker start rabbitmq - 测试
浏览器输入:
http://服务器ip:15672
用户名和密码:
默认:guest 密码:guest
切记:开放端口,否则拦截
3.2 MQ消息发送者
//http://39.105.189.141:15672
public static void main(String[] args) throws IOException, TimeoutException {
//1、创建连接工厂
ConnectionFactory factory=new ConnectionFactory();
//设置连接信息
factory.setHost("39.105.189.141");
factory.setPort(5672);
factory.setUsername("guest");
factory.setPassword("guest");
//2、获取连接对象
Connection connection=factory.newConnection();
//3、获取通道对象
Channel channel=connection.createChannel();
//4、创建队列
/**
* 定义队列 参数说明
* 1、队列名称
* 2、是否持久化 队列消息是否存储到磁盘
* 3、是否独占队列
* 4、是否断开之后自动删除消息
* 5、额外设置的数据信息 */
channel.queueDeclare("queue1902",false,false,false,null);
//5、发送消息
/*参数说明:
* 1、交换机名称
* 2、队列名称
* 3、属性参数
* 4、发送的消息内容 要求字节*/
channel.basicPublish("","queue1902",null,"你睡着了吗".getBytes());
//6、关闭
channel.close();
connection.close();
}
3.3 MQ消息接收者
public static void main(String[] args) throws IOException, TimeoutException {
//1、创建连接工厂
ConnectionFactory factory=new ConnectionFactory();
//设置连接信息
factory.setHost("39.105.189.141");
factory.setPort(5672);
factory.setUsername("guest");
factory.setPassword("guest");
//2、获取连接对象
Connection connection=factory.newConnection();
//3、获取通道对象
Channel channel=connection.createChannel();
//4、创建队列
/**
* 定义队列 参数说明
* 1、队列名称
* 2、是否持久化 队列消息是否存储到磁盘
* 3、是否独占队列
* 4、是否断开之后自动删除消息
* 5、额外设置的数据信息 */
channel.queueDeclare("queue1902",false,false,false,null);
//5、定义消费者
Consumer consumer=new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("消费者:"+new String(body));
}
};
//6、绑定消费者
/**
* 参数说明:
* 1、队列名称
* 2、是否自动应答
* 3、消费者对象*/
channel.basicConsume("queue1902",true,consumer);
}
四、RabbitMQ的消息模式
4.1 普通消息
点对点消息
一个消息只能消费一次
只需要队列就可以,不需要交换机
消息发送者和消息接收者者可以不同时在线
4.2 交换器消息
RabbitMQ特色就在于Exchange,主要有以下类型:
fanout:只要有消息就转发给绑定的队列,不会进行消息的路由判断
direct:会根据路由匹配规则,将消息发送到指定队列中,注意路由规则不支持特殊字符
topic:会根据路由匹配规则,将消息发送到指定队列中,注意路由规则支持特殊字符,比如:* #
五、使用场景
- 异步解耦
如果某个方法,有很多逻辑需求处理,但是部分的逻辑处理和用户无关。那么就可以采用异步 - 耗时的操作
网络请求、IO流、复杂的业务逻辑 - 削峰填谷