Queue flood discharge (队列泄洪): the principle
- Queueing is sometimes more efficient than concurrency. Redis's single-threaded model is a positive example; InnoDB row-lock contention is the counterexample: when many threads update the same row, they all compete for one row lock, the losers go into a blocking queue, wait for the lock to be released, and then all compete for the lock again. A single-threaded model avoids the cost of thread context switching: since every thread is going to wait on the lock anyway, paying the switching overhead on top of the waiting is pure loss, so it is better to simply wait in line and save the context switches.
- Rely on queueing to limit the amount of concurrent traffic.
- Use the queue, together with how congested the downstream's congestion window is, to adjust the queue and control how much traffic is released. Concretely, the consumer side of the queue does not have to take requests one at a time; it can dequeue several at once and hand them to downstream worker threads. The batch size is the congestion window: if the downstream can handle 6 TPS, the congestion window is 6 and 6 requests are taken per batch (see the sketch after this list).
- Alipay's bank-gateway queue is built on exactly this flood-discharge idea: Alipay stores users' payment requests in its own queue, and then discharges them to the bank gateway at the TPS that the downstream gateway has committed to sustaining.
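The batching idea above can be sketched in a few lines of Java. This is only an illustration, not code from the project: the class name, the queue, and the one-second pacing are invented for the example; the point is that the dispatcher drains at most one congestion window's worth of requests per tick and hands them to downstream workers.

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.*;

// Illustrative sketch of queue flood discharge: callers only enqueue requests,
// and a dispatcher releases them downstream in batches no larger than the congestion window.
public class FloodDischargeQueueSketch {
    private static final int CONGESTION_WINDOW = 6; // e.g. the downstream can handle 6 TPS
    private final BlockingQueue<Runnable> requests = new LinkedBlockingQueue<>();
    private final ExecutorService downstreamWorkers = Executors.newFixedThreadPool(CONGESTION_WINDOW);

    // Upstream side: requests are only enqueued here, never executed directly.
    public void submit(Runnable request) {
        requests.offer(request);
    }

    // Consumer side: take up to CONGESTION_WINDOW requests per second and hand them downstream.
    public void dispatchLoop() throws InterruptedException {
        while (!Thread.currentThread().isInterrupted()) {
            List<Runnable> batch = new ArrayList<>();
            batch.add(requests.take());                        // block until at least one request arrives
            requests.drainTo(batch, CONGESTION_WINDOW - 1);    // fill the rest of the window if available
            for (Runnable request : batch) {
                downstreamWorkers.submit(request);
            }
            TimeUnit.SECONDS.sleep(1);                         // release roughly one window per second
        }
    }
}

In a real system the window would not be a constant; it would be widened or narrowed according to the downstream's latency and error rate, which is the congestion feedback mentioned above.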
Queue flood discharge: the implementation
- Define a fixed-size thread pool in the Controller with 20 worker threads.
- The order-placing operation, i.e. sending the RocketMQ transactional message, is submitted to this thread pool. That means at any given moment at most 20 user requests on this instance can actually be executing the order placement; the rest wait in the pool's queue.
package com.lixinlei.miaosha.controller;
import com.alibaba.druid.util.StringUtils;
import com.lixinlei.miaosha.error.BusinessException;
import com.lixinlei.miaosha.error.EmBusinessError;
import com.lixinlei.miaosha.mq.MqProducer;
import com.lixinlei.miaosha.response.CommonReturnType;
import com.lixinlei.miaosha.service.ItemService;
import com.lixinlei.miaosha.service.OrderService;
import com.lixinlei.miaosha.service.PromoService;
import com.lixinlei.miaosha.service.model.OrderModel;
import com.lixinlei.miaosha.service.model.UserModel;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.*;
import javax.annotation.PostConstruct;
import javax.servlet.http.HttpServletRequest;
import java.util.concurrent.*;
@Controller("order")
@RequestMapping("/order")
@CrossOrigin(allowCredentials = "true", allowedHeaders = "*")
public class OrderController extends BaseController {

    // Collaborators referenced below; assumed to be injected into this controller
    // (their declarations are not shown in the original snippet).
    @Autowired
    private HttpServletRequest httpServletRequest;
    @Autowired
    private RedisTemplate redisTemplate;
    @Autowired
    private ItemService itemService;
    @Autowired
    private MqProducer mqProducer;

    private ExecutorService executorService;

    @PostConstruct
    public void init() {
        // Fixed pool of 20 worker threads: the "congestion window" for this Tomcat instance
        executorService = Executors.newFixedThreadPool(20);
    }
@RequestMapping(value = "/createorder", method = {RequestMethod.POST}, consumes = {CONTENT_TYPE_FORMED})
@ResponseBody
public CommonReturnType createOrder(@RequestParam(name = "itemId") Integer itemId,
@RequestParam(name = "amount") Integer amount,
@RequestParam(name = "promoId", required = false) Integer promoId,
@RequestParam(name = "promoToken", required = false) String promoToken)
throws BusinessException {
        // Check that the user is logged in; getParameter returns null when the token is absent,
        // avoiding the NPE that indexing getParameterMap().get("token")[0] would cause
        String token = httpServletRequest.getParameter("token");
        if (StringUtils.isEmpty(token)) {
            throw new BusinessException(EmBusinessError.USER_NOT_LOGIN, "User is not logged in, cannot place an order");
        }
        UserModel userModel = (UserModel) redisTemplate.opsForValue().get(token);
        if (userModel == null) {
            throw new BusinessException(EmBusinessError.USER_NOT_LOGIN, "User is not logged in, cannot place an order");
        }
        // Verify the seckill token against the one stored in Redis
        if (promoId != null) {
            String inRedisPromoToken = (String) redisTemplate.opsForValue()
                    .get("promo_token_" + promoId + "_userid_" + userModel.getId() + "_itemid_" + itemId);
            if (inRedisPromoToken == null) {
                throw new BusinessException(EmBusinessError.PARAMETER_VALIDATION_ERROR, "Seckill token validation failed");
            }
            if (!StringUtils.equals(promoToken, inRedisPromoToken)) {
                throw new BusinessException(EmBusinessError.PARAMETER_VALIDATION_ERROR, "Seckill token validation failed");
            }
        }
        // Waiting queue with a congestion window of 20, used for queued flood discharge:
        // on one Tomcat instance, only 20 requests at a time can proceed to place orders;
        // all other requests queue up behind them
        Future<Object> future = executorService.submit(new Callable<Object>() {
            @Override
            public Object call() throws Exception {
                // Complete the order via a RocketMQ transactional message:
                // first record a stock log, then send the transactional message that reduces stock
                String stockLogId = itemService.initStockLog(itemId, amount);
                if (!mqProducer.transactionAsyncReduceStock(userModel.getId(), promoId, itemId, amount, stockLogId)) {
                    throw new BusinessException(EmBusinessError.UNKNOWN_ERROR, "Failed to place the order");
                }
                return null;
            }
        });
        try {
            // No return value is needed; the request thread just waits for the pooled task
            // to finish so that it can respond to the front end. Note that a BusinessException
            // thrown inside the task arrives here wrapped in an ExecutionException and is
            // surfaced as UNKNOWN_ERROR.
            future.get();
        } catch (InterruptedException | ExecutionException e) {
            throw new BusinessException(EmBusinessError.UNKNOWN_ERROR);
        }
        return CommonReturnType.create(null);
    }
}
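One design note on the implementation above: Executors.newFixedThreadPool(20) backs its 20 workers with an unbounded LinkedBlockingQueue, so while only 20 orders are processed at a time, any number of requests can pile up waiting. A possible variation, not part of the original code, is to construct the ThreadPoolExecutor explicitly with a bounded queue so that excess requests are rejected quickly instead of queueing without limit; the queue capacity of 200 below is an arbitrary illustrative value.

    @PostConstruct
    public void init() {
        // Hypothetical variation: same 20-thread congestion window, but with a bounded waiting queue.
        // java.util.concurrent.* is already imported in the controller above.
        executorService = new ThreadPoolExecutor(
                20, 20,                               // core == max == 20, i.e. a fixed-size pool
                0L, TimeUnit.MILLISECONDS,            // core threads never time out
                new LinkedBlockingQueue<>(200),       // at most 200 requests may wait on this instance
                new ThreadPoolExecutor.AbortPolicy()  // further submissions throw RejectedExecutionException
        );
    }

With this variant, executorService.submit(...) can throw RejectedExecutionException once the queue is full, and the controller would have to catch it and return a "system busy" style error to the caller; the original code does not need that path because its queue is unbounded.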