分布式锁的三种实现的对比

锁是开发过程中十分常见的工具,在处理高并发请求的时候和订单数据的时候往往需要锁来帮助我们保证数据的安全。

场景1.前端点击太快,导致后端重复调用接口。两次调用一个接口,这样就会产生同一个请求执行了两次,而从用户的角度出发,他是因为太卡而点了两次,他的目标是执行一次请求。

场景2.对于高并发场景,我们往往需要引入分布式缓存,来加快整个系统的响应速度。但是缓存是有失效机制的,如果某一时刻缓存失效,而此时有大量的请求过来,那么所有的请求会瞬间直接打到DB上,那么这么大的并发量,DB可能是扛不住的。那么这里需要引入一个保护机制。当发生“缓存击穿”的时候加锁,从而保护DB不被拖垮。

看完了上面的场景,其实分布式锁的场景一直在我们身边。说分布式锁之前,应该先说一下java提供的锁,比较能单机解决的并发问题,没必要引入分布式的解决方案。

java提供了两种内置的锁的实现,一种是由JVM实现的synchronized和JDK提供的Lock,当你的应用是单机或者说单进程应用时,可以使用synchronized或Lock来实现锁。

但是,当你的应用涉及到多机、多进程共同完成时,例如现在的互联网架构,一般都是分布式的RPC框架来支撑,那么这样你的Server有多个,由于负载均衡的路由规则随机,相同的请求可能会打到不同的Server上进行处理,那么这时候就需要一个全局锁来实现多个线程(不同的进程)之间的同步。

实现全局的锁需要依赖一个第三方系统,此系统需要满足高可用、一致性比较强同时能应付高并发的请求。

常见的处理办法有三种:数据库、缓存、分布式协调系统。数据库和缓存是比较常用的,但是分布式协调系统是不常用的。

数据库实现分布式锁####

利用DB来实现分布式锁,有两种方案。两种方案各有好坏,但是总体效果都不是很好。但是实现还是比较简单的。

  1. 利用主键唯一规则:
    我们知道数据库是有唯一主键规则的,主键不能重复,对于重复的主键会抛出主键冲突异常。
    了解JDK reentrantlock的人都知道,reentrantlock是利用了OS的CAS特性实现的锁。主要是维护一个全局的状态,每次竞争锁都会CAS修改锁的状态,修改成功之后就占用了锁,失败的加入到同步队列中,等待唤醒。
    其实这和分布式锁实现方案基本是一致的,首先我们利用主键唯一规则,在争抢锁的时候向DB中写一条记录,这条记录主要包含锁的id、当前占用锁的线程名、重入的次数和创建时间等,如果插入成功表示当前线程获取到了锁,如果插入失败那么证明锁被其他人占用,等待一会儿继续争抢,直到争抢到或者超时为止。

这里我主要写了一个简单的实现:

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

/**
 * 利用mysql实现可重入分布式锁
 */
public class MysqlprimaryLock {
    private static Connection connection;
    static {
        try {
            Class.forName("com.mysql.jdbc.Driver");
        } catch (ClassNotFoundException e) {
            e.printStackTrace();
        }
        String url = "jdbc:mysql://10.0.0.212:3308/dbwww_lock?user=lock_admin&password=lock123";
        try {
            connection = DriverManager.getConnection(url);
        } catch (SQLException e) {
            e.printStackTrace();
        }
    }

    /**
     * 加锁
     * @param lockID
     */
    public void lock(String lockID) {
        acquire(lockID);
    }

    /**
     * 获取锁
     * @param lockID
     * @return
     */
    public boolean acquire(String lockID) {
        String sql = "insert into test_lock('id','count','thName','addtime') VALUES (?,?,?,?)";
        while (true) {
            try {
                PreparedStatement statement = connection.prepareStatement(sql);
                statement.setString(1, lockID);
                statement.setInt(2, 1);
                statement.setLong(1, System.currentTimeMillis());
                boolean ifsucess = statement.execute();//如果成功,那么就是获取到了锁
                if (ifsucess)
                    return true;
            } catch (SQLException e) {
                e.printStackTrace();
            }
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            continue;
        }
    }

    /**
     * 超时获取锁
     * @param lockID
     * @param timeOuts
     * @return
     * @throws InterruptedException
     */
    public boolean acquire(String lockID, long timeOuts) throws InterruptedException {

        String sql = "insert into test_lock('id','count','thName','addtime') VALUES (?,?,?,?)";
        long futureTime = System.currentTimeMillis() + timeOuts;
        long ranmain = timeOuts;
        long timerange = 500;
        while (true) {
            CountDownLatch latch = new CountDownLatch(1);
            try {
                PreparedStatement statement = connection.prepareStatement(sql);
                statement.setString(1, lockID);
                statement.setInt(2, 1);
                statement.setLong(1, System.currentTimeMillis());
                boolean ifsucess = statement.execute();//如果成功,那么就是获取到了锁
                if (ifsucess)
                    return true;
            } catch (SQLException e) {
                e.printStackTrace();
            }
            latch.await(timerange, TimeUnit.MILLISECONDS);
            ranmain = futureTime - System.currentTimeMillis();
            if (ranmain <= 0)
                break;
            if (ranmain < timerange) {
                timerange = ranmain;
            }
            continue;
        }
        return false;

    }

    /**
     * 释放锁
     * @param lockID
     * @return
     * @throws SQLException
     */
    public boolean unlock(String lockID) throws SQLException {
        String sql = "DELETE  from test_lock where id = ?";
        PreparedStatement statement = connection.prepareStatement(sql);
        statement.setString(1, lockID);
        boolean ifsucess = statement.execute();
        if (ifsucess)
            return true;
        return false;

    }

}

这里是利用主键冲突规则,加入了id','count','thName','addtime',count主要是为了重入计数,thName为了判断占用锁的线程,addtime是记录占用时间。上面代码没有实现重入的逻辑。

重入主要实现思路是,在每次获取锁之前去取当前锁的信息,如果锁的线程是当前线程,那么更新锁的count+1,并且执行锁之后的逻辑。如果不是当前锁,那么进行重试。释放的时候也要进行count-1,最后减到0时,删除锁标识释放锁。

优点:实现简单

缺点:没有超时保护机制,mysql存在单点,并发量大的时候请求量太大、没有线程唤醒机制,用异常去控制逻辑多少优点恶心。

对于超时保护:如果可能,可以采用定时任务去扫描超过一定阈值的锁,并删除。但是也会存在,锁住的任务执行时间很长,删除锁会导致并发问题。所以需要对超时时间有一个很好的预估。

对于单点问题:有条件可以搞一个主从,但是为了一个锁来搞一个主从是不是优点浪费?同时主从切换的时候系统不可用,这也是一个问题。

并发量大的时候请求量太大:因为这种实现方式是没有锁的唤醒机制的,不像reentrantlock在同步队列中的节点,可以通过唤醒来避免多次的循环请求。但是分布式环境数据库这种锁的实现是不能做到唤醒的。所以只能将获取锁的时间间隔调高,避免死循环给系统和DB带来的巨大压力。这样也牺牲了系统的吞吐量,因为总会有一定的间隔锁是空闲的。

用异常去控制逻辑多少优点恶心:就不说了,每次失败都抛异常.....

  1. 利用Mysql行锁的特性:

Mysql是有表锁、页锁和行锁的机制的,可以利用这个机制来实现锁。这里尽量使用行锁,它的吞吐量是最高的。

    /**
     * 超时获取锁
     * @param lockID
     * @param timeOuts
     * @return
     * @throws InterruptedException
     */
    public boolean acquireByUpdate(String lockID, long timeOuts) throws InterruptedException, SQLException {

        String sql = "SELECT id from test_lock where id = ? for UPDATE ";
        long futureTime = System.currentTimeMillis() + timeOuts;
        long ranmain = timeOuts;
        long timerange = 500;
        connection.setAutoCommit(false);
        while (true) {
            CountDownLatch latch = new CountDownLatch(1);
            try {
                PreparedStatement statement = connection.prepareStatement(sql);
                statement.setString(1, lockID);
                statement.setInt(2, 1);
                statement.setLong(1, System.currentTimeMillis());
                boolean ifsucess = statement.execute();//如果成功,那么就是获取到了锁
                if (ifsucess)
                    return true;
            } catch (SQLException e) {
                e.printStackTrace();
            }
            latch.await(timerange, TimeUnit.MILLISECONDS);
            ranmain = futureTime - System.currentTimeMillis();
            if (ranmain <= 0)
                break;
            if (ranmain < timerange) {
                timerange = ranmain;
            }
            continue;
        }
        return false;

    }


    /**
     * 释放锁
     * @param lockID
     * @return
     * @throws SQLException
     */
    public void unlockforUpdtate(String lockID) throws SQLException {
        connection.commit();

    }

利用for update加显式的行锁,这样就能利用这个行级的排他锁来实现分布式锁了,同时unlock的时候只要释放commit这个事务,就能达到释放锁的目的。

优点:实现简单

缺点:连接池爆满和事务超时的问题单点的问题,单点问题,行锁升级为表锁的问题,并发量大的时候请求量太大、没有线程唤醒机制。

连接池爆满和事务超时的问题单点的问题:利用事务进行加锁的时候,query需要占用数据库连接,在行锁的时候连接不释放,这就会导致连接池爆满。同时由于事务是有超时时间的,过了超时时间自动回滚,会导致锁的释放,这个超时时间要把控好。

对于单点问题:同上。

并发量大的时候请求量太大:同上。

行锁升级为表锁的问题:Mysql行锁默认需要走索引,如果不走索引会导致锁表,如果可以,在sql中可以强制指定索引。

缓存分布式锁####

缓存实现分布式锁还是比较常见的,因为缓存比较轻量,并且缓存的响应快、吞吐高。最重要的是还有自动失效的机制来保证锁一定能释放。

缓存的分布式锁主要通过Redis实现,当然其他的缓存也是可以的。关于缓存有两种实现吧:

  1. 基于SetNX实现:
    setNX是Redis提供的一个原子操作,如果指定key存在,那么setNX失败,如果不存在会进行Set操作并返回成功。我们可以利用这个来实现一个分布式的锁,主要思路就是,set成功表示获取锁,set失败表示获取失败,失败后需要重试。

具体看下代码:

import redis.clients.jedis.Jedis;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * Redis分布式锁
 */
public class RedisLockTest {

    private Jedis jedisCli = new Jedis("localhost",6381);

    private int expireTime = 1;

    /**
     * 获取锁
     * @param lockID
     * @return
     */
    public boolean lock(String lockID){
        while(true){
            long returnFlag = jedisCli.setnx(lockID,"1");
            if (returnFlag == 1){
                System.out.println(Thread.currentThread().getName() + " get lock....");
                return true;
            }
            System.out.println(Thread.currentThread().getName() + " is trying lock....");
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
                return false;
            }
        }
    }

    /**
     * 超时获取锁
     * @param lockID
     * @param timeOuts
     * @return
     */
    public boolean lock(String lockID,long timeOuts){
        long current = System.currentTimeMillis();
        long future = current + timeOuts;
        long timeStep = 500;
        CountDownLatch latch = new CountDownLatch(1);
        while(future > current){
            long returnFlag = jedisCli.setnx(lockID,"1");
            if (returnFlag == 1){
                System.out.println(Thread.currentThread().getName() + " get lock....");
                jedisCli.expire(lockID,expireTime);
                return true;
            }
            System.out.println(Thread.currentThread().getName() + " is trying lock....");
            try {
                latch.await(timeStep, TimeUnit.MILLISECONDS);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            current = current + timeStep;
        }
        return false;
    }

    public void unlock(String lockId){
        long flag = jedisCli.del(lockId);
        if (flag>0){
            System.out.println(Thread.currentThread().getName() + " release lock....");
        }else {
            System.out.println(Thread.currentThread().getName() + " release lock fail....");
        }
    }

    /**
     * 线程工厂,命名线程
     */
    public static class MyThreadFactory implements ThreadFactory{
        public static AtomicInteger count = new AtomicInteger();
        @Override
        public Thread newThread(Runnable r) {
            count.getAndIncrement();
            Thread thread = new Thread(r);
            thread.setName("Thread-lock-test "+count);
            return thread;
        }
    }

    public static void main(String args[]){
        final String lockID = "test1";
        Runnable task = () ->{
            RedisLockTest testCli = new RedisLockTest();
            testCli.lock(lockID);
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            testCli.unlock(lockID);
        };

        MyThreadFactory factory = new MyThreadFactory();
        ExecutorService services = Executors.newFixedThreadPool(10);
        for (int i = 0;i<3;i++)
            services.execute(factory.newThread(task));
    }

}

看看结果:

pool-1-thread-3 is trying lock....
pool-1-thread-2 get lock....
pool-1-thread-1 is trying lock....
pool-1-thread-3 is trying lock....
pool-1-thread-2 release lock....
pool-1-thread-1 get lock....
pool-1-thread-3 is trying lock....
pool-1-thread-1 release lock....
pool-1-thread-3 get lock....
pool-1-thread-3 release lock....

可以看到,几个线程很好的进行了同步。

这种方式也是有优点和缺点:

优点:实现简单,吞吐量十分客观,对于高并发情况应付自如,自带超时保护,对于网络抖动的情况也可以利用超时删除策略保证不会阻塞所有流程。

缺点:单点问题、没有线程唤醒机制、网络抖动可能会引起锁删除失败。

对单点问题:因为redis一般都是单实例使用,那么对于单点问题,可以做一个主从。当然主从切换的时候也是不可用的,因为主从同步是异步的,可能会并发问题。如果对于主从还是不能保证可靠性的话,可以上Redis集群,对于Redis集群,因为使用了类一致性Hash算法,虽然不能避免节点下线的并发问题(当前的任务没有执行完,其他任务就开始执行),但是能保证Redis是可用的。可用性的问题是出了问题之后的备选方案,如果我们系统天天都出问题还玩毛啊,对于突发情况牺牲一两个请求还是没问题的。

对于线程唤醒机制:分布式锁大多都是这样轮训获取锁的,所以控制住你的重试频率,也不会导致负载特别高的。可能就是吞吐量低点而已。

对于锁删除失败:分布式锁基本都有这个问题,可以对key设置失效时间。这个超时时间需要把控好,过大那么系统吞吐量低,很容易导致超时。如果过小那么会有并发问题,部分耗时时间比较长的任务就要遭殃了。

基于Zookeeper的分布式锁####

Zookeeper是一个分布式一致性协调框架,主要可以实现选主、配置管理和分布式锁等常用功能,因为Zookeeper的写入都是顺序的,在一个节点创建之后,其他请求再次创建便会失败,同时可以对这个节点进行Watch,如果节点删除会通知其他节点抢占锁。

Zookeeper实现分布式锁虽然是比较重量级的,但实现的锁功能十分健全,由于Zookeeper本身需要维护自己的一致性,所以性能上较Redis还是有一定差距的。

Zookeeper实现分布式锁有几种形式,后面会单独的总结一下。

对比:####

Mysql实现比较简单,不需要引入第三个应用,但实现多少有些重,性能不是很好。
Redis的话实现比较简单,同时性能很好,引入集群可以提高可用性。同时定期失效的机制可以解决因网络抖动锁删除失败的问题,所以我比较倾向Redis实现。
Zookeeper实现是有些重的,同时我们还需要维护Zookeeper集群,实现起来还是比较复杂的,实现不好的话还会引起“羊群效应”。如果不是原有系统就依赖Zookeeper,同时压力不大的情况下。一般不使用Zookeeper实现分布式锁。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 203,324评论 5 476
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,303评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 150,192评论 0 337
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,555评论 1 273
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,569评论 5 365
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,566评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,927评论 3 395
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,583评论 0 257
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,827评论 1 297
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,590评论 2 320
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,669评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,365评论 4 318
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,941评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,928评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,159评论 1 259
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 42,880评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,399评论 2 342

推荐阅读更多精彩内容

  • Spring Cloud为开发人员提供了快速构建分布式系统中一些常见模式的工具(例如配置管理,服务发现,断路器,智...
    卡卡罗2017阅读 134,580评论 18 139
  • 北京的34度,我从银行回来,明明是周末,却忍不住发了一条,貌似上班的说说:“实习的小白鼠,总被虐,各种虐。...
    微小44烨墨阅读 314评论 0 0
  • 走进孤独且不为人知的内心 一旁鲜花漫溢 一旁布满荆棘 当努力使自己宽松平静之时忽而闻见花香 · 觥筹交错的俗世 被...
    蓁鹿阅读 221评论 0 1
  • 值此新春佳节即将来临之际:河北震大物流有限公司董事长兼总经理王震携全体员工祝大家:鸡年新春快乐!万事如意! 感谢一...
    震大之声阅读 428评论 0 0
  • 版权归作者所有,任何形式转载请联系作者。 作者:半亩方塘(来自豆瓣) 来源:https://www.douban....
    laineyxin阅读 107评论 0 0