使用Zookeeper实现分布式缓存
问题背景
在项目开发过程中,需要存储对象型缓存,然而目前的分布式缓存方案redis之类的,缓存服务器存储的都是序列化过后的数据,在使用时需要重新反序列化,在高并发的情况下,反序列化也是一笔不小的开销。
我们需要寻找出一种使用时不需要反序列化的缓存方案,减少反序列化带来的性能消耗。
实现思路
如果不反序列化的话,对象只能存在本地,但是存在本地,各服务器的缓存就不能保持一致性。而我们知道,zookeeper是保证数据一致性的分布式解决方案,那么我们是否可以利用zookeeper来实现分布式缓存。
我们实现分布式缓存有以下目标:
1. 提升数据读取的性能,减轻数据库压力
2. 保证服务器各节点的数据一致性
Zookeeper有以下五个特性:
1. 构造高可用集群
zookeeper的选举模式保证了集群的相对稳定性,从而使得集群是高可用的。
2. 集群全局配置文件管理
即统一资源配置,在一个偌大的集群环境中,假设你需要对该集群的配置文件作修改,假设集群很庞大,手动去修改是一件不太现实的事,不但费时费力,还极有可能造成差错,zookeeper可以自动帮我们完成配置文件的分发,既高效又准确。
3. 发布与订阅
支持服务发布与状态监听。
4. 分布式锁
在集群环境下,同样会存在对资源的竞争,zookeeper提供了分布式锁实现了同步
5. 保证数据强一致性
在集群环境下,对集群中某个节点的数据的改变,会被zookeeper同步到其他机器上
根据以上思路,我们可以有以下的缓存方案。
1. 缓存序列化后保存到zk服务器。
2. Zk的缓存节点在收到缓存变化的时候,将变化后的值同步到各个客户端
3. 客户端收到zk的同步信息后,将数据反序列化存储到本地。
4. 客户端读取缓存时,直接读取本地反序列化后的副本。
根据以上方案,我们的序列化和反序列过程只有在数据发生变更的时候才会执行,大大提高了我们程序的执行效率。
未解决问题
1. 在读多写少的场景中,该方案的性能优秀,但是在写场景特别多的情况下,序列化性能的问题还是没有解决。
2. 缓存的有效性问题没有得到解决。
附源码
zkClient
package com.gree.cdsp.framework.zk;
import lombok.extern.slf4j.Slf4j;
import org.I0Itec.zkclient.ZkClient;
import org.I0Itec.zkclient.exception.ZkNodeExistsException;
import org.I0Itec.zkclient.exception.ZkTimeoutException;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
@Slf4j
@Component
public class ZookeeperClient {
private volatile ZkClient zkClient = null;
@Value("${zk.server}")
private String servers;
/**
* zkClient默认获取连接超时限制
*/
private static final int DEFAULT_CONNECTION_TIMEOUT = 20 * 1000;
@PostConstruct
protected void init() {
try {
zkClient = new ZkClientUtil(servers, DEFAULT_CONNECTION_TIMEOUT);
log.info("Create zookeeper's connection successfully!");
} catch (ZkTimeoutException e) {
log.error("Connect zookeeper error", e);
}
}
public ZkClient getZkClient() {
if (zkClient == null) {
init();
}
return zkClient;
}
/**
* 创建永久性结点
*
* @param path
*/
public void createPersistent(String path) {
try {
zkClient.createPersistent(path, true);
} catch (ZkNodeExistsException e) {
log.error("ZkNode exists", e);
}
}
/**
* 创建临时结点
*
* @param path
*/
public void createEphemeral(String path) {
try {
zkClient.createEphemeral(path);
} catch (ZkNodeExistsException e) {
}
}
/**
* 结点中写入数据
*
* @param keyPath
* @param value
* @return
* @throws Exception
*/
public boolean writeData(String keyPath, Object value) throws Exception {
boolean flag = false;
try {
if (!zkClient.exists(keyPath)) {
zkClient.createPersistent(keyPath, true);
}
zkClient.writeData(keyPath, value);
flag = true;
} catch (Exception e) {
throw e;
}
return flag;
}
}
缓存key变更监听
package com.gree.cdsp.framework.cache.zk;
import com.gree.cdsp.framework.zk.ZookeeperClient;
import org.I0Itec.zkclient.IZkChildListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.util.List;
@Component
public class ZkCacheKeyListner implements IZkChildListener {
private ZkCacheDataListner zkCacheDataListner;
private ZookeeperClient zookeeperClient;
@Autowired
public void setZkCacheDataListner(ZkCacheDataListner zkCacheDataListner) {
this.zkCacheDataListner = zkCacheDataListner;
}
@Autowired
public void setZookeeperClient(ZookeeperClient zookeeperClient) {
this.zookeeperClient = zookeeperClient;
}
@Override
public void handleChildChange(String path, List<String> list) {
for (String str : list) {
zookeeperClient.getZkClient().subscribeDataChanges(path+ "/" + str, zkCacheDataListner);
}
}
}
缓存值变更监听
package com.gree.cdsp.framework.cache.zk;
import lombok.extern.slf4j.Slf4j;
import org.I0Itec.zkclient.IZkDataListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.util.List;
import java.util.Map;
import java.util.Set;
@Slf4j
@Component
public class ZkCacheDataListner implements IZkDataListener {
private ZkCache zkCache;
@Autowired
public void setZkCache(ZkCache zkCache) {
this.zkCache = zkCache;
}
@Override
public void handleDataChange(String dataPath, Object data) {
String[] pathArray = dataPath.split("/");
String cacheType = pathArray[pathArray.length - 2];
switch (cacheType) {
case ZkCacheConstants.ZK_CACHE_TYPE_SINGLE:
zkCache.setSingleCache(pathArray[pathArray.length - 1], data);
break;
case ZkCacheConstants.ZK_CACHE_TYPE_MEMBERS:
zkCache.setMemberCache(pathArray[pathArray.length - 1], (Set) data);
break;
case ZkCacheConstants.ZK_CACHE_TYPE_LIST:
zkCache.setListCache(pathArray[pathArray.length - 1], (List) data);
break;
case ZkCacheConstants.ZK_CACHE_TYPE_MAP:
zkCache.setMapCache(pathArray[pathArray.length - 1], (Map) data);
break;
default:
break;
}
}
@Override
public void handleDataDeleted(String dataPath) throws Exception {
String key = dataPath.substring(dataPath.lastIndexOf('/') + 1);
zkCache.del(key);
}
}
缓存操作类
package com.gree.cdsp.framework.cache.zk;
import com.gree.cdsp.framework.zk.ZookeeperClient;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import java.util.*;
@Slf4j
@Component
public class ZkCache {
private ZookeeperClient zkClientUtil;
private String cacheNode;
private Map<String, Object> singleCache = new HashMap<>();
private Map<String, Set<Object>> memberCache = new HashMap<>();
private Map<String, Map<Object, Object>> mapCache = new HashMap<>();
private Map<String, List<String>> listCache = new HashMap<>();
@Autowired
public void setZkClientUtil(ZookeeperClient zkClientUtil) {
this.zkClientUtil = zkClientUtil;
}
@Value("${zk.cache.node}")
public void setCacheNode(String cacheNode) {
this.cacheNode = cacheNode;
}
public void del(String key) {
singleCache.remove(key);
}
public <T> Tget(String key) {
return (T) singleCache.get(key);
}
//region protected
<T> void setSingleCache(String key, T value) {
singleCache.put(key, value);
}
void setMemberCache(String key, Set value) {
memberCache.put(key, value);
}
void setListCache(String key, List value) {
listCache.put(key, value);
}
void setMapCache(String key, Map value) {
this.mapCache.put(key, value);
}
//endregion
//region public
//region set method
public <T> void set(String key, T value) {
try {
zkClientUtil.writeData(cacheNode + "/" + ZkCacheConstants.ZK_CACHE_TYPE_SINGLE + "/" + key, value);
} catch (Exception e) {
log.error("<<<<设置zookeeper缓存出错", e);
}
}
/**
* 高并发下可能不安全,慎用
*
* @param key
* @param value
*/
public void mset(String key, Object value) {
String path = cacheNode + "/" + ZkCacheConstants.ZK_CACHE_TYPE_MEMBERS + "/" + key;
Set<Object> set;
if (zkClientUtil.getZkClient().exists(path)) {
set = zkClientUtil.getZkClient().readData(path);
} else {
set = new HashSet<>();
}
set.add(value);
try {
zkClientUtil.writeData(path, set);
} catch (Exception e) {
log.error("<<<<设置zookeeper缓存出错", e);
}
}
public void msetAll(String key, Set value) {
String path = cacheNode + "/" + ZkCacheConstants.ZK_CACHE_TYPE_MEMBERS + "/" + key;
try {
zkClientUtil.writeData(path, value);
} catch (Exception e) {
log.error("<<<<设置zookeeper缓存出错", e);
}
}
/**
* 高并发下可能不安全,慎用
*
* @param key
* @param value
*/
public void lset(String key, Object value) {
String path = cacheNode + "/" + ZkCacheConstants.ZK_CACHE_TYPE_LIST + "/" + key;
List<Object> list;
if (zkClientUtil.getZkClient().exists(path)) {
list = zkClientUtil.getZkClient().readData(path);
} else {
list = new LinkedList<>();
}
list.add(value);
try {
zkClientUtil.writeData(path, list);
} catch (Exception e) {
log.error("<<<<设置zookeeper缓存出错", e);
}
}
public <T> void lsetAll(String key, List<T> value) {
String path = cacheNode + "/" + ZkCacheConstants.ZK_CACHE_TYPE_LIST + "/" + key;
try {
zkClientUtil.writeData(path, value);
} catch (Exception e) {
log.error("<<<<设置zookeeper缓存出错", e);
}
}
/**
* 高并发下可能不安全,慎用
*
* @param key
* @param field
* @param value
* @param
*/
public <T> void hsetAll(String key, String field, T value) {
String path = cacheNode + "/" + ZkCacheConstants.ZK_CACHE_TYPE_MAP + "/" + key;
Map<String, Object> map;
if (zkClientUtil.getZkClient().exists(path)) {
map = zkClientUtil.getZkClient().readData(path);
} else {
map = new HashMap<>();
}
map.put(field, value);
try {
zkClientUtil.writeData(path, map);
} catch (Exception e) {
log.error("<<<<设置zookeeper缓存出错", e);
}
}
public void hsetAll(String key, Map value) {
String path = cacheNode + "/" + ZkCacheConstants.ZK_CACHE_TYPE_MAP + "/" + key;
try {
zkClientUtil.writeData(path, value);
} catch (Exception e) {
log.error("<<<<设置zookeeper缓存出错", e);
}
}
//endregion
//region get method
public <T> Set<T> members(String key) {
return (Set<T>) memberCache.get(key);
}
public <T> List<T> lget(String key) {
return (List<T>) listCache.get(key);
}
public <T> Thget(String key, String field) {
if (mapCache.containsKey(key)) {
Map<Object, Object> map = mapCache.get(key);
return (T) map.get(field);
}
return null;
}
public Map hgetAll(String key) {
if (mapCache.containsKey(key)) {
return mapCache.get(key);
}
return null;
}
//endregion
//endregion
}
缓存初始化类
package com.gree.cdsp.framework.cache.zk;
import com.gree.cdsp.framework.zk.ZookeeperClient;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import java.util.List;
@Component
public class CacheInit {
private ZookeeperClient zookeeperClient;
private ZkCacheKeyListner zkCacheKeyListner;
private String cacheNode;
private ZkCacheDataListner zkCacheDataListner;
@Autowired
public void setZookeeperClient(ZookeeperClient zookeeperClient) {
this.zookeeperClient = zookeeperClient;
}
@Autowired
public void setZkCacheDataListner(ZkCacheDataListner zkCacheDataListner) {
this.zkCacheDataListner = zkCacheDataListner;
}
@Value("${zk.cache.node}")
public void setCacheNode(String cacheNode) {
this.cacheNode = cacheNode;
}
@Autowired
public void setZkCacheKeyListner(ZkCacheKeyListner zkCacheKeyListner) {
this.zkCacheKeyListner = zkCacheKeyListner;
}
@PostConstruct
public void init() {
List<String> children = zookeeperClient.getZkClient().getChildren(cacheNode);
initNode(children, ZkCacheConstants.ZK_CACHE_TYPE_SINGLE);
initNode(children, ZkCacheConstants.ZK_CACHE_TYPE_MEMBERS);
initNode(children, ZkCacheConstants.ZK_CACHE_TYPE_MAP);
initNode(children, ZkCacheConstants.ZK_CACHE_TYPE_LIST);
}
private void initNode(List<String> children, String node) {
String path = cacheNode + "/" + node;
if (!children.contains(node)) {
zookeeperClient.createPersistent(path);
} else {
List<String> keys = zookeeperClient.getZkClient().getChildren(path);
for (String key : keys) {
zookeeperClient.getZkClient().subscribeDataChanges(path + "/" + key, zkCacheDataListner);
}
}
zookeeperClient.getZkClient().subscribeChildChanges(path, zkCacheKeyListner);
}
}