由浅入深全面分析Handler机制原理之源码<难点>

前言

下面的内容基于由浅入深全面分析Handler机制原理之源码的理解,扩展的Handler机制的难点分析。

目录

  • 内存共享(如何切换线程的)
  • prepare()函数中,使用ThreadLocal存放Looper对象,ThreadLocal的作用。
  • Message使用对象复用池(享元设计模式)
  • Handler的阻塞/唤醒机制
  • Handler的同步屏障

来,上一张大纲图,醒一下神

image

Handler如何切换线程

当我们都了解了Handler的原理,再结合上述的使用例子,我们也对Handler是如何切换的,其实情况也很明朗了。

  1. 创建Message对象,我们都是知道创建一个对象,对象一般都是存放在堆内存,在JVM中堆是在线程共享区域,并不是存放在线程私有数据区,所以说所有线程都可以访问得到这个Message对象。
  2. 在子线程中,Handler发送一条Message,Message会插入到MessageQueue中。有人会问MessageQueue是属于那个线程,其实MessageQueue并不是属于那个线程,只仅仅是存放Message的容器而已,就相当于List一样,只不过MessageQueue比较特殊,是一个队列的形式而已。
  3. 当Looper.loop()函数,不断的从MessageQueue取出Message,当获取到Message后,通过Handler.dispatchMessage(msg)函数去分发消费掉这条Message而已。

Handler的线程如何切换是不是很简单,当然这都是你对Handler的原理熟悉掌握,并不难理解。下面通过一张图加深到Handler线程切换。

image

用ThreadLocal保存Looper对象,目的是什么?

ThreadLocal是线程本地变量,它是一种特殊的变量,ThreadLocal为每个线程提供一个变量的副本,使得每一个线程在同一时间内访问到的不同对象,这样就隔离了线程之间对数据的一个共享访问,那么在内部实现上,ThreadLocal内部都有一个ThreadLocalMap,用来保存每个线程所拥有的变量的副本。

子线程中创建Handler体现ThreadLocal的使用

错误例子:直接在子线程创建一个Handler。

new Thread(new Runnable() {
    @Override
    public void run() {
        new Handler(){
            @Override
            public void handleMessage(Message msg) {
                super.handleMessage(msg);
            }
        };
    }
}).start();

直接抛出异常:java.lang.RuntimeException: Can't create handler inside thread that has not called Looper.prepare()

直接看源码:

public Handler(Callback callback, boolean async) {
   //...省略部分代码
    mLooper = Looper.myLooper();
    if (mLooper == null) {
        throw new RuntimeException(
            "Can't create handler inside thread that has not called Looper.prepare()");
    }
    //...省略部分代码
}
//Looper类
private static void prepare(boolean quitAllowed) {
    if (sThreadLocal.get() != null) {
        throw new RuntimeException("Only one Looper may be created per thread");
    }
    sThreadLocal.set(new Looper(quitAllowed));
}
public static @Nullable Looper myLooper() {
    return sThreadLocal.get();
}

使用ThreadLocal 保存当前Looper对象,ThreadLocal 类可以对数据进行线程隔离,保证了在当前线程只能获取当前线程的Looper对象,同时prepare()函数保证当前线程有且只有一个Looper对象。所以在子线程中创建Handler对象,需要添加Looper.prepare(),如果只单单添加Looper.prepare()函数还是不行,缺少动力,也需要添加Looper.loop()函数。

正确代码如下:

new Thread(new Runnable() {
    @Override
    public void run() {
        Looper.prepare();//注意添加
        new Handler(){
            @Override
            public void handleMessage(Message msg) {
                super.handleMessage(msg);
            }
        };
        Looper.loop();//注意添加
    }
}).start();

为什么可以直接new Message,还提供obtain()函数,为什么这样设计?

如果不断的new出Meaage对象并插入到MessageQueue中,在jvm的堆中是不是会不断有新Message对象创建以及销毁,导致内存抖动,而GC线程虽然作为优先级最低的线程,此时因为必须GC,导致GC线程抢占CPU时间片,主线程拿不到CPU而卡顿调帧。Google在设计消息机制的时候就想到了消息复用机制,几乎所有Framework中发送消息都是通过Message.obtain()函数来进行消息复用。这种消息复用机制其实就是一种享元设计模式。享元设计模式就不展开了,感兴趣的可以自行查阅资料

下面我们看看Message是如何复用的,首先看看Mesage里的几个成员变量:

Message next;//形成一个链表,指向下一个Message
private static final Object sPoolSync = new Object();//对象锁
private static Message sPool;//头节点的消息
private static int sPoolSize = 0;//当前链表的个数
private static final int MAX_POOL_SIZE = 50;//链表最多存放的个数

Looper的loop()函数中不断的从消息队列中取消息diapatch,分发之后会调用Message的回收操作。

public static void loop() {
    //获取Looper对应的消息队列MessageQueue
    final MessageQueue queue = me.mQueue;
    //...省略部分代码
    for (;;) {//不断循环从消息队列中取出消息
        Message msg = queue.next(); //有可能阻塞
        if (msg == null) {//没有消息,则退出消息队列
            return;
        }
        //...省略部分代码
        //msg.target就是Handler,把获取到的消息分发出去
        msg.target.dispatchMessage(msg);  
        //...省略部分代码
        msg.recycleUnchecked();//回收Message
    }
}

当Message分发完之后,就调用recycleUnchecked()函数对Message进行回收。

void recycleUnchecked() {
    flags = FLAG_IN_USE; // 标记改Message将加入消息池
    // 重置所有消息属性
    what = 0;
    arg1 = 0;
    arg2 = 0;
    obj = null;
    replyTo = null;
    sendingUid = -1;
    when = 0;
    target = null;
    callback = null;
    data = null;

    synchronized (sPoolSync) { // 线程安全锁
        if (sPoolSize < MAX_POOL_SIZE) { // MAX_POOL_SIZE = 50 ,表明消息池最多50个
            //头节点设置给Next 将当前对象最为最新的头节点sPool 
            next = sPool;  
            sPool = this;
            sPoolSize++;
        }
    }
}

所以获取一个对象池中的Message可以直接调用Message.obtain()函数即可。

public static Message obtain() {
    synchronized (sPoolSync) {
        //sPool就是Looper.loop(),调用dispatchMessage函数后,调用recycleUnchecked()函数回收的Message
        if (sPool != null) {
           Message m = sPool;  //取出头节点
           sPool = m.next; // 将头节点的下一个作为最新的头节点
           m.next = null; // 设置需要返回的消息的next为空
           m.flags = 0; // 清除是否还在链表中
           sPoolSize--;  
           return m;
        }
    }
    //如果对象池中没有消息,就创建一个消息
    return new Message();
}

Handler的阻塞/唤醒机制

首先我们要了解Handler的阻塞或唤醒是在那里发起的,经过上面的源码分析及Handler的整个调用过程,我们都知道先是创建一个Looper对象,然后创建一个Handler对象,最后调用Looper.loop()函数,在loop()函数中不断从MessageQueue.next()函数中的轮询获取Message。

public static void loop() { 
    //...省略部分代码
    for (;;) {//不断循环从消息队列中取出消息
        Message msg = queue.next(); //有可能阻塞
        //...省略部分代码
        try {
            //msg.target就是Handler,把获取到的消息分发出去
            msg.target.dispatchMessage(msg);
        } finally {
        }
        //...省略部分代码
    }
}

Message next() {
    //...省略部分代码
    int nextPollTimeoutMillis = 0;
    for (;;) {
        nativePollOnce(ptr, nextPollTimeoutMillis);//根据nextPollTimeoutMillis是阻塞还唤醒
        synchronized (this) {
            Message msg = mMessages;
            if (msg != null && msg.target == null) {
               //...省略部分代码
            }
            if (msg != null) {
                if (now < msg.when) {
                    // 当头消息延迟时间大于当前时间,阻塞消息要到延迟时间和当前时间的差值
                    nextPollTimeoutMillis = (int) Math.min(msg.when - now, Integer.MAX_VALUE);
                } else {}
               //...省略部分代码
            } else {
                nextPollTimeoutMillis = -1;//队列已无消息,一直阻塞
            }
        }
        //...省略部分代码
    }
}

Looper轮询器调用loop()函数开始轮询,真正干活的是MessageQueue.next()函数,而next()函数中获取消息前首先调用了nativePollOnce(ptr, nextPollTimeoutMillis)函数,其意思是根据nextPollTimeoutMillis判断是否进行阻塞,nextPollTimeoutMillis初始化时默认为0表表示不阻塞。

nextPollTimeoutMillis有三种对应的状态:

  • nextPollTimeoutMillis=0 ,不阻塞
  • nextPollTimeoutMillis<0 ,一直阻塞
  • nextPollTimeoutMillis>0 ,阻塞对应时长,可被新消息唤醒

MessageQueue轮询获取Message时有两种阻塞情况:

  • 当轮询MessageQueue时获取获取不到Message,nextPollTimeoutMillis赋值为-1进行阻塞。
  • 当轮询MessageQueue时获取获取到Message,msg.when大于当前时间,时间差值就是阻塞的时长。

Handler的阻塞

在MessageQueue类中,有几个Native层的函数

    private native static long nativeInit();
    private native static void nativeDestroy(long ptr);
    private native void nativePollOnce(long ptr, int timeoutMillis); /*non-static for callbacks*/
    private native static void nativeWake(long ptr);
    private native static boolean nativeIsPolling(long ptr);
    private native static void nativeSetFileDescriptorEvents(long ptr, int fd, int events);

在MessageQueue的构造函数中,就调用nativeInit()函数进行初始化。

MessageQueue(boolean quitAllowed) {
      mQuitAllowed = quitAllowed;
      mPtr = nativeInit();
 }

也就是说在创建MessageQueue对象时,通过JNI调用native层的android_os_MessageQueue_nativeInit()函数进行初始化。进入源码看看里面做了那些事情。

//android_os_MessageQueue.cpp
static jlong android_os_MessageQueue_nativeInit(JNIEnv* env, jclass clazz) {
    // 创建一个与java对应的MessageQueue
    NativeMessageQueue* nativeMessageQueue = new NativeMessageQueue();
    if (!nativeMessageQueue) {
        jniThrowRuntimeException(env, "Unable to allocate native queue");
        return 0;
    }

    nativeMessageQueue->incStrong(env);
    return reinterpret_cast<jlong>(nativeMessageQueue);//返回到Java层
}

NativeMessageQueue::NativeMessageQueue() :
    mPollEnv(NULL), mPollObj(NULL), mExceptionObj(NULL) {
    mLooper = Looper::getForThread();//获取该线程关联的Looper
    if (mLooper == NULL) {
        mLooper = new Looper(false);// 创建一个Looper对象
        Looper::setForThread(mLooper);// 将Looper保存到线程里,也相当于Java中用ThreadLocal保存Looper一样
    }
}
//Looper.cpp
Looper::Looper(bool allowNonCallbacks) :
    mAllowNonCallbacks(allowNonCallbacks), mSendingMessage(false),
    mPolling(false), mEpollFd(-1), mEpollRebuildRequired(false),
    mNextRequestSeq(0), mResponseIndex(0), mNextMessageUptime(LLONG_MAX) {
    mWakeEventFd = eventfd(0, EFD_NONBLOCK);// 创建一个唤醒事件fd
    AutoMutex _l(mLock);
    rebuildEpollLocked();// 重构epoll事件
}

#include<sys/eventfd.h>  
int eventfd(unsigned int initval,int flags);

void Looper::rebuildEpollLocked() {
    if (mEpollFd >= 0) {
        close(mEpollFd);//关闭epoll
    }
    // 创建一个epoll实例
    mEpollFd = epoll_create(EPOLL_SIZE_HINT);
    struct epoll_event eventItem;
    //重新设置
    memset(& eventItem, 0, sizeof(epoll_event)); 
    eventItem.events = EPOLLIN;// 监听可读事件
    eventItem.data.fd = mWakeEventFd;//唤醒事件fd
    // 注册唤醒事件mWakeEventFd到epoll实例中
    int result = epoll_ctl(mEpollFd, EPOLL_CTL_ADD, mWakeEventFd, & eventItem);
    // 将请求中的事件注册到epoll实例中
    for (size_t i = 0; i < mRequests.size(); i++) {
        const Request& request = mRequests.valueAt(i);
        struct epoll_event eventItem;
        // 初始化请求事件
        request.initEventItem(&eventItem);
        // 注册请求中的事件到epoll实例中
        int epollResult = epoll_ctl(mEpollFd, EPOLL_CTL_ADD, request.fd, & eventItem);
        if (epollResult < 0) {
            ALOGE("Error adding epoll events for fd %d while rebuilding epoll set, errno=%d",
                    request.fd, errno);
        }
    }
}

在调用android_os_MessageQueue_nativeInit()函数做了些什么事情:

  • 创建一个NativeMessageQueue对象与Jave层相对应的MessageQueue对象。
  • 获取一个MessageQueue相对应Looper对象,没有获取则创建一个Looper 对象。
  • 如果创建Looper对象,则保存Looper对应的线程。

了解了nativeInit()层初始化时后,其实也不难明白,说穿了就是构建一个Java层一样的处理方式而已。

值得注意的是 eventfd(0, EFD_NONBLOCK)这句代码,它是什么意思呢。eventfd函数会创建一个事件描述符对象,通过IO多路复用机制epoll可以监听事件描述符,实现进程(线程)间的等待(wait)/通知(notify),对就这么简单。

eventfd()函数的标志参数有两种:

  • EFD_NONBLOCK:设置对象为非阻塞状态。
  • EFD_CLOEXEC:调用exec后会自动关闭文件描述符,防止泄漏。

值得注意的是 rebuildEpollLocked()函数,在函数里,创建了epoll实例,向epoll中注册唤醒监听事件和请求监听事件。其实通过Linux系统的epoll机制,来实现线程间的等待与唤醒操作。好可以简单理解为epoll就是监听内容的读写变化,来实现阻塞或唤醒操作。

nativePollOnce()函数

MassgeQueue#nativePollOnce()函数是Native层调用的,那么我们进行Native层,通过源码看看他的调用流程。

static void android_os_MessageQueue_nativePollOnce(JNIEnv* env, jobject obj,
    jlong ptr, jint timeoutMillis) {
    //获取NativeMessageQueue
    NativeMessageQueue* nativeMessageQueue = reinterpret_cast<NativeMessageQueue*>(ptr);
    nativeMessageQueue->pollOnce(env, obj, timeoutMillis);// 调用pollOnce()函数
}

void NativeMessageQueue::pollOnce(JNIEnv* env, jobject pollObj, int timeoutMillis) {
    mPollEnv = env;
    mPollObj = pollObj;
    mLooper->pollOnce(timeoutMillis);// 调用Looper#pollOnce()函数
    mPollObj = NULL;
    mPollEnv = NULL;

    if (mExceptionObj) {//异步处理
        env->Throw(mExceptionObj);
        env->DeleteLocalRef(mExceptionObj);
        mExceptionObj = NULL;
    }
}

//Looper.cpp
inline int pollOnce(int timeoutMillis) {
    return pollOnce(timeoutMillis, NULL, NULL, NULL); 
}

int Looper::pollOnce(int timeoutMillis, int* outFd, int* outEvents, void** outData) {
    int result = 0;
    for (;;) {
        // 一个循环不断处理响应列表中的事件。
        while (mResponseIndex < mResponses.size()) {
            const Response& response = mResponses.itemAt(mResponseIndex++);
            int ident = response.request.ident;
            if (ident >= 0) {
                int fd = response.request.fd;
                int events = response.events;
                void* data = response.request.data;
                if (outFd != NULL) *outFd = fd;
                if (outEvents != NULL) *outEvents = events;
                if (outData != NULL) *outData = data;
                return ident;
           }
        }

        if (result != 0) {
            if (outFd != NULL) *outFd = 0;
            if (outEvents != NULL) *outEvents = 0;
            if (outData != NULL) *outData = NULL;
            return result;
        }
        result = pollInner(timeoutMillis);// 内部轮询
     }
}

//循环处理内部事件
int Looper::pollInner(int timeoutMillis) {

    if (timeoutMillis != 0 && mNextMessageUptime != LLONG_MAX) {
        //记录当前时间
        nsecs_t now = systemTime(SYSTEM_TIME_MONOTONIC);

        int messageTimeoutMillis = toMillisecondTimeoutDelay(now, mNextMessageUptime);
        if (messageTimeoutMillis >= 0
                && (timeoutMillis < 0 || messageTimeoutMillis < timeoutMillis)) {
            timeoutMillis = messageTimeoutMillis;
        }
    }

    // 事件处理类型为POLL_WAKE  唤醒
    int result = POLL_WAKE;
    mResponses.clear();
    mResponseIndex = 0;
    mPolling = true;//正在轮询
    struct epoll_event eventItems[EPOLL_MAX_EVENTS];

    //等待事件
    int eventCount = epoll_wait(mEpollFd, eventItems, EPOLL_MAX_EVENTS, timeoutMillis);
    //不要轮询啦
    mPolling = false;
    //获取锁
    mLock.lock();

    //如果需要,重新生成epoll集。
    if (mEpollRebuildRequired) {
        mEpollRebuildRequired = false;
        rebuildEpollLocked();
        goto Done;
    }

    //POLL_ERROR Poll错误
    if (eventCount < 0) {
        if (errno == EINTR) {
            goto Done;
        }
        ALOGW("Poll failed with an unexpected error, errno=%d", errno);
        result = POLL_ERROR;
        goto Done;
    }

    //POLL_TIMEOUT Poll超时
    if (eventCount == 0) {
        result = POLL_TIMEOUT;
        goto Done;
    }

    //循环处理所有事件
    for (int i = 0; i < eventCount; i++) {
        int fd = eventItems[i].data.fd;//获取文件描述符
        uint32_t epollEvents = eventItems[i].events;//获取事件
        if (fd == mWakeEventFd) {//如果是唤醒事件
            if (epollEvents & EPOLLIN) {
                awoken();//唤醒
            } else {
                ALOGW("Ignoring unexpected epoll events 0x%x on wake event fd.", epollEvents);
            }
        } else {//处理请求队列中的事件
            ssize_t requestIndex = mRequests.indexOfKey(fd);
            if (requestIndex >= 0) {
                int events = 0;
                if (epollEvents & EPOLLIN) events |= EVENT_INPUT;
                if (epollEvents & EPOLLOUT) events |= EVENT_OUTPUT;
                if (epollEvents & EPOLLERR) events |= EVENT_ERROR;
                if (epollEvents & EPOLLHUP) events |= EVENT_HANGUP;
                //请求事件添加到Response数组中
                pushResponse(events, mRequests.valueAt(requestIndex));
            } else {
                ALOGW("Ignoring unexpected epoll events 0x%x on fd %d that is "
                        "no longer registered.", epollEvents, fd);
            }
        }
    }
    Done: ;

    //处理MessageEnvelopes的事件
    mNextMessageUptime = LLONG_MAX;
    while (mMessageEnvelopes.size() != 0) {
        nsecs_t now = systemTime(SYSTEM_TIME_MONOTONIC);
        const MessageEnvelope& messageEnvelope = mMessageEnvelopes.itemAt(0);
        // 如果消息的处理时间小于当前时间,则将从列表中移除
        if (messageEnvelope.uptime <= now) {
            { // obtain handler
                sp<MessageHandler> handler = messageEnvelope.handler;//处理消息的handler
                Message message = messageEnvelope.message;//获取消息
                mMessageEnvelopes.removeAt(0);
                mSendingMessage = true;
                mLock.unlock();//释放锁

                handler->handleMessage(message);//处理消息
            } // release handler

            mLock.lock();//加上锁
            mSendingMessage = false;
            result = POLL_CALLBACK;//事件处理类型为POLL_CALLBACK
        } else {
            mNextMessageUptime = messageEnvelope.uptime;//更新下一个消息的处理时间
            break;
        }
    }

    //释放锁
    mLock.unlock();

    //处理所有Responses事件
    for (size_t i = 0; i < mResponses.size(); i++) {
        Response& response = mResponses.editItemAt(i);
        // 如果响应类型为POLL_CALLBACK
        if (response.request.ident == POLL_CALLBACK) {
            int fd = response.request.fd;
            int events = response.events;
            void* data = response.request.data;
            //调用callback类型的handleEvent方法
            int callbackResult = response.request.callback->handleEvent(fd, events, data);
            if (callbackResult == 0) {
                removeFd(fd, response.request.seq);
            }
            response.request.callback.clear();
            result = POLL_CALLBACK;//事件处理类型为POLL_CALLBACK
        }
    }
    return result;
}

void Looper::awoken() {
    uint64_t counter;
    TEMP_FAILURE_RETRY(read(mWakeEventFd, &counter, sizeof(uint64_t)));
}

从java层调用Native层的nativePollOnce()函数,通过分析整理得出调用的主路线:nativePollOnce() -> NativeMessageQueue.pollOnce() -> Looper.pollOnce() -> Looper.pollInner()

注意Java传入的mPtr变量,mPtr变量保存了一个NativeMessageQueue对象,从而使得MessageQueue成为Java层和Native层的连接桥梁,使得Java层与native层就可以相互处理消息了。

从源码的过程得知,消息的处理都放在Looper.pollInner()函数中处理了,我们重点分析pollInner()函数。

1、首先是参数timeoutMillis(从Java传入的nextPollTimeoutMillis)做相应的处理,而timeoutMillis有三状态:

  • 等于0,马上返回,也就是没有阻塞。
  • 小于0,一直阻塞,直接向消息队列中添加消息,才处理事件。
  • 大于0,时间差值是多少,那么就等待多久,时间到,才处理事件。

2、调用epoll_wait(mEpollFd, eventItems, EPOLL_MAX_EVENTS, timeoutMillis)获取到等待事件eventCount 。

eventCount 也有三种状态:

  • 等于0,返回result =POLL_TIMEOUT,表示在超时之前,要准备的数据没有准备好,即为等待超时。
  • 小于0,如果errno == EINTR,返回result =POLL_WAKE,否则返回result = POLL_ERROR,发生了错误。
  • 大于0,所有数据都准备好了。如果返回result =POLL_WAKE,通过wake()方法唤醒的。如果返回result = POLL_CALLBACK,在一个或多个文件描述符被触发了。

从eventCount 这三种情况,pollInner()函数返回的值就是pollOnce()函数的返回值,就是POLL_WAKE、POLL_TIMEOUT、POLL_ERROR、POLL_CALLBACK四种状态。

3、获取到等待事件,首先处理eventCount 的事件,也就是处理传入epoll_wait()函数的等待事件。如果fd == mWakeEventFd,并且 epollEvents & EPOLLIN,说明有消息队列中有新的消息要处理,调用awoken()函数,它只把管道中内容读取出来,清空管道,方便下一次调用epoll_wait()函数时,再次阻塞等待。

4、接着处理MessageEnvelopes事件,如果消息的处理时间小于当前时间,则将从消息列表中移除此消息,并且消费掉这条消息,最后更新下一个消息的处理时间。

5、最后处理Responses事件,如果响应类型为POLL_CALLBACK,然后处理这个事件,如果callbackResult等于0,则移除文件描述符,最后也callback.clear()掉。

Handler的唤醒

有Handler阻塞,那么肯定有Handler唤醒,对吧,什么时候唤醒呢,由于一开始时,MessageQueue是没有Message的,从而进入了阻塞等待,如果向MessageQueue插入一条消息呢,是不是会唤醒啊。其实消息从消息队列中全部移除quit()时,可能需要调用nativeWake方法。那么我们从MessageQueue.enqueueMessage()函数入手吧。

//MessageQueue.java
boolean enqueueMessage(Message msg, long when) {
     //...省略部分代码
        if (needWake) {//调用Native的唤醒机制
            nativeWake(mPtr);
        }
    //...省略部分代码
    return true;
}

唤醒是有两种情况:

  • 当消息队列中没有消息时,Handler发送一条新消息到消息队列中,那么就会调用Native层的nativeWake(mPtr)函数。
  • 当消息队列的消息的时间,与当前时间的比值,就是需要等待的时间,这个会传入Native层,时间到则自动唤醒。

nativeWake()函数

我们从调用Native层的nativeWake(mPtr)函数,通过源码去分析它们的调用过程吧。

//android_os_MessageQueue.cpp
static void android_os_MessageQueue_nativeWake(JNIEnv* env, jclass clazz, jlong ptr) {
    //获取到当前的消息队列
    NativeMessageQueue* nativeMessageQueue = reinterpret_cast<NativeMessageQueue*>(ptr);
    nativeMessageQueue->wake();//调用NatvieMessageQueue的wake()函数
}
void NativeMessageQueue::wake() {
    mLooper->wake();//调用Looper的wake()函数
}

//Looper.cpp
void Looper::wake() {
    uint64_t inc = 1;
    //向mWakeEventFd(唤醒文件描述符)写入字符
    ssize_t nWrite = TEMP_FAILURE_RETRY(write(mWakeEventFd, &inc, sizeof(uint64_t)));
    if (nWrite != sizeof(uint64_t)) {
        if (errno != EAGAIN) {
        ALOGW("Could not write wake signal, errno=%d", errno);
        }
    }
}

看到了Nativen层的唤醒调用过程是不是很简单。最后直接调用了TEMP_FAILURE_RETRY函数,其他就是向管道中写入数据,管道监听到有数据写入就是唤醒Android应用程序主线程处理事件。关于Linux系统的epoll机制,管道,涉及到了Binder相关知识,就不展开讨论了。

Handler的同步屏障

什么是同步屏障

同步屏障:阻碍同步消息,优先执行异步消息。

为什么要设计同步屏障

从上述代码的Handler构造函数可知,一般情况下,默认调用Handler(callback, false)构造函数,也是就是mAsynchronous = false,同时在Handler.enqueueMessage函数中msg.target已经赋值了当前的Handler对象,在MessageQueue.enqueueMessage中按执行时间顺序把Message插入到Message链表合适的位置。在调用MessageQueue.next函数获取Message时添加了synchronied锁,所以取消息的时候是互斥取消息,只能从头部取消息,也因为加消息是按照消息的执行的先后顺序进行。如果要优先立即执行某条Message时,按正常情况是要排队的,是没法做到立即执行,所以就引入了同步屏障。

如何开启同步屏障

我们通过MessageQueue.postSyncBarrier() 函数,是如何引入同步屏障的。源码如下:

public int postSyncBarrier() {
    return postSyncBarrier(SystemClock.uptimeMillis());
}

private int postSyncBarrier(long when) {
    synchronized (this) {
        final int token = mNextBarrierToken++;
        final Message msg = Message.obtain();//从消息对象池中获取一条消息
        msg.markInUse();
        //将消息信息初始化赋值,注意这里并没有给target赋值,这是关键
        msg.when = when;
        msg.arg1 = token;

        Message prev = null;
        Message p = mMessages;
        if (when != 0) {
            //如果开启同步屏障的时间(假设记为T)T不为0,且当前的同步消息里有时间小于T,则prev也不为null 
            while (p != null && p.when <= when) {
                prev = p;
                p = p.next;
            }
        }
        //将 msg 按照时间顺序插入到 消息队列(链表)的合适位置 
        if (prev != null) { // invariant: p == prev.next
            msg.next = p;
            prev.next = msg;
        } else {
            msg.next = p;
            mMessages = msg;
        }
        return token;//返回一个序号,通过这个序号可以撤销屏障
    }
}

从上述开启同步屏障的源码中可以看出,Message 对象初始化时并没有给 target 赋值,也就是说向MessageQueue中插入一条target 为null标记的Message,相对于正常的enqueue操作,在Handler.enqueueMessage函数中Handler与msg.target绑定了 。同步屏障的Message特殊在于 target 为 null,并不会被消费,因为不会唤醒消息队列。

在那里消费掉同步屏障的Message,上述代码:Looper.loop()函数中调用了MessageQueue.next()函数,那么我们再看一次next函数源码:

Message next() {
    //...省略部分代码
    for (;;) {
        //...省略部分代码
        nativePollOnce(ptr, nextPollTimeoutMillis);//根据nextPollTimeoutMillis是阻塞还唤醒
        synchronized (this) {
            final long now = SystemClock.uptimeMillis();
            Message prevMsg = null;
            Message msg = mMessages;
            //target为null 说明这是一条同步屏障消息
            if (msg != null && msg.target == null) {
                // Stalled by a barrier.  Find the next asynchronous message in the queue.
                do {
                    prevMsg = msg;
                    msg = msg.next;
                } while (msg != null && !msg.isAsynchronous());//如果是异步,则获取并消费这条消息
            }
        }
        //...省略部分代码   
    }
}

从next函数中可以看出,MessageQueue中的msg.target为null说明开启了同步屏障,同时是异步,那么Message则会优先处理,这就是同步屏障的作用(过滤和优先作用)。

我们来一形象图,加深印象:

image

发送异步Message

发送同步和异步Message都是在Handler的几个构造函数,可以传入async标志为true,这样构造的Handler发送的消息就是异步消息。

public Handler(boolean async) {
    this(null, async);
}
 public Handler(Callback callback, boolean async) {
     //...省略代码
 }
 public Handler(Looper looper, Callback callback, boolean async) {
     //...省略代码
 }
//最终调用enqueueMessage函数,把消息插入到消息队列中
private boolean enqueueMessage(MessageQueue queue, Message msg, long uptimeMillis) {
    //this就是Handler Message中持有一个Handler
    //为发送消息出队列交给handler处理埋下伏笔。
    msg.target = this;
    if (mAsynchronous) {//是否是异步信息
        msg.setAsynchronous(true);
    }
    return queue.enqueueMessage(msg, uptimeMillis);//调用消息队列的入队函数
}

当然我们也可以在创建Message时,调用Message.setAsynchronous(true)将消息设为异步。
发送异步消息和发送同步消息一样,唯一区别就在于Asynchronous的设置即可。

移除同步屏障

有启动同步屏障,那么就有移除同步屏障,我们看MessageQueue.removeSyncBarrier()函数源码是怎么移除同步屏障的:

public void removeSyncBarrier(int token) {
    synchronized (this) {
        Message prev = null;
        Message p = mMessages;
        //找到token对应的屏障
        while (p != null && (p.target != null || p.arg1 != token)) {
            prev = p;
            p = p.next;
        }
        final boolean needWake;
        //从消息链表中移除
        if (prev != null) {
            prev.next = p.next;
            needWake = false;
        } else {
            mMessages = p.next;
            needWake = mMessages == null || mMessages.target != null;
        }
        //回收Message到对象池中。
        p.recycleUnchecked();
        if (needWake && !mQuitting) {
            nativeWake(mPtr);//唤醒消息队列
        }
    }

在启动同步屏障时,已经记录了token,然后通过token对应的屏障,从消息链表中移除,回收Message到对象池中。

同步消息屏障的应用场景

同步屏障在系统源码中有哪些使用场景呢?我们日常开发中也很少用到同步消息屏障,涉及到同步消息屏障一般都是Android系统中使用得最多了,比如我们的UI更新相关的消息,就是利用同步屏障发送异步消息,则会优先处理,从而达到马上刷新UI。既然知道了Android系统中刷新UI使用了异步消息,那么我们看看View的更新,draw()、requestLayout()、invalidate() 等函数都调用了。

我们从View的绘制流程起始,View通过ViewRootImpl来绘制,ViewRootImpl调用到requestLayout()来完成View的绘制操作:知道这个流程,我们看ViewRootImpl.requestLayout()函数源码:

@Override
public void requestLayout() {
    if (!mHandlingLayoutInLayoutRequest) {
        checkThread();//检查是否在主线程
        mLayoutRequested = true;//mLayoutRequested 是否measure和layout布局。
        //重要函数
        scheduleTraversals();
    }
}

void scheduleTraversals() {
    if (!mTraversalScheduled) {
        mTraversalScheduled = true;
        //设置同步障碍,确保mTraversalRunnable优先被执行
        mTraversalBarrier = mHandler.getLooper().getQueue().postSyncBarrier();
        //内部通过Handler发送了一个异步消息
        mChoreographer.postCallback(
                Choreographer.CALLBACK_TRAVERSAL, mTraversalRunnable, null);
        if (!mUnbufferedInputDispatch) {
            scheduleConsumeBatchedInput();
        }
        notifyRendererOfFramePending();
        pokeDrawLockIfNeeded();
    }
}

//移除同步屏障
void unscheduleTraversals() {
    if (mTraversalScheduled) {
        mTraversalScheduled = false;
        mHandler.getLooper().getQueue().removeSyncBarrier(mTraversalBarrier);
        mChoreographer.removeCallbacks(
            Choreographer.CALLBACK_TRAVERSAL, mTraversalRunnable, null);
    }
}

调用 Handler.getLooper().getQueue().postSyncBarrier() 并设置同步屏障消息。

最终调用Choreographer.postCallbackDelayedInternal()函数,在其函数中设置Message.setAsynchronous(true) 时 ,也就发送异步消息。

private void postCallbackDelayedInternal(int callbackType,
                                         Object action, Object token, long delayMillis) {
    synchronized (mLock) {
        final long now = SystemClock.uptimeMillis();
        final long dueTime = now + delayMillis;
        mCallbackQueues[callbackType].addCallbackLocked(dueTime, action, token);

        if (dueTime <= now) {
            scheduleFrameLocked(now);
        } else {
            Message msg = mHandler.obtainMessage(MSG_DO_SCHEDULE_CALLBACK, action);//获取消息对象
            msg.arg1 = callbackType;
            msg.setAsynchronous(true);//设置为异步
            mHandler.sendMessageAtTime(msg, dueTime);//发送异步消息
        }
    }
}

调用Handler.getLooper().getQueue().removeSyncBarrier()函数移除同步屏障。最终调用Choreographer.removeCallbacksInternal()函数移除消息。

private void removeCallbacksInternal(int callbackType, Object action, Object token) {
    synchronized (mLock) {
        mCallbackQueues[callbackType].removeCallbacksLocked(action, token);
        if (action != null && token == null) {
            //移除消息
            mHandler.removeMessages(MSG_DO_SCHEDULE_CALLBACK, action);
        }
    }
}

总结

  • 线程切换,在子线程发送Message,Message对象是存放在堆内存,获取到Message时,主线程dispatchMessage分发处理这条消息。
  • ThreadLocal的作用就是线程隔离,每个线程都提供一个变量的副本,使得每一个线程在同一时间内访问到的不同对象。
  • 当MessageQueue中没有Message时,触发Native层进入阻塞状态;或者MessageQueue中的Message更新时间没有到时,也进入了阻塞状态,阻塞时长就是它的更新时间。
  • MessageQueue中没有Message时,向MessageQueue中添加Message时,触发Native层的唤醒机制;MessageQueue中的Message更新时间到时,也触发唤醒。
  • Handler的同步屏障,就是优化处理系统的Message,并且Message是异步的。比如:Android应用程序出现ANR,系统发起同步屏障,发送一条异步消息,界面上优先处理这条消息了,然后系统弹出一个ANR消息对话框。
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
禁止转载,如需转载请通过简信或评论联系作者。
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 212,294评论 6 493
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,493评论 3 385
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 157,790评论 0 348
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,595评论 1 284
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 65,718评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 49,906评论 1 290
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,053评论 3 410
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,797评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,250评论 1 303
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,570评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,711评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,388评论 4 332
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,018评论 3 316
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,796评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,023评论 1 266
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,461评论 2 360
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,595评论 2 350