Java并发-30.Executor框架

  • HotSpot VM线程模型中Java线程被一对一映射为本地操作系统线程。
  • 应用程序通过Executor框架控制上层调度,下层调度由操作系统内核控制,不受应用程序影响

1. Executor框架结构

  • 任务,包括执行任务需要实现的接口:
    • Runnable接口和Callable接口的实现类,用于被ThreadPoolExecutor或ScheduledThreadExecutor执行
  • 任务的执行,包括任务执行机制的核心接口Executor,和继承自Executor的ExecutorService接口,有两个实现类:
    • ThreadPoolExecutor:用来执行被提交的任务
    • ScheduledThreadExecutor:在给定延时后执行任务,或定时执行任务
  • 异步运算结果,包括接口Future和实现Future接口的FutureTask类

2. Executor成员

2.1 ThreadPoolExecutor

工厂类Executors来创建,有三种:

  • FixeThreadPool:可重用固定线程数的线程池
    • 用于需要限制当前线程数量的应用场合
    • 它的corePoolSize和maximunPoolSize设置为创建时的参数。
    • 线程池中线程数大于corePoolSize,多余的空闲线程立即终止
    • 使用无界队列作为工作队列
    • 源码:
    public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
    }
  • SingleThreadExecutor:单个worker线程的Executor
    • 用于保证顺序执行各个任务,在任意时间点也不会有多个活动线程
    • corePoolSize= maximunPoolSize=1
    • 其余和ThreadPoolExecutor一样
    • 源码
    public static ExecutorService newSingleThreadExecutor() {
        return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1,
                                    0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue<Runnable>()));
    }
    public static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory) {
        return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1,
                                    0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue<Runnable>(),
                                    threadFactory));
    }
  • CachedThreadPool
    • 根据需要创建新线程,用于执行很多的短期异步小任务
    • 大小无界的线程池,corePoolSize = 0, maximunPoolSize=Integer.MAX_VALUE
    • 空闲线程等待60秒
    • SynchronousQueue作为工作队列
    • 源码:
    public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
    }
    public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>(),
                                      threadFactory);
    }

2.2 ScheduledThreadPoolExecutor

通常用工厂类Executors类创建,有两种:

  • ScheduledThreadPoolExecutor:包含固定个线程的ScheduledThreadPoolExecutor。
    • 使用DelayQueue作为任务队列,放入其中的是ScheduledFutureTask,主要包含3个成员变量:

      • long time,标识任务将要被执行的具体时间,DelayQueue中封装了一个PriorityQueue,根据它排序。
      • long sequenceNumber,标识这个任务被添加到ScheduledThreadPoolExecutor中的序号,先按time排序后按照它排序
      • long period,表示任务执行的时间间隔
    • 执行周期任务后,增加额外处理

    • 构造器源码:

    public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
        return new ScheduledThreadPoolExecutor(corePoolSize);
    }
    public static ScheduledExecutorService newScheduledThreadPool(
            int corePoolSize, ThreadFactory threadFactory) {
        return new ScheduledThreadPoolExecutor(corePoolSize, threadFactory);
    }
  • SingleThreadScheduledExecutor只包含一个线程的ScheduledThreadPoolExecutor。
    • 构造器源码:
    public static ScheduledExecutorService newSingleThreadScheduledExecutor() {
        return new DelegatedScheduledExecutorService
            (new ScheduledThreadPoolExecutor(1));
    }
    public static ScheduledExecutorService newSingleThreadScheduledExecutor(ThreadFactory threadFactory) {
        return new DelegatedScheduledExecutorService
            (new ScheduledThreadPoolExecutor(1, threadFactory));
    }

2.3 Future接口

  • Future接口和实现它的FutureTask类用来表示异步运算的结果。
  • 可以看到submit()方法会返回一个FutureTask对象
  • 三种状态
    • 未启动
    • 已启动
      • 正常结束
      • 取消而结束(FutureTask.cancel())
      • 异常而结束
  • 使用
    • 交给Executor执行
    • 通过ExecutorService.submit()方法返回FutureTask,执行get()方法或则cancel()方法。
  • 实现
    • 基于AbstractQueuedSynchronized(AQS)实现:AQS是一个同步框架,提供通用机制来原子性的管理同步状态,阻塞和唤醒线程,以及维护阻塞线程的队列(基于它实现的同步器包括ReentrantLock,Semaphore,ReentrantReadWriteLock,CountDownLatch和FutureTask)
      • 基于AQS的同步器包含两种操作:
        • 至少一个acquire操作,阻塞调用线程,直到AQS状态允许这个线程继续执行
        • 至少一个release操作,改变AQS状态,改变后可允许一个或多个阻塞线程被解除阻塞,Future的release操作包括run()和cancel()

AQS作为“模板方法模式”的基础类提供给FutureTask的内部子类Sync,这个内部子类只需要实现AQS的tryAcquireShared(int)方法检查同步状态,实现了tryReleaseShared(int)方法更新同步状态,他们控制FutureTask的获取和释放操作。

Sync是FutureTask的内部私有类,继承与AQS,FutureTask的所有公共方法都委托给了内部私有的Sync:

  • FutureTask.get()方法调用AQS.acquireSharedInturruptibli(int arg)方法:
    1. AQS.acquireSharedInterruptibly(int arg)方法,返回子类Sync中实现的tryAcquireShared()方法来判断acquire方法能否成功,成功条件为state执行完成状态RUN或者已取消状态CANCELLED,且runner不为null
    2. 成功则get()方法立即返回,失败则线程到线程等待队列中去等待其他线程执行release操作
    3. 其他线程执行release操作(FutureTask.run()或者FutureTask.cancel())唤醒后,当前线程再次执行tryAcquireShared()方法返回正值1,当前线程离开线程等待队列并唤醒它的后继线程
    4. 返回计算的结果或抛出异常
  • FutureTask.run()
    1. 执行构造函数中指定的任务(Callable.call())
    2. 原子方式更新同步状态(AQS.compareAndSetState(int except, int update), 设置state为RAN),如果原子操作成功,设置代表计算结果的变量result值为Callable.call()的返回值,然后调用AQS.releaseShared(int arg)
    3. AQS.releaseShared(int arg)首先返回子类Sync中实现的tryReleaseShared(arg)来执行release操作(设置运行任务的线程runner为null,然后返回true);AQS.releaseShared(int arg),然后唤醒线程等待队列中的第一个线程
    4. 调用FutureTask.done()

2.4 Runnable接口和Callable接口

  • 两个接口的实现类都可以被ScheduledThreadPoolExecutor或者ThreadPoolExecutor执行。
  • Runnable不会返回结果,Callable会返回结果
  • Executors提供了API把Runnable包装成Callable
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 217,826评论 6 506
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,968评论 3 395
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 164,234评论 0 354
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,562评论 1 293
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,611评论 6 392
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,482评论 1 302
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,271评论 3 418
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 39,166评论 0 276
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,608评论 1 314
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,814评论 3 336
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,926评论 1 348
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,644评论 5 346
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,249评论 3 329
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,866评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,991评论 1 269
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 48,063评论 3 370
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,871评论 2 354

推荐阅读更多精彩内容

  • 如果你在亚马逊搜索java相关的书,本书排名是非常靠前的,豆瓣的评分也很高。刚好我最近忙找工作,也需要复习并发相关...
    whiledoing阅读 4,509评论 1 6
  • 第1章并发编程的挑战 第2章Java并发机制的底层实现原理 1. volatile——下文讨论 2. synchr...
    小张同学_loveZY阅读 1,360评论 0 1
  • 概述 Java的线程既是工作单元,也是执行机制。从JDK 5开始,把工作单元与执行机制分离开来。工作单元包括Run...
    康俊1024阅读 283评论 0 0
  • layout: posttitle: 《Java并发编程的艺术》笔记categories: Javaexcerpt...
    xiaogmail阅读 5,820评论 1 19
  • 那是一个阴雨朦胧的冬日的下午,在医院奋战一夜后,我决定好好慰劳一下自己。下班后,先去菜场购置食材:老母鸡,葱,姜,...
    幽游呦佑阅读 172评论 0 0