多线程-线程池

by shihang.mai

1. 线程池的种类

实际上只有两种,一种是ThreadPoolExecutor,另一种是ForkJoinPool

Executors.newCachedThreadPool()

new ThreadPoolExecutor(0, Integer.MAX_VALUE,60L, TimeUnit.SECONDS,new SynchronousQueue<Runnable>());

核心线程 = 0
线程最大数 = Integer.MAX_VALUE
过期时间 = 60s
阻塞队列 = SynchronousQueue
线程工厂 = Executors.defaultThreadFactory()
拒绝策略 = AbortPolicy

Executors.newFixedThreadPool(n)

new ThreadPoolExecutor(n, n,0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>());

核心线程 = n
线程最大数 = n
过期时间 = 0s
阻塞队列 = LinkedBlockingQueue
线程工厂 = Executors.defaultThreadFactory()
拒绝策略 = AbortPolicy

Executors.newScheduledThreadPool(n)

new ThreadPoolExecutor(n, Integer.MAX_VALUE,0L, NANOSECONDS,new DelayedWorkQueue());

核心线程 = n
线程最大数 = Integer.MAX_VALUE
过期时间 = 0s
阻塞队列 = DelayedWorkQueue
线程工厂 = Executors.defaultThreadFactory()
拒绝策略 = AbortPolicy

Executors.newWorkStealingPool()

new ForkJoinPool
  (Runtime.getRuntime().availableProcessors(),
   ForkJoinPool.defaultForkJoinWorkerThreadFactory,
   null, true)

线程数为CPU核心数, ForkJoinPool

Executors.newSingleThreadExecutor()

new ThreadPoolExecutor(1, 1,0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>())

核心线程 = 1
线程最大数 = 1
过期时间 = 0s
阻塞队列 = LinkedBlockingQueue
线程工厂 = Executors.defaultThreadFactory()
拒绝策略 = AbortPolicy

Executors.newSingleThreadScheduledExecutor()

new ThreadPoolExecutor(1, Integer.MAX_VALUE,0L, NANOSECONDS,new DelayedWorkQueue());

核心线程 = 1
线程最大数 = Integer.MAX_VALUE
过期时间 = 0s
阻塞队列 = DelayedWorkQueue
线程工厂 = Executors.defaultThreadFactory()
拒绝策略 = AbortPolicy

线程池种类

2. ThreadPoolExecutor

创建线程池:

ThreadPoolExecutor tpe = new ThreadPoolExecutor(2, 4,
                60, TimeUnit.SECONDS,
                new ArrayBlockingQueue<Runnable>(4),
                Executors.defaultThreadFactory(),
                new ThreadPoolExecutor.CallerRunsPolicy());
属性 含义
int corePoolSize 核心线程
int maximumPoolSize 最大线程
long keepAliveTime 当线程超过时间长期不干活,归还操作系统。(有参数设定核心线程是否参与)
TimeUnit unit 时间单位
BlockingQueue<Runnable> workQueue 任务队列
ThreadFactory threadFactory 生产线程的工厂
RejectedExecutionHandler handler 拒绝策略

线程池与任务关系:

ThreadPoolExecutor任务与线程关系
  1. 首先线程数为空,当来2个任务时,那么开启2个线程去处理
  2. 再来4个任务,直接进入到阻塞队列中
  3. 当再来2个任务时,再开2个线程去处理这两个新的任务
  4. 当再来任务时,执行拒绝策略。拒绝策略可以自定义,Jdk默认提供4种
    • Abort.扔掉,抛异常
    • Discard.扔掉,不抛异常
    • DiscardOld.扔掉排队时间最长的
    • CallerRuns.调用者处理任务

3. ForkJoinPool

原理:

采用的是work stealing算法

work stealing
  1. ForkJoinPool 的每个工作线程都维护着一个工作队列(WorkQueue),这是一个双端队列(Deque),里面存放的对象是任务(ForkJoinTask)。
  2. 每个工作线程在运行中产生新的任务(通常是因为调用了 fork())时,会放入工作队列的队尾,并且工作线程在处理自己的工作队列时,使用的是 LIFO 方式,也就是说每次从队尾取出任务来执行
  3. 每个工作线程在处理自己的工作队列同时,会尝试窃取一个任务(或是来自于刚刚提交到 pool 的任务,或是来自于其他工作线程的工作队列),窃取的任务位于其他线程的工作队列的队首,也就是说工作线程在窃取其他工作线程的任务时,使用的是 FIFO 方式。
  4. 在遇到 join() 时,如果需要 join 的任务尚未完成,则会先处理其他任务,并等待其完成。
  5. 在既没有自己的任务,也没有可以窃取的任务时,进入休眠

用途:

  • 分解汇总的任务
  • 用很少的线程可以执行很多的任务
  • CPU密集型
ExecutorService service = Executors.newWorkStealingPool();
//没返回值的任务继承RecursiveAction
class AddTask extends RecursiveAction
//有返回值任务继承
class AddTaskRet extends RecursiveTask    
new ForkJoinPool().execute(Task)  
ForkJionPool

4. 队列的种类

队列可分为阻塞和非阻塞队列,也可分为有界、无界队列、同步移交。有界是指,队列放入的元素个数有限。无界是指,队列放入的元素没个数限制,只限制于物理设备

按有界无界分

无界队列:ConcurrentLinkedQueue、PriorityBlockingQueue、DelayQueue、LinkedTransferQueue

有界队列:ArrayBlockingQueue、LinkedBlockingQueue(默认Integer.MAX_VALUE近似无限,但它构造时可传入值变为有界)

按阻塞非阻塞分

阻塞队列:LinkedBlockingQueue、ArrayBlockingQueue、PriorityBlockingQueue、DelayQueue、LinkedTransferQueue

非阻塞队列:ConcurrentLinkedQueue

4.1 ConcurrentLinkedQueue

底层是单向链表,每个节点是一个Node。

PS:当调试时,必须关闭idea的配置,因为idea默认开启了toString预览特性,在debug模式下,ConcurrentLinkedQueue的对象也会被调用toString方法的,在队列的toString方法中会获取队列的迭代器,而创建迭代器时会调用first方法,first方法里就会cas修改head属性

Enable 'toString()' object view
Enable alternative view for Collections classes

下面观察元素入队过程


ConcurrentLinkedQueue入队.png

观察上面的加入元素的快照图,tail并不是都指向最后一个节点,是先经历tail.next赋值为加入元素,下一次tail节点才指向尾节点
源码中的offer()其实只是做了2步

  1. 定位尾节点
  2. 使用CAS将入队节点设置成尾节点的next节点,如不成功则重试

下面观察出队流程


ConcurrentLinkedQueue出队.png

入队/出队head、tail不是一致指向头和尾节点的目的:减少cas的次数,提高入队出队的效率

4.2 LinkedBlockingQueue

底层是单向链表,每个节点是一个Node。并且使用了两个ReentrantLock,分别代表用于入队和出队的锁,并且用了两个Condition,用于挂起和唤醒线程

下面观察入队操作


LinkedBlockingQueue入队.png

在元素入队的时候,需要先获取put lock(Reentranlock)

  • 队列已满,利用Condition阻塞等待
  • 队列未满,创建一个node节点放入队列中,如果放完以后队列还有剩余空间,继续唤醒下一个添加线程进行添加。如果放之前队列中没元素,放完以后要唤醒消费线程进行消费

下面观察出队操作


LinkedBlockingQueue出队.png

在元素出队时,需要先获取take lock(Reentranlock)

  • 队列为空,阻塞等待。
  • 队列不为空,从队首获取并移除一个元素,如果消费后还有元素在队列中,继续唤醒下一个消费线程进行元素移除。如果放之前队列是满元素的情况,移除完后要唤醒生产线程进行添加元素

4.3 ArrayBlockingQueue

底层是一个环形数组,使用单个ReentrantLock,两个Conditon队列,用在出队、入队操作上

利用常用的数组实现队列,那么我们维护一个尾指针,即可对入队达到O(1),但是对于出队操作,都均要移动元素,达到了O(n),我们可以用环形逻辑解决,出入队都是O(1)

ArrayBlockingQueue.png

我们观察一下入队操作


ArrayBlockingQueue入队.png

在元素入队时,需要获取lock(Reentranlock)

  • 将加入的元素放到putIndex位置
  • putIndex+1,当到了环形的尽头时,重新置为0
  • 唤醒在Condition等待获取元素的线程

我们观察一下出队操作


ArrayBlockingQueue出队.png

在元素出队时,需要获取lock(Reentranlock)

  • 当队列中没元素,直接返回null,不操作takeIndex了
  • 通过takeIndex获取到该位置的元素,并把该位置置为null
  • takeIndex+1,到达列表长度时设置为0
  • 唤醒在Condition等待元素放入队列的线程

4.4 PriorityBlockingQueue

底层是用数组+二叉堆维护元素的优先级,并且使用了一个ReentrantLock,只有一个Conditon,因为它无界的,可以无限向里面加入元素,但是获取时,当没元素就会被阻塞,所以只有一个Condition

  1. 初始化时,默认队列容量11,比较器为null,即使用元素的compareTo,即队列的元素必须实现Comparable接口
  2. 当元素入队时,需要先使用ReentranLock上锁
  • 如果当前元素个数>=队列容量,则扩容
  • 建堆排序元素
  • 元素数量+1
  • 唤醒Conditon中的等待的线程
  1. 当触发扩容时
  • 当capacity<64时,扩容至2capacity+2
  • 当capacity>64时,扩容至capacity(1+50%)
  • 最大扩容至Integer.MAX_VALUE - 8
  • 扩容后,将原本的元素复制到新数组中
    当需要扩容时,让一个线程去做扩容操作,其他线程自旋等待
  1. 建堆排序,最终结果会把最高或者最低优先级放到根顶位置
/**
* k为当前元素数量
* x为加入的元素
* array是数组
*/
private static <T> void siftUpComparable(int k, T x, Object[] array) {
        Comparable<? super T> key = (Comparable<? super T>) x;
        while (k > 0) {
            int parent = (k - 1) >>> 1;
            Object e = array[parent];
            if (key.compareTo((T) e) >= 0)
                break;
            array[k] = e;
            k = parent;
        }
        array[k] = key;
    }

假如初始容量为2,按顺序加入B、C、D、A,会经历如下


PriorityBlockingQueue建堆过程.png
  1. 对于出队操作,操作前使用ReentranLock上锁,也会涉及重建堆过程
private E dequeue() {
        int n = size - 1;
        if (n < 0)
            return null;
        else {
            Object[] array = queue;
            E result = (E) array[0];
            E x = (E) array[n];
            array[n] = null;
            Comparator<? super E> cmp = comparator;
            if (cmp == null)
                siftDownComparable(0, x, array, n);
            else
                siftDownUsingComparator(0, x, array, n, cmp);
            size = n;
            return result;
        }
    }

/**
* k=0
* x为数组中的尾元素
* array是数组
* n=size-1
*/
private static <T> void siftDownComparable(int k, T x, Object[] array,
                                               int n) {
        if (n > 0) {
            Comparable<? super T> key = (Comparable<? super T>)x;
            int half = n >>> 1;           // loop while a non-leaf
            while (k < half) {
                int child = (k << 1) + 1; // assume left child is least
                Object c = array[child];
                int right = child + 1;
                if (right < n &&
                    ((Comparable<? super T>) c).compareTo((T) array[right]) > 0)
                    c = array[child = right];
                if (key.compareTo((T) c) <= 0)
                    break;
                array[k] = c;
                k = child;
            }
            array[k] = key;
        }
    }

续上面的图,会经历如下


PriorityBlockingQueue重建堆过程.png

4.5 DelayQueue

底层用PriorityBlockingQueue,放入此队列的元素必须实现Delayed,而接口Delayed extends Comparable接口,即任务具备过期时间+排序能力。一样使用ReentrantLock和Condition

  1. 对于入队操作,操作前使用ReentranLock上锁,向PriorityBlockingQueue加入元素,并建堆
  2. 对于出队操作,操作前使用ReentranLock上锁,在PriorityBlockingQueue获取根元素即可,但是要判断失效时间是否为0
  • 当=0,取出元素
  • 当!=0, 这里有一个类型为Thread leader标志位,看是否有线程在取值,如果为null,证明没线程取值,直接等待delay时间后获取即可;如!=null,直接阻塞当前线程

4.6 LinkedTransferQueue

底层是单链表+松弛阈值

  • 与SynchronousQueue相比,LinkedTransferQueue多了一个可以存储的队列
  • 与LinkedBlockingQueue相比,LinkedTransferQueue多了直接传递元素,少了用锁来同步
LinkedTransferQueue入队出队.png
  1. 所有的入队出队方法都调用xfer(E e, boolean haveData, int how, long nanos)
  • e:如果是入队操作,那么就是实际入队的值,如果是出队操作,为null
  • haveData:如果是入队操作,true,如果是出队操作,false
  • how:执行类型,有立即返回的NOW,有异步的ASYNC,有阻塞的SYNC, 有带超时的 TIMED
  • nanos:只有在执行类型TIMED才有作用
  1. 节点分为数据节点和请求节点
  2. 当线程A、B、C都调用offer,它们分别携带值1、2、3,它们通过CAS形成链表1->2->3,并且线程都阻塞
  3. 当线程D调用take,那么就会从第一个节点开始匹配,匹配到值1,获取1并设置原节点为null,并唤醒A线程,A线程唤醒后,将null替换为this
  4. 当线程E调用take,那么还是从第一个节点开始匹配,因为第一个节点已经匹配过,找第二个节点,获取2并设置原节点为null,并唤醒B线程,第一个节点的next指向自身变为垃圾,等到GC回收。如此往复,但是不是每一个匹配过的节点都会将next指向自身变为垃圾的,具体要看松弛阈值
  5. 对于transfer操作,将指定元素e传递给消费者线程,如果有消费者线程正在阻塞等待,则调用transfer方法的线程会直接将元素传递给它;如果没有消费者线程等待获取元素,则调用transfer方法的线程会将元素插入到队尾,然后阻塞等待,直到出现一个消费者线程获取元素.
  6. 当生产者线程调用tryTransfer方法时,如果没有消费者等待接收元素,则会立即返回false。该方法和transfer方法的区别就是tryTransfer方法无论消费者是否接收,方法立即返回,而transfer方法必须等到消费者消费后才返回。
  7. 对于take操作,会从队首取出一个元素,如果队列为空,则线程会阻塞

为了节省 CAS 操作的开销,LinkedTransferQueue使用了松弛(slack)操作:
在结点被匹配(被删除)之后,不会立即更新队列的head、tail,而是当 head、tail结点与最近一个未匹配的结点之间的距离超过“松弛阀值”后才会更新(默认为 2)。这个“松弛阀值”一般为1到3,如果太大会增加沿链表查找未匹配结点的时间,太小会增加 CAS 的开销

4.7 LinkedTransferQueue和SynchronousQueue区别

SynchronousQueue:线程A使用put将数据添加到队列,如果没有其他线程使用take去获取数据,那么线程A阻塞,直到数据被其他线程获取,同理 如果线程B从队列中获取数据为空,被阻塞,等待线程添加数据。即握手传递数据

LinkedTransferQueue:LinkedTransferQueue使用put,tryTransfer和transfer可添加多条数据, LinkedTransferQueue具有SynchronousQueue的功能,而且LinkedTransferQueue比SynchronousQueue灵活,可选择put和tryTransfer进行非阻塞操作,也可以用transfer进行阻塞操作。

  • put就是用ASYNC方式执行,不阻塞,一直自旋.
  • transfer用SYNC方式执行,会阻塞,直到有消费线程后唤醒.
  • tryTransfer用NOW方式执行,直接检测是否有消费线程,有直接递交数据,没直接返回
public void put(E e) {
   xfer(e, true, ASYNC, 0);
}
public void transfer(E e) throws InterruptedException {
   if (xfer(e, true, SYNC, 0) != null) {
       Thread.interrupted(); // failure possible only due to interrupt
       throw new InterruptedException();
   }
}
public boolean tryTransfer(E e) {
   return xfer(e, true, NOW, 0) == null;
}

5. 合理的线程数

  1. N * U * (1+ W/C)
  2. 压测

N-CPU核数 U-CPU使用率,取值范围0-1 W/C-等待时间与计算时间比值

参考

https://blog.csdn.net/qq_38293564/article/details/80798310

https://benjaminwhx.com

https://www.cnblogs.com/yuexiaoyun/p/12203101.html

https://www.cnblogs.com/myseries/p/10944211.html

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
禁止转载,如需转载请通过简信或评论联系作者。
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 220,492评论 6 513
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 94,048评论 3 396
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 166,927评论 0 358
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 59,293评论 1 295
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 68,309评论 6 397
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 52,024评论 1 308
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,638评论 3 420
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 39,546评论 0 276
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 46,073评论 1 319
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 38,188评论 3 340
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 40,321评论 1 352
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,998评论 5 347
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,678评论 3 331
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 32,186评论 0 23
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 33,303评论 1 272
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 48,663评论 3 375
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 45,330评论 2 358

推荐阅读更多精彩内容