聊聊jetcache的BroadcastManager

本文主要研究一下jetcache的BroadcastManager

BroadcastManager

jetcache-core/src/main/java/com/alicp/jetcache/support/BroadcastManager.java

public abstract class BroadcastManager implements AutoCloseable {
    private static Logger logger = LoggerFactory.getLogger(BroadcastManager.class);

    private final String sourceId = UUID.randomUUID().toString();
    private final CacheManager cacheManager;

    public BroadcastManager(CacheManager cacheManager) {
        this.cacheManager = cacheManager;
    }

    protected void checkConfig(ExternalCacheConfig config) {
        if (config.getBroadcastChannel() == null) {
            throw new CacheConfigException("BroadcastChannel not set");
        }
        if (config.getValueEncoder() == null) {
            throw new CacheConfigException("no value encoder");
        }
        if (config.getValueDecoder() == null) {
            throw new CacheConfigException("no value decoder");
        }
    }

    public abstract CacheResult publish(CacheMessage cacheMessage);

    public abstract void startSubscribe();

    @Override
    public void close() throws Exception {
    }

    //......
}    

BroadcastManager是个抽象类,实现了AutoCloseable接口,其close方法默认为空实现,它定义了publish及startSubscribe两个抽象方法;它主要有MockRemoteCacheBuilder的匿名实现、RedisBroadcastManager、LettuceBroadcastManager、SpringDataBroadcastManager、RedissonBroadcastManager这几个实现

MockRemoteCacheBuilder

jetcache-core/src/main/java/com/alicp/jetcache/external/MockRemoteCacheBuilder.java

    public BroadcastManager createBroadcastManager(CacheManager cacheManager) {
        return new BroadcastManager(cacheManager) {
            @Override
            public CacheResult publish(CacheMessage cacheMessage) {
                lastPublishMessage = cacheMessage;
                return CacheResult.SUCCESS_WITHOUT_MSG;
            }

            @Override
            public void startSubscribe() {
                subscribeStart = true;
            }
        };
    }

MockRemoteCacheBuilder的createBroadcastManager创建了一个匿名的BroadcastManager,其publish返回CacheResult.SUCCESS_WITHOUT_MSG,其startSubscribe设置subscribeStart为true

RedisBroadcastManager

jetcache-support/jetcache-redis/src/main/java/com/alicp/jetcache/redis/RedisBroadcastManager.java

public class RedisBroadcastManager extends BroadcastManager {

    private static final Logger logger = LoggerFactory.getLogger(RedisBroadcastManager.class);

    private final byte[] channel;
    private final String channelStr;
    private final RedisCacheConfig<Object, Object> config;

    private volatile CacheMessagePubSub cacheMessagePubSub;
    private volatile boolean closed;
    private volatile boolean subscribe;
    private boolean subscribeThreadStart;

    public RedisBroadcastManager(CacheManager cacheManager, RedisCacheConfig<Object, Object> config) {
        super(cacheManager);
        this.channelStr = config.getBroadcastChannel();
        this.channel = channelStr.getBytes(StandardCharsets.UTF_8);
        this.config = config;

        checkConfig(config);
        if (config.getJedis() == null && config.getJedisPool() == null) {
            throw new CacheConfigException("no jedis");
        }
        if (config.getJedis() != null && config.getJedisPool() != null) {
            throw new CacheConfigException("'jedis' and 'jedisPool' can't set simultaneously");
        }
    }

    @Override
    public synchronized void startSubscribe() {
        if (subscribeThreadStart) {
            throw new IllegalStateException("subscribe thread is started");
        }
        this.cacheMessagePubSub = new CacheMessagePubSub();
        Thread subThread;
        subThread = new Thread(this::runSubThread, "Sub_" + channelStr);
        subThread.setDaemon(true);
        subThread.start();
        this.subscribeThreadStart = true;
    }

    @Override
    public CacheResult publish(CacheMessage message) {
        Object jedisObj = null;
        try {
            jedisObj = writeCommands();
            byte[] value = config.getValueEncoder().apply(message);
            if (jedisObj instanceof Jedis) {
                ((Jedis) jedisObj).publish(channel, value);
            } else {
                ((UnifiedJedis) jedisObj).publish(channel, value);
            }
            return CacheResult.SUCCESS_WITHOUT_MSG;
        } catch (Exception ex) {
            SquashedLogger.getLogger(logger).error("jetcache publish error", ex);
            return new CacheResult(ex);
        } finally {
            RedisCache.closeJedis(jedisObj);
        }
    }


    @Override
    public synchronized void close() {
        if (this.closed) {
            return;
        }
        this.closed = true;
        if (subscribe) {
            try {
                this.cacheMessagePubSub.unsubscribe(channel);
            } catch (Exception e) {
                logger.warn("unsubscribe {} fail", channelStr, e);
            }
        }
    }

    //......
}    

RedisBroadcastManager继承了BroadcastManager,其startSubscribe创建Thread并执行runSubThread方法;其publish方法主要是将value发布到指定的channel;其close方法对于subscribe的true则执行cacheMessagePubSub.unsubscribe(channel)

runSubThread

    private void runSubThread() {
        while (!closed) {
            runSubThread0();
        }
    }

    private void runSubThread0() {
        Object jedisObj = null;
        try {
            jedisObj = writeCommands();
            if (jedisObj instanceof Jedis) {
                subscribe = true;
                ((Jedis) jedisObj).subscribe(cacheMessagePubSub, channel);
            } else if (jedisObj instanceof UnifiedJedis) {
                subscribe = true;
                ((UnifiedJedis) jedisObj).subscribe(cacheMessagePubSub, channel);
            }
        } catch (Throwable e) {
            SquashedLogger.getLogger(logger).error("run jedis subscribe thread error: {}", e);
            try {
                Thread.sleep(1000);
            } catch (InterruptedException ex) {
                // ignore
            }
        } finally {
            subscribe = false;
            RedisCache.closeJedis(jedisObj);
        }
    }

runSubThread通过while循环执行runSubThread0,它主要是执行subscribe(cacheMessagePubSub, channel)方法

LettuceBroadcastManager

jetcache-support/jetcache-redis-lettuce/src/main/java/com/alicp/jetcache/redis/lettuce/LettuceBroadcastManager.java

public class LettuceBroadcastManager extends BroadcastManager {
    private static final Logger logger = LoggerFactory.getLogger(LettuceBroadcastManager.class);

    private final RedisLettuceCacheConfig<Object, Object> config;
    private final byte[] channel;

    private volatile boolean subscribeThreadStart;
    private volatile RedisPubSubAdapter<byte[], byte[]> pubSubAdapter;

    private final LettuceConnectionManager lettuceConnectionManager;
    private final BaseRedisAsyncCommands<byte[], byte[]> stringAsyncCommands;


    public LettuceBroadcastManager(CacheManager cacheManager, RedisLettuceCacheConfig<Object, Object> config) {
        super(cacheManager);
        checkConfig(config);
        if (config.getPubSubConnection() == null) {
            throw new CacheConfigException("PubSubConnection not set");
        }

        this.config = config;
        this.channel = config.getBroadcastChannel().getBytes(StandardCharsets.UTF_8);
        this.lettuceConnectionManager = config.getConnectionManager();
        this.lettuceConnectionManager.init(config.getRedisClient(), config.getConnection());
        this.stringAsyncCommands = (BaseRedisAsyncCommands<byte[], byte[]>) lettuceConnectionManager
                .asyncCommands(config.getRedisClient());
    }

    @Override
    public CacheResult publish(CacheMessage cacheMessage) {
        try {
            byte[] value = config.getValueEncoder().apply(cacheMessage);
            RedisFuture<Long> future = stringAsyncCommands.publish(channel, value);
            return new CacheResult(future.handle((rt, ex) -> {
                if (ex != null) {
                    JetCacheExecutor.defaultExecutor().execute(() ->
                            SquashedLogger.getLogger(logger).error("jetcache publish error", ex));
                    return new ResultData(ex);
                } else {
                    return new ResultData(CacheResultCode.SUCCESS, null, null);
                }
            }));
        } catch (Exception ex) {
            SquashedLogger.getLogger(logger).error("jetcache publish error", ex);
            return new CacheResult(ex);
        }
    }

    @Override
    public synchronized void startSubscribe() {
        if (subscribeThreadStart) {
            throw new IllegalStateException("startSubscribe has invoked");
        }
        this.pubSubAdapter = new RedisPubSubAdapter<byte[], byte[]>() {
            @Override
            public void message(byte[] channel, byte[] message) {
                processNotification(message, config.getValueDecoder());
            }
        };
        config.getPubSubConnection().addListener(this.pubSubAdapter);
        RedisPubSubAsyncCommands<byte[], byte[]> asyncCommands = config.getPubSubConnection().async();
        asyncCommands.subscribe(channel);
        this.subscribeThreadStart = true;
    }

    @Override
    public void close() {
        config.getPubSubConnection().removeListener(this.pubSubAdapter);
        config.getPubSubConnection().close();
    }
}

LettuceBroadcastManager继承了BroadcastManager,其publish方法执行stringAsyncCommands.publish(channel, value);其startSubscribe方法执行config.getPubSubConnection().addListener(this.pubSubAdapter)及asyncCommands.subscribe(channel);其close方法主要是执行config.getPubSubConnection().removeListener(this.pubSubAdapter)及config.getPubSubConnection().close()

SpringDataBroadcastManager

jetcache-support/jetcache-redis-springdata/src/main/java/com/alicp/jetcache/redis/springdata/SpringDataBroadcastManager.java

public class SpringDataBroadcastManager extends BroadcastManager {

    private static final Logger logger = LoggerFactory.getLogger(SpringDataBroadcastManager.class);

    private final RedisSpringDataCacheConfig config;
    private final MessageListener listener = this::onMessage;
    private final byte[] channel;
    private volatile RedisMessageListenerContainer listenerContainer;

    public SpringDataBroadcastManager(CacheManager cacheManager, RedisSpringDataCacheConfig config) {
        super(cacheManager);
        this.config = config;
        checkConfig(config);
        if (config.getConnectionFactory() == null) {
            throw new CacheConfigException("connectionFactory is required");
        }
        this.channel = config.getBroadcastChannel().getBytes(StandardCharsets.UTF_8);
    }

    @Override
    public CacheResult publish(CacheMessage cacheMessage) {
        RedisConnection con = null;
        try {
            con = config.getConnectionFactory().getConnection();
            byte[] body = (byte[]) config.getValueEncoder().apply(cacheMessage);
            con.publish(channel, body);
            return CacheResult.SUCCESS_WITHOUT_MSG;
        } catch (Exception ex) {
            SquashedLogger.getLogger(logger).error("jetcache publish error", ex);
            return new CacheResult(ex);
        } finally {
            if (con != null) {
                try {
                    con.close();
                } catch (Exception e) {
                    SquashedLogger.getLogger(logger).error("RedisConnection close fail", e);
                }
            }
        }
    }

    @Override
    public synchronized void startSubscribe() {
        if (this.listenerContainer != null) {
            throw new IllegalStateException("subscribe thread is started");
        }
        Topic topic = new ChannelTopic(config.getBroadcastChannel());
        if (config.getListenerContainer() == null) {
            RedisMessageListenerContainer c = new RedisMessageListenerContainer();
            c.setConnectionFactory(config.getConnectionFactory());
            c.afterPropertiesSet();
            c.start();
            this.listenerContainer = c;
            logger.info("create RedisMessageListenerContainer instance");
        } else {
            this.listenerContainer = config.getListenerContainer();
        }
        this.listenerContainer.addMessageListener(listener, topic);
        logger.info("subscribe jetcache invalidate notification. channel={}", config.getBroadcastChannel());
    }

    private void onMessage(Message message, byte[] pattern) {
        processNotification(message.getBody(), config.getValueDecoder());
    }

    @Override
    public synchronized void close() throws Exception {
        if (this.listenerContainer != null) {
            this.listenerContainer.removeMessageListener(listener);
            if (this.config.getListenerContainer() == null) {
                this.listenerContainer.destroy();
            }
        }
        this.listenerContainer = null;
    }
}

SpringDataBroadcastManager继承了BroadcastManager,其publish方法执行con.publish(channel, body);其startSubscribe执行listenerContainer.addMessageListener(listener, topic);其close方法对于listenerContainer不为null的执行listenerContainer.removeMessageListener(listener),对于config.getListenerContainer()为null的执行listenerContainer.destroy()

RedissonBroadcastManager

jetcache-support/jetcache-redisson/src/main/java/com/alicp/jetcache/redisson/RedissonBroadcastManager.java

public class RedissonBroadcastManager extends BroadcastManager {
    private static final Logger logger = LoggerFactory.getLogger(RedissonBroadcastManager.class);
    private final RedissonCacheConfig<?, ?> config;
    private final String channel;
    private final RedissonClient client;
    private volatile int subscribeId;

    public RedissonBroadcastManager(final CacheManager cacheManager, final RedissonCacheConfig<?, ?> config) {
        super(cacheManager);
        checkConfig(config);
        this.config = config;
        this.channel = config.getBroadcastChannel();
        this.client = config.getRedissonClient();
    }

    @Override
    public synchronized void startSubscribe() {
        if (this.subscribeId == 0 && Objects.nonNull(this.channel) && !this.channel.isEmpty()) {
            this.subscribeId = this.client.getTopic(this.channel)
                    .addListener(byte[].class, (channel, msg) -> processNotification(msg, this.config.getValueDecoder()));
        }
    }


    @Override
    public synchronized void close() {
        final int id;
        if ((id = this.subscribeId) > 0 && Objects.nonNull(this.channel)) {
            this.subscribeId = 0;
            try {
                this.client.getTopic(this.channel).removeListener(id);
            } catch (Throwable e) {
                logger.warn("unsubscribe {} fail", this.channel, e);
            }
        }
    }

    @Override
    public CacheResult publish(final CacheMessage cacheMessage) {
        try {
            if (Objects.nonNull(this.channel) && Objects.nonNull(cacheMessage)) {
                final byte[] msg = this.config.getValueEncoder().apply(cacheMessage);
                this.client.getTopic(this.channel).publish(msg);
                return CacheResult.SUCCESS_WITHOUT_MSG;
            }
            return CacheResult.FAIL_WITHOUT_MSG;
        } catch (Throwable e) {
            SquashedLogger.getLogger(logger).error("jetcache publish error", e);
            return new CacheResult(e);
        }
    }
}

RedissonBroadcastManager继承了BroadcastManager,其startSubscribe方法执行client.getTopic(this.channel)addListener;其publish方法执行client.getTopic(this.channel).publish(msg);其close执行client.getTopic(this.channel).removeListener(id)

小结

BroadcastManager是个抽象类,实现了AutoCloseable接口,其close方法默认为空实现,它定义了publish及startSubscribe两个抽象方法;它主要有MockRemoteCacheBuilder的匿名实现、RedisBroadcastManager、LettuceBroadcastManager、SpringDataBroadcastManager、RedissonBroadcastManager这几个实现

这几个实现看起来jedis的实现稍微费劲一点,其他的基本都是通过listener机制来执行processNotification

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 211,948评论 6 492
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,371评论 3 385
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 157,490评论 0 348
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,521评论 1 284
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 65,627评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 49,842评论 1 290
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,997评论 3 408
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,741评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,203评论 1 303
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,534评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,673评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,339评论 4 330
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,955评论 3 313
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,770评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,000评论 1 266
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,394评论 2 360
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,562评论 2 349

推荐阅读更多精彩内容

  • Spring Cloud为开发人员提供了快速构建分布式系统中一些常见模式的工具(例如配置管理,服务发现,断路器,智...
    卡卡罗2017阅读 134,633评论 18 139
  • 序 本文主要研究一下claudb的NotificationManager NotificationManager ...
    go4it阅读 173评论 0 0
  • Redis 概念: redis是一款高性能的NOSQL系列的非关系型数据库什么是NOSQL NoSQL(NoSQL...
    强某某阅读 448评论 0 1
  • 内存模型以及分区 JVM分为虚拟机栈、堆、方法区、本地方法区堆,用来存放实例化对象、非static成员变量,属于线...
    北京黄小胖阅读 1,220评论 0 0
  • 精心整理的 Python 相关的基础知识,用于面试,或者平时复习,都是很好的!废话不多说,直接开搞由于文章过长,萝...
    萝卜大杂烩阅读 277评论 0 0