序
本文主要研究一下jetcache的BroadcastManager
BroadcastManager
jetcache-core/src/main/java/com/alicp/jetcache/support/BroadcastManager.java
public abstract class BroadcastManager implements AutoCloseable {
private static Logger logger = LoggerFactory.getLogger(BroadcastManager.class);
private final String sourceId = UUID.randomUUID().toString();
private final CacheManager cacheManager;
public BroadcastManager(CacheManager cacheManager) {
this.cacheManager = cacheManager;
}
protected void checkConfig(ExternalCacheConfig config) {
if (config.getBroadcastChannel() == null) {
throw new CacheConfigException("BroadcastChannel not set");
}
if (config.getValueEncoder() == null) {
throw new CacheConfigException("no value encoder");
}
if (config.getValueDecoder() == null) {
throw new CacheConfigException("no value decoder");
}
}
public abstract CacheResult publish(CacheMessage cacheMessage);
public abstract void startSubscribe();
@Override
public void close() throws Exception {
}
//......
}
BroadcastManager是个抽象类,实现了AutoCloseable接口,其close方法默认为空实现,它定义了publish及startSubscribe两个抽象方法;它主要有MockRemoteCacheBuilder的匿名实现、RedisBroadcastManager、LettuceBroadcastManager、SpringDataBroadcastManager、RedissonBroadcastManager这几个实现
MockRemoteCacheBuilder
jetcache-core/src/main/java/com/alicp/jetcache/external/MockRemoteCacheBuilder.java
public BroadcastManager createBroadcastManager(CacheManager cacheManager) {
return new BroadcastManager(cacheManager) {
@Override
public CacheResult publish(CacheMessage cacheMessage) {
lastPublishMessage = cacheMessage;
return CacheResult.SUCCESS_WITHOUT_MSG;
}
@Override
public void startSubscribe() {
subscribeStart = true;
}
};
}
MockRemoteCacheBuilder的createBroadcastManager创建了一个匿名的BroadcastManager,其publish返回CacheResult.SUCCESS_WITHOUT_MSG,其startSubscribe设置subscribeStart为true
RedisBroadcastManager
jetcache-support/jetcache-redis/src/main/java/com/alicp/jetcache/redis/RedisBroadcastManager.java
public class RedisBroadcastManager extends BroadcastManager {
private static final Logger logger = LoggerFactory.getLogger(RedisBroadcastManager.class);
private final byte[] channel;
private final String channelStr;
private final RedisCacheConfig<Object, Object> config;
private volatile CacheMessagePubSub cacheMessagePubSub;
private volatile boolean closed;
private volatile boolean subscribe;
private boolean subscribeThreadStart;
public RedisBroadcastManager(CacheManager cacheManager, RedisCacheConfig<Object, Object> config) {
super(cacheManager);
this.channelStr = config.getBroadcastChannel();
this.channel = channelStr.getBytes(StandardCharsets.UTF_8);
this.config = config;
checkConfig(config);
if (config.getJedis() == null && config.getJedisPool() == null) {
throw new CacheConfigException("no jedis");
}
if (config.getJedis() != null && config.getJedisPool() != null) {
throw new CacheConfigException("'jedis' and 'jedisPool' can't set simultaneously");
}
}
@Override
public synchronized void startSubscribe() {
if (subscribeThreadStart) {
throw new IllegalStateException("subscribe thread is started");
}
this.cacheMessagePubSub = new CacheMessagePubSub();
Thread subThread;
subThread = new Thread(this::runSubThread, "Sub_" + channelStr);
subThread.setDaemon(true);
subThread.start();
this.subscribeThreadStart = true;
}
@Override
public CacheResult publish(CacheMessage message) {
Object jedisObj = null;
try {
jedisObj = writeCommands();
byte[] value = config.getValueEncoder().apply(message);
if (jedisObj instanceof Jedis) {
((Jedis) jedisObj).publish(channel, value);
} else {
((UnifiedJedis) jedisObj).publish(channel, value);
}
return CacheResult.SUCCESS_WITHOUT_MSG;
} catch (Exception ex) {
SquashedLogger.getLogger(logger).error("jetcache publish error", ex);
return new CacheResult(ex);
} finally {
RedisCache.closeJedis(jedisObj);
}
}
@Override
public synchronized void close() {
if (this.closed) {
return;
}
this.closed = true;
if (subscribe) {
try {
this.cacheMessagePubSub.unsubscribe(channel);
} catch (Exception e) {
logger.warn("unsubscribe {} fail", channelStr, e);
}
}
}
//......
}
RedisBroadcastManager继承了BroadcastManager,其startSubscribe创建Thread并执行runSubThread方法;其publish方法主要是将value发布到指定的channel;其close方法对于subscribe的true则执行cacheMessagePubSub.unsubscribe(channel)
runSubThread
private void runSubThread() {
while (!closed) {
runSubThread0();
}
}
private void runSubThread0() {
Object jedisObj = null;
try {
jedisObj = writeCommands();
if (jedisObj instanceof Jedis) {
subscribe = true;
((Jedis) jedisObj).subscribe(cacheMessagePubSub, channel);
} else if (jedisObj instanceof UnifiedJedis) {
subscribe = true;
((UnifiedJedis) jedisObj).subscribe(cacheMessagePubSub, channel);
}
} catch (Throwable e) {
SquashedLogger.getLogger(logger).error("run jedis subscribe thread error: {}", e);
try {
Thread.sleep(1000);
} catch (InterruptedException ex) {
// ignore
}
} finally {
subscribe = false;
RedisCache.closeJedis(jedisObj);
}
}
runSubThread通过while循环执行runSubThread0,它主要是执行subscribe(cacheMessagePubSub, channel)方法
LettuceBroadcastManager
jetcache-support/jetcache-redis-lettuce/src/main/java/com/alicp/jetcache/redis/lettuce/LettuceBroadcastManager.java
public class LettuceBroadcastManager extends BroadcastManager {
private static final Logger logger = LoggerFactory.getLogger(LettuceBroadcastManager.class);
private final RedisLettuceCacheConfig<Object, Object> config;
private final byte[] channel;
private volatile boolean subscribeThreadStart;
private volatile RedisPubSubAdapter<byte[], byte[]> pubSubAdapter;
private final LettuceConnectionManager lettuceConnectionManager;
private final BaseRedisAsyncCommands<byte[], byte[]> stringAsyncCommands;
public LettuceBroadcastManager(CacheManager cacheManager, RedisLettuceCacheConfig<Object, Object> config) {
super(cacheManager);
checkConfig(config);
if (config.getPubSubConnection() == null) {
throw new CacheConfigException("PubSubConnection not set");
}
this.config = config;
this.channel = config.getBroadcastChannel().getBytes(StandardCharsets.UTF_8);
this.lettuceConnectionManager = config.getConnectionManager();
this.lettuceConnectionManager.init(config.getRedisClient(), config.getConnection());
this.stringAsyncCommands = (BaseRedisAsyncCommands<byte[], byte[]>) lettuceConnectionManager
.asyncCommands(config.getRedisClient());
}
@Override
public CacheResult publish(CacheMessage cacheMessage) {
try {
byte[] value = config.getValueEncoder().apply(cacheMessage);
RedisFuture<Long> future = stringAsyncCommands.publish(channel, value);
return new CacheResult(future.handle((rt, ex) -> {
if (ex != null) {
JetCacheExecutor.defaultExecutor().execute(() ->
SquashedLogger.getLogger(logger).error("jetcache publish error", ex));
return new ResultData(ex);
} else {
return new ResultData(CacheResultCode.SUCCESS, null, null);
}
}));
} catch (Exception ex) {
SquashedLogger.getLogger(logger).error("jetcache publish error", ex);
return new CacheResult(ex);
}
}
@Override
public synchronized void startSubscribe() {
if (subscribeThreadStart) {
throw new IllegalStateException("startSubscribe has invoked");
}
this.pubSubAdapter = new RedisPubSubAdapter<byte[], byte[]>() {
@Override
public void message(byte[] channel, byte[] message) {
processNotification(message, config.getValueDecoder());
}
};
config.getPubSubConnection().addListener(this.pubSubAdapter);
RedisPubSubAsyncCommands<byte[], byte[]> asyncCommands = config.getPubSubConnection().async();
asyncCommands.subscribe(channel);
this.subscribeThreadStart = true;
}
@Override
public void close() {
config.getPubSubConnection().removeListener(this.pubSubAdapter);
config.getPubSubConnection().close();
}
}
LettuceBroadcastManager继承了BroadcastManager,其publish方法执行stringAsyncCommands.publish(channel, value);其startSubscribe方法执行config.getPubSubConnection().addListener(this.pubSubAdapter)及asyncCommands.subscribe(channel);其close方法主要是执行config.getPubSubConnection().removeListener(this.pubSubAdapter)及config.getPubSubConnection().close()
SpringDataBroadcastManager
jetcache-support/jetcache-redis-springdata/src/main/java/com/alicp/jetcache/redis/springdata/SpringDataBroadcastManager.java
public class SpringDataBroadcastManager extends BroadcastManager {
private static final Logger logger = LoggerFactory.getLogger(SpringDataBroadcastManager.class);
private final RedisSpringDataCacheConfig config;
private final MessageListener listener = this::onMessage;
private final byte[] channel;
private volatile RedisMessageListenerContainer listenerContainer;
public SpringDataBroadcastManager(CacheManager cacheManager, RedisSpringDataCacheConfig config) {
super(cacheManager);
this.config = config;
checkConfig(config);
if (config.getConnectionFactory() == null) {
throw new CacheConfigException("connectionFactory is required");
}
this.channel = config.getBroadcastChannel().getBytes(StandardCharsets.UTF_8);
}
@Override
public CacheResult publish(CacheMessage cacheMessage) {
RedisConnection con = null;
try {
con = config.getConnectionFactory().getConnection();
byte[] body = (byte[]) config.getValueEncoder().apply(cacheMessage);
con.publish(channel, body);
return CacheResult.SUCCESS_WITHOUT_MSG;
} catch (Exception ex) {
SquashedLogger.getLogger(logger).error("jetcache publish error", ex);
return new CacheResult(ex);
} finally {
if (con != null) {
try {
con.close();
} catch (Exception e) {
SquashedLogger.getLogger(logger).error("RedisConnection close fail", e);
}
}
}
}
@Override
public synchronized void startSubscribe() {
if (this.listenerContainer != null) {
throw new IllegalStateException("subscribe thread is started");
}
Topic topic = new ChannelTopic(config.getBroadcastChannel());
if (config.getListenerContainer() == null) {
RedisMessageListenerContainer c = new RedisMessageListenerContainer();
c.setConnectionFactory(config.getConnectionFactory());
c.afterPropertiesSet();
c.start();
this.listenerContainer = c;
logger.info("create RedisMessageListenerContainer instance");
} else {
this.listenerContainer = config.getListenerContainer();
}
this.listenerContainer.addMessageListener(listener, topic);
logger.info("subscribe jetcache invalidate notification. channel={}", config.getBroadcastChannel());
}
private void onMessage(Message message, byte[] pattern) {
processNotification(message.getBody(), config.getValueDecoder());
}
@Override
public synchronized void close() throws Exception {
if (this.listenerContainer != null) {
this.listenerContainer.removeMessageListener(listener);
if (this.config.getListenerContainer() == null) {
this.listenerContainer.destroy();
}
}
this.listenerContainer = null;
}
}
SpringDataBroadcastManager继承了BroadcastManager,其publish方法执行con.publish(channel, body);其startSubscribe执行listenerContainer.addMessageListener(listener, topic);其close方法对于listenerContainer不为null的执行listenerContainer.removeMessageListener(listener),对于config.getListenerContainer()为null的执行listenerContainer.destroy()
RedissonBroadcastManager
jetcache-support/jetcache-redisson/src/main/java/com/alicp/jetcache/redisson/RedissonBroadcastManager.java
public class RedissonBroadcastManager extends BroadcastManager {
private static final Logger logger = LoggerFactory.getLogger(RedissonBroadcastManager.class);
private final RedissonCacheConfig<?, ?> config;
private final String channel;
private final RedissonClient client;
private volatile int subscribeId;
public RedissonBroadcastManager(final CacheManager cacheManager, final RedissonCacheConfig<?, ?> config) {
super(cacheManager);
checkConfig(config);
this.config = config;
this.channel = config.getBroadcastChannel();
this.client = config.getRedissonClient();
}
@Override
public synchronized void startSubscribe() {
if (this.subscribeId == 0 && Objects.nonNull(this.channel) && !this.channel.isEmpty()) {
this.subscribeId = this.client.getTopic(this.channel)
.addListener(byte[].class, (channel, msg) -> processNotification(msg, this.config.getValueDecoder()));
}
}
@Override
public synchronized void close() {
final int id;
if ((id = this.subscribeId) > 0 && Objects.nonNull(this.channel)) {
this.subscribeId = 0;
try {
this.client.getTopic(this.channel).removeListener(id);
} catch (Throwable e) {
logger.warn("unsubscribe {} fail", this.channel, e);
}
}
}
@Override
public CacheResult publish(final CacheMessage cacheMessage) {
try {
if (Objects.nonNull(this.channel) && Objects.nonNull(cacheMessage)) {
final byte[] msg = this.config.getValueEncoder().apply(cacheMessage);
this.client.getTopic(this.channel).publish(msg);
return CacheResult.SUCCESS_WITHOUT_MSG;
}
return CacheResult.FAIL_WITHOUT_MSG;
} catch (Throwable e) {
SquashedLogger.getLogger(logger).error("jetcache publish error", e);
return new CacheResult(e);
}
}
}
RedissonBroadcastManager继承了BroadcastManager,其startSubscribe方法执行client.getTopic(this.channel)addListener;其publish方法执行client.getTopic(this.channel).publish(msg);其close执行client.getTopic(this.channel).removeListener(id)
小结
BroadcastManager是个抽象类,实现了AutoCloseable接口,其close方法默认为空实现,它定义了publish及startSubscribe两个抽象方法;它主要有MockRemoteCacheBuilder的匿名实现、RedisBroadcastManager、LettuceBroadcastManager、SpringDataBroadcastManager、RedissonBroadcastManager这几个实现
这几个实现看起来jedis的实现稍微费劲一点,其他的基本都是通过listener机制来执行processNotification