MySql如何实现分布式锁

本篇我们使用mysql实现一个分布式锁。
环境:mysql8,navicat,maven,springboot2.3.11,mybatis-plus

分布式锁的功能

1,分布式锁使用者位于不同的机器中,锁获取成功之后,才可以对共享资源进行操作

2,锁具有重入的功能:即一个使用者可以多次获取某个锁

3,获取锁有超时的功能:即在指定的时间内去尝试获取锁,超过了超时时间,如果还未获取成功,则返回获取失败

4,能够自动容错,比如:A机器获取锁lock1之后,在释放锁lock1之前,A机器挂了,导致锁lock1未释放,结果会lock1一直被A机器占有着,遇到这种情况时,分布式锁要能够自动解决,可以这么做:持有锁的时候可以加个持有超时时间,超过了这个时间还未释放的,其他机器将有机会获取锁

预备技能:乐观锁

通常我们修改表中一条数据过程如下:

t1:select获取记录R1
t2:对R1进行编辑
t3:update R1

我们来看一下上面的过程存在的问题:

如果A、B两个线程同时执行到t1,他们俩看到的R1的数据一样,然后都对R1进行编辑,然后去执行t3,最终2个线程都会更新成功,后面一个线程会把前面一个线程update的结果给覆盖掉,这就是并发修改数据存在的问题。

我们可以在表中新增一个版本号,每次更新数据时候将版本号作为条件,并且每次更新时候版本号+1,过程优化一下,如下:

t1:打开事务start transaction
t2:select获取记录R1,声明变量v=R1.version
t3:对R1进行编辑
t4:执行更新操作
    update R1 set version = version + 1 where user_id=#user_id# and version = #v#;
t5:t4中的update会返回影响的行数,我们将其记录在count中,然后根据count来判断提交还是回滚
    if(count==1){
        //提交事务
        commit;
    }else{
        //回滚事务
        rollback;
    }

上面重点在于步骤t4,当多个线程同时执行到t1,他们看到的R1是一样的,但是当他们执行到t4的时候,数据库会对update的这行记录加锁,确保并发情况下排队执行,所以只有第一个的update会返回1,其他的update结果会返回0,然后后面会判断count是否为1,进而对事务进行提交或者回滚。可以通过count的值知道修改数据是否成功了。

上面这种方式就乐观锁。我们可以通过乐观锁的方式确保数据并发修改过程中的正确性。

使用mysql实现分布式锁

我们创建一个分布式锁表,如下

DROP TABLE IF EXISTS t_lock;
create table t_lock(
  lock_key varchar(32) PRIMARY KEY NOT NULL COMMENT '锁唯一标志',
  request_id varchar(64) NOT NULL DEFAULT '' COMMENT '用来标识请求对象的',
  lock_count INT NOT NULL DEFAULT 0 COMMENT '当前上锁次数',
  timeout BIGINT NOT NULL DEFAULT 0 COMMENT '锁超时时间',
  version INT NOT NULL DEFAULT 0 COMMENT '版本号,每次更新+1'
)COMMENT '锁信息表';

java代码如下
mapper接口

package com.shiguiwu.springmybatis.mapper;
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import com.shiguiwu.springmybatis.lock.model.LockModel;
import org.springframework.stereotype.Repository;

/**
 * @description: 锁mapper
 * @author: stone
 * @date: Created by 2021/5/30 11:12
 * @version: 1.0.0
 * @pakeage: com.shiguiwu.springmybatis.mapper
 */
@Repository
public interface LockMapper extends BaseMapper<LockModel> {

}

锁对象model

package com.shiguiwu.springmybatis.lock.model;

import com.baomidou.mybatisplus.annotation.TableId;
import com.baomidou.mybatisplus.annotation.TableName;
import com.baomidou.mybatisplus.annotation.Version;
import lombok.Data;

/**
 * @description: 锁模型
 * @author: stone
 * @date: Created by 2021/9/10 11:13
 * @version: 1.0.0
 * @pakeage: com.shiguiwu.springmybatis.lock.model
 */
@Data
@TableName("t_lock")
public class LockModel {

    /**
     * 锁的唯一值
     */
    @TableId
    private String lockKey;

    /**
     * 请求id,同一个线程里请求id一样
     */
    private String requestId;

    //锁次数
    private Integer lockCount;

    //锁超时
    private Long timeout;

    //乐观锁版本
    @Version
    private Integer version;
}

锁接口

package com.shiguiwu.springmybatis.lock;

/**
 * @description: 锁接口
 * @author: stone
 * @date: Created by 2021/9/10 11:40
 * @version: 1.0.0
 * @pakeage: com.shiguiwu.springmybatis.lock
 */
public interface ILock<T> {

    /**
     * 获取分布式锁,支持重入
     * @param lockKey 锁可以
     * @param lockTimeout  持有锁的有效时间,防止死锁
     * @param getTimeout 获取锁超时时间,
     * @return 是否锁成功
     */
    public boolean lock(String lockKey, long lockTimeout, int getTimeout) throws Exception;

    /**
     * 解锁
     * @param lockKey 锁key
     *
     */
    public void unlock(String lockKey);

    /**
     * 重置锁对象
     * @param t 锁对象
     * @return 返回锁记录
     */
    public int restLock(T t);

}

锁的实现代码如下

package com.shiguiwu.springmybatis.lock;

import cn.hutool.core.util.StrUtil;
import com.shiguiwu.springmybatis.lock.model.LockModel;
import com.shiguiwu.springmybatis.mapper.LockMapper;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.util.Objects;
import java.util.UUID;
import java.util.concurrent.TimeUnit;

/**
 * @description: mysql实现分布式锁
 * @author: stone
 * @date: Created by 2021/9/10 11:09
 * @version: 1.0.0
 * @pakeage: com.shiguiwu.springmybatis.lock
 */
@Component
@Slf4j
public class MysqlLock implements ILock<LockModel>{

    static ThreadLocal<String> requestIds = new ThreadLocal<>();



    @Autowired
    private LockMapper lockMapper;


    public String getRequestId() {
        String requestId = requestIds.get();
        if (StrUtil.isBlank(requestId)) {
            requestId = UUID.randomUUID().toString();
            requestIds.set(requestId);
        }
        log.info("获取到的requestId===> {}", requestId);
        return requestId;

    }

    /**
     *  获取锁
     * @param lockKey 锁可以
     * @param lockTimeout  持有锁的有效时间,防止死锁
     * @param getTimeout 获取锁超时时间,
     * @return
     */
    @Override
    public boolean lock(String lockKey, long lockTimeout, int getTimeout) throws Exception {
        log.info(" lock start =======================> {}",lockKey);
        //从local中获取 请求id
        String requestId = this.getRequestId();

        //获取锁的结果
        boolean lockResult = false;

        //开始时间
        long startTime = System.currentTimeMillis();

        while (true) {
            LockModel lockModel = lockMapper.selectById(lockKey);
            if (Objects.nonNull(lockModel)) {

                //获取锁对象的请求id
                String reqId = lockModel.getRequestId();
                //如果是空,表示改锁未被占有
                if (StrUtil.isBlank(reqId)) {
                    //马上占有它
                    //设置请求id
                    lockModel.setRequestId(requestId);
                    //设置锁次数
                    lockModel.setLockCount(1);
                    //设置超时时间,防止死锁
                    lockModel.setTimeout(System.currentTimeMillis() + lockTimeout);
                    if (lockMapper.updateById(lockModel) == 1) {
                        lockResult = true;
                        break;
                    }

                }
                //如果request_id和表中request_id一样表示锁被当前线程持有者,此时需要加重入锁
                else if (requestId.equals(reqId)) {
                    //可重入锁
                    lockModel.setTimeout(System.currentTimeMillis() + lockTimeout);
                    //设置获取初次
                    lockModel.setLockCount(lockModel.getLockCount() + 1);
                    if (lockMapper.updateById(lockModel) == 1) {
                        lockResult = true;
                        break;
                    }
                }

                //不为空,也不相等,说明是其他线程占有
                else {
                    //锁不是自己的,并且已经超时了,则重置锁,继续重试
                    if (lockModel.getTimeout() < System.currentTimeMillis()) {
                        //未超时,继续重试
                        this.restLock(lockModel);
                    }
                    //如果未超时,休眠100毫秒,继续重试
                    else {
                        if (startTime + getTimeout > System.currentTimeMillis()) {
                            TimeUnit.MILLISECONDS.sleep(100);
                        }
                        else {
                            //防止长时间阻塞
                            break;

                        }
                    }
                }
            }
            //如果是空,就插入一个锁,重新尝试获取锁
            else {
                lockModel = new LockModel();
                //设置锁key
                lockModel.setLockKey(lockKey);
                lockMapper.insert(lockModel);
            }
        }
        log.info(" lock end =======================> {}",lockKey);
        return lockResult;
    }

    /**
     * 释放锁
     * @param lockKey 锁key
     */
    @Override
    public void unlock(String lockKey) {
        LockModel lockModel = lockMapper.selectById(lockKey);
        //获取当前线程的请求id
        String reqId = this.getRequestId();

        //获取锁次数
        int count = 0;
        //当前线程requestId和库中request_id一致 && lock_count>0,表示可以释放锁
        if (Objects.nonNull(lockModel)
                && reqId.equals(lockModel.getRequestId())
                && (count = lockModel.getLockCount()) > 0) {
            if (count == 1) {
                //重置锁
                this.restLock(lockModel);
            }
            //重入锁的问题,锁的次数减一
            else {
                lockModel.setLockCount(lockModel.getLockCount() - 1);
                //更新次数
                lockMapper.updateById(lockModel);
            }
        }
    }

    /**
     * 重置锁
     * @param lockModel 锁对象
     * @return 更新条数
     */
    @Override
    public int restLock(LockModel lockModel) {
        lockModel.setLockCount(0);
        lockModel.setRequestId("");
        lockModel.setTimeout(0L);
        return lockMapper.updateById(lockModel);
    }

}

上面代码中实现了文章开头列的分布式锁的所有功能,大家可以认真研究下获取锁的方法:lock,释放锁的方法:unlock。

测试用例

package com.shiguiwu.springmybatis;

import com.shiguiwu.springmybatis.lock.ILock;
import com.shiguiwu.springmybatis.lock.model.LockModel;
import lombok.extern.slf4j.Slf4j;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;

/**
 * @description: 锁测试
 * @author: stone
 * @date: Created by 2021/9/10 15:32
 * @version: 1.0.0
 * @pakeage: com.shiguiwu.springmybatis
 */
@SpringBootTest
@Slf4j
public class LockApplicationTests {

    @Autowired
    private ILock<LockModel> mysqlLock;


    ////测试重复获取和重复释放
    @Test
    public void testRepeat() throws Exception {
        for (int i = 0; i < 10; i++) {
            mysqlLock.lock("key1", 10000L, 1000);
        }

        for (int i = 0; i < 10; i++) {
            mysqlLock.unlock("key1");
        }
    }

    //    //获取之后不释放,超时之后被thread1获取
    @Test
    public void testTimeout() throws Exception {
        String lockKey = "key2";
        mysqlLock.lock(lockKey, 5000L, 1000);
        Thread thread1 = new Thread(() -> {
            try {
                mysqlLock.lock(lockKey, 5000L, 7000);
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                mysqlLock.unlock(lockKey);

            }
        }, "thread1");

        thread1.start();
        thread1.join();

    }

}

test1方法测试了重入锁的效果。
test2测试了主线程获取锁之后一直未释放,持有锁超时之后被thread1获取到了

留给大家一个问题

上面分布式锁还需要考虑一个问题:比如A机会获取了key1的锁,并设置持有锁的超时时间为10秒,但是获取锁之后,执行了一段业务操作,业务操作耗时超过10秒了,此时机器B去获取锁时可以获取成功的,此时会导致A、B两个机器都获取锁成功了,都在执行业务操作,这种情况应该怎么处理?大家可以思考一下然后留言,我们一起讨论一下。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 216,544评论 6 501
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,430评论 3 392
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 162,764评论 0 353
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,193评论 1 292
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,216评论 6 388
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,182评论 1 299
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,063评论 3 418
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,917评论 0 274
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,329评论 1 310
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,543评论 2 332
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,722评论 1 348
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,425评论 5 343
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,019评论 3 326
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,671评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,825评论 1 269
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,729评论 2 368
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,614评论 2 353

推荐阅读更多精彩内容