Java线程池源码分析

1.1 为什么要线程池

我们在执行大规模任务时,如安卓中的多图下载,网络请求,都少不了使用线程。而线程作为进程下面的计算基本单位,它必然会有一些进程的特点,如需要系统分配内存资源,所以有下面的说法:

each thread requires an allocation of stack memory whose default size ranges from 64 KB to 1 MB, depending on the OS.

可以看出,在一个任务对应一个线程的模式下,这样频繁创建线程可能面对的第一个问题是资源瓶颈,线程大量增加,出现OutOfMemoryError()的异常风险会增高。

为了避免频繁创建线程,我们可以想到一个解决方案: 一个线程对应多个任务,把线程缓存起来。

另外,线程的创建是有自己生命周期,创建好了不会马上运行,这对于一些网络请求要求速率快无疑有很大的影响。

根据上面的几个特点,我们可以根据CPU核数、JVM启动参数、Thread构造函数中请求栈道大小等因素,限制线程的数量,从而提高系统的吞吐率。

2.1 自己写一个线程池

在Java JDK中,提供了Excutors工具类定制了一些ThreadPoolExecutor,通过工厂方法可以得到它们。具体可以去看源码,这里我主要是通过手写线程池主要的对象来分析线程池管理任务的过程。

线程池工作流程.png
需求一 :设计可执行任务的接口

根据面向接口编程的原则,定义一个IExecutor 接口,它有一个方法execute(Runnable r)专门用来执行任务:

package executor;
/**
 * Created by qiangzeng on 17/3/19.
 */
public interface IExecutor {
    void execute(Runnable runnable);
}

该任务可以在当前线程中同步执行。如下面代码:

 Executor mExecutorSync = new Executor() {
           public void execute(Runnable command) {
               command.run();
           }
       };
       mExecutorSync.execute(new Runnable() {
           public void run() {
               System.out.println("我在主线程,同步执行");
           }
 });

也可以扔到新开的线程中异步执行。如下面代码:

 Executor mExecutorAsync = new Executor() {
            public void execute(Runnable command) {
                new Thread(command).start();
            }
        };
        mExecutorAsync.execute(new Runnable() {
            public void run() {
                System.out.println("我在新开的子线程异步执行");
            }
        });

至此,我们基本实现了需求一:管理者Executor执行一个任务Runnbale。在这里我们仍然是执行一个任务需创建一个线程,这还不够. 我们需要一个线程能调度很多任务。

需求二:创建线程

对于需求二,让有限的线程逐个去执行队列中任务,完成这个工作首先我们需要有个线程工厂类,创建线程:
为了统一构造线程池中的线程,代码如下。

/**
 * Created by qiangzeng on 17/3/23.
 */
public abstract class MyThreadFactory {

    public abstract Thread newThread(Runnable runnable);
}

我们可以定义一个默认线程工厂实现类:

  public static MyThreadFactory defaultThreadFactory = new MyThreadFactory() {
        @Override
        public Thread newThread(Runnable runnable) {
            Thread thread = new Thread(runnable);
            thread.setDaemon(false);
            thread.setPriority(Thread.NORM_PRIORITY);
            return thread;
        }
    };

如上,我们给线程池的线程统一设置为非后台线程,优先级为一般。需要它时可以直接使用。

线程需要一个代理类,这样我们可以给线程优雅地做运行、中断操作,我们把它叫做Worker类:


    /**
     * 任务持有者,对任务进行执行,(拒绝,中止.请参考java源码)
     */
    private class Worker implements Runnable {
        private Runnable firstTask;
        private Thread thread;

        public Worker(Runnable firstTask) {
            this.firstTask = firstTask;
            this.thread = threadFactory.newThread(this);//利用线程工厂创建线程
        }

        public void run() {
            runWork(this);
        }
    }

Work类,做些什么事呢? 它持有一个线程和默认的任务。如果线程池的数目还没用达到corePoolSize(线程限制数)我们就创建新Work,并添加到线程调度池中:

    /**
     * 线程池集合,用来保存线程工作者.
     */
    private HashSet<Worker> workers = new HashSet<Worker>();

至于*runWork(this) *方法我们稍后会分析,到这里,我们的线程基本工作准备好了。下面我们需要分析下任务是怎么被线程调度。

需求三:利用线程池调度任务

首先我们需要一些原材料(属性)我们创建一个IExecutorImpl类,它是IExecutor的实现。

 /**
     * 线程池的大小
     */
    private int corePoolSize;
    /**
     * 保存任务的队列
     */
    private BlockingDeque<Runnable> workQueue;

    /**
     * 用来创建线程的工程类
     */
    private MyThreadFactory threadFactory;

上面代码声明了基本属性:

  • 1.线程池的线程数
  • 2.创建线程的工厂类
  • 3.阻塞任务队列

因为这些属性是线程池必备的。我们在构造函数中初始化这些属性

   public IExecutorImpl(int corePoolSize, BlockingDeque<Runnable> workQueue, MyThreadFactory threadFactory) {
        this.corePoolSize = corePoolSize;
        this.workQueue = workQueue;
        this.threadFactory = threadFactory;
    }

从属性可以看出,我们要提交到线程池的任务可能会因为等待执行而放在阻塞任务队列中。至此我们可以开始正式写线程调度逻辑了
1.当用户提交任务时,会执行execute().
2.如果线程池中的线程工作者小于线程限制大小则继续新增线程,并马上运行提交的任务.
3.否则,把任务放入阻塞队列等待执行.

 /**
     *  覆写IExecutor中的execute() 
     * @param command
     */
    public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();

        if (getWorkCount() < corePoolSize)//
            addWork(command);
        else {
            workQueue.offer(command);//否则,把任务放入阻塞队列等待执行
        }
    }

上面addWork()做的工作是:向线程池中添加线程,代码如下:


    public void addWork(Runnable firstTask) {
        if (getWorkCount() < corePoolSize) {
            Worker worker = null;
            try {
                final ReentrantLock mainLock = this.mainLock;
                worker = new Worker(firstTask);
                final Thread t = worker.thread;
                if (t != null) {
                    mainLock.lock();
                    workers.add(worker);
                    t.start();
                }
            } finally {
                mainLock.unlock();
            }
        }
    }

上面代码逻辑:
1.判断有没有超过线程池最大数量
2.在临界区中新建一个线程工作者
3.把线程工作者Work添加到线程池works里面,并启动线程,然后执行最新提交的任务。

这样,如果线程池达到了最大限制数,那么我们不再创建新线程(除非某个线程停止了)。来了新的任务,我们就放入任务队列。

那么我们思考一下,一个线程如何执行多个任务?

很简单,我们看看之前的runWork(this)方法:

    public void runWork(Worker worker) {
        Runnable task = worker.firstTask;
        worker.firstTask = null;
            while (task != null || (task = getTask()) != null) {
                task.run();
                task = null;
            }

work.firstTask是创建新线程是添加的默认任务。 而这里主要逻辑是在一个while循环中不断的getTask()得到阻塞队列中的任务,执行run(),然后销毁为null.
我们看看getTask():

    public Runnable getTask() {
        Runnable runnable = null;
        try {
            runnable = workQueue.take();
            return runnable;
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            return runnable;
        }
    }

很简单,就是从workQueue中take拿一个任务出来。因为是阻塞队列。如果队列为空则线程等待。知道被添加新的任务才会被唤醒。如此反复的调度任务。

3.1实例

为了更方便的使用线程池,我们会默认配置一些参数,所以有一个线程池工厂类MyExecutors用来创建线程池:

public class MyExecutors {
    public static IExecutor newFixedThreadPool(int nThreads) {
            return new IExecutorImpl(nThreads, new  LinkedBlockingDeque<Runnable>(), MyExecutors.defaultThreadFactory);
    }

我们可以正式使用这个线程池了:

功能:

一个线程, 模拟调度多个任务。

/**
 * Created by qiangzeng on 17/3/23.
 */
public class App {
    public static void main(String[] args) {
        IExecutor iExecutor = MyExecutors.newFixedThreadPool(1);//默认只有一个线程, 模拟调度多个任务
        iExecutor.execute(new Runnable() {
            public void run() {
                System.out.println("task1");
                try {
                    System.out.println("等线程1执行firstTask,5秒...后");
                    System.out.println("线程1从任务队列里面依次取出任务,有序执行...");

                    TimeUnit.SECONDS.sleep(5);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        });

        for ( int i = 0; i < 10; i++) {
            final int count = i;
            iExecutor.execute(new Runnable() {
                public void run() {
                    System.out.println("task"+ (count +2));
                    try {
                        TimeUnit.SECONDS.sleep(2);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            });
        }
}
}

输出结果:

task1
等线程1执行firstTask,5秒...后
线程1从任务队列里面依次取出任务,有序执行...
task2
task3
task4
task5
task6
task7
task8
task9
task10
task11

总结

上面是本人对JAVA线程池简要分析,主要是用作对源码的知识笔记。线程池中的任务调度,除了从队列中拿任务执行,还有一个非常重要的是如何拒绝任务。因为我们这里只是通过限制线程数来防止资源消耗过多。但是任务没有任何限制,而且任务什么时候可以中断取消执行,则要在Work类中去处理了,源码是最好的教材,有空继续阅读。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 212,222评论 6 493
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,455评论 3 385
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 157,720评论 0 348
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,568评论 1 284
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 65,696评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 49,879评论 1 290
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,028评论 3 409
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,773评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,220评论 1 303
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,550评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,697评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,360评论 4 332
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,002评论 3 315
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,782评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,010评论 1 266
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,433评论 2 360
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,587评论 2 350

推荐阅读更多精彩内容