Executor框架

  • Executor接口
  • ExecutorService规定了Executor的生命周期 (待写)
  • ThreadPoolExecutor是ExecutorService的实现类
image.png
  • 线程池不建议使用 Executors 去创建,而是通过 ThreadPoolExecutor 的方式,这样的处理方式让写的同学更加明确线程池的运行规则,规避资源耗尽的风险。

  • Executors 返回的线程池对象的弊端如下:
    1)FixedThreadPool 和 SingleThreadPool:
    允许的请求队列长度为 Integer.MAX_VALUE,可能会堆积大量的请求,从而导致 OOM。
    2)CachedThreadPool 和 ScheduledThreadPool:
    允许的创建线程数量为 Integer.MAX_VALUE,可能会创建大量的线程,从而导致 OOM。

  • 对于延迟任务、周期任务使用ScheduledThreadPoolExecutor来代替Timer。

    • Timer使用绝对时间而不是相对时间,在执行定时任务时只会创建一个线程。如果某个任务执行时间过长,会破坏其它TimerTask的定时精确性。
    • Timer中的任何一个TimerTask抛出一个未检查异常,就会取消整个Timer,尚未被调度的TimerTask将不会被执行。
    • ScheduledThreadPoolExecutor配合DelayQueue来代替Timer。

ThreadPoolExecutor构造函数

/**
     * Creates a new {@code ThreadPoolExecutor} with the given initial
     * parameters.
     *
     * @param corePoolSize the number of threads to keep in the pool, even
     *        if they are idle, unless {@code allowCoreThreadTimeOut} is set
     * @param maximumPoolSize the maximum number of threads to allow in the
     *        pool
     * @param keepAliveTime when the number of threads is greater than
     *        the core, this is the maximum time that excess idle threads
     *        will wait for new tasks before terminating.
     * @param unit the time unit for the {@code keepAliveTime} argument
     * @param workQueue the queue to use for holding tasks before they are
     *        executed.  This queue will hold only the {@code Runnable}
     *        tasks submitted by the {@code execute} method.
     * @param threadFactory the factory to use when the executor
     *        creates a new thread
     * @param handler the handler to use when execution is blocked
     *        because the thread bounds and queue capacities are reached
     * @throws IllegalArgumentException if one of the following holds:<br>
     *         {@code corePoolSize < 0}<br>
     *         {@code keepAliveTime < 0}<br>
     *         {@code maximumPoolSize <= 0}<br>
     *         {@code maximumPoolSize < corePoolSize}
     * @throws NullPointerException if {@code workQueue}
     *         or {@code threadFactory} or {@code handler} is null
     */
 public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) 

参数解释

  • corePoolSize:线程池基本大小,即没有任务执行时的大小,但是在创建ThreadPoolExecutor初期,线程并不会立即启动,而是等到有任务提交时才会启动,除非调用prestartAllCoreThreads

  • maximumPoolSize:线程池最大大小。表示可同时活动的线程数量的上限。

  • keepAliveTime:线程存活时间,如果某个线程的空闲时间超出了存活时间,那么被标记为可回收,但是,只有线程池的当前大小超过了基本大小这个线程才会被终止。

  • unit:时间单位,和keepAliveTime组成具体时间,比如keepAliveTime为10,unit= TimeUnit.SECONDS,就表示存活时间为10秒。

  • workQueue:工作队列,用于存放提交的任务,用于被线程执行。

    • BlockingQueue的其中4个实现类:SynchronousQueue(同步队列),ArrayBlockingQueue(数组阻塞队列FIFO),LinkedBlockingQueue(链表阻塞队列FIFO),PriorityBlockingQueue(优先级阻塞队列)。后2个可以是有界也可以是无界,第二个是有界队列,第一个是好比容量只有一的阻塞队列。
    • SynchronousQueue不是一个真正队列,而是一种在线程之间进行移交的机制,要将一个元素放入SynchronousQueue中(put方法),必须有另一个线程正在等待接受这个元素(take方法),如果没有线程在等待,那么就会阻塞。在线程池中如果当前线程池中的线程数量没有达到最大值,就创建新的线程,否则就根据饱和策略(handler)来执行。该队列适合消费者多的情况,针对线程池就是适合无界的线程池(maximumPoolSize为int最大值)
  • threadFactory:构造新线程的工厂,具体看:处理非正常的线程终止中使用ThreadFactory来定制Thread。

  • handler:饱和策略,当有界工作队列被填满后,并且没有可用的线程(线程池也是有界的),饱和策略发挥作用。如果某个任务被提交到一个已被关闭的Executor时(调用了shutdown方法),也会用到饱和策略。

    • 四种策略。中止(Abort):该策略抛出未检查异常RejectedExecutionException,调用者可以捕获这个异常,然后根据需求处理;抛弃(Discard):抛弃任务;抛弃最旧的(Discard-Oldest):抛弃下一个将被执行的任务,然后重提提交新的任务,如果工作队列是优先队列(优先级)那么将抛弃优先级最高的任务;调用者运行(Caller-Runs):将任务转交给由调用execute的线程执行该任务,即主线程。
  • 注:如果线程数量等于线程池基本大小值corePoolSize,那么只有当工作队列被填满后,ThreadPoolExecutor才会创建新线程来执行队列中的任务

    • corePoolSize为0、maximumPoolSize为3,并且工作队列为有界队列,并且大小为5,那么只有每当5个任务都填满队列后,才会创建一个线程执行,最多只有3个线程同时作业。Executors.newCachedThreadPool()返回的ThreadPoolExecutor的corePoolSize为0,maximumPoolSize为MAX_VALUE,workQueue为SynchronousQueue。
  • image.png

    下实现一种平缓的性能下降。

案例

  • 1.创建固定大小线程池,有界队列,调用者运行饱和策略
  • image.png
    1. 没有定义饱和策略,但是通过使用Semaphore(信号量)来控制任务的提交速率,信号量大小设置为线程池大小加上可排队列任务的数量,从而控制正在执行的和等待执行的任务数量。
package net.jcip.examples;
import java.util.concurrent.*;
import net.jcip.annotations.*;

public class BoundedExecutor {
    private final Executor exec;
    private final Semaphore semaphore;

    public BoundedExecutor(Executor exec, int bound) {
        this.exec = exec;
        this.semaphore = new Semaphore(bound);
    }

    public void submitTask(final Runnable command)
            throws InterruptedException {
        semaphore.acquire();
        try {
            exec.execute(new Runnable() {
                public void run() {
                    try {
                        command.run();
                    } finally {
                        semaphore.release();
                    }
                }
            });
        } catch (RejectedExecutionException e) {
            semaphore.release();
        }
    }
}

扩展ThreadPoolExecutor

  • ThreadPoolExecutor提供了beforeExecute、afterExecute和terminated,这些方法可以用于扩展ThreadPoolExecutor的行为。

    • 其中beforeExecute、afterExecute在每个任务线程中执行如下,runWorker会在在任务线程的run方法中执行。
public runWorker(Worker w){
......
 try {
                   beforeExecute(wt, task);
                   Throwable thrown = null;
                   try {
                       task.run();
                   } catch (RuntimeException x) {
                       thrown = x; throw x;
                   } catch (Error x) {
                       thrown = x; throw x;
                   } catch (Throwable x) {
                       thrown = x; throw new Error(x);
                   } finally {
                       afterExecute(task, thrown);
                   }
               } finally {
                   task = null;
                   w.completedTasks++;
                   w.unlock();
               }
....
}
  • afterExecute在beforeExecute抛出RuntimeException,或者任务完成后带有一个Error都不会被调用。
  • terminated在线程池关闭时调用,可以用来释放Executor分配的资源,或者执行发送通知,记录日志或手机finalize统计信息等操作。

案例

  • 给线程池添加统计信息
  • 测量任务的运行时间,记录已处理任务数和总的处理时间,并通过terminated来输入包含平均任务时间的日志消息。
public class MyAppThread extends Thread {
    public static final String DEFAULT_NAME = "MyAppThread";
    private static volatile boolean debugLifecycle = false;
    private static final AtomicInteger created = new AtomicInteger();
    private static final AtomicInteger alive = new AtomicInteger();
    private static final Logger log = Logger.getAnonymousLogger();

    public MyAppThread(Runnable r) {
        this(r, DEFAULT_NAME);
    }

    public MyAppThread(Runnable runnable, String name) {
        super(runnable, name + "-" + created.incrementAndGet());
        setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
            public void uncaughtException(Thread t,
                                          Throwable e) {
                log.log(Level.SEVERE,
                        "UNCAUGHT in thread " + t.getName(), e);
            }
        });
    }

    public void run() {
        // Copy debug flag to ensure consistent value throughout.
        boolean debug = debugLifecycle;
        if (debug) log.log(Level.FINE, "Created " + getName());
        try {
            alive.incrementAndGet();
            super.run();
        } finally {
            alive.decrementAndGet();
            if (debug) log.log(Level.FINE, "Exiting " + getName());
        }
    }

    public static int getThreadsCreated() {
        return created.get();
    }

    public static int getThreadsAlive() {
        return alive.get();
    }

    public static boolean getDebug() {
        return debugLifecycle;
    }

    public static void setDebug(boolean b) {
        debugLifecycle = b;
    }
}

递归算法的并行化

  • 如果在循环体中包含一些密集型计算,或者需要执行可能阻塞的I/O操作,那么只要每次迭代是独立的,都可以对其进行并行化。(计算密集型和I/O操作对Cpu利用率不同,所以线程池大小也不一样)
  • 下面给出来了2个串行转并行的操作。
public abstract class TransformingSequential {

  //1.将串行转并行
    void processSequentially(List<Element> elements) {
        for (Element e : elements)
            process(e);
    }
    void processInParallel(Executor exec, List<Element> elements) {
        for (final Element e : elements)
            exec.execute(new Runnable() {
                public void run() {
                    process(e);
                }
            });
    }

    public abstract void process(Element e);

 //2.将串行递归转化为并行递归:树的深度优先遍历
    public <T> void sequentialRecursive(List<Node<T>> nodes,
                                        Collection<T> results) {
        for (Node<T> n : nodes) {
            results.add(n.compute());
            sequentialRecursive(n.getChildren(), results);
        }
    }

    public <T> void parallelRecursive(final Executor exec,
                                      List<Node<T>> nodes,
                                      final Collection<T> results) {
        for (final Node<T> n : nodes) {
            exec.execute(new Runnable() {
                public void run() {
                    results.add(n.compute());
                }
            });
            parallelRecursive(exec, n.getChildren(), results);
        }
    }

  //等待通过并行方式的计算结果
    public <T> Collection<T> getParallelResults(List<Node<T>> nodes)
            throws InterruptedException {
        ExecutorService exec = Executors.newCachedThreadPool();
        Queue<T> resultQueue = new ConcurrentLinkedQueue<T>();
        parallelRecursive(exec, nodes, resultQueue);
        exec.shutdown();
        exec.awaitTermination(Long.MAX_VALUE, TimeUnit.SECONDS);
        return resultQueue;
    }

    interface Element {
    }

    interface Node <T> {
        T compute();

        List<Node<T>> getChildren();
    }
}
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 215,463评论 6 497
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,868评论 3 391
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 161,213评论 0 351
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,666评论 1 290
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,759评论 6 388
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,725评论 1 294
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,716评论 3 415
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,484评论 0 270
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,928评论 1 307
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,233评论 2 331
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,393评论 1 345
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,073评论 5 340
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,718评论 3 324
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,308评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,538评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,338评论 2 368
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,260评论 2 352

推荐阅读更多精彩内容