最近在做一个项目,需求很简单,就是要为平台网站web端,h5端,app端提供轮播图,公告,新闻等cms内容的系统。
这个系统功能很简单,基本就是文章的相关增删改查操作。
功能很单一,但实际需要考虑的比较多。以为这些CMS内容都是在前端展示给用户看,一般需要响应很快,如果每个用户访问你都去数据库里面取,那么数据库的压力会非常大。为了缓解数据库压力,需要用到缓存。说道缓存,市面上有很多有名的缓存,比如Redis,Memcache等常见缓存了。但是由于项目的限制,所以我决搭建一个本地缓存。这里不会多讲本地缓存相关东西,后面会专门写篇文章来介绍。
本地缓存效率高,存取速度快。但是有一点,我们的项目是分布式环境下,我们知道在分布式环境下,本地缓存是没有任何意义的。比如我们将cms系统部署在两台机器上A,B,把文章缓存在本地中,也就是A,B各自拥有一份,那么如果我们对文章有增删改查的动作,A,B怎么知道,然后去及时更新自己的本地缓存呢?
这就是我们需要解决的问题:在分布式环境下如何更新所有服务器上的本地缓存。
接下来我们用zookeeper方案来解决:
1.首先需要先引出zookeeper客户端
package com.wang.demo.zookeeper;
import java.util.Set;
import org.I0Itec.zkclient.IZkDataListener;
import org.I0Itec.zkclient.ZkClient;
/**
*
* @author wxe
* @since 0.0.1
*/
public class ZkClientUtil extends ZkClient {
public ZkClientUtil(String zkServers, int connectionTimeout) {
super(zkServers, connectionTimeout);
}
/**
* 引出获取监听器的方法
*/
public Set<IZkDataListener> getDataListener(String path){
return super.getDataListener(path);
}
}
2.zookeeper客户端的一些简单操作,比如连接,创建结点等。
package com.wang.demo.zookeeper;
import lombok.extern.slf4j.Slf4j;
import org.I0Itec.zkclient.exception.ZkNodeExistsException;
import org.I0Itec.zkclient.exception.ZkTimeoutException;
import org.springframework.stereotype.Component;
@Slf4j
@Component
public class ZookeeperClient {
private volatile ZkClientUtil zkClient = null;
private String servers;//服务器
/**
* zkClient默认获取连接超时限制
*/
private static final int DEFAULT_CONNECTION_TIMEOUT = 20 * 1000;
protected void init() {
try {
zkClient = new ZkClientUtil(servers, DEFAULT_CONNECTION_TIMEOUT);
log.info("Create zookeeper's connection successfully!");
} catch (ZkTimeoutException e) {
log.error("Connect zookeeper error", e);
}
}
public ZkClientUtil getZkClient() {
if(zkClient == null){
init();
}
return zkClient;
}
/**
* 创建永久性结点
*
* @param path
*/
public void createPersistent(String path) {
try {
zkClient.createPersistent(path, true);
} catch (ZkNodeExistsException e) {
log.error("ZkNode exists", e);
}
}
/**
* 创建临时结点
* @param path
*/
public void createEphemeral(String path) {
try {
zkClient.createEphemeral(path);
} catch (ZkNodeExistsException e) {
}
}
/**
* 结点中写入数据
* @param keyPath
* @param value
* @return
* @throws Exception
*/
public boolean writeData(String keyPath, Object value) throws Exception {
boolean flag = false;
try {
if (!zkClient.exists(keyPath)) {
zkClient.createPersistent(keyPath, true);
}
zkClient.writeData(keyPath, value);
flag = true;
} catch (Exception e) {
throw e;
}
return flag;
}
public String getServers() {
return servers;
}
public void setServers(String servers) {
this.servers = servers;
}
}
现在我们需要监听这些结点下的内容,然后来通知本地缓存做一些操作,比如更新缓存。
如何去监听呢?
3.监听数据
package com.wang.demo.listener;
import lombok.extern.slf4j.Slf4j;
import org.I0Itec.zkclient.IZkDataListener;
import org.springframework.stereotype.Service;
import com.alibaba.dubbo.common.json.JSONObject;
/**
* zookeeper提供了三种监听器:分别为子节点监听器IZKChildListener,节点数据变化监听器(不监听结点)IZKDataListener,服务状态监听器IZKStateListener
* @author wxe
* @since 0.0.1
*/
@Slf4j
@Service("zkDataListenerImpl")
public class ZkDataListenerImpl implements IZkDataListener {
@Override
public void handleDataChange(String dataPath, Object data) throws Exception {
log.info(">>>> handleDataChange ---- path : {}, data : {}", dataPath, data);
String type = (String)data;
if (type.equals("edit")) {
//做修改操作
log.info("我这里只是修改了数据哦。哈哈....");
}
if (type.equals("add")) {
log.info("这里新添加了一条数据哦,哈哈哈.....");
}
if (type.equals("delete")) {
log.info("这里是删除了一条数据哦。哈哈哈....");
}
}
@Override
public void handleDataDeleted(String dataPath) throws Exception {
// TODO Auto-generated method stub
}
}
那么我们需要专门开启一个线程来监听整个结点:
package com.wang.config;
import javax.annotation.Resource;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import com.wang.demo.listener.ZkDataListenerImpl;
import com.wang.demo.zookeeper.ZookeeperClient;
/**
* 订阅结点数据变化
* @author wxe
* @since 0.0.1
*/
@Slf4j
@Component
public class PubSubTask extends Thread {
@Autowired
private ZookeeperClient zookeeperClient;
@Resource(name = "zkDataListenerImpl")
private ZkDataListenerImpl zkDataListenerImpl;
@Override
public void run() {
try {
Thread.sleep(4);
} catch (InterruptedException e) {
e.printStackTrace();
}
log.info("PubSubTask start...");
zookeeperClient.getZkClient().subscribeDataChanges(Constants.LOG_LISTENER_ZK_PATH, zkDataListenerImpl);
}
}
4.配置
<bean id="zookeeperClient" class="com.wang.demo.zookeeper.ZookeeperClient" init-method="init">
<property name="servers" value="123.59.72.126:2181,123.59.72.184:2181,123.59.72.135:2181" />
</bean>
<bean class="com.wang.config.PubSubTask" init-method="start"/>
5.结果演示
package com.wang.config;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.ResponseBody;
import com.wang.demo.zookeeper.ZookeeperClient;
@Controller
@RequestMapping("/main")
@Slf4j
public class MainController {
@Autowired
private ZookeeperClient zookeeperClient;
@RequestMapping(method = RequestMethod.GET,value="/addLog")
@ResponseBody
public void addLog(){
log.info("开始向zookeeper写入数据了!");
if (!zookeeperClient.getZkClient().exists(Constants.LOG_LISTENER_ZK_PATH)) {
zookeeperClient.createPersistent(Constants.LOG_LISTENER_ZK_PATH);
}
try {
String type = "add";
zookeeperClient.writeData(Constants.LOG_LISTENER_ZK_PATH, type);
} catch (Exception ex) {
ex.printStackTrace();
}
log.info("add log!");
}
}
限制性连接zookeeper:
17:28:50.298 [localhost-startStop-1] INFO c.w.demo.zookeeper.ZookeeperClient - Create zookeeper's connection successfully!
再订阅:
17:29:39.608 [Thread-5] DEBUG org.I0Itec.zkclient.ZkClient - Subscribed data changes for /com/demo/updateLog
开始修改结点数据:
17:30:35.661 [http-nio-8888-exec-3] INFO com.wang.config.MainController - 开始向zookeeper写入数据了!
监听到结点数据变更,开始做处理了。这个地方,其实就是我们实现自己的逻辑,比如项目里需要根据结点数据的变化来更新本地缓存:
17:32:09.071 [ZkClient-EventThread-20-123.59.72.126:2181,123.59.72.184:2181,123.59.72.135:2181] INFO c.w.demo.listener.ZkDataListenerImpl - 这里新添加了一条数据哦,哈哈哈.....
总结
ZooKeeper是一个分布式的,开放源码的分布式应用程序协调服务,是Google的Chubby一个开源的实现,它是集群的管理者,监视着集群中各个节点的状态根据节点提交的反馈进行下一步合理操作。最终,将简单易用的接口和性能高效、功能稳定的系统提供给用户。
zookeeper提供了两种机制:
1)文件系统:临时结点,持久化结点,临时有序结点,持久化有序结点。这些结点可以存储数据。
2)通知机制:客户端注册监听它关心的目录节点,当目录节点发生变化(数据改变、被删除、子目录节点增加删除)时,zookeeper会通知客户端。
我们正是利用了zookeeper的通知机制,才能实现分布式系统下本地缓存的更新。
回顾一下如何实现的:
- 首先,我们需要连接上zookeeper,实现zookeeper基本基本操作(连接,创建结点,写入结点数据)。
- 其次,我们需要专门开启线程监听需要的结点。
- 最后,我们需要对监听后的结点数据变更通知后,实现我们想要的逻辑,比如本服务器监听监听,一旦结点数据有变更,本服务就会收到通知,本服务就可以对本地缓存做出相应的操作。