什么是消息队列
消息队列,即MQ,Message Queue。
消息队列是典型的生产者-消费者模式。消息队列采用异步的方式进行工作,生产者和消费者互不干扰没有业务逻辑的入侵,只关心消息的发送和接收,从而实现了生产者和消费者的解耦。
AMQP和JMS
MQ是消息通信模型,并不是具体实现,而具体实现MQ的有两种主流方式:AMQP、JMS
AMQP:一个提供统一消息服务的应用层标准高级消息队列协议,客户端与中间件传递消息不受开发语言限制。
JMS:Java应用提供统一的消息操作,仅限于Java语言。
AMQP和JMS的区别
a)JMS定义统一接口对消息操作进行统一,AMQP通过规定协议来统一数据交互的格式。
b)JMS限定Java语言,AMQP不限定语言
c)JMS规定两种消息模式,AMQP支持多种消息模型
常见的MQ产品
ActiveMQ:基于JMS
RabbitMQ:基于AMQP协议,erlang语言开发,稳定性好
RocketMQ:基于JMS,阿里巴巴产品,目前交由Apache基金会
Kafka:分布式消息系统,高吞吐量
为什么需要中间件(MQ)
在高并发下系统来不及同步处理,请求往往会发生堵塞。消息队列是异步处理,对业务进行解耦和减少请求时间、同时对大并发量进行限流/削峰减小系统的压力。
使用消息队列有什么问题
高可用:消息队列不能是单机模式一旦消息队列挂掉整个系统将不可用,所以系统采用集群/分布式要做必然希望消息队列能提供现成的支持。(补充:所有的高可用都是通过RPC和存储的高可用来做的。而消息队列的高可用,只能保证broker接受消息和确认消息的接口是幂等的,consumer处理消息是幂等这样系统的可用性就交由RPC进行处理,采用共享存储来保证幂等,broker多设备共享一个DB/一个分布式文件,则处理消息自然是幂等。当单点故障时,其他节点将自动补上,另外failover可以依赖定时任务的补偿,这是消息队列本身天然就可以支持的功能。对于不共享存储的队列,如Kafka使用分区加主备模式。)
数据丢失问题:消息队列如果挂掉数据将丢失,要做到数据不丢失那么消息队列要做到同步存储还是异步存储,存储在哪磁盘、数据库、redis、分布式文件系统(个人所知道的一种模式是异步存储在消息存放进消息队列时同时把数据存入数据库,当消息队列一旦挂掉就采用数据库数据)
消费者怎么得到消息队列的数据:
a)生产者将数据放入队列,队列有数据后主动去通知消费者进行消费
b)消费者不断的轮训队列,当队列中有新数据则进行消费
其他问题
a)消息重复消费怎么办?
解决方案:增加消息状态表,用来记录消息的处理状态,每次处理消息之前都去表中查询一次,如果已经有相同的消息存在,那么不处理防止消息重复消费。
b)如何保证消息是绝对有顺序的?
如何避免消息丢失?
a)消费者的ACK机制,可以防止消费者丢失消息
b)消息进行持久化(消息持久化的前提是队列交换机持久化)
IBM WebSphere MQ
IBM WebSphere MQ 支持两种不同的应用程序接口:java消息服务(JMS)和消息队列接口(MQI)。在IBM服务器上,JMS的绑定方式被映射到MQI。
IBM WebSphere MQ 编程原理:程序和队列管理器连接,程序通过MQ Connect调用进行连接Queue Manager。当要在队列上存储数据时则调用MQ Open打开一个队列,使用MQ Put将数据存放到队列上。要接受数据,应用程序调用MQ Get接受队列上的数据。
消息通道代理(MCA)使用TCP/IP、SNA将消息从本地传输队列移到目标队列管理器(Queue Manager)。
通道出口是用户写入库,可以在通道运作期间,从已定位位置号之一进入这些库。
通道出口和对象权限管理器(OAM)提供访问控制功能。程序的每个请求传递给OAM,OAM将根据访问控制表授权或拒绝访问。OAM只提供授权服务,不提供数据保护。
OAM、通道出口、MCA三个组件对IBM WebSphere MQ的安全性解决方案。
Rocket MQ
动机:在早期阿里团队在使用Active MQ 5.x基础上构建队列,但随着业务量的增加队列越来越长,Active MQ的IO达到一个瓶颈,阿里团队尝试使用节流、断路器、降级来解决都无果,当时最新的消息解决方案kafka也无法满足阿里团队的低延迟和高可用的两个需求,因此Rocket MQ孕育而生。
Rocket MQ三种发送方法:可靠同步发送、可靠异步发送和单向发送
可靠同步发送:多用于重要的通知消息、短信通知、短信营销系统;
可靠异步发送:用于对响应时间敏感的业务场景;
单向发送:用于要求一定可靠性的场景,例如日志收集;
Rabbit MQ五种消息模型:
基本消息模型:生产者将消息发送到队列,消费着从队列中获取消息,队列是储存消息的缓冲区
消息确认机制:确认消息是否被接收
自动ACK:消息一旦被接收,消费者自动发送ACK(生产者出现异常时消费者还是能正常消费)
手动ACK:消息接收后,不会发送ACK,需要手动调用
work消息模型:创建一个工作队列,在多个工作者之间分配耗时任务。主要思想是避免执行资源密集型任务,适用于Web程序因为在短的HTTP请求窗口无法处理复杂的任务。
订阅模型分类:生产者将消息发送给交换机,交换机只做消息转发不做存储,再由交换机发送给队列,每个消费者都有自己单独的队列,实现一条消息被多个消费者消费
订阅模型-Fanout:生产者发送消息给交换机,交换机来决定发给哪个队列实现一条消息被多个消费者消费
订阅模型-Direct:消息发送方在发送给交换机时同时指定路由KEY,指定路由KEY绑定特定的消费者
订阅模型-Topic:Topic和Direct不同的,Topic绑定路由KEY可以使用通配符
Active MQ
ActiveMQ是apache提供的开源的,实现消息传递的一个中间插件,可以和spring整合,是目前最流行的开源消息总线,ActiveMQ是一个完全支持JMS1.1和J2EE 1.4规范的 JMS Provider实现。较相似的还有rabbitMQ和kafka等,都是最为消息传递的插件。
Active MQ传递消息的两种方式点对点方式、发布/订阅方式
点对点方式(PTP):一个消费者对应一个生产者
发布/订阅模式(Publish/Sub):一个生产者产生消息发送后,可以被多个消费者进行接收。
JMS定义了五种消息正文格式,以及消息的调用类型,允许发送和接收一些不同类型的数据,提供现有消息格式的一些级别的兼容性。
StreamMessage:--JAVA原始的数据流
TextMessage:一个字符串对象
ObjectMessage:一个系列化的java对象
BytesMessage:一个字节对象
MapMessage:key/value方式的键值对