在Java中,关于锁我想大家都很熟悉。在并发编程中,我们通过锁,来避免由于竞争而造成的数据不一致问题。通常我们以进程锁synchronized 、Lock来实现它,对于分布式程序,就不能用进程锁了,这时候常用的是分布式锁。
什么是分布式锁
分布式锁,是一种思想,它的实现方式有很多。比如,我们将沙滩当做分布式锁的组件,那么它看起来应该是这样的:
加锁
在沙滩上踩一脚,留下自己的脚印,就对应了加锁操作。其他进程或者线程,看到沙滩上已经有脚印,证明锁已被别人持有,则等待。
解锁
把脚印从沙滩上抹去,就是解锁的过程。
锁超时
为了避免死锁,我们可以设置一阵风,在单位时间后刮起,将脚印自动抹去。
分布式锁的实现有很多,比如基于数据库、memcached、Redis、系统文件、zookeeper等。它们的核心的理念跟上面的过程大致相同。基于数据库可以用乐观锁和悲观锁处理分页式锁,乐观锁使用对比记录version号来实现,悲观锁使用类似“select * where * for update”行锁实现。
本文讨论的是基于redis实现分页式锁的问题,别的方面不做详说,有相关需求可以参考和查阅别的资料。
Redis分布式锁原理
加锁
加锁实际上就是在redis中,给Key键设置一个值,为避免死锁,并给定一个过期时间。
SET lock_key random_value NX PX 5000
值得注意的是:
random_value 是客户端生成的唯一的字符串。
NX 代表只在键不存在时,才对键进行设置操作。
PX 5000 设置键的过期时间为5000毫秒。
这样,如果上面的命令执行成功,则证明客户端获取到了锁。
解锁
解锁的过程就是将Key键删除。但也不能乱删,不能说客户端1的请求将客户端2的锁给删除掉。这时候random_value的作用就体现出来。
为了保证解锁操作的原子性,我们用LUA脚本完成这一操作。先判断当前锁的字符串是否与传入的值相等,是的话就删除Key,解锁成功。
if redis.call('get',KEYS[1]) == ARGV[1] then
return redis.call('del',KEYS[1])
else
return 0
end
jedis实现(单节点)
/**
* 获取分布式锁:一分命令,保证事务的一致性。
* @param lockKey
* @param requestId
* @param expireTime
* @return
*/
public static boolean getDistributeLock(String lockKey, String requestId, long expireTime) {
Jedis jedis = null;
try {
jedis = getResource();
String result = jedis.set(lockKey,requestId,"NX","PX",expireTime);
if ("OK".equals(result)) {
return true;
}
} catch (Exception e) {
logger.error("getDistributeLock {}", lockKey, e);
} finally {
returnResource(jedis);
}
return false;
}
/**
* 释放分布式锁:使用lua脚本,一个命令实现对带有标志的锁的释放
* @param lockKey
* @param requestId
* @return
*/
public static boolean releaseDistributeLock(String lockKey, String requestId) {
Jedis jedis = null;
try {
jedis = getResource();
String script = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end";
Object result = jedis.eval(script, Collections.singletonList(lockKey), Collections.singletonList(requestId));
Long RELEASE_SUCCESS = 1L;
if (RELEASE_SUCCESS.equals(result)) {
return true;
}
} catch (Exception e) {
logger.error("releaseDistributeLock {}", lockKey, e);
} finally {
returnResource(jedis);
}
return false;
}
注意:这里的requestId,类似客户端口请求id,每次请求都是不同的可以使用uuid,测试和使用可以参考后面的”测试和说明“部分。
缺点:在集群包括主从、哨兵模式、集群模式不可用;锁不具有可重入性。
redisson实现(通用)
Redisson是一个在Redis的基础上实现的Java驻内存数据网格(In-Memory Data Grid)。它不仅提供了一系列的分布式的Java常用对象,还提供了许多分布式服务。其中包括(BitSet, Set, Multimap, SortedSet, Map, List, Queue, BlockingQueue, Deque, BlockingDeque, Semaphore, Lock, AtomicLong, CountDownLatch, Publish / Subscribe, Bloom filter, Remote service, Spring cache, Executor service, Live Object service, Scheduler service) Redisson提供了使用Redis的最简单和最便捷的方法。Redisson的宗旨是促进使用者对Redis的关注分离(Separation of Concern),从而让使用者能够将精力更集中地放在处理业务逻辑上。
Redisson底层采用的是Netty 框架。支持Redis 2.8以上版本,支持Java1.6+以上版本。它里面也实现了分布式锁,而且包含多种类型的锁:可重入锁,公平锁等。
具体实现如下:
JedisUtil提供
//从配置类中获取redisson对象
private static Redisson redisson = JedisConfig.getRedisson();
//加锁 Redisson:适用单机、主从、哨兵和集群
//同步方法,等待锁返回执行 所以涉及锁使用的,可以放在线程池中进行
public static boolean acquire(String lockName){
//声明key对象
String key = lockName;
//获取锁对象
RLock mylock = redisson.getLock(key);
//加锁,并且设置锁过期时间,防止死锁的产生
mylock.lock(2, TimeUnit.MINUTES); // 分钟
//加锁成功
return true;
}
//锁的释放 Redisson:适用单机、主从、哨兵和集群
//同步方法,等待锁返回执行 所以涉及锁使用的,可以放在线程池中进行
public static void release(String lockName){
//必须是和加锁时的同一个key
String key = lockName;
//获取所对象
RLock mylock = redisson.getLock(key);
//释放锁(解锁)
mylock.unlock();
}
JedisConfig提供
private static Config config = new Config();
//声明redisso对象
private static Redisson redisson = null;
static{
//可以用"redis://"来启用SSL连接
if (IS_CLUSTER.equals(CLUSTER_USED)) {//集群
log.info("Redisson redis lock init cluster config:"+server1+";"+server2+";"+server3+";"+server4+";"+server5+";"+server6);
config.useClusterServers().addNodeAddress(
"redis://".concat(server1),"redis://".concat(server2), "redis://".concat(server3),
"redis://".concat(server4),"redis://".concat(server5), "redis://".concat(server6)).setScanInterval(5000);
} else {//单机
log.info("Redisson redis lock init single node config:"+server1+";"+server2+";"+server3+";"+server4+";"+server5+";"+server6);
config.useSingleServer().setAddress("redis://".concat(poolHost).concat(":").concat(poolPort));
}
//得到redisson对象
redisson = (Redisson) Redisson.create(config);
}
/**
* Redisson redis分布式锁处理对象
* @return
*/
public static Redisson getRedisson() {
return redisson;
}
测试和说明
测试和使用,可以参考下面的junit测试用例。
@Slf4j
public class JedisUtilTest extends SpringTxTestCase {
private static Logger logger = LoggerFactory.getLogger(JedisUtils.class);
/**
* 单机版本:加解锁功能
*/
@Test
public void testSingleRedisLockAndUnlock(){
JedisUtils.getDistributeLock("lockKey","requestId",JedisConfig.JEDIS_EXPIRE);
{
for (int i = 0; i < 5; i++) {
boolean result = JedisUtils.getDistributeLock("lockKey","requestId",JedisConfig.JEDIS_EXPIRE);
System.out.println(Thread.currentThread().getName()+":lock result:"+result);
JedisUtils.releaseDistributeLock("lockKey","requestId");
boolean result1 = JedisUtils.getDistributeLock("lockKey","requestId",JedisConfig.JEDIS_EXPIRE);
System.out.println(Thread.currentThread().getName()+":unlock result1:"+result1);
}
}
}
/**
* 单机版本:锁测试
*/
@Test
public void testSingleRedisLock(){
{
final CyclicBarrier cbRef = new CyclicBarrier(10);
final ReentrantLock reentrantLock=new ReentrantLock();
for(int i=0;i<10;i++){
Thread t= new Thread(new Runnable() {
@Override
public void run() {
try {
System.out.println(Thread.currentThread().getName() + "准备");
cbRef.await();//10个线程等待在这里 才开始执行下面的
//reentrantLock.lock();
//tryGetDistributedLock("hello","hello",10000);
boolean result = JedisUtils.getDistributeLock("lockKey","requestId",JedisConfig.JEDIS_EXPIRE);
System.out.println(Thread.currentThread().getName()+"===lock result:"+result);
JedisUtils.releaseDistributeLock("lockKey",UUID.randomUUID().toString());
boolean result1 = JedisUtils.getDistributeLock("lockKey","requestId",JedisConfig.JEDIS_EXPIRE);
System.out.println(Thread.currentThread().getName()+"===lock result1:"+result);
} catch (Exception e) {
e.printStackTrace();
}finally {
//reentrantLock.unlock();
}
}
});
t.start();
}
//这一段可以不要
try {
Thread.sleep(2000);
System.out.println(Thread.currentThread().getName() + "起跑");
System.out.println( cbRef.getParties()+"--" +cbRef.getNumberWaiting());
} catch (Exception e){
e.printStackTrace();
}
}
}
/**
* 单机版本redis:测试分布式锁的使用方法
*/
@Test
public void testUseOfSingleRedisLock() throws InterruptedException {
final CountDownLatch countDownLatch = new CountDownLatch(10);
String data2Deal = "data to deal";
final CyclicBarrier cbRef = new CyclicBarrier(10);
for(int i=0;i<10;i++){
Thread t= new Thread(new Runnable() {
@Override
public void run() {
System.out.println(Thread.currentThread().getName() + "准备");
try {
cbRef.await();//10个线程等待在这里 才开始执行下面的+
} catch (InterruptedException e) {
e.printStackTrace();
} catch (BrokenBarrierException e) {
e.printStackTrace();
}
final ReentrantLock reentrantLock=new ReentrantLock();
reentrantLock.lock();
try {
String data2Deal = "data to deal:" + Thread.currentThread().getName();
useOfSingleRedisLock(data2Deal);
} catch (Exception e){
e.printStackTrace();
} finally {
reentrantLock.unlock();
}
countDownLatch.countDown();
}
});
t.start();
}
countDownLatch.await();
System.out.println("所有线程都执行完了……");
}
/**
* 分布式锁的使用方法:单机redis cluster包括(集群和哨兵)不适用。
* @param data2Deal
*/
public void useOfSingleRedisLock(String data2Deal){
String requestId = UUID.randomUUID().toString();
if(JedisPoolUtils.getDistributeLock("lock_key", requestId, 1000*60*5)){
try {
methonNeedDisLock(data2Deal);
} catch (Exception e) {
logger.error("分布式锁业务处理失败!",e);
e.printStackTrace();
} finally {
JedisPoolUtils.releaseDistributeLock("lock_key",requestId);
}
} else {
try {
Thread.sleep(1000);
useOfSingleRedisLock(data2Deal);
} catch (InterruptedException e) {
logger.error(e.getMessage());
}
}
}
/*
* 需要分布式锁的业务代码
*/
public void methonNeedDisLock(String data2Deal){
System.out.println("分布式锁业务处理方法:"+data2Deal);
}
/**
* 测试分布式锁(Redisson)的使用方法:redis单机和哨兵、集群都适用
* 测试说明:开启1000个线程,对count进行累加
*/
int count = 0;
@Test
public void testRedisLock() throws InterruptedException {
int clientcount =1000;
final CountDownLatch countDownLatch = new CountDownLatch(clientcount);
ExecutorService executorService = Executors.newFixedThreadPool(clientcount);
long start = System.currentTimeMillis();
for (int i = 0;i<clientcount;i++){
executorService.execute(new Runnable() {
@Override
public void run() {
//通过Snowflake算法获取唯一的ID字符串
String id = UUID.randomUUID().toString();
System.out.println("request id:"+id);
try {
JedisPoolUtils.getDistributeLock("lock_key", id, 1000*60*5);
count++;
}finally {
JedisPoolUtils.releaseDistributeLock("lock_key",id);
}
countDownLatch.countDown();
}
});
}
//控制程序结束
countDownLatch.await();
long end = System.currentTimeMillis();
System.out.println("执行线程数:{"+clientcount+"},总耗时:{"+(end-start)+"},count数为:{"+count+"}");
//logger.info("执行线程数:{},总耗时:{},count数为:{}",clientcount,end-start,count);
}
@Test
public void testRedissonLock() throws InterruptedException {
final CountDownLatch countDownLatch = new CountDownLatch(2);
System.out.println("测试程序:开始运行……");
Thread t1 = new Thread(new Runnable() {
@SneakyThrows
@Override
public void run() {
String key = "test123";
System.out.println(Thread.currentThread().getName()+": t1 start");
//加锁
boolean result = JedisUtils.acquire(key);
System.out.println(Thread.currentThread().getName()+": t1 get lock time :"+new Date().toLocaleString());
System.out.println(Thread.currentThread().getName()+": t1 get lock:"+result);
Thread.sleep(50000);
System.out.println(Thread.currentThread().getName()+":t1 sleep 50000.");
System.out.println(Thread.currentThread().getName()+":t1 修改了数据库");
//释放锁
JedisUtils.release(key);
System.out.println(Thread.currentThread().getName()+" t1 lock release time:"+new Date().toLocaleString());
countDownLatch.countDown();
}
});
t1.start();
Thread t2 = new Thread(new Runnable() {
@Override
public void run() {
String key = "test123";
System.out.println(Thread.currentThread().getName()+": t2 start");
//加锁
boolean result = JedisUtils.acquire(key);
System.out.println(Thread.currentThread().getName()+": t2 get lock time :"+new Date().toLocaleString());
System.out.println(Thread.currentThread().getName()+": t2 get lock:"+result);
System.out.println(Thread.currentThread().getName()+":t2 修改了数据库");
//释放锁
JedisUtils.release(key);
System.out.println(Thread.currentThread().getName()+" t2 lock release time:"+new Date().toLocaleString());
countDownLatch.countDown();
}
});
t2.start();
countDownLatch.await();
System.out.println("测试程序:完成运行。");
}
@Test
public void testRedissonLockSyn() throws InterruptedException {
final CountDownLatch countDownLatch = new CountDownLatch(2);
System.out.println("测试程序:开始运行……");
Thread t1 = new Thread(new Runnable() {
@SneakyThrows
@Override
public void run() {
String key = "test123";
System.out.println(Thread.currentThread().getName()+": t1 start");
System.out.println(Thread.currentThread().getName()+": t1 sleep 50000.");
Thread.sleep(50000);
System.out.println(Thread.currentThread().getName()+": t1 ready for lock.");
//加锁
boolean result = JedisUtils.acquire(key);
System.out.println(Thread.currentThread().getName()+": t1 get lock time :"+new Date().toLocaleString());
System.out.println(Thread.currentThread().getName()+": t1 get lock:"+result);
System.out.println(Thread.currentThread().getName()+":t1 修改了数据库");
//释放锁
JedisUtils.release(key);
System.out.println(Thread.currentThread().getName()+" t1 lock release time:"+new Date().toLocaleString());
countDownLatch.countDown();
}
});
t1.start();
Thread t2 = new Thread(new Runnable() {
@SneakyThrows
@Override
public void run() {
String key = "test123";
System.out.println(Thread.currentThread().getName()+": t2 start");
boolean result = JedisUtils.acquire(key);
System.out.println(Thread.currentThread().getName()+": t2 get lock time :"+new Date().toLocaleString());
System.out.println(Thread.currentThread().getName()+": t2 get lock:"+result);
System.out.println(Thread.currentThread().getName()+": t2 sleep 500000.");
Thread.sleep(500000);
//释放锁
JedisUtils.release(key);
System.out.println(Thread.currentThread().getName()+" t2 lock release time:"+new Date().toLocaleString());
countDownLatch.countDown();
}
});
t2.start();
countDownLatch.await();
System.out.println("测试程序:完成运行。");
}
@Test
public void testUseOfRedissonLock() throws InterruptedException {
int clientcount = 1000;
final CountDownLatch countDownLatch = new CountDownLatch(clientcount);
ExecutorService executorService = Executors.newFixedThreadPool(5);
long start = System.currentTimeMillis();
for (int i = 0;i<clientcount;i++){
executorService.execute(new Runnable() {
@Override
public void run() {
JedisUtils.acquire("lockName");
count++;
JedisUtils.release("lockName");
countDownLatch.countDown();
}
});
}
//控制程序结束
countDownLatch.await();
long end = System.currentTimeMillis();
System.out.println("执行线程任务总数:{"+clientcount+"},总耗时:{"+(end-start)+"},count数为:{"+count+"}");
}
}
参考
https://blog.csdn.net/u014353343/article/details/88921212