使用zookeeper在分布式环境下更新本地缓存

最近在做一个项目,需求很简单,就是要为平台网站web端,h5端,app端提供轮播图,公告,新闻等cms内容的系统。
这个系统功能很简单,基本就是文章的相关增删改查操作。
功能很单一,但实际需要考虑的比较多。以为这些CMS内容都是在前端展示给用户看,一般需要响应很快,如果每个用户访问你都去数据库里面取,那么数据库的压力会非常大。为了缓解数据库压力,需要用到缓存。说道缓存,市面上有很多有名的缓存,比如Redis,Memcache等常见缓存了。但是由于项目的限制,所以我决搭建一个本地缓存。这里不会多讲本地缓存相关东西,后面会专门写篇文章来介绍。
本地缓存效率高,存取速度快。但是有一点,我们的项目是分布式环境下,我们知道在分布式环境下,本地缓存是没有任何意义的。比如我们将cms系统部署在两台机器上A,B,把文章缓存在本地中,也就是A,B各自拥有一份,那么如果我们对文章有增删改查的动作,A,B怎么知道,然后去及时更新自己的本地缓存呢?
这就是我们需要解决的问题:在分布式环境下如何更新所有服务器上的本地缓存。
接下来我们用zookeeper方案来解决:

1.首先需要先引出zookeeper客户端

package com.wang.demo.zookeeper;

import java.util.Set;

import org.I0Itec.zkclient.IZkDataListener;
import org.I0Itec.zkclient.ZkClient;
/**
 * 
 * @author wxe
 * @since 0.0.1
 */
public class ZkClientUtil extends ZkClient {

    public ZkClientUtil(String zkServers, int connectionTimeout) {
        super(zkServers, connectionTimeout);
    }
    /**
     * 引出获取监听器的方法
     */
    public Set<IZkDataListener>  getDataListener(String path){
        return super.getDataListener(path);
    }

}

2.zookeeper客户端的一些简单操作,比如连接,创建结点等。

package com.wang.demo.zookeeper;

import lombok.extern.slf4j.Slf4j;

import org.I0Itec.zkclient.exception.ZkNodeExistsException;
import org.I0Itec.zkclient.exception.ZkTimeoutException;
import org.springframework.stereotype.Component;

@Slf4j
@Component
public class ZookeeperClient {
    
    private volatile ZkClientUtil zkClient = null;

    private String servers;//服务器
    
    
    /**
     * zkClient默认获取连接超时限制
     */
    private static final int DEFAULT_CONNECTION_TIMEOUT = 20 * 1000;
    
    protected void init() {
        try {
            zkClient = new ZkClientUtil(servers, DEFAULT_CONNECTION_TIMEOUT);
            log.info("Create zookeeper's connection successfully!");
        } catch (ZkTimeoutException e) {
            log.error("Connect zookeeper error", e);
        }
    }
    
    public ZkClientUtil getZkClient() {
        if(zkClient == null){
            init();
        }
        return zkClient;
    }
    
    /**
     * 创建永久性结点
     *
     * @param path
     */
    public void createPersistent(String path) {
        try {
            zkClient.createPersistent(path, true);
        } catch (ZkNodeExistsException e) {
            log.error("ZkNode exists", e);
        }
    }
    /**
     * 创建临时结点
     * @param path
     */
    public void createEphemeral(String path) {
        try {
            zkClient.createEphemeral(path);
        } catch (ZkNodeExistsException e) {
        }
    }
    /**
     * 结点中写入数据
     * @param keyPath
     * @param value
     * @return
     * @throws Exception
     */
    public boolean writeData(String keyPath, Object value) throws Exception {
        boolean flag = false;
        try {
            if (!zkClient.exists(keyPath)) {
                zkClient.createPersistent(keyPath, true);
            }
            zkClient.writeData(keyPath, value);
            flag = true;
        } catch (Exception e) {
            throw e;
        }
        return flag;
    }
    
    public String getServers() {
        return servers;
    }

    public void setServers(String servers) {
        this.servers = servers;
    }
    
}

现在我们需要监听这些结点下的内容,然后来通知本地缓存做一些操作,比如更新缓存。
如何去监听呢?

3.监听数据

package com.wang.demo.listener;

import lombok.extern.slf4j.Slf4j;

import org.I0Itec.zkclient.IZkDataListener;
import org.springframework.stereotype.Service;

import com.alibaba.dubbo.common.json.JSONObject;
/**
 * zookeeper提供了三种监听器:分别为子节点监听器IZKChildListener,节点数据变化监听器(不监听结点)IZKDataListener,服务状态监听器IZKStateListener
 * @author wxe
 * @since 0.0.1
 */
@Slf4j
@Service("zkDataListenerImpl")
public class ZkDataListenerImpl implements IZkDataListener {

    @Override
    public void handleDataChange(String dataPath, Object data) throws Exception {
        log.info(">>>> handleDataChange ---- path : {}, data : {}", dataPath, data);
        String type = (String)data;
        if (type.equals("edit")) {
            //做修改操作
            log.info("我这里只是修改了数据哦。哈哈....");
        }
        if (type.equals("add")) {
            log.info("这里新添加了一条数据哦,哈哈哈.....");
        }
        if (type.equals("delete")) {
            log.info("这里是删除了一条数据哦。哈哈哈....");
        }
    }

    @Override
    public void handleDataDeleted(String dataPath) throws Exception {
        // TODO Auto-generated method stub
        
    }
}

那么我们需要专门开启一个线程来监听整个结点:

package com.wang.config;

import javax.annotation.Resource;

import lombok.extern.slf4j.Slf4j;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import com.wang.demo.listener.ZkDataListenerImpl;
import com.wang.demo.zookeeper.ZookeeperClient;

/**
 * 订阅结点数据变化
 * @author wxe
 * @since 0.0.1
 */
@Slf4j
@Component
public class PubSubTask extends Thread {
    
    @Autowired
    private ZookeeperClient zookeeperClient;
    
    @Resource(name = "zkDataListenerImpl")
    private ZkDataListenerImpl zkDataListenerImpl;
    
    @Override
    public void run() {
        try {
            Thread.sleep(4);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        log.info("PubSubTask start...");
        zookeeperClient.getZkClient().subscribeDataChanges(Constants.LOG_LISTENER_ZK_PATH, zkDataListenerImpl);
    }

}

4.配置

    <bean id="zookeeperClient" class="com.wang.demo.zookeeper.ZookeeperClient" init-method="init">
        <property name="servers" value="123.59.72.126:2181,123.59.72.184:2181,123.59.72.135:2181" />
    </bean>
    <bean class="com.wang.config.PubSubTask" init-method="start"/>

5.结果演示

package com.wang.config;

import lombok.extern.slf4j.Slf4j;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.ResponseBody;

import com.wang.demo.zookeeper.ZookeeperClient;

@Controller
@RequestMapping("/main")
@Slf4j
public class MainController {
    
    @Autowired
    private ZookeeperClient zookeeperClient;
    
    @RequestMapping(method = RequestMethod.GET,value="/addLog")
    @ResponseBody
    public void addLog(){
        log.info("开始向zookeeper写入数据了!");
        if (!zookeeperClient.getZkClient().exists(Constants.LOG_LISTENER_ZK_PATH)) {
            zookeeperClient.createPersistent(Constants.LOG_LISTENER_ZK_PATH);
        } 
        
        try {
            String type = "add";
            zookeeperClient.writeData(Constants.LOG_LISTENER_ZK_PATH, type);
        } catch (Exception ex) {
            ex.printStackTrace();
        }
        log.info("add log!");
    }
    

}

限制性连接zookeeper:

17:28:50.298 [localhost-startStop-1] INFO  c.w.demo.zookeeper.ZookeeperClient - Create zookeeper's connection successfully!

再订阅:

17:29:39.608 [Thread-5] DEBUG org.I0Itec.zkclient.ZkClient - Subscribed data changes for /com/demo/updateLog

开始修改结点数据:

17:30:35.661 [http-nio-8888-exec-3] INFO  com.wang.config.MainController - 开始向zookeeper写入数据了!

监听到结点数据变更,开始做处理了。这个地方,其实就是我们实现自己的逻辑,比如项目里需要根据结点数据的变化来更新本地缓存:

17:32:09.071 [ZkClient-EventThread-20-123.59.72.126:2181,123.59.72.184:2181,123.59.72.135:2181] INFO  c.w.demo.listener.ZkDataListenerImpl - 这里新添加了一条数据哦,哈哈哈.....

总结

ZooKeeper是一个分布式的,开放源码的分布式应用程序协调服务,是Google的Chubby一个开源的实现,它是集群的管理者,监视着集群中各个节点的状态根据节点提交的反馈进行下一步合理操作。最终,将简单易用的接口和性能高效、功能稳定的系统提供给用户。
zookeeper提供了两种机制:
1)文件系统:临时结点,持久化结点,临时有序结点,持久化有序结点。这些结点可以存储数据。
2)通知机制:客户端注册监听它关心的目录节点,当目录节点发生变化(数据改变、被删除、子目录节点增加删除)时,zookeeper会通知客户端。
我们正是利用了zookeeper的通知机制,才能实现分布式系统下本地缓存的更新。
回顾一下如何实现的:

  • 首先,我们需要连接上zookeeper,实现zookeeper基本基本操作(连接,创建结点,写入结点数据)。
  • 其次,我们需要专门开启线程监听需要的结点。
  • 最后,我们需要对监听后的结点数据变更通知后,实现我们想要的逻辑,比如本服务器监听监听,一旦结点数据有变更,本服务就会收到通知,本服务就可以对本地缓存做出相应的操作。
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 204,732评论 6 478
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 87,496评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 151,264评论 0 338
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,807评论 1 277
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,806评论 5 368
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,675评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,029评论 3 399
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,683评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 41,704评论 1 299
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,666评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,773评论 1 332
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,413评论 4 321
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,016评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,978评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,204评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 45,083评论 2 350
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,503评论 2 343

推荐阅读更多精彩内容