数据类型
主业务数据(Master Data)
- 比如 ItemModel,记录了商品的主数据;
- 比如 item_stock 记录了商品库存的主数据;
操作型数据(Log Data)
- 比如库存扣减这样的操作发生了,需要把操作本身的过程记录下来,用于支持这种记录的数据,就是操作型数据;
- 记录下操作型数据是为了追踪,比如库存流水的操作状态,可以根据这个状态做回滚,或者查询正在处理中的状态,使得很多异步的动作,比如下单之前,先落一条待扣减的库存操作的流水,成功之后,再把流水的状态设置为成功,或者下单失败时候,将对应的库存操作的流水设置为失败,可以根据这个操作流水的状态反查,得到异步动作进行的当前状态;
库存操作流水模型构建
创建库存流水表 stock_log
- 顺便生成 mybatis-generator 的一套;
CREATE TABLE `stock_log` (
`stock_log_id` varchar(64) COLLATE utf8_unicode_ci NOT NULL,
`item_id` int NOT NULL DEFAULT '0',
`amount` int NOT NULL DEFAULT '0',
`status` int NOT NULL COMMENT '1 表示初始状态, 2 表示下单减库存成功, 3 表示下单回滚',
PRIMARY KEY (`stock_log_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8 COLLATE=utf8_unicode_ci;
下单前生成库存流水
- 用户的下单请求到来时,先生成一个库存流水记录,库存流水的 status 设置成 1;
- 下单的时候,把库存流水号带上;
@RequestMapping(value = "/createorder", method = {RequestMethod.POST}, consumes = {CONTENT_TYPE_FORMED})
@ResponseBody
public CommonReturnType createOrder(@RequestParam(name = "itemId") Integer itemId,
@RequestParam(name = "amount") Integer amount,
@RequestParam(name = "promoId", required = false) Integer promoId) throws BusinessException {
String token = httpServletRequest.getParameterMap().get("token")[0];
if (StringUtils.isEmpty(token)) {
throw new BusinessException(EmBusinessError.USER_NOT_LOGIN, "用户还未登录,不能下单");
}
UserModel userModel = (UserModel) redisTemplate.opsForValue().get(token);
if (userModel == null) {
throw new BusinessException(EmBusinessError.USER_NOT_LOGIN, "用户还未登录,不能下单");
}
String stockLogId = itemService.initStockLog(itemId, amount);
if(!mqProducer.transactionAsyncReduceStock(userModel.getId(), promoId, itemId, amount, stockLogId)) {
throw new BusinessException(EmBusinessError.UNKNOWN_ERROR, "下单失败");
}
return CommonReturnType.create(null);
}
下单成功后,修改库存流水的 status 为 2;
- 整个下单逻辑要经历的步骤有:1. 查询验证,2. 减库存,3. 下单,4. 增销量,5. 设置库存流水状态;
// 5. 设置库存流水状态为成功
StockLogDO stockLogDO = stockLogDOMapper.selectByPrimaryKey(stockLogId);
if (stockLogDO == null) {
throw new BusinessException(EmBusinessError.UNKNOWN_ERROR);
}
stockLogDO.setStatus(2);
stockLogDOMapper.updateByPrimaryKeySelective(stockLogDO);
本地事务(下单)失败,要设置库存流水的 status 为 3;
- 设置完了让 Broker 中的 Prepared 状态的消息回滚;
@Override
public LocalTransactionState executeLocalTransaction(Message message, Object args) {
// 真正要执行的操作:创建订单
Integer userId = (Integer)((Map) args).get("userId");
Integer itemId = (Integer)((Map) args).get("itemId");
Integer promoId = (Integer)((Map) args).get("promoId");
Integer amount = (Integer)((Map) args).get("amount");
String stockLogId = (String)((Map) args).get("stockLogId");
try {
OrderModel orderModel = orderService.createOrder(userId, itemId, promoId, amount, stockLogId);
} catch (BusinessException e) {
e.printStackTrace();
// 设置对应的 stockLog 为回滚状态
StockLogDO stockLogDO = stockLogDOMapper.selectByPrimaryKey(stockLogId);
stockLogDO.setStatus(3);
stockLogDOMapper.updateByPrimaryKeySelective(stockLogDO);
return LocalTransactionState.ROLLBACK_MESSAGE;
}
return LocalTransactionState.COMMIT_MESSAGE;
}
Broker 长时间没收到 Commit 提交要走反查逻辑
@Override
public LocalTransactionState checkLocalTransaction(MessageExt msg) {
// 根据是否扣减库存成功,来判断要返回 COMMIT, ROLLBACK, 还是继续 UNKNOW
String jsonString = new String(msg.getBody());
Map<String, Object> map = JSON.parseObject(jsonString, Map.class);
String stockLogId = (String) map.get("stockLogId");
StockLogDO stockLogDO = stockLogDOMapper.selectByPrimaryKey(stockLogId);
if (stockLogDO == null) {
return LocalTransactionState.UNKNOW;
}
if (stockLogDO.getStatus().intValue() == 2) {
return LocalTransactionState.COMMIT_MESSAGE;
} else if (stockLogDO.getStatus().intValue() == 1) {
return LocalTransactionState.UNKNOW;
}
return LocalTransactionState.ROLLBACK_MESSAGE;
}