redis连接池代码略过
实现类并没有使用lua,而是使用了JDK自带的System.currentTimeMillis(),因此要求分布式各台服务器直接时间同步,否则会出现超时时间错误地相互覆盖问题
redis实现类
import org.apache.log4j.Logger;
import redis.clients.jedis.Jedis;
/**
* redis分布式锁
*/
public class RedisLock {
private static Logger logger = Logger.getLogger(RedisLock.class);
/**
* 锁等待时间,防止线程饥饿
*/
private int TIMEOUT_MSECS = 10 * 1000;
/**
* 锁超时时间,防止线程在入锁以后,无限的执行等待
*/
private int EXPIRE_MSECS = 60 * 1000;
private static int DEFAULT_ACQUIRY_RESOLUTION_MILLIS = 100;
/**
* Lock key path.
*/
private String lockKey;
/**
* Java本地与redis对应的锁
*/
private volatile boolean locked = false;
private static Jedis jedis;
/**
* 构造器
* 默认锁等待时间10*1000ms,锁超时时间60*1000ms
* @param jedis jedis连接池
* @param lockKey 锁的key名称
*/
public RedisLock(Jedis jedis, String lockKey) {
this.jedis = jedis;
this.lockKey = lockKey + "_lock";
}
/**
* 构造器
* 默认锁超时时间60*1000ms
* @param jedis jedis连接池
* @param lockKey 锁的key名称
* @param timeoutMsecs 锁超时时间
*/
public RedisLock(Jedis jedis, String lockKey, int timeoutMsecs) {
this(jedis, lockKey);
this.TIMEOUT_MSECS = timeoutMsecs;
}
/**
* 默认构造器
* @param jedis
* @param lockKey
* @param timeoutMsecs
* @param expireMsecs
*/
public RedisLock(Jedis jedis, String lockKey, int timeoutMsecs, int expireMsecs) {
this(jedis, lockKey, timeoutMsecs);
this.EXPIRE_MSECS = expireMsecs;
}
/**
* @return lock key
*/
public String getLockKey() {
return lockKey;
}
/**
* 根据key获取值
*
* @param key
* @return
*/
private String get(final String key) {
Object obj = null;
try {
obj = jedis.get(key);
} catch (Throwable e) {
}
return obj != null ? obj.toString() : null;
}
/**
* setNX
* @param key
* @param value
* @return
*/
private boolean setNX(final String key, final String value) {
Object obj = null;
try {
obj = jedis.setnx(key, value);
} catch (Throwable e) {
logger.error(String.format("setNX redis error, key : %s", key), e);
}
return obj != null ? (Boolean) obj : false;
}
/**
* getSet
* @param key
* @param value
* @return
*/
private String getSet(final String key, final String value) {
Object obj = null;
try {
obj = jedis.getSet(key, value);
} catch (Throwable e) {
logger.error(String.format("getSet redis error, key : %s", key), e);
}
return obj != null ? (String) obj : null;
}
/**
* 获得 lock.
* 实现思路: 主要是使用了redis 的setnx命令,缓存了锁.
* reids缓存的key是锁的key,所有的共享, value是锁的到期时间(注意:这里把过期时间放在value了,没有时间上设置其超时时间)
* 执行过程:
* 1.通过setnx尝试设置某个key的值,成功(当前没有这个锁)则返回,成功获得锁
* 2.锁已经存在则获取锁的到期时间,和当前时间比较,超时的话,则设置新的值
*
* @return true if lock is acquired, false acquire timeouted
* @throws InterruptedException in case of thread interruption
*/
public synchronized boolean lock() throws InterruptedException {
int timeout = TIMEOUT_MSECS;
while (timeout >= 0) {
long expires = System.currentTimeMillis() + EXPIRE_MSECS + 1;
String expiresStr = String.valueOf(expires); //锁到期时间
if (this.setNX(lockKey, expiresStr)) {
// lock acquired
locked = true;
return true;
}
String currentValueStr = this.get(lockKey); //redis里的时间
if (currentValueStr != null && Long.parseLong(currentValueStr) < System.currentTimeMillis()) {
//判断是否为空,不为空的情况下,如果被其他线程设置了值,则第二个条件判断是过不去的
// lock is expired
String oldValueStr = this.getSet(lockKey, expiresStr);
//获取上一个锁到期时间,并设置现在的锁到期时间,
//只有一个线程才能获取上一个线上的设置时间,因为jedis.getSet是同步的
if (oldValueStr != null && oldValueStr.equals(currentValueStr)) {
//防止误删(覆盖,因为key是相同的)了他人的锁——这里达不到效果,这里值会被覆盖,但是因为什么相差了很少的时间,所以可以接受
//[分布式的情况下]:如果这个时候,多个线程恰好都到了这里,但是只有一个线程的设置值和当前值相同,他才有权利获取锁
// lock acquired
locked = true;
return true;
}
}
timeout -= DEFAULT_ACQUIRY_RESOLUTION_MILLIS;
/*
延迟100 毫秒, 这里使用随机时间可能会好一点,可以防止饥饿进程的出现,即,当同时到达多个进程,
只会有一个进程获得锁,其他的都用同样的频率进行尝试,后面有来了一些进行,也以同样的频率申请锁,这将可能导致前面来的锁得不到满足.
使用随机的等待时间可以一定程度上保证公平性
*/
Thread.sleep(DEFAULT_ACQUIRY_RESOLUTION_MILLIS);
}
return false;
}
/**
* Acqurired lock release.
*/
public synchronized void unlock() {
if (locked) {
jedis.del(lockKey);
locked = false;
}
}
}
测试类
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.concurrent.CountDownLatch;
/**
* 测试类
*/
public class main {
public static void main(String[] args) {
final RedisLock redisLock = new RedisLock(RedisUtil.getJedis(), "test");
final CountDownLatch down = new CountDownLatch(1);
for (int i = 0; i < 30; i++) {
new Thread(new Runnable() {
public void run() {
try {
down.await();
redisLock.lock();
} catch (Exception e) {
}
SimpleDateFormat sdf = new SimpleDateFormat("HH:mm:ss|SSS");
String orederNo = sdf.format(new Date());
System.out.println("生成的订单号为:" + orederNo);
try {
redisLock.unlock();
} catch (Exception e) {
}
}
}).start();
}
down.countDown();
}
}
测试结果
[QC] INFO [main] RedisUtil.getJedis(77) | init Redis success
生成的订单号为:15:51:44|643
生成的订单号为:15:51:44|643
生成的订单号为:15:51:44|643
生成的订单号为:15:51:44|643
生成的订单号为:15:51:44|643
生成的订单号为:15:51:44|643
生成的订单号为:15:51:44|648
生成的订单号为:15:51:44|655
生成的订单号为:15:51:44|657
生成的订单号为:15:51:44|659
生成的订单号为:15:51:44|661
生成的订单号为:15:51:44|664
生成的订单号为:15:51:44|665
生成的订单号为:15:51:44|669
生成的订单号为:15:51:44|676
生成的订单号为:15:51:44|678
生成的订单号为:15:51:44|680
生成的订单号为:15:51:44|686
生成的订单号为:15:51:44|688
生成的订单号为:15:51:44|690
生成的订单号为:15:51:44|693
生成的订单号为:15:51:44|695
生成的订单号为:15:51:44|697
生成的订单号为:15:51:44|699
生成的订单号为:15:51:44|702
生成的订单号为:15:51:44|713
生成的订单号为:15:51:44|714
生成的订单号为:15:51:44|723
生成的订单号为:15:51:44|725
生成的订单号为:15:51:44|729
Process finished with exit code 0
对比之下,ZooKeeper的测试类
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.locks.InterProcessMutex;
import org.apache.curator.retry.ExponentialBackoffRetry;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.concurrent.CountDownLatch;
/**
* curator分布式锁实现
*/
public class Recipes_lock {
static String lock_path = "/curator_recipes_lock_path";
static CuratorFramework client = CuratorFrameworkFactory.builder()
.connectString("127.0.0.1:2181")
.retryPolicy(new ExponentialBackoffRetry(1000, 3)).build();
public static void main(String[] args) throws Exception {
client.start();
final InterProcessMutex lock = new InterProcessMutex(client, lock_path);
final CountDownLatch down = new CountDownLatch(1);
for (int i = 0; i < 30; i++) {
new Thread(new Runnable() {
public void run() {
try {
down.await();
lock.acquire();
} catch (Exception e) {
}
SimpleDateFormat sdf = new SimpleDateFormat("HH:mm:ss|SSS");
String orederNo = sdf.format(new Date());
System.out.println("生成的订单号为:" + orederNo);
try {
lock.release();
} catch (Exception e) {
}
}
}).start();
}
down.countDown();
}
}
ZooKeeper的测试结果
生成的订单号为:15:32:33|728
生成的订单号为:15:32:33|781
生成的订单号为:15:32:33|849
生成的订单号为:15:32:33|879
生成的订单号为:15:32:33|897
生成的订单号为:15:32:33|972
生成的订单号为:15:32:34|013
生成的订单号为:15:32:34|057
生成的订单号为:15:32:34|130
生成的订单号为:15:32:34|197
生成的订单号为:15:32:34|232
生成的订单号为:15:32:34|282
生成的订单号为:15:32:34|367
生成的订单号为:15:32:34|404
生成的订单号为:15:32:34|425
生成的订单号为:15:32:34|461
生成的订单号为:15:32:34|482
生成的订单号为:15:32:34|515
生成的订单号为:15:32:34|550
生成的订单号为:15:32:34|573
生成的订单号为:15:32:34|600
生成的订单号为:15:32:34|624
生成的订单号为:15:32:34|647
生成的订单号为:15:32:34|670
生成的订单号为:15:32:34|715
生成的订单号为:15:32:34|752
生成的订单号为:15:32:34|774
生成的订单号为:15:32:34|793
生成的订单号为:15:32:34|811
生成的订单号为:15:32:34|836
对比两个结果来看
redis
生成的订单号为:15:51:44|643
...
生成的订单号为:15:51:44|729
ZooKeeper
生成的订单号为:15:32:33|728
...
生成的订单号为:15:32:34|836