问题描述
在redis集成到spring后, 使用了redis的订阅与发布的功能,在每次发布的时候, 会出现疯狂日志报错。 由于该问题比较隐蔽,在这边记录并提供解决办法。
集成说明
- spring 版本: 4.1.4
- redis客户端: jedis 2.7.2
- 连接池druid
使用application.xml配置redis连接池等的信息。
对jedis的简单封装
在项目中, 我对jedis客户端做了简单的封装, 封装代码如下:
@Repository("redisClientTemplate")
public class RedisClientTemplate {
@Autowired
private ShardedJedisPool shardedJedisPool;
@Autowired
private JedisPool jedisPool;
/**
*
* @param ec
* @return
* @throws IllegalArgumentException when ec is null
* @throws RuntimeException
*/
public <T>T execute(Executor<T> ec, Class<? extends JedisCommands> jedisClass){
Assert.notNull(ec, "the redis executor is null");
JedisCommands jedisCommands = null;
try {
if (jedisClass == Jedis.class) {
jedisCommands = jedisPool.getResource();
} else {
jedisCommands = shardedJedisPool.getResource();
}
return ec.execute(jedisCommands);
}catch (Exception e){
throw new RuntimeException(e);
}finally {
if (jedisCommands!=null && jedisCommands instanceof Closeable){
try {
((Closeable)jedisCommands).close();
} catch (IOException e) {
// log
}
}
}
}
}
/**
* 具体的执行逻辑
*/
public abstract class Executor<T> {
T execute(JedisCommands jedisCommands){
if (jedisCommands instanceof Jedis){
return doExecute((Jedis)jedisCommands);
}else if (jedisCommands instanceof ShardedJedis){
return doExecute((ShardedJedis)jedisCommands);
}else {
return doExecute((ShardedJedis)jedisCommands);
}
}
protected T doExecute(ShardedJedis jedisCommands) {
return null;
}
protected T doExecute(Jedis jedisCommands) {
return null;
}
}
在这里说明一下, 由于jedis存在jedis和shardedJedis, 2种api不一样, 并且应用场景也不一样, 比如对于key的模糊搜索ShardedJedis此操作。
使用example:
redisClientTemplate.execute(ExecutorUtils.addSet(key, value), ShardedJedis.class);
在ExecutorUtils
public static Executor<Boolean> addSet(final String key, final String... value){
if (key == null || key.equals("")) {
return null; // todo illegalArgument
}
return new Executor<Boolean>() {
@Override
public Boolean doExecute(ShardedJedis shardedJedis) {
Long i = shardedJedis.sadd(key, value); // todo test
return true;
}
};
}
jedis的api 使用起来很简单, 在项目中, 使用了redis的订阅发布功能, 如何使用呢, 请看下面代码
@Component
@Slf4j
public class SubscriberDemo extends JedisPubSub {
Gson gson = new Gson();
@Autowired
RedisClientTemplate redisClientTemplate;
@Override
public void onMessage(String channel, String message) {
doSomethingWithDatabase();
}
@PostConstruct
public void init (){
new Thread(new Runnable() {
@Override
public void run() {
redisClientTemplate.execute(new Executor<Object>() {
@Override
protected Object doExecute(Jedis jedisCommands) {
jedisCommands.subscribe(SubscriberDemo.this,"channel");
return null;
}
}, Jedis.class);
}
},"threadName").start();
}
@PreDestroy
public void destroy(){
this.unsubscribe("channel");
}
}
问题详情
在使用jedis时,我并没有加入destroy方法, 导致每次生产发布后, 在日志中都会出现大量druid获取的连接已关闭的错误, 然而我发现服务器上的业务时正常运行的。
是什么原因导致这个问题的产生呢?
经过一定的校验我发现, 在发布生产时,数据源会被关闭, 然而redis客户端还在订阅服务端的chennel,此时服务器又启动,在这个类初始化之前, 继而调用init函数,此时又会有一个redis客户端订阅该channe
简单画一个图解:
解决办法
解决办法正如我上面写的, 写一个destry方法, 在该对象销毁之前, 取消订阅channel即可。 这样始终保持每次发布订阅一个channel, 结束销毁一个channel。
其他注意点:
还会发现, 我们这边在订阅的时候写了个线程,原因是因为redis订阅是同步的, 其实我们通过jedis源码很容易看到, 在内部是通过一个do while循环来监听消息。所以我们将该订阅交给一个线程执行, 使得应用能够正常被启动。
如何提高消息的处理能力呢?
很简单,我们可以在onMessage中加入线程池来处理消息。
spring-data-redis
spring-data-redis为我们在jedis做了更好的封装, 使得我们对redis客户端能够很简单的应用, 下面简单介绍以下spring-data-redis的发布与订阅功能。
其中我们只需要进行相关的bean的配置, 无需关心channel的订阅
@Configuration
public class RedisSubListenerConfig {
final ExecutorService executorService = Executors.newFixedThreadPool(20);
@Bean
RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory,
MessageListenerAdapter service) {
RedisMessageListenerContainer container = new RedisMessageListenerContainer();
container.setConnectionFactory(connectionFactory);
container.addMessageListener(dataWarning, new PatternTopic(TOPIC));
return container;
}
@Bean
MessageListenerAdapter dataAdapter(Service service) {
MessageListenerAdapter adapter = new MessageListenerAdapter(service, "doSomething");
adapter.setSerializer(new FastJsonRedisSerializer<>(Message.class));
return adapter;
}
}