一、高并发处理思想
建议先看看高并发的处理思想,脑子里先有个全局概念。
高并发处理的5个思想
二、令牌桶算法
以一个恒定的速度往桶里放入令牌,而如果请求需要被处理,则需要先从桶里获取一个令牌,当桶里没有令牌可取时,则拒绝服务。令牌桶允许请求某种程度的突发传输。这是令牌桶区别于漏桶的地方。
三、代码示例
pom.xml依赖
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>28.1-jre</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-aop</artifactId>
</dependency>
1、请求速率限制注解类
package com.hello;
import java.lang.annotation.*;
import java.util.concurrent.TimeUnit;
@Target({ElementType.PARAMETER, ElementType.METHOD})
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface RequestRateLimitAnnotation {
/**
* 固定令牌个数
* @return
*/
double limitNum();
/**
* 获取令牌超时时间
* @return
*/
long timeout();
/**
* 单位-默认毫秒
* @return
*/
TimeUnit timeUnit() default TimeUnit.MILLISECONDS;
/**
* 无法获取令牌时的错误信息
* @return
*/
String errMsg() default "请求太频繁!";
}
2、aop拦截器
package com.hello;
import com.google.common.collect.Maps;
import com.google.common.util.concurrent.RateLimiter;
import lombok.extern.slf4j.Slf4j;
import org.aspectj.lang.JoinPoint;
import org.aspectj.lang.ProceedingJoinPoint;
import org.aspectj.lang.annotation.Around;
import org.aspectj.lang.annotation.Aspect;
import org.aspectj.lang.annotation.Pointcut;
import org.springframework.stereotype.Component;
import org.springframework.util.StringUtils;
import org.springframework.web.context.request.RequestContextHolder;
import org.springframework.web.context.request.ServletRequestAttributes;
import javax.servlet.http.HttpServletRequest;
import java.lang.reflect.Method;
import java.util.Map;
import java.util.Objects;
@Slf4j
@Aspect
@Component
public class RequestRateLimitAspect {
/**
* 使用url做为key,存放令牌桶 防止每次重新创建令牌桶
*/
private Map<String, RateLimiter> limitMap = Maps.newConcurrentMap();
// 拦截带RequestRateLimitAnnotation注解的接口
@Pointcut("@annotation(com.hello.RequestRateLimitAnnotation)")
public void execMethod() {}
@Around("execMethod()")
public Object doAround(ProceedingJoinPoint joinPoint) throws Throwable {
//获取请求uri
HttpServletRequest request = ((ServletRequestAttributes) RequestContextHolder.getRequestAttributes()).getRequest();
String reqUrl=request.getRequestURI();
// 获取令牌
RequestRateLimitAnnotation rateLimiter = this.getRequestRateLimiter(joinPoint);
if (Objects.nonNull(rateLimiter)) {
RateLimiter limiter = getRateLimiter(reqUrl, rateLimiter);
// 请求获取令牌,参数为等待超时时间
boolean acquire = limiter.tryAcquire(rateLimiter.timeout(), rateLimiter.timeUnit());
if (!acquire) {
return new Response(200, reqUrl.concat(rateLimiter.errMsg())).toString();
}
}
//获得令牌,继续执行
return joinPoint.proceed();
}
/**
* 获取RateLimiter
* @return
*/
private RateLimiter getRateLimiter(String reqUrl, RequestRateLimitAnnotation rateLimiter) {
RateLimiter limiter = limitMap.get(reqUrl);
if (Objects.isNull(limiter)) {
synchronized (this) {
limiter = limitMap.get(reqUrl);
if (Objects.isNull(limiter)) {
// 创建一个限流器,参数代表每秒生成的令牌数
limiter = RateLimiter.create(rateLimiter.limitNum());
limitMap.put(reqUrl, limiter);
log.info("RequestRateLimitAspect请求{},创建令牌桶,容量{} 成功!!!", reqUrl, rateLimiter.limitNum());
}
}
}
return limiter;
}
/**
* 获取注解对象
* @param joinPoint 对象
* @return ten LogAnnotation
*/
private RequestRateLimitAnnotation getRequestRateLimiter(final JoinPoint joinPoint) {
Method[] methods = joinPoint.getTarget().getClass().getDeclaredMethods();
String name = joinPoint.getSignature().getName();
if (!StringUtils.isEmpty(name)) {
for (Method method : methods) {
RequestRateLimitAnnotation annotation = method.getAnnotation(RequestRateLimitAnnotation.class);
if (!Objects.isNull(annotation) && name.equals(method.getName())) {
return annotation;
}
}
}
return null;
}
}
3、控制器类
package com.hello;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
@RestController
@RequestMapping("/test")
public class TestController {
@RequestRateLimitAnnotation(limitNum = 2, timeout = 10)
@GetMapping("/rate_limit")
public String testRateLimit() {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
/**
* 测试代码
*/
return "success";
}
}
4、测试类
package com.hello;
import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.net.URL;
import java.net.URLConnection;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
public class AccessClient {
ExecutorService fixedThreadPool = Executors.newFixedThreadPool(10);
/**
* get请求
* @param realUrl
* @return
*/
public static String sendGet(URL realUrl) {
String result = "";
BufferedReader in = null;
try {
// 打开和URL之间的连接
URLConnection connection = realUrl.openConnection();
// 设置通用的请求属性
connection.setRequestProperty("accept", "*/*");
connection.setRequestProperty("connection", "Keep-Alive");
connection.setRequestProperty("user-agent",
"Mozilla/4.0 (compatible; MSIE 6.0; Windows NT 5.1;SV1)");
// 建立实际的连接
connection.connect();
// 定义 BufferedReader输入流来读取URL的响应
in = new BufferedReader(new InputStreamReader(
connection.getInputStream()));
String line;
while ((line = in.readLine()) != null) {
result += line;
}
} catch (Exception e) {
System.out.println("发送GET请求出现异常!" + e);
e.printStackTrace();
}
// 使用finally块来关闭输入流
finally {
try {
if (in != null) {
in.close();
}
} catch (Exception e2) {
e2.printStackTrace();
}
}
return result;
}
public void access() throws Exception{
final URL url = new URL("http://localhost:8080/test/rate_limit");
for(int i=0;i<4;i++) {
fixedThreadPool.submit(new Runnable() {
public void run() {
System.out.println(sendGet(url));
}
});
}
fixedThreadPool.shutdown();
fixedThreadPool.awaitTermination(Long.MAX_VALUE, TimeUnit.SECONDS);
}
public static void main(String[] args) throws Exception{
AccessClient accessClient = new AccessClient();
accessClient.access();
}
}
测试结果:
Response(code=200, msg=/test/rate_limit请求太频繁!)
success
success
success
结果分析:
部分请求由于获取的令牌可以成功执行,其余请求没有拿到令牌,我们可以根据实际业务来做区分处理。还有一点要注意,我们通过RateLimiter.create(2.0)配置的是每一秒2枚令牌,但是限流的时候发出的是3枚,改用其他值验证,也是实际的比配置的大1。
四、RateLimiter源码解析
待补充。
五、qps多大合适?
一直再说高并发,多少QPS才算高并发?
Web开发中,什么级别才算是高并发
总结我比较关心的几点:
1、如果某个系统的日pv在千万级别以上,他就可能是一个高并发的系统。
2、PV和QPS
比如微博每天1亿多pv的系统一般也就1500QPS,5000QPS峰值。
比如有人说:
2C4G机器单机一般1000QPS。
8C8G机器单机可承受7000QPS。
3、具体多少QPS跟业务强相关,只读接口读缓存,将压力给到缓存单机3000+没问题,写请求1000+也正常,也复杂些可能也就几百+QPS。
所以QPS和业务场景和设计相关性很大,比如可以通过浏览器本地缓存,用缓存做热点数据查询,写事务MQ异步处理等方式提升QPS。