我的应用场景:因为业务需求,我们会每10分钟从kafka得到数据开始处理,这时就会存在一种情况,如果kafka数据没有传递过来,我们是不是要通过一种方式知道数据没传递通知我们。在这里我选用的模式是redis提供的观察者模式。
原理:从kafka得到的数据保存到redis中,key的失效时间可以自定义配置(我定义15分钟失效),每次从kafka得到数据都去刷新redis,这样如果kafka每次都传递数据,redis就不会失效,如果不传递数据,redis就会失效,然后通过redis的监听器得到这个失效的redis再进行后续处理(我们这边是进行邮件报警)
1:配置redis的失效监听,需要修改redis.conf配置文件
增加:notify-keyspace-events "Ex"
配置文件中找到notify-keyspace-events,修改成notify-keyspace-events "Ex"
Ex 的解释如下:
2:配置文件修改好后,重新启动redis,我的redis是用docker启动的。重新启动了容器。
3:验证redis失效监听是否好用。
进入redis容器:
docker exec -it redis /bin/sh
运行redis客户端:
redis-cli
运行监听命令:
psubscribe __keyevent@0__:expired
再启动一个redis-cli
创建一个10秒后失效的reids:
setex test 10 test
10秒后,可以看到监听端口可以接收到失效的redis的key.
4:java代码编写,pom.xml引入
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
5:配置redis的config,配置了两种方式,第一种方式是支持@Cacheable创建redis.第二种方式是直接引用redis提供的StringRedisTemplate类来调用redis的set和get方法。
第一种方式的配置文件写法:RedisCacheConfig.java
@Configuration
public class RedisCacheConfig{
@Bean
public RedisCacheManager cacheManager(RedisConnectionFactory connectionFactory) {
return new RedisCacheManager(
RedisCacheWriter.nonLockingRedisCacheWriter(connectionFactory),
this.getRedisCacheConfigurationBase(), // 默认策略
this.getRedisCacheConfigurationMap() // 指定 key 策略
);
}
private RedisSerializer<String> keySerializer() {
return new StringRedisSerializer();
}
private RedisSerializer<Object> valueSerializer() {
return new GenericJackson2JsonRedisSerializer();
}
private Map<String, RedisCacheConfiguration> getRedisCacheConfigurationMap() {
Map<String, RedisCacheConfiguration> redisCacheConfigurationMap = new HashMap<>();
redisCacheConfigurationMap.put("NoDataCache0", this.getRedisCacheConfigurationWithTtl(60));
return redisCacheConfigurationMap;
}
private RedisCacheConfiguration getRedisCacheConfigurationBase() {
RedisCacheConfiguration config = RedisCacheConfiguration.defaultCacheConfig()
//.entryTtl(this.timeToLive) --定义到期时间
.serializeKeysWith(RedisSerializationContext.SerializationPair.fromSerializer(keySerializer()))
.serializeValuesWith(RedisSerializationContext.SerializationPair.fromSerializer(valueSerializer()));
//.disableCachingNullValues(); //保存的数据不能为null
return config;
}
private RedisCacheConfiguration getRedisCacheConfigurationWithTtl(Integer seconds) {
RedisCacheConfiguration config = RedisCacheConfiguration.defaultCacheConfig()
.entryTtl(Duration.ofSeconds(seconds)) //定义到期时间
.serializeKeysWith(RedisSerializationContext.SerializationPair.fromSerializer(keySerializer()))
.serializeValuesWith(RedisSerializationContext.SerializationPair.fromSerializer(valueSerializer()));
//.disableCachingNullValues(); //保存的数据不能为null
return config;
}
}
用RedisCacheManager的构造方法来实现,我定义了一个60秒失效的策略,redis的写法如下:
@Cacheable(value = "NoDataCache0" key="'rc_alarmrule_2_'+#tenantId+'_'+#metricSpecId")
public List<AlarmRuleRedisAllVO> getAlarmRuleAllRedis(long tenantId,long metricSpecId) {
return null;
}
1)value = "NoDataCache0" 是我策略里定义的名称redisCacheConfigurationMap.put("NoDataCache0", this.getRedisCacheConfigurationWithTtl(60));
这个可以定义多个。每个的名称和失效时间配置不一样。
2)这种方式的配置,失效时间只能写在配置文件中或者写死,如果我想按照我表中定义一个失效时间实时的进行变化就做不到了。所以我又用了第二种配置方案。
第二种方式的配置文件写法:RedisCacheConfig.java
@Service
public class RedisService {
@Autowired
private StringRedisTemplate redisTemplate;
/**
* 永久
* @param key
* @param value
*/
public void set(String key, Object value) {
redisTemplate.opsForValue().set(key, JsonUtil.convertObj2String(value));
}
/**
* 将 key,value 存放到redis数据库中,设置过期时间单位是秒
* @param key
* @param value
* @param timeout
*/
public void setBySeconds(String key, Object value, long timeout) {
redisTemplate.opsForValue().set(key, JsonUtil.convertObj2String(value), timeout, TimeUnit.SECONDS);
}
/**
* 将 key,value 存放到redis数据库中,设置过期时间单位是分钟
* @param key
* @param value
*/
public void setByMinutes(String key, Object value, long timeout) {
redisTemplate.opsForValue().set(key, JsonUtil.convertObj2String(value), timeout, TimeUnit.MINUTES);
}
/**
* 将 key,value 存放到redis数据库中,设置过期时间单位是小时
* @param key
* @param value
*/
public void setByHours(String key, Object value, long timeout) {
redisTemplate.opsForValue().set(key, JsonUtil.convertObj2String(value), timeout, TimeUnit.HOURS);
}
/**
* 将 key,value 存放到redis数据库中,设置过期时间单位是天
* @param key
* @param value
*/
public void setByDays(String key, Object value, long timeout) {
redisTemplate.opsForValue().set(key, JsonUtil.convertObj2String(value), timeout, TimeUnit.DAYS);
}
/**
* 删除 key 对应的 value
* @param key
*/
public void delete(String key) {
redisTemplate.delete(key);
}
/**
* 获取与 key 对应的对象
* @param key
* @param clazz 目标对象类型
* @param <T>
* @return
*/
public <T> T get(String key, Class<T> clazz) {
String s = get(key);
if (s == null) {
return null;
}
return JsonUtil.convertString2Obj(s, clazz);
}
/**
* 获取与 key 对应的对象
* @param key
* @param clazz 目标对象类型
* @param <T>
* @return
* @throws IOException
* @throws JsonMappingException
* @throws JsonParseException
*/
public <T> List<T> getList(String key, Class<T> clazz) throws JsonParseException, JsonMappingException, IOException {
String s = get(key);
if (s == null) {
return null;
}
return JsonUtil.toList(s,clazz);
}
/**
* 获取 key 对应的字符串
* @param key
* @return
*/
public String get(String key) {
return redisTemplate.opsForValue().get(key);
}
/**
* 查询 key 对应的过期时间
* @param key
* @return
*/
public String getExpire(String key){
Long timeout = redisTemplate.getExpire(key,TimeUnit.MILLISECONDS);
System.out.println(timeout);
if (timeout < 0)
return "Has expired!";
Long milliSecond = LocalDateTime.now().toInstant(ZoneOffset.of("+8")).toEpochMilli() + timeout;
return DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss").format(LocalDateTime.ofInstant(
Instant.ofEpochMilli(milliSecond),ZoneId.systemDefault()));
}
/**
* 判断 key 是否在 redis 数据库中
* @param key
* @return
*/
public boolean exists(final String key) {
return redisTemplate.hasKey(key);
}
这种方式可以实时的改变redis的失效时间。
6:配置redis失效的监听config RedisListenerConfig.java
@Configuration
public class RedisListenerConfig {
@Bean
RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory) {
RedisMessageListenerContainer container = new RedisMessageListenerContainer();
container.setConnectionFactory(connectionFactory);
// container.addMessageListener(new RedisExpiredListener(), new PatternTopic("__keyevent@0__:expired"));
return container;
}
}
我没有配置__keyevent@0__:expired",对某个db进行监听,RedisMessageListenerContainer有个默认的配置是对所有的db进行监听。
7:redis的监听类 RedisKeyExpirationListener.java
@Component
public class RedisKeyExpirationListener extends KeyExpirationEventMessageListener {
public RedisKeyExpirationListener(RedisMessageListenerContainer listenerContainer) {
super(listenerContainer);
}
/**
* 针对redis数据失效事件,进行数据处理
* @param message
* @param pattern
*/
@Override
public void onMessage(Message message, byte[] pattern) {
//message.toString()可以获取失效的key
String expiredKey = message.toString();
logger.debug("失效的redis是:"+expiredKey);
String part = "NoDataCache0";
//如果是NoDataCache0:开头的key,进行处理
if(expiredKey.startsWith(part)){
//自己的业务逻辑
}
}
1)注意:失效的redis只能得到key,是得不到value的,所以业务逻辑如果用到value里的值需要把值写到key中。