一、准备
//记录 成功记录
CREATE TABLE `product_record` (
`id` bigint(20) NOT NULL AUTO_INCREMENT,
`product_id` varchar(20) NOT NULL COMMENT '商品id',
`user_id` varchar(20) NOT NULL COMMENT '买主id',
`num` int(11) DEFAULT NULL,
PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=latin1 COMMENT='购买记录表';
//库存表
CREATE TABLE `product_storage` (
`id` bigint(20) NOT NULL AUTO_INCREMENT,
`product_id` varchar(20) NOT NULL COMMENT '商品ID',
`amount` bigint(20) NOT NULL COMMENT '库存数',
PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=2 DEFAULT CHARSET=latin1;
并发测试工具 JMeter
二、单机并发控制
@Transactional
public synchronized String buySth(String userId, String productId) {
int num = random.nextInt(10) + 1;
// 查库存
ProductStorage storage = storageMapper.getByProductId(productId);
Long oldAmount = storage.getAmount();
if (num > oldAmount) {
log.info(userId + " request " + num + " fail");
return userId + " request " + num + " fail";
}
storage.setAmount(oldAmount - num);
log.info(userId + " request " + num + " " + productId + "当前 " + oldAmount);
// 减库存
int res = storageMapper.updateByPrimaryKeySelective(storage);
if (res > 0) {
ProductRecord productRecord = new ProductRecord();
productRecord.setProductId(productId);
productRecord.setUserId(userId);
productRecord.setNum(num);
recordMapper.insert(productRecord);
return userId + "request " + num + " success" + " 当前 " + oldAmount;
} else {
return userId + "request " + num + " fail";
}
}
tips
:sychronized
加在静态方法上,表示sychronized(类.class){} 。加在非静态方法上等同于对整个方法体使用sychroniced(this){}。
为什么会出现锁不住的情况呢,还是有多扣的情况。这是因为我们使用了@Transactional
注解加入了事务控制。Spring的事务控制是通过AOP实现的,在业务开始前后会添加/提交事务。流程大概如下:
- 开启事务(aop)
- 加锁(进入synchronized方法)
- 释放锁(退出synchronized方法)
- 提交事务(aop)
所以T2进入的时候,T1的事务还没有提交,就可能导致读到的还是T2秒杀前的库存。试着注释掉@Transactional
注释再试一下。
可以看到不会有重复扣减的情况。
针对 Spring事务提交与锁释放不同步的问题,可以使用Spring手动事务控制方式解决:
@Autowired
private PlatformTransactionManager platformTransactionManager;
@Autowired
private TransactionDefinition transactionDefinition;
引入事务控制管理类。
// @Transactional
public synchronized String buySth(String userId, String productId) {
int num = random.nextInt(10) + 1;
// 查库存
ProductStorage storage = storageMapper.getByProductId(productId);
Long oldAmount = storage.getAmount();
if (num > oldAmount) {
log.info(userId + " request " + num + " fail");
return userId + " request " + num + " fail";
}
//获取事务状态对象,开启事务
TransactionStatus transaction = dataSourceTransactionManager.getTransaction(transactionDefinition);
try {
storage.setAmount(oldAmount - num);
log.info(userId + " request " + num + " " + productId + " 当前 " + oldAmount);
// 减库存
int res = storageMapper.updateByPrimaryKeySelective(storage);
if (res > 0) {
ProductRecord productRecord = new ProductRecord();
productRecord.setProductId(productId);
productRecord.setUserId(userId);
productRecord.setNum(num);
recordMapper.insert(productRecord);
//事务提交
dataSourceTransactionManager.commit(transaction);
return userId + "request " + num + " success" + " 当前 " + oldAmount;
} else {
//事务回滚
dataSourceTransactionManager.rollback(transaction);
return userId + "request " + num + " fail";
}
} catch (Exception e) {
dataSourceTransactionManager.rollback(transaction);
} finally {
return userId + "request " + num + " fail";
}
}
这样也是可以实现单机 并发控制的。
上面的例子说明synchronize
关键字是可以解决并发问题的。但是这种方式会有很大的缺陷:
-
synchronize
不够灵活,一次性锁住了一整个方法,如果我们都是有多个商品的话,会被一次性全部锁住,效率太低。 -
synchronize
只可以锁住当前的项目实例,如果项目发展到集群部署,就失去作用了。
为了解决上面的问题,就引入了分布式锁的概念。
二、使用Redis 分布式锁模拟秒杀
@Transactional
public String buySth(String userId, String productId) {
int num = random.nextInt(10) + 1;
//获取锁
Boolean lock = redis.opsForValue().setIfAbsent("skill::" + productId, userId, 3, TimeUnit.SECONDS);
if (lock) {
try {
// 查库存
ProductStorage storage = storageMapper.getByProductId(productId);
Long oldAmount = storage.getAmount();
if (num > oldAmount) {
return userId + " request " + num + " fail";
}
storage.setAmount(oldAmount - num);
log.info(userId + " request " + num + " " + productId + " 当前 " + oldAmount + " success");
// 减库存
int res = storageMapper.updateByPrimaryKeySelective(storage);
if (res > 0) {
ProductRecord productRecord = new ProductRecord();
productRecord.setProductId(productId);
productRecord.setUserId(userId);
productRecord.setNum(num);
recordMapper.insert(productRecord);
return userId + "request " + num + " success" + " 当前 " + oldAmount;
} else {
return userId + "request " + num + " fail";
}
} finally {
String currUser = redis.opsForValue().get("skill::" + productId);
if (userId.equals(currUser)) {
redis.delete("skill" + productId);
}
}
} else {
// log.info(userId + "request lock fail");
return userId + "request " + num + " fail";
}
}
一个简单的 nginx 配置请求分发