在传统单机项目中为了解决并发访问的问题,我们通常都会使用锁来实现。一般使用synchronized关键字来锁住临界区,当然也可以使用Lock锁。但是现在很多项目都使用分布式集群方式部署,synchronized关键字无法实现跨机器之间的锁,Lock也是如此,为了解决这种问题,需要设计一种新的分布式锁。
分布式锁除了具有分布式情况下控制并发的问题,同时还需要考虑以下问题:
1.较高的性能,如果实现分布式锁的方式太差,会影响并发访问。
2.可重入性,同一个线程可多次获取锁。
3.锁失效机制,如果一个线程在获取锁后挂掉了,需要释放锁,否则其他线程将一直获取不到锁。
4.如果用户在失效时间内没有完成任务,锁如何续期,该续期多长时间。
基于以上问题,共设计了三种分布式锁
①基于数据库实现分布式锁
基于数据库实现分布式锁主要是利用数据库唯一索引的排斥性和天然支持原子操作有关,可以设计如下的表结构:
//基于数据库创建分布式锁
DROP TABLE IF EXISTS `method_lock`;
CREATE TABLE `method_lock` (
`id` bigint(0) NOT NULL AUTO_INCREMENT COMMENT '主键',
`method_name` varchar(255) CHARACTER SET utf8mb4 NOT NULL COMMENT '临界区方法',
`machine_id` varchar(255) CHARACTER SET utf8mb4 NOT NULL COMMENT '机器id',
`thread_id` varchar(255) CHARACTER SET utf8mb4 NOT NULL COMMENT '线程id',
`invalid_time` datetime(0) NOT NULL COMMENT '失效时间',
`desc` varchar(255) CHARACTER SET utf8mb4 NULL DEFAULT NULL COMMENT '描述',
`update_time` datetime(0) NULL DEFAULT CURRENT_TIMESTAMP(0) ON UPDATE CURRENT_TIMESTAMP(0) COMMENT '更新时间',
PRIMARY KEY (`id`) USING BTREE,
UNIQUE INDEX `idx_method_name`(`method_name`) USING BTREE COMMENT '方法名主键'
) ENGINE = InnoDB CHARACTER SET = utf8mb4 COLLATE = utf8mb4_0900_ai_ci COMMENT = '分布式锁表' ROW_FORMAT = Dynamic;
加锁:insert into method_lock(method_name, machine_id, thread_id, invalid_time)values("addUser", "001", "thread-01", "2022-01-01 01:01:01")
释放锁:delete from method_lock where method_name = "addUser" and machine_id = "001" and thread_id = "thread-01"
说明:客户端1加锁成功后,客户端再执行加锁语句就会因为method_name创建了唯一索引而失败,这样就达到不同机器并发访问的问题。
以上是使用mysql自身的锁来完成,mysql的锁是一种排他锁,其实也可以利用乐观锁来完成。即通过version来完成锁控制。
但是还需要考虑上面分布式锁的四个问题?
1.由于数据库的查询性能问题,需要使用集群部署或者主从复制
2.为了完成可重入性,我们在表结构中添加了machine_id和thread_id来实现。如果客户端1再次执行加锁过程,需要先根据机器和线程信息查询,如果查询到则直接返回。如果没有则说明没有加锁成功,则尝试加锁或者失败。为了查询是否已经加锁,需要使用select * from table for update
语句,此语句可以避免脏读。
3.为了客户端1线程宕掉后,不影响其他客户端操作,需要存储线程的失效时间,并利用定时任务清理失效锁
4.如果线程在规定时间内没有完成任务,需要利用定时任务在查询失效锁的同时查询线程状态,如果线程存活就对锁进行续期。note:这种方式需要考虑的问题很多,同时性能也不太理想,所以使用这种方式的很少。
②基于redis实现分布式锁
基于redis实现分布式锁主要是利用redis的性能和某些特性。
加锁:利用redis的SETNX命令和EXPIRE命令完成setnx key val
是如果key不存在则val设置成功,返回1;否则val设置失败,返回0。以下语句可以完成。
key是加锁的方法,value是线程的唯一标识 set nx key value
存在的问题:
①假设线程获取了锁之后,在执行任务的过程中挂掉,但是线程还没有执行del命令释放锁,那么竞争该锁的线程都会执行不了,产生死锁的情况。
②释放锁的时候为了保证只释放自己的锁,需要先判断加锁的value,如果不等于自己才能删除。但是这两部操作并不是原子性的,所以释放锁需要使用lua脚本。
解决方案:如下代码所示:
//加锁过程 public Boolean getLock(String key,String value,Long timeout){ boolean result = this.redisTemplate.opsForValue().setIfAbsent(key,value, Duration.ofSeconds(timeout)); return result; } //释放锁过程 public Long releaseLock(String key,String value){ String script= "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end"; RedisScript<Long> redisScript = new DefaultRedisScript<>(script,Long.class); Long result= (Long)this.redisTemplate.execute(redisScript, Collections.singletonList(key),value); return result; } //调用过程 //添加自旋逻辑 boolean result = false; do{ result = redisService.getLock(key, value, timeout); }while(!result) try{ ...... //业务代码 }catch(Exception e){ }finally{ //最后释放锁的逻辑 redisService.releaseLock(key, value); }
存在的问题:这个方案虽然解决了过期和释放锁的问题,但是没有实现可重入性,当已经加锁的线程再次加锁时就会失败。
解决方案:需要在加锁失败后判断加锁线程是否是自己,如果是自己则+1返回成功,否则返回失败。这样加锁和释放锁都需要执行lua脚本完成多步操作。
local key = KEYS[1]; -- 第1个参数,锁的key local value = ARGV[1]; -- 第2个参数,锁的值,可以是线程的唯一标识 local timeout = ARGV[2]; -- 第3个参数,锁的自动释放时间 -- 判断锁是否已存在 if (redis.call('exists', key) == 0) then -- 不存在, 则获取锁 redis.call('hset', key, value, '1'); redis.call('expire', key, timeout); return 1; end ; -- 锁已存在,判断拥有者是否是自己 if (redis.call('hexists', key, value) == 1) then -- 如果是自己,则重入次数+1 redis.call('hincrby', key, value, '1'); redis.call('expire', key, timeout); return 1; end ; return 0; -- 锁已经被获取,并且不是自己,返回获取锁失败
local key = KEYS[1]; -- 第1个参数,锁的key local value = ARGV[1]; -- 第2个参数,锁的值,可以是线程的唯一标识 -- 判断锁是否已存在 if (redis.call('exists', key) == 0) then return 0; end ; -- 锁已存在,判断拥有者是否是自己 if (redis.call('hexists', key, value) == 0) then return 0; end ; local count = redis.call('hincrby', key, value, '-1'); -- 当可重入次数为0时,需要删除key if (tonumber(count) == 0) then redis.call('del', key); return 1; end ; return 1;
然而上面的代码还是有问题,就是续期的问题。因为如果线程A在获取到锁之后,执行业务时间较长,锁过期了。那么其他线程就可以获得锁了,所以还需要写一个定时任务定时查询锁是否快要过期,如果快过期了就续期一段时间。
Redisson实现
针对上面这些问题,redisson提供了实现,它将加锁和解锁的lua脚本,以及续期功能功能(看门狗)都统一到了一个简单地逻辑中@Bean public RedissonClient getRedisson() { Config config = new Config(); // 修改看门狗默认的锁过期时间 config.setLockWatchdogTimeout(100); config.useSingleServer().setAddress("redis://localhost:6379").setPassword("123456"); return Redisson.create(config); }
RLock lock = redissonClient.getLock(key); //key和上面的redis键意思相同 try { lock.lock(); ..... //业务代码 } catch (Exception e) { e.printStackTrace(); } finally { lock.unlock(); }
上面代码就完成了分布式锁的功能,而且加锁和解锁都是通过lua脚本来完成的。简单方便,lock.lock()默认加锁时间为30s, 后台默认会启动一个WatchDog的看门狗线程,该线程每10s去扫描一下该锁是否被释放,如果没有释放那么就延长至30s,这个机制就是看门狗机制。如果客户端请求没有获取到锁,那么它将while循环获取继续尝试加锁。网上关于redisson的描述图:
//该方式下会使用默认的加锁时间,即30s。但是如果自定义程序中自定义加锁时间,则看门狗线程不会进行锁续期。 void lock(); void lock(long leaseTime, TimeUnit unit); boolean tryLock(long time, TimeUnit unit) throws InterruptedException; boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException;
单机模式下,redisson还有一个问题:就是线程A如果刚获取到redis的锁,此时redis宕机了,重新启动后锁过期了,然后线程B这时过来是可以加锁的,然后A线程也认为自己还在加锁,这种问题怎么解决?
redis宕机的时候,别那么快去重启redis,可以在你的方法里面设置事务的超时时间,等到事务超时再去重启redis应用。
在redis集群环境下,redission实际还存在一些问题,就是线程A如果刚获取到master节点的锁,还没有同步到slave节点,此时master节点给挂了,然后线程B这时过来是可以加锁的,但是实际上master已经被A加锁过了,这就是集群环境下可能出现的问题?
Redlock 是 Redis 的作者 antirez 给出的集群模式的 Redis 分布式锁,它基于 N 个完全独立的 Redis 节点。
算法描述:
①客户端尝试获取 N 个节点的锁,(每个节点获取锁的方式和前面说的缓存锁一样),N 个节点以相同的 key 和 value 获取锁。而且客户端需要设置接口访问超时,接口超时时间需要远远小于锁超时时间,这样可以在有 redis 节点宕机后,访问该节点时能尽快超时,而减小锁的正常使用。
②客户端需要获得了超过 半数 节点的锁,而且获取锁的时间小于锁的超时时间,客户端才获得了分布式锁。
③客户端获取的锁的时间为设置的锁超时时间减去获取锁花费时间。
④如果客户端获取锁失败了,会依次请求所有节点删除所有的锁。
note:使用 Redlock 算法,可以保证在挂掉最多 不超过半数节点的时候,分布式锁服务仍然能工作,这相比之前的单机模式大大提高了可用性。有关redisson分布式锁问题的问题可以查看基于Redission实现分布式锁
③基于zookeeper实现分布式锁
zookeeper是分布式系统协调服务,可以解决分布式系统下数据同步问题。我们可以在上面创建树形结构的节点,相当于一个分布式文件系统+watch服务。而且同一目录下只能有唯一的文件名。
zookeeper的节点分为两类:
1)持久节点 :节点一旦创建,除非被主动删除,否则一直存在;
2)临时节点 :一旦创建该节点的客户端会话失效,则所有该客户端创建的临时节点都会被删除。
原理: 就是利用zookeeper 的临时有序节点和 watcher 机制来实现。客户端创建zookeeper的临时有序节点,如果该节点序号最小则返回成功,如果不是则监听前一个节点的,一旦客户端会话结束,临时节点就会释放,后一个节点就会监听到该事件并重新获取锁。
面对上面提到的死锁,可重入,续期以及非阻塞问题,zookeeper该如何解决?
死锁:客户端会在ZK中创建一个临时节点,一旦客户端获取到锁之后突然挂掉(Session连接断开),那么这个临时节点就会自动删除掉。其他客户端就可以再次获得锁。
不可重入:客户端在创建节点的时候,可以把当前客户端的主机信息和线程信息直接写入到节点中,下次想要获取锁的时候和当前最小的节点中的数据比对一下就可以了。如果相同,那么自己直接获取到锁,如果不一样就再创建一个临时的顺序节点,参与排队。
续期问题:zookeeper的临时节点不存在过期时间,所以只有在客户端主动删除节点或者断开连接的情况下才会失效,所以不存在续期问题。
非阻塞锁:客户端可以通过在节点上添加watch监听,一旦节点有变化,zookeeper会通知客户端。不需要空转。
springboot代码如下:
<dependency> <groupId>org.apache.zookeeper</groupId> <artifactId>zookeeper</artifactId> </dependency>
ZKLockService zkLockService = new ZKLockService(zooKeeper); try { zkLockService.tryLock(name); //xxxxxxx // 业务逻辑 } catch (Exception e) { e.printStackTrace(); } finally { zkLockService.unLock(); }
public class ZKLockService implements AsyncCallback.StringCallback, AsyncCallback.Children2Callback, AsyncCallback.StatCallback, Watcher { private ZooKeeper zooKeeper; private String nodeName; private CountDownLatch countDownLatch = new CountDownLatch(1); private final static String PARENT_NODE = "/zkLock"; public ZKLockService(ZooKeeper zooKeeper) { this.zooKeeper = zooKeeper; } // 创建节点之后需要阻塞等待,因为zk获取子节点的方式是通过回调的方式。 public void tryLock(String value) throws InterruptedException { zooKeeper.create(PARENT_NODE + "/lock", value.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL, this, "orderLock"); countDownLatch.await(); } public void unLock() throws InterruptedException, KeeperException { zooKeeper.delete(nodeName, -1); } //创建子节点的回调函数,获取子节点信息 @Override public void processResult(int i, String s, Object o, String s1) { this.nodeName = s1; zooKeeper.getChildren(PARENT_NODE, false, this, o); } //获取子节点信息的回调函数,判断是否加锁成功;如果加锁失败,则监听前一个节点 @Override public void processResult(int i, String s, Object o, List<String> list, Stat stat) { if (list == null || list.isEmpty()) { System.out.println("children is null......"); return; } Collections.sort(list); String minPath = list.get(0); String tempNodeName = this.nodeName.substring(8); if (minPath.equals(tempNodeName)) { System.out.println("加锁成功"); countDownLatch.countDown(); } else { System.out.println("加锁失败"); int index = list.indexOf(tempNodeName); // 只监听前一个结点,当前一个节点挂掉,只需要通过后一个节点即可。能避免羊群效应 zooKeeper.exists(PARENT_NODE + "/" + list.get(index - 1), this, this, o); } } //监听前一个节点的方法实现,当前一个节点被删除后,重新获取子节点信息比较判断 @Override public void process(WatchedEvent watchedEvent) { switch (watchedEvent.getType()) { case NodeDeleted: zooKeeper.getChildren(PARENT_NODE, false, this, "orderLock"); break; default: break; } } @Override public void processResult(int i, String s, Object o, Stat stat) { } }
note:上段代码create和getChildren方法使用的都是zookeeper的异步方法,如果使用同步方法,可以这样写:
public class ZKLockServiceNew implements AsyncCallback.StatCallback, Watcher { private ZooKeeper zooKeeper; private String nodeName; private final static String PARENT_NODE = "/zkLock"; private CountDownLatch countDownLatch = new CountDownLatch(1); public ZKLockServiceNew(ZooKeeper zooKeeper) { this.zooKeeper = zooKeeper; } public void tryLock(String value) throws InterruptedException, KeeperException { String path = zooKeeper.create(PARENT_NODE + "/lock", value.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL); nodeName = path; boolean flag = false; do { flag = verifyMinNode(path); } while (!flag); } public void unLock() throws InterruptedException, KeeperException { zooKeeper.delete(nodeName, -1); } @Override public void process(WatchedEvent watchedEvent) { switch (watchedEvent.getType()) { case NodeDeleted: countDownLatch.countDown(); break; default: break; } } @Override public void processResult(int i, String s, Object o, Stat stat) { } private boolean verifyMinNode(String path) throws KeeperException, InterruptedException { List<String> children = zooKeeper.getChildren(PARENT_NODE, false); if (children == null || children.isEmpty()) { System.out.println("children is null......"); return false; } Collections.sort(children); String minPath = children.get(0); String tempNodeName = this.nodeName.substring(8); if (minPath.equals(tempNodeName)) { System.out.println("加锁成功"); return true; } else { System.out.println("加锁失败"); int index = children.indexOf(tempNodeName); zooKeeper.exists(PARENT_NODE + "/" + children.get(index - 1), this, this, "AAA"); countDownLatch.await(); } return false; } }
Curator是已经封装好的zookeeper类,通过它可以更方便的完成分布式锁的功能,以下是示例代码:
public class CuratorTest { public static void main(String[] args) throws Exception { CuratorFramework zkClient = getClient(); InterProcessMutex zkMutex = new InterProcessMutex(zkClient, "/test/mutex"); ExecutorService executor = Executors.newFixedThreadPool(10); for (int i = 0; i < 10; i++) { final int temp = i; executor.submit(() -> { System.out.println("线程" + temp + "启动"); try { zkMutex.acquire(); //阻塞等待,也可超时等待 System.out.println("线程" + temp + "获取到锁"); Thread.sleep(3000); zkMutex.release(); System.out.println("线程" + temp + "释放锁"); } catch (Exception e) { throw new RuntimeException(e); } }); } TimeUnit.SECONDS.sleep(60); System.out.println("end----"); } public static CuratorFramework getClient() { ExponentialBackoffRetry retryPolicy = new ExponentialBackoffRetry(1000, 3); CuratorFramework zkClient = CuratorFrameworkFactory.newClient("127.0.0.1:2181", retryPolicy); zkClient.start(); return zkClient; } }
参照:
实现分布式锁的三种方式
实现分布式锁的方式
分布式锁
elk
elk基础
分布式基础
分布式基础理论
Redis可重入锁的实现设计
Zookeeper介绍
zookeeper介绍
zookeeper分布式锁