轻松构建微服务之分布式锁

前言

在多线程情况下访问资源,我们需要加锁来保证业务的正常进行,JDK中提供了很多并发控制相关的工具包,来保证多线程下可以高效工作,同样在分布式环境下,有些互斥操作我们可以借助分布式锁来实现两个操作不能同时运行,必须等到另外一个任务结束了把锁释放了才能获取锁然后执行,因为跨JVM我们需要一个第三方系统来协助实现分布式锁,一般我们可以用
数据库,redis,zookeeper,etcd等来实现.

要实现一把分布式锁,我们需要先分析下这把锁有哪些特性

  • 1.在分布式集群中,也就是不同的JVM中,相互有冲突的方法,可以是不同JVM相同实例内的同一个方法,也可以是不同方法,也就是不同业务间的隔离和同一个业务操作不能并行运行,而分布式锁需要保证这两个方法在同一时间只能有一个运行.

  • 2.这把锁最好是可重入的,因为不可重入的锁很容易出现死锁

  • 3.获取锁和释放锁的性能要很高

  • 4.支持获取锁的时候可以阻塞等待,以及等待时间

  • 5.获取锁后支持设置一个期限,超过这个期限可以自动释放,防止程序没有自己释放的情况

  • 6.这是一把轻量锁,对业务侵入小

  • 7.易用

数据库实现分布式锁

由于数据库的锁无能是在性能高可用上都不及其他方式,这里我们简单介绍下可能的方案

  • 1.获取锁的时候,往数据库里插入一条记录,可以根据方法名作唯一键约束,其他线程获取锁的时候无法插入所以会等待,释放锁的时候删除,这种方式不支持可重入
  • 2.根据数据库的排他锁 for update实现,当commit的时候释放,这种方式如果锁不释放就会一直占有一个connection,而且加锁导致性能低
  • 3.将每一个锁作为表里的一条记录,这个记录加一个状态,每次获取锁的时候都update status = 1 where status = -1,这种类似CAS的方式可以解决排他锁性能低.但是mysql是一个单点,而且和业务系统关联,因为两个业务方可能属于不同系统不同数据库,如果做到不和业务关联还需要增加一次RPC请求,将锁业务抽为一个单独系统,不够轻量

redis的分布式锁

SET resource_name my_random_value NX PX 30000
  • SET NX 只会在key不存在的时候给key赋值,当多个进程同时争抢锁资源的时候,会下发多个SET NX只会有一个返回成功,并且SET NX对外是一个原子操作
  • PX 设置过期时间,代表这个key的存活时间,也就是获取到的锁只会占有这么长,超过这个时间将会自动释放
  • my_random_value 一般是全局唯一值,这个随机数一般可以用时间戳加随机数,这种方式在多机器实例上可能不唯一,如果需要保证绝对唯一可以采用UUID,但是性能会有影响,这个值的用途会在锁释放的时候用到

我们可以看看下面获取分布式锁的使用场景,假设我们释放锁,直接del这个key

if (!redisComponent.acquireLock(lockKey) {
    LOGGER.warn(">>分布式并发锁获取失败");
    return ;
}

try {
      // do  business  ...
} catch (BusinessException e) {
      // exception handler  ...
} finally {
  redisComponent.releaseLock(lockKey);
}

  • 1.进程A获取到锁,超时时间为1分钟
  • 2.1分钟时间到,进程A还没有处理完,锁自动释放了
  • 3.进程B获取到锁,开始进行业务处理
  • 4.进程A处理结束,释放锁,这个时候将进程B获取到的锁释放了
  • 5.进程C获取到锁,开始业务处理,进程B还没有处理结束,结果B和C开始并行处理,发生并发

为了解决以上问题,我们可以在释放锁的时候,判断下锁是否存在,这样进程A在释放锁的时候就不会将进程B加的锁释放了,
或者通过以下方式,将过期时间做为value存储在对应的key中,释放锁的时候,判断当前时间是否小于过期时间,只有小于当前时间才处理,我们也可以在进行del操作的时候判断下对应的value是否相等,这个时候就需要在del操作的时候传人
my_random_value

下面我们看下redis实现分布式锁java代码实现,我们采用在del的时候判断下当前时间是否小于过期时间

 public boolean acquireLock(String lockKey, long expired) {

        ShardedJedis jedis = null;

        try {

            jedis = pool.getResource();
            String value = String.valueOf(System.currentTimeMillis() + expired + 1);
            int tryTimes = 0;

            while (tryTimes++ < 3) {

                /*
                 *  1. 尝试锁
                 *  setnx : set if not exist
                 */
                if (jedis.setnx(lockKey, value).equals(1L)) {
                    return true;
                }

                /*
                 * 2. 已经被别的线程锁住,判断是否失效
                 */
                String oldValue = jedis.get(lockKey);
                if (StringUtils.isBlank(oldValue)) {
                    /*
                     * 2.1 value存的是超时时间,如果为空有2种情况
                     *      1. 异常数据,没有value 或者 value为空字符
                     *      2. 锁恰好被别的线程释放了
                     * 此时需要尝试重新尝试,为了避免出现情况1时导致死循环,只重试3次
                     */
                    continue;
                }

                Long oldValueL = Long.valueOf(oldValue);
                if (oldValueL < System.currentTimeMillis()) {
                    /*
                     * 已超时,重新尝试锁
                     *
                     * Redis:getSet 操作步骤:
                     *      1.获取 Key 对应的 Value 作为返回值,不存在时返回null
                     *      2.设置 Key 对应的 Value 为传入的值
                     * 这里如果返回的 getValue != oldValue 表示已经被其它线程重新修改了
                     */
                    String getValue = jedis.getSet(lockKey, value);
                    return oldValue.equals(getValue);
                } else {
                    // 未超时,则直接返回失败
                    return false;
                }
            }

            return false;

        } catch (Throwable e) {
            logger.error("acquireLock error", e);
            return false;

        } finally {
            returnResource(jedis);
        }
    }


    /**
     * 释放锁
     *
     * @param lockKey
     *            key
     */
    public void releaseLock(String lockKey) {
        ShardedJedis jedis = null;
        try {
            jedis = pool.getResource();
            long current = System.currentTimeMillis();
            // 避免删除非自己获取到的锁
            String value = jedis.get(lockKey);
            if (StringUtils.isNotBlank(value) && current < Long.valueOf(value)) {
                jedis.del(lockKey);
            }
        } catch (Throwable e) {
            logger.error("releaseLock error", e);
        } finally {
            returnResource(jedis);
        }
    }

这种方式没有用到刚刚说的my_random_value,我们看下如果我们按以下代码获取锁会有什么问题

if (!redisComponent.acquireLock(lockKey) {
    LOGGER.warn(">>分布式并发锁获取失败");
    return ;
}

try {
boolean locked = redisComponent.acquireLock(lockKey);
if(locked)
      // do  business  ...
} catch (BusinessException e) {
      // exception handler  ...
} finally {
  redisComponent.releaseLock(lockKey);
}

同样这种方式当进程A没有获取到锁,之后进程B获取到锁,进程A会释放进程B的锁,这个时候我们可以借助my_random_value来实现

    /**
     * 释放锁
     *
     * @param lockKey ,value
     */
    public void releaseLock(String lockKey, long oldvalue) {
        ShardedJedis jedis = null;
        try {
            jedis = pool.getResource();
            String value = jedis.get(lockKey);
            if (StringUtils.isNotBlank(value) && oldvalue == Long.valueOf(value)) {
                jedis.del(lockKey);
            }
        } catch (Throwable e) {
            logger.error("releaseLock error", e);
        } finally {
            returnResource(jedis);
        }
    }

这种方式需要保存之前获取锁时候的value值,并在释放锁的带上value值,不过这种实现方式,value的值为过期时间也不唯一

由于我们用了redis得超时机制来释放锁,那么当进程在锁租约到期后还没有执行结束,那么其他进程获取到锁后则会产生并发写的情况,这种如果业务上需要精确控制,只能用乐观锁来控制了,每次写入数据都带一个锁的版本,如果下次获取锁的时候版本加1,这样上面那种情况,锁到期释放了新的进程获取到锁后会使用新的版本号,之前的进程锁已经释放了如果继续使用该锁则会发现版本已经不对了

zookeeper实现分布式锁

可以借助zookeeper的顺序节点,在一个父节点下,所有需要争抢锁的资源都去这个目录下创建一个顺序节点,然后判断这个临时顺序节点是否是兄弟节点中顺序最小的,如果是最小的则获取到锁,如果不是则监听这个顺序最小的节点的删除事件,然后在继续根据这个流程获取最小节点

 public void lock() {
        try {

            // 创建临时子节点
            String myNode = zk.create(root + "/" + lockName , data, ZooDefs.Ids.OPEN_ACL_UNSAFE,
                    CreateMode.EPHEMERAL_SEQUENTIAL);

            System.out.println(j.join(Thread.currentThread().getName() + myNode, "created"));

            // 取出所有子节点
            List<String> subNodes = zk.getChildren(root, false);
            TreeSet<String> sortedNodes = new TreeSet<>();
            for(String node :subNodes) {
                sortedNodes.add(root +"/" +node);
            }

            String smallNode = sortedNodes.first();
            String preNode = sortedNodes.lower(myNode);

            if (myNode.equals( smallNode)) {
                // 如果是最小的节点,则表示取得锁
                System.out.println(j.join(Thread.currentThread().getName(), myNode, "get lock"));
                this.nodeId.set(myNode);
                return;
            }

            CountDownLatch latch = new CountDownLatch(1);
            Stat stat = zk.exists(preNode, new LockWatcher(latch));// 同时注册监听。
            // 判断比自己小一个数的节点是否存在,如果不存在则无需等待锁,同时注册监听
            if (stat != null) {
                System.out.println(j.join(Thread.currentThread().getName(), myNode,
                        " waiting for " + root + "/" + preNode + " released lock"));

                latch.await();// 等待,这里应该一直等待其他线程释放锁
                nodeId.set(myNode);
                latch = null;
            }
        } catch (Exception e) {
            throw new RuntimeException(e);
        }

    }

    public void unlock() {
        try {
            System.out.println(j.join(Thread.currentThread().getName(), nodeId.get(), "unlock "));
            if (null != nodeId) {
                zk.delete(nodeId.get(), -1);
            }
            nodeId.remove();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (KeeperException e) {
            e.printStackTrace();
        }
    }

当然如果我们开发环境使用的是etcs也可以用etcd来实现分布式锁,原理和zookeeper类似

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 215,384评论 6 497
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,845评论 3 391
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 161,148评论 0 351
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,640评论 1 290
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,731评论 6 388
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,712评论 1 294
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,703评论 3 415
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,473评论 0 270
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,915评论 1 307
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,227评论 2 331
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,384评论 1 345
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,063评论 5 340
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,706评论 3 324
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,302评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,531评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,321评论 2 368
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,248评论 2 352

推荐阅读更多精彩内容

  • 什么是锁?在单进程的系统中,当存在多个线程可以同时改变某个变量(可变共享变量)时,就需要对变量或代码块做同步,使其...
    康康不遛猫阅读 1,032评论 0 5
  • 最近碰到几个业务场景,会遇到并发的问题。在单实例情况下,我们会通过java.util.concurrent包...
    菜鸟小玄阅读 2,253评论 0 5
  • 最近看了极客时间左耳听风的专栏,对于分布式系统的设计有了更深的认识,准备结合陈皓的总结加上自己看过的资料对于分布式...
    仰泳的双鱼阅读 3,677评论 0 23
  • ZooKeeper是一个分布式的,开放源码的分布式应用程序协调服务,是Google的Chubby一个开源的实现,是...
    Java架构007阅读 2,307评论 0 4
  • 最近发生的一件事让我倍受触动,使我第一次联想到自己老去的那天。 现在的我们年轻,所以意气风发,潇洒自在,但我们生而...
    檬乐阅读 726评论 5 12