线程池的使用

回顾线程的基本知识

我们最常见的创建线程的几种方法:一是继承Thread类,二是实现Runnable的接口,三是实现Callable接口。单个线程的创建和销毁会消耗机器资源,如果线程数量多的话,频繁的创建和销毁会大大的降低程序运行的效率,耗费大量的内存,因为正常来说线程执行完毕后死亡,会被当作垃圾回收。怎么能让线程复用、统一管理命名线程、提高资源的使用率?线程池就能完美的解决这个问题。

为什么使用线程池

线程池是一种多线程处理形式,处理过程中将任务添加到队列,线程池在工程启动时并不会立即创建大量空闲的线程,程序将一个任务传给线程池,线程池就会启动一条线程来执行这个任务。执行结束以后,该线程并不会死亡,而是再次返回线程池中成为空闲状态,等待执行下一个任务。
简而言之,线程池就是一组线程的集合。

线程池实现原理

SpringBoot配置线程池

首先在启动类中配置如下Bean

   @Bean("defaultThreadPool")
   public ThreadPoolExecutor defaultThreadPool() {
        log.info("start asyncServiceExecutor");

        ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat("HiveMetric-pool-%d").build();

        //  核心线程数
        int corePoolSize = 10;

        //  配置最大线程数
        int maximumPoolSize = 20;

        //  当线程数大于内核数时,这是多余的空闲线程将在终止之前等待新任务的最长时间
        long keepAliveTime = 2L;

        ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(corePoolSize, maximumPoolSize, keepAliveTime,
                TimeUnit.SECONDS, new LinkedBlockingQueue<>(1024), threadFactory, new ThreadPoolExecutor.AbortPolicy());
        log.info("线程池,{}", threadPoolExecutor);

        return threadPoolExecutor;
    }

1)首先创建一个ThreadFactoryBuilder给线程统一命名,方便管理及检索日志;
2)通过ThreadPoolExecutor来创建一个线程池,该类来自于JUC包下,下面介绍一下重要的几个参数;
corePoolSize:核心线程数量
maximumPoolSize:线程池中的线程总数=核心线程数+非核心线程数
keepAliveTime:当线程数大于内核数时,多余的空闲线程将在终止之前等待新任务的最长时间。
TimeUnit:时间单位
BlockingQueue<Runnable>:在执行任务之前用于保留任务的队列。
ThreadFactory:线程工厂
RejectedExecutionHandler:因为达到了线程界限和队列容量而在执行被阻止时使用的处理器。线程池中的线程已经用完了,无法继续为新任务服务,同时,等待队列也已经排满了,再也塞不下新任务了。这时候我们就需要拒绝策略机制合理的处理这个问题。JDK 内置的拒绝策略如下:

  • AbortPolicy : 直接抛出异常,阻止系统正常运行。
  • CallerRunsPolicy : 只要线程池未关闭,该策略直接在调用者线程中,运行当前被丢弃的
    任务。显然这样做不会真的丢弃任务,但是,任务提交线程的性能极有可能会急剧下降。
  • DiscardOldestPolicy : 丢弃最老的一个请求,也就是即将被执行的一个任务,并尝试再
    次提交当前任务。
  • DiscardPolicy : 该策略默默地丢弃无法处理的任务,不予任何处理。如果允许任务丢
    失,这是最好的一种方案。
    以上内置拒绝策略均实现了 RejectedExecutionHandler 接口,若以上策略仍无法满足实际需要,完全可以自己扩展RejectedExecutionHandler 接口。
    3)在service层通过如下方法注入
    @Autowired
    private ThreadPoolExecutor defaultThreadPool;

ThreadPoolExecutor继承自AbstractExecutorService,下面看一下submit()。

    /**
     * @throws RejectedExecutionException {@inheritDoc}
     * @throws NullPointerException       {@inheritDoc}
     */
    public <T> Future<T> submit(Callable<T> task) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<T> ftask = newTaskFor(task);
        execute(ftask);
        return ftask;
    }

最后通过Future<V>get()的方法取出线程的执行结果。

        for (HostEntity hostEntity : hiveServer2Info) {
            String hostName = hostEntity.getHostName();
            Future<JSONObject> submit = defaultThreadPool.submit(() -> stringConvertJsonObject(hostName, getJmxFileContent(hostName, metricFileLocation)));
            map.put(hostName, submit);
        }

        for (Map.Entry<String, Future<JSONObject>> next : map.entrySet()) {
            String hostName = next.getKey();
            try {
                result.put(hostName, next.getValue().get());
            } catch (Exception e) {
                log.error("线程池获取返回结果异常!主机名:{}", hostName, e);
            }
        }

之前从多台服务器解析文件由串行改为线程池并行的解决方法,初步优化之后线上接口的响应时间由10s+降为1s。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
【社区内容提示】社区部分内容疑似由AI辅助生成,浏览时请结合常识与多方信息审慎甄别。
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

相关阅读更多精彩内容

友情链接更多精彩内容