Redisson的Github地址:https://github.com/redisson/redisson/wiki/Table-of-Content
1、添加依赖
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
<version>2.4.1</version>
<exclusions>
<exclusion>
<groupId>io.lettuce</groupId>
<artifactId>lettuce-core</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>redis.clients</groupId>
<artifactId>jedis</artifactId>
<version>3.3.0</version>
</dependency>
<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson-spring-boot-starter</artifactId>
<version>3.13.6</version>
</dependency>
2、新建配置类
import org.redisson.Redisson;
import org.redisson.api.RedissonClient;
import org.redisson.config.Config;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.io.IOException;
/**
* @author ZhaoBW
* @version 1.0
* @date 2021/1/25 14:24
*/
@Configuration
public class MyRedissonConfig {
@Bean(destroyMethod="shutdown")
RedissonClient redisson() throws IOException {
//1、创建配置
Config config = new Config();
config.useSingleServer()
.setAddress("192.168.43.129:6379");
return Redisson.create(config);
}
}
3、分布式锁
3.1、可重入锁
基于Redis的Redisson分布式可重入锁RLock对象实现了java.util.concurrent.locks.Lock接口。
@RequestMapping("/redisson")
public String testRedisson(){
//获取分布式锁,只要锁的名字一样,就是同一把锁
RLock lock = redissonClient.getLock("lock");
//加锁(阻塞等待),默认过期时间是30秒
lock.lock();
try{
//如果业务执行过长,Redisson会自动给锁续期
Thread.sleep(1000);
System.out.println("加锁成功,执行业务逻辑");
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
//解锁,如果业务执行完成,就不会继续续期,即使没有手动释放锁,在30秒过后,也会释放锁
lock.unlock();
}
return "Hello Redisson!";
}
大家都知道,如果负责储存这个分布式锁的Redisson节点宕机以后,而且这个锁正好处于锁住的状态时,这个锁会出现锁死的状态。为了避免这种情况的发生,Redisson内部提供了一个监控锁的看门狗,它的作用是在Redisson实例被关闭前,不断的延长锁的有效期。默认情况下,看门狗的检查锁的超时时间是30秒钟,也可以通过修改Config.lockWatchdogTimeout来另行指定。
在RedissonLock类的renewExpiration()方法中,会启动一个定时任务每隔30/3=10秒给锁续期。如果业务执行期间,应用挂了,那么不会自动续期,到过期时间之后,锁会自动释放。
private void renewExpiration() {
ExpirationEntry ee = EXPIRATION_RENEWAL_MAP.get(getEntryName());
if (ee == null) {
return;
}
Timeout task = commandExecutor.getConnectionManager().newTimeout(new TimerTask() {
@Override
public void run(Timeout timeout) throws Exception {
ExpirationEntry ent = EXPIRATION_RENEWAL_MAP.get(getEntryName());
if (ent == null) {
return;
}
Long threadId = ent.getFirstThreadId();
if (threadId == null) {
return;
}
RFuture<Boolean> future = renewExpirationAsync(threadId);
future.onComplete((res, e) -> {
if (e != null) {
log.error("Can't update lock " + getName() + " expiration", e);
return;
}
if (res) {
// reschedule itself
renewExpiration();
}
});
}
}, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS);
ee.setTimeout(task);
}
另外Redisson还提供了leaseTime的参数来指定加锁的时间。超过这个时间后锁便自动解开了。
// 加锁以后10秒钟自动解锁
// 无需调用unlock方法手动解锁
lock.lock(10, TimeUnit.SECONDS);
// 尝试加锁,最多等待100秒,上锁以后10秒自动解锁
boolean res = lock.tryLock(100, 10, TimeUnit.SECONDS);
如果指定了锁的超时时间,底层直接调用lua脚本,进行占锁。如果超过leaseTime,业务逻辑还没有执行完成,则直接释放锁,所以在指定leaseTime时,要让leaseTime大于业务执行时间。RedissonLock类的tryLockInnerAsync()方法
<T> RFuture<T> tryLockInnerAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
internalLockLeaseTime = unit.toMillis(leaseTime);
return evalWriteAsync(getName(), LongCodec.INSTANCE, command,
"if (redis.call('exists', KEYS[1]) == 0) then " +
"redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return nil; " +
"end; " +
"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
"redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return nil; " +
"end; " +
"return redis.call('pttl', KEYS[1]);",
Collections.singletonList(getName()), internalLockLeaseTime, getLockName(threadId));
}
3.2、读写锁
分布式可重入读写锁允许同时有多个读锁和一个写锁处于加锁状态。在读写锁中,读读共享、读写互斥、写写互斥。
RReadWriteLock rwlock = redisson.getReadWriteLock("anyRWLock");
// 最常见的使用方法
rwlock.readLock().lock();
// 或
rwlock.writeLock().lock();
读写锁测试类,当访问write接口时,read接口会被阻塞住。
import org.redisson.api.RCountDownLatch;
import org.redisson.api.RLock;
import org.redisson.api.RReadWriteLock;
import org.redisson.api.RedissonClient;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import java.util.UUID;
/**
* @author ZhaoBW
* @version 1.0
* @date 2021/1/23 20:12
*/
@RestController
public class TestController {
@Autowired
RedissonClient redissonClient;
@Autowired
StringRedisTemplate redisTemplate;
@RequestMapping("/write")
public String write(){
RReadWriteLock readWriteLock = redissonClient.getReadWriteLock("wr-lock");
RLock writeLock = readWriteLock.writeLock();
String s = UUID.randomUUID().toString();
writeLock.lock();
try {
redisTemplate.opsForValue().set("wr-lock-key", s);
Thread.sleep(10000);
} catch (InterruptedException e) {
e.printStackTrace();
}finally {
writeLock.unlock();
}
return s;
}
@RequestMapping("/read")
public String read(){
RReadWriteLock readWriteLock = redissonClient.getReadWriteLock("wr-lock");
RLock readLock = readWriteLock.readLock();
String s = "";
readLock.lock();
try {
s = redisTemplate.opsForValue().get("wr-lock-key");
} finally {
readLock.unlock();
}
return s;
}
}
3.3、信号量(Semaphore)
Redisson的分布式信号量与的用法与java.util.concurrent.Semaphore相似
RSemaphore semaphore = redisson.getSemaphore("semaphore");
semaphore.acquire();
//或
semaphore.acquireAsync();
semaphore.acquire(23);
semaphore.tryAcquire();
//或
semaphore.tryAcquireAsync();
semaphore.tryAcquire(23, TimeUnit.SECONDS);
//或
semaphore.tryAcquireAsync(23, TimeUnit.SECONDS);
semaphore.release(10);
semaphore.release();
//或
semaphore.releaseAsync();
现在redis中保存semaphore的值为3
然后在TestController中添加测试方法:
@RequestMapping("/releaseSemaphore")
public String releaseSemaphore(){
RSemaphore semaphore = redissonClient.getSemaphore("semaphore");
semaphore.release();
return "release success";
}
@RequestMapping("/acquireSemaphore")
public String acquireSemaphore() throws InterruptedException {
RSemaphore semaphore = redissonClient.getSemaphore("semaphore");
semaphore.acquire();
return "acquire success";
}
当访问acquireSemaphore接口时,redis中的semaphore会减1;访问releaseSemaphore接口时,redis中的semaphore会加1。当redis中的semaphore为0时,继续访问acquireSemaphore接口,会被阻塞,直到访问releaseSemaphore接口,使得semaphore>0,acquireSemaphore才会继续执行。
3.4、闭锁(CountDownLatch)
CountDownLatch作用:某一线程,等待其他线程执行完毕之后,自己再继续执行。
RCountDownLatch latch = redisson.getCountDownLatch("anyCountDownLatch");
latch.trySetCount(1);
latch.await();
// 在其他线程或其他JVM里
RCountDownLatch latch = redisson.getCountDownLatch("anyCountDownLatch");
latch.countDown();
在TestController中添加测试方法,访问close接口时,调用await()方法进入阻塞状态,直到有三次访问release接口时,close接口才会返回。
@RequestMapping("/close")
public String close() throws InterruptedException {
RCountDownLatch close = redissonClient.getCountDownLatch("close");
close.trySetCount(3);
close.await();
return "close";
}
@RequestMapping("/release")
public String release(){
RCountDownLatch close = redissonClient.getCountDownLatch("close");
close.countDown();
return "release";
}