Kotlin 多线程
- 线程可能处于以下几种状态:
新建(New):线程被创建但还未启动时的状态。
可运行(Runnable):在Java中,可运行状态和运行状态是并列的,在Kotlin中,它们都认为是线程处于可被线程调度器调度的状态。
等待(Waiting):运行中的线程执行了Object.wait()、Thread.join()或LockSupport.park()等操作后,放弃CPU,进入等待状态。
计时等待(Timed Waiting):进入这个状态的线程会在一段时间之后自动唤醒,不同于等待状态,它是有时间限制的。
阻塞(Blocked):线程在等待获取锁时,如果无法获取,则进入阻塞状态。
结束(Terminated):线程执行完毕或因异常退出run()方法后,进入结束状态。
- 这些状态可以通过java.lang.Thread.State枚举类来表示 :
enum class ThreadForState {
NEW,
RUNNABLE,
BLOCKED,
WAITING,
TIMED_WAITING,
TERMINATED;
}
class threadForManager{
init {
val thread = Thread({ println("Thread is running") })
println(thread.state) // 输出 NEW
thread.start()
println(thread.state) // 输出 RUNNABLE (或其他状态,取决于操作系统的线程调度)
}
}
- 继承Thread,重写run方法
java 中Thread本质上是实现了runnable接口的一个实例。当调用 start 方法后并不是立即执行多线程的代码,而是使该线程变为可运行状态,什么时候运行该线程代码是由操作系统决定的:
- 定义Thread子类 ThreadForHong ,并重写run方法(run的方法体代表了线程要完成的任务,因此被称为执行体)
- 创建 ThreadForHong 的实例,即创建线程实例对象
- 调用线程对象的 start 方法启动线程
class ThreadForHong:Thread() {
}
class ClientTreadForManager {
init {
val threadHong01 = ThreadForHong().let {
it.start()
}
}
}
- 实现 runnable 接口,并运行该接口的 run 方法
- 1.自定义并实现 runnable 接口
- 2.创建子线程实例
- 3.调用子线程的 start 方法启动线程
class RunnableForHong:Runnable {
/*
什么是runnable (看这里)
public interface Runnable {
/**
* When an object implementing interface <code>Runnable</code> is used
* to create a thread, starting the thread causes the object's
* <code>run</code> method to be called in that separately executing
* thread.
* <p>
* The general contract of the method <code>run</code> is that it may
* take any action whatsoever.
*
* @see java.lang.Thread#run()
*/
public abstract void run();
}
* */
override fun run() {
}
}
class ThreadOfHong(runnable: Runnable):Thread(){
}
class ClientForThreadRunnable {
init {
// 这一套说白了,就是把子线程放在异步队列中执行,
this.javaRunnableOnThread()
//我们自己实践一下runnable达到同样的效果
this.kontlinRunnableOnThread()
}
fun javaRunnableOnThread(){
/*
* Specified by:
run in interface Runnable
See Also:
start(), stop(), Thread(ThreadGroup, Runnable, String)
* */
// Java 的方式
// RunnableForHong 还必须去重写run方法(这就恶心了)
val runnableHong = RunnableForHong()
val what = ThreadOfHong(runnableHong)
what.start()
}
fun kontlinRunnableOnThread(){
val what = Runnable {
// 线程执行体..
}
ThreadOfHong(what).start()
// 你还可以
ThreadOfHong(Runnable {
// 线程执行体..
}).start()
// 还可以这样
ThreadOfHong({
// 线程执行体..
}).start()
}
}
- 实现Callable接口,重写call方法
Callable 接口 实际是属于Executor框架中的功能类,Callable 与 Runnable 功能类似,但是比Runnable具有更强大的功能:
- 可以在任务接受后提供一个返回值,runnable没有提供这个功能
- 可以在call方法中抛出异常
- 可以拿到一个Future对象,该对象表示异步计算的结果,它提供了检查并计算是否完成的方法。
- 由于线程属于异步计算模型,因此无法从别的线程中得到函数的返回值,
- 在这种情况下就可以使用Future来监视目标线程调用call方法的情况
- 但调用Future的get方法获取结果时,当前线程就会被阻塞,直到call返回结果。
class CallableForHong {
fun execute(){
val what = Callable {
// 线程执行体..
}
val whatIn = Executors.newSingleThreadExecutor().let {
val service = it
it.submit(what).let {
try {
it.get()
}
catch (ex:Exception) {
// 注意!!!
ex.printStackTrace()
}
finally {
// 关闭线程池
service.shutdown()
}
}
}
}
}
- 理解中断
当线程当run方法执行完毕,或者在方法中没有捕获的异常时,线程将终止。
- interrupt 方法可以用来请求中断线程,当一个线程调用 interrupt 方法时,线程的中断标识位将被置位为true,线程会不时的检测这个中断标识位,以判断线程是否应该被中断。要想知道线程是否被置位,可以调用
Thread.currentThread().isInterrupted
,还可以调用Thread.interrupted()
对中断标识位进行复位。但是如果一个线程被阻塞,就无法检测中断状态。
代码
*
fun whileHong(){
while (!Thread.currentThread().isInterrupted) {
...
}
}
- 如果一个线程处于阻塞状态,那么线程在检查中断标识位时若发现中断标识位为true,则会在阻塞方法调用处抛出 InterruptedException 异常,并且在抛出异常前将线程的中断标识位复位,即重新设置位=为false。需要注意的是:被中断的线程不一定会终止,中断线程是为了引起线程的注意,被中断的线程可以决定如何去响应中断。如果是比较重要的线程,则不理会中断,而大部分情况是线程会将中断作为一个终止请求。
- 另外:不要在底层代码里捕获 InterruptedException ,异常后不做处理:
代码
*
try {
it.get()
}
catch (ex:Exception) {
...
}
finally {
// 关闭线程池
service.shutdown()
}
- 安全终止线程
interrupt 控制
// RunnableForHong 继承自 Runnable,超时则中断
class MoonRunnable:Runnable {
private var count = 0
override fun run() {
while (!Thread.currentThread().isInterrupted){
count++
println(count)
}
println("stop")
}
}
// 使用
class StopThreadForManager {
val moonrannable = MoonRunnable()
val threadHong = ThreadOfHong(runnable = moonrannable, threadName = "RunnableForHong").let {
it.start()
TimeUnit.MILLISECONDS.sleep(10) // 10ms
it.interrupt()
}
}
Boolean 控制
class MoonRunnable02:Runnable {
private var count = 0
private @Volatile var control = true
override fun run() {
while (control){
count++
println(count)
}
println("stop")
}
fun cancal(){
control = false
}
}
// 使用
val moonrannable02 = MoonRunnable02()
val what02 = ThreadOfHong(runnable = moonrannable02,threadName = "RunnableForHong02").let {
it.start()
TimeUnit.MILLISECONDS.sleep(10) // 10ms
moonrannable02.cancal()
}
- 线程同步
synchronized 足以 (拓展其他形式的线程同步,也只是为了.. 理解)
synchronized(this){
it.name = "我改变了线程的名字"
}
ReentrantLock & Lock(condition):它表示能够支持一个线程对资源的重复加锁 代码如下:
class SourceRunnable:Runnable {
private @Volatile var content:String = ""
// 这里相当于线程池,里面可以容纳N多线程
private val control = ReentrantLock()
val lock:Lock = ReentrantLock()
val condition = lock.newCondition()
var account = 0
var amount = 0
override fun run() {
control.lock()
this.content = "content"
control.unlock()
lock.lock()
try {
while (account < amount) {
//阻塞当前线程并放弃锁
condition.await()
}
account = amount
condition.signal()
}
finally {
lock.unlock()
}
}
}
// 创建两个 Thread 使用同一个runnable
class ThreadDoubleForManager {
val sourceRunnable = SourceRunnable()
val what01 = ThreadOfHong(runnable = sourceRunnable, threadName = "source01").let {
it.start()
}
val what02 = ThreadOfHong(runnable = sourceRunnable, threadName = "source02").let {
it.start()
}
}
- 同步方法 (对象锁)
class ObjectLockForHong {
fun transfer(){
synchronized(this){
// 这里最好用this,所有当前对象的属性或变量都具备了原子性
}
}
}
- 内存的原子性/可见性。有序性
/*
- 线程A - 线程A本地内存(共享变量副本) - 共享变量(同一个主存)
- 线程2 - 线程A本地内存(共享变量副本) - 共享变量(同一个主存)
- 线程(A/B)要通信的话,必须经历下面两个步骤:
- 线程A把自己的本地内存中更新过的过的共享变量刷新到主存中去
- 线程B到主存中去读取线程A已经更新过的共享变量。
- */
- 原子性:一个语句含有多次操作时,就不是原子性操作,只有简单的读取和赋值才是原子性操作(比方说:男孩子的第一次,那叫原子性操作)
AtomicInteger 可作为共享计数器,不需要线程同步
class AtomicIntegerForHong:AtomicInteger() {
override fun toByte(): Byte {
return 9.toByte()
}
override fun toShort(): Short {
this.incrementAndGet()
this.decrementAndGet()
return 9.toShort()
}
}
可见性:这里指的是线程和线程之间的可见性,一个线程修改的状态对另一个线程是可见的。也就是一个线程修改的结果,另一个线程马上就能看到。 当一个共享变量被修饰(@Volatile): @Volatile var content:String 时,它会保证被修改过的值能立即被更新到主存,所以对其他线程是可见的。其他线程可遇到主存中读取新值。而普通的共享变量不能保证其可见性,普通共享变量被修改后,并不会立即被写入到主存,何时写入到主存也是不确定的。这时候当其他线程去读取该值时,主存中可能还是原来的旧值,无法保证其可见即时性
有序性:java内存模型中允许编译器和处理器对指令进行重排序,虽然重排序的过程不会影响到单线程执行的正确性,但是会影响到多线程并发的正确性。这时候可以通过volatile来保证有序性,除了volatile也可以用synchronized和Lock来保证有序性。(把并发改变成串型执行)
volatile的两个案例
// 案例一:状态标志
class VolatileOfHong{
@Volatile var isShut = false
fun shutDown(){
isShut = true
}
fun doWork(){
while (!isShut) {
// ...
}
}
}
// 案例二:双重检查模式 (DCL)
class SingletonForHong {
companion object {
private @Volatile var singletonForHong:SingletonForHong? = null
fun getInstance(){
if (singletonForHong == null) {
synchronized(this.javaClass){
if (singletonForHong == null) {
singletonForHong = SingletonForHong()
}
}
}
}
}
}
补充建议:最好还是使用 synchronized + volatile
-
java 或 kotlin 中提供了7中带有阻塞功能的队列
- ArrayBlockingQueue 由数组结构组成的有界阻塞队列。
- LinkedBlockingQueue 由链表结构组成的有界阻塞队列。
- PriorityBlockingQueue 支持优先级排序的无界阻塞队列。
- DelayQueue 支持延时获取元素的无界阻塞队列。
- SynchronousQueue 不存储元素的阻塞队列
- LinkedTransferQueue 由链表结构组成的无界阻塞队列
- LinkedBlockingDeque 由链表结构组成的双向阻塞队列
// 通常情况下解决生产和消费问题时,使用ArrayBlockingQueue或LinkedBlockingQueue两个类足矣
// PriorityBlockingQueue
/*
* 默认情况下:元素采取自然顺序升序排列。
*
* 1. 可以自定义实现 compareTo方法指定元素进行排序的规则
* 2. 初始化PriorityBlockingQueue时,指定构造参数Comparator对元素进行排序。但是其不能保证同优先级的顺序.
*
* */
// DelayQueue
/*
* 它是一个支持延时获取元素的无界阻塞队列。
*
* 队列使用PriorityQueue来实现,队列中的元素必须实现Delayed接口。创建元素时,可以指定元素到期时间,
* 只有元素到期时才能从队列中取走
*
* */
// SynchronousQueue
/*
* 它是一个不存储元素的的阻塞队列,每个插入操作必须等待另一个线程的移除操作,同样任何一个移除的操作都等待另一个线程的插入操作。
*
* 因此此队列内部其实没有任何一个元素,或者说容量是0,严格来讲它不是一种容器,
*
*
* */
// LinkedTransferQueue
/*
* LinkedTransferQueue 由链表结构组成的无界阻塞队列, 它实现了一个重要的接口TransferQueue.
*
* TransferQueue接口含有5个方法,其中有3个重要的方法,它们分别如下:
*
* 1. transFer(e:E)
* 若当前存在一个正在等待获取的消费者线程,则立刻将元素传递给消费者
* 若没有消费者在等待接收数据,就会将元素插入到队列尾部,并且进入等待阻塞状态
* 直到有消费者线程取走该元素
*
* 2. tryTransfer(e:E)
* 若当前存在一个正在等待获取的消费者线程,则立刻将元素传递给消费者,若不存在返回false,并且不进入队列。
* 这是一个不阻塞的操作,与transfer方法不同的是:无论消费者是否接收,tryTransfer方法都会立即返回,
* 而transfer方法则需要消费者接收了才能返回。
*
* 3. tryTransfer(e:E,timeout:long,unit:TimeUnit)
* 若当前存在一个正在等待获取的消费者线程,则立刻将元素传递给消费者,若没有消费者在等待接收数据,就会将元素插入到队列尾部.
* 并且等待有消费者线程取走该元素,若在指定时间内元素未被消费者取走,则返回false,若在指定时间内元素被消费者取走,则返回true
*
* */
// LinkedBlockingDeque
/*
* 它是一个由链表结构组成的双向阻塞队列。双向队列可以从队列的两端插入和移除元素,因此在多线程同时入队时,也就减少了一半的竞争。
* 由于是双向的,因此 LinkedBlockingDeque 多了 addFirst/addLast/offerFirst/peekFirst/peekLast/等方法.
*
*(First/Last) 分别表示从不同的方向(头部或尾部):插入/获取/移除/
* /
- 阻塞队列的使用场景
class BlockQueueScene {
private var queueSize:Int = 10
private val queueHong = PriorityBlockingQueue<Int>(queueSize)
init {
val produce = ProducerThread(queue = queueHong)
val consume = ConsumerThread(queue = queueHong)
produce.start()
consume.start()
}
internal class ProducerThread(val queue: PriorityBlockingQueue<Int>):Thread(){
override fun run() {
super.run()
while (true) {
synchronized(queue) {
// queue满了会自动挂起,直到queue不满空出位置
try {
queue.put(9)
}
catch (e:Exception) {
e.printStackTrace()
}
}
}
}
}
internal class ConsumerThread(val queue: PriorityBlockingQueue<Int>):Thread(){
override fun run() {
super.run()
while (true){
synchronized(queue) {
// queue为空会自动挂起,直到queue不为空
try {
queue.take()
}
catch (e:Exception) {
e.printStackTrace()
}
}
}
}
}
}
-
线程池
在编程中经常会使用线程来异步处理任务,但是每一个线程的创建和销毁都需要一定的开销。如果每次执行一个任务都需要开一个新的线程去执行,则这些线程的创建和销毁将消耗大量的资源。并且线程是‘各自为政’的,很难对其进行控制,更何况有一些堆的线程在执行。这时候就需要线程池对线程进行管理。
Executor框架用于把任务的提交和执行解耦,任务的提交交给Runnable或者Callable,而Executor框架用来处理任务,
Executor框架中最核心的成员就是 ThreadPoolExecutor. 它是线程池的核心实现类
-
ThreadPoolExecutor
corePoolSize: 核心线程数。默认情况下线程池是空的,只有任务提交时才会去创建线程。如果当前运行的线程数少于corePoolSize,则创建新线程来处理任务,如果当前运行的线程数等于多于 corePoolSize,则不再创建新线程。如果调用线程池的prestartAllcoreThread 方法,则线程池会提前创建并启动所有的核心线程来等待任务。
maximumPoolSize:线程池允许创建的最大线程数。如果任务队列满了并且线程数小于maximumPoolSize时,线程池仍旧会创建新的线程来处理任务。
keepAlivetime:非核心线程闲置的超时时间。超过这个时间核心线程会被回收,如果任务很多,且每个任务的执行时间很短,则可以调大keepAlivetime来提高线程的利用率。另外:如果设置allowCoreThreadTimeOut属性为true时,keepAliveTime也会应用到核心线程上。
TimeUnit:keepAliveTime 参数的时间单位。可选单位有(DAYS,HOURS,MINUTES,SECOUNDS,MILLISECOUNDS)
workQueue:任务队列,如果当前的线程数大于corePoolSize,则将任务添加到此任务队列中,该任务队列是BlockingQueue类型,也就是阻塞队列。
ThreadFactory:线程工厂。可以用线程工厂给每个创建出来的线程设置名字。一般情况下无需设置该参数。
RejectedExecutionHandler:饱和策略。这是当
任务队列和线程池都满了时
所采取的应对策略,默认是AbortPolicy,表示无法处理新任务,并抛出 RejectedExecutionException 异常。此处还有3中策略:
- CallerRunsPolicy:用调用者所在的线程来处理任务。此策略提供简单的反馈控制机制,能够减缓新任务的提交速度。
- DiscardPolicy:不能执行的任务,并将该任务删除??
- DiscardOldestPolicy:丢弃队列中最近的任务,并执行当前的任务。
-
线程的处理流程主要分为3个步骤:
- 提交任务后,线程池先判断线程数是否达到了核心线程数(corePoolSize),如果为达到核心线程数,创建核心线程处理任务。否则就执行下一步操作。
- 接着线程池判断任务队列是否满了,如果没满则将任务添加到任务队列中,否则就执行下一步操作。
- 这时,因为任务队列满了,所以线程池就判断线程数是否达到了最大线程数。如果未达到最大的线程数,则创建非核心线程处理任务,否则就执行饱和策略,默认会抛出 RejectedExecutionException 异常。
- 线程池的种类
通过直接或间接的配置 ThreadPoolExecutor 的参数可以创建不同类型的 ThreadPoolExecutor。其中有四种线程池比较常用:
FixedThreadPool,CachedThreadPool,SingleThreadExecutor,ScheduledThreadPool
下面分别介绍4中线程池:
- FixedThreadPool
FixedThreadPool (只有核心线程数,并且数量是固定的,没有非核心线程,keepAliveTime设置为0L,意味着多余的线程会被立即终止,因为不会产生非核心线程,所以keepAliveTime是无效参数,它采用了无界阻塞队列LinkedBlockingQueue)
- CacheThreadPool
CacheThreadPool 是一个根据需要创建线程的线程池,corePoolSize为0,maxmimumPoolSize设置为Integer.MAX_VALUE.这意味着 CacheThreadPool 没有核心线程,非核心线程是无界的。
keepAliveTime设置为60L,则空闲线程等待新任务的时间为60S,队列是 SychronousQueue,它是一个不存储元素的阻塞队列,每次插入必须等待另一个线程的移除操作,同样每次移除必须等待另一个线程的插入操作。
当执行execute方法时,首先会执行 SychronousQueue的offer方法类提交任务,并且插叙线程池中是否有空余的线程执行 SychronousQueue的poll方法来移除任务。
如果有,则配对成功,将任务交给这个空闲线程处理。
如果没有,则配对失败,创建新的线程去处理任务,当线程池中的线程空闲时,它会执行 SychronousQueue的poll方法,等待SychronousQueue的offer方法类提交的任务。
如果超过60S没有新任务提交到SychronousQueue,则这个空闲线程将终止。
因为maxmimiumPoolSize是无界的,所以如果提交的任务大于线程池中线程处理任务的速度,救护不断创建新线程。。
另外:每次提交新任务都会立即有线程去处理,比较适用于有大量需要立即处理的任务,且耗时短的操作。
-
SingleThreadExecutor
SingleThreadExecutor (使用单个工作线程的线程池,只有一个核心线程)
如果当前没有运行的线程则创建一个新线程类处理任务。
如果当前有运行的线程,则将任务添加到阻塞队列LinkedBlockingQueue。
所以 SingleThreadExecutor 能确保所有的任务在一个线程中按照顺序逐一执行。
-
ScheduledThreadPool
- ScheduledThreadPool (是一个能实现定时和周期性任务的线程池)
- ScheduledThreadPool 的构造方法最终调用的是 ThreadPoolExecutor的构造方法
- corePoolSize 是传进来固定数值
- maxmimumPoolSize 的值是Integer.MAX_VALUE.
- 它采用的是 DelayedWorkQueue,所以 maxmimumPoolSize 这个参数是无效的。
- 原理:当执行ScheduledThreadPoolExecutor 的 scheduleAtFixedRate 或者 scheduleWithFixedDelay 方法时,
- 会向DelayWorkQueue添加一个实现 RunnableScheduleFuture 接口的 ScheduleFutureTask(任务的包装类),并且检查运行的线程数
- 是否达到 corePoolSize。
- 如果没有达到:则创建新线程,并且启动它,但不是立即去执行任务,而是去DelayWorkQueue中取 ScheduleFutureTask,然后执行任务。
- 如果达到了corePoolSize:则将新来的任务添加到 DelayWorkQueue。DelayWorkQueue会将任务进行排序,先要执行的任务放在队列的前面。
- 其跟其它的线程池不同的是:当执行完任务后,会将 ScheduleFutureTask中的time变量改为下次要执行的时间,并放回到 DelayWorkQueue 中。
- AsyncTask
// AsyncTask
/*
* AsyncTask 它使得异步任务实现起来更加简单,代码更加简练。(比方说:执行完耗时任务回到主线程更新UI)
*
* AsyncTask 是一个抽象的泛型类,他有三个泛型参数:Pramas,Progress,Result
*
* Pramas:参数类型
* Progress: 后台执行进度的类型
* Result:返回结果的类型
* */
// AsyncTask 4个核心的方法:
/*
* 1. onPreExecute():在主线程中执行。一般在任务执行前做准备工作。(比如:对UI做一些标记)
*
* 2. doInBackground(Params..params):在线程池中执行。在onPreExecute方法执行后运行,用来执行较为耗时的操作,
* 在执行过程中可以调用publishProgress(Progress..values)更新进度信息。
*
* 3. onProgressUpdate(Progress..values):在主线程中执行。当调用publishProgress时,此方法会将进度更新到UI组件上。
*
* 4. onPostExecute(result:Result):在主线程中执行,当后台任务执行完成后,它会被执行。doInBackground方法得到的结果就是返回的result的值。
* 此方法一般做执行之后的收尾工作,比如更新UI和数据。
*
* 5. 另外:我们在kotlin中使用Dispatchers就够了。
* */
- Kotlin协程 和 AsyncTask的比较
1. 线程管理:AsyncTask在每个任务执行时都会创建一个新的线程,这会导致线程开销较大,并且线程管理复杂。而Kotlin协程则不需要显式创建线程,它使用挂起和恢复的方式在单个线程中执行多个任务,从而避免了线程的开销和管理问题24。
2. UI更新:在AsyncTask中,更新UI需要在新的线程中使用Handler,这使得代码变得复杂。而Kotlin协程可以通过withContext(Dispatchers.Main)轻松地在主线程中更新UI,简化了代码结构1。
3. 代码简洁性:Kotlin协程使用suspend函数和协程构建器,使得异步代码看起来更像是同步代码,从而提高了代码的可读性和维护性3。
Kotlin协程的使用场景和优势
1. 异步任务:协程非常适合处理需要异步执行的任务,如网络请求、文件读写等。
2. 并发编程:协程可以轻松地实现并发编程,通过withContext(Dispatchers.IO)可以在IO操作中使用多个协程来提高效率。
资源管理:在Android开发中,协程可以帮助管理生命周期,避免内存泄漏,通过lifecycleScope或viewModelScope来管理协程的生命周期。
总之,Kotlin协程是AsyncTask的现代替代品,提供了更高效、更灵活的并发编程解决方案,特别适合在Kotlin中进行异步任务和并发编程。
thank..