概述
Android是一个消息驱动的系统,Android的四大组件Activity, Service, ContentProvider, BroadcastReceiver的交互过程都离不开消息机制。
Java层的消息机制主要由以下4个类来实现
- Message:消息分为硬件产生的消息(如按钮、触摸)和软件生成的消息;
- MessageQueue:消息队列的主要功能向消息池投递消息(MessageQueue.enqueueMessage)和取走消息池的消息(MessageQueue.next);
- Handler:消息辅助类,主要功能向消息池发送各种消息事件(Handler.sendMessage)和处理相应消息事件(Handler.handleMessage);
- Looper:不断循环执行(Looper.loop),按分发机制将消息分发给目标处理者。
类结构图如下
Handler的简单使用方法如下,可以用于线程间通信
//在主线程中初始化Handler
private Handler handler = new Handler() {
@Override
public void handleMessage(Message msg) {
super.handleMessage(msg);
switch(msg.what){
//在此处处理消息
}
}
};
Thread thread = new Thread(){
@Override
public void run() {
super.run();
//在子线程中发送消息
Message msg = new Message();
msg.what = 1;
handler.sendMessage(msg);
}
};
thread.start();
由代码可见从子线程发送消息到主线程需要三步
- 1: 在主行程创建Handler对象,并实现handleMessage(Message msg)方法
- 2: 创建Message对象
- 3: 在子线程中使用handler对象的sendMessage方法发送消息
其实在创建Handler对象的时候,所在的线程必须先执行Looper.prepare()和Looper.loop()两个方法, 由于我们在Android创建Handler的时候一般是在Android的主线程中创建的,可以省略这个步骤, 因为Android的主线程在创建的时候已经为我们做了这些事情, 这也是主线程之所以称之为主线程的意义。
以下是ActivityThread.main方法代码
public static void main(String[] args) {
Looper.prepareMainLooper();
ActivityThread thread = new ActivityThread();
thread.attach(false, startSeq);
if (sMainThreadHandler == null) {
sMainThreadHandler = thread.getHandler();
}
Looper.loop();
throw new RuntimeException("Main thread loop unexpectedly exited");
}
Looper.prepareMainLooper方法和Looper.prepare方法类似, 只不过前者是专门为主线程实现的方法。
Looper初始化过程
如果想要在一个线程中创建Handler对象,必须在该线程中调用Looper.prepare()和Looper.loop()两个方法初始化,这两个方法究竟做了哪些准备呢?
Looper.prepare
/** Initialize the current thread as a looper.
* This gives you a chance to create handlers that then reference
* this looper, before actually starting the loop. Be sure to call
* {@link #loop()} after calling this method, and end it by calling
* {@link #quit()}.
*/
public static void prepare() {
prepare(true);
}
private static void prepare(boolean quitAllowed) {
if (sThreadLocal.get() != null) {
throw new RuntimeException("Only one Looper may be created per thread");
}
sThreadLocal.set(new Looper(quitAllowed));
}
这个方法非常简单,只是调用了带参数的prepare方法,带参数的prepare又做了什么工作呢?创建了一个Looper对象,并将它存储到了线程本地内存中, 也就是说Looper是某个线程独有的。
这个方法的的作用是为本线程初始化Looper, 初始化Looper之后,就可以在本线程中创建Handler了。
Looper构造方法
final MessageQueue mQueue;
final Thread mThread;
private Looper(boolean quitAllowed) {
mQueue = new MessageQueue(quitAllowed);
mThread = Thread.currentThread();
}
Looper的构造方法中创建了一个MessageQueue对象,并且保存在了当前线程的Looper对象中,MessageQueue中保存了Java层的Message消息队列。
MessageQueue构造方法
private long mPtr; // used by native code
MessageQueue(boolean quitAllowed) {
mQuitAllowed = quitAllowed;
mPtr = nativeInit();
}
MessageQueue对象的构造方法中调用nativeInit方法初始化了mPtr变量,该变量中保存了一个C++对象的指针地址。
nativeInit是一个native方法, 通过JNI调用,具体实现在/frameworks/base/core/jni/android_os_MessageQueue.cpp类中。
static jlong android_os_MessageQueue_nativeInit(JNIEnv* env, jclass clazz) {
NativeMessageQueue* nativeMessageQueue = new NativeMessageQueue();
if (!nativeMessageQueue) {
jniThrowRuntimeException(env, "Unable to allocate native queue");
return 0;
}
nativeMessageQueue->incStrong(env);
return reinterpret_cast<jlong>(nativeMessageQueue);
}
通过该方法的实现可以看出,native层创建了一个NativeMessageQueue对象, 并将该对象的指针返回给了Java层,最终保存在了Java层MessageQueue对象的mPtr变量中, 也就是说一个Java层的MessageQueue对象对应了一个Natvie层的NativeMessageQueue对象。那NativeMessageQueue的作用是什么呢?
NativeMessageQueue
NativeMessageQueue::NativeMessageQueue() {
mLooper = Looper::getForThread();
if (mLooper == NULL) {
mLooper = new Looper(false);
Looper::setForThread(mLooper);
}
}
在NativeMessageQueue中, 首先调用getForThread方法从线程本地内存中获取Native的Looper对象,如果没有, 则创建一个新的Native Looper对象,同样保存到现场本地内存中,和Java层的ThreadLocal类似。
Native Looper构造方法
Looper::Looper(bool allowNonCallbacks) :
mAllowNonCallbacks(allowNonCallbacks), mSendingMessage(false),
mPolling(false), mEpollFd(-1), mEpollRebuildRequired(false),
mNextRequestSeq(0), mResponseIndex(0), mNextMessageUptime(LLONG_MAX) {
mWakeEventFd = eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC);
LOG_ALWAYS_FATAL_IF(mWakeEventFd < 0, "Could not make wake event fd: %s",
strerror(errno));
AutoMutex _l(mLock);
rebuildEpollLocked();
}
Native的Looper主要使用了Linux的Epoll机制,eventfd 是 Linux 的一个系统调用,创建一个文件描述符用于事件通知, 创建的文件描述符保存在mWakeEventFd变量中。
然后调用rebuildEpollLocked方法创建Epoll监听事件
void Looper::rebuildEpollLocked() {
// 如果Epoll已经创建,则先关闭
if (mEpollFd >= 0) {
#if DEBUG_CALLBACKS
ALOGD("%p ~ rebuildEpollLocked - rebuilding epoll set", this);
#endif
close(mEpollFd);
}
// 创建一个 epoll 的句柄,EPOLL_SIZE_HINT 是指监听的描述符个数
// 现在内核支持动态扩展,该值的意义仅仅是初次分配的 fd 个数,后面空间不够时会动态扩容。
// 当创建完 epoll 句柄后,占用一个 fd 值.
mEpollFd = epoll_create(EPOLL_SIZE_HINT);
LOG_ALWAYS_FATAL_IF(mEpollFd < 0, "Could not create epoll instance: %s", strerror(errno));
struct epoll_event eventItem;
memset(& eventItem, 0, sizeof(epoll_event)); // zero out unused members of data field union
eventItem.events = EPOLLIN;
eventItem.data.fd = mWakeEventFd;
//对 mWakeEventFd 文件描述符进行注册,这样 mEpollFd 就能监听到 mWakeEventFd 的读写事件。
int result = epoll_ctl(mEpollFd, EPOLL_CTL_ADD, mWakeEventFd, & eventItem);
LOG_ALWAYS_FATAL_IF(result != 0, "Could not add wake event fd to epoll instance: %s",
strerror(errno));
//设置要监听的其他fd
for (size_t i = 0; i < mRequests.size(); i++) {
const Request& request = mRequests.valueAt(i);
struct epoll_event eventItem;
request.initEventItem(&eventItem);
int epollResult = epoll_ctl(mEpollFd, EPOLL_CTL_ADD, request.fd, & eventItem);
if (epollResult < 0) {
ALOGE("Error adding epoll events for fd %d while rebuilding epoll set: %s",
request.fd, strerror(errno));
}
}
}
rebuildEpollLocked方法创建了Epoll句柄, 并设置了Epoll监听mWakeEventFd文件节点的可读事件,当mWakeEventFd有新内容的时候触发事件。同时还设置了Epoll还可以监听其他文件描述符,此处Native Looper的用法,不再详细分析。Native Looper和Java层的Looper虽然名字一样, 但是两者没有关系,Android在Java层和Native层分别实现了一套Handler消息机制。
Looper.prepare到底做了什么工作呢?
为调用线程创建了一个Looper对象,保存在了线程本地内存中,为该线程独有,Looper对象中创建并保存了一个MessageQueue对象,这个Java层的MessageQueue对象对应了一个NativeMessageQueue对象。而NativeMessageQueue对象初始化了Epoll, 监听mWakeEventFd文件节点。
Looper.loop
/**
* Run the message queue in this thread. Be sure to call
* {@link #quit()} to end the loop.
*/
public static void loop() {
// 找到本线程的Looper对象
final Looper me = myLooper();
if (me == null) {
throw new RuntimeException("No Looper; Looper.prepare() wasn't called on this thread.");
}
拿到Looper中的MessageQueue对象
final MessageQueue queue = me.mQueue;
// 无限循环遍历
for (;;) {
//调用MessageQueue.next方法获取下一个message,如果没有消息会阻塞在这个方法上
Message msg = queue.next(); // might block
if (msg == null) {
// No message indicates that the message queue is quitting.
return;
}
final long dispatchStart = needStartTime ? SystemClock.uptimeMillis() : 0;
final long dispatchEnd;
try {
// 调用Message.target的dispatchMessage方法处理消息
// target就是Handler, 后续在详细分析
msg.target.dispatchMessage(msg);
dispatchEnd = needEndTime ? SystemClock.uptimeMillis() : 0;
} finally {
if (traceTag != 0) {
Trace.traceEnd(traceTag);
}
}
msg.recycleUnchecked();
}
}
Looper.loop方法做了以下几个事情:
1: 调用myLooper方法,从线程本地内存中取出该线程的Looper对象,然后从Looper对象中拿出MessageQueue对象
2: 调用MessageQueue的next方法获取下一个Message消息,如果没有消息就阻塞在该方法上
3: 将获取的Message消息,通过target的dispatchMessage方法进行处理
循环执行步骤2和步骤3
那我们看下MessageQueue的next方法是如何处理的?
Message next() {
int pendingIdleHandlerCount = -1; // -1 only during first iteration
int nextPollTimeoutMillis = 0;
for (;;) {
if (nextPollTimeoutMillis != 0) {
Binder.flushPendingCommands();
}
// 调用Native的方法,可能阻塞在此方法上
nativePollOnce(ptr, nextPollTimeoutMillis);
synchronized (this) {
// Try to retrieve the next message. Return if found.
final long now = SystemClock.uptimeMillis();
Message prevMsg = null;
Message msg = mMessages;
if (msg != null) {
if (now < msg.when) {
// 下一个Message还不到触发事件,计算距离出发时所剩的时间,设置为下次触发时间
nextPollTimeoutMillis = (int) Math.min(msg.when - now, Integer.MAX_VALUE);
} else {
// Got a message.
mBlocked = false;
if (prevMsg != null) {
prevMsg.next = msg.next;
} else {
mMessages = msg.next;
}
msg.next = null;
if (DEBUG) Log.v(TAG, "Returning message: " + msg);
msg.markInUse();
return msg;
}
} else {
// 如果没有Message, 则设置下次触发超时时间为-1
nextPollTimeoutMillis = -1;
}
}
}
next方法主要作用是计算下一个Message触发时间,如果还未到下一个Message的触发时间,则计算到触发剩余时间,如果没有新的Message可以处理,则设置下次触发时间为-1
最终将触发事件作为参数,调用nativePollOnce方法,再接着看下nativePollOnce的用法, 它对应的JNI方法实现在android_os_MessageQueue.cpp中。
static void android_os_MessageQueue_nativePollOnce(JNIEnv* env, jobject obj,
jlong ptr, jint timeoutMillis) {
NativeMessageQueue* nativeMessageQueue = reinterpret_cast<NativeMessageQueue*>(ptr);
nativeMessageQueue->pollOnce(env, obj, timeoutMillis);
}
将保存在java层的NativeMessageQueue指针转换成NativeMessageQueue对象, 然后调用该对象的pollOnce方法。
void NativeMessageQueue::pollOnce(JNIEnv* env, jobject pollObj, int timeoutMillis) {
mPollEnv = env;
mPollObj = pollObj;
mLooper->pollOnce(timeoutMillis);
mPollObj = NULL;
mPollEnv = NULL;
if (mExceptionObj) {
env->Throw(mExceptionObj);
env->DeleteLocalRef(mExceptionObj);
mExceptionObj = NULL;
}
}
NativeMessageQueue的pollOnce方法,最终调用了Native Looper的pollOnce方法
int Looper::pollOnce(int timeoutMillis, int* outFd, int* outEvents, void** outData) {
int result = 0;
result = pollInner(timeoutMillis);
}
int Looper::pollInner(int timeoutMillis) {
mPolling = true;
struct epoll_event eventItems[EPOLL_MAX_EVENTS];
// 等待 mEpollFd 上的 IO 事件
int eventCount = epoll_wait(mEpollFd, eventItems, EPOLL_MAX_EVENTS, timeoutMillis);
for (int i = 0; i < eventCount; i++) {
int fd = eventItems[i].data.fd;
uint32_t epollEvents = eventItems[i].events;
if (fd == mWakeEventFd) {
// 收到mWakeEventFd可读事件
if (epollEvents & EPOLLIN) {
awoken();
} else {
ALOGW("Ignoring unexpected epoll events 0x%x on wake event fd.", epollEvents);
}
} else {
// 监听的其他文件节点fd时间的处理逻辑
}
}
return result;
}
void Looper::awoken() {
uint64_t counter;
TEMP_FAILURE_RETRY(read(mWakeEventFd, &counter, sizeof(uint64_t)));
}
nativePollOnce最终调用到了pollInner方法中,参数timeoutMillis就是next方法中计算的时间,然后将timeoutMillis作为参数调用epoll的epoll_wait方法,等待mEpollFd的IO事件上报。此处timeoutMillis参数为等待超时时间
如果,timeoutMillis > 0, 表示收到IO事件或者等待超时之后返回
如果,timeoutMillis = 0, 表示不等待立刻返回
如果,timeoutMills = -1, 表示没有超时时间,一直阻塞在wait方法上,直到有IO消息上报再返回
当收到IO消息上报之后,调用awoken从mWakeEventFd中读取内容。
小结
至此Looper的初始化过程已经分析完了,做个总结
- Looper.prepare
为当前线程创建了Looper对象,保存在线程本地内存
Looper对象中创建了MessageQueue对象,MessageQueue对象中对应了一个NativeMessageQueue对象
NativeMessageQueue对象中创建了Epoll句柄,用于监听mWakeEventFd文件节点的可读IO事件 - Looper.loop
不断循环执行,等待符合条件的Message, 按分发机制将消息分发给目标处理者
loop方法中对于Message队列中的消息执行时间是基于Linux的Epoll机制,每次Epoll触发后分发消息,然后计算下一条Message触发时间,设置为Epoll的下次wait的超时时间
Handler消息发送过程
Looper初始化完成之后,就处于loop循环之中,等待接受消息,消息是如何发送的呢?
创建Handler对象
public Handler() {
this(null, false);
}
public Handler(Looper looper, Callback callback) {
this(looper, callback, false);
}
public Handler(Callback callback, boolean async) {
mLooper = Looper.myLooper();
if (mLooper == null) {
throw new RuntimeException(
"Can't create handler inside thread " + Thread.currentThread()
+ " that has not called Looper.prepare()");
}
mQueue = mLooper.mQueue;
mCallback = callback;
mAsynchronous = async;
}
Handler对象创建的时候首先调用myLooper方法从当前线程本地内存中拿到初始化时保存的Looper对象,如果当前线程中没有初始化Looper对象,就会跑出了RuntimeException,这就是线程使用Handler消息机制必须先初始化Looper的原因。
然后从Looper对象中拿出MessageQueue对象保存在mQueue对象中。
创建Message对象
Message创建可以自己new一个Message对象, 也可以使用Handler.obtainMessage()方法,从Message池中获取一个空闲Message,可以重复利用,减少资源消耗。
public final Message obtainMessage()
{
// 调用Message的obtain方法
return Message.obtain(this);
}
public static Message obtain() {
synchronized (sPoolSync) {
if (sPool != null) {
Message m = sPool;
sPool = m.next;
m.next = null;
m.flags = 0; // clear in-use flag
sPoolSize--;
return m;
}
}
return new Message();
}
Handler的obtainMessage方法,调用了Message的静态obtain方法,首先判断Message池中是否还有空闲的Message对象,如果有就直接返回一个空闲Message,如果Message池中已经为空,则新创建一个Message返回。
Handler发送消息
sendEmptyMessage
sendEmptyMessageAtTime
sendEmptyMessageDelayed
sendMessage
sendMessageAtFrontOfQueue
sendMessageAtTime
sendMessageDelayed
Hander有很多种发送消息的方法,我们只需要sendMessage方法和sendMessageDelay方法
public final boolean sendMessage(Message msg)
{
return sendMessageDelayed(msg, 0);
}
/**
* Enqueue a message into the message queue after all pending messages
* before (current time + delayMillis). You will receive it in
* {@link #handleMessage}, in the thread attached to this handler.
*
* @return Returns true if the message was successfully placed in to the
* message queue. Returns false on failure, usually because the
* looper processing the message queue is exiting. Note that a
* result of true does not mean the message will be processed -- if
* the looper is quit before the delivery time of the message
* occurs then the message will be dropped.
*/
public final boolean sendMessageDelayed(Message msg, long delayMillis)
{
if (delayMillis < 0) {
delayMillis = 0;
}
return sendMessageAtTime(msg, SystemClock.uptimeMillis() + delayMillis);
}
public boolean sendMessageAtTime(Message msg, long uptimeMillis) {
MessageQueue queue = mQueue;
if (queue == null) {
RuntimeException e = new RuntimeException(
this + " sendMessageAtTime() called with no mQueue");
Log.w("Looper", e.getMessage(), e);
return false;
}
return enqueueMessage(queue, msg, uptimeMillis);
}
sendMessage方法调用了sendMessageDelayed方法,这个方法第二个参数是消息的延迟时间,sendMessage是无需延迟的,所以这个参数是0,如果需要延迟的话需要设置这个参数,单位为ms
sendMessageDelay方法调用了sendMessageAtTime方法,将延迟时间+当前时间,计算出消息触发的时间点。
最终调用enqueueMessage方法,将Message添加到MessageQueue队列中。
private boolean enqueueMessage(MessageQueue queue, Message msg, long uptimeMillis) {
msg.target = this;
if (mAsynchronous) {
msg.setAsynchronous(true);
}
return queue.enqueueMessage(msg, uptimeMillis);
}
将Handler对象本身保存到Message的target变量中,然后调用MessageQueue的enqueueMessage方法。
boolean enqueueMessage(Message msg, long when) {
如果Message的target变量没有设置,则抛出异常
if (msg.target == null) {
throw new IllegalArgumentException("Message must have a target.");
}
synchronized (this) {
msg.when = when; //消息的触发时间
Message p = mMessages;
boolean needWake;
if (p == null || when == 0 || when < p.when) {
// 如果比队列最前的Message的触发时间小, 则放在队列最前端,最先触发该消息
msg.next = p;
mMessages = msg;
needWake = mBlocked;
} else {
//遍历MessageQueue队列,按照时间,将Message插入合适的位置
needWake = mBlocked && p.target == null && msg.isAsynchronous();
Message prev;
for (;;) {
prev = p;
p = p.next;
if (p == null || when < p.when) {
break;
}
if (needWake && p.isAsynchronous()) {
needWake = false;
}
}
msg.next = p; // invariant: p == prev.next
prev.next = msg;
}
// 调用Message的nativeWake方法
if (needWake) {
nativeWake(mPtr);
}
}
return true;
}
equeueMessage方法是将Message对象插入到Message队列合适的位置上去,Message队列是按照Message触发时间进行排序的,最先触发的放在队列最前端。将Message插入队列之后,如果消息队列阻塞在Epoll的wait方法上等待消息到来,则调用nativeWake方法唤醒。
static void android_os_MessageQueue_nativeWake(JNIEnv* env, jclass clazz, jlong ptr) {
NativeMessageQueue* nativeMessageQueue = reinterpret_cast<NativeMessageQueue*>(ptr);
nativeMessageQueue->wake();
}
void NativeMessageQueue::wake() {
mLooper->wake();
}
通过JNI调用了NativeMessageQueue的wake方法, NativeMessageQueue又调用了Native Looper的wake方法。
void Looper::wake() {
ssize_t nWrite = TEMP_FAILURE_RETRY(write(mWakeEventFd, &inc, sizeof(uint64_t)));
if (nWrite != sizeof(uint64_t)) {
if (errno != EAGAIN) {
LOG_ALWAYS_FATAL("Could not write wake signal to fd %d: %s",
mWakeEventFd, strerror(errno));
}
}
}
wake方法其实就是想mWakeEventFd文件节点中写入一个数字,来达到唤醒Epoll wait方法的目的。这样就又回到了Message.next方法中。next方法从nativePollOnce方法中唤醒返回,然后从消息队列拿出第一个消息,查看是否到了触发时间,如果没有到则就算触发剩余时间,然后继续调用nativePollOnce;如果到达触发时间,则将调用Message.target的dispatchMessage方法对消息进行分发。
Message在创建的时候将发送Message的Handler对象设置到了target变量中,下面看下Handler的dispatchMessage方法。
public void dispatchMessage(Message msg) {
if (msg.callback != null) {
//如果Message的Callback不为null,则调用Message的Callback进行处理
handleCallback(msg);
} else {
//如果Handler的Callback不为空,则调用Handler的Callback进行处理
if (mCallback != null) {
if (mCallback.handleMessage(msg)) {
return;
}
}
//否则,调用HandleMessage进行处理
handleMessage(msg);
}
}
首先判断Message的callback是否为空,如果不为空则将Message交给Message的callbak处理;
然后判断如果Handler是否设置了Callback回调,如果设置了则将Message交给 Callback处理;
最后,如果上述两者都没有处理的话就交给handleMessage方法进行处理。
到此Java层的Handler机制就简单分析完成了。