zookeeper分布式缓存

使用Zookeeper实现分布式缓存

问题背景

在项目开发过程中,需要存储对象型缓存,然而目前的分布式缓存方案redis之类的,缓存服务器存储的都是序列化过后的数据,在使用时需要重新反序列化,在高并发的情况下,反序列化也是一笔不小的开销。

我们需要寻找出一种使用时不需要反序列化的缓存方案,减少反序列化带来的性能消耗。

实现思路

如果不反序列化的话,对象只能存在本地,但是存在本地,各服务器的缓存就不能保持一致性。而我们知道,zookeeper是保证数据一致性的分布式解决方案,那么我们是否可以利用zookeeper来实现分布式缓存。

我们实现分布式缓存有以下目标:

1. 提升数据读取的性能,减轻数据库压力

2. 保证服务器各节点的数据一致性

Zookeeper有以下五个特性:

1. 构造高可用集群

zookeeper的选举模式保证了集群的相对稳定性,从而使得集群是高可用的。

2. 集群全局配置文件管理

即统一资源配置,在一个偌大的集群环境中,假设你需要对该集群的配置文件作修改,假设集群很庞大,手动去修改是一件不太现实的事,不但费时费力,还极有可能造成差错,zookeeper可以自动帮我们完成配置文件的分发,既高效又准确。

3. 发布与订阅

支持服务发布与状态监听。

4. 分布式锁

在集群环境下,同样会存在对资源的竞争,zookeeper提供了分布式锁实现了同步

5. 保证数据强一致性

在集群环境下,对集群中某个节点的数据的改变,会被zookeeper同步到其他机器上

根据以上思路,我们可以有以下的缓存方案。

1. 缓存序列化后保存到zk服务器。

2. Zk的缓存节点在收到缓存变化的时候,将变化后的值同步到各个客户端

3. 客户端收到zk的同步信息后,将数据反序列化存储到本地。

4. 客户端读取缓存时,直接读取本地反序列化后的副本。

根据以上方案,我们的序列化和反序列过程只有在数据发生变更的时候才会执行,大大提高了我们程序的执行效率。

未解决问题

1. 在读多写少的场景中,该方案的性能优秀,但是在写场景特别多的情况下,序列化性能的问题还是没有解决。

2. 缓存的有效性问题没有得到解决。

附源码

zkClient

package com.gree.cdsp.framework.zk;

import lombok.extern.slf4j.Slf4j;

import org.I0Itec.zkclient.ZkClient;

import org.I0Itec.zkclient.exception.ZkNodeExistsException;

import org.I0Itec.zkclient.exception.ZkTimeoutException;

import org.springframework.beans.factory.annotation.Value;

import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;

@Slf4j

@Component

public class ZookeeperClient {

    private volatile ZkClient zkClient = null;

    @Value("${zk.server}")

    private String servers;

    /**

    * zkClient默认获取连接超时限制

    */

    private static final int DEFAULT_CONNECTION_TIMEOUT = 20 * 1000;

    @PostConstruct

    protected void init() {

        try {

            zkClient = new ZkClientUtil(servers, DEFAULT_CONNECTION_TIMEOUT);

            log.info("Create zookeeper's connection successfully!");

        } catch (ZkTimeoutException e) {

            log.error("Connect zookeeper error", e);

        }

    }

    public ZkClient getZkClient() {

        if (zkClient == null) {

            init();

        }

        return zkClient;

    }

    /**

    * 创建永久性结点

    *

    * @param path

    */

    public void createPersistent(String path) {

        try {

            zkClient.createPersistent(path, true);

        } catch (ZkNodeExistsException e) {

            log.error("ZkNode exists", e);

        }

    }

    /**

    * 创建临时结点

    *

    * @param path

    */

    public void createEphemeral(String path) {

        try {

            zkClient.createEphemeral(path);

        } catch (ZkNodeExistsException e) {

        }

    }

    /**

    * 结点中写入数据

    *

    * @param keyPath

    * @param value

    * @return

    * @throws Exception

    */

    public boolean writeData(String keyPath, Object value) throws Exception {

        boolean flag = false;

        try {

            if (!zkClient.exists(keyPath)) {

                zkClient.createPersistent(keyPath, true);

            }

            zkClient.writeData(keyPath, value);

            flag = true;

        } catch (Exception e) {

            throw e;

        }

        return flag;

    }

}

缓存key变更监听

package com.gree.cdsp.framework.cache.zk;

import com.gree.cdsp.framework.zk.ZookeeperClient;

import org.I0Itec.zkclient.IZkChildListener;

import org.springframework.beans.factory.annotation.Autowired;

import org.springframework.stereotype.Component;

import java.util.List;

@Component

public class ZkCacheKeyListner implements IZkChildListener {

    private ZkCacheDataListner zkCacheDataListner;

    private ZookeeperClient zookeeperClient;

    @Autowired

    public void setZkCacheDataListner(ZkCacheDataListner zkCacheDataListner) {

        this.zkCacheDataListner = zkCacheDataListner;

}

    @Autowired

    public void setZookeeperClient(ZookeeperClient zookeeperClient) {

        this.zookeeperClient = zookeeperClient;

}

    @Override

    public void handleChildChange(String path, List<String> list) {

        for (String str : list) {

            zookeeperClient.getZkClient().subscribeDataChanges(path+ "/" + str, zkCacheDataListner);

}

}

}


缓存值变更监听

package com.gree.cdsp.framework.cache.zk;

import lombok.extern.slf4j.Slf4j;

import org.I0Itec.zkclient.IZkDataListener;

import org.springframework.beans.factory.annotation.Autowired;

import org.springframework.stereotype.Component;

import java.util.List;

import java.util.Map;

import java.util.Set;

@Slf4j

@Component

public class ZkCacheDataListner implements IZkDataListener {

    private ZkCache zkCache;

    @Autowired

    public void setZkCache(ZkCache zkCache) {

        this.zkCache = zkCache;

}

    @Override

    public void handleDataChange(String dataPath, Object data) {

        String[] pathArray = dataPath.split("/");

        String cacheType = pathArray[pathArray.length - 2];

        switch (cacheType) {

            case ZkCacheConstants.ZK_CACHE_TYPE_SINGLE:

                zkCache.setSingleCache(pathArray[pathArray.length - 1], data);

                break;

            case ZkCacheConstants.ZK_CACHE_TYPE_MEMBERS:

                zkCache.setMemberCache(pathArray[pathArray.length - 1], (Set) data);

                break;

            case ZkCacheConstants.ZK_CACHE_TYPE_LIST:

                zkCache.setListCache(pathArray[pathArray.length - 1], (List) data);

                break;

            case ZkCacheConstants.ZK_CACHE_TYPE_MAP:

                zkCache.setMapCache(pathArray[pathArray.length - 1], (Map) data);

                break;

            default:

                break;

}

}

    @Override

    public void handleDataDeleted(String dataPath) throws Exception {

        String key = dataPath.substring(dataPath.lastIndexOf('/') + 1);

        zkCache.del(key);

}

}


缓存操作类

package com.gree.cdsp.framework.cache.zk;

import com.gree.cdsp.framework.zk.ZookeeperClient;

import lombok.extern.slf4j.Slf4j;

import org.springframework.beans.factory.annotation.Autowired;

import org.springframework.beans.factory.annotation.Value;

import org.springframework.stereotype.Component;

import java.util.*;

@Slf4j

@Component

public class ZkCache {

    private ZookeeperClient zkClientUtil;

    private String cacheNode;

    private Map<String, Object> singleCache = new HashMap<>();

    private Map<String, Set<Object>> memberCache = new HashMap<>();

    private Map<String, Map<Object, Object>> mapCache = new HashMap<>();

    private Map<String, List<String>> listCache = new HashMap<>();

    @Autowired

    public void setZkClientUtil(ZookeeperClient zkClientUtil) {

        this.zkClientUtil = zkClientUtil;

}

    @Value("${zk.cache.node}")

    public void setCacheNode(String cacheNode) {

        this.cacheNode = cacheNode;

}

    public void del(String key) {

        singleCache.remove(key);

}

    public <T> Tget(String key) {

        return (T) singleCache.get(key);

}

    //region protected

    <T> void setSingleCache(String key, T value) {

        singleCache.put(key, value);

}

    void setMemberCache(String key, Set value) {

        memberCache.put(key, value);

}

    void setListCache(String key, List value) {

        listCache.put(key, value);

}

    void setMapCache(String key, Map value) {

        this.mapCache.put(key, value);

}

    //endregion

//region public

//region set method

    public <T> void set(String key, T value)  {

        try {

            zkClientUtil.writeData(cacheNode + "/" + ZkCacheConstants.ZK_CACHE_TYPE_SINGLE + "/" + key, value);

        } catch (Exception e) {

            log.error("<<<<设置zookeeper缓存出错", e);

}

}

    /**

* 高并发下可能不安全,慎用

*

    * @param key

    * @param value

    */

    public void mset(String key, Object value) {

        String path = cacheNode + "/" + ZkCacheConstants.ZK_CACHE_TYPE_MEMBERS + "/" + key;

        Set<Object> set;

        if (zkClientUtil.getZkClient().exists(path)) {

            set = zkClientUtil.getZkClient().readData(path);

        } else {

            set = new HashSet<>();

}

        set.add(value);

        try {

            zkClientUtil.writeData(path, set);

        } catch (Exception e) {

            log.error("<<<<设置zookeeper缓存出错", e);

}

}

    public void msetAll(String key, Set value) {

        String path = cacheNode + "/" + ZkCacheConstants.ZK_CACHE_TYPE_MEMBERS + "/" + key;

        try {

            zkClientUtil.writeData(path, value);

        } catch (Exception e) {

            log.error("<<<<设置zookeeper缓存出错", e);

}

}

    /**

* 高并发下可能不安全,慎用

*

    * @param key

    * @param value

    */

    public void lset(String key, Object value) {

        String path = cacheNode + "/" + ZkCacheConstants.ZK_CACHE_TYPE_LIST + "/" + key;

        List<Object> list;

        if (zkClientUtil.getZkClient().exists(path)) {

            list = zkClientUtil.getZkClient().readData(path);

        } else {

            list = new LinkedList<>();

}

        list.add(value);

        try {

            zkClientUtil.writeData(path, list);

        } catch (Exception e) {

            log.error("<<<<设置zookeeper缓存出错", e);

}

}

    public <T> void lsetAll(String key, List<T> value) {

        String path = cacheNode + "/" + ZkCacheConstants.ZK_CACHE_TYPE_LIST + "/" + key;

        try {

            zkClientUtil.writeData(path, value);

        } catch (Exception e) {

            log.error("<<<<设置zookeeper缓存出错", e);

}

}

    /**

* 高并发下可能不安全,慎用

*

    * @param key

    * @param field

    * @param value

    * @param

    */

    public <T> void hsetAll(String key, String field, T value) {

        String path = cacheNode + "/" + ZkCacheConstants.ZK_CACHE_TYPE_MAP + "/" + key;

        Map<String, Object> map;

        if (zkClientUtil.getZkClient().exists(path)) {

            map = zkClientUtil.getZkClient().readData(path);

        } else {

            map = new HashMap<>();

}

        map.put(field, value);

        try {

            zkClientUtil.writeData(path, map);

        } catch (Exception e) {

            log.error("<<<<设置zookeeper缓存出错", e);

}

}

    public void hsetAll(String key, Map value) {

        String path = cacheNode + "/" + ZkCacheConstants.ZK_CACHE_TYPE_MAP + "/" + key;

        try {

            zkClientUtil.writeData(path, value);

        } catch (Exception e) {

            log.error("<<<<设置zookeeper缓存出错", e);

}

}

    //endregion

//region get method

    public <T> Set<T> members(String key) {

        return (Set<T>) memberCache.get(key);

}

    public <T> List<T> lget(String key) {

        return (List<T>) listCache.get(key);

}

    public <T> Thget(String key, String field) {

        if (mapCache.containsKey(key)) {

            Map<Object, Object> map = mapCache.get(key);

            return (T) map.get(field);

}

        return null;

}

    public Map hgetAll(String key) {

        if (mapCache.containsKey(key)) {

            return mapCache.get(key);

}

        return null;

}

    //endregion

//endregion

}

缓存初始化类

package com.gree.cdsp.framework.cache.zk;

import com.gree.cdsp.framework.zk.ZookeeperClient;

import org.springframework.beans.factory.annotation.Autowired;

import org.springframework.beans.factory.annotation.Value;

import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;

import java.util.List;

@Component

public class CacheInit {

    private ZookeeperClient zookeeperClient;

    private ZkCacheKeyListner zkCacheKeyListner;

    private String cacheNode;

    private ZkCacheDataListner zkCacheDataListner;

    @Autowired

    public void setZookeeperClient(ZookeeperClient zookeeperClient) {

        this.zookeeperClient = zookeeperClient;

}

    @Autowired

    public void setZkCacheDataListner(ZkCacheDataListner zkCacheDataListner) {

        this.zkCacheDataListner = zkCacheDataListner;

}

    @Value("${zk.cache.node}")

    public void setCacheNode(String cacheNode) {

        this.cacheNode = cacheNode;

}

    @Autowired

    public void setZkCacheKeyListner(ZkCacheKeyListner zkCacheKeyListner) {

        this.zkCacheKeyListner = zkCacheKeyListner;

}

    @PostConstruct

    public void init() {

        List<String> children = zookeeperClient.getZkClient().getChildren(cacheNode);

        initNode(children, ZkCacheConstants.ZK_CACHE_TYPE_SINGLE);

        initNode(children, ZkCacheConstants.ZK_CACHE_TYPE_MEMBERS);

        initNode(children, ZkCacheConstants.ZK_CACHE_TYPE_MAP);

        initNode(children, ZkCacheConstants.ZK_CACHE_TYPE_LIST);

}

    private void initNode(List<String> children, String node) {

        String path = cacheNode + "/" + node;

        if (!children.contains(node)) {

            zookeeperClient.createPersistent(path);

        } else {

            List<String> keys = zookeeperClient.getZkClient().getChildren(path);

            for (String key : keys) {

                zookeeperClient.getZkClient().subscribeDataChanges(path + "/" + key, zkCacheDataListner);

}

}

        zookeeperClient.getZkClient().subscribeChildChanges(path, zkCacheKeyListner);

}

}

©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容