当今微服务下的分布式系统,传统的线程锁仅能保证单服务器内的线程安全,已无法满足需求。因此需要引入一种新的安全机制,以保证各服务器上运行的服务实例之间的协调运作。而这个分布式协调技术的核心就是分布式锁。
目前分布式锁实现方式有多种,常见的有Redis,ZooKeeper以及基于数据库实现等等
下面主要介绍两种ZooKeeper分布式锁的实现思路:
实现一:创建节点抢占式
思路:由于Zookeeper节点不可重复,那么同一时间只可能有一台服务实例成功创建节点(拿到锁),其他实例则等待该服务器执行完代码删除节点(释放锁),继续抢占。。。
步骤
- 创建节点
- 创建失败则阻塞当前线程,并注册监听器
- 触发删除回调事件后,释放线程,继续尝试创建节点,直到创建成功
- 删除节点
下面展示代码实现:
lock接口:
public interface ZkLock {
void lock();
boolean tryLock();
void unlock();
}
实现类:
public class BuiltLock implements ZkLock{
private static final Logger logger = LoggerFactory.getLogger(BuiltLock.class);
private static final String CONNECTSTRING = "192.168.208.129:2181";
private static final String LOCK_PATH = "/Lock";
private ZkClient zkClient = new ZkClient(CONNECTSTRING);
private CountDownLatch countDownLatch;
@Override
public void lock() {
if (!tryLock()) {
// 节点存在,阻塞等待获取锁
waitForLock();
// 阻塞解除,继续尝试获取锁
lock();
}
}
private void waitForLock() {
// 检查节点是否存在,不存在直接返回
if (!zkClient.exists(LOCK_PATH)) {
return;
}
// 给节点加监听
zkClient.subscribeDataChanges(LOCK_PATH, listener);
// 阻塞,直到监听器触发删除事件回调释放
countDownLatch = new CountDownLatch(1);
try {
countDownLatch.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
// 删除回调,锁释放,解除监听
zkClient.unsubscribeDataChanges(LOCK_PATH, listener);
}
// 节点监听器
private IZkDataListener listener = new IZkDataListener() {
@Override
public void handleDataChange(String s, Object o) throws Exception {
}
@Override
public void handleDataDeleted(String s) throws Exception {
logger.info(Thread.currentThread().getName() + ": 删除回调事件!");
if (countDownLatch != null) {
// 节点被删除,解除阻塞
countDownLatch.countDown();
}
}
};
@Override
public boolean tryLock() {
try {
// 创建节点,成功返回true,获取锁成功
zkClient.createPersistent(LOCK_PATH);
return true;
} catch (ZkNodeExistsException e) {
// 节点已存在,加锁失败
return false;
}
}
@Override
public void unlock() {
// 删除节点,释放锁
zkClient.delete(LOCK_PATH);
}
}
编写测试类测试:
public class TicketSeller {
private static final Logger logger = LoggerFactory.getLogger(TicketSeller.class);
private void sell() {
logger.info(Thread.currentThread().getName() + ":开始售票");
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
logger.info(Thread.currentThread().getName() + ":售票完成");
}
public void sellSync() throws Exception {
ZkLock lock = new BuiltLock();
lock.lock();
sell();
lock.unlock();
}
public static void main(String[] args) {
TicketSeller seller = new TicketSeller();
Runnable runnable = () -> {
try {
seller.sellSync();
} catch (Exception e) {
e.printStackTrace();
}
};
for (int i = 0; i < 10; i++) {
new Thread(runnable).start();
}
}
}
控制台输出:
INFO [Thread-0] - Thread-0:开始售票
INFO [Thread-0] - Thread-0:售票完成
INFO [ZkClient-EventThread-30-192.168.208.129:2181] - ZkClient-EventThread-30-192.168.208.129:2181: 删除回调事件!
INFO [ZkClient-EventThread-31-192.168.208.129:2181] - ZkClient-EventThread-31-192.168.208.129:2181: 删除回调事件!
INFO [ZkClient-EventThread-27-192.168.208.129:2181] - ZkClient-EventThread-27-192.168.208.129:2181: 删除回调事件!
INFO [ZkClient-EventThread-25-192.168.208.129:2181] - ZkClient-EventThread-25-192.168.208.129:2181: 删除回调事件!
INFO [ZkClient-EventThread-23-192.168.208.129:2181] - ZkClient-EventThread-23-192.168.208.129:2181: 删除回调事件!
INFO [ZkClient-EventThread-24-192.168.208.129:2181] - ZkClient-EventThread-24-192.168.208.129:2181: 删除回调事件!
INFO [ZkClient-EventThread-28-192.168.208.129:2181] - ZkClient-EventThread-28-192.168.208.129:2181: 删除回调事件!
INFO [ZkClient-EventThread-29-192.168.208.129:2181] - ZkClient-EventThread-29-192.168.208.129:2181: 删除回调事件!
INFO [ZkClient-EventThread-32-192.168.208.129:2181] - ZkClient-EventThread-32-192.168.208.129:2181: 删除回调事件!
INFO [Thread-3] - Thread-3:开始售票
INFO [Thread-3] - Thread-3:售票完成
INFO [ZkClient-EventThread-29-192.168.208.129:2181] - ZkClient-EventThread-29-192.168.208.129:2181: 删除回调事件!
INFO [ZkClient-EventThread-32-192.168.208.129:2181] - ZkClient-EventThread-32-192.168.208.129:2181: 删除回调事件!
INFO [ZkClient-EventThread-31-192.168.208.129:2181] - ZkClient-EventThread-31-192.168.208.129:2181: 删除回调事件!
INFO [ZkClient-EventThread-25-192.168.208.129:2181] - ZkClient-EventThread-25-192.168.208.129:2181: 删除回调事件!
INFO [ZkClient-EventThread-24-192.168.208.129:2181] - ZkClient-EventThread-24-192.168.208.129:2181: 删除回调事件!
INFO [ZkClient-EventThread-23-192.168.208.129:2181] - ZkClient-EventThread-23-192.168.208.129:2181: 删除回调事件!
INFO [ZkClient-EventThread-27-192.168.208.129:2181] - ZkClient-EventThread-27-192.168.208.129:2181: 删除回调事件!
INFO [ZkClient-EventThread-28-192.168.208.129:2181] - ZkClient-EventThread-28-192.168.208.129:2181: 删除回调事件!
INFO [Thread-9] - Thread-9:开始售票
INFO [Thread-9] - Thread-9:售票完成
INFO [ZkClient-EventThread-25-192.168.208.129:2181] - ZkClient-EventThread-25-192.168.208.129:2181: 删除回调事件!
INFO [ZkClient-EventThread-32-192.168.208.129:2181] - ZkClient-EventThread-32-192.168.208.129:2181: 删除回调事件!
INFO [ZkClient-EventThread-27-192.168.208.129:2181] - ZkClient-EventThread-27-192.168.208.129:2181: 删除回调事件!
INFO [ZkClient-EventThread-23-192.168.208.129:2181] - ZkClient-EventThread-23-192.168.208.129:2181: 删除回调事件!
INFO [ZkClient-EventThread-31-192.168.208.129:2181] - ZkClient-EventThread-31-192.168.208.129:2181: 删除回调事件!
INFO [ZkClient-EventThread-28-192.168.208.129:2181] - ZkClient-EventThread-28-192.168.208.129:2181: 删除回调事件!
INFO [ZkClient-EventThread-24-192.168.208.129:2181] - ZkClient-EventThread-24-192.168.208.129:2181: 删除回调事件!
INFO [Thread-8] - Thread-8:开始售票
INFO [Thread-8] - Thread-8:售票完成
INFO [ZkClient-EventThread-25-192.168.208.129:2181] - ZkClient-EventThread-25-192.168.208.129:2181: 删除回调事件!
INFO [ZkClient-EventThread-31-192.168.208.129:2181] - ZkClient-EventThread-31-192.168.208.129:2181: 删除回调事件!
INFO [ZkClient-EventThread-24-192.168.208.129:2181] - ZkClient-EventThread-24-192.168.208.129:2181: 删除回调事件!
INFO [ZkClient-EventThread-23-192.168.208.129:2181] - ZkClient-EventThread-23-192.168.208.129:2181: 删除回调事件!
INFO [ZkClient-EventThread-27-192.168.208.129:2181] - ZkClient-EventThread-27-192.168.208.129:2181: 删除回调事件!
INFO [ZkClient-EventThread-28-192.168.208.129:2181] - ZkClient-EventThread-28-192.168.208.129:2181: 删除回调事件!
INFO [Thread-7] - Thread-7:开始售票
INFO [Thread-7] - Thread-7:售票完成
INFO [ZkClient-EventThread-31-192.168.208.129:2181] - ZkClient-EventThread-31-192.168.208.129:2181: 删除回调事件!
INFO [ZkClient-EventThread-24-192.168.208.129:2181] - ZkClient-EventThread-24-192.168.208.129:2181: 删除回调事件!
INFO [ZkClient-EventThread-25-192.168.208.129:2181] - ZkClient-EventThread-25-192.168.208.129:2181: 删除回调事件!
INFO [ZkClient-EventThread-27-192.168.208.129:2181] - ZkClient-EventThread-27-192.168.208.129:2181: 删除回调事件!
INFO [ZkClient-EventThread-28-192.168.208.129:2181] - ZkClient-EventThread-28-192.168.208.129:2181: 删除回调事件!
INFO [Thread-6] - Thread-6:开始售票
INFO [Thread-6] - Thread-6:售票完成
INFO [ZkClient-EventThread-25-192.168.208.129:2181] - ZkClient-EventThread-25-192.168.208.129:2181: 删除回调事件!
INFO [ZkClient-EventThread-28-192.168.208.129:2181] - ZkClient-EventThread-28-192.168.208.129:2181: 删除回调事件!
INFO [ZkClient-EventThread-27-192.168.208.129:2181] - ZkClient-EventThread-27-192.168.208.129:2181: 删除回调事件!
INFO [ZkClient-EventThread-24-192.168.208.129:2181] - ZkClient-EventThread-24-192.168.208.129:2181: 删除回调事件!
INFO [Thread-4] - Thread-4:开始售票
INFO [Thread-4] - Thread-4:售票完成
INFO [ZkClient-EventThread-24-192.168.208.129:2181] - ZkClient-EventThread-24-192.168.208.129:2181: 删除回调事件!
INFO [ZkClient-EventThread-25-192.168.208.129:2181] - ZkClient-EventThread-25-192.168.208.129:2181: 删除回调事件!
INFO [ZkClient-EventThread-27-192.168.208.129:2181] - ZkClient-EventThread-27-192.168.208.129:2181: 删除回调事件!
INFO [Thread-5] - Thread-5:开始售票
INFO [Thread-5] - Thread-5:售票完成
INFO [ZkClient-EventThread-25-192.168.208.129:2181] - ZkClient-EventThread-25-192.168.208.129:2181: 删除回调事件!
INFO [ZkClient-EventThread-27-192.168.208.129:2181] - ZkClient-EventThread-27-192.168.208.129:2181: 删除回调事件!
INFO [Thread-1] - Thread-1:开始售票
INFO [Thread-1] - Thread-1:售票完成
INFO [ZkClient-EventThread-27-192.168.208.129:2181] - ZkClient-EventThread-27-192.168.208.129:2181: 删除回调事件!
INFO [Thread-2] - Thread-2:开始售票
INFO [Thread-2] - Thread-2:售票完成
思考:此实现存在至少两个问题
- 死锁问题:出现异常情况下,没来得及删除节点,锁将一直被占用无法释放
- 惊群效应:可以看到输出日志里面,有很大比例都是删除回调事件。原因是所有服务实例同时监听此节点,每次的释放锁删除节点都会触发回调事件,而所有等待锁的服务实例都会接收回调,并执行抢占代码,那么将造成通信,服务器资源不必要的浪费
为了解决以上两个问题,我们采用第二种实现方式:
2、实现二:创建子节点,比较时序
思路:用临时有序节点代替永久节点,大家竞争时同时创建有序节点,根据创建顺序决定先后顺序。服务器仅需要关注自己的前序子节点,大大减少了回调事件的触发频次。同时由于使用的临时节点,与服务器断开连接即删除,就避免了死锁问题
步骤
- 创建子节点(临时&有序)
- 获取所有子节点,排序,判断自己创建的子节点是否是第一位
- 如果不是第一位,为前位子节点添加监听器,待其触发删除事件,再次尝试获取锁
lock接口:同实现一
实现类:
public class HoraeLock implements ZkLock{
private static final Logger logger = LoggerFactory.getLogger(HoraeLock.class);
private static final String CONNECTSTRING = "192.168.208.129:2181";
private static final String ROOT_PATH = "/Locks";
private static final String NODE_PREFIX = "Lock_";
private ZkClient zkClient = new ZkClient(CONNECTSTRING);
private CountDownLatch countDownLatch;
private String beforePath;
private String currentPath;
// 初始化根节点
public HoraeLock() {
if (!zkClient.exists(ROOT_PATH)) {
zkClient.createPersistent(ROOT_PATH);
}
}
@Override
public void lock() {
if (!tryLock()) {
// 节点存在,阻塞等待获取锁
waitForLock();
// 阻塞解除,继续尝试获取锁
lock();
}
}
private void waitForLock() {
// 先检查前序节点是否存在,不存在直接返回
if (!zkClient.exists(beforePath)) {
return;
}
// 给节点加监听
zkClient.subscribeDataChanges(beforePath, listener);
// 阻塞,直到监听器触发删除事件回调释放
countDownLatch = new CountDownLatch(1);
try {
countDownLatch.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
// 删除回调,锁释放,解除监听
zkClient.unsubscribeDataChanges(beforePath, listener);
}
// 节点监听器
private IZkDataListener listener = new IZkDataListener() {
@Override
public void handleDataChange(String s, Object o) throws Exception {
}
@Override
public void handleDataDeleted(String s) throws Exception {
logger.info(Thread.currentThread().getName() + ": 删除回调事件!");
if (countDownLatch != null) {
// 节点被删除,解除阻塞
countDownLatch.countDown();
}
}
};
@Override
public boolean tryLock() {
// 如果currentPath为空,则创建子节点
if (currentPath == null || currentPath.length() <= 0) {
currentPath = zkClient.createEphemeralSequential(ROOT_PATH + "/" + NODE_PREFIX, "");
logger.info(Thread.currentThread().getName() + "currentPath:" + currentPath);
}
// 获取所有子节点
List<String> list = zkClient.getChildren(ROOT_PATH);
Collections.sort(list);
// 排序后与比较,如果第一位是自己,则证明拿到锁
if (currentPath.equals(ROOT_PATH + "/" + list.get(0))) {
return true;
} else {
// 拿到前序节点
String substring = currentPath.substring(7);
int i = Collections.binarySearch(list, substring);
beforePath = ROOT_PATH + "/" + list.get(i - 1);
return false;
}
}
@Override
public void unlock() {
zkClient.close();
}
}
测试类:直接替换锁实现类即可
public class TicketSeller {
private static final Logger logger = LoggerFactory.getLogger(TicketSeller.class);
private void sell() {
logger.info(Thread.currentThread().getName() + ":开始售票");
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
logger.info(Thread.currentThread().getName() + ":售票完成");
}
public void sellSync() throws Exception {
ZkLock lock = new HoraeLock();
lock.lock();
sell();
lock.unlock();
}
public static void main(String[] args) {
TicketSeller seller = new TicketSeller();
Runnable runnable = () -> {
try {
seller.sellSync();
} catch (Exception e) {
e.printStackTrace();
}
};
for (int i = 0; i < 10; i++) {
new Thread(runnable).start();
}
}
}
测试结果:
INFO [Thread-1] - Thread-1:开始售票
INFO [Thread-1] - Thread-1:售票完成
INFO [ZkClient-EventThread-32-192.168.208.129:2181] - Terminate ZkClient event thread.
INFO [Thread-1] - Session: 0x10000f7f72f00ca closed
INFO [Thread-1-EventThread] - EventThread shut down for session: 0x10000f7f72f00ca
INFO [ZkClient-EventThread-30-192.168.208.129:2181] - ZkClient-EventThread-30-192.168.208.129:2181: 删除回调事件!
INFO [Thread-0] - Thread-0:开始售票
INFO [Thread-0] - Thread-0:售票完成
INFO [ZkClient-EventThread-30-192.168.208.129:2181] - Terminate ZkClient event thread.
INFO [Thread-0] - Session: 0x10000f7f72f00c5 closed
INFO [Thread-0-EventThread] - EventThread shut down for session: 0x10000f7f72f00c5
INFO [ZkClient-EventThread-27-192.168.208.129:2181] - ZkClient-EventThread-27-192.168.208.129:2181: 删除回调事件!
INFO [Thread-9] - Thread-9:开始售票
INFO [Thread-9] - Thread-9:售票完成
INFO [ZkClient-EventThread-27-192.168.208.129:2181] - Terminate ZkClient event thread.
INFO [Thread-9] - Session: 0x10000f7f72f00c8 closed
INFO [Thread-9-EventThread] - EventThread shut down for session: 0x10000f7f72f00c8
INFO [ZkClient-EventThread-24-192.168.208.129:2181] - ZkClient-EventThread-24-192.168.208.129:2181: 删除回调事件!
INFO [Thread-4] - Thread-4:开始售票
INFO [Thread-4] - Thread-4:售票完成
INFO [ZkClient-EventThread-24-192.168.208.129:2181] - Terminate ZkClient event thread.
INFO [Thread-4] - Session: 0x10000f7f72f00c7 closed
INFO [Thread-4-EventThread] - EventThread shut down for session: 0x10000f7f72f00c7
INFO [ZkClient-EventThread-25-192.168.208.129:2181] - ZkClient-EventThread-25-192.168.208.129:2181: 删除回调事件!
INFO [Thread-2] - Thread-2:开始售票
INFO [Thread-2] - Thread-2:售票完成
INFO [ZkClient-EventThread-25-192.168.208.129:2181] - Terminate ZkClient event thread.
INFO [Thread-2] - Session: 0x10000f7f72f00cc closed
INFO [Thread-2-EventThread] - EventThread shut down for session: 0x10000f7f72f00cc
INFO [ZkClient-EventThread-29-192.168.208.129:2181] - ZkClient-EventThread-29-192.168.208.129:2181: 删除回调事件!
INFO [Thread-8] - Thread-8:开始售票
INFO [Thread-8] - Thread-8:售票完成
INFO [ZkClient-EventThread-29-192.168.208.129:2181] - Terminate ZkClient event thread.
INFO [Thread-8] - Session: 0x10000f7f72f00c6 closed
INFO [Thread-8-EventThread] - EventThread shut down for session: 0x10000f7f72f00c6
INFO [ZkClient-EventThread-23-192.168.208.129:2181] - ZkClient-EventThread-23-192.168.208.129:2181: 删除回调事件!
INFO [Thread-6] - Thread-6:开始售票
INFO [Thread-6] - Thread-6:售票完成
INFO [ZkClient-EventThread-23-192.168.208.129:2181] - Terminate ZkClient event thread.
INFO [Thread-6] - Session: 0x10000f7f72f00cd closed
INFO [Thread-6-EventThread] - EventThread shut down for session: 0x10000f7f72f00cd
INFO [ZkClient-EventThread-28-192.168.208.129:2181] - ZkClient-EventThread-28-192.168.208.129:2181: 删除回调事件!
INFO [Thread-3] - Thread-3:开始售票
INFO [Thread-3] - Thread-3:售票完成
INFO [ZkClient-EventThread-28-192.168.208.129:2181] - Terminate ZkClient event thread.
INFO [Thread-3] - Session: 0x10000f7f72f00c4 closed
INFO [Thread-3-EventThread] - EventThread shut down for session: 0x10000f7f72f00c4
INFO [ZkClient-EventThread-31-192.168.208.129:2181] - ZkClient-EventThread-31-192.168.208.129:2181: 删除回调事件!
INFO [Thread-7] - Thread-7:开始售票
INFO [Thread-7] - Thread-7:售票完成
INFO [ZkClient-EventThread-31-192.168.208.129:2181] - Terminate ZkClient event thread.
INFO [Thread-7] - Session: 0x10000f7f72f00cb closed
INFO [Thread-7-EventThread] - EventThread shut down for session: 0x10000f7f72f00cb
INFO [ZkClient-EventThread-26-192.168.208.129:2181] - ZkClient-EventThread-26-192.168.208.129:2181: 删除回调事件!
INFO [Thread-5] - Thread-5:开始售票
INFO [Thread-5] - Thread-5:售票完成
INFO [ZkClient-EventThread-26-192.168.208.129:2181] - Terminate ZkClient event thread.
INFO [Thread-5] - Session: 0x10000f7f72f00c9 closed
INFO [Thread-5-EventThread] - EventThread shut down for session: 0x10000f7f72f00c9
观察测试结果可以看出减少了大量的回调事件触发,同时因为采用的临时节点,断开连接将自动删除,也解决了死锁问题。
因此建议采用第二种方式实现Zookeeper分布式锁