前言
实际生产中,我们经常会碰到这样的场景: 业务方触发了某些预料之中的bug,(比如项目中调用了第三方的服务,但是第三方的服务出问题导致无法访问,这类错,我们往往不会直接提示用户,而是选择屏蔽此类错误,写入错误日志),我们处理此类bug往往需要去生产导出日志记录,然后排查,最后找到第三方服务的提供者去解决问题.
那么,与其等“被动”业务反馈,能不能让这类问题“主动”推送给开发呢? 我们能不做个“错误预警”的服务.
消息推送技术,即是解决这类问题的良方.
消息队列
消息队列,一般我们会简称它为MQ(Message Queue),再介绍消息队列前,我们还是先简单解释一下队列这种数据结构
队列
队列是一种先进先出的数据结构
如图,数据从队尾(右)进,从队头(左)出.
消息队列
消息队列可以简单的理解为:把要传输的数据放在队列中。
当我们需要使用消息的时候可以取出数据供自己使用。
消息队列的两种场景
从以上概念中我们不难看出有两个角色对队列至关重要,一个是放数据的,一个是取数据的.
当然,这两个角色都有是有规范的名字的,同时,消息队列有两种场景,在这两种不同的场景里,这两个角色名字是不同的:
- 生产者消费者模式
- 生产者: 放数据进队列
- 消费者: 从队列取数据
- 发布者订阅者模式
- 发布者: 放数据进队列
- 订阅者: 从队列取数据
场景区别
- 生产者消费者模式(一对一)
生产者将生产消息放到队列里,多个消费者同时监听队列,谁先抢到消息谁就
会从队列中取走消息;即对于每个消息只能被最多一个消费者拥有。
包括三个角色:
- 消息队列
- 发送者(生产者)
- 接收者(消费者)
生产消费者模式特点:
- 每个消息只有一个接收者(Consumer)(即一旦被消费,消息就不再在消息队列中)
- 发送者和接收者间没有依赖性,发送者发送消息之后,不管有没有接收者在运行,都不会影响到发送者下次发送消息
- 接收者在成功接收消息之后需向队列应答成功,以便消息队列删除当前接收的消息
- 发布者订阅者模式(一对多)
发布者将生产消息放到队列里,多个监听队列的消费者都会收到同一份消息;
即正常情况下每个消费者收到的消息应该都是一样的。
包括三个角色:
- 角色主题(Topic)
- 发布者(Publisher)
- 订阅者(Subscriber)
发布订阅模式特点:
- 每个消息可以有多个订阅者
- 发布者和订阅者之间有时间上的依赖性。针对某个主题(Topic)的订阅者,它必须创建一个订阅者之后,才能消费发布者的消息
- 为了消费消息,订阅者需要提前订阅该角色主题,并保持在线运行
消息队列解决的问题
消息队列为了实现实现高性能,高可用,可伸缩和最终一致性架构,主要可以解决如下问题:
- 异步处理
多应用对消息队列中同一消息进行处理,应用间并发处理消息,相比串行处理,减少处理时间
场景举例:
用户注册后,需要发注册邮件和注册短信.
传统的做法有两种
- 串行方式
将注册信息写入数据库成功后,发送注册邮件,再发送注册短信。
以上三个任务全部完成后,返回给客户端
- 并行方式
将注册信息写入数据库成功后,发送注册邮件的同时,发送注册短信。以上三个任务完成后,返回给客户端。与串行的差别是,并行的方式可以提高处理的时间
- 消息队列异步处理方式
将注册信息写入数据库,再将任务写入消息队列后,立即返回成功给客户端,
则总的响应时间依赖于写入消息队列的时间,
而写入消息队列的时间本身是可以很快的,基本可以忽略不计,
因此总的处理时间相比串行提高了2倍,相比并行提高了一倍
- 应用耦合
多应用间通过消息队列对同一消息进行处理,避免调用接口失败导致整个过程失败
场景举例:
银行身份证人脸识别系统,用户上传身份证图片,人脸识别系统会对该图片进行人脸识别.
一般的做法是:
服务器接收到图片后,图片上传系统立即调用人脸识别系统,调用完成后再返回成功
该方法有如下缺点:
- 人脸识别系统被调失败,导致图片上传失败
- 延迟高,需要人脸识别系统处理完成后,再返回给客户端,即使用户并不需要立即知道结果
- 图片上传系统与人脸识别系统之间互相调用,需要做耦合
为了解决以上缺点,我们采用消息队列解决应用间的耦合问题:
消息队列的做法:
用户上传图片后,图片上传系统将图片信息顺序写入消息队列,直接返回成功;
人脸识别系统则定时从消息队列中取数据,完成对图片的识别。
图片上传系统并不需要关心人脸识别系统是否对这些图片信息的处理、以及何时对这些图片信息进行处理。事实上,由于用户并不需要立即知道人脸识别结果,人脸识别系统可以选择不同的调度策略,按照闲时、忙时、正常时间,对队列中的图片信息进行处理。
- 限流削峰
广泛应用于秒杀或抢购活动中,避免流量过大导致应用系统挂掉的情况
场景举例:
电商秒杀活动,常见的形式是数量极少的热门商品让大量的用户抢购
传统的做法是用户直接请求业务系统,但往往因为并发用户过大,或导致业务系统崩溃,或着出现超卖等等现象.
采用消息队列后,系统可以从消息队列中取数据,相当于消息队列做了一次缓冲
采用消息队列处理秒杀有如下优点:
- 请求先入消息队列,而不是由业务处理系统直接处理,做了一次缓冲,极大地减少了业务处理系统的压力;
- 队列长度可以做限制,事实上,秒杀时,后入队列的用户无法秒杀到商品,这些请求可以直接被抛弃,返回活动已结束或商品已售完信息;
- 消息驱动的系统
场景举例:
用户新上传了一批照片,人脸识别系统需要对这个用户的所有照片进行聚类,聚类完成后由对账系统重新生成用户的人脸索引(加快查询)。这三个子系统间由消息队列连接起来,前一个阶段的处理结果放入队列中,后一个阶段从队列中获取消息继续处理.
使用消息队列有如下优点:
- 避免了直接调用下一个系统导致当前系统失败;
- 每个子系统对于消息的处理方式可以更为灵活,可以选择收到消息时就处理,可以选择定时处理,也可以划分时间段按不同处理速度处理;
Redis实现发布订阅模式
消息队列是分布式系统中重要的组件,使用消息队列主要是为了通过异步处理提高系统性能和削峰、降低系统耦合性。目前使用较多的消息队列有ActiveMQ,RabbitMQ,Kafka,RocketMQ,这些消息中间件我们暂时不讲,本章,我们使用最为简单的方式REDIS来实现消息队列的发布订阅模式.
Redis
Redis从2.X版本开始,就支持一种基于非持久化消息的、使用发布/订阅模式实现的事件通知机制.
所谓基于非连接保持,是因为一旦消息订阅者由于各种异常情况而被迫断开连接,在其重新连接后,
其离线期间的事件是无法被重新通知的(一些Redis资料中也称为即发即弃).
而其使用的发布/订阅模式,意味着其机制并不是由订阅者周期性的从Redis服务拉取事件通知,
而是由Redis服务主动推送事件通知到符合条件的若干订阅者.
通俗的来讲,Redis实现的发布订阅模式有如下注意点:
- 基于Redis服务主动推送消息,而非订阅者循环拉取.
- 消息即发即丢(就是消息一发布,就丢失了,不会保存)
Springboot+Redis实现
- 引入redis依赖
pom.xml
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
- application.yml
spring:
redis:
port: 6379
database: 0
host: 127.0.0.1
password: 123456
jedis:
pool:
max-active: 8
max-wait: -1ms
max-idle: 8
min-idle: 0
timeout: 5000ms
server:
port: 9999
- redis配置类
package com.mrcoder.sbredispubsub.config.redis;
import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.mrcoder.sbredispubsub.model.MessageSubscriber;
import com.mrcoder.sbredispubsub.utils.RedisPubSubUtil;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.listener.PatternTopic;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
import org.springframework.data.redis.listener.adapter.MessageListenerAdapter;
import org.springframework.data.redis.serializer.Jackson2JsonRedisSerializer;
import org.springframework.data.redis.serializer.RedisSerializer;
import org.springframework.data.redis.serializer.StringRedisSerializer;
/**
* @Description: Redis配置类
*/
@Configuration
@ConditionalOnClass({RedisTemplate.class})
public class RedisConfig {
/**
* Redis操作模板配置
*
* @param connectionFactory
* @return
*/
@Bean
public RedisTemplate<?, ?> redisTemplate(RedisConnectionFactory connectionFactory) {
RedisTemplate<byte[], byte[]> template = new RedisTemplate<byte[], byte[]>();
template.setConnectionFactory(connectionFactory);
template.setKeySerializer(new StringRedisSerializer());
template.afterPropertiesSet();
return template;
}
@Bean
public RedisTemplate<?, ?> redisTemplate(RedisConnectionFactory connectionFactory, Jackson2JsonRedisSerializer<Object> jackson2JsonRedisSerializer) {
RedisTemplate<byte[], byte[]> template = new RedisTemplate<byte[], byte[]>();
template.setConnectionFactory(connectionFactory);
// 设置key/hashkey序列化
RedisSerializer<String> stringSerializer = new StringRedisSerializer();
template.setKeySerializer(stringSerializer);
template.setHashKeySerializer(stringSerializer);
// 设置值序列化
template.setValueSerializer(jackson2JsonRedisSerializer);
template.setHashValueSerializer(jackson2JsonRedisSerializer);
template.afterPropertiesSet();
template.setKeySerializer(new StringRedisSerializer());
template.afterPropertiesSet();
return template;
}
@Bean
public RedisPubSubUtil redisPubSubUtil(@Qualifier("redisTemplate") RedisTemplate<String, Object> redis) {
return new RedisPubSubUtil(redis);
}
/**
* 序列化定制
*
* @return
*/
@Bean
public Jackson2JsonRedisSerializer<Object> jackson2JsonSerializer() {
Jackson2JsonRedisSerializer<Object> jackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer<>(
Object.class);
// 初始化objectmapper
ObjectMapper mapper = new ObjectMapper();
mapper.setSerializationInclusion(JsonInclude.Include.NON_NULL);
mapper.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL);
jackson2JsonRedisSerializer.setObjectMapper(mapper);
return jackson2JsonRedisSerializer;
}
/**
* 将订阅器绑定到容器
*
* @param connectionFactory
* @param listener
* @return
*/
@Bean
public RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory, MessageListenerAdapter listener) {
RedisMessageListenerContainer container = new RedisMessageListenerContainer();
container.setConnectionFactory(connectionFactory);
container.addMessageListener(listener, new PatternTopic("redis.pubsub.*"));
return container;
}
/**
* 消息监听器,使用MessageAdapter可实现自动化解码及方法代理
*
* @param jackson2JsonRedisSerializer
* @param subscriber
* @return
*/
@Bean
public MessageListenerAdapter listener(Jackson2JsonRedisSerializer<Object> jackson2JsonRedisSerializer, MessageSubscriber subscriber) {
MessageListenerAdapter adapter = new MessageListenerAdapter(subscriber, "onMessage");
adapter.setSerializer(jackson2JsonRedisSerializer);
adapter.afterPropertiesSet();
return adapter;
}
}
- 定义消息实体类
package com.mrcoder.sbredispubsub.model;
import lombok.Data;
import java.util.Date;
@Data
public class SimpleMessage {
private String publisher;
private String content;
private Date createTime;
}
- 消息发布类
package com.mrcoder.sbredispubsub.utils;
import com.mrcoder.sbredispubsub.model.SimpleMessage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.listener.ChannelTopic;
import java.util.Date;
/**
* @Description: Redis发布订阅
*/
public class RedisPubSubUtil {
private static final Logger logger = LoggerFactory.getLogger(RedisPubSubUtil.class);
private RedisTemplate<String, Object> redisTemplate;
public RedisPubSubUtil(RedisTemplate<String, Object> redisTemplate) {
super();
this.redisTemplate = redisTemplate;
}
public void publish(String publisher, String content) {
logger.info("message send {} by {}", content, publisher);
SimpleMessage simpleMessage = new SimpleMessage();
simpleMessage.setContent(content);
simpleMessage.setPublisher(publisher);
simpleMessage.setCreateTime(new Date());
ChannelTopic channelTopic = new ChannelTopic("redis.pubsub.msg");
redisTemplate.convertAndSend(channelTopic.getTopic(), simpleMessage);
}
}
- 定义订阅者实体类
package com.mrcoder.sbredispubsub.model;
import com.mrcoder.sbredispubsub.utils.FastJsonUtil;
import org.springframework.stereotype.Component;
/**
* @Description: 消息订阅类
*/
@Component
public class MessageSubscriber {
public void onMessage(SimpleMessage simpleMessage, String pattern) {
logger.info("topic {} received {}", pattern, FastJsonUtil.javaToJsonSnakeCase(simpleMessage));
}
}
- 控制器
package com.mrcoder.sbredispubsub.controller;
import com.mrcoder.sbredispubsub.utils.RedisPubSubUtil;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.CrossOrigin;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
import java.io.*;
@CrossOrigin
@RestController
public class RedisPubSubController {
@Autowired
private RedisPubSubUtil redisPubSubUtil;
@GetMapping("redisPubSub")
public void redisPubSub(){
redisPubSubUtil.publish("echo", "testMessage"));
}
}
改造
以上已经实现了基于redis简单的发布订阅了.
那么,在此之上我们多做一点来更好的理解发布订阅这块的内容.
- 我们实现推送内容到企业微信
- 我们实现读取文件的内容来推送(可用于版本新功能的发布推送)
项目地址:
https://github.com/MrCoderStack/SpringBootDemo/tree/master/sb-redis-pubsub