本文章讲解的内容是深入了解Android消息机制和源码分析(Java层和Native层),建议对着示例项目阅读文章,示例项目链接如下:
本文章分析的相关的源码基于Android SDK 29(Android 10.0,即Android Q)。
概述
Android消息机制涉及到以下四个类:
- Message:消息,它分为硬件产生的消息(例如:触摸、点击)和软件产生的消息。
- MessageQueue:消息队列,它的作用是向消息池投递消息和从消息池中取出消息。
- Looper:用于为线程运行消息循环,从MessageQueue(消息队列)中取出消息,然后分发给Message(消息)对应的宿主Handler。
- Handler:用于处理消息,向消息池发送消息事件和处理消息事件。
示例代码
示例代码如下所示:
package com.tanjiajun.handlerdemo;
import android.app.Activity;
import android.os.Bundle;
import android.os.Handler;
import android.os.Message;
import android.widget.TextView;
import androidx.annotation.NonNull;
import androidx.appcompat.app.AppCompatActivity;
import java.lang.ref.WeakReference;
/**
* Created by TanJiaJun on 2020/9/25.
*/
public class MainActivity extends AppCompatActivity {
// 消息类别
private static final int MESSAGE_CODE_MAIN = 0;
// 继承Handler,并且重写它的handleMessage(Message msg)方法
private static final class MainHandler extends Handler {
// 声明Activity的弱引用对象
private final WeakReference<Activity> activityRef;
MainHandler(Activity activity) {
// 创建Activity的弱引用对象
activityRef = new WeakReference<>(activity);
}
@Override
public void handleMessage(@NonNull Message msg) {
super.handleMessage(msg);
// 得到MainActivity对象
MainActivity activity = (MainActivity) activityRef.get();
if (msg.what == MESSAGE_CODE_MAIN) {
// 如果消息类别是MESSAGE_CODE_MAIN的值,就执行以下逻辑
// 得到消息内容
String content = (String) msg.obj;
// 设置TextView的文本为消息内容
activity.tvContent.setText(content);
}
}
}
private TextView tvContent;
private MainHandler mainHandler;
private Runnable runnable;
@Override
protected void onCreate(Bundle savedInstanceState) {
super.onCreate(savedInstanceState);
setContentView(R.layout.activity_main);
tvContent = findViewById(R.id.tv_content);
// 创建MainHandler对象
mainHandler = new MainHandler(this);
// 创建Runnable对象
runnable = new Runnable() {
@Override
public void run() {
// 从消息池中取出消息
Message message = mainHandler.obtainMessage();
// 设置消息类别
message.what = MESSAGE_CODE_MAIN;
// 设置消息内容
message.obj = "谭嘉俊";
// 将该消息添加到消息队列的尾部
mainHandler.sendMessage(message);
}
};
// 创建线程,并且启动它
new Thread(runnable).start();
}
@Override
protected void onDestroy() {
super.onDestroy();
// 删除消息队列中Runnable的普通消息
mainHandler.removeCallbacks(runnable);
}
}
使用静态内部类MainHandler的原因是静态内部类默认不持有外部类的引用,同时使用弱引用持有Activity对象,当Activity对象销毁的时候,也就是没有强引用持有Activity对象的时候,垃圾收集器就会回收它,防止出现内存泄漏。
Message
Message(消息)分为硬件产生的消息(例如:触摸、点击)和软件产生的信息。
Java层
成员变量
Message类的成员变量,源码如下所示:
// Message.java
// 消息类别,用户定义的消息代码,以便接受者能够识别这个消息的内容,每个Handler都有自己的消息代码名称空间,因此不必担心我们的Handler和其他的Handler会产生冲突
public int what;
// 参数1,如果我们只需要存储int数据类型,那么使用arg1和arg2是代替setData(Bundle data)方法最好的办法
public int arg1;
// 参数2,如果我们只需要存储int数据类型,那么使用arg1和arg2是代替setData(Bundle data)方法最好的办法
public int arg2;
// 消息内容,要发送给接受者的任意对象,要注意的是,当使用Messenger跨进程发送消息时,这个属性无法传输自己实现Parcelable接口的类,只能传输Framework已有的实现Parcelable接口的类,但是我们可以使用setData(Bundle data)方法传输它
public Object obj;
// 可选的信使,具体语义由发送者和接受者决定
public Messenger replyTo;
// 未设置的uid,默认值是-1
public static final int UID_NONE = -1;
// 发送消息的uid,它的值是UID_HOME,是可选字段,只对由Messenger发布的消息有效
public int sendingUid = UID_NONE;
// 消息进入队列的uid,它的值是UID_HOME,是可选字段,要注意的是,它不支持外部应用使用
public int workSourceUid = UID_NONE;
// 标记消息正在使用状态,默认值是1左移0位,也就是1
/*package*/ static final int FLAG_IN_USE = 1 << 0;
// 标记消息是异步的,默认值是1左移1位,也就是2
/*package*/ static final int FLAG_ASYNCHRONOUS = 1 << 1;
// 要在copyFrom()方法中清除的标记,默认值是FLAG_IN_USE
/*package*/ static final int FLAGS_TO_CLEAR_ON_COPY_FROM = FLAG_IN_USE;
// 标记
@UnsupportedAppUsage
/*package*/ int flags;
// 消息触发时间
@UnsupportedAppUsage
@VisibleForTesting(visibility = VisibleForTesting.Visibility.PACKAGE)
public long when;
// 设置一组任意数据值,通过setData(Bundle data)方法设值,通过getData()或者peekData()取值
/*package*/ Bundle data;
// 消息接收者,通过setTarget(Handler target)设值,通过getTarget()取值
@UnsupportedAppUsage
/*package*/ Handler target;
// 回调方法,只能通过getCallback()取值
@UnsupportedAppUsage
/*package*/ Runnable callback;
// 下一个存储消息的结点,它不支持外部应用使用
@UnsupportedAppUsage
/*package*/ Message next;
// 用于作为锁对象
public static final Object sPoolSync = new Object();
// 存储Message的头结点
private static Message sPool;
// 消息池大小,默认值是0
private static int sPoolSize = 0;
// 最大消息池大小,默认值是50
private static final int MAX_POOL_SIZE = 50;
// 是否需要检查回收
private static boolean gCheckRecycle = true;
我们看下与成员变量data相关的三个方法,分别是setData(Bundle data)方法、getData()方法和peekData()方法。
setData(Bundle data)方法可以设置一组任意数据值,如果是int数据类型的值可以使用arg1和arg2代替它,源码如下所示:
// Message.java
public void setData(Bundle data) {
this.data = data;
}
getData()方法可以惰性地获取与这个事件关联的一组数据,可以通过setData(Bundle data)设置该值,要注意的是,当使用Messenger跨进程传输数据时,我们可以通过Bundle类的setClassLoader(ClassLoader loader)方法设置类加载器,以便它可以在检索对象时实例化对象,源码如下所示:
// Message.java
public Bundle getData() {
// 判断data是否为空
if (data == null) {
// 如果是空的话,就创建一个新的Bundle对象
data = new Bundle();
}
return data;
}
peekData()方法类似于getData()方法,但是这个方法不会惰性创建,如果Bundle为null,就返回null,源码如下所示:
// Message.java
public Bundle peekData() {
return data;
}
obtain()
obtain()方法可以从消息池中返回一个新的消息对象,避免重新分配新的消息对象,建议使用这个方法得到Message对象,而不是去new一个新的Message对象,源码如下所示:
// Message.java
public static Message obtain() {
// 取Object对象作为锁对象
synchronized (sPoolSync) {
if (sPool != null) {
// 如果消息池不为空,就执行以下逻辑
// 获得消息队列的单向链表的头结点
Message m = sPool;
// 把消息队列的单向链表的下一个结点赋值给成员变量sPool
sPool = m.next;
// 将这个结点的下一个结点赋值为空,相当于将这个结点从消息队列的单向链表中删除
m.next = null;
// 将这个消息标记为不再使用,正在使用状态是FLAG_IN_USE,它的值是1左移0位,
m.flags = 0;
// 自减消息池的数量
sPoolSize--;
// 返回消息
return m;
}
}
// 如果消息池为空,就创建新的Message对象,并且返回它
return new Message();
}
recycle()
recycle()方法可以把不再使用的消息对象添加到消息池,调用这个方法后,这个消息对象就不能使用,因为它已经被有效地释放,回收正在排队的消息和回收正在传递给Handler的消息都是错误的,源码如下所示:
// Message.java
public void recycle() {
if (isInUse()) {
if (gCheckRecycle) {
// 如果这个消息是正在使用状态,就抛出IllegalStateException异常
throw new IllegalStateException("This message cannot be recycled because it "
+ "is still in use.");
}
return;
}
// 调用recycleUnchecked()方法
recycleUnchecked();
}
// 回收不再使用的消息,添加到消息池
@UnsupportedAppUsage
void recycleUnchecked() {
// 将消息标记为FLAG_IN_USE状态,所有参数设置为初始化状态
flags = FLAG_IN_USE;
what = 0;
arg1 = 0;
arg2 = 0;
obj = null;
replyTo = null;
sendingUid = UID_NONE;
workSourceId = UID_NONE;
when = 0;
target = null;
callback = null;
data = null;
// 取Object对象作为锁对象
synchronized (sPoolSync) {
if (sPoolSize < MAX_POOL_SIZE) {
// 如果消息池的数量小于MAX_POOL_SIZE的值(50),就将当前消息添加到消息队列的单向链表的头部
// 将原来的头结点赋值给下一个结点
next = sPool;
// 将这个消息设为头结点
sPool = this;
// 自增消息池数量
sPoolSize++;
}
}
}
native层
Message结构体在Looper.h文件中,源码如下所示:
// system/core/libutils/include/utils/Looper.h
struct Message {
Message() : what(0) { }
Message(int w) : what(w) { }
// 消息类别
int what;
};
MessageQueue
MessageQueue(消息队列)是存放消息的队列,它的数据结构是单向链表,每个线程内部都维护着一个消息队列,它的作用是向消息池中投递消息和从消息池中取出消息,它是消息机制的Java层和C++层的连接纽带。
Java层
native方法
在MessageQueue的源码中,调用了多个C++方法,有如下native方法,源码如下所示:
// MessageQueue.java
// 初始化
private native static long nativeInit();
// 销毁
private native static void nativeDestroy(long ptr);
// 取出消息队列中的消息,形式参数timeoutMillis是超时时间
private native void nativePollOnce(long ptr, int timeoutMillis);
// 唤醒线程
private native static void nativeWake(long ptr);
// 线程是否处于阻塞状态
private native static boolean nativeIsPolling(long ptr);
// 设置文件描述符
private native static void nativeSetFileDescriptorEvents(long ptr, int fd, int events);
enqueueMessage(Message msg, long when)
enqueueMessage(Message msg, long when)方法的作用是添加一条普通消息到消息队列中,源码如下所示:
// MessageQueue.java
boolean enqueueMessage(Message msg, long when) {
if (msg.target == null) {
// 如果消息没有接收者,就抛出IllegalArgumentException异常
throw new IllegalArgumentException("Message must have a target.");
}
if (msg.isInUse()) {
// 如果消息正在使用,就抛出IllegalStateException异常
throw new IllegalStateException(msg + " This message is already in use.");
}
// 取NessageQueue对象作为锁对象
synchronized (this) {
if (mQuitting) {
// 如果正在退出,就执行以下逻辑
IllegalStateException e = new IllegalStateException(
msg.target + " sending message to a Handler on a dead thread");
// 打印消息的接收者
Log.w(TAG, e.getMessage(), e);
// 调用recycle()方法,回收消息
msg.recycle();
return false;
}
// 将消息标记为正在使用状态
msg.markInUse();
// 设置消息的触发时间
msg.when = when;
// 取消息队列的单向链表的头结点存储的消息
Message p = mMessages;
// 这个变量用来判断是否需要唤醒线程
boolean needWake;
if (p == null || when == 0 || when < p.when) {
// 如果是消息队列为空,或者没有设置触发时间,或者触发时间比头结点的消息要早,就插入消息队列的单向链表的头部
msg.next = p;
mMessages = msg;
needWake = mBlocked;
} else {
// 将消息按着时间顺序插入到消息队列的单向链表中,通常不需要唤醒事件队列,除非队列的头部有一个屏障,并且消息是队列中最早的异步消息
// needWake的值是由线程是否被阻塞,并且消息存在宿主Handler,并且消息是异步的决定
needWake = mBlocked && p.target == null && msg.isAsynchronous();
Message prev;
// 循环执行
for (;;) {
// 采用快慢指针,prev是慢指针,p是快指针
prev = p;
p = p.next;
if (p == null || when < p.when) {
// 如果已经到了消息队列的单向链表的尾部,或者有触发时间早于该消息的消息,就结束循环
break;
}
if (needWake && p.isAsynchronous()) {
// 如果消息是是异步,在它前面的消息也是异步的,就不用去唤醒线程
needWake = false;
}
}
// 将消息插入到消息队列的单向链表中
// 和p == prev.next同义
msg.next = p;
prev.next = msg;
}
// 消息没有退出,可以认为这个时候mPtr != 0
if (needWake) {
nativeWake(mPtr);
}
}
return true;
}
上面也提到了,MessageQueue的数据结构是单向链表,从上面的源码可得知,它是按着时间顺序进行排列的,头结点存储的是最先分发的消息,当要添加一条消息到消息队列时,如果消息队列为空,或者消息没有设置触发时间,或者消息触发时间比头结点的消息要早,就插入到单向链表的头部,否则,就按着时间顺序插入到单向链表中。
next()
next()方法的作用是从消息池中取出消息,它会被下面讲解到Looper.loop()方法调用,源码如下所示:
// MessageQueue.java
@UnsupportedAppUsage
Message next() {
// mPtr是从native方法中的到的NativeMessageQueue地址
final long ptr = mPtr;
if (ptr == 0) {
// 如果mPtr是0,说明消息队列不存在或者被清理掉了
return null;
}
// 待处理的IdleHandler数量,默认值是-1
int pendingIdleHandlerCount = -1;
// 线程被阻塞的时间,-1是一直阻塞,大于等于0是不阻塞
int nextPollTimeoutMillis = 0;
// 循环执行
for (;;) {
if (nextPollTimeoutMillis != 0) {
// 如果nextPollTimeoutMillis不等于0,就调用Binder.flushPendingCommands()方法,把要释放的对象释放
Binder.flushPendingCommands();
}
// 阻塞线程
nativePollOnce(ptr, nextPollTimeoutMillis);
// 取MessageQueue对象作为锁对象
synchronized (this) {
// 得到当前时间
final long now = SystemClock.uptimeMillis();
Message prevMsg = null;
Message msg = mMessages;
if (msg != null && msg.target == null) {
// 如果是同步屏障消息,就循环执行,找到第一条异步的消息
do {
prevMsg = msg;
msg = msg.next;
} while (msg != null && !msg.isAsynchronous());
}
if (msg != null) {
// 如果消息队列中有可取出的消息,就执行以下逻辑
if (now < msg.when) {
// 如果待取出的消息还没到应该要处理的时间,就让线程阻塞到应该要处理的时间
nextPollTimeoutMillis = (int) Math.min(msg.when - now, Integer.MAX_VALUE);
} else {
// 如果待取出的消息到达应该处理的时间,就直接取出消息
mBlocked = false;
if (prevMsg != null) {
prevMsg.next = msg.next;
} else {
mMessages = msg.next;
}
// 从消息队列中的单向链表中删除该消息的结点
msg.next = null;
if (DEBUG) Log.v(TAG, "Returning message: " + msg);
// 设置消息的状态为正在使用状态
msg.markInUse();
// 返回消息,结束循环,结束next()方法
return msg;
}
} else {
// 如果消息队列没有可取出消息,就一直让线程阻塞
nextPollTimeoutMillis = -1;
}
if (mQuitting) {
// 如果消息队列已经退出,就处理退出消息
dispose();
return null;
}
if (pendingIdleHandlerCount < 0
&& (mMessages == null || now < mMessages.when)) {
// 如果第一次空闲,就会得到IdleHandler数组的大小,要注意的是,IdleHandler仅在消息队列为空或者只有一个将要在将来执行的消息(可能是个同步屏障)
pendingIdleHandlerCount = mIdleHandlers.size();
}
if (pendingIdleHandlerCount <= 0) {
// 如果没有要运行的IdleHandler,就进入下此循环,并且阻塞线程
mBlocked = true;
continue;
}
// 执行到这里,证明线程中有待处理的IdleHandler
if (mPendingIdleHandlers == null) {
// 如果mPendingIdleHandlers为空,就初始化IdleHandler数组,最小长度是4
mPendingIdleHandlers = new IdleHandler[Math.max(pendingIdleHandlerCount, 4)];
}
// 从IdleHandler数组中取出待处理的IdleHandler
mPendingIdleHandlers = mIdleHandlers.toArray(mPendingIdleHandlers);
}
// 运行IdleHandler,只能在第一次迭代中执行以下逻辑
for (int i = 0; i < pendingIdleHandlerCount; i++) {
// 从mPendingIdleHandlers数组中取出idleHandler
final IdleHandler idler = mPendingIdleHandlers[i];
// 释放引用
mPendingIdleHandlers[i] = null;
// IdleHandler的执行模式,true是执行一次,false是总是执行
boolean keep = false;
try {
// 得到IdleHandler的执行模式
keep = idler.queueIdle();
} catch (Throwable t) {
Log.wtf(TAG, "IdleHandler threw exception", t);
}
// 通过IdleHandler的执行模式来判断是否需要从mIdleHandlers数组中删除对应的IdleHandler
if (!keep) {
synchronized (this) {
mIdleHandlers.remove(idler);
}
}
}
// 将IdleHandler的计数数量重置为0,这样就不会再运行它们了
pendingIdleHandlerCount = 0;
// 在调用执行IdleHandler的逻辑的时候,可能已经有新的消息添加到消息队列,所以在这里就不用去阻塞线程,直接去查看还有没有新的消息
nextPollTimeoutMillis = 0;
}
}
要注意的是,nativePollOnce(long ptr, int timeoutMillis)方法的作用是阻塞线程,nextPollTimeoutMillis是在下一个消息到来之前,还需要等待的时长,-1表示消息队列中没有消息,线程被阻塞,大于等于0表示线程不阻塞;当有待处理的消息时,就会在nativePollOnce(long ptr, int timeoutMillis)方法返回后,从消息池(mMessages)中取出消息(Message);当处于线程空闲状态时,就会执行IdleHandler的方法。
quit(boolean safe)
quit(boolean safe)方法的作用是退出当前消息队列,清空消息队列中的所有消息,源码如下所示:
// MessageQueue.java
void quit(boolean safe) {
if (!mQuitAllowed) {
// 如果是不允许退出消息队列的状态,就抛出IllegalStateException异常
throw new IllegalStateException("Main thread not allowed to quit.");
}
synchronized (this) {
if (mQuitting) {
// 如果已经退出消息队列,就结束该方法
return;
}
// 标记为已退出消息队列
mQuitting = true;
if (safe) {
// 如果是安全退出消息队列,就调用removeAllFutureMessageLocked()方法,删除消息队列中没有被处理的消息
removeAllFutureMessagesLocked();
} else {
// 如果不是安全退出消息队列,就调用removeAllMessagesLocked()方法,删除消息队列中所有的消息
removeAllMessagesLocked();
}
// 唤醒线程
nativeWake(mPtr);
}
}
removeAllFutureMessagesLocked()
removeAllFutureMessagesLocked()方法的作用是删除消息队列中没有被处理的消息,源码如下所示:
// MessageQueue.java
private void removeAllFutureMessagesLocked() {
// 获取当前系统时间
final long now = SystemClock.uptimeMillis();
Message p = mMessages;
if (p != null) {
if (p.when > now) {
// 如果消息的触发时间晚于当前时间,就调用removeAllMessagesLocked()方法,删除消息队列中的所有的消息
removeAllMessagesLocked();
} else {
// 如果消息的触发时间早于当前时间,就执行以下逻辑
Message n;
// 循环执行
for (;;) {
n = p.next;
if (n == null) {
// 如果消息为空,就结束该方法
return;
}
if (n.when > now) {
// 如果消息的触发时间早于当前时间,就结束该循环
break;
}
p = n;
}
p.next = null;
do {
// n是晚于当前时间的消息
p = n;
n = p.next;
// 调用recycleUnchecked()方法,回收从n之后的消息到消息池中,也就是回收所有晚于当前时间的消息到消息池中
p.recycleUnchecked();
} while (n != null);
}
}
}
removeAllMessagesLocked()
removeAllMessagesLocked()方法的作用是删除消息队列中的所有的消息,源码如下所示:
// MessageQueue.java
private void removeAllMessagesLocked() {
Message p = mMessages;
while (p != null) {
Message n = p.next;
// 调用recycleUnchecked()方法,将消息回收到消息池中
p.recycleUnchecked();
p = n;
}
mMessages = null;
}
removeMessages(Handler h, Runnable r, Object object)
removeMessages(Handler h, Runnable r, Object object)方法的作用是删除消息队列中符合条件的普通消息,源码如下所示:
// MessageQueue.java
void removeMessages(Handler h, int what, Object object) {
if (h == null) {
// 如果Handler为空,就返回
return;
}
// 取MessageQueue对象作为锁对象
synchronized (this) {
Message p = mMessages;
// 从消息队列的单向链表的头结点开始,删除所有符合条件的消息
while (p != null && p.target == h && p.what == what
&& (object == null || p.obj == object)) {
Message n = p.next;
mMessages = n;
// 调用recycleUnchecked()方法,将消息回收到消息池中
p.recycleUnchecked();
p = n;
}
// 删除剩余符合条件的消息
while (p != null) {
Message n = p.next;
if (n != null) {
if (n.target == h && n.what == what
&& (object == null || n.obj == object)) {
Message nn = n.next;
// 调用recycleUnchecked()方法,将消息回收到消息池中
n.recycleUnchecked();
p.next = nn;
continue;
}
}
p = n;
}
}
}
removeMessages(Handler h, Runnable r, Object object)方法逻辑中有两个while循环,第一个循环的作用是从消息队列的单向链表的头结点开始,删除符合条件的消息,第二个循环的作用是删除剩余符合条件的消息。
postSyncBarrier(long when)
postSyncBarrier(long when)方法的作用是添加一条同步屏障消息到消息队列中。
普通消息和同步屏障消息的区别是:普通消息的target变量不为空,同步屏障消息的变量target为空,同步屏障的作用是在处理消息时,优先处理异步的消息,起到过滤的作用。
源码如下所示:
// MessageQueue.java
@TestApi
public int postSyncBarrier() {
// 调用postSyncBarrier(long when),形式参数when是当前时间
return postSyncBarrier(SystemClock.uptimeMillis());
}
private int postSyncBarrier(long when) {
// 去MessageQueue对象作为锁对象
synchronized (this) {
// 得到同步屏障token
final int token = mNextBarrierToken++;
// 从消息池中取出消息,并没有对变量target赋值,所以为空
final Message msg = Message.obtain();
// 标记消息为正在使用状态
msg.markInUse();
// 设置触发时间
msg.when = when;
// 将token赋值给变量arg1
msg.arg1 = token;
Message prev = null;
Message p = mMessages;
if (when != 0) {
while (p != null && p.when <= when) {
// 如果触发时间不为空,就在消息队列中按照时间顺序找到它的位置
prev = p;
p = p.next;
}
}
if (prev != null) {
// 如果不是消息队列的单向链表的头结点,就把同步屏障消息添加到消息队列中
msg.next = p;
prev.next = msg;
} else {
// 如果是消息队列的单向链表的头结点,就把同步屏障消息添加到头部
msg.next = p;
mMessages = msg;
}
// 同步屏障消息添加成功,就返回对应的token
return token;
}
}
token是的作用是在删除同步屏障消息时,可以找到对应的消息,下面会讲解。
removeSyncBarrier(int token)
removeSyncBarrier(int token)方法的作用是删除消息队列中符合条件的同步屏障消息,源码如下所示:
// MessageQueue.java
@TestApi
public void removeSyncBarrier(int token) {
// 取MessageQueue对象作为锁对象
synchronized (this) {
Message prev = null;
Message p = mMessages;
// 循环遍历消息队列的单向链表,找出target为空,并且变量args是对应token的消息
while (p != null && (p.target != null || p.arg1 != token)) {
prev = p;
p = p.next;
}
if (p == null) {
// 如果p为空,证明没找到对应token的同步屏障消息,抛出IllegalStateException异常
throw new IllegalStateException("The specified message queue synchronization "
+ " barrier token has not been posted or has already been removed.");
}
// 该变量用于判断是否需要唤醒线程
final boolean needWake;
if (prev != null) {
// 如果不是消息队列的单向链表的头结点,就不需要唤醒线程,证明在它前面还有其他消息
prev.next = p.next;
needWake = false;
} else {
// 如果是消息队列中的单向链表的头结点,就执行以下逻辑
mMessages = p.next;
// 如果消息队列为空或者下一条消息是普通消息,就需要唤醒线程,否则,就不需要唤醒线程
needWake = mMessages == null || mMessages.target != null;
}
// 将消息回收到消息池中
p.recycleUnchecked();
if (needWake && !mQuitting) {
// 如果需要唤醒线程,并且循环还没退出,就调用nativeWake(long ptr)方法,唤醒线程
nativeWake(mPtr);
}
}
}
同步屏障使用场景
在类ViewRootImpl的scheduleTraversals()方法和unscheduleTraversals()方法中使用到了同步屏障。
scheduleTraversals()方法的作用是执行添加同步屏障消息,View更新相关的方法都有调用scheduleTraversals()方法,目的是让Android系统优先执行跟View更新相关的异步消息,优先处理跟View更新相关的逻辑,源码如下所示:
// ViewRootImpl.java
@UnsupportedAppUsage
void scheduleTraversals() {
if (!mTraversalScheduled) {
mTraversalScheduled = true;
// 添加同步屏障消息
mTraversalBarrier = mHandler.getLooper().getQueue().postSyncBarrier();
// 执行添加同步屏障消息
mChoreographer.postCallback(
Choreographer.CALLBACK_TRAVERSAL, mTraversalRunnable, null);
if (!mUnbufferedInputDispatch) {
scheduleConsumeBatchedInput();
}
notifyRendererOfFramePending();
pokeDrawLockIfNeeded();
}
}
unscheduleTraversals()方法的作用是执行删除同步屏障消息任务,源码如下所示:
// ViewRootImpl.java
void unscheduleTraversals() {
if (mTraversalScheduled) {
mTraversalScheduled = false;
// 删除同步屏障消息
mHandler.getLooper().getQueue().removeSyncBarrier(mTraversalBarrier);
// 执行删除同步屏障消息
mChoreographer.removeCallbacks(
Choreographer.CALLBACK_TRAVERSAL, mTraversalRunnable, null);
}
}
IdleHandler
IdleHandler是一个回调接口,用于发现线程什么时候需要阻塞更多消息。
如果消息队列为空,它就会阻塞线程等待消息到来,这种状态称之为空闲状态。
源码如下所示:
// MessageQueue.java
public static interface IdleHandler {
boolean queueIdle();
}
queueIdle()方法会在当消息队列处理完所有消息,并且等待更多消息的时候调用,也就是处于空闲状态,返回true代表只执行一次,返回false代表会一直执行这个方法。
IdleHandler有如下使用场景:
- 提供一个Activity绘制完成的回调。
- 结合HandlerThread,提供一个单线程消息通知器。
Native层
MessageQueue是连接Java层和Native层的纽带,Java层的MessageQueue和Native层的MessageQueue是通过JNI(Java Native Interface)建立关联,Java层可以向MessageQueue添加Message,Native层也可以向MessageQueue添加Message。
nativeInit()
nativeInit()方法在MessageQueue类的构造方法中调用,源码如下所示:
// MessageQueue.java
@UnsupportedAppUsage
@SuppressWarnings("unused")
private long mPtr;
MessageQueue(boolean quitAllowed) {
mQuitAllowed = quitAllowed;
// 调用nativeInit()函数
mPtr = nativeInit();
}
nativeInit()方法通过JNI调用android_os_MessageQueue_nativeInit(JNIEnv env, jclass clazz)*函数,该函数的源码如下所示:
// frameworks/base/core/jni/android_os_MessageQueue.cpp
static jlong android_os_MessageQueue_nativeInit(JNIEnv* env, jclass clazz) {
// 初始化Native层的MessageQueue
NativeMessageQueue* nativeMessageQueue = new NativeMessageQueue();
if (!nativeMessageQueue) {
// 在JNI中抛出RuntimeException异常
jniThrowRuntimeException(env, "Unable to allocate native queue");
return 0;
}
// 增加引用计数
nativeMessageQueue->incStrong(env);
// 将nativeMessageQueue的指针强转成JNI的jlong类型
return reinterpret_cast<jlong>(nativeMessageQueue);
}
成员变量mPtr是nativeInit()函数的返回值,它是Native层的NativeMessage的指针。
看下NativeMessage的构造函数,源码如下所示:
// frameworks/base/core/jni/android_os_MessageQueue.cpp
NativeMessageQueue::NativeMessageQueue() :
mPollEnv(NULL), mPollObj(NULL), mExceptionObj(NULL) {
// 从ThreadLocal中取出Looper实例,相当于Java层的Looper.myLooper()方法
mLooper = Looper::getForThread();
if (mLooper == NULL) {
// 如果ThreadLocal中没有Looper实例,就执行以下逻辑
// 创建Native层的Looper实例
mLooper = new Looper(false);
// 将Looper对象存储到ThreadLocal,相当于Java层的ThreadLocal的set(T value)方法
Looper::setForThread(mLooper);
}
}
看下Looper的构造函数,源码如下所示:
// system/core/libutils/Looper.cpp
Looper::Looper(bool allowNonCallbacks)
: mAllowNonCallbacks(allowNonCallbacks),
mSendingMessage(false),
mPolling(false),
mEpollRebuildRequired(false),
mNextRequestSeq(0),
mResponseIndex(0),
mNextMessageUptime(LLONG_MAX) {
// eventfd(unsigned int __initial_value, int __flags)函数用来创建一个事件对象,它返回一个文件描述符来代表这个事件对象,我们可以使用它来调用对象,这里是创建一个唤醒事件的文件描述符
mWakeEventFd.reset(eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC));
LOG_ALWAYS_FATAL_IF(mWakeEventFd.get() < 0, "Could not make wake event fd: %s", strerror(errno));
// AutoMutex _l函数的作用是对mLock加锁,执行完成后自动释放锁,它利用了C++的构造函数和析构函数的自动加锁和自动释放锁
AutoMutex _l(mLock);
// 重建mPoll事件
rebuildEpollLocked();
}
看下rebuildEpollLocked()函数,源码如下所示:
// system/core/libutils/Looper.cpp
void Looper::rebuildEpollLocked() {
if (mEpollFd >= 0) {
#if DEBUG_CALLBACKS
ALOGD("%p ~ rebuildEpollLocked - rebuilding epoll set", this);
#endif
// 如果epoll文件描述符大于等于0,也就是已经有一个,就关闭旧的epoll实例
mEpollFd.reset();
}
// 创建新的epoll实例,并且注册wake管道,参数是表示监听的文件描述符数目,它向内核申请一段内存空间
mEpollFd.reset(epoll_create1(EPOLL_CLOEXEC));
LOG_ALWAYS_FATAL_IF(mEpollFd < 0, "Could not create epoll instance: %s", strerror(errno));
struct epoll_event eventItem;
// 将未使用数据区域归零
memset(& eventItem, 0, sizeof(epoll_event));
// 设置eventItem为可读事件
eventItem.events = EPOLLIN;
// 把mWakeEventFd赋值给eventItem
eventItem.data.fd = mWakeEventFd.get();
// 将mWakeEventFd(唤醒事件)添加到mEpollFd(epoll实例)
int result = epoll_ctl(mEpollFd.get(), EPOLL_CTL_ADD, mWakeEventFd.get(), &eventItem);
LOG_ALWAYS_FATAL_IF(result != 0, "Could not add wake event fd to epoll instance: %s",
strerror(errno));
for (size_t i = 0; i < mRequests.size(); i++) {
const Request& request = mRequests.valueAt(i);
struct epoll_event eventItem;
request.initEventItem(&eventItem);
// 将request队列的事件分别添加到epoll实例
int epollResult = epoll_ctl(mEpollFd.get(), EPOLL_CTL_ADD, request.fd, &eventItem);
if (epollResult < 0) {
ALOGE("Error adding epoll events for fd %d while rebuilding epoll set: %s",
request.fd, strerror(errno));
}
}
}
文件描述符
Linux内核(kernel)使用文件描述符(File Descriptor)来访问文件,它是一个非负整数,它是一个文件的索引。新建文件或者打开已经存在的文件的时候,Linux内核会返回一个文件描述符。
epoll
epoll是一种当文件描述符的内核缓冲区不是空的时候,发出可读信号进行通知;当写缓冲区还没满的时候,发出可写信号的进行通知的机制,它是一种I/O多路复用机制,可以同时监控多个文件描述符。
epoll是Linux内核2.6中提出的,它比select和poll强大;epoll没有描述符数量限制,更加灵活,它使用一个文件描述符(File Descriptor,简称:fd)管理多个描述符;epoll是Linux最高效的I/O复用机制,它将用户空间的文件描述符的事件存储到Linux内核的一个事件表中,从而使用户空间和内核空间的复制只需要一次,而且它可以在一个地方等待多个文件句柄的I/O事件。
epoll的操作可以分为三个函数:
int epoll_create(int size)
int epoll_create(int size)函数的作用是创建一个epoll的句柄。
参数size是初次分配的需要监听的描述符数量,如果后面空间不足的时候,就会进行动态扩容。
创建完epoll句柄后,就会占用一个文件描述符的值,并且返回这个值。
要注意的是,使用完epoll后,需要调用close()函数关闭,否则,会耗尽文件描述符。
int epoll_ctl(int __epoll_fd, int __op, int __fd, struct epoll_event* __event)
int epoll_ctl(int __epoll_fd, int __op, int __fd, struct epoll_event __event)函数的作用是对需要监听的文件描述符执行op操作,例如:将文件描述符加入到epoll句柄*。
参数如下所示:
epoll_fd
epoll_fd是epoll_create(int size)的返回值,它生成epoll专用的文件描述符。
op
op是op操作,它用三个宏表示,如下所示:
- EPOLL_CTL_ADD:注册新的文件描述符到epoll文件描述符中。
- EPOLL_CTL_MOD:修改已经注册的文件描述符的监听事件。
- EPOLL_CTL_DEL:从epoll文件描述符中删除一个文件描述符。
fd
fd是需要监听的文件描述符。
event
event是需要监听的事件,它的类型是epoll_event,epoll_event的源码如下所示:
// bionic/libc/include/bits/epoll_event.h
struct epoll_event {
// epoll事件
uint32_t events;
// 数据
epoll_data_t data;
}
events的值有如下:
- EPOLLIN:表示epoll的文件描述符有可读数据。
- EPOLLOUT:表示epoll的文件描述符有可写数据。
- EPOLLPRI:表示epoll的文件描述符有紧急的可读数据。
- EPOLLERR:表示epoll的文件描述符发生错误。
- EPOLLHUP:表示epoll的文件描述符被挂断。
- EPOLLET:表示将epoll设置为边缘触发(Edge Triggered)模式,这是相对于水平触发(Level Triggered)。
- EPOLLONESHOT:表示只监听一次事件,当监听完这次事件后,如果需要继续监听,就需要再次把对应的事件加入到epoll队列中。
epoll_wait(int epfd, struct epoll_event *events, int maxevents, int timeout)
epoll_wait(int epfd, struct epoll_event events, int maxevents, int timeout)函数的作用是等待事件上报*。
参数如下所示:
- epfd:epoll_create(int size)的返回值,它生成epoll专用的文件描述符。
- events:需要监听的事件,前面讲解过,这里就不再赘述。
- maxevents:每次能处理的最大事件数量。
- timeout:等待I/O事件发生的超时值,-1是阻塞,0是非阻塞。
小结
nativeInit()方法执行如下逻辑:
- 创建NativeMessageQueue对象,增加其引用计数,并且将Native层的NativeMessageQueue的指针mPtr保存到Java层的MessageQueue。
- 创建Native层的Looper对象。
- 调用epoll的epoll_create(int size)函数和epoll_ctl(int __epoll_fd, int __op, int __fd, struct epoll_event __event)函数来完成对唤醒事件(mWakeEventFd)和Request队列事件(mRequests)的监听,它们都是可读事件*。
nativeDestroy(long ptr)
nativeDestroy(long ptr)方法的作用是回收。
nativeDestroy(long ptr)方法在MessageQueue类的dispose()方法调用,当需要退出消息队列的时候就会调用,源码如下所示:
// MessageQueue.java
private void dispose() {
if (mPtr != 0) {
// 调用nativeDestroy(long ptr)方法
nativeDestroy(mPtr);
mPtr = 0;
}
}
nativeDestroy(long ptr)方法通过JNI调用android_os_MessageQueue_nativeDestroy(JNIEnv env, jclass clazz, jlong ptr)*函数,该函数源码如下所示:
// frameworks/base/core/jni/android_os_MessageQueue.cpp
static void android_os_MessageQueue_nativeDestroy(JNIEnv* env, jclass clazz, jlong ptr) {
NativeMessageQueue* nativeMessageQueue = reinterpret_cast<NativeMessageQueue*>(ptr);
// 调用RefBase类decStrong(const void* id)函数
nativeMessageQueue->decStrong(env);
}
NativeMessageQueue类继承RefBase类,这里调用了RefBase类的decStrong(const void id)*函数,源码如下所示:
// system/core/libutils/RefBase.cpp
void RefBase::decStrong(const void* id) const
{
weakref_impl* const refs = mRefs;
// 删除强引用
refs->removeStrongRef(id);
const int32_t c = refs->mStrong.fetch_sub(1, std::memory_order_release);
#if PRINT_REFS
ALOGD("decStrong of %p from %p: cnt=%d\n", this, id, c);
#endif
LOG_ALWAYS_FATAL_IF(BAD_STRONG(c), "decStrong() called on %p too many times",
refs);
if (c == 1) {
std::atomic_thread_fence(std::memory_order_acquire);
refs->mBase->onLastStrongRef(id);
int32_t flags = refs->mFlags.load(std::memory_order_relaxed);
if ((flags&OBJECT_LIFETIME_MASK) == OBJECT_LIFETIME_STRONG) {
delete this;
// 析构函数在这种情况下不会删除引用
}
}
// 删除弱引用
refs->decWeak(id);
}
小结
nativeDestroy(long ptr)方法执行如下逻辑:
- 调用RefBase::decStrong(const void id)函数减少对象的引用计数*。
- 当引用计数是0的时候,就删除NativeMessageQueue对象。
nativePollOnce(long ptr, int timeoutMillis)
nativePollOnce(long ptr, int timeoutMillis)方法是在MessageQueue类的next()方法调用,源码如下所示:
// MessageQueue.java
@UnsupportedAppUsage
Message next() {
// 省略部分代码
for (;;) {
// 省略部分代码
nativePollOnce(ptr, nextPollTimeoutMillis);
// 省略部分代码
}
}
nativePollOnce(long ptr, int timeoutMillis)方法通过JNI调用android_os_MessageQueue_nativePollOnce(JNIEnv env, jobject obj,
jlong ptr, jint timeoutMillis)*函数,该函数的源码如下所示:
// frameworks/base/core/jni/android_os_MessageQueue.cpp
static void android_os_MessageQueue_nativePollOnce(JNIEnv* env, jobject obj,
jlong ptr, jint timeoutMillis) {
// 将Java层的mPtr传递到该函数,然后将其强转成NativeMessageQueue
NativeMessageQueue* nativeMessageQueue = reinterpret_cast<NativeMessageQueue*>(ptr);
// 调用NativeMessageQueue::pollOnce(JNIEnv* env, jobject pollObj, int timeoutMillis)函数
nativeMessageQueue->pollOnce(env, obj, timeoutMillis);
}
NativeMessageQueue::pollOnce(JNIEnv env, jobject pollObj, int timeoutMillis)*函数的源码如下所示:
// frameworks/base/core/jni/android_os_MessageQueue.cpp
void NativeMessageQueue::pollOnce(JNIEnv* env, jobject pollObj, int timeoutMillis) {
mPollEnv = env;
mPollObj = pollObj;
// 调用pollOnce(int timeoutMillis)函数
mLooper->pollOnce(timeoutMillis);
mPollObj = NULL;
mPollEnv = NULL;
if (mExceptionObj) {
env->Throw(mExceptionObj);
env->DeleteLocalRef(mExceptionObj);
mExceptionObj = NULL;
}
}
pollOnce(int timeoutMillis)函数的源码如下所示:
// system/core/libutils/include/utils/Looper.h
inline int pollOnce(int timeoutMillis) {
// 调用Looper::pollOnce(int timeoutMillis, int* outFd, int* outEvents, void** outData)函数
return pollOnce(timeoutMillis, nullptr, nullptr, nullptr);
}
Looper::pollOnce(int timeoutMillis, int outFd, int outEvents, void* outData)*函数的源码如下所示:
// system/core/libutils/Looper.cpp
int Looper::pollOnce(int timeoutMillis, int* outFd, int* outEvents, void** outData) {
int result = 0;
// 循环执行
for (;;) {
while (mResponseIndex < mResponses.size()) {
const Response& response = mResponses.itemAt(mResponseIndex++);
int ident = response.request.ident;
if (ident >= 0) {
// 如果ident大于等于0,说明没有callback,因为POLL_CALLBACK等于-2,就执行以下逻辑
// 处理没有callback的response事件
int fd = response.request.fd;
int events = response.events;
void* data = response.request.data;
#if DEBUG_POLL_AND_WAKE
ALOGD("%p ~ pollOnce - returning signalled identifier %d: "
"fd=%d, events=0x%x, data=%p",
this, ident, fd, events, data);
#endif
if (outFd != nullptr) *outFd = fd;
if (outEvents != nullptr) *outEvents = events;
if (outData != nullptr) *outData = data;
return ident;
}
}
if (result != 0) {
#if DEBUG_POLL_AND_WAKE
ALOGD("%p ~ pollOnce - returning result %d", this, result);
#endif
if (outFd != nullptr) *outFd = 0;
if (outEvents != nullptr) *outEvents = 0;
if (outData != nullptr) *outData = nullptr;
return result;
}
// 调用Looper::pollInner(int timeoutMillis)函数
result = pollInner(timeoutMillis);
}
}
参数如下所示:
- timeoutMillis:超时时间。
- outFd:发生事件的文字描述符。
-
outEvents:当前outFd上发生的事件,类型如下所示:
- EVENT_INPUT:可读
- EVENT_OUTPUT:可写
- EVENT_ERROR:错误
- EVENT_HANGUP:中断
- outData:上下文数据。
Looper::pollInner(int timeoutMillis)函数的源码如下所示:
// system/core/libutils/Looper.cpp
int Looper::pollInner(int timeoutMillis) {
#if DEBUG_POLL_AND_WAKE
ALOGD("%p ~ pollOnce - waiting: timeoutMillis=%d", this, timeoutMillis);
#endif
// 根据下一条消息到期的时间调整超时时间
if (timeoutMillis != 0 && mNextMessageUptime != LLONG_MAX) {
nsecs_t now = systemTime(SYSTEM_TIME_MONOTONIC);
int messageTimeoutMillis = toMillisecondTimeoutDelay(now, mNextMessageUptime);
if (messageTimeoutMillis >= 0
&& (timeoutMillis < 0 || messageTimeoutMillis < timeoutMillis)) {
timeoutMillis = messageTimeoutMillis;
}
#if DEBUG_POLL_AND_WAKE
ALOGD("%p ~ pollOnce - next message in %" PRId64 "ns, adjusted timeout: timeoutMillis=%d",
this, mNextMessageUptime - now, timeoutMillis);
#endif
}
// poll
int result = POLL_WAKE;
mResponses.clear();
mResponseIndex = 0;
// 即将处于空闲状态
mPolling = true;
// 文件描述符的最大数量是16
struct epoll_event eventItems[EPOLL_MAX_EVENTS];
// 等待事件上报
int eventCount = epoll_wait(mEpollFd.get(), eventItems, EPOLL_MAX_EVENTS, timeoutMillis);
// 不再是空闲状态
mPolling = false;
// 加锁
mLock.lock();
// 如果需要的,就重新设置poll
if (mEpollRebuildRequired) {
mEpollRebuildRequired = false;
rebuildEpollLocked();
goto Done;
}
if (eventCount < 0) {
if (errno == EINTR) {
goto Done;
}
ALOGW("Poll failed with an unexpected error: %s", strerror(errno));
// 如果epoll的事件数量小于0,说明发生错误,跳转到Done标签
result = POLL_ERROR;
goto Done;
}
if (eventCount == 0) {
#if DEBUG_POLL_AND_WAKE
ALOGD("%p ~ pollOnce - timeout", this);
#endif
// 如果epoll的事件数量等于0,说明超时,跳转到Done标签
result = POLL_TIMEOUT;
goto Done;
}
// 循环执行,处理所有事件
#if DEBUG_POLL_AND_WAKE
ALOGD("%p ~ pollOnce - handling events from %d fds", this, eventCount);
#endif
for (int i = 0; i < eventCount; i++) {
int fd = eventItems[i].data.fd;
uint32_t epollEvents = eventItems[i].events;
if (fd == mWakeEventFd.get()) {
if (epollEvents & EPOLLIN) {
// 如果已经唤醒,就读取并且清空管道数据
awoken();
} else {
ALOGW("Ignoring unexpected epoll events 0x%x on wake event fd.", epollEvents);
}
} else {
ssize_t requestIndex = mRequests.indexOfKey(fd);
if (requestIndex >= 0) {
int events = 0;
if (epollEvents & EPOLLIN) events |= EVENT_INPUT;
if (epollEvents & EPOLLOUT) events |= EVENT_OUTPUT;
if (epollEvents & EPOLLERR) events |= EVENT_ERROR;
if (epollEvents & EPOLLHUP) events |= EVENT_HANGUP;
// 处理request,并且生成对应的response对象,同时push到响应数据
pushResponse(events, mRequests.valueAt(requestIndex));
} else {
ALOGW("Ignoring unexpected epoll events 0x%x on fd %d that is "
"no longer registered.", epollEvents, fd);
}
}
}
Done: ;
// 处理Native层的Message,调用相应的回调方法
mNextMessageUptime = LLONG_MAX;
while (mMessageEnvelopes.size() != 0) {
nsecs_t now = systemTime(SYSTEM_TIME_MONOTONIC);
const MessageEnvelope& messageEnvelope = mMessageEnvelopes.itemAt(0);
if (messageEnvelope.uptime <= now) {
// 从列表中删除信封,持有Handler的强引用,直到handleMessage方法调用结束,然后删除它,这样处理器就可以在我们重新获得锁之前被删除
{
// 获得Handler
sp<MessageHandler> handler = messageEnvelope.handler;
Message message = messageEnvelope.message;
mMessageEnvelopes.removeAt(0);
mSendingMessage = true;
mLock.unlock();
#if DEBUG_POLL_AND_WAKE || DEBUG_CALLBACKS
ALOGD("%p ~ pollOnce - sending message: handler=%p, what=%d",
this, handler.get(), message.what);
#endif
handler->handleMessage(message);
}
// 释放Handler
mLock.lock();
mSendingMessage = false;
result = POLL_CALLBACK;
} else {
// 消息队列中最后一条消息决定下一次唤醒的时间
mNextMessageUptime = messageEnvelope.uptime;
break;
}
}
// 释放锁
mLock.unlock();
// 调用所有response的回调方法
for (size_t i = 0; i < mResponses.size(); i++) {
Response& response = mResponses.editItemAt(i);
if (response.request.ident == POLL_CALLBACK) {
int fd = response.request.fd;
int events = response.events;
void* data = response.request.data;
#if DEBUG_POLL_AND_WAKE || DEBUG_CALLBACKS
ALOGD("%p ~ pollOnce - invoking fd event callback %p: fd=%d, events=0x%x, data=%p",
this, response.request.callback.get(), fd, events, data);
#endif
// 调用回调方法,要注意的是,在函数返回之前,文件描述符可能被回调关闭(甚至可能被重用),因此随后删除文件描述符时,我们需要稍微小心一点
int callbackResult = response.request.callback->handleEvent(fd, events, data);
if (callbackResult == 0) {
removeFd(fd, response.request.seq);
}
// 迅速清除response结构中回调引用,因为直到下一次轮询才会清除response本身
response.request.callback.clear();
result = POLL_CALLBACK;
}
}
return result;
}
Looper::pollInner(int timeoutMillis)函数的返回值有如下所示:
- POLL_WAKE:表示由wake()触发,也就是pipe写端的write事件触发。
- POLL_CALLBACK:表示由某个被监听的文件描述符被触发。
- POLL_TIMEOUT:表示等待超时。
- POLL_ERROR:表示等待期间发生错误。
Looper::awoken()函数的源码如下所示:
// system/core/libutils/Looper.cpp
void Looper::awoken() {
#if DEBUG_POLL_AND_WAKE
ALOGD("%p ~ awoken", this);
#endif
uint64_t counter;
// 不断读取管道数据,清空管道内容
TEMP_FAILURE_RETRY(read(mWakeEventFd.get(), &counter, sizeof(uint64_t)));
}
小结
nativePollOnce(long ptr, int timeoutMillis)方法执行以下逻辑:
- 调用Looper::pollOnce(int timeoutMillis, int outFd, int outEvents, void* outData)函数,在空闲状态下停留在epoll_wait(int fd, struct epoll_event events, int max_events, int timeout)函数,目的是等待事件的发生或者超时**。
nativeWake(long ptr)
nativeWake(long ptr)方法的作用是唤醒线程,在MessageQueue类的enqueueMessage(MessageQueue queue, Message msg, long uptimeMillis)方法和quit()方法都有调用该方法。
android_os_MessageQueue_nativeWake(JNIEnv env, jclass clazz, jlong ptr)*函数的源码如下所示:
// frameworks/base/core/jni/android_os_MessageQueue.cpp
static void android_os_MessageQueue_nativeWake(JNIEnv* env, jclass clazz, jlong ptr) {
NativeMessageQueue* nativeMessageQueue = reinterpret_cast<NativeMessageQueue*>(ptr);
// 调用NativeMessageQueue::wake()函数
nativeMessageQueue->wake();
}
NativeMessageQueue::wake()函数的源码如下所示:
// frameworks/base/core/jni/android_os_MessageQueue.cpp
void NativeMessageQueue::wake() {
// 调用Looper::wake()函数
mLooper->wake();
}
Looper::wake()函数的源码如下所示:
// system/core/libutils/Looper.cpp
void Looper::wake() {
#if DEBUG_POLL_AND_WAKE
ALOGD("%p ~ wake", this);
#endif
uint64_t inc = 1;
// 向管道mWakeEventFd写入字符
ssize_t nWrite = TEMP_FAILURE_RETRY(write(mWakeEventFd.get(), &inc, sizeof(uint64_t)));
if (nWrite != sizeof(uint64_t)) {
if (errno != EAGAIN) {
LOG_ALWAYS_FATAL("Could not write wake signal to fd %d (returned %zd): %s",
mWakeEventFd.get(), nWrite, strerror(errno));
}
}
}
小结
nativeWake(long ptr)执行如下逻辑:
- 调用Looper::wake()函数,向管道mWakeEventFd写入字符。
Looper
Looper用于为线程运行消息循环,从MessageQueue(消息队列)中取出消息,然后分发给Message(消息)对应的宿主Handler。默认情况下,线程没有与之关联的消息循环,要创建一个循环,就在运行循环的线程中调用prepare()方法,然后调用loop()方法让它处理消息,直到循环停止。
要注意的是,每一个线程只有存在一个Looper对象。
Java层
成员变量
Looper类相关的成员变量,源码如下所示:
// Looper.java
// 标签
private static final String TAG = "Looper";
// 使用ThreadLocal存储Looper对象,使每个线程都能访问自己的一份Looper对象的变量副本
@UnsupportedAppUsage
static final ThreadLocal<Looper> sThreadLocal = new ThreadLocal<Looper>();
// 主线程的Looper对象
@UnsupportedAppUsage
private static Looper sMainLooper;
// 当前Looper对象的观察者
private static Observer sObserver;
// 当前Looper对象的消息队列
@UnsupportedAppUsage
final MessageQueue mQueue;
// 当前Looper对象的所在的线程
final Thread mThread;
// 打印文本
@UnsupportedAppUsage
private Printer mLogging;
// 跟踪标记
private long mTraceTag;
// 如果消息分发时间超过该时间,就会显示警告日志
private long mSlowDispatchThresholdMs;
// 如果消息传递时间超过该时间,就会显示警告日志
private long mSlowDeliveryThresholdMs;
prepare()
prepare()方法的作用是在当前线程初始化Looper对象,相关的源码如下所示:
// Looper.java
// 在普通线程中初始化Looper对象
public static void prepare() {
// 初始化一个可以退出Looper的Looper对象
prepare(true);
}
// 私有的Looper对象初始化方法,形式参数quitAllowed为是否退出Looper
private static void prepare(boolean quitAllowed) {
if (sThreadLocal.get() != null) {
// 每个线程只有一个Looper对象,否则就抛出RuntimeException异常
throw new RuntimeException("Only one Looper may be created per thread");
}
// 将Looper对象存储到ThreadLocal对象
sThreadLocal.set(new Looper(quitAllowed));
}
// 在主线程中初始化Looper对象,要注意的是,我们不需要自己手动调用这个方法来在主线程中初始化Looper对象,因为它已经被Android系统创建
public static void prepareMainLooper() {
// 初始化一个不可以退出Looper的Looper对象
prepare(false);
// 取Looper的Class对象作为锁对象
synchronized (Looper.class) {
if (sMainLooper != null) {
// 如果已经在主线程创建过Looper对象,就抛出IllegalStateException异常
throw new IllegalStateException("The main Looper has already been prepared.");
}
// 调用myLooper()方法拿到与当前线程关联的Looper对象,并且赋值给成员变量sMainLooper
sMainLooper = myLooper();
}
}
// 返回与当前线程关联的Looper对象
public static @Nullable Looper myLooper() {
// 从ThreadLocal对象中取出与当前线程关联的Looper对象
return sThreadLocal.get();
}
// 返回与当前线程关联的Looper对象对应的MessageQueue对象
public static @NonNull MessageQueue myQueue() {
return myLooper().mQueue;
}
loop()
loop()方法的作用是在当前线程中运行消息队列,源码如下所示:
// Looper.java
public static void loop() {
// 与当前线程关联的Looper对象
final Looper me = myLooper();
if (me == null) {
// 如果没有初始化Looper对象,就抛出RuntimeException异常
throw new RuntimeException("No Looper; Looper.prepare() wasn't called on this thread.");
}
// 与当前线程关联的Looper对象对应的消息队列
final MessageQueue queue = me.mQueue;
// 得到当前线程的唯一标识(uid和pid),作用是下面每次循环都会判断线程有没有被切换
Binder.clearCallingIdentity();
final long ident = Binder.clearCallingIdentity();
// 允许使用系统属性覆盖阈值,例如:adb shell 'setprop log.looper.1000.main.slow 1 && stop && start'
final int thresholdOverride =
SystemProperties.getInt("log.looper."
+ Process.myUid() + "."
+ Thread.currentThread().getName()
+ ".slow", 0);
boolean slowDeliveryDetected = false;
// 循环执行
for (;;) {
// 从消息队列中取出消息,这有可能会阻塞线程
Message msg = queue.next();
if (msg == null) {
// 如果消息是空,证明消息队列已经退出,结束循环,结束方法
return;
}
// 打印日志
final Printer logging = me.mLogging;
if (logging != null) {
logging.println(">>>>> Dispatching to " + msg.target + " " +
msg.callback + ": " + msg.what);
}
// 确保观察者在处理事务的时候不会改变
final Observer observer = sObserver;
// 性能分析相关的逻辑
final long traceTag = me.mTraceTag;
long slowDispatchThresholdMs = me.mSlowDispatchThresholdMs;
long slowDeliveryThresholdMs = me.mSlowDeliveryThresholdMs;
if (thresholdOverride > 0) {
slowDispatchThresholdMs = thresholdOverride;
slowDeliveryThresholdMs = thresholdOverride;
}
final boolean logSlowDelivery = (slowDeliveryThresholdMs > 0) && (msg.when > 0);
final boolean logSlowDispatch = (slowDispatchThresholdMs > 0);
final boolean needStartTime = logSlowDelivery || logSlowDispatch;
final boolean needEndTime = logSlowDispatch;
if (traceTag != 0 && Trace.isTagEnabled(traceTag)) {
Trace.traceBegin(traceTag, msg.target.getTraceName(msg));
}
final long dispatchStart = needStartTime ? SystemClock.uptimeMillis() : 0;
final long dispatchEnd;
Object token = null;
if (observer != null) {
// 如果观察者不为空,就调用观察者的messageDispatchStarting()方法拿到token,它会在发送消息之前调用
token = observer.messageDispatchStarting();
}
long origWorkSource = ThreadLocalWorkSource.setUid(msg.workSourceUid);
try {
// 调用宿主Handler的dispatchMessage(Message msg)方法,将消息分发给宿主Handler
msg.target.dispatchMessage(msg);
if (observer != null) {
// 如果观察者不为空,就调用观察者的messageDispatched(Object token, Message msg)方法,它会在消息被处理的时候调用
observer.messageDispatched(token, msg);
}
dispatchEnd = needEndTime ? SystemClock.uptimeMillis() : 0;
} catch (Exception exception) {
if (observer != null) {
// 如果发生异常,并且观察者不为空,就调用观察者的dispatchingThrewException(Object token, Message msg, Exception exception)方法,它会在处理消息时引发异常的时候调用
observer.dispatchingThrewException(token, msg, exception);
}
throw exception;
} finally {
// ThreadLocalWorkSource是用于跟踪是谁触发了这个线程上当前执行的工作,调用store(long token)方法,使用提供的令牌恢复状态
ThreadLocalWorkSource.restore(origWorkSource);
if (traceTag != 0) {
// 性能分析相关的逻辑
Trace.traceEnd(traceTag);
}
}
if (logSlowDelivery) {
// 如果消息传递时间超过该时间,就会显示警告日志
if (slowDeliveryDetected) {
if ((dispatchStart - msg.when) <= 10) {
Slog.w(TAG, "Drained");
slowDeliveryDetected = false;
}
} else {
if (showSlowLog(slowDeliveryThresholdMs, msg.when, dispatchStart, "delivery",
msg)) {
// Once we write a slow delivery log, suppress until the queue drains.
slowDeliveryDetected = true;
}
}
}
if (logSlowDispatch) {
// 如果消息分发时间超过时间,就会显示警告日志
showSlowLog(slowDispatchThresholdMs, dispatchStart, dispatchEnd, "dispatch", msg);
}
if (logging != null) {
// 打印日志
logging.println("<<<<< Finished to " + msg.target + " " + msg.callback);
}
final long newIdent = Binder.clearCallingIdentity();
if (ident != newIdent) {
// 如果本次循环中的线程和开始调用的线程不一样,就打印日志
Log.wtf(TAG, "Thread identity changed from 0x"
+ Long.toHexString(ident) + " to 0x"
+ Long.toHexString(newIdent) + " while dispatching to "
+ msg.target.getClass().getName() + " "
+ msg.callback + " what=" + msg.what);
}
// 消息分发完毕,将消息回收到消息池中
msg.recycleUnchecked();
}
}
quit()
quit()的作用是退出Looper,源码如下所示:
// Looper.java
// 退出Looper,要注意的是,在Looper被要求退出后,任何向消息队列发送消息的尝试都是将失败,例如:sendMessage(Message msg)方法返回false
public void quit() {
mQueue.quit(false);
}
// 安全退出Looper,要注意的是,在Looper被要求退出后,任何向消息队列发送消息的尝试都是将失败,例如:sendMessage(Message msg)方法返回false
public void quitSafely() {
mQueue.quit(true);
}
调用到MessageQueue的quit(boolean safe)方法,这个方法前面解释过了,这里不再赘述。
Native层
Looper.h文件中定义了三个结构体,分别是请求(Request)结构体、响应(Response)结构体和信封(MessageEnvelope)结构体,源码如下所示:
// system/core/libutils/include/utils/Looper.h
// 请求结构体
struct Request {
// 文件描述符
int fd;
// 当前事件发生的标识符,数值大于等于0或者是POLL_CALLBACK(-2)
int ident;
// 要监听的文件类型,默认是EVENT_INPUT,前面解释过,这里不再赘述
int events;
int seq;
// 当有事件发生时,会回调该callback函数
sp<LooperCallback> callback;
// 文件描述符里的数据
void* data;
void initEventItem(struct epoll_event* eventItem) const;
};
// 响应结构体
struct Response {
// 文件类型,默认是EVENT_INPUT,前面解释过,这里不再赘述
int events;
// Request对象
Request request;
};
// 信封结构体
struct MessageEnvelope {
MessageEnvelope() : uptime(0) { }
MessageEnvelope(nsecs_t u, sp<MessageHandler> h, const Message& m)
: uptime(u), handler(std::move(h)), message(m) {}
// 发信时间
nsecs_t uptime;
// 信封接受者
sp<MessageHandler> handler;
// 信封内容
Message message;
};
构造函数
Looper.cpp文件部分的源码如下所示:
// system/core/libutils/Looper.cpp
// 每次迭代检索轮询事件所需的文件描述符的最大数量是16
static const int EPOLL_MAX_EVENTS = 16;
// Looper的构造函数
Looper::Looper(bool allowNonCallbacks)
: mAllowNonCallbacks(allowNonCallbacks),
mSendingMessage(false),
mPolling(false),
mEpollRebuildRequired(false),
mNextRequestSeq(0),
mResponseIndex(0),
mNextMessageUptime(LLONG_MAX) {
// eventfd(unsigned int __initial_value, int __flags)函数用来创建一个事件对象,它返回一个文件描述符来代表这个事件对象,我们可以使用它来调用对象,这里是创建一个唤醒事件的文件描述符
mWakeEventFd.reset(eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC));
LOG_ALWAYS_FATAL_IF(mWakeEventFd.get() < 0, "Could not make wake event fd: %s", strerror(errno));
// AutoMutex _l函数的作用是对mLock加锁,执行完成后自动释放锁,它利用了C++的构造函数和析构函数的自动加锁和自动释放锁
AutoMutex _l(mLock);
// 重建mPoll事件
rebuildEpollLocked();
}
Looper的构造函数里执行的逻辑前面解释过了,这里就不再赘述。
sendMessage开头系列函数
源码如下所示:
// system/core/libutils/Looper.cpp
void Looper::sendMessage(const sp<MessageHandler>& handler, const Message& message) {
// 获得系统时间
nsecs_t now = systemTime(SYSTEM_TIME_MONOTONIC);
// 调用Looper::sendMessageAtTime(nsecs_t uptime, const sp<MessageHandler>& handler,
const Message& message)函数
sendMessageAtTime(now, handler, message);
}
void Looper::sendMessageDelayed(nsecs_t uptimeDelay, const sp<MessageHandler>& handler,
const Message& message) {
// 获得系统时间
nsecs_t now = systemTime(SYSTEM_TIME_MONOTONIC);
// 调用Looper::sendMessageAtTime(nsecs_t uptime, const sp<MessageHandler>& handler,
const Message& message)函数
sendMessageAtTime(now + uptimeDelay, handler, message);
}
void Looper::sendMessageAtTime(nsecs_t uptime, const sp<MessageHandler>& handler,
const Message& message) {
#if DEBUG_CALLBACKS
ALOGD("%p ~ sendMessageAtTime - uptime=%" PRId64 ", handler=%p, what=%d",
this, uptime, handler.get(), message.what);
#endif
size_t i = 0;
{
// 加锁
AutoMutex _l(mLock);
size_t messageCount = mMessageEnvelopes.size();
while (i < messageCount && uptime >= mMessageEnvelopes.itemAt(i).uptime) {
// 按着时间顺序统计索引i
i += 1;
}
// 创建信封,并且传入对应的参数
MessageEnvelope messageEnvelope(uptime, handler, message);
// 根据索引i将信封插入到信封向量(Vector, 它是一个封装了动态大小数组的顺序容器)
mMessageEnvelopes.insertAt(messageEnvelope, i, 1);
if (mSendingMessage) {
// 如果当前正在发送消息,就不再调用wake()函数唤醒poll循环,结束函数
return;
}
}
// 释放锁
if (i == 0) {
// 如果i等于0,说明该消息插入到信封向量的头部,调用wake()函数唤醒poll循环
wake();
}
}
回调类
LooperCallback类的源码如下所示:
// system/core/libutils/include/utils/Looper.h
class LooperCallback : public virtual RefBase {
protected:
// LooperCallback()函数是虚析构函数
virtual ~LooperCallback();
public:
// 处理给定的文件描述符的poll事件,它提供与之关联的文件描述符,被触发的poll事件的位掩码(通常是EVENT_INPUT)和最初提供的数据指针
virtual int handleEvent(int fd, int events, void* data) = 0;
};
SimpleLooperCallback类继承LooperCallback类,SimpleLooperCallback类的源码如下所示:
// system/core/libutils/include/utils/Looper.h
class SimpleLooperCallback : public LooperCallback {
protected:
// SimpleLooperCallback()是虚析构函数
virtual ~SimpleLooperCallback();
public:
SimpleLooperCallback(Looper_callbackFunc callback);
// handleEvent(int fd, int events, void* data)是虚函数
virtual int handleEvent(int fd, int events, void* data);
private:
// 回调函数
Looper_callbackFunc mCallback;
};
看下handleEvent(int fd, int events, void data)*函数,源码如下所示:
// system/core/libutils/Looper.cpp
int SimpleLooperCallback::handleEvent(int fd, int events, void* data) {
// 调用回调函数
return mCallback(fd, events, data);
}
Handler
Handler用于处理消息,向消息池发送消息事件和处理消息事件。
Java层
构造方法
看下Handler的构造方法,源码如下所示:
// Handler.java
// 默认构造方法,用于将该Handler与当前线程的Looper对象相关联
public Handler() {
this(null, false);
}
// 构造方法,用于将该Handler与当前线程的Looper对象相关联,并且接受一个可以处理消息的回调接口,参数callback可以为空
public Handler(@Nullable Callback callback) {
this(callback, false);
}
// 构造方法,用于将该Handler与传递进来的Looper对象相关联,参数looper不可为空
public Handler(@NonNull Looper looper) {
this(looper, null, false);
}
// 构造方法,用于将该Handler与传递进来的Looper对象相关联,并且接受一个可以处理消息的回调接口,参数looper不可为空,参数callback可为空
public Handler(@NonNull Looper looper, @Nullable Callback callback) {
this(looper, callback, false);
}
// 构造方法,用于将该Handler与当前线程的Looper对象相关联,并且设置Handler是否应该是异步,Handler在默认情况下是同步的,调用该方法可以构造异步的Handler,要注意的是,该方法不支持外部应用使用
public Handler(boolean async) {
this(null, async);
}
上面的构造方法最终会调用以下两个方法,源码如下所示:
// Handler.java
public Handler(@Nullable Callback callback, boolean async) {
if (FIND_POTENTIAL_LEAKS) {
// 如果存在内存泄漏的风险,就执行以下逻辑
final Class<? extends Handler> klass = getClass();
if ((klass.isAnonymousClass() || klass.isMemberClass() || klass.isLocalClass()) &&
(klass.getModifiers() & Modifier.STATIC) == 0) {
// 如果是匿名类,或者是内部类,或者是本地类,都要申明成静态,否则就会打印出警告,警告有可能会出现内存泄漏
Log.w(TAG, "The following Handler class should be static or leaks might occur: " +
klass.getCanonicalName());
}
}
// 得到当前线程的Looper对象
mLooper = Looper.myLooper();
if (mLooper == null) {
// 如果当前线程的Looper对象为空,也就是Looper还没初始化,就抛出RuntimeException异常
throw new RuntimeException(
"Can't create handler inside thread " + Thread.currentThread()
+ " that has not called Looper.prepare()");
}
// 得到当前线程的Looper对象所对应的MessageQueue
mQueue = mLooper.mQueue;
// 将形式参数callback赋值给给成员变量mCallback
mCallback = callback;
// 将形式参数asyn赋值给成员变量mAsynchronous
mAsynchronous = async;
}
// 使用传递进来的Looper对象,而不是使用默认的,并且接受一个处理消息的回调接口,还要设置Handler是否应该是异步的,需要注意的是,该方法不支持外部应用使用
@UnsupportedAppUsage
public Handler(@NonNull Looper looper, @Nullable Callback callback, boolean async) {
mLooper = looper;
mQueue = looper.mQueue;
mCallback = callback;
mAsynchronous = async;
}
发送消息
发送消息主要分为两类方法:
- post开头的方法
- sendMessage开头的方法
看下相关的源码,源码如下所示:
// Handler.java
// 将传递进来的Runnable添加到消息队列的尾部,该Runnable将在附加在该Handler的线程上运行
public final boolean post(@NonNull Runnable r) {
return sendMessageDelayed(getPostMessage(r), 0);
}
// 将传递进来的Runnable添加到消息队列的尾部,并在uptimeMillis时间内运行,该Runnable将在附加在该Handler的线程上运行
public final boolean postAtTime(@NonNull Runnable r, long uptimeMillis) {
return sendMessageAtTime(getPostMessage(r), uptimeMillis);
}
// 将传递进来的Runnable添加到消息队列的尾部,并在uptimeMillis时间内运行,token是个令牌,它为了在调用removeCallbacksAndMessages(Object token)方法可以删除消息队列中对应的消息,该Runnable将在附加在该Handler的线程上运行
public final boolean postAtTime(
@NonNull Runnable r, @Nullable Object token, long uptimeMillis) {
return sendMessageAtTime(getPostMessage(r, token), uptimeMillis);
}
// 将传递进来的Runnable添加到消息队列的尾部,并在delayMillis的时间后运行,该Runnable将在附加在该Handler的线程上运行
public final boolean postDelayed(@NonNull Runnable r, long delayMillis) {
return sendMessageDelayed(getPostMessage(r), delayMillis);
}
// 将传递进来的Runnable添加到消息队列的尾部,并在delayMillis的时间后运行,同时设置该消息的消息类别what,该Runnable将在附加在该Handler的线程上运行,要注意的是,该方法不支持外部应用调用
public final boolean postDelayed(Runnable r, int what, long delayMillis) {
return sendMessageDelayed(getPostMessage(r).setWhat(what), delayMillis);
}
// 将传递进来的Runnable添加到消息队列的尾部,并在delayMillis的时间后运行,token是个令牌,它为了在调用removeCallbacksAndMessages(Object token)方法可以删除消息队列中对应的消息,该Runnable将在附加在该Handler的线程上运行
public final boolean postDelayed(
@NonNull Runnable r, @Nullable Object token, long delayMillis) {
return sendMessageDelayed(getPostMessage(r, token), delayMillis);
}
// 将传递进来的Runnable添加到消息队列的头部,该Runnable将在附加在该Handler的线程上运行
public final boolean postAtFrontOfQueue(@NonNull Runnable r) {
return sendMessageAtFrontOfQueue(getPostMessage(r));
}
// 将传递进来的Message添加到消息队列的尾部
public final boolean sendMessage(@NonNull Message msg) {
return sendMessageDelayed(msg, 0);
}
// 添加一条空的普通消息到消息队列的尾部,并且设置该消息的消息类别what
public final boolean sendEmptyMessage(int what)
{
return sendEmptyMessageDelayed(what, 0);
}
// 添加一条空的延时delayMillis的普通消息到消息队列的尾部,并且设置该消息的消息类别what
public final boolean sendEmptyMessageDelayed(int what, long delayMillis) {
Message msg = Message.obtain();
msg.what = what;
return sendMessageDelayed(msg, delayMillis);
}
// 添加一条空的uptimeMillis时间内运行的普通消息到消息队列的尾部,并且设置该消息的消息类别what
public final boolean sendEmptyMessageAtTime(int what, long uptimeMillis) {
Message msg = Message.obtain();
msg.what = what;
return sendMessageAtTime(msg, uptimeMillis);
}
// 添加一条延时delayMillis的普通消息到消息队列的尾部
public final boolean sendMessageDelayed(@NonNull Message msg, long delayMillis) {
if (delayMillis < 0) {
delayMillis = 0;
}
return sendMessageAtTime(msg, SystemClock.uptimeMillis() + delayMillis);
}
// 添加一条uptimeMillis时间内运行的普通消息到消息队列的尾部
public boolean sendMessageAtTime(@NonNull Message msg, long uptimeMillis) {
MessageQueue queue = mQueue;
if (queue == null) {
RuntimeException e = new RuntimeException(
this + " sendMessageAtTime() called with no mQueue");
Log.w("Looper", e.getMessage(), e);
return false;
}
return enqueueMessage(queue, msg, uptimeMillis);
}
// 添加一条普通消息到消息队列的头部
public final boolean sendMessageAtFrontOfQueue(@NonNull Message msg) {
MessageQueue queue = mQueue;
if (queue == null) {
RuntimeException e = new RuntimeException(
this + " sendMessageAtTime() called with no mQueue");
Log.w("Looper", e.getMessage(), e);
return false;
}
return enqueueMessage(queue, msg, 0);
}
// 如果在该Handler对应的同一个线程上调用该消息,就同步执行该消息,否则,调用sendMessage(Message msg)方法将其添加到消息队列中,如果消息成功运行或者成功添加到消息队列中,就返回true,否则,就返回false,这通常是因为处理消息的Looper退出
public final boolean executeOrSendMessage(@NonNull Message msg) {
if (mLooper == Looper.myLooper()) {
dispatchMessage(msg);
return true;
}
return sendMessage(msg);
}
可以发现post开头的方法都调用getPostMessage(Runnable r)方法或者getPostMessage(Runnable r, Object token)方法,其实post开头的方法本质上和sendMessage开头的方法是一样的,都是添加一条普通消息到消息队列中,源码如下所示:
// Handler.java
// 从消息池中取出消息,并且设置该消息的callback为传进来的Runnable,最后返回该消息
private static Message getPostMessage(Runnable r) {
Message m = Message.obtain();
m.callback = r;
return m;
}
// 从消息池中取出消息,并且设置该消息的obj为传递进来的令牌token,callback为传递进来的Runnable,最后返回该消息
@UnsupportedAppUsage
private static Message getPostMessage(Runnable r, Object token) {
Message m = Message.obtain();
m.obj = token;
m.callback = r;
return m;
}
以上发送消息的方法最后会调用sendMessageAtTime(Message msg, long uptimeMillis)方法或者sendMessageAtFrontOfQueue(Message msg)方法,然后这两个方法会调用enqueueMessage(MessageQueue queue, Message msg, long uptimeMillis)方法,最后这个方法会调用MessageQueue的enqueueMessage(Message msg, long when)方法,前面提到过了,这里就不再赘述了,源码如下所示:
// Handler.java
private boolean enqueueMessage(@NonNull MessageQueue queue, @NonNull Message msg,
long uptimeMillis) {
// 将消息的接受者设置为本身
msg.target = this;
// 设置消息进入队列的uid
msg.workSourceUid = ThreadLocalWorkSource.getUid();
if (mAsynchronous) {
// 如果是异步的,就设置消息为异步消息
msg.setAsynchronous(true);
}
// 调用MessageQueue的enqueueMessage(Message msg, long when)方法
return queue.enqueueMessage(msg, uptimeMillis);
}
dispatchMessage(Message msg)
dispatchMessage(Message msg)方法的作用是分发消息,源码如下所示:
// Handler.java
public void dispatchMessage(@NonNull Message msg) {
if (msg.callback != null) {
// 如果msg.callback不为空,也就是通过post开头的方法添加消息到消息队列中的,就调用handleCallback(Message message)方法
handleCallback(msg);
} else {
// 如果msg.callback为空,也就是通过sendMessage开头的方法添加消息到消息队列中,就执行以下逻辑
if (mCallback != null) {
// 如果成员变量mCallback不为空,说明Handler设置了Callback,就调用mCallback的handleMessage(Message msg)方法
if (mCallback.handleMessage(msg)) {
// 如果handleMessage(Message msg)方法返回true,就说明该消息已经被拦截,不需要再进一步处理,结束方法
return;
}
}
// 如果handleMessage(Message msg)方法返回false,就说明该消息没被拦截,需要进一步处理,调用handleMessage(Message msg)方法
handleMessage(msg);
}
}
如果是通过post开头的方法添加消息到消息队列中,最后会调用handleCallback(Message message)方法,源码如下所示:
// Handler.java
private static void handleCallback(Message message) {
// 调用消息的Runnable的run()方法
message.callback.run();
}
如果是通过sendMessage开头的方法添加消息到消息队列中,最后会调用Handler的内部接口Callback的handleMessage(Message msg)方法或者handleMessage(Message msg)方法,这两个方法需要开发者自己去实现,源码如下所示:
// Handler.java
public interface Callback {
// 该方法需要开发者自己去实现,如果该方法返回true,就说明该消息已经被拦截;如果该方法返回false,就说明该消息没被拦截,需要进一步处理
boolean handleMessage(@NonNull Message msg);
}
// 该方法需要开发者自己去实现
public void handleMessage(@NonNull Message msg) {
}
处理Message的优先级
处理Message的优先级,从上到下,优先级从低到高,如下所示:
- Handler的handleMessage(Message msg)方法:通过sendMessage开头的方法添加Message(消息)到MessageQueue(消息队列)中。
- Handler的内部接口Callback的handleMessage(Message msg)方法:通过构造方法设置Handler的内部接口Callback。
- Message的成员变量callback(Runnable)的run()方法:通过post开头的方法添加Message到MessageQueue(消息队列)中。
obtainMessage系列方法
obtainMessage系列方法的作用是从消息池中取出消息,源码如下所示:
// Handler.java
// 从消息池中取出消息
@NonNull
public final Message obtainMessage()
{
return Message.obtain(this);
}
// 从消息池中取出消息类别是what的消息
@NonNull
public final Message obtainMessage(int what)
{
return Message.obtain(this, what);
}
// 从消息池中取出消息类别是what,并且消息内容是obj的消息
@NonNull
public final Message obtainMessage(int what, @Nullable Object obj) {
return Message.obtain(this, what, obj);
}
// 从消息池中取出消息类别是what,并且带有参数arg1和参数arg2的消息
@NonNull
public final Message obtainMessage(int what, int arg1, int arg2)
{
return Message.obtain(this, what, arg1, arg2);
}
// 从消息池中取出消息类别是what,并且带有参数arg1和参数arg2,同时消息内容是obj的消息
@NonNull
public final Message obtainMessage(int what, int arg1, int arg2, @Nullable Object obj) {
return Message.obtain(this, what, arg1, arg2, obj);
}
以上方法最后都是调用了Message类的obtain系列方法,它们的作用是从消息池中返回一个新的消息对象,避免重新分配新的消息对象,这些系列的方法前面解释过,这里就不再赘述。
删除消息
删除消息分为两类方法:
- removeCallbacks系列方法
- removeMessages系列方法
源码如下所示:
// Handler.java
// 删除消息队列中的Runnable普通消息
public final void removeCallbacks(@NonNull Runnable r) {
mQueue.removeMessages(this, r, null);
}
// 删除消息队列中带有令牌token的Runnable普通消息
public final void removeCallbacks(@NonNull Runnable r, @Nullable Object token) {
mQueue.removeMessages(this, r, token);
}
// 删除消息队列中消息类别是what的普通消息
public final void removeMessages(int what) {
mQueue.removeMessages(this, what, null);
}
// 删除消息队列中消息类别是what,消息内容是object的普通消息
public final void removeMessages(int what, @Nullable Object object) {
mQueue.removeMessages(this, what, object);
}
// 删除消息队列中所有带有令牌token的普通消息,要注意的是,如果形式参数token传递的是空,就会删除消息队列中所有普通消息
public final void removeCallbacksAndMessages(@Nullable Object token) {
mQueue.removeCallbacksAndMessages(this, token);
}
可以发现removeCallbacks系列方法最后和removeMessages系列方法是调用是MessageQueue的removeMessages(Handler h, int what, Object object)方法,它的作用是删除消息队列中符合条件的普通消息,这个方法前面解释过,这里就不再赘述。
Native层
MessageHandler类的源码如下所示:
// external/webrtc_legacy/webrtc/base/messagehandler.h
class MessageHandler {
public:
// MessageHandler()函数是虚析构函数
virtual ~MessageHandler();
// onMessage(Message* msg)是虚函数
virtual void OnMessage(Message* msg) = 0;
protected:
MessageHandler() {}
private:
RTC_DISALLOW_COPY_AND_ASSIGN(MessageHandler);
};
WeakMessageHandler类继承MessageHandler类,WeakMessageHandler类的作用是持有对MessageHandler的弱引用的简单代理,WeakMessageHandler类的源码如下所示:
// system/core/libutils/include/utils/Looper.h
class WeakMessageHandler : public MessageHandler {
protected:
// WeakMessageHandler()函数是虚析构函数
virtual ~WeakMessageHandler();
public:
WeakMessageHandler(const wp<MessageHandler>& handler);
// handleMessage(const Message& message)函数是虚函数
virtual void handleMessage(const Message& message);
private:
wp<MessageHandler> mHandler;
};
看下handleMessage(const Message& message)函数,源码如下所示:
// system/core/libutils/Looper.cpp
void WeakMessageHandler::handleMessage(const Message& message) {
sp<MessageHandler> handler = mHandler.promote();
if (handler != nullptr) {
// 如果MessageHandler强指针不为空,就调用MessageHandler的handleMessage(const Message& message)函数
handler->handleMessage(message);
}
}
为什么主线程不会因为Looper.loop()方法里的死循环卡死?
在解答这个问题之前,先解释下进程和线程的概念:
进程是代码在数据集合上的一次运行活动,它是系统进行资源分配和调度的基本单位。一个进程至少有一个线程,线程是进程中的实体,线程本身是不会独立存在的,进程中多个线程可以共享进程的资源(例如:内存地址、文件I/O等),也可以独立调度。
每个应用进程都从一个名为Zygote的现有进程分叉(fork)。系统启动并加载通用框架(Framework)代码和资源(例如:Activity主题背景)时,Zygote进程随之启动,为了启动新的应用进程,系统会分叉(fork)Zygote进程,然后在新进程中加载并运行引用代码,这种方法可以让框架(Framework)代码和资源分配的大多数RAM页面在所有应用进程之间共享,大多数情况下,应用程序都运行在一个进程中,我们也可以在AnroidManifest.xml(清单文件)设置android:process属性或者通过native代码分叉(fork)进程来创建多个进程,如果想深入了解,可以看下Android的内存管理这篇文章。
Java语言统一处理了不同硬件和操作系统平台的线程操作,一个线程是一个已经执行start()方法而且还没结束的java.lang.Thread类的实例,其中Thread类的所有关键方法都是本地方法(Native Method)来的,这意味着这些方法没有使用或者无法使用平台相关的手段来实现。
在Linux中,进程和线程除了是否共享资源外,并没有本质的区别,对于CPU来说就是一段可执行的代码,它们都是一个task_struct的结构体,CPU采用完全公平调度器(Completely Fair Scheduler,简称CFS)算法,这个算法可以保证每个任务都尽可能公平地享有CPU时间片。
因为线程是一段可执行的代码,当线程执行完成后,也就是可执行的代码执行完成后,线程的运行状态就进入结束(Terminated)状态,然后线程就会终止,对于主线程来说,系统不希望主线程运行一段时间后,主线程就会终止,那如何让主线程不被终止呢?可以让可执行的代码一直执行下去,死循环就是它的实现方法,它可以保证线程不被终止,Binder线程也是使用死循环这种方法,不断与Binder驱动进行读操作或者写操作,当然并非简单地死循环,这里涉及到Linux的pipe/epoll机制,在主线程中,当MessageQueue(消息队列)没有Message(消息)时,就会阻塞在MessageQueue类的next()方法的nativePollOnce()方法中,这个时候主线程就会释放CPU资源进入休眠状态,直到下一个消息到达或者有事务发生,通过往pipe管道写端写入数据来唤醒主线程工作,epoll是一种当文件描述符的内核缓冲区不是空的时候,发出可读信号进行通知;当写缓冲区还没满的时候,发出可写信号的进行通知的机制,它是一种I/O多路复用机制,可以同时监控多个文件描述符;Looper类的loop()方法也不会导致应用卡死,导致卡死的原因是Activity的生命周期方法操作时间过长,导致掉帧,甚至导致应用无响应(ANR),所以,主线程在大多数时候是空闲状态的,并不会消耗大量CPU资源,也不会导致应用卡死。
总结
开发者通过调用Handler的post开头的方法或者sendMessage开头的方法添加Message(消息)到MessageQueue(消息队列)中,通过调用Looper的loop()方法不断从MessageQueue中取出达到触发条件的Message,然后调用宿主Handler的dispatchMessage(Message msg)方法,根据条件判断,如果是通过post开头的方法添加Message到MessageQueue中,就调用Runnable的run()方法;如果是通过sendMessage开头的方法添加Message到MessageQueue中,先判断是否通过构造方法设置Handler的内部接口Callback,如果设置了,就调用Handler的内部接口Callback的handleMessage(Message msg)方法让开发者处理Message;如果没有设置,就调用handleMessage(Message msg)方法让开发者处理Message,要注意的是,当MessageQueue中没有Message时,就会处于空闲状态,调用MessageQueue的内部接口IdleHandler的queueIdle()方法。
MessageQueue是连接Java层和Native层的纽带,Java层的MessageQueue和Native层的MessageQueue是通过JNI(Java Native Interface)建立关联,Java层可以向MessageQueue添加Message,Native层也可以向MessageQueue添加Message;Message、Looper和Handler这三个类在Java层和Native层之间没有任何关联,它们都在各自层实现相似的逻辑,彼此是独立的;Native层的NativeMessageQueue类继承Native层的MessageQueue类,Native层的WeakMessageHandler类继承MessageHandler类。
要注意的是,消息的处理步骤如下所示:
- 处理Native层的Message
- 处理Native层的Request
- 处理Java层的Message
题外话
在Android消息机制中,可以看到使用ThreadLocal存储Looper对象,这里我想讲解下线程局部存储(TLS)。
线程局部存储(TLS)
线程局部存储(TLS)是一种存储持续期(Storage Duration),对象的存储在线程开始时分配,线程结束时回收,每个线程都有该对象自己的实例。
在C中,使用关键字_Thread_local来定义线程局部变量,在头文件<thread.h>定义thread_local为该关键字的同义,代码如下所示:
#include <threads.h>
thread_local int foo = 0;
在C++中,使用关键字thread_local来定义线程局部变量。
在Java中,使用ThreadLocal类来提供线程局部变量,普通变量可以被任何一个线程读或者写,而ThreadLocal创建的变量只能被当前线程读或者写,其他线程无法读或者写该变量;ThreadLocal类的内部类ThreadLocalMap采用开放地址法解决哈希冲突,如果想深入了解,可以看下Java集合框架——Android中的HashMap源码分析这篇文章的题外话。
我的GitHub:TanJiaJunBeyond
Android通用框架:Android通用框架
我的掘金:谭嘉俊
我的简书:谭嘉俊
我的CSDN:谭嘉俊