Having gone through the locks in the Java JDK, it is time to look at distributed locks. At their root, most JDK lock implementations come down to threads competing for a piece of state (think of AQS's state field) to decide who may proceed. Distributed locks work the same way; the only difference is that the contested resource is placed somewhere that multiple applications (multiple JVMs) can reach. Redisson, the framework that builds distributed locks on Redis, and the ZooKeeper-based implementations all do essentially the same thing: they put a string, or a node, in a location every participating service can access, namely the redis-server or the ZooKeeper server.
This article starts with how distributed locks are implemented on ZooKeeper. Before getting into that, one question needs to be settled: when these shared servers store the string or node that represents the lock, do they have to guarantee thread safety themselves? The answer is yes. For Redis this is not an issue (commands are executed on a single thread), but how does ZooKeeper guarantee it?
So let's deal with that question first and get a clear picture of the thread-safety mechanisms inside the ZooKeeper server. This is the key to the distributed lock: if concurrency cannot be controlled on the server, all kinds of hidden problems follow. We know that ZooKeeper stores its data nodes in a DataTree. This is not a ZooKeeper source-code deep dive, but out of respect for the source we should still download it and look at the underlying mechanism.
With the source downloaded, start from zkServer.sh in ZooKeeper's bin directory, the script that starts the server. Reading through it, we find the entry point:
if [ "x$JMXDISABLE" = "x" ] || [ "$JMXDISABLE" = 'false' ]
then
echo "ZooKeeper JMX enabled by default" >&2
if [ "x$JMXPORT" = "x" ]
then
# for some reason these two options are necessary on jdk6 on Ubuntu
# accord to the docs they are not necessary, but otw jconsole cannot
# do a local attach
ZOOMAIN="-Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.local.only=$JMXLOCALONLY org.apache.zookeeper.server.quorum.QuorumPeerMain"
Locate the QuorumPeerMain class in the source; to keep this post from getting too long, the class itself is not pasted here.
The entry point is the main method → main.initializeAndRun(args) → runFromConfig(config). The fragment of runFromConfig below contains what we are after:
quorumPeer.setClientPortListenBacklog(config.getClientPortListenBacklog());
quorumPeer.setZKDatabase(new ZKDatabase(quorumPeer.getTxnFactory()));
quorumPeer.setQuorumVerifier(config.getQuorumVerifier(), false);
if (config.getLastSeenQuorumVerifier() != null) {
quorumPeer.setLastSeenQuorumVerifier(config.getLastSeenQuorumVerifier(), false);
}
quorumPeer.initConfigInZKDatabase();
quorumPeer.setCnxnFactory(cnxnFactory);
quorumPeer.setSecureCnxnFactory(secureCnxnFactory);
quorumPeer.setSslQuorum(config.isSslQuorum());
quorumPeer.setUsePortUnification(config.shouldUsePortUnification());
quorumPeer.setLearnerType(config.getPeerType());
quorumPeer.setSyncEnabled(config.getSyncEnabled());
quorumPeer.setQuorumListenOnAllIPs(config.getQuorumListenOnAllIPs());
That is a lot of code, but on the second line of this fragment a ZKDatabase is created. Open the ZKDatabase class and you will find a DataTree member; this DataTree is the object where ZooKeeper actually stores its data. An excerpt of the DataTree fields:
/**
* This map provides a fast lookup to the datanodes. The tree is the
* source of truth and is where all the locking occurs
*/
// the collection that stores the data nodes, keyed by path
private final NodeHashMap nodes;
// watchers registered for data changes on nodes
private IWatchManager dataWatches;
// watchers registered for changes to a node's children
private IWatchManager childWatches;
/** cached total size of paths and data for all DataNodes */
// a cached running total of path and data sizes for all DataNodes, kept in a thread-safe AtomicLong
private final AtomicLong nodeDataSize = new AtomicLong(0);
Now, with our thread-safety question in mind, let's look at the createNode method:
/**
* Add a new node to the DataTree.
* @param path
* Path for the new node.
* @param data
* Data to store in the node.
* @param acl
* Node acls
* @param ephemeralOwner
* the session id that owns this node. -1 indicates this is not
* an ephemeral node.
* @param zxid
* Transaction ID
* @param time
* @param outputStat
* A Stat object to store Stat output results into.
* @throws NodeExistsException
* @throws NoNodeException
*/
public void createNode(final String path, byte[] data, List<ACL> acl, long ephemeralOwner, int parentCVersion, long zxid, long time, Stat outputStat) throws KeeperException.NoNodeException, KeeperException.NodeExistsException {
int lastSlash = path.lastIndexOf('/');
String parentName = path.substring(0, lastSlash);
String childName = path.substring(lastSlash + 1);
StatPersisted stat = createStat(zxid, time, ephemeralOwner);
DataNode parent = nodes.get(parentName);
if (parent == null) {
throw new KeeperException.NoNodeException();
}
synchronized (parent) {
// Add the ACL to ACL cache first, to avoid the ACL not being
// created race condition during fuzzy snapshot sync.
//
// This is the simplest fix, which may add ACL reference count
// again if it's already counted in in the ACL map of fuzzy
// snapshot, which might also happen for deleteNode txn, but
// at least it won't cause the ACL not exist issue.
//
// Later we can audit and delete all non-referenced ACLs from
// ACL map when loading the snapshot/txns from disk, like what
// we did for the global sessions.
Long longval = aclCache.convertAcls(acl);
Set<String> children = parent.getChildren();
if (children.contains(childName)) {
throw new KeeperException.NodeExistsException();
}
nodes.preChange(parentName, parent);
if (parentCVersion == -1) {
parentCVersion = parent.stat.getCversion();
parentCVersion++;
}
// There is possibility that we'll replay txns for a node which
// was created and then deleted in the fuzzy range, and it's not
// exist in the snapshot, so replay the creation might revert the
// cversion and pzxid, need to check and only update when it's
// larger.
if (parentCVersion > parent.stat.getCversion()) {
parent.stat.setCversion(parentCVersion);
parent.stat.setPzxid(zxid);
}
DataNode child = new DataNode(data, longval, stat);
parent.addChild(childName);
nodes.postChange(parentName, parent);
nodeDataSize.addAndGet(getNodeSize(path, child.data));
nodes.put(path, child);
EphemeralType ephemeralType = EphemeralType.get(ephemeralOwner);
if (ephemeralType == EphemeralType.CONTAINER) {
containers.add(path);
} else if (ephemeralType == EphemeralType.TTL) {
ttls.add(path);
} else if (ephemeralOwner != 0) {
HashSet<String> list = ephemerals.get(ephemeralOwner);
if (list == null) {
list = new HashSet<String>();
ephemerals.put(ephemeralOwner, list);
}
synchronized (list) {
list.add(path);
}
}
if (outputStat != null) {
child.copyStat(outputStat);
}
}
// now check if its one of the zookeeper node child
if (parentName.startsWith(quotaZookeeper)) {
// now check if its the limit node
if (Quotas.limitNode.equals(childName)) {
// this is the limit node
// get the parent and add it to the trie
pTrie.addPath(parentName.substring(quotaZookeeper.length()));
}
if (Quotas.statNode.equals(childName)) {
updateQuotaForPath(parentName.substring(quotaZookeeper.length()));
}
}
// also check to update the quotas for this node
String lastPrefix = getMaxPrefixWithQuota(path);
long bytes = data == null ? 0 : data.length;
if (lastPrefix != null) {
// ok we have some match and need to update
updateCountBytes(lastPrefix, bytes, 1);
}
updateWriteStat(path, bytes);
dataWatches.triggerWatch(path, Event.EventType.NodeCreated);
childWatches.triggerWatch(parentName.equals("") ? "/" : parentName, Event.EventType.NodeChildrenChanged);
}
This code takes the incoming path, cuts it at the last slash to get the parent path, fetches the parent node from the nodes member, and then uses synchronized on that parent for concurrency control. This is a divide-and-conquer choice: the lock granularity is kept as small as possible to improve concurrency, and synchronized itself is implemented by the JVM through the lock state recorded in the object's header, which is what guards the critical section. So the answer we were after is now clear: the ZooKeeper server does the concurrency control for us, which is what makes the acquire step of a distributed lock thread safe.
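To make the granularity point concrete, here is a minimal, self-contained sketch of the same pattern; it is not ZooKeeper code, and the class and field names are invented for illustration. A concurrent map holds the nodes by path, and mutating the children of any one parent synchronizes on that parent object only, so writers under different parents never block each other.
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Illustration only: a toy tree that locks per parent node, mirroring the
// synchronized (parent) block in DataTree.createNode.
class ToyTree {
    static class Node {
        final Set<String> children = new HashSet<>(); // guarded by synchronizing on this node
    }

    private final ConcurrentMap<String, Node> nodes = new ConcurrentHashMap<>();

    ToyTree() {
        nodes.put("/", new Node()); // root
    }

    void createChild(String parentPath, String childName) {
        Node parent = nodes.get(parentPath);
        if (parent == null) {
            throw new IllegalStateException("no such parent: " + parentPath);
        }
        synchronized (parent) {                  // lock only this parent, not the whole tree
            if (!parent.children.add(childName)) {
                throw new IllegalStateException("node exists: " + childName);
            }
            String childPath = parentPath.equals("/") ? "/" + childName
                                                      : parentPath + "/" + childName;
            nodes.put(childPath, new Node());    // the concurrent map handles its own safety
        }
    }
}
Two threads creating children under /a and /b proceed in parallel here, and that is exactly the effect the per-parent synchronized block gives the ZooKeeper server.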
One more thing worth marking: the last few lines of createNode trigger the watch events. This is the observer pattern at work; the create event is propagated to the watchers registered on the server, and the resulting notifications are pushed onto the queue of messages to be sent to clients. The interaction with the client itself belongs to the socket layer, so it is not covered here. The other operations trigger their watches in exactly the same way.
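As a rough illustration of that observer pattern, the sketch below is not the real IWatchManager; the type and method names are invented. Watchers are registered per path and removed as they fire, matching ZooKeeper's one-time watch semantics.
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArraySet;

// Illustration only: one-shot watchers keyed by path, fired once and then discarded.
class ToyWatchManager {
    interface Watcher {
        void process(String path, String eventType);
    }

    private final Map<String, Set<Watcher>> watches = new ConcurrentHashMap<>();

    void addWatch(String path, Watcher watcher) {
        watches.computeIfAbsent(path, p -> new CopyOnWriteArraySet<>()).add(watcher);
    }

    void triggerWatch(String path, String eventType) {
        Set<Watcher> toFire = watches.remove(path);  // one-time semantics: consumed on fire
        if (toFire == null) {
            return;
        }
        for (Watcher w : toFire) {
            w.process(path, eventType);              // in ZooKeeper this queues a client notification
        }
    }
}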
Now let's analyze how Curator, the ZooKeeper client library, implements distributed locks, starting from its source.
First, usage, to get a feel for the simple API it offers (silky smooth):
public class Recipes_Lock {
static String lock_path = "/curator_recipes_lock_path";
static CuratorFramework client = CuratorFrameworkFactory.builder()
.connectString("domain1.book.zookeeper:2181")
.retryPolicy(new ExponentialBackoffRetry(1000, 3)).build();
public static void main(String[] args) throws Exception {
client.start();
final InterProcessMutex lock = new InterProcessMutex(client,lock_path);
final CountDownLatch down = new CountDownLatch(1);
for(int i = 0; i < 30; i++){
new Thread(new Runnable() {
public void run() {
try {
down.await();
lock.acquire();
} catch ( Exception e ) {}
SimpleDateFormat sdf = new SimpleDateFormat("HH:mm:ss|SSS");
String orderNo = sdf.format(new Date());
System.out.println("生成的订单号是 : "+orderNo);
try {
lock.release();
} catch ( Exception e ) {}
}
}).start();
}
down.countDown();
}
}
The familiar acquire(), the familiar lock-style API; we are still living off Doug Lea's legacy.
Open the InterProcessMutex class first. Its core fields are listed and annotated below:
public class InterProcessMutex implements InterProcessLock, Revocable<InterProcessMutex>
{
// can be understood as the handle to the lock internals
private final LockInternals internals;
// the base path being locked
private final String basePath;
// a concurrent map from thread to lock data, used to check whether the current thread in this JVM has already acquired the lock in ZooKeeper; if so, only a counter is incremented, which avoids putting extra pressure on the server
private final ConcurrentMap<Thread, LockData> threadData = Maps.newConcurrentMap();
// the value type stored in threadData: it keeps the owning thread reference and the re-entrancy count
private static class LockData
{
final Thread owningThread;
final String lockPath;
final AtomicInteger lockCount = new AtomicInteger(1);
private LockData(Thread owningThread, String lockPath)
{
this.owningThread = owningThread;
this.lockPath = lockPath;
}
}
}
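Before going into the acquire internals, here is a short usage sketch of the re-entrancy that threadData enables. The class and method names below are invented for illustration, the lock path reuses the one from the earlier example, and client is assumed to be a started CuratorFramework; the point is that a thread may call acquire more than once and only the matching final release removes the lock node on the server.
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.locks.InterProcessMutex;

class ReentrantUsageSketch {
    static void placeOrder(CuratorFramework client) throws Exception {
        InterProcessMutex mutex = new InterProcessMutex(client, "/curator_recipes_lock_path");
        mutex.acquire();                // first acquire: a lock node is created on the server
        try {
            mutex.acquire();            // re-entrant acquire by the same thread: only lockCount is bumped
            try {
                // critical section
            } finally {
                mutex.release();        // inner release: count goes 2 -> 1, the node stays
            }
        } finally {
            mutex.release();            // outer release: count reaches 0, the node is deleted
        }
    }
}
The second acquire never touches the server: as the code below shows, internalLock finds the cached LockData for the current thread and simply increments lockCount, which is exactly what the map above is for.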
Now the acquire method:
/**
* Acquire the mutex - blocking until it's available. Note: the same thread
* can call acquire re-entrantly. Each call to acquire must be balanced by a call
* to {@link #release()}
*
* @throws Exception ZK errors, connection interruptions
*/
@Override
public void acquire() throws Exception
{
if ( !internalLock(-1, null) )
{
throw new IOException("Lost connection while trying to acquire lock: " + basePath);
}
}
private boolean internalLock(long time, TimeUnit unit) throws Exception
{
/*
Note on concurrency: a given lockData instance
can be only acted on by a single thread so locking isn't necessary
*/
// get the current thread
Thread currentThread = Thread.currentThread();
// thread-safe lookup in the concurrent map: does the current thread already have lock data?
LockData lockData = threadData.get(currentThread);
if ( lockData != null )
{
// re-entering
// if it does, this thread is re-entering within this JVM, so just bump the re-entrancy counter
lockData.lockCount.incrementAndGet();
return true;
}
// nothing cached for this thread, so actually attempt to acquire the lock
String lockPath = internals.attemptLock(time, unit, getLockNodeBytes());
if ( lockPath != null )
{
// a non-null lock path means the acquisition succeeded, so create a counter object and hand it to the concurrent map to keep
LockData newLockData = new LockData(currentThread, lockPath);
threadData.put(currentThread, newLockData);
return true;
}
return false;
}
Let's focus on internals.attemptLock, called above:
String attemptLock(long time, TimeUnit unit, byte[] lockNodeBytes) throws Exception
{
final long startMillis = System.currentTimeMillis();
final Long millisToWait = (unit != null) ? unit.toMillis(time) : null;
final byte[] localLockNodeBytes = (revocable.get() != null) ? new byte[0] : lockNodeBytes;
int retryCount = 0;
String ourPath = null;
boolean hasTheLock = false;
boolean isDone = false;
while ( !isDone )
{
isDone = true;
try
{
// if there is payload data to write into the lock node
if ( localLockNodeBytes != null )
{
// create a session-bound, sequentially numbered node through the Curator API and write the data into it
ourPath = client.create().creatingParentsIfNeeded().withProtection().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath(path, localLockNodeBytes);
}
else
{
// no payload to write, so just create the node
ourPath = client.create().creatingParentsIfNeeded().withProtection().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath(path);
}
hasTheLock = internalLockLoop(startMillis, millisToWait, ourPath);
}
catch ( KeeperException.NoNodeException e )
{
// gets thrown by StandardLockInternalsDriver when it can't find the lock node
// this can happen when the session expires, etc. So, if the retry allows, just try it all again
if ( client.getZookeeperClient().getRetryPolicy().allowRetry(retryCount++, System.currentTimeMillis() - startMillis, RetryLoop.getDefaultRetrySleeper()) )
{
isDone = false;
}
else
{
throw e;
}
}
}
if ( hasTheLock )
{
return ourPath;
}
return null;
}
The code above attempts the lock, but whether the acquisition actually succeeds is decided by internalLockLoop:
private boolean internalLockLoop(long startMillis, Long millisToWait, String ourPath) throws Exception
{
boolean haveTheLock = false;
boolean doDelete = false;
try
{
if ( revocable.get() != null )
{
client.getData().usingWatcher(revocableWatcher).forPath(ourPath);
}
while ( (client.getState() == CuratorFrameworkState.STARTED) && !haveTheLock )
{
List<String> children = getSortedChildren();
String sequenceNodeName = ourPath.substring(basePath.length() + 1); // +1 to include the slash
PredicateResults predicateResults = driver.getsTheLock(client, children, sequenceNodeName, maxLeases);
if ( predicateResults.getsTheLock() )
{
haveTheLock = true;
}
else
{
String previousSequencePath = basePath + "/" + predicateResults.getPathToWatch();
synchronized(this)
{
Stat stat = client.checkExists().usingWatcher(watcher).forPath(previousSequencePath);
if ( stat != null )
{
if ( millisToWait != null )
{
millisToWait -= (System.currentTimeMillis() - startMillis);
startMillis = System.currentTimeMillis();
if ( millisToWait <= 0 )
{
doDelete = true; // timed out - delete our node
break;
}
wait(millisToWait);
}
else
{
wait();
}
}
}
// else it may have been deleted (i.e. lock released). Try to acquire again
}
}
}
catch ( Exception e )
{
doDelete = true;
throw e;
}
finally
{
if ( doDelete )
{
deleteOurPath(ourPath);
}
}
return haveTheLock;
}
The logic here: fetch the sorted children of the lock's base path, then compare the node created in the previous step against its siblings. If our sequence number is the smallest among those children, the lock is acquired; otherwise a watch is registered on the closest sibling with a smaller sequence number, and the thread waits.
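A minimal sketch of that decision, detached from Curator; the class, method and variable names are invented, and it assumes the children list is already sorted by sequence number, the way getSortedChildren returns it.
import java.util.List;

// Illustration only: given the sorted children of the lock path and our own node
// name, decide whether we are first in line or which predecessor to watch.
class LockOrdering {
    static String predecessorToWatch(List<String> sortedChildren, String ourNodeName) {
        int ourIndex = sortedChildren.indexOf(ourNodeName);
        if (ourIndex < 0) {
            throw new IllegalStateException("our node is missing: " + ourNodeName);
        }
        if (ourIndex == 0) {
            return null;                          // smallest sequence number: we hold the lock
        }
        return sortedChildren.get(ourIndex - 1);  // watch only the node directly ahead of us
    }
}
Because each waiter watches only its immediate predecessor, a release (or an expired session) wakes exactly one successor rather than every waiting client, which avoids a herd effect.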
To sum up so far (this will be refined later): the ZooKeeper server keeps its data nodes in a DataTree. When a client creates a node, the server balances concurrency against thread safety: the node map itself is a concurrent hash map, and to keep the lock granularity small the server synchronizes only on the parent of the node being created rather than blocking globally (something I only understood after reading the source). That is what makes the server side thread safe and prevents problems such as one client's lock node overwriting another's.
On the client side, acquiring the lock starts with a lock object that holds a ConcurrentHashMap of the threads that currently hold it. This serves two purposes: it allows accurate re-entrancy counting, and it keeps that bookkeeping on the client instead of storing it in ZooKeeper or polling the ZooKeeper server for it. Redisson applies a similar caching idea to how it organizes its lock entries; in both cases the effect is to cut the extra load that repeated polling would otherwise put on the server.
Also, when acquiring, if the local cache has no entry for the current thread, the client calls the server to create a session-bound (ephemeral) node. Tying the node to the session means the lock is released automatically if the owning JVM crashes. Once the write succeeds the node's path is returned, and the client then queries the sibling nodes under the lock path. Because the server assigns monotonically increasing sequence numbers, the node with the smallest sequence number is considered to hold the lock; if ours is not the smallest, a watch for the deletion event is registered on the closest node with a smaller sequence number, and the program waits.
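To see the session-bound part in isolation, here is a small sketch using plain Curator calls, outside the lock recipe; the connect string and paths are placeholders. The ephemeral sequential node disappears as soon as the session that created it ends, which is exactly what lets a crashed lock holder release the lock implicitly.
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.CreateMode;

public class EphemeralLockNodeDemo {
    public static void main(String[] args) throws Exception {
        CuratorFramework client = CuratorFrameworkFactory.newClient(
                "127.0.0.1:2181", new ExponentialBackoffRetry(1000, 3)); // placeholder address
        client.start();

        // Create an ephemeral sequential node under the lock path; the server appends
        // a monotonically increasing, zero-padded sequence number to the name we pass in.
        String ourPath = client.create()
                .creatingParentsIfNeeded()
                .withMode(CreateMode.EPHEMERAL_SEQUENTIAL)
                .forPath("/curator_recipes_lock_path/lock-", new byte[0]);
        System.out.println("created " + ourPath); // e.g. .../lock-0000000003

        // Closing the client ends the session; the ephemeral node above is then removed
        // by the server and any watcher registered on it is notified.
        client.close();
    }
}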
Those are my notes and takeaways from reading the ZooKeeper server source and Curator's distributed-lock source.