手撸一个Android经典线程通信框架:Handler

前言


2022年已过1/4,时间过的真是快。近些年大Android的发展也很是迅速,尤其是遵循MVVM或者MVI架构下,使用Jetpack + Kotlin + Corroutine + Flow的组合,大大提升了Android应用的开发效率。然而,类似的效率的提升往往是通过层层封装,隐藏底层原理,简化调用,从而达到降低开发的上手门槛目的的。作为一个有品位的开发者,又怎能满足于只了解上层的API调用。本文我们就来通过实例来试着聊一聊Android经典线程通信框架Handler的基本原理。

目的


通过实现一个简单的Handler框架,试图解释其底层的工作原理。

背景知识


  • 线程间通信
    即不同线程之间交换信息,Java中常见的有:

    1. Object
      wait(), notify()/notifyAll()
    2. LockSupport
      park(), uppark()
    3. Condition
      await(), signal()/signalAll()
  • Android的Handler工作原理
    简言之,重复执行“发送消息-分发消息-处理消息”。
    更准确的问题描述应该是:Android的基于Handler + Message + MessageQueue + Looper的消息机制是怎么工作的?

Handler借助handler#sendMessage(message)方法,把消息Message对象插入消息队列MessageQueue对象中,同时Looper对象循环往复地从MessageQueue对象中取下一个消息Message对象分发给Looper对象所在线程去处理,如此循环往复。

注意: 同一个线程中,Looper和MessageQueue有且只有一个实例,能且只能在该线程内部实例化。Handler可有多个实例,可在任意线程内实例化,但前需保证实例化动作所在线程的Looper已被初始化。另外,也可以通过调用带Looper参数的Handler构造器实例化,以达到关联指定线程(Looper所在)的目的。

从上述描述中,我们可以得出几个关键的点:
消息对象:保存消息的数据结构(会保存发送者的Handler对象引用)
发送消息:调用者通过Handler#sendMessage(message)发送消息(发到哪里?缓存MessageQueue
缓存消息MessageQueue保存由外部发送来的消息(怎么保存?类似单向链表结构)
读取消息Looper#loop()内调用MessageQueue.next()读取消息(没消息了怎么办?阻塞)
分发消息:取到消息后,执行消息对象持有的Handler对象的Handler#dispatchMessage(message)方法。(运行在哪个线程?Looper对象所在的线程)
处理消息Handler#dispatchMessage(message)内再调用Handler#handleMessage(message)方法,后者便正是调用者常重写的方法。
循环往复Looper#loop()内部是一个for(;;){...},一直执行,提供循环往复的驱动力。
线程内Looper对象唯一性:依靠ThreadLocal保证每个线程内部Looper唯一,即只被实例化一次。
线程内MessageQueue对象唯一性:被Looper实例化并持有,间接保证了唯一性。

  • Android主线程Handler使用

应用启动时,ActivityThread.main(String[])内部会提前初始化Looper。故在主线程内部,可直接使用Handler的无参数构造器实例化。

  • Android子线程Handler使用

如上所述,子线程使用Handler需要先确保Looper已被初始化,但子线程默认是没有初始化Looper的,故需在子线程执行的一开始主动调用Looper.prepare()Looper.loop()。此后则可如在主线程一般任意实例化Handler

  • Android的HandlerThread原理

即封装了HandlerThread子类。HandlerThread#start()被调用后,该线程内部会初始化Looper实例。其他线程可从该HandlerThread对象中取出Looper实例,并用它构造出新的Handler对象。此后其他线程可借助该Handler对象,调用Handler#sendMessage(message)给HandlerThread的线程发送消息。

实现

思路

在了解了上述的Android的Handler工作原理后,自己实现一个简单的消息框架就有方向了,无非就是解决上述中提到的“消息对象”,“发送消息”,“缓存消息”……关键点即可。

线程内Looper对象唯一性线程内MessageQueue对象唯一性读取消息是需要着重考虑的。

编码

Message

package com.custom.handler

data class Message(
    val what: Int = 0,
    val args: Any? = null,
    val block: (() -> Unit)? = null,
) {
    companion object {
        val HEAD_MESSAGE: Message = Message()
    }

    var target: Handler? = null
    var next: Message? = null
}

MessageQueue

package com.custom.handler

import java.util.concurrent.locks.ReentrantLock
import java.util.concurrent.locks.ReentrantReadWriteLock
import kotlin.concurrent.withLock

class MessageQueue {
    private val queueLock = ReentrantLock()
    private val queueCondition = queueLock.newCondition()

    private val messageLock = ReentrantReadWriteLock()
    private val messageReadLock = messageLock.readLock()
    private val messageWriteLock = messageLock.writeLock()

    private var currentMessage: Message = Message.HEAD_MESSAGE
        get() = messageReadLock.withLock { field }
        set(value) = messageWriteLock.withLock { field = value }

    fun next(): Message {
        while (true) {
            queueLock.withLock {
                if (currentMessage.next == null) {
                    println("[${Thread.currentThread().name}] Message queue is empty, then it is going to await...")
                    queueCondition.await()
                    println("[${Thread.currentThread().name}] Message queue wakes up.")
                }
            }

            val nextMessage = currentMessage.next!!
            nextMessage.next = null
            currentMessage = nextMessage
            return nextMessage
        }
    }

    fun enqueue(msg: Message) {
        messageWriteLock.withLock {
            msg.next = null
            currentMessage.next = msg
            queueLock.withLock { queueCondition.signalAll() }
        }
    }
}

Looper

package com.custom.handler

import java.util.concurrent.locks.ReentrantReadWriteLock
import kotlin.concurrent.withLock

class Looper private constructor(thread: Thread) {
    internal val mMessageQueue = MessageQueue()
    private val mThread = thread
    private val quitFlagReentrantReadWriteLock = ReentrantReadWriteLock()
    private val quitFlagReadLock = quitFlagReentrantReadWriteLock.readLock()
    private val quitFlagWriteLock = quitFlagReentrantReadWriteLock.writeLock()
    private var quitFlag: Boolean? = null
        get() = quitFlagReadLock.withLock { field }
        set(value) = quitFlagWriteLock.withLock { field = value }

    companion object {
        private val looperMapReentrantReadWriteLock = ReentrantReadWriteLock()
        private val looperMapReadLock = looperMapReentrantReadWriteLock.readLock()
        private val looperMapWriteLock = looperMapReentrantReadWriteLock.writeLock()
        private val looperMap: MutableMap<Thread, Looper> = mutableMapOf()

        val looper: Looper?
            get() = looperMapReadLock.withLock { looperMap[Thread.currentThread()] }

        fun prepare(): Looper {
            require(looper == null) { "Looper.prepare() must be called once in each thread." }
            val thread = Thread.currentThread()
            val threadLooper = Looper(thread)
            looperMapWriteLock.withLock { looperMap[thread] = threadLooper }
            return threadLooper
        }

        fun loop() {
            requireNotNull(looper) { "Looper.prepare() must be called before Looper.loop()" }
            looper?.quitFlag = false
            while (looper?.quitFlag == false) {
                val me = looper!!.mMessageQueue
                val msg = me.next()
                kotlin.runCatching {
                    println("[${Thread.currentThread().name}] Dispatching message is started.")
                    msg.target?.dispatchMessage(msg)
                    println("[${Thread.currentThread().name}] Dispatching message is stopped. ")
                }.onFailure {
                    it.printStackTrace()
                }
            }
        }
    }

    fun quit() {
        quitFlag = true
        looperMap.remove(Thread.currentThread())
    }
}

Handler

package com.custom.handler

open class Handler {
    private var interceptor: Interceptor? = null
    private lateinit var mLooper: Looper

    constructor() {
        val looper = requireNotNull(Looper.looper) { "No looper found in current thread." }
        Handler(looper)
    }

    constructor(looper: Looper) {
        mLooper = looper
    }

    interface Interceptor {
        fun handleMessage(message: Message?): Boolean
    }

    open fun dispatchMessage(message: Message?) {
        println("[${Thread.currentThread().name}] onDispatchMessage: message=$message")
        message ?: return
        if (message.block == null) {
            if (interceptor?.handleMessage(message) != true){
                handleMessage(message)
            }
            return
        }
        message.block.invoke()
    }

    open fun handleMessage(message: Message?) {
        println("[${Thread.currentThread().name}] onReceiveMessage: message=$message")
    }

    fun sendMessage(message: Message?) {
        println("[${Thread.currentThread().name}] onSendMessage: message=$message")
        message?.target = this
        mLooper.mMessageQueue.enqueue(message ?: return)
    }

    fun quit(){
        mLooper.quit()
    }
}

[扩展] HandlerThread

import com.custom.handler.Looper
import java.util.concurrent.locks.ReentrantLock
import kotlin.concurrent.withLock

class HandlerThread(name:String):Thread(name) {
    private val lock = ReentrantLock()
    private val condition = lock.newCondition()
    private var mLooper: Looper? = null
    val looper: Looper
        get() {
            if (mLooper == null) {
                lock.withLock { condition.await() }
            }
            return requireNotNull(mLooper)
        }

    override fun run() {
        super.run()
        mLooper = Looper.prepare()
        lock.withLock { condition.signalAll() }
        Looper.loop()
    }
}

[测试] Main
由于HandlerThread已经处理了子线程初始化Looper的操作,因此,使用该类直接测试即可。

import com.custom.handler.Handler
import com.custom.handler.Message

fun main(args: Array<String>) {
    testHandler()
}

private fun testHandler(){
    val handlerThread = HandlerThread("Thread-1").apply { start() }
    val handler = object : Handler(handlerThread.looper) {
        override fun handleMessage(message: Message?) {
            super.handleMessage(message)
        }
    }
    for (i in 0..3){
        if (i == 3) {
            handler.quit()
            return
        }
        Thread.sleep(3000)
        println("\n")
        handler.sendMessage(Message(100, "message from Main-Thread"))
    }
}
// console output
[Thread-1] Message queue is empty, then it is going to await...

[main] onSendMessage: message=Message(what=100, args=message from Main-Thread, block=null)
[Thread-1] Message queue wakes up.
[Thread-1] Dispatching message is started.
[Thread-1] onDispatchMessage: message=Message(what=100, args=message from Main-Thread, block=null)
[Thread-1] onReceiveMessage: message=Message(what=100, args=message from Main-Thread, block=null)
[Thread-1] Dispatching message is stopped. 
[Thread-1] Message queue is empty, then it is going to await...

[main] onSendMessage: message=Message(what=100, args=message from Main-Thread, block=null)
[Thread-1] Message queue wakes up.
[Thread-1] Dispatching message is started.
[Thread-1] onDispatchMessage: message=Message(what=100, args=message from Main-Thread, block=null)
[Thread-1] onReceiveMessage: message=Message(what=100, args=message from Main-Thread, block=null)
[Thread-1] Dispatching message is stopped. 
[Thread-1] Message queue is empty, then it is going to await...

[main] onSendMessage: message=Message(what=100, args=message from Main-Thread, block=null)
[Thread-1] Message queue wakes up.
[Thread-1] Dispatching message is started.
[Thread-1] onDispatchMessage: message=Message(what=100, args=message from Main-Thread, block=null)
[Thread-1] onReceiveMessage: message=Message(what=100, args=message from Main-Thread, block=null)
[Thread-1] Dispatching message is stopped. 

Process finished with exit code 0

Q&A


  1. Handler是怎么实现线程切换的?
    就个人理解而言,所谓“线程切换”,即就是实现线程间的通信即可。此处,我们姑且把“线程”理解为一个无限循环。外界的消息/操作要想插入到这个“线程”,可想象为把消息/操作插入到无限循环中,如此以来便完成了“线程切换”。
    消息插入无限循环:即为存在竞争关系的变量的同步问题。线程外部修改变量,线程内部读取变量。可通过背景知识中提到的各种线程间通信的方法来实现。
    操作插入无限循环:即为注册回调问题。

发送消息时,调用Handler#sendMessage(message),该方法运行在发生调用动作所在的线程(简称,调用线程);处理消息时,会先后调用Handler#dispatchMessage(messge)Handler#handleMessage(message),此两方法运行在Looper.loop()的无限循环中,即Looper的实例对象所属的线程(简称,处理线程)。而发送消息的本质是:调用线程向处理线程的Looper内的消息队列MessageQueue中插入一个消息对象;处理消息的本质是:处理线程取出消息再调用消息实例对象持有的Handler实例的Handler#handleMessage(message),参数即为当前取出的消息实例对象

  1. 无消息时,MessageQueue#next()如何被阻塞?
    方法内部会先调用android.os.MessageQueue#nativePollOnce(ptr, timeoutMillis)方法。若当前没有消息,则nativePollOnce(ptr, timeoutMillis)会从native层使用epollepoll_wait机制阻塞当前线程,至此MessageQueue.next()也就被阻塞。
Message next () {
    ...
    for (;;) {
        ...
        android.os.MessageQueue#nativePollOnce
        ...
        return nextMsg
    }
}
  1. 有新消息时,MessageQueue#next()如何被唤醒?
    当消息队列被插入新消息时,MessageQueue#enqueueMessage(msg, when)会被调用,消息插入完成后,其内部android.os.MessageQueue#nativeWake(ptr)会被调用,此时原本被android.os.MessageQueue#nativePollOnce(ptr, timeoutMillis)阻塞的线程就会唤醒,并开始执行取消息和处理消息的流程。
boolean enqueueMessage (message, when) {
    ...
    //insert message
    ...
    nativeWake(mPtr)
}

写在最后


文中提到的ThreadLocal,简言之,就是JDK提供的一种存储“线程内部”变量的手段。Android的基于Handler的消息机制,也正是借助这一点来保存各个线程内部Looper的。当然,若在不考虑性能优化等各种因素条件下,不使用ThreadLocal也是可以实现类似功能的,如本文所实现的消息框架则是使用了Map的数据结构来实现的。 当然,本文的简单实现也旨在解释清楚Android消息机制的原理。

附:


源码地址: https://github.com/heychinje/Simple-Handler
关于Android中的epoll:https://stackoverflow.com/questions/38818642/android-what-is-message-queue-native-poll-once-in-android

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 204,590评论 6 478
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 86,808评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 151,151评论 0 337
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,779评论 1 277
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,773评论 5 367
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,656评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,022评论 3 398
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,678评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 41,038评论 1 299
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,659评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,756评论 1 330
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,411评论 4 321
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,005评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,973评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,203评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 45,053评论 2 350
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,495评论 2 343

推荐阅读更多精彩内容