本文分为二个部分来阐述 :
一、什么是消息队列
二、消息队列的常见应用场景
一、消息队列的概念
消息队列(Message Queue),简称MQ,从字面上其实就可以知道它的本质,是存储消息的队列,队列在大学数据结构的课程中我们已经接触过很多了(先进后出的存储结构),那什么是消息呢?消息可以是你的程序产生的任何数据,前提是你要把数据转换成消息队列可以接受的格式(序列化)。
下面是一段最简单的像消息队列发送消息的代码,在发送消息之前我们做了一系列的配置,其中就包括如何序列化你的消息(数据)。
public String sendMessege(){
Properties properties = new Properties();
properties.put("bootstrap.servers", "10.173.235.151:2181;10.173.235.152:2181;10.173.235.153:2181");
properties.put("acks", "all");//是否接受确认码
properties.put("retries", 2);//重试次数
properties.put("batch.size", 16384);//批量发送的单位
properties.put("linger.ms", 1);//最长等时间
properties.put("buffer.memory", 33554432);//缓冲区
properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
Producer<String, String> producer = null;
try {
producer = new KafkaProducer<String, String>(properties);
producer.send(new ProducerRecord<String, String>("GANTRYFEE_UPLOADS_TEST", "i am transaction"));
new ProducerRecord("1",2);
return "success";
} catch (Exception e) {
e.printStackTrace();
return "fail";
} finally {
producer.close();
}
}
二、消息队列的应用场景
1.解耦
假设现在有一个A模块,负责的是采集高速公路车辆的通行数据,比如车牌号、车辆类型、通行费等。
现在B模块要获取车辆的通行数据的车牌号,A模块的人说好,我这边写一个接口给你,你调用就好。
过了不久C模块获取车辆的通行费,A模块的人说好,我这边写一个接口给你,你调用就好。
又过了不就D模块要获取车辆的车辆类型,A模块的人好,我这边写一个接口给你,你调用就好。
。。。。
循环往复开发A模块的人迟早都会被这些琐事搞炸,因为每一次接口在上线之前都要经过双方的沟通联调,往往开发的时间是占比很小的,那为什么不把车辆的通行数据都放在一个地方,然后各自去取呢,于是消息队列就出现了。
A模块把所有的车辆通行数据发送到一个专门的地方,然后其他模块按需要从中取数据,这样A模块的人就不用因为每增加一个模块就要对系统进行改动了。
2.异步
上边改进的后的架构是我在工作项目中存在的,但是通行数据不是像上边说的由B、C、D模块需要的时候才去取,而是要求实时处理这些数据。
我们来看看这个架构下,实时处理的流程是怎么样的。
1
首先A模块接受到一条数据后,调用发送数据给B模块的接口2
B模块接收到数据后,开始处理自己的业务,处理完之后,返回响应给A模块3
A模块根据B模块的响应码判断处理是否成功,处理成功则结束,不成功则要重新发送数据,循环往复。
这里存在的一个很大的问题就是B模块要先处理完数据才能通知A,那么如果这个处理时间很长的话,A模块的一个线程就会一直处于阻塞的状态等待B的返回,但是明明A模块的任务已经完成了,为什么要白白损耗性能等待B的返回呢?于是就有了异步的设计。
这样流程就变为A模块把数据发送给消息队列,B模块订阅一个主题,当消息队列中存在数据时,B获取数据并处理,不成功时重新拉取数据,此时A时完全不用等待B的。这样A模块的的性能就上来了。
3.削峰/限流(流量控制)
当别人调用我们的接口时,我们在尽可能保证性能的同时最重要的是要保证我们的服务不被冲垮,但是我们又不可能不人道的把锅甩给其他模块的人,我们处理请求太慢就丢弃他们的请求,让他们重发,一个很好的思路就是当系统某一时刻处于瓶颈时,将请求都扔到消息队列中,系统挨个处理(是不是跟线程池的任务队列很像?其实思路都是一样的)