线程池设计

使用线程池的场合

  • 单个任务处理时间短
  • 将需处理的任务数量大

使用Java线程池好处

1、使用new Thread()创建线程的弊端:

  • 每次通过new Thread()创建对象性能不佳。
  • 线程缺乏统一管理,可能无限制新建线程,相互之间竞争,及可能占用过多系统资源导致死机或oom。
  • 缺乏更多功能,如定时执行、定期执行、线程中断。
    2、使用Java线程池的好处:
  • 重用存在的线程,减少对象创建、消亡的开销,提升性能。
  • 可有效控制最大并发线程数,提高系统资源的使用率,同时避免过多资源竞争,避免堵塞。
  • 提供定时执行、定期执行、单线程、并发数控制等功能。

一般一个简单线程池至少包含下列组成部分。

  • 线程池管理器(CustomThreadPool):用于创建并管理线程池
  • 工作线程(CustomWorker): 线程池中线程
  • 任务接口(基类)(CustomTask):每个任务必须实现的接口(或继承的基类),以供工作线程调度任务的执行。
  • 任务队列:用于存放没有处理的任务。提供一种缓冲机制。
    详细代码

线程池管理器至少有下列功能:创建线程池,销毁线程池,添加新任务
线程池的代码如下:

package com.tao.threadpool;

import lombok.extern.slf4j.Slf4j;

import java.util.Optional;
import java.util.Vector;
import java.util.concurrent.BlockingQueue;

/**
 * @author: Penger
 * @time: 2019/7/23
 * @description: <p>
 * 自定义线程池简单实现:
 * 1. 线程池大小
 * 2. 创建、销毁线程池
 * 3. 工作线程
 * 4. 任务队列
 * </p>
 **/
@Slf4j
class CustomThreadPool {
    /**
     * 状态,是否在运行
     */
    private volatile Boolean running = true;
    /**
     * 任务缓存队列
     */
    private final BlockingQueue<CustomTask> taskQueue;
    /**
     * worker的执行需要thread
     */
    private Vector<CustomWorker> workers = new Vector<>();
    /**
     * 最大线程数
     */
    private int maxPoolSize;
    /**
     * 当前线程数
     */
    private int workSize;

    CustomThreadPool(int maxPoolSize, BlockingQueue<CustomTask> taskQueue) {
        this.maxPoolSize = maxPoolSize;
        this.workSize = 0;
        this.taskQueue = taskQueue;
    }

    void execute(CustomTask task) {
        Optional.ofNullable(task).orElseThrow(NullPointerException::new);
        if (workSize < maxPoolSize) {
            addThread(task);
        } else if (!taskQueue.offer(task)) {
            rejectTask(task);
        }
    }

    /**
     * 添加线程执行start
     *
     * @param task task
     */
    private void addThread(CustomTask task) {
        //当前线程数+1
        workSize++;
        //创建新的Worker
        CustomWorker worker = new CustomWorker("Thread-" + workSize, task, taskQueue, running);
        workers.add(worker);
        //执行thread
        worker.start();
    }

    /**
     * 停止所有线程
     */
    void shutdown() {
        running = false;
        if (!workers.isEmpty()) {
            for (CustomWorker worker : workers) {
//                worker.setRunning(false);
                worker.interruptSelf();
            }
        }
        Thread.currentThread().interrupt();
    }

    private void rejectTask(CustomTask task) {
        log.info("blocking queue has no space, reject task{}", task.getTaskId());
    }
}

工作线程可以继承thread,能够获取到任务队列,实现Thread的run方法,在run里面取任务执行,同时提供工作线程的销毁方法

 package com.tao.threadpool;

import lombok.Data;
import lombok.EqualsAndHashCode;
import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.BlockingQueue;

/**
 * @author: Penger
 * @time: 2019/7/23
 * @description: <p>
 * 自定义工作线程,优先执行直接赋予的任务,执行完毕之后再从队列获取任务
 * </p>
 **/
@EqualsAndHashCode(callSuper = true)
@Slf4j
@Data
public class CustomWorker extends Thread {
    private CustomTask task;
    private final BlockingQueue<CustomTask> taskQueue;
    private Boolean running;

    CustomWorker(String name, CustomTask task, BlockingQueue<CustomTask> taskQueue, Boolean running) {
        super(name);
        this.task = task;
        this.taskQueue = taskQueue;
        this.running = running;
    }

    @Override
    public void run() {
        while (running) {
            if (task != null) {
                task.taskRun();
                try {
                    Thread.sleep(3000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            try {
                task = taskQueue.take();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            log.info("queue has [{}] elements", taskQueue.size());
        }
    }

    void interruptSelf() {
        this.interrupt();
    }
}

任务接口(基类),定义线程池具体执行何种任务,通过taskRun()开始任务执行

package com.tao.threadpool;

import lombok.Data;
import lombok.extern.slf4j.Slf4j;

/**
 * @author: Penger
 * @time: 2019/7/23
 * @description: <p>
 * 自定义任务
 * </p>
 **/
@Slf4j
@Data
class CustomTask {
    private int taskId;

    CustomTask(int taskId) {
        this.taskId = taskId;
    }

    void taskRun() {
        log.info("custom task-{} running...", taskId);
    }
}

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容

  • 前言 ES的操作中充斥着不同的操作任务类型,如Index、Search、刷新合并、recover等等。各种类型任务...
    beipiao阅读 7,737评论 0 0
  • 第一部分 来看一下线程池的框架图,如下: 1、Executor任务提交接口与Executors工具类 Execut...
    压抑的内心阅读 9,710评论 1 24
  • 前段时间遇到这样一个问题,有人问微信朋友圈的上传图片的功能怎么做才能让用户的等待时间较短,比如说一下上传9张图片,...
    加油码农阅读 4,919评论 0 2
  • 进程和线程 进程 所有运行中的任务通常对应一个进程,当一个程序进入内存运行时,即变成一个进程.进程是处于运行过程中...
    胜浩_ae28阅读 10,531评论 0 23
  • 作者:明至 我依然是爱你的, 无时无刻,不在想念。 可是,还要与往事告别。 人生的路,还有很长, 我不能沉溺于悲伤...
    作家明至阅读 1,794评论 0 3