Redis Cluster模式集群监听

Redis实现事件监听

redis.conf配置文件中修改配置

notify-keyspace-events "Ex"

配置详解

字符 发送通知
K 键空间通知,所有通知以 keyspace@ 为前缀,针对Key
E 键事件通知,所有通知以 keyevent@ 为前缀,针对event
g DEL 、 EXPIRE 、 RENAME 等类型无关的通用命令的通知
字符串命令的通知 l 列表命令的通知 s 集合命令的通知 h 哈希命令的通知 z 有序集合命令的通知 x 过期事件:每当有过期键被删除时发送 e 驱逐(evict)事件:每当有键因为 maxmemory 政策而被删除时发送 A 参数 glshzxe 的别名,相当于是All

单机版Redis监听

代码示例

配置类

@Configuration
public class RedisConfiguration {

    @Autowired
    private RedisConnectionFactory redisConnectionFactory;

    @Bean
    public RedisMessageListenerContainer redisMessageListenerContainer() {
        RedisMessageListenerContainer redisMessageListenerContainer = new RedisMessageListenerContainer();
        redisMessageListenerContainer.setConnectionFactory(redisConnectionFactory);
        return redisMessageListenerContainer;
    }

    @Bean
    public KeyExpiredListener keyExpiredListener() {
        return new KeyExpiredListener(this.redisMessageListenerContainer());
    }

}

监听类

//继承MessageListener本地跑的通,在服务器上跑不通,原因不明
public class KeyExpiredListener extends KeyExpirationEventMessageListener {
    @Autowired
    protected RedisTemplate redisTemplate;
    @Value("${spring.redis.appStatusTTL}")
    private Long appStatusTTL;

    private static final Logger LOGGER = LoggerFactory.getLogger(KeyExpiredListener.class);

    public KeyExpiredListener(RedisMessageListenerContainer listenerContainer) {
        super(listenerContainer);
    }

    @Override
    public void onMessage(Message message, byte[] pattern) {
        String channel = new String(message.getChannel(), StandardCharsets.UTF_8);
        //过期的key
        String key = new String(message.getBody(), StandardCharsets.UTF_8);
        LOGGER.info("redis key 过期:pattern={},channel={},key={}", new String(pattern), channel, key);
        //业务处理
    }
}

redis-cli客户端示例

$ redis-cli config set notify-keyspace-events KEA
$ redis-cli --csv psubscribe '__key*__:*'
Reading messages... (press Ctrl-C to quit)
"psubscribe","__key*__:*",1

Redis模糊查询key

  • :通配任意多个字符

?:通配单个字符

[]:通配括号内的某一个字符

主从复制版集群Redis监听

网上目前未发现能够直接监听cluster集群的方法,通用方法为为每一个节点配置单独的监听器;监听全部主节点即可

代码实现

配置类

@Configuration
public class RedisConfiguration {
    @Autowired
    private RedisConnectionFactory redisConnectionFactory;
    @Autowired
    private RedisListenerErrorHandle redisListenerErrorHandle;

    @Value("${spring.redis.host1}")
    private String host1;

    @Value("${spring.redis.host2}")
    private String host2;

    @Value("${spring.redis.host3}")
    private String host3;

    @Value("${spring.redis.port1}")
    private int port1;

    @Value("${spring.redis.port2}")
    private int port2;

    @Value("${spring.redis.port3}")
    private int port3;

    @Value("${spring.redis.password}")
    private String password;

    @Bean
    JedisPoolConfig jedisPoolConfig(){
        JedisPoolConfig jedisPoolConfig = new JedisPoolConfig();
        jedisPoolConfig.setMaxIdle(100);
        jedisPoolConfig.setMaxWaitMillis(1000);
        return jedisPoolConfig;
    }

    // redis-cluster不支持key过期监听,建立多个连接,对每个redis节点进行监听
    @Bean
    RedisMessageListenerContainer redisMessageListenerContainer1() {
        final RedisMessageListenerContainer container = new RedisMessageListenerContainer();
        JedisConnectionFactory jedisConnectionFactory = new JedisConnectionFactory();
        jedisConnectionFactory.setHostName(host1);
        jedisConnectionFactory.setPort(port1);
        jedisConnectionFactory.setPoolConfig(jedisPoolConfig());
        jedisConnectionFactory.setPassword(password);
        jedisConnectionFactory.afterPropertiesSet();
        container.setConnectionFactory(jedisConnectionFactory);

        container.addMessageListener(new KeyExpiredListener(container),new PatternTopic("__keyevent@0__:expired"));
        container.setErrorHandler(redisListenerErrorHandle);
        return container;
    }

    @Bean
    RedisMessageListenerContainer redisMessageListenerContainer2() {
        final RedisMessageListenerContainer container = new RedisMessageListenerContainer();
        JedisConnectionFactory jedisConnectionFactory = new JedisConnectionFactory();
        jedisConnectionFactory.setHostName(host2);
        jedisConnectionFactory.setPort(port2);
        jedisConnectionFactory.setPoolConfig(jedisPoolConfig());
        jedisConnectionFactory.setPassword(password);
        jedisConnectionFactory.afterPropertiesSet();
        container.setConnectionFactory(jedisConnectionFactory);

        container.addMessageListener(new KeyExpiredListener(container),new PatternTopic("__keyevent@0__:expired"));
        container.setErrorHandler(redisListenerErrorHandle);

        return container;
    }

    @Bean
    RedisMessageListenerContainer redisMessageListenerContainer3() {
        final RedisMessageListenerContainer container = new RedisMessageListenerContainer();
        JedisConnectionFactory jedisConnectionFactory = new JedisConnectionFactory();
        jedisConnectionFactory.setHostName(host3);
        jedisConnectionFactory.setPort(port3);
        jedisConnectionFactory.setPoolConfig(jedisPoolConfig());
        jedisConnectionFactory.setPassword(password);
        jedisConnectionFactory.afterPropertiesSet();
        container.setConnectionFactory(jedisConnectionFactory);

        container.addMessageListener(new KeyExpiredListener(container),new PatternTopic("__keyevent@0__:expired"));
        container.setErrorHandler(redisListenerErrorHandle);

        return container;
    }

    @Bean
    KeyExpiredListener redisKeyExpirationListener1() {
        return new KeyExpiredListener(redisMessageListenerContainer1());
    }

    @Bean
    KeyExpiredListener redisKeyExpirationListener2() {
        return new KeyExpiredListener(redisMessageListenerContainer2());
    }

    @Bean
    KeyExpiredListener redisKeyExpirationListener3() {
        return new KeyExpiredListener(redisMessageListenerContainer3());
    }
}

监听器

//此代码运行时报错redisTemplate为null,但是代码完全正常执行业务。未找到原因
public class KeyExpiredListener extends KeyExpirationEventMessageListener {
    @Autowired
    protected RedisTemplate redisTemplate;

    private static final Logger logger = LoggerFactory.getLogger(KeyExpiredListener.class);

    public KeyExpiredListener(RedisMessageListenerContainer listenerContainer) {
        super(listenerContainer);
    }

    @Override
    public void onMessage(Message message, byte[] pattern) {
        String channel = new String(message.getChannel(), StandardCharsets.UTF_8);
        //过期的key
        String key = new String(message.getBody(), StandardCharsets.UTF_8);
        logger.warn("redis key 过期:pattern={},channel={},key={}", new String(pattern), channel, key);
       //TODO
    }
}

错误处理类

@Component
public class RedisListenerErrorHandle implements ErrorHandler {
    private static final Logger logger = LoggerFactory.getLogger(RedisListenerErrorHandle.class);

    @Override
    public void handleError(Throwable throwable) {
        logger.error("正常监听");
    }
}

自定义一个ErrorHandle并注入到RedisMessageListenerContainer中,redia源码中默认没有此处理类,不注入会导致每次监听时打印error级别日志:Execution of message listener failed, and no ErrorHandler has been set.

源码解析

RedisMessageListenerContainer类中有这样一段代码

    /**
     * Invoke the registered ErrorHandler, if any. Log at error level otherwise.
     *
     * @param ex the uncaught error that arose during message processing.
     * @see #setErrorHandler
     */
    protected void invokeErrorHandler(Throwable ex) {
        if (this.errorHandler != null) {
            this.errorHandler.handleError(ex);
        } else if (logger.isWarnEnabled()) {
            logger.warn("Execution of message listener failed, and no ErrorHandler has been set.", ex);
        }
    }
    /**
     * Set an ErrorHandler to be invoked in case of any uncaught exceptions thrown while processing a Message. By default
     * there will be <b>no</b> ErrorHandler so that error-level logging is the only result.
     */
        //自己定义的RedisMessageListenerContainer需要手动设置一个ErrorHandle
    public void setErrorHandler(ErrorHandler errorHandler) {
        this.errorHandler = errorHandler;
    }

Cluster集群监听

RedisAutoConfiguration类

import org.springframework.beans.factory.BeanFactory;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.autoconfigure.AutoConfigureAfter;
import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression;
import org.springframework.boot.autoconfigure.jackson.JacksonAutoConfiguration;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.MessageListener;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.connection.jedis.JedisConnection;
import org.springframework.data.redis.core.RedisOperations;
import org.springframework.data.redis.listener.PatternTopic;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
import redis.clients.jedis.Jedis;
/**
 * @author wangrh
 * @title:
 * @description: 描述
 * @date: 2020/12/30
 */
@Configuration
@ConditionalOnClass({ JedisConnection.class, RedisOperations.class, Jedis.class, MessageListener.class })
@AutoConfigureAfter({ JacksonAutoConfiguration.class,RedisAutoConfiguration.class })
public class RedisAutoConfiguration {
    @Configuration
    @ConditionalOnExpression("'${spring.redis.host:}'.isEmpty()")
    public static class RedisStandAloneAutoConfiguration {
        @Bean
        public RedisMessageListenerContainer customizeRedisListenerContainer(
                RedisConnectionFactory redisConnectionFactory, @Qualifier("keyExpiredEventMessageListener") MessageListener messageListener) {
            RedisMessageListenerContainer redisMessageListenerContainer = new RedisMessageListenerContainer();
            redisMessageListenerContainer.setConnectionFactory(redisConnectionFactory);
            redisMessageListenerContainer.addMessageListener(messageListener,new PatternTopic("__keyevent@0__:expired"));
            return redisMessageListenerContainer;
        }
    }

    @Configuration
    @ConditionalOnExpression("'${spring.redis.host:}'.isEmpty()")
    public static class RedisClusterAutoConfiguration {
        @Bean
        public RedisMessageListenerFactory redisMessageListenerFactory(BeanFactory beanFactory,
                                                                       RedisConnectionFactory redisConnectionFactory) {
            RedisMessageListenerFactory beans = new RedisMessageListenerFactory();
            beans.setBeanFactory(beanFactory);
            beans.setRedisConnectionFactory(redisConnectionFactory);
            return beans;
        }
    }
}

RedisMessageListenerFactory类

import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.BeanFactory;
import org.springframework.beans.factory.BeanFactoryAware;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.beans.factory.config.BeanDefinition;
import org.springframework.beans.factory.support.BeanDefinitionBuilder;
import org.springframework.beans.factory.support.DefaultListableBeanFactory;
import org.springframework.context.ApplicationListener;
import org.springframework.context.event.ContextRefreshedEvent;
import org.springframework.data.redis.connection.MessageListener;
import org.springframework.data.redis.connection.RedisClusterConnection;
import org.springframework.data.redis.connection.RedisClusterNode;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.connection.jedis.JedisConnectionFactory;
import org.springframework.data.redis.listener.PatternTopic;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
import redis.clients.jedis.JedisShardInfo;
/**
 * @author wangrh
 * @title:
 * @description: 描述
 * @date: 2020/12/30
 */
@Slf4j
public class RedisMessageListenerFactory implements BeanFactoryAware, ApplicationListener<ContextRefreshedEvent> {
    @Value("${spring.redis.password}")
    private String password;

    private DefaultListableBeanFactory beanFactory;

    private RedisConnectionFactory redisConnectionFactory;

    @Qualifier("keyExpiredEventMessageListener")
    @Autowired
    private MessageListener messageListener;

    public void setBeanFactory(BeanFactory beanFactory) throws BeansException {
        this.beanFactory = (DefaultListableBeanFactory) beanFactory;
    }

    public void setRedisConnectionFactory(RedisConnectionFactory redisConnectionFactory) {
        this.redisConnectionFactory = redisConnectionFactory;
    }

    @Override
    public void onApplicationEvent(ContextRefreshedEvent contextRefreshedEvent) {
        RedisClusterConnection redisClusterConnection = redisConnectionFactory.getClusterConnection();
        if (redisClusterConnection != null) {
            Iterable<RedisClusterNode> nodes = redisClusterConnection.clusterGetNodes();
            for (RedisClusterNode node : nodes) {
                if (node.isMaster()) {
                    log.info("获取到redis的master节点为[{}]",node.toString());
                    String containerBeanName = "messageContainer" + node.hashCode();
                    if (beanFactory.containsBean(containerBeanName)) {
                        return;
                    }
                    JedisShardInfo jedisShardInfo = new JedisShardInfo(node.getHost(), node.getPort());
                    jedisShardInfo.setPassword(password);
                    JedisConnectionFactory factory = new JedisConnectionFactory(jedisShardInfo);
                    BeanDefinitionBuilder containerBeanDefinitionBuilder = BeanDefinitionBuilder
                            .genericBeanDefinition(RedisMessageListenerContainer.class);
                    containerBeanDefinitionBuilder.addPropertyValue("connectionFactory", factory);
                    containerBeanDefinitionBuilder.setScope(BeanDefinition.SCOPE_SINGLETON);
                    containerBeanDefinitionBuilder.setLazyInit(false);
                    beanFactory.registerBeanDefinition(containerBeanName,
                            containerBeanDefinitionBuilder.getRawBeanDefinition());

                    RedisMessageListenerContainer container = beanFactory.getBean(containerBeanName,
                            RedisMessageListenerContainer.class);
                    String listenerBeanName = "messageListener" + node.hashCode();
                    if (beanFactory.containsBean(listenerBeanName)) {
                        return;
                    }
                    container.addMessageListener(messageListener, new PatternTopic("__keyevent@0__:expired"));
                    container.start();
                }
            }
        }
    }
}

KeyExpiredEventMessageListener类

import com.tvpartner.wechat.users.vo.AppStatusEnum;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.data.redis.connection.Message;
import org.springframework.data.redis.connection.MessageListener;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Component;
import java.nio.charset.StandardCharsets;
import java.util.Set;
import java.util.concurrent.TimeUnit;
/**
 * @author wangrh
 * @title:
 * @description: redis集群监听业务类
 * @date: 2020/12/30
 */
@Slf4j
@Component
public class KeyExpiredEventMessageListener implements MessageListener  {
    @Autowired
    protected StringRedisTemplate stringRedisTemplate;
    @Value("${spring.redis.appStatusTTL}")
    private Long appStatusTTL;
    @Override
    public void onMessage(Message message, byte[] pattern) {
        String expired = message.toString();
        System.out.println("======接收监听===="+expired);
        String channel = new String(message.getChannel(), StandardCharsets.UTF_8);
        //过期的key
        String key = new String(message.getBody(), StandardCharsets.UTF_8);
        String[] split = key.split(":");
        String isp = split[0];
        String iptvUserId = split[1];
        if ("CM".equals(isp) || "CTC".equals(isp) || "CU".equals(isp)) {
            if(log.isInfoEnabled()) {
                log.info("redis key 过期:pattern={},channel={},key={}", new String(pattern), channel, key);
            }
            stringRedisTemplate.opsForValue().set(key, AppStatusEnum.OFFLINE.code.toString(), appStatusTTL, TimeUnit.SECONDS);
            String key2 = isp + ":" + iptvUserId + "*";
            String key3 = "appstatus:" + isp + ":" + iptvUserId;
            //获取key3状态,如果不存在或3(有在线用户状态)则查询本次操作后是否还有在线用户并修改状态,如果处于2(无在线用户状态)则不做操作
            Object appstatus2 = stringRedisTemplate.opsForValue().get(key3);
            if (null == appstatus2 || String.valueOf(AppStatusEnum.IPTV_ONLINE.code).equals(appstatus2.toString())) {
                Set keys = stringRedisTemplate.keys(key2);
                //遍历用户在线状态,如果有用户在线则状态不变,如果所有用户均不在线则修改状态为2
                if (null != keys && keys.size() > 0) {
                    boolean flag = false;
                    for (Object k : keys) {
                        if (String.valueOf(AppStatusEnum.ONLINE.code).equals(stringRedisTemplate.opsForValue().get(k))) {
                            flag = true;
                        }
                    }
                    if (flag) {
                        stringRedisTemplate.opsForValue().set(key3, String.valueOf(AppStatusEnum.IPTV_ONLINE.code));
                    } else {
                        stringRedisTemplate.opsForValue().set(key3, String.valueOf(AppStatusEnum.IPTV_OFFLINE.code));
                    }
                }
            }
        }
    }
}

监听集群会出现类似广播的效果,导致消息被重复消费

getset命令加锁

一、redis的getset命令介绍:

1、getset命令自动将key对应到value并且返回原来key对应的value。如果key存在但是对应的value不是字符串,返回错误。

2、getset命令返回之前的旧值,如果之前Key不存在将返回null。

二、我们采用的是RedisTemplate操作redis,以下是封装的redisUtil中的getset方法内容:

public  <T> T getAndSet(final String key, T value) {
        T oldValue = null;
        try {
            ValueOperations<String, Object> operations = redisTemplate.opsForValue();
            oldValue =(T) operations.getAndSet(key, value);
        } catch (Exception e) {
            e.printStackTrace();
        }
        return oldValue;
    }
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 216,496评论 6 501
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,407评论 3 392
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 162,632评论 0 353
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,180评论 1 292
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,198评论 6 388
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,165评论 1 299
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,052评论 3 418
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,910评论 0 274
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,324评论 1 310
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,542评论 2 332
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,711评论 1 348
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,424评论 5 343
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,017评论 3 326
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,668评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,823评论 1 269
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,722评论 2 368
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,611评论 2 353

推荐阅读更多精彩内容