API说明
InterProcessMutex有两个构造方法
public InterProcessMutex(CuratorFramework client, String path) {
this(client, path, new StandardLockInternalsDriver());
}
public InterProcessMutex(CuratorFramework client, String path, LockInternalsDriver driver) {
this(client, path, LOCK_NAME, 1, driver);
}
参数说明
参数 | 说明 |
---|---|
client | curator中zk客户端对象 |
path | 抢锁路径,同一个锁path需一致 |
driver | 可自定义lock驱动实现分布式锁 |
主要方法
//获取锁,若失败则阻塞等待直到成功,支持重入
public void acquire() throws Exception
//超时获取锁,超时失败
public boolean acquire(long time, TimeUnit unit) throws Exception
//释放锁
public void release() throws Exception
注意:调用acquire()方法后需相应调用release()来释放锁
使用简介
下面的例子模拟了100个线程同时抢锁,抢锁成功的线程睡眠1秒钟后释放锁,通知其他等待的线程重新抢锁,比较简单,不多说
public class InterprocessLock {
static CountDownLatch countDownLatch = new CountDownLatch(10);
public static void main(String[] args) {
CuratorFramework zkClient = getZkClient();
String lockPath = "/lock";
InterProcessMutex lock = new InterProcessMutex(zkClient, lockPath);
//模拟100个线程抢锁
for (int i = 0; i < 100; i++) {
new Thread(new TestThread(i, lock)).start();
}
}
static class TestThread implements Runnable {
private Integer threadFlag;
private InterProcessMutex lock;
public TestThread(Integer threadFlag, InterProcessMutex lock) {
this.threadFlag = threadFlag;
this.lock = lock;
}
@Override
public void run() {
try {
lock.acquire();
System.out.println("第"+threadFlag+"线程获取到了锁");
//等到1秒后释放锁
Thread.sleep(1000);
} catch (Exception e) {
e.printStackTrace();
}finally {
try {
lock.release();
} catch (Exception e) {
e.printStackTrace();
}
}
}
}
private static CuratorFramework getZkClient() {
String zkServerAddress = "127.0.0.1:2182,127.0.0.1:2183,127.0.0.1:2184";
ExponentialBackoffRetry retryPolicy = new ExponentialBackoffRetry(1000, 3, 5000);
CuratorFramework zkClient = CuratorFrameworkFactory.builder()
.connectString(zkServerAddress)
.sessionTimeoutMs(5000)
.connectionTimeoutMs(5000)
.retryPolicy(retryPolicy)
.build();
zkClient.start();
return zkClient;
}
}
源码分析
从获取锁acquire()方法入手
public void acquire() throws Exception {
if ( !internalLock(-1, null) ) {
throw new IOException("Lost connection while trying to acquire lock: " + basePath);
}
}
看到调用了internalLock方法,进到internalLock方法中
private boolean internalLock(long time, TimeUnit unit) throws Exception
{
/*
Note on concurrency: a given lockData instance
can be only acted on by a single thread so locking isn't necessary
*/
Thread currentThread = Thread.currentThread();
//先判断当前线程是否持有了锁,如果是,则加锁次数count+1,返回成功
LockData lockData = threadData.get(currentThread);
if ( lockData != null )
{
// re-entering
lockData.lockCount.incrementAndGet();
return true;
}
//调用LockInternals的attemptLock()方法进行加锁
String lockPath = internals.attemptLock(time, unit, getLockNodeBytes());
//加锁成功,则将当前线程对应加锁数据加到map中
if ( lockPath != null )
{
LockData newLockData = new LockData(currentThread, lockPath);
threadData.put(currentThread, newLockData);
return true;
}
return false;
}
进到LockInternals的attemptLock()中,看下代码
String attemptLock(long time, TimeUnit unit, byte[] lockNodeBytes) throws Exception
{
//开始时间,后面用做超时判断
final long startMillis = System.currentTimeMillis();
//超时时间,转换为毫秒
final Long millisToWait = (unit != null) ? unit.toMillis(time) : null;
//节点数据
final byte[] localLockNodeBytes = (revocable.get() != null) ? new byte[0] : lockNodeBytes;
//重试次数
int retryCount = 0;
//lockPath
String ourPath = null;
//是否持有锁
boolean hasTheLock = false;
//是否处理完成
boolean isDone = false;
//循环处理
while ( !isDone )
{
isDone = true;
try
{
//在path下创建一个EPHEMERAL_SEQUENTIAL(临时顺序型)类型节点
ourPath = driver.createsTheLock(client, path, localLockNodeBytes);
//抢锁并判断是否拥有锁
hasTheLock = internalLockLoop(startMillis, millisToWait, ourPath);
}
catch ( KeeperException.NoNodeException e )
{
// 重试范围内时进行重试
if ( client.getZookeeperClient().getRetryPolicy().allowRetry(retryCount++, System.currentTimeMillis() - startMillis, RetryLoop.getDefaultRetrySleeper()) )
{
isDone = false;
}
else
{
throw e;
}
}
}
if ( hasTheLock )
{
return ourPath;
}
return null;
}
创建临时有序节点createsTheLock方法如下,比较简单
public String createsTheLock(CuratorFramework client, String path, byte[] lockNodeBytes) throws Exception
{
String ourPath;
if ( lockNodeBytes != null )
{
ourPath = client.create().creatingParentContainersIfNeeded().withProtection().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath(path, lockNodeBytes);
}
else
{
ourPath = client.create().creatingParentContainersIfNeeded().withProtection().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath(path);
}
return ourPath;
}
判断是否拥有锁的方法internalLockLoop才是核心,下面注意了
private boolean internalLockLoop(long startMillis, Long millisToWait, String ourPath) throws Exception
{
boolean haveTheLock = false;
boolean doDelete = false;
try
{
if ( revocable.get() != null )
{
client.getData().usingWatcher(revocableWatcher).forPath(ourPath);
}
//自旋
while ( (client.getState() == CuratorFrameworkState.STARTED) && !haveTheLock )
{
//获取path下对应临时有序节点,并按节点编号从小到大排序
List<String> children = getSortedChildren();
//获取当前线程创建的临时节点名称
String sequenceNodeName = ourPath.substring(basePath.length() + 1); // +1 to include the slash
//判断当前节点编号是否<maxLease,若是,则抢到了锁,maxLease这里为1,所以只有index为0时才抢到锁,标识只有1个线程能抢到锁
PredicateResults predicateResults = driver.getsTheLock(client, children, sequenceNodeName, maxLeases);
if ( predicateResults.getsTheLock() )
{
haveTheLock = true;
}
else
{
//前一个节点编号较小的节点的路径
String previousSequencePath = basePath + "/" + predicateResults.getPathToWatch();
synchronized(this)
{
try
{
// use getData() instead of exists() to avoid leaving unneeded watchers which is a type of resource leak
//如果没抢到锁,监听前一个节点事件
client.getData().usingWatcher(watcher).forPath(previousSequencePath);
if ( millisToWait != null )
{
判断是否超时
millisToWait -= (System.currentTimeMillis() - startMillis);
startMillis = System.currentTimeMillis();
if ( millisToWait <= 0 )
{
//超时 直接退出,并标记 删除节点doDelete标记=true
doDelete = true; // timed out - delete our node
break;
}
wait(millisToWait);
}
else
{
//调用Object.wait(),等待线程被notify唤醒
wait();
}
}
catch ( KeeperException.NoNodeException e )
{
// it has been deleted (i.e. lock released). Try to acquire again
}
}
}
}
}
catch ( Exception e )
{
ThreadUtils.checkInterrupted(e);
doDelete = true;
throw e;
}
finally
{
//如果标记了删除,删除节点数据
if ( doDelete )
{
deleteOurPath(ourPath);
}
}
return haveTheLock;
}
可以看到逻辑比较清晰,N个线程同时在path下创建临时顺序节点,编号最小的获取锁,没抢到锁的会调用wait()方法等待被唤醒
那么是在哪里调用了notify()方法来唤醒其他节点的呢?
答案是在监听器wacher里,该监听器会在前一个(节点编号较小)的节点被删除后触发
先分析下释放锁的方法release
看下源码
public void release() throws Exception
{
/*
Note on concurrency: a given lockData instance
can be only acted on by a single thread so locking isn't necessary
*/
Thread currentThread = Thread.currentThread();
LockData lockData = threadData.get(currentThread);
if ( lockData == null )
{
throw new IllegalMonitorStateException("You do not own the lock: " + basePath);
}
//如果锁被当前线程获取了超过1次,将count-1,直接返回
int newLockCount = lockData.lockCount.decrementAndGet();
if ( newLockCount > 0 )
{
return;
}
if ( newLockCount < 0 )
{
throw new IllegalMonitorStateException("Lock count has gone negative for lock: " + basePath);
}
try
{
//释放锁
internals.releaseLock(lockData.lockPath);
}
finally
{
threadData.remove(currentThread);
}
}
最终调用releaseLock方法中的deleteOurPath中
void releaseLock(String lockPath) throws Exception
{
revocable.set(null);
deleteOurPath(lockPath);
}
private void deleteOurPath(String ourPath) throws Exception
{
try
{
//直接调用client删除节点
client.delete().guaranteed().forPath(ourPath);
}
catch ( KeeperException.NoNodeException e )
{
// ignore - already deleted (possibly expired session, etc.)
}
}
节点被删除后,会触发抢锁过程中的wather监听器,看下监听器中内容
private final Watcher watcher = new Watcher() {
@Override
public void process(WatchedEvent event) {
notifyFromWatcher();
}
};
private synchronized void notifyFromWatcher() {
notifyAll();
}
可以看到节点path被删除后,会通知后面一个节点进行notify操作,notify操作后,重新进入while自旋中,重新判断是否抢到了锁
最后看下getTheLock
public PredicateResults getsTheLock(CuratorFramework client, List<String> children, String sequenceNodeName, int maxLeases)
throws Exception {
// 之前创建的临时顺序节点在排序后的子节点列表中的索引
int ourIndex =
children.indexOf(sequenceNodeName);
// 校验之前创建的临时顺序节点是否有效
validateOurIndex(sequenceNodeName,
ourIndex);
// 锁公平性的核心逻辑
// 由 InterProcessMutex 的构造函数可知, maxLeases 为 1,即只有 ourIndex 为 0 时,线程才能持有锁,或者说该线程创建的临时顺序节点激活了锁
// Zookeeper 的临时顺序节点特性能保证跨多个 JVM 的线程并发创建节点时的顺序性,越早创建临时顺序节点成功的线程会更早地激活锁或获得锁
boolean getsTheLock = ourIndex <
maxLeases;
// 如果已经获得了锁,则无需监听任何节点,否则需要监听上一顺序节点(ourIndex - 1)
// 因 为 锁 是 公 平 的 , 因 此 无 需 监 听 除 了(ourIndex - 1)以外的所有节点,这是为了减少羊群效应, 非常巧妙的设计!!
String pathToWatch = getsTheLock ? null :
children.get(ourIndex - maxLeases);
// 返回获取锁的结果,交由上层继续处理(添加监听等操作)
return new PredicateResults(pathToWatch,
getsTheLock);
}
static void validateOurIndex(String sequenceNodeName, int ourIndex) throws KeeperException {
if (ourIndex < 0) {
// 容错处理,可跳过
// 由于会话过期或连接丢失等原因,该线程创建的临时顺序节点被 Zookeeper 服务端删除,往外抛出 NoNodeException
// 如果在重试策略允许范围内,则进行重新尝试获取锁,这会重新重新生成临时顺序节点
// 佩服 Curator 的作者将边界条件考虑得 如此周到!
throw new KeeperException.NoNodeException("Sequential path not found:" + sequenceNodeName);
}
}