前言
2022年已过1/4,时间过的真是快。近些年大Android的发展也很是迅速,尤其是遵循MVVM或者MVI架构下,使用Jetpack + Kotlin + Corroutine + Flow的组合,大大提升了Android应用的开发效率。然而,类似的效率的提升往往是通过层层封装,隐藏底层原理,简化调用,从而达到降低开发的上手门槛目的的。作为一个有品位的开发者,又怎能满足于只了解上层的API调用。本文我们就来通过实例来试着聊一聊Android经典线程通信框架Handler的基本原理。
目的
通过实现一个简单的Handler框架,试图解释其底层的工作原理。
背景知识
-
线程间通信
即不同线程之间交换信息,Java中常见的有:- Object
wait(), notify()/notifyAll() - LockSupport
park(), uppark() - Condition
await(), signal()/signalAll()
- Object
Android的Handler工作原理
简言之,重复执行“发送消息-分发消息-处理消息”。
更准确的问题描述应该是:Android的基于Handler + Message + MessageQueue + Looper的消息机制是怎么工作的?
Handler借助
handler#sendMessage(message)
方法,把消息Message对象插入消息队列MessageQueue对象中,同时Looper对象循环往复地从MessageQueue对象中取下一个消息Message对象分发给Looper对象所在线程去处理,如此循环往复。
注意: 同一个线程中,Looper和MessageQueue有且只有一个实例,能且只能在该线程内部实例化。Handler可有多个实例,可在任意线程内实例化,但前需保证实例化动作所在线程的Looper已被初始化。另外,也可以通过调用带Looper参数的Handler构造器实例化,以达到关联指定线程(Looper所在)的目的。
从上述描述中,我们可以得出几个关键的点:
消息对象:保存消息的数据结构(会保存发送者的Handler对象引用)
发送消息:调用者通过Handler#sendMessage(message)
发送消息(发到哪里?缓存MessageQueue)
缓存消息:MessageQueue保存由外部发送来的消息(怎么保存?类似单向链表结构)
读取消息:Looper#loop()
内调用MessageQueue.next()
读取消息(没消息了怎么办?阻塞)
分发消息:取到消息后,执行消息对象持有的Handler对象的Handler#dispatchMessage(message)
方法。(运行在哪个线程?Looper对象所在的线程)
处理消息:Handler#dispatchMessage(message)
内再调用Handler#handleMessage(message)
方法,后者便正是调用者常重写的方法。
循环往复:Looper#loop()
内部是一个for(;;){...}
,一直执行,提供循环往复的驱动力。
线程内Looper对象唯一性:依靠ThreadLocal保证每个线程内部Looper唯一,即只被实例化一次。
线程内MessageQueue对象唯一性:被Looper实例化并持有,间接保证了唯一性。
- Android主线程Handler使用
应用启动时,
ActivityThread.main(String[])
内部会提前初始化Looper。故在主线程内部,可直接使用Handler的无参数构造器实例化。
- Android子线程Handler使用
如上所述,子线程使用Handler需要先确保Looper已被初始化,但子线程默认是没有初始化Looper的,故需在子线程执行的一开始主动调用
Looper.prepare()
和Looper.loop()
。此后则可如在主线程一般任意实例化Handler。
- Android的HandlerThread原理
即封装了Handler的Thread子类。
HandlerThread#start()
被调用后,该线程内部会初始化Looper实例。其他线程可从该HandlerThread对象中取出Looper实例,并用它构造出新的Handler对象。此后其他线程可借助该Handler对象,调用Handler#sendMessage(message)
给HandlerThread的线程发送消息。
实现
思路
在了解了上述的Android的Handler工作原理后,自己实现一个简单的消息框架就有方向了,无非就是解决上述中提到的“消息对象”,“发送消息”,“缓存消息”……关键点即可。
线程内Looper对象唯一性,线程内MessageQueue对象唯一性和读取消息是需要着重考虑的。
编码
Message
package com.custom.handler
data class Message(
val what: Int = 0,
val args: Any? = null,
val block: (() -> Unit)? = null,
) {
companion object {
val HEAD_MESSAGE: Message = Message()
}
var target: Handler? = null
var next: Message? = null
}
MessageQueue
package com.custom.handler
import java.util.concurrent.locks.ReentrantLock
import java.util.concurrent.locks.ReentrantReadWriteLock
import kotlin.concurrent.withLock
class MessageQueue {
private val queueLock = ReentrantLock()
private val queueCondition = queueLock.newCondition()
private val messageLock = ReentrantReadWriteLock()
private val messageReadLock = messageLock.readLock()
private val messageWriteLock = messageLock.writeLock()
private var currentMessage: Message = Message.HEAD_MESSAGE
get() = messageReadLock.withLock { field }
set(value) = messageWriteLock.withLock { field = value }
fun next(): Message {
while (true) {
queueLock.withLock {
if (currentMessage.next == null) {
println("[${Thread.currentThread().name}] Message queue is empty, then it is going to await...")
queueCondition.await()
println("[${Thread.currentThread().name}] Message queue wakes up.")
}
}
val nextMessage = currentMessage.next!!
nextMessage.next = null
currentMessage = nextMessage
return nextMessage
}
}
fun enqueue(msg: Message) {
messageWriteLock.withLock {
msg.next = null
currentMessage.next = msg
queueLock.withLock { queueCondition.signalAll() }
}
}
}
Looper
package com.custom.handler
import java.util.concurrent.locks.ReentrantReadWriteLock
import kotlin.concurrent.withLock
class Looper private constructor(thread: Thread) {
internal val mMessageQueue = MessageQueue()
private val mThread = thread
private val quitFlagReentrantReadWriteLock = ReentrantReadWriteLock()
private val quitFlagReadLock = quitFlagReentrantReadWriteLock.readLock()
private val quitFlagWriteLock = quitFlagReentrantReadWriteLock.writeLock()
private var quitFlag: Boolean? = null
get() = quitFlagReadLock.withLock { field }
set(value) = quitFlagWriteLock.withLock { field = value }
companion object {
private val looperMapReentrantReadWriteLock = ReentrantReadWriteLock()
private val looperMapReadLock = looperMapReentrantReadWriteLock.readLock()
private val looperMapWriteLock = looperMapReentrantReadWriteLock.writeLock()
private val looperMap: MutableMap<Thread, Looper> = mutableMapOf()
val looper: Looper?
get() = looperMapReadLock.withLock { looperMap[Thread.currentThread()] }
fun prepare(): Looper {
require(looper == null) { "Looper.prepare() must be called once in each thread." }
val thread = Thread.currentThread()
val threadLooper = Looper(thread)
looperMapWriteLock.withLock { looperMap[thread] = threadLooper }
return threadLooper
}
fun loop() {
requireNotNull(looper) { "Looper.prepare() must be called before Looper.loop()" }
looper?.quitFlag = false
while (looper?.quitFlag == false) {
val me = looper!!.mMessageQueue
val msg = me.next()
kotlin.runCatching {
println("[${Thread.currentThread().name}] Dispatching message is started.")
msg.target?.dispatchMessage(msg)
println("[${Thread.currentThread().name}] Dispatching message is stopped. ")
}.onFailure {
it.printStackTrace()
}
}
}
}
fun quit() {
quitFlag = true
looperMap.remove(Thread.currentThread())
}
}
Handler
package com.custom.handler
open class Handler {
private var interceptor: Interceptor? = null
private lateinit var mLooper: Looper
constructor() {
val looper = requireNotNull(Looper.looper) { "No looper found in current thread." }
Handler(looper)
}
constructor(looper: Looper) {
mLooper = looper
}
interface Interceptor {
fun handleMessage(message: Message?): Boolean
}
open fun dispatchMessage(message: Message?) {
println("[${Thread.currentThread().name}] onDispatchMessage: message=$message")
message ?: return
if (message.block == null) {
if (interceptor?.handleMessage(message) != true){
handleMessage(message)
}
return
}
message.block.invoke()
}
open fun handleMessage(message: Message?) {
println("[${Thread.currentThread().name}] onReceiveMessage: message=$message")
}
fun sendMessage(message: Message?) {
println("[${Thread.currentThread().name}] onSendMessage: message=$message")
message?.target = this
mLooper.mMessageQueue.enqueue(message ?: return)
}
fun quit(){
mLooper.quit()
}
}
[扩展] HandlerThread
import com.custom.handler.Looper
import java.util.concurrent.locks.ReentrantLock
import kotlin.concurrent.withLock
class HandlerThread(name:String):Thread(name) {
private val lock = ReentrantLock()
private val condition = lock.newCondition()
private var mLooper: Looper? = null
val looper: Looper
get() {
if (mLooper == null) {
lock.withLock { condition.await() }
}
return requireNotNull(mLooper)
}
override fun run() {
super.run()
mLooper = Looper.prepare()
lock.withLock { condition.signalAll() }
Looper.loop()
}
}
[测试] Main
由于HandlerThread已经处理了子线程初始化Looper的操作,因此,使用该类直接测试即可。
import com.custom.handler.Handler
import com.custom.handler.Message
fun main(args: Array<String>) {
testHandler()
}
private fun testHandler(){
val handlerThread = HandlerThread("Thread-1").apply { start() }
val handler = object : Handler(handlerThread.looper) {
override fun handleMessage(message: Message?) {
super.handleMessage(message)
}
}
for (i in 0..3){
if (i == 3) {
handler.quit()
return
}
Thread.sleep(3000)
println("\n")
handler.sendMessage(Message(100, "message from Main-Thread"))
}
}
// console output
[Thread-1] Message queue is empty, then it is going to await...
[main] onSendMessage: message=Message(what=100, args=message from Main-Thread, block=null)
[Thread-1] Message queue wakes up.
[Thread-1] Dispatching message is started.
[Thread-1] onDispatchMessage: message=Message(what=100, args=message from Main-Thread, block=null)
[Thread-1] onReceiveMessage: message=Message(what=100, args=message from Main-Thread, block=null)
[Thread-1] Dispatching message is stopped.
[Thread-1] Message queue is empty, then it is going to await...
[main] onSendMessage: message=Message(what=100, args=message from Main-Thread, block=null)
[Thread-1] Message queue wakes up.
[Thread-1] Dispatching message is started.
[Thread-1] onDispatchMessage: message=Message(what=100, args=message from Main-Thread, block=null)
[Thread-1] onReceiveMessage: message=Message(what=100, args=message from Main-Thread, block=null)
[Thread-1] Dispatching message is stopped.
[Thread-1] Message queue is empty, then it is going to await...
[main] onSendMessage: message=Message(what=100, args=message from Main-Thread, block=null)
[Thread-1] Message queue wakes up.
[Thread-1] Dispatching message is started.
[Thread-1] onDispatchMessage: message=Message(what=100, args=message from Main-Thread, block=null)
[Thread-1] onReceiveMessage: message=Message(what=100, args=message from Main-Thread, block=null)
[Thread-1] Dispatching message is stopped.
Process finished with exit code 0
Q&A
- Handler是怎么实现线程切换的?
就个人理解而言,所谓“线程切换”,即就是实现线程间的通信即可。此处,我们姑且把“线程”理解为一个无限循环。外界的消息/操作要想插入到这个“线程”,可想象为把消息/操作插入到无限循环中,如此以来便完成了“线程切换”。
消息插入无限循环:即为存在竞争关系的变量的同步问题。线程外部修改变量,线程内部读取变量。可通过背景知识中提到的各种线程间通信的方法来实现。
操作插入无限循环:即为注册回调问题。
发送消息时,调用
Handler#sendMessage(message)
,该方法运行在发生调用动作所在的线程(简称,调用线程);处理消息时,会先后调用Handler#dispatchMessage(messge)
、Handler#handleMessage(message)
,此两方法运行在Looper.loop()
的无限循环中,即Looper的实例对象所属的线程(简称,处理线程)。而发送消息的本质是:调用线程向处理线程的Looper内的消息队列MessageQueue中插入一个消息对象;处理消息的本质是:处理线程取出消息再调用消息实例对象持有的Handler实例的Handler#handleMessage(message)
,参数即为当前取出的消息实例对象。
- 无消息时,MessageQueue#next()如何被阻塞?
方法内部会先调用android.os.MessageQueue#nativePollOnce(ptr, timeoutMillis)
方法。若当前没有消息,则nativePollOnce(ptr, timeoutMillis)会从native层使用epoll的epoll_wait
机制阻塞当前线程,至此MessageQueue.next()
也就被阻塞。
Message next () {
...
for (;;) {
...
android.os.MessageQueue#nativePollOnce
...
return nextMsg
}
}
- 有新消息时,MessageQueue#next()如何被唤醒?
当消息队列被插入新消息时,MessageQueue#enqueueMessage(msg, when)
会被调用,消息插入完成后,其内部android.os.MessageQueue#nativeWake(ptr)
会被调用,此时原本被android.os.MessageQueue#nativePollOnce(ptr, timeoutMillis)
阻塞的线程就会唤醒,并开始执行取消息和处理消息的流程。
boolean enqueueMessage (message, when) {
...
//insert message
...
nativeWake(mPtr)
}
写在最后
文中提到的ThreadLocal,简言之,就是JDK提供的一种存储“线程内部”变量的手段。Android的基于Handler的消息机制,也正是借助这一点来保存各个线程内部Looper的。当然,若在不考虑性能优化等各种因素条件下,不使用ThreadLocal也是可以实现类似功能的,如本文所实现的消息框架则是使用了Map的数据结构来实现的。 当然,本文的简单实现也旨在解释清楚Android消息机制的原理。
附:
源码地址: https://github.com/heychinje/Simple-Handler
关于Android中的epoll:https://stackoverflow.com/questions/38818642/android-what-is-message-queue-native-poll-once-in-android