分布式锁的实现原理
#上面的命令的意思是设置lock_mjw的值为1 过期时间10秒钟,set时如果lock_mjw已经有值,
#设置失败,并返回0,如果没有值,设置成功,并返回1.
#EX设置过期时间 NX:设置值时判断key是否已经有值,如果有值不做修改并返回0,如果没有值设置成功,返回1.
#为什么加过期时间,因为如果不加过期时间,某个业务系统获取锁成功后,还未释放锁的情况下
#挂掉了,就会导致lock_mjw的值永远存在,从而导致其他业务系统和线程永远无法获取锁成功。
#加了过期时间及时业务系统在未释放锁的情况下挂掉了,过一段时间lock_mjw会被清理掉,其他线程或应用也可以获取到该所。
192.168.29.134:7001> set lock_mjw 1 EX 10 NX
上面的分布式锁还不够完美,会存在什么问题呢?请看下图:
上图的问题是,如果锁在还没有执行完业务代码时就已经失效;并发访问的线程2就会在线程1还没执行
完业务代码就已经获取锁成功,会导致锁起不了锁的作用,为了解决以上问题,需要对lock_mjw时间进行续约,直接上代码:
@Controller
@Api(tags = "RedisTestController", description = "redis测试")
@RequestMapping("/redis")
public class RedisTestController {
@Autowired
private RedisLockUtil redisLockUtil;
@RequestMapping(value = "/send5",method = RequestMethod.POST)
@ApiOperation("发送消息到消息队列")
@ResponseBody
@ApiImplicitParam(name = "key", value = "键", defaultValue = "mju",paramType = "query")
public CommonResult<String> send5(@RequestParam String key) throws InterruptedException {
try{
redisLockUtil.lock(key);
System.out.println("成功获取锁..正在执行业务代码....");
Thread.sleep(5000L);
}finally {
System.out.println("业务代码执行完毕,释放锁....");
redisLockUtil.unlock(key);
}
return CommonResult.success("马军伟","返回正确");
}
}
package com.luban.mall.search.controller;
import lombok.SneakyThrows;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import redis.clients.jedis.JedisCluster;
import redis.clients.jedis.JedisPool;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
@Component
public class RedisLockUtil {
private static final String PREX_LOCK="lock_";
private static AtomicBoolean isStart = new AtomicBoolean(false);
private byte[] lock = new byte[1];
@Autowired
private JedisPool jedisPool;
@Autowired
private JedisCluster cluster;
/**
* 加锁
* @param key
*/
@SneakyThrows
public void lock(String key){
//循环尝试加锁直到加锁成功
while(true){
//尝试加锁
if(tryLock(key)){
break;
}else{
Thread.sleep(100l);
}
}
}
/**
* 尝试加锁
* @param key
* @return
*/
private boolean tryLock(String key) {
//设置lock_key的值到redis中,失效时间20秒,设置时,如果lock_key不存在,会设置成功返回:ok,
//如果已存在值,则设置失败,返回null
String ok = cluster.set(PREX_LOCK + key, "1", "NX", "EX", 5);
if (ok == null) {
return false;
} else {
//设置值成功,相当于获取锁成功,获取锁成功后需要定时对锁的过期时间进行续约,防止锁过期
//需要开一个守护线程(主线程完成后,守护线程也会跟着结束)来做这个事儿
renewalTime(key);
return true;
}
}
private void renewalTime(String key) {
//把锁记录到redis的set集合lock_中
cluster.sadd(PREX_LOCK,PREX_LOCK+key);
if(isStart.get()){
return;
}
synchronized (lock){
if(isStart.get()){
return;
}
isStart.set(true);
Thread t = new Thread(){
@SneakyThrows
@Override
public void run() {
while (true){
//从redis的set集合lock_中获取生成的锁
Set<String> locks = cluster.smembers(PREX_LOCK);
for(String redlock: locks){
Long ttl = cluster.ttl(redlock);
if(ttl==null || ttl<0){
cluster.srem(PREX_LOCK,PREX_LOCK+key);
}else if(ttl<5){
//续约
cluster.expire(redlock, ttl.intValue()+1);
System.out.println("续约:"+redlock+":"+ttl.intValue()+1);
}
}
Thread.sleep(1000l);
}
}
};
t.setDaemon(true);
t.start();
}
}
/**
* 释放锁
* @param key
*/
public void unlock(String key) {
//删除锁
cluster.del(PREX_LOCK+key);
//从锁集合中移除
cluster.srem(PREX_LOCK,PREX_LOCK+key);
}
}
package com.luban.mall.search.config;
import com.fasterxml.jackson.annotation.JsonAutoDetect;
import com.fasterxml.jackson.annotation.PropertyAccessor;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.PropertySource;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.serializer.Jackson2JsonRedisSerializer;
import org.springframework.data.redis.serializer.StringRedisSerializer;
import redis.clients.jedis.HostAndPort;
import redis.clients.jedis.JedisCluster;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.JedisPoolConfig;
import java.util.HashSet;
import java.util.Set;
@Configuration
@PropertySource(value={"classpath:jedis.properties"})
public class RedisConfiguration {
@Value("${redis.node.maxTotal}")
private Integer maxTotal;
@Value("${redis.node.host}")
private String host;
@Value("${redis.node.port}")
private Integer port;
@Value("${redis.node.password}")
private String passwd;
@Bean
public RedisTemplate redisTemplate(RedisConnectionFactory redisConnectionFactory){
RedisTemplate<String,Object> template = new RedisTemplate();
template.setConnectionFactory(redisConnectionFactory);
/*
* 序列化后会产生java类型说明,如果不需要用“Jackson2JsonRedisSerializer”
* 和“ObjectMapper ”配合效果更好
*/
Jackson2JsonRedisSerializer<Object> jackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer<Object>(Object.class);
ObjectMapper om = new ObjectMapper();
om.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
om.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL);
jackson2JsonRedisSerializer.setObjectMapper(om);
StringRedisSerializer stringRedisSerializer = new StringRedisSerializer();
template.setKeySerializer(stringRedisSerializer);
template.setValueSerializer(jackson2JsonRedisSerializer);
template.setHashKeySerializer(stringRedisSerializer);
template.setHashValueSerializer(jackson2JsonRedisSerializer);
template.afterPropertiesSet();
return template;
}
@Bean //这个注解注入工厂的名称是方法名
public JedisPool jedisPool(){
JedisPoolConfig jedisPoolConfig = jedisPoolConfig();
return new JedisPool(jedisPoolConfig, host, port, 2000, passwd, 0, (String)null);
}
public JedisPoolConfig jedisPoolConfig(){ //这个是修改redis性能的时候需要的对象
JedisPoolConfig jedisPoolConfig = new JedisPoolConfig();
jedisPoolConfig.setMaxTotal(maxTotal);
return jedisPoolConfig;
}
@Bean
public JedisCluster jedisCluster(){
Set<HostAndPort> set = new HashSet<HostAndPort>();
set.add(new HostAndPort("192.168.29.134",7001));
set.add(new HostAndPort("192.168.29.134",7002));
set.add(new HostAndPort("192.168.29.134",7003));
set.add(new HostAndPort("192.168.29.134",7004));
set.add(new HostAndPort("192.168.29.134",7005));
set.add(new HostAndPort("192.168.29.134",7006));
JedisCluster cluster = new JedisCluster(set,5000,5000,20,"123456",jedisPoolConfig());
return cluster;
}
}
#jedis.properties
#资源池中最大连接数
redis.node.maxTotal=10
redis.node.host=192.168.29.134
redis.node.port=6379
redis.node.password=123456
#资源池中允许的最大空闲连接数
redis.node.maxIdle=10
#资源池确保的最少空闲连接数
redis.node.minIdle=0
#当资源池用尽后,调用者是否要等待。只有当值为true时,下面的maxWaitMillis才会生效。
redis.node.blockWhenExhausted=true
#当资源池连接用尽后,调用者的最大等待时间(单位为毫秒)
redis.node.maxWaitMillis=5000
#向资源池借用连接时是否做连接有效性检测(ping)。检测到的无效连接将会被移除。
#业务量很大时候建议设置为false,减少一次ping的开销。
redis.node.testOnBorrow=false
#向资源池归还连接时是否做连接有效性检测(ping)。检测到无效连接将会被移除。
#业务量很大时候建议设置为false,减少一次ping的开销。
redis.node.testOnReturn=false
#是否开启JMX监控
redis.node.jmxEnabled=false
建议开启,请注意应用本身也需要开启。
<dependency>
<groupId>org.springframework.data</groupId>
<artifactId>spring-data-redis</artifactId>
<version>2.1.10.RELEASE</version>
<scope>compile</scope>
</dependency>
<!--redis客户端-->
<dependency>
<groupId>redis.clients</groupId>
<artifactId>jedis</artifactId>
<version>2.9.0</version>
</dependency>
单机情况下redis节点挂掉的情况下会存在锁失效的问题,
为解决这个问题:可以在不同节点上加多个锁,其中有一个存在锁定状态就认为是锁定状态,
这就会解决单机版redis锁失效问题。
利用redisson框架实现高可用分布式锁
@Configuration
@PropertySource(value={"classpath:jedis.properties"})
public class RedisConfiguration {
@Bean
public Redisson redisson(){
Config config= new Config();
config.useClusterServers()
.addNodeAddress("redis://192.168.29.134:7001")
.addNodeAddress("redis://192.168.29.134:7002")
.addNodeAddress("redis://192.168.29.134:7003")
.addNodeAddress("redis://192.168.29.134:7004")
.addNodeAddress("redis://192.168.29.134:7005")
.addNodeAddress("redis://192.168.29.134:7006")
.setPassword("123456");
Redisson redisson = (Redisson) Redisson.create(config);
return redisson;
}
}
package com.luban.mall.search.controller;
import org.redisson.Redisson;
import org.redisson.RedissonRedLock;
import org.redisson.api.RLock;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
@Component
public class RedisLockUtil2 {
@Autowired
private Redisson redisson;
public void lock(String key){
RLock lock1 = redisson.getLock(key+0);
RLock lock2 = redisson.getLock(key+1);
RLock lock3 = redisson.getLock(key+2);
RedissonRedLock lock = new RedissonRedLock(lock1,lock2,lock3);
lock.lock();
}
public void unlock(String key){
RLock lock1 = redisson.getLock(key+0);
RLock lock2 = redisson.getLock(key+1);
RLock lock3 = redisson.getLock(key+2);
RedissonRedLock lock = new RedissonRedLock(lock1,lock2,lock3);
lock.unlock();
}
}
@RequestMapping(value = "/send6",method = RequestMethod.POST)
@ApiOperation("发送消息到消息队列")
@ResponseBody
@ApiImplicitParams({
@ApiImplicitParam(name = "key", value = "键", defaultValue = "mju",paramType = "query"),
@ApiImplicitParam(name = "flag", value = "键", defaultValue = "1",paramType = "query")
})
public CommonResult<String> send6(@RequestParam String key,@RequestParam String flag) throws InterruptedException {
try{
redisLockUtil2.lock(key);
System.out.println("成功获取锁..正在执行业务代码....key:"+key+"----flag:"+flag);
Thread.sleep(50000L);
}finally {
System.out.println("业务代码执行完毕,释放锁....");
redisLockUtil2.unlock(key);
}
return CommonResult.success("马军伟","返回正确");
}
<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson</artifactId>
<version>3.10.7</version>
</dependency>