前言
下面的内容基于由浅入深全面分析Handler机制原理之源码的理解,扩展的Handler机制的难点分析。
目录
- 内存共享(如何切换线程的)
- prepare()函数中,使用ThreadLocal存放Looper对象,ThreadLocal的作用。
- Message使用对象复用池(享元设计模式)
- Handler的阻塞/唤醒机制
- Handler的同步屏障
来,上一张大纲图,醒一下神
Handler如何切换线程
当我们都了解了Handler的原理,再结合上述的使用例子,我们也对Handler是如何切换的,其实情况也很明朗了。
- 创建Message对象,我们都是知道创建一个对象,对象一般都是存放在
堆内存
,在JVM中堆是在线程共享区域,并不是存放在线程私有数据区,所以说所有线程都可以访问得到这个Message对象。 - 在子线程中,Handler发送一条Message,Message会插入到MessageQueue中。有人会问MessageQueue是属于那个线程,其实MessageQueue并不是属于那个线程,只仅仅是存放Message的容器而已,就相当于List一样,只不过MessageQueue比较特殊,是一个队列的形式而已。
- 当Looper.loop()函数,不断的从MessageQueue取出Message,当获取到Message后,通过Handler.dispatchMessage(msg)函数去分发消费掉这条Message而已。
Handler的线程如何切换是不是很简单,当然这都是你对Handler的原理熟悉掌握,并不难理解。下面通过一张图加深到Handler线程切换。
用ThreadLocal保存Looper对象,目的是什么?
ThreadLocal是线程本地变量,它是一种特殊的变量,ThreadLocal为每个线程提供一个变量的副本,使得每一个线程在同一时间内访问到的不同对象,这样就隔离了线程之间对数据的一个共享访问,那么在内部实现上,ThreadLocal内部都有一个ThreadLocalMap,用来保存每个线程所拥有的变量的副本。
子线程中创建Handler体现ThreadLocal的使用
错误例子:直接在子线程创建一个Handler。
new Thread(new Runnable() {
@Override
public void run() {
new Handler(){
@Override
public void handleMessage(Message msg) {
super.handleMessage(msg);
}
};
}
}).start();
直接抛出异常:java.lang.RuntimeException: Can't create handler inside thread that has not called Looper.prepare()
直接看源码:
public Handler(Callback callback, boolean async) {
//...省略部分代码
mLooper = Looper.myLooper();
if (mLooper == null) {
throw new RuntimeException(
"Can't create handler inside thread that has not called Looper.prepare()");
}
//...省略部分代码
}
//Looper类
private static void prepare(boolean quitAllowed) {
if (sThreadLocal.get() != null) {
throw new RuntimeException("Only one Looper may be created per thread");
}
sThreadLocal.set(new Looper(quitAllowed));
}
public static @Nullable Looper myLooper() {
return sThreadLocal.get();
}
使用ThreadLocal 保存当前Looper对象,ThreadLocal 类可以对数据进行线程隔离,保证了在当前线程只能获取当前线程的Looper对象,同时prepare()函数保证当前线程有且只有一个Looper对象。所以在子线程中创建Handler对象,需要添加Looper.prepare(),如果只单单添加Looper.prepare()函数还是不行,缺少动力,也需要添加Looper.loop()函数。
正确代码如下:
new Thread(new Runnable() {
@Override
public void run() {
Looper.prepare();//注意添加
new Handler(){
@Override
public void handleMessage(Message msg) {
super.handleMessage(msg);
}
};
Looper.loop();//注意添加
}
}).start();
为什么可以直接new Message,还提供obtain()函数,为什么这样设计?
如果不断的new出Meaage对象并插入到MessageQueue中,在jvm的堆中是不是会不断有新Message对象创建以及销毁,导致内存抖动,而GC线程虽然作为优先级最低的线程,此时因为必须GC,导致GC线程抢占CPU时间片,主线程拿不到CPU而卡顿调帧。Google在设计消息机制的时候就想到了消息复用机制,几乎所有Framework中发送消息都是通过Message.obtain()函数来进行消息复用。这种消息复用机制其实就是一种享元设计模式。享元设计模式就不展开了,感兴趣的可以自行查阅资料
下面我们看看Message是如何复用的,首先看看Mesage里的几个成员变量:
Message next;//形成一个链表,指向下一个Message
private static final Object sPoolSync = new Object();//对象锁
private static Message sPool;//头节点的消息
private static int sPoolSize = 0;//当前链表的个数
private static final int MAX_POOL_SIZE = 50;//链表最多存放的个数
Looper的loop()函数中不断的从消息队列中取消息diapatch,分发之后会调用Message的回收操作。
public static void loop() {
//获取Looper对应的消息队列MessageQueue
final MessageQueue queue = me.mQueue;
//...省略部分代码
for (;;) {//不断循环从消息队列中取出消息
Message msg = queue.next(); //有可能阻塞
if (msg == null) {//没有消息,则退出消息队列
return;
}
//...省略部分代码
//msg.target就是Handler,把获取到的消息分发出去
msg.target.dispatchMessage(msg);
//...省略部分代码
msg.recycleUnchecked();//回收Message
}
}
当Message分发完之后,就调用recycleUnchecked()函数对Message进行回收。
void recycleUnchecked() {
flags = FLAG_IN_USE; // 标记改Message将加入消息池
// 重置所有消息属性
what = 0;
arg1 = 0;
arg2 = 0;
obj = null;
replyTo = null;
sendingUid = -1;
when = 0;
target = null;
callback = null;
data = null;
synchronized (sPoolSync) { // 线程安全锁
if (sPoolSize < MAX_POOL_SIZE) { // MAX_POOL_SIZE = 50 ,表明消息池最多50个
//头节点设置给Next 将当前对象最为最新的头节点sPool
next = sPool;
sPool = this;
sPoolSize++;
}
}
}
所以获取一个对象池中的Message可以直接调用Message.obtain()函数即可。
public static Message obtain() {
synchronized (sPoolSync) {
//sPool就是Looper.loop(),调用dispatchMessage函数后,调用recycleUnchecked()函数回收的Message
if (sPool != null) {
Message m = sPool; //取出头节点
sPool = m.next; // 将头节点的下一个作为最新的头节点
m.next = null; // 设置需要返回的消息的next为空
m.flags = 0; // 清除是否还在链表中
sPoolSize--;
return m;
}
}
//如果对象池中没有消息,就创建一个消息
return new Message();
}
Handler的阻塞/唤醒机制
首先我们要了解Handler的阻塞或唤醒是在那里发起的,经过上面的源码分析及Handler的整个调用过程,我们都知道先是创建一个Looper对象,然后创建一个Handler对象,最后调用Looper.loop()函数,在loop()函数中不断从MessageQueue.next()函数中的轮询获取Message。
public static void loop() {
//...省略部分代码
for (;;) {//不断循环从消息队列中取出消息
Message msg = queue.next(); //有可能阻塞
//...省略部分代码
try {
//msg.target就是Handler,把获取到的消息分发出去
msg.target.dispatchMessage(msg);
} finally {
}
//...省略部分代码
}
}
Message next() {
//...省略部分代码
int nextPollTimeoutMillis = 0;
for (;;) {
nativePollOnce(ptr, nextPollTimeoutMillis);//根据nextPollTimeoutMillis是阻塞还唤醒
synchronized (this) {
Message msg = mMessages;
if (msg != null && msg.target == null) {
//...省略部分代码
}
if (msg != null) {
if (now < msg.when) {
// 当头消息延迟时间大于当前时间,阻塞消息要到延迟时间和当前时间的差值
nextPollTimeoutMillis = (int) Math.min(msg.when - now, Integer.MAX_VALUE);
} else {}
//...省略部分代码
} else {
nextPollTimeoutMillis = -1;//队列已无消息,一直阻塞
}
}
//...省略部分代码
}
}
Looper轮询器调用loop()函数开始轮询,真正干活的是MessageQueue.next()函数,而next()函数中获取消息前首先调用了nativePollOnce(ptr, nextPollTimeoutMillis)函数,其意思是根据nextPollTimeoutMillis判断是否进行阻塞,nextPollTimeoutMillis初始化时默认为0表表示不阻塞。
nextPollTimeoutMillis有三种对应的状态:
- nextPollTimeoutMillis=0 ,不阻塞
- nextPollTimeoutMillis<0 ,一直阻塞
- nextPollTimeoutMillis>0 ,阻塞对应时长,可被新消息唤醒
MessageQueue轮询获取Message时有两种阻塞情况:
- 当轮询MessageQueue时获取获取不到Message,nextPollTimeoutMillis赋值为-1进行阻塞。
- 当轮询MessageQueue时获取获取到Message,msg.when大于当前时间,时间差值就是阻塞的时长。
Handler的阻塞
在MessageQueue类中,有几个Native层的函数
private native static long nativeInit();
private native static void nativeDestroy(long ptr);
private native void nativePollOnce(long ptr, int timeoutMillis); /*non-static for callbacks*/
private native static void nativeWake(long ptr);
private native static boolean nativeIsPolling(long ptr);
private native static void nativeSetFileDescriptorEvents(long ptr, int fd, int events);
在MessageQueue的构造函数中,就调用nativeInit()函数进行初始化。
MessageQueue(boolean quitAllowed) {
mQuitAllowed = quitAllowed;
mPtr = nativeInit();
}
也就是说在创建MessageQueue对象时,通过JNI调用native层的android_os_MessageQueue_nativeInit()函数进行初始化。进入源码看看里面做了那些事情。
//android_os_MessageQueue.cpp
static jlong android_os_MessageQueue_nativeInit(JNIEnv* env, jclass clazz) {
// 创建一个与java对应的MessageQueue
NativeMessageQueue* nativeMessageQueue = new NativeMessageQueue();
if (!nativeMessageQueue) {
jniThrowRuntimeException(env, "Unable to allocate native queue");
return 0;
}
nativeMessageQueue->incStrong(env);
return reinterpret_cast<jlong>(nativeMessageQueue);//返回到Java层
}
NativeMessageQueue::NativeMessageQueue() :
mPollEnv(NULL), mPollObj(NULL), mExceptionObj(NULL) {
mLooper = Looper::getForThread();//获取该线程关联的Looper
if (mLooper == NULL) {
mLooper = new Looper(false);// 创建一个Looper对象
Looper::setForThread(mLooper);// 将Looper保存到线程里,也相当于Java中用ThreadLocal保存Looper一样
}
}
//Looper.cpp
Looper::Looper(bool allowNonCallbacks) :
mAllowNonCallbacks(allowNonCallbacks), mSendingMessage(false),
mPolling(false), mEpollFd(-1), mEpollRebuildRequired(false),
mNextRequestSeq(0), mResponseIndex(0), mNextMessageUptime(LLONG_MAX) {
mWakeEventFd = eventfd(0, EFD_NONBLOCK);// 创建一个唤醒事件fd
AutoMutex _l(mLock);
rebuildEpollLocked();// 重构epoll事件
}
#include<sys/eventfd.h>
int eventfd(unsigned int initval,int flags);
void Looper::rebuildEpollLocked() {
if (mEpollFd >= 0) {
close(mEpollFd);//关闭epoll
}
// 创建一个epoll实例
mEpollFd = epoll_create(EPOLL_SIZE_HINT);
struct epoll_event eventItem;
//重新设置
memset(& eventItem, 0, sizeof(epoll_event));
eventItem.events = EPOLLIN;// 监听可读事件
eventItem.data.fd = mWakeEventFd;//唤醒事件fd
// 注册唤醒事件mWakeEventFd到epoll实例中
int result = epoll_ctl(mEpollFd, EPOLL_CTL_ADD, mWakeEventFd, & eventItem);
// 将请求中的事件注册到epoll实例中
for (size_t i = 0; i < mRequests.size(); i++) {
const Request& request = mRequests.valueAt(i);
struct epoll_event eventItem;
// 初始化请求事件
request.initEventItem(&eventItem);
// 注册请求中的事件到epoll实例中
int epollResult = epoll_ctl(mEpollFd, EPOLL_CTL_ADD, request.fd, & eventItem);
if (epollResult < 0) {
ALOGE("Error adding epoll events for fd %d while rebuilding epoll set, errno=%d",
request.fd, errno);
}
}
}
在调用android_os_MessageQueue_nativeInit()函数做了些什么事情:
- 创建一个NativeMessageQueue对象与Jave层相对应的MessageQueue对象。
- 获取一个MessageQueue相对应Looper对象,没有获取则创建一个Looper 对象。
- 如果创建Looper对象,则保存Looper对应的线程。
了解了nativeInit()层初始化时后,其实也不难明白,说穿了就是构建一个Java层一样的处理方式而已。
值得注意的是 eventfd(0, EFD_NONBLOCK)
这句代码,它是什么意思呢。eventfd函数会创建一个事件描述符对象,通过IO多路复用机制epoll可以监听事件描述符,实现进程(线程)间的等待(wait)/通知(notify),对就这么简单。
eventfd()函数的标志参数有两种:
- EFD_NONBLOCK:设置对象为非阻塞状态。
- EFD_CLOEXEC:调用exec后会自动关闭文件描述符,防止泄漏。
值得注意的是 rebuildEpollLocked()函数
,在函数里,创建了epoll实例,向epoll中注册唤醒监听事件和请求监听事件。其实通过Linux系统的epoll机制,来实现线程间的等待与唤醒操作。好可以简单理解为epoll就是监听内容的读写变化,来实现阻塞或唤醒操作。
nativePollOnce()函数
MassgeQueue#nativePollOnce()函数是Native层调用的,那么我们进行Native层,通过源码看看他的调用流程。
static void android_os_MessageQueue_nativePollOnce(JNIEnv* env, jobject obj,
jlong ptr, jint timeoutMillis) {
//获取NativeMessageQueue
NativeMessageQueue* nativeMessageQueue = reinterpret_cast<NativeMessageQueue*>(ptr);
nativeMessageQueue->pollOnce(env, obj, timeoutMillis);// 调用pollOnce()函数
}
void NativeMessageQueue::pollOnce(JNIEnv* env, jobject pollObj, int timeoutMillis) {
mPollEnv = env;
mPollObj = pollObj;
mLooper->pollOnce(timeoutMillis);// 调用Looper#pollOnce()函数
mPollObj = NULL;
mPollEnv = NULL;
if (mExceptionObj) {//异步处理
env->Throw(mExceptionObj);
env->DeleteLocalRef(mExceptionObj);
mExceptionObj = NULL;
}
}
//Looper.cpp
inline int pollOnce(int timeoutMillis) {
return pollOnce(timeoutMillis, NULL, NULL, NULL);
}
int Looper::pollOnce(int timeoutMillis, int* outFd, int* outEvents, void** outData) {
int result = 0;
for (;;) {
// 一个循环不断处理响应列表中的事件。
while (mResponseIndex < mResponses.size()) {
const Response& response = mResponses.itemAt(mResponseIndex++);
int ident = response.request.ident;
if (ident >= 0) {
int fd = response.request.fd;
int events = response.events;
void* data = response.request.data;
if (outFd != NULL) *outFd = fd;
if (outEvents != NULL) *outEvents = events;
if (outData != NULL) *outData = data;
return ident;
}
}
if (result != 0) {
if (outFd != NULL) *outFd = 0;
if (outEvents != NULL) *outEvents = 0;
if (outData != NULL) *outData = NULL;
return result;
}
result = pollInner(timeoutMillis);// 内部轮询
}
}
//循环处理内部事件
int Looper::pollInner(int timeoutMillis) {
if (timeoutMillis != 0 && mNextMessageUptime != LLONG_MAX) {
//记录当前时间
nsecs_t now = systemTime(SYSTEM_TIME_MONOTONIC);
int messageTimeoutMillis = toMillisecondTimeoutDelay(now, mNextMessageUptime);
if (messageTimeoutMillis >= 0
&& (timeoutMillis < 0 || messageTimeoutMillis < timeoutMillis)) {
timeoutMillis = messageTimeoutMillis;
}
}
// 事件处理类型为POLL_WAKE 唤醒
int result = POLL_WAKE;
mResponses.clear();
mResponseIndex = 0;
mPolling = true;//正在轮询
struct epoll_event eventItems[EPOLL_MAX_EVENTS];
//等待事件
int eventCount = epoll_wait(mEpollFd, eventItems, EPOLL_MAX_EVENTS, timeoutMillis);
//不要轮询啦
mPolling = false;
//获取锁
mLock.lock();
//如果需要,重新生成epoll集。
if (mEpollRebuildRequired) {
mEpollRebuildRequired = false;
rebuildEpollLocked();
goto Done;
}
//POLL_ERROR Poll错误
if (eventCount < 0) {
if (errno == EINTR) {
goto Done;
}
ALOGW("Poll failed with an unexpected error, errno=%d", errno);
result = POLL_ERROR;
goto Done;
}
//POLL_TIMEOUT Poll超时
if (eventCount == 0) {
result = POLL_TIMEOUT;
goto Done;
}
//循环处理所有事件
for (int i = 0; i < eventCount; i++) {
int fd = eventItems[i].data.fd;//获取文件描述符
uint32_t epollEvents = eventItems[i].events;//获取事件
if (fd == mWakeEventFd) {//如果是唤醒事件
if (epollEvents & EPOLLIN) {
awoken();//唤醒
} else {
ALOGW("Ignoring unexpected epoll events 0x%x on wake event fd.", epollEvents);
}
} else {//处理请求队列中的事件
ssize_t requestIndex = mRequests.indexOfKey(fd);
if (requestIndex >= 0) {
int events = 0;
if (epollEvents & EPOLLIN) events |= EVENT_INPUT;
if (epollEvents & EPOLLOUT) events |= EVENT_OUTPUT;
if (epollEvents & EPOLLERR) events |= EVENT_ERROR;
if (epollEvents & EPOLLHUP) events |= EVENT_HANGUP;
//请求事件添加到Response数组中
pushResponse(events, mRequests.valueAt(requestIndex));
} else {
ALOGW("Ignoring unexpected epoll events 0x%x on fd %d that is "
"no longer registered.", epollEvents, fd);
}
}
}
Done: ;
//处理MessageEnvelopes的事件
mNextMessageUptime = LLONG_MAX;
while (mMessageEnvelopes.size() != 0) {
nsecs_t now = systemTime(SYSTEM_TIME_MONOTONIC);
const MessageEnvelope& messageEnvelope = mMessageEnvelopes.itemAt(0);
// 如果消息的处理时间小于当前时间,则将从列表中移除
if (messageEnvelope.uptime <= now) {
{ // obtain handler
sp<MessageHandler> handler = messageEnvelope.handler;//处理消息的handler
Message message = messageEnvelope.message;//获取消息
mMessageEnvelopes.removeAt(0);
mSendingMessage = true;
mLock.unlock();//释放锁
handler->handleMessage(message);//处理消息
} // release handler
mLock.lock();//加上锁
mSendingMessage = false;
result = POLL_CALLBACK;//事件处理类型为POLL_CALLBACK
} else {
mNextMessageUptime = messageEnvelope.uptime;//更新下一个消息的处理时间
break;
}
}
//释放锁
mLock.unlock();
//处理所有Responses事件
for (size_t i = 0; i < mResponses.size(); i++) {
Response& response = mResponses.editItemAt(i);
// 如果响应类型为POLL_CALLBACK
if (response.request.ident == POLL_CALLBACK) {
int fd = response.request.fd;
int events = response.events;
void* data = response.request.data;
//调用callback类型的handleEvent方法
int callbackResult = response.request.callback->handleEvent(fd, events, data);
if (callbackResult == 0) {
removeFd(fd, response.request.seq);
}
response.request.callback.clear();
result = POLL_CALLBACK;//事件处理类型为POLL_CALLBACK
}
}
return result;
}
void Looper::awoken() {
uint64_t counter;
TEMP_FAILURE_RETRY(read(mWakeEventFd, &counter, sizeof(uint64_t)));
}
从java层调用Native层的nativePollOnce()函数,通过分析整理得出调用的主路线:nativePollOnce() -> NativeMessageQueue.pollOnce() -> Looper.pollOnce() -> Looper.pollInner()
注意Java传入的mPtr变量,mPtr变量保存了一个NativeMessageQueue对象,从而使得MessageQueue成为Java层和Native层的连接桥梁,使得Java层与native层就可以相互处理消息了。
从源码的过程得知,消息的处理都放在Looper.pollInner()函数中处理了,我们重点分析pollInner()函数。
1、首先是参数timeoutMillis(从Java传入的nextPollTimeoutMillis)做相应的处理,而timeoutMillis有三状态:
- 等于0,马上返回,也就是没有阻塞。
- 小于0,一直阻塞,直接向消息队列中添加消息,才处理事件。
- 大于0,时间差值是多少,那么就等待多久,时间到,才处理事件。
2、调用epoll_wait(mEpollFd, eventItems, EPOLL_MAX_EVENTS, timeoutMillis)获取到等待事件eventCount 。
eventCount 也有三种状态:
- 等于0,返回result =POLL_TIMEOUT,表示在超时之前,要准备的数据没有准备好,即为等待超时。
- 小于0,如果errno == EINTR,返回result =POLL_WAKE,否则返回result = POLL_ERROR,发生了错误。
- 大于0,所有数据都准备好了。如果返回result =POLL_WAKE,通过wake()方法唤醒的。如果返回result = POLL_CALLBACK,在一个或多个文件描述符被触发了。
从eventCount 这三种情况,pollInner()函数返回的值就是pollOnce()函数的返回值,就是POLL_WAKE、POLL_TIMEOUT、POLL_ERROR、POLL_CALLBACK四种状态。
3、获取到等待事件,首先处理eventCount 的事件,也就是处理传入epoll_wait()函数的等待事件。如果fd == mWakeEventFd,并且 epollEvents & EPOLLIN,说明有消息队列中有新的消息要处理,调用awoken()函数,它只把管道中内容读取出来,清空管道,方便下一次调用epoll_wait()函数时,再次阻塞等待。
4、接着处理MessageEnvelopes事件,如果消息的处理时间小于当前时间,则将从消息列表中移除此消息,并且消费掉这条消息,最后更新下一个消息的处理时间。
5、最后处理Responses事件,如果响应类型为POLL_CALLBACK,然后处理这个事件,如果callbackResult等于0,则移除文件描述符,最后也callback.clear()掉。
Handler的唤醒
有Handler阻塞,那么肯定有Handler唤醒,对吧,什么时候唤醒呢,由于一开始时,MessageQueue是没有Message的,从而进入了阻塞等待,如果向MessageQueue插入一条消息呢,是不是会唤醒啊。其实消息从消息队列中全部移除quit()时,可能需要调用nativeWake方法。那么我们从MessageQueue.enqueueMessage()函数入手吧。
//MessageQueue.java
boolean enqueueMessage(Message msg, long when) {
//...省略部分代码
if (needWake) {//调用Native的唤醒机制
nativeWake(mPtr);
}
//...省略部分代码
return true;
}
唤醒是有两种情况:
- 当消息队列中没有消息时,Handler发送一条新消息到消息队列中,那么就会调用Native层的nativeWake(mPtr)函数。
- 当消息队列的消息的时间,与当前时间的比值,就是需要等待的时间,这个会传入Native层,时间到则自动唤醒。
nativeWake()函数
我们从调用Native层的nativeWake(mPtr)函数,通过源码去分析它们的调用过程吧。
//android_os_MessageQueue.cpp
static void android_os_MessageQueue_nativeWake(JNIEnv* env, jclass clazz, jlong ptr) {
//获取到当前的消息队列
NativeMessageQueue* nativeMessageQueue = reinterpret_cast<NativeMessageQueue*>(ptr);
nativeMessageQueue->wake();//调用NatvieMessageQueue的wake()函数
}
void NativeMessageQueue::wake() {
mLooper->wake();//调用Looper的wake()函数
}
//Looper.cpp
void Looper::wake() {
uint64_t inc = 1;
//向mWakeEventFd(唤醒文件描述符)写入字符
ssize_t nWrite = TEMP_FAILURE_RETRY(write(mWakeEventFd, &inc, sizeof(uint64_t)));
if (nWrite != sizeof(uint64_t)) {
if (errno != EAGAIN) {
ALOGW("Could not write wake signal, errno=%d", errno);
}
}
}
看到了Nativen层的唤醒调用过程是不是很简单。最后直接调用了TEMP_FAILURE_RETRY函数,其他就是向管道中写入数据,管道监听到有数据写入就是唤醒Android应用程序主线程处理事件。关于Linux系统的epoll机制,管道,涉及到了Binder相关知识,就不展开讨论了。
Handler的同步屏障
什么是同步屏障
同步屏障:阻碍同步消息,优先执行异步消息。
为什么要设计同步屏障
从上述代码的Handler构造函数可知,一般情况下,默认调用Handler(callback, false)构造函数,也是就是mAsynchronous = false,同时在Handler.enqueueMessage函数中msg.target已经赋值了当前的Handler对象,在MessageQueue.enqueueMessage中按执行时间顺序把Message插入到Message链表合适的位置。在调用MessageQueue.next函数获取Message时添加了synchronied锁,所以取消息的时候是互斥取消息,只能从头部取消息,也因为加消息是按照消息的执行的先后顺序进行。如果要优先立即执行某条Message时,按正常情况是要排队的,是没法做到立即执行,所以就引入了同步屏障。
如何开启同步屏障
我们通过MessageQueue.postSyncBarrier() 函数,是如何引入同步屏障的。源码如下:
public int postSyncBarrier() {
return postSyncBarrier(SystemClock.uptimeMillis());
}
private int postSyncBarrier(long when) {
synchronized (this) {
final int token = mNextBarrierToken++;
final Message msg = Message.obtain();//从消息对象池中获取一条消息
msg.markInUse();
//将消息信息初始化赋值,注意这里并没有给target赋值,这是关键
msg.when = when;
msg.arg1 = token;
Message prev = null;
Message p = mMessages;
if (when != 0) {
//如果开启同步屏障的时间(假设记为T)T不为0,且当前的同步消息里有时间小于T,则prev也不为null
while (p != null && p.when <= when) {
prev = p;
p = p.next;
}
}
//将 msg 按照时间顺序插入到 消息队列(链表)的合适位置
if (prev != null) { // invariant: p == prev.next
msg.next = p;
prev.next = msg;
} else {
msg.next = p;
mMessages = msg;
}
return token;//返回一个序号,通过这个序号可以撤销屏障
}
}
从上述开启同步屏障的源码中可以看出,Message 对象初始化时并没有给 target 赋值,也就是说向MessageQueue中插入一条target 为null标记的Message,相对于正常的enqueue操作,在Handler.enqueueMessage函数中Handler与msg.target绑定了 。同步屏障的Message特殊在于 target 为 null,并不会被消费,因为不会唤醒消息队列。
在那里消费掉同步屏障的Message,上述代码:Looper.loop()函数中调用了MessageQueue.next()函数,那么我们再看一次next函数源码:
Message next() {
//...省略部分代码
for (;;) {
//...省略部分代码
nativePollOnce(ptr, nextPollTimeoutMillis);//根据nextPollTimeoutMillis是阻塞还唤醒
synchronized (this) {
final long now = SystemClock.uptimeMillis();
Message prevMsg = null;
Message msg = mMessages;
//target为null 说明这是一条同步屏障消息
if (msg != null && msg.target == null) {
// Stalled by a barrier. Find the next asynchronous message in the queue.
do {
prevMsg = msg;
msg = msg.next;
} while (msg != null && !msg.isAsynchronous());//如果是异步,则获取并消费这条消息
}
}
//...省略部分代码
}
}
从next函数中可以看出,MessageQueue中的msg.target为null说明开启了同步屏障,同时是异步,那么Message则会优先处理,这就是同步屏障的作用(过滤和优先作用)。
我们来一形象图,加深印象:
发送异步Message
发送同步和异步Message都是在Handler的几个构造函数,可以传入async标志为true,这样构造的Handler发送的消息就是异步消息。
public Handler(boolean async) {
this(null, async);
}
public Handler(Callback callback, boolean async) {
//...省略代码
}
public Handler(Looper looper, Callback callback, boolean async) {
//...省略代码
}
//最终调用enqueueMessage函数,把消息插入到消息队列中
private boolean enqueueMessage(MessageQueue queue, Message msg, long uptimeMillis) {
//this就是Handler Message中持有一个Handler
//为发送消息出队列交给handler处理埋下伏笔。
msg.target = this;
if (mAsynchronous) {//是否是异步信息
msg.setAsynchronous(true);
}
return queue.enqueueMessage(msg, uptimeMillis);//调用消息队列的入队函数
}
当然我们也可以在创建Message时,调用Message.setAsynchronous(true)将消息设为异步。
发送异步消息和发送同步消息一样,唯一区别就在于Asynchronous的设置即可。
移除同步屏障
有启动同步屏障,那么就有移除同步屏障,我们看MessageQueue.removeSyncBarrier()函数源码是怎么移除同步屏障的:
public void removeSyncBarrier(int token) {
synchronized (this) {
Message prev = null;
Message p = mMessages;
//找到token对应的屏障
while (p != null && (p.target != null || p.arg1 != token)) {
prev = p;
p = p.next;
}
final boolean needWake;
//从消息链表中移除
if (prev != null) {
prev.next = p.next;
needWake = false;
} else {
mMessages = p.next;
needWake = mMessages == null || mMessages.target != null;
}
//回收Message到对象池中。
p.recycleUnchecked();
if (needWake && !mQuitting) {
nativeWake(mPtr);//唤醒消息队列
}
}
在启动同步屏障时,已经记录了token,然后通过token对应的屏障,从消息链表中移除,回收Message到对象池中。
同步消息屏障的应用场景
同步屏障在系统源码中有哪些使用场景呢?我们日常开发中也很少用到同步消息屏障,涉及到同步消息屏障一般都是Android系统中使用得最多了,比如我们的UI更新相关的消息,就是利用同步屏障发送异步消息,则会优先处理,从而达到马上刷新UI。既然知道了Android系统中刷新UI使用了异步消息,那么我们看看View的更新,draw()、requestLayout()、invalidate() 等函数都调用了。
我们从View的绘制流程起始,View通过ViewRootImpl来绘制,ViewRootImpl调用到requestLayout()来完成View的绘制操作:知道这个流程,我们看ViewRootImpl.requestLayout()函数源码:
@Override
public void requestLayout() {
if (!mHandlingLayoutInLayoutRequest) {
checkThread();//检查是否在主线程
mLayoutRequested = true;//mLayoutRequested 是否measure和layout布局。
//重要函数
scheduleTraversals();
}
}
void scheduleTraversals() {
if (!mTraversalScheduled) {
mTraversalScheduled = true;
//设置同步障碍,确保mTraversalRunnable优先被执行
mTraversalBarrier = mHandler.getLooper().getQueue().postSyncBarrier();
//内部通过Handler发送了一个异步消息
mChoreographer.postCallback(
Choreographer.CALLBACK_TRAVERSAL, mTraversalRunnable, null);
if (!mUnbufferedInputDispatch) {
scheduleConsumeBatchedInput();
}
notifyRendererOfFramePending();
pokeDrawLockIfNeeded();
}
}
//移除同步屏障
void unscheduleTraversals() {
if (mTraversalScheduled) {
mTraversalScheduled = false;
mHandler.getLooper().getQueue().removeSyncBarrier(mTraversalBarrier);
mChoreographer.removeCallbacks(
Choreographer.CALLBACK_TRAVERSAL, mTraversalRunnable, null);
}
}
调用 Handler.getLooper().getQueue().postSyncBarrier() 并设置同步屏障消息。
最终调用Choreographer.postCallbackDelayedInternal()函数,在其函数中设置Message.setAsynchronous(true) 时 ,也就发送异步消息。
private void postCallbackDelayedInternal(int callbackType,
Object action, Object token, long delayMillis) {
synchronized (mLock) {
final long now = SystemClock.uptimeMillis();
final long dueTime = now + delayMillis;
mCallbackQueues[callbackType].addCallbackLocked(dueTime, action, token);
if (dueTime <= now) {
scheduleFrameLocked(now);
} else {
Message msg = mHandler.obtainMessage(MSG_DO_SCHEDULE_CALLBACK, action);//获取消息对象
msg.arg1 = callbackType;
msg.setAsynchronous(true);//设置为异步
mHandler.sendMessageAtTime(msg, dueTime);//发送异步消息
}
}
}
调用Handler.getLooper().getQueue().removeSyncBarrier()函数移除同步屏障。最终调用Choreographer.removeCallbacksInternal()函数移除消息。
private void removeCallbacksInternal(int callbackType, Object action, Object token) {
synchronized (mLock) {
mCallbackQueues[callbackType].removeCallbacksLocked(action, token);
if (action != null && token == null) {
//移除消息
mHandler.removeMessages(MSG_DO_SCHEDULE_CALLBACK, action);
}
}
}
总结
- 线程切换,在子线程发送Message,Message对象是存放在堆内存,获取到Message时,主线程dispatchMessage分发处理这条消息。
- ThreadLocal的作用就是线程隔离,每个线程都提供一个变量的副本,使得每一个线程在同一时间内访问到的不同对象。
- 当MessageQueue中没有Message时,触发Native层进入阻塞状态;或者MessageQueue中的Message更新时间没有到时,也进入了阻塞状态,阻塞时长就是它的更新时间。
- MessageQueue中没有Message时,向MessageQueue中添加Message时,触发Native层的唤醒机制;MessageQueue中的Message更新时间到时,也触发唤醒。
- Handler的同步屏障,就是优化处理系统的Message,并且Message是异步的。比如:Android应用程序出现ANR,系统发起同步屏障,发送一条异步消息,界面上优先处理这条消息了,然后系统弹出一个ANR消息对话框。