线程池这个大家必会的把,就算咱是做 android 的,有现成的异步组件但是也保不齐什么时候得自己写个异步处理,再者面试时你要是不会线程池,面试官会把你虐出翔来
java 的4种自带线程池
线程池这玩必须复杂,Java 必须提供标准 API ,要不指不定能扭曲成啥样了,这不就有了Executors 这个类,通过 Executors 提供4种标准线程池:
- FixedThreadPool - 定长线程池,可控制线程最大并发数,超出的线程会在队列中等待
- SingleThreadExecutor - 单线程线程池,只用一个工作线程来执行任务,保证所有任务按照指定顺序(FIFO, LIFO, 优先级)执行
- CachedThreadPool - 可缓存线程池,线程数量没有限制,如果线程池数量超过任务,可灵活回收空闲线程,若无可回收,则新建线程
- ScheduledThreadPool - 定时线程池,支持定时及周期性任务执行
1. FixedThreadPool
它是一种线程数量固定的线程池,它只有核心线程(不会被被回收,除非线程关闭),并且这些核心线程没有超时机制,任务队列也是没有大小限制(默认是 128),采用的是 LinkedBlockingQueue 阻塞队列
val fixedThreadPool = Executors.newFixedThreadPool(3)
fixedThreadPool.execute(object : Runnable {
override fun run() {
Thread.sleep(2000)
Log.d("AA", "fixedThreadPool...run")
}
})
2. SingleThreadExecutor
单线程线程池,关键得看怎么设计其中的阻塞队列,在 android 中的应用范围日挺广,android 这种移动客户端是没有什么并发要求的,到时这种按顺序执行的阻塞任务到时挺多,比如大多数 GUI 操作都是单线程的,数据库,日志,文件操作,应用批量安装,应用批量删除等不适合并发但可能 IO 阻塞性u以及影响 UI 线程响应的操作
val fixedThreadPool = Executors.newSingleThreadExecutor()
fixedThreadPool.execute(object : Runnable {
override fun run() {
Thread.sleep(2000)
Log.d("AA", "fixedThreadPool...run")
}
})
3. CachedThreadPool
它是一种线程数量不定的线程池,它只有非核心线程,并且其最大线程数为Integer.MAX_VALUE(因为 Integer.MAX_VALUE 的值很大,所以默认为线程数是为任意大),线程中的空闲线程都有超时机制,时长是60秒,超过60秒闲置线程就会被回收,采用的是 SynchronousQueue 这个阻塞队列
val fixedThreadPool = Executors.newCachedThreadPool()
fixedThreadPool.execute(object : Runnable {
override fun run() {
Thread.sleep(2000)
Log.d("AA", "fixedThreadPool...run")
}
})
4. ScheduledThreadPool
它的核心线程数量是固定的而非核心线程数是没有限制的,并且当非核心线程限制时会被立即回收,这类线程池主要用于执行定时任务和具有固定周期的重复任务。
val fixedThreadPool = Executors.newScheduledThreadPool(3)
fixedThreadPool.schedule(object : Runnable {
override fun run() {
Thread.sleep(2000)
Log.d("AA", "fixedThreadPool...run")
}
},3,TimeUnit.SECONDS)
// 2000s 后执行 runnable
scheduledThreadPool.schedule(runnable,2000, TimeUnit.MILLISECONDS);
// 延迟 10ms 后,每隔 1000ms 执行一次 runnable
scheduledThreadPool.scheduleAtFixedRate(runnable,10,1000,TimeUnit.MILLISECONDS);
线程池状态
- RUNNING - 线程池初始化后就是 RUNNING 状态,此时线程池中的任务为0
- SHUTDOWN - 调了 shutdown 结束线程池,线程池就是 SHUTDOWN 状态,此时线程池不能够接受新的任务,它会等待所有任务执行完毕
- STOP - 调了 shutdownNow 结束线程池,线程池就是 STOP 状态,此时线程池不能接受新的任务,并且会去尝试终止正在执行的任务
- TIDYING - 所有的任务已终止,ctl 记录的”任务数量”为 0,线程池会变为 TIDYING 状态,接着会执行 terminated 函数
- TERMINATED - 看上面自己理解下
关闭线程池
就是这2个 shutdown、shutdownNow API
- shutdown - 只标记状态 SHUTDOWN,正在执行的任务会继续执行下去,没有被执行的则中断 回。
- shutdownNow - 将线程池的状态设置为 STOP,正在执行的任务则被停止,没被执行任务的则返,注意这样线程池此时若是 sheep 的话会抛异常
线程池配置大小
一般需要根据任务的类型来配置线程池大小:
ThreadPoolExecutor
Android 中的线程池的概念来源于Java 中的 Executor,Executor 是一个接口,真正的线程池的实现为ThreadPoolExecutor,ThreadPoolExecutor 提供了一系列参数来配置线程池,通过不同的参数可以创建不同的线程池
实现关系如下:ThreadPoolExecute -> AbstractExecutorService -> ExecutorService -> Executor
通过 ThreadPoolExecutor 我们可以自己实现自己定制的线程池,这只是参数配置的不同,不过我是没碰到需要作到这一步的,想必也是后台开发才需要的吧
线程池的构造方法:
public ThreadPoolExecutor (int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable workQueue>,
ThreadFactory threadFactory )
- corePoolSize - 线程池核心线程数,等于 CPU 核心数 + 1,CPU 核心数可通过这个 API 来获取Runtime.getRuntime().availableProcessors(),如果将 ThreadPoolExecutor 的 allowCoreThreadTimeOut 属性设置为 true,那么核心线程就会存在超时策略,这个时间间隔有 keepAliveTime 所决定,当等待时间超过 keepAliveTime 所指定的时长后,核心线程就会被停止
- maximumPoolSize - 线程池所能容纳的最大线程数,当活动线程数达到这个数值后,后续的新任务将会被阻塞
- keepAliveTime - 非核心线程的超时时长,超过这个时长,非核心线程就会被回收,核心线程无超时机制,非核心线程在闲置时的超时时间为 1 秒
- TimeUnit - 超时时长单位,TimeUnit 下:MILLISECONDS | SECONDS |MINUTES
- BlockingQueue<Runnable workQueue> - 线程池中的任务队列,通过线程池的 execute 方法体检的 Runnable 对象会存储在这个参数中
- ThreadFactory - 线程工厂,为线程池提供创建新线程的功能,ThreadFactory 是一个接口,它只有一个方法:Thread newThread(Runnable r)
-
RejectedExecutionHandler - 饱和策略,这一块我目前也不是很清楚,先把资料当在这里
- AbortPolicy - 默认饱和策略,直接抛出异常
- CallerRunsPolicy - 只用调用者所在线程来运行任务
- DiscardPolicy - 不处理,丢弃掉,也不会日志什么的
- DiscardOldestPolicy - 丢弃队列里最近的一个任务,并执行当前任务。
- 自定义Police - 需要实现RejectedExecutionHandler
BlockingQueue
在线程池中 BlockingQueue 阻塞队列这个点相当重要了,甚至可以拿出来单独使用,队列概念也不难理解,利用数据结构存储数据,调节任务生产者和任务消费者之间的执行
队列大家也可以为栈,有2种处理任务的顺序:
- FIFO 先进先出 - 先插入的队列的元素也最先出队列,类似于排队的功能。从某种程度上来说这种队列也体现了一种公平性
- LIFO 后进先出 - 后插入队列的元素最先出队列,这种队列优先处理最近发生的事件
阻塞队列应对的是个经典的生产者消费者场景,充当生产者和消费者之间的二道贩子(SynchronousQueue 这种队列除外),生产者和消费者不直接联系,生产者把任务交给阻塞队列,阻塞队列再去转手给下家消费者。一般阻塞队列都有大小的,当队列满了时,生产者再往队列里塞任务,那么就会阻塞生产者;当队列为空时,消费者没有任务就阻塞,核心线程是阻塞,非核心线程会回收,具体根据阻塞队列来
简单来看下队列的 API
-
添加任务:
- offer(anObject) - 添加任务,添加成功返回 true,若是队列满了该方法不阻塞当前执行方法的线程
- offer(E o, long timeout, TimeUnit unit) - 指定时间内还不能往队列中添加任务,返回 false
- put(anObject) - 队列满了该方法会阻塞当前执行方法的线程,直到队列有空间为止
-
获取任务:
- poll(time) - 获取首位的对象,若不能立即取出,等待指定规时间,若还是不行则返回 null
- poll(long timeout, TimeUnit unit) -
- take() - 获取首位的对象,弱此时队列为空,则会进入阻塞,直到由新的任务添加进来
- drainTo() - 一次性取出所有可用的数据,可以效率,因为不需要多次分批加锁或释放锁
目前有 7 种 BlockingQueue :
- ArrayBlockingQueue - 数组结构组成的有界阻塞队列,特点是插入和取出数据采用一把锁,而没有使用分离锁设计,插入或删除元素时不会产生或销毁任何额外的对象实例,在创建可以设置公平锁还是非公平锁,默认是非公平锁
- LinkedBlockingQueue - 链表结构组成的有界阻塞队列,默认队列无限大小 Integer.MAX_VALUE,需要注意若是插入的速度大于获取的速度,内存有可能消耗殆尽的可能,插入和获取采用2把锁,分离锁的设计可以大大加快并发性能
- LinkedTransferQueue - 链表结构组成的无界阻塞队列
- LinkedBlockingDeque - 链表结构组成的双向阻塞队列
- DelayQueue - 使用优先级队列实现的无界阻塞队列,没有大小限制,元素只有在指定的延迟时间后才能够从队列中获取元素,插入数据的操作不会阻塞,而获取数据的操作会被阻塞,不过 DelayQueue 的使用场景较少,但都相当巧妙,常见的例子比如使用一个 DelayQueue 来管理一个超时未响应的连接队列
- PriorityBlockingQueue - 支持优先级排序的无界阻塞队列,不阻塞插入,但会阻塞取出操作,使用的时候要特别注意,采用的是公平锁
- SynchronousQueue - 不存储元素的阻塞队列,有公平模式非公平模式 2 种模式:
ArrayBlockingQueue 和 LinkedBlockingQueue 是两个最普通也是最常用的阻塞队列,一般情况下,在处理多线程间的生产者消费者问题,使用这两个类足以
小小的试试队列
我们 new 3个生产者,每个生产者添加 10 个任务,new 一个消费者,消费者线程 3 秒拿不到任务就结速线程,显示完事了
class Product(var name: String, var blockingQueue: BlockingDeque<String>) : Runnable {
var tag: Boolean = true
/**
* 每隔 100 毫米添加一个任务进来
*/
override fun run() {
while (tag) {
for (index in 1..10) {
blockingQueue.offer("$name 添加的第$index 个任务")
Thread.sleep(100)
}
tag = false
}
}
}
class Consumer(var name: String, var blockingQueue: BlockingDeque<String>) : Runnable {
var tag: Boolean = true
/**
* 3 秒接受不到任务就结速线程
*/
override fun run() {
while (tag) {
val work = blockingQueue.poll(3000, TimeUnit.SECONDS)
Log.d("AA", "$name 处理任务 - $work")
Thread.sleep(300)
if (work == null) {
tag = false
}
}
}
}
btn_left.setOnClickListener {
val linkedBlockingDeque = LinkedBlockingDeque<String>()
val productAA = Product("Product_AA", linkedBlockingDeque)
val productBB = Product("Product_BB", linkedBlockingDeque)
val productCC = Product("Product_CC", linkedBlockingDeque)
val consumerXX = Consumer("Consumer_XX", linkedBlockingDeque)
val threadPool = Executors.newFixedThreadPool(4)
threadPool.execute( productAA )
threadPool.execute( productBB )
threadPool.execute( productCC )
threadPool.execute( consumerXX )
}