【前言】
目前几乎很多大型网站及应用都是分布式部署的,分布式场景中的数据一致性问题一直是一个比较重要的话题。说起分布式的概念,首当其冲就是CAP理论,即满足一致性(Consistency)、可用性(Availability)和分区容错性(Partition tolerance)。但是CAP理论告诉我们,任何系统只能满足其中两个,”所以,很多系统在设计之初就要对这三者做出取舍。在互联网领域的绝大多数的场景中,都需要牺牲强一致性来换取系统的高可用性,系统往往只需要保证“最终一致性”,只要这个最终时间是在用户可以接受的范围内即可。
在很多场景中,我们为了保证数据的最终一致性,需要很多的技术方案来支持,比如分布式事务、分布式锁等。有的时候,我们需要保证一个方法在同一时间内只能被同一个线程执行。在单机环境中,java提供了很多并发的API来可以处理类似此问题,但是这些API在分布式场景中就无能为力了。也就是说单纯的Api并不能提供分布式锁的能力。所以针对分布式锁的实现目前有多种方案。
谈到分布式锁其实质就是一直在分布式环境下确保数据一致性的思想或者解决方案,任何手段或者措施只要能在多节点环境下保证数据的一致性我们就可以称其为分布式锁。保证各节点对资源的访问有序性。这里重点在“锁”,即当某个节点获取到锁时,才有对此资源的访问权限,与此同时,其他节点是没有的,同一时刻只有获取到锁的节点才有操作权。
【三种方式】
实现分布式锁的方式有很多种,最常用的为以下三种:
1:mysql实现
2:redis实现
3:zookeeper实现
其中各有有缺,今天主要分享redis的实现方式。
【业务场景】
场景一:分布式项目多节点部署要同步mysql的数据到ES中(每个节点都有定时同步任务)
场景二:MQ消息消费,多节点会造成重复消费
如果不设置锁处理将会出现各个节点都会去重复同步,这无疑是一种资源的浪费,同时也不利于数据的安全性和一致性。
【实现】
1、redis配置 略
2、锁
/**
* @Author: LvFang
* @Date: Created in 2018/9/17.
* @Description:
*/
@Service
public class RedisService {
private Logger logger = LoggerFactory.getLogger(getClass());
/**
* 存储到redis中的锁标志
*/
private static final String LOCKED = "LOCKED";
/**
* 默认请求锁的超时时间(ms 毫秒)
*/
private static final long TIME_OUT = 200;
/**
* 默认锁的有效时间(s)
*/
public static final int EXPIRE = 60*5;
/**
* 锁flag
*/
private volatile boolean isLocked = false;
@Autowired
private StringRedisTemplate redisTemplate;
/**
* 默认锁时间60s
*
* @param key
* @return
*/
public boolean lock(String key) {
return lock(key, EXPIRE, TIME_OUT);
}
/**
* @param key
* @param expireTime 锁时间,单位秒
* @return
*/
public boolean lock(String key, int expireTime) {
return lock(key, expireTime, TIME_OUT);
}
/**
* @param key 锁定key
* @param expireTime 锁过期时间 (秒)
* @param timeOut 请求锁超时时间 (毫秒)
* @return
*/
public boolean lock(String key, int expireTime, long timeOut) {
// 系统当前时间,纳秒
long nowTime = System.nanoTime();
logger.info("key = {}, lock start time = {}.", key, nowTime / 1000000);
// 请求锁超时时间,纳秒
long timeout = timeOut * 1000000;
final Random random = new Random();
// 不断循环向Master节点请求锁,当请求时间(System.nanoTime() - nano)超过设定的超时时间则放弃请求锁
// 这个可以防止一个客户端在某个宕掉的master节点上阻塞过长时间
// 如果一个master节点不可用了,应该尽快尝试下一个master节点
synchronized (this) {
while ((System.nanoTime() - nowTime) < timeout) {
// 将锁作为key存储到redis缓存中,存储成功则获得锁
if (redisTemplate.opsForValue().setIfAbsent(key, LOCKED)) {
isLocked = true;
// 设置锁的有效期,也是锁的自动释放时间,也是一个客户端在其他客户端能抢占锁之前可以执行任务的时间
// 可以防止因异常情况无法释放锁而造成死锁情况的发生
redisTemplate.expire(key, expireTime, TimeUnit.SECONDS);
// 上锁成功结束请求
break;
}
// 获取锁失败时,应该在随机延时后进行重试,避免不同客户端同时重试导致谁都无法拿到锁的情况出现
// 睡眠10毫秒后继续请求锁
try {
Thread.sleep(10, random.nextInt(50000));
} catch (InterruptedException e) {
logger.error("获取分布式锁休眠被中断:", e);
}
}
}
logger.info("key = {}, lock end time = {} ,spend time = {}ms.", key, nowTime / 1000000, (System.nanoTime() - nowTime) / 1000000);
return isLocked;
}
public boolean isLock(String key) {
//redisTemplate.getConnectionFactory().getConnection().time();
return redisTemplate.hasKey(key);
}
public void unlock(String key) {
// 释放锁
// 不管请求锁是否成功,只要已经上锁,客户端都会进行释放锁的操作
if (isLock(key)) {
long startTime = System.currentTimeMillis();
logger.info("unlock key = {} start.", key, startTime);
redisTemplate.delete(key);
long endTime = System.currentTimeMillis();
logger.info("unlock key = {} , end, spend time = {}ms.", key, (endTime - startTime));
}
}
}
3、定时任务所处理
/**
* @Author: LvFang
* @Date: Created in 2018/8/21.
* @Description:实时导入数据至ES服务(同步给总分片)
*/
@Service
public class RealTimeToEsService {
Logger logger = LoggerFactory.getLogger(RealTimeToEsService.class);
@Autowired
private NlpInfoService nlpInfoService;
@Autowired
private JestService jestService;
@Autowired
private RedisService redisService;
public final static long SECOND = 1 * 1000;
public final static String BE_ADDES_JOB_NODE_LOCK = "BE_ADDES_JOB_NODE_LOCK";//数据同步ES锁
/**
* 定时5分钟进行一次数据同步
* @throws Exception
*/
@Scheduled(initialDelay = SECOND * 30,fixedDelay = SECOND * 60 * 5)
public void fixedDelayNlpInfoToES() throws Exception {
try {
if(redisService.lock(BE_ADDES_JOB_NODE_LOCK)){
//进行mysql-ES的数据同步
}else{
logger.info("未获取到执行锁,不做同步更新job!");
}
} catch (Exception e) {
logger.info(e.getMessage(), e);
} finally {
// 一定要释放锁
redisService.unlock(BE_ADDES_JOB_NODE_LOCK);
}
}
4、模拟测试
本地环境无法模拟多个节点,所以我们开启两个不同的项目来模拟两个节点,只需要保证他们的锁名称一样即可
我们可以看到两个不同的节点(项目)在获取同一把锁时只会有一个成功,其他均会失败。到此一个简单的redis实现分布式锁的demo就完成了。
【参阅】
https://blog.csdn.net/baidu_29609961/article/details/81128943