springBoot结合线程池解决多线程问题实录

背景:

线上有一个接口,3台机器总共QPS在3000左右,单机QPS在1000左右,接口响应时间2ms。为了保证接口的任何改动在上线之前能够在大流量下能够没有问题。提出想法,搭建一套流量回放环境,上线之前把代码先部署到流量回放环境。然后将线上的流量导入到流量回放环境,用真实的业务请求来做模拟测试,这个过程我们称作是流量回放。

​ 为了保证流量回放的时候,流量导入过程,不能影响正常的线上接口请求,也就是响应时间不能有变化。首先就要考虑启动一个线程来异步处理这个事情。好,按照这个想法,写了第一版本代码。(以下代码是有问题的,只怪我too young too sample)

第一版本代码写起来:

@RequestMapping(value = "/flags", method = RequestMethod.POST)
@ResponseBody
public ServerResponse getFlagsPost(@RequestBody NewServerParam param) {
    //如果流量回放开关打开,就创建线程,将请求发送到流量回放环境
    if(Constant.TRAFFIC_REPLAY_FLAG){
        Runnable r = ()->{
            HttpClientUtil.post(trafficReplayUrl+"sample/v2/flags", JSON.toJSONString(param),10000,"application/json");
        };
        Thread thread = new Thread(r);
        thread.start();
    }
    long a = System.currentTimeMillis();
    ServerResponse response = new ServerResponse();
    response.setFlags(serverService.getFlags(param, param.getExp_key()));
    long b = System.currentTimeMillis();
    LOGGER.info(logService.parseToJSon(param, response, (b - a), LOGGER, Level.INFO.getName()));
    return response;
}

线下测试没有问题,一上线,大流量上来,服务器瞬间报错上万条,马上回滚。

报错提示:

​ java.lang.OutOfMemoryError: Unable to create new native thread

这个错误的意思是:程序创建的线程数量已达到上限值

剖析错误

​ JVM向操作系统申请创建新的 native thread(原生线程)时, 就有可能会碰到 java.lang.OutOfMemoryError: Unable to create new native thread 错误. 如果底层操作系统创建新的 native thread 失败, JVM就会抛出相应的OutOfMemoryError. 原生线程的数量受到具体环境的限制, 通过一些测试用例可以找出这些限制, 请参考下文的示例. 但总体来说, 导致 java.lang.OutOfMemoryError: Unable to create new native thread 错误的场景大多经历以下这些阶段:

  1. java程序向jvm申请创建一个线程

  2. jvm本地代码(native code)代理该请求,尝试创建一个操作系统级别的native Thread(原生线程)

  3. 操作系统尝试创建一个新的native Thread,需要同时分配一些内存给该线程

  4. 如果操作系统的虚拟内存已经耗尽,或者受到32位进程的地址空间限制(约2-4GB),OS就会拒绝本地内存分配

  5. JVM抛出 java.lang.OutOfMemoryError: Unable to create new native thread 错误。

    参考:https://blog.csdn.net/renfufei/article/details/78088553

改进方案-springBoot整合线程池优化

​ 错误很明显,就是创建线程数量过多,超过OS所能允许的最大空间。那这个问题,完全就可以用线程池去解决,用线程池维护一定数量的线程,防止无限制的创建线程,带来的内存开销过大。

代码改进:

​ 1. 创建线程池配置类

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

import java.util.concurrent.Executor;
import java.util.concurrent.ThreadPoolExecutor;


/**
 * @description: 线程池配置类
 **/
@Configuration  //表示这个类是配置类
@EnableAsync    //表示这个类是线程池配置类
public class ExecutorConfig {

    private static final Logger logger = LoggerFactory.getLogger(ExecutorConfig.class);
    @Bean
    public Executor asyncServiceExecutor(){
        logger.info("start asyncServiceExecutor");
        ThreadPoolTaskExecutor executor = new VisiableThreadPoolTaskExecutor();
        int corePoolSize = Runtime.getRuntime().availableProcessors();
        //配置核心线程数
        executor.setCorePoolSize(corePoolSize*2);
        //配置最大线程数
        executor.setMaxPoolSize(corePoolSize*2);
        //配置队列大小
        executor.setQueueCapacity(99999);
        //配置线程池中的线程的名称前缀
        executor.setThreadNamePrefix("async-service-");
        // rejection-policy:当pool已经达到max size的时候,并且队列已经满了,如何处理新任务
        // CALLER_RUNS:不在新线程中执行任务,而是有调用者所在的线程来执行
        //DiscardPolicy: 直接丢弃
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardPolicy());
        //执行初始化
        executor.initialize();
        // 等待所有任务结束后再关闭线程池
        executor.setWaitForTasksToCompleteOnShutdown(true);
        return executor;
    }

}

​ 2.创建线程信息打印的类,这样在执行线程池执行excute方法的时候,会把当前的任务的情况打印出来

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.util.concurrent.ListenableFuture;

import java.util.concurrent.Callable;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadPoolExecutor;

/**
 * @description: 获取线程池的监控信息
 **/
public class VisiableThreadPoolTaskExecutor extends ThreadPoolTaskExecutor {

    private static final Logger logger = LoggerFactory.getLogger(VisiableThreadPoolTaskExecutor.class);

    private void showThreadPoolInfo(String prefix){
        ThreadPoolExecutor threadPoolExecutor = getThreadPoolExecutor();
        if(null == threadPoolExecutor){
            return;
        }
        logger.info("{},{},taskCount[{}],completedTaskCount[{}],activeCount[{}],queuesize[{}]",
                this.getThreadNamePrefix(),
                prefix,
                threadPoolExecutor.getTaskCount(),
                threadPoolExecutor.getCompletedTaskCount(),
                threadPoolExecutor.getActiveCount(),
                threadPoolExecutor.getQueue().size()
                );
    }

    @Override
    public void execute(Runnable task) {
        showThreadPoolInfo("1. do execute");
        super.execute(task);
    }

    @Override
    public void execute(Runnable task, long startTimeout) {
        showThreadPoolInfo("2. do execute");
        super.execute(task, startTimeout);
    }

    @Override
    public Future<?> submit(Runnable task) {
        showThreadPoolInfo("1. do submit");
        return super.submit(task);
    }

    @Override
    public <T> Future<T> submit(Callable<T> task) {
        showThreadPoolInfo("2. do submit");
        return super.submit(task);
    }

    @Override
    public ListenableFuture<?> submitListenable(Runnable task) {
        showThreadPoolInfo("1. do submitListenable");
        return super.submitListenable(task);
    }

    @Override
    public <T> ListenableFuture<T> submitListenable(Callable<T> task) {
        showThreadPoolInfo("2. do submitListenable");
        return super.submitListenable(task);
    }


}
  1. 创建任务接口
```java
import com.alibaba.fastjson.JSONObject;

/**
 * 异步任务接口
 */
public interface AsyncService {

    void trfficRepalyForFlagV2(String param);

}
```
  1. 创建任务实现类

    ``

    import com.alibaba.fastjson.JSONObject;
    import com.lianjia.platform.sampling.util.HttpClientUtil;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.beans.factory.annotation.Value;
    import org.springframework.scheduling.annotation.Async;
    import org.springframework.stereotype.Service;
    
    
    /**
     * @description: 异步任务实现类
     **/
    @Service
    public class AsyncServiceImpl implements AsyncService {
        private final Logger logger = LoggerFactory.getLogger(AsyncServiceImpl.class);
    
        @Value("${URL.trafficReplayUrl}")
        private String trafficReplayUrl; //获取yml文件中配置的流量回放环境的URL
    
        @Override
        @Async("asyncServiceExecutor")//这里要使用定义的线程池配置的Bean的方法名
        public void trfficRepalyForFlagV2(String param) {
            HttpClientUtil.post(trafficReplayUrl+"sample/v2/flags", param,10000,"application/json");
        }
    
    }
    
  2. 使用任务

```java
@RequestMapping(value = "/flags", method = RequestMethod.POST)
@ResponseBody
public ServerResponse getFlagsPost(@RequestBody NewServerParam param) {
    //如果流量回放开关打开,就创建线程,将请求发送到流量回放环境
    if(Constant.TRAFFIC_REPLAY_FLAG){
        asyncService.trfficRepalyForFlagV2(JSON.toJSONString(param));
    }
    long a = System.currentTimeMillis();
    ServerResponse response = new ServerResponse();
    response.setFlags(serverService.getFlags(param, param.getExp_key()));
    long b = System.currentTimeMillis();
    LOGGER.info(logService.parseToJSon(param, response, (b - a), LOGGER, Level.INFO.getName()));
    return response;
}
```

压测结果:

​ 压测数据:并发数50,压测时间10min。并发数=QPS(1000)*响应时间(0.02s)。

​ 之前因为上线之前没有做压测,导致了上线之后,大流量下报错。吃一堑,长一智。这次改完之后做了压测。压测之后,第一,打开流量开关之后,不报错了;第二,主线程平均响应耗时和不开启异步任务时候的平均响应耗时基本一致。证明方案是可以的。

插曲:拒绝策略使用不当导致主线程平均响应时间非常大。第一次在写线程池配置类的时候,使用了executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());的拒绝策略。队列的大小设置了3000,在压测的时候发现并发数50(一般计算是并发数=QPS * 响应时间)左右,这个和线上单台机器的QPS基本接近,虽然不报错了主线程的接口耗时远远超出了不打开开关的接口耗时。通过打印信息来看,是因为提交的任务量非常大,队列中的任务已经把队列填满了,这个时候,从线程池原理来看,要去创建线程数达到maxpoolsize,我们这里设置的maxPoolsize和corePoolsize大小是一样的。意味着就不会再去创建线程了,只能走拒绝策略。这里的拒绝策略CallerRunsPolicy的含义是如果异步线程执行不了,就由调用线程处理,实际上就是主线程来处理,这样就会导致主线程的部分流量回放任务成了同步的了。这当然会增大主线程的接口响应时间了。因为我们只需要有部分线上流量了其实就可以了,因此,我把拒绝策略改为了直接丢弃。这样处理不了的线程不进入队列,也不由主线程执行,保证主线程的响应时间不受影响。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 220,458评论 6 513
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 94,030评论 3 396
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 166,879评论 0 358
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 59,278评论 1 295
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 68,296评论 6 397
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 52,019评论 1 308
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,633评论 3 420
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 39,541评论 0 276
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 46,068评论 1 319
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 38,181评论 3 340
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 40,318评论 1 352
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,991评论 5 347
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,670评论 3 331
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 32,183评论 0 23
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 33,302评论 1 272
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 48,655评论 3 375
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 45,327评论 2 358