Handler详解
举例
- 线程间传递数据(主线程跟子线程、两个子线程)
简介
- 一套android消息传递机制。在多线程的应用场景中,将工作线程中需更新UI的操作信息传递到 UI主线程,从而实现工作线程对UI的更新处理,最终实现异步消息的处理。
- 使用Handler的原因:将工作线程需操作UI的消息传递到主线程,使得主线程可根据工作线程的需求更新UI,从而避免线程操作不安全的问题
工作流程
- 异步通信准备
- 在主线程中创建处理器对象(Looper)、消息队列对象(MessageQueue)、Handler对象
- 消息入队
- 工作线程通过Handler发送消息(Message)到消息队列(MessageQueue)中
- 消息循环
- 消息出队:Looper循环取出消息队列(MessageQueue)中的消息(Message)
- 消息分发:Looper将取出的消息(Message)发送给创建该消息的处理者(Handler)
- 消息处理
- 处理者(Handler)接受处理器(Looper)发送过来的消息(Message)
-
处理者(Handler)根据消息(Message)进行UI操作
核心类
源码分析
- 创建Handler
//在主线程中 通过匿名内部类创建Handler类对象
private Handler mhandler = new Handler(){
// 通过复写handlerMessage()从而确定更新UI的操作
@Override
public void handleMessage(Message msg) {
// 需执行的UI操作
...
}
};
//创建消息对象
Message message = handler.obtainMessage(MESSAGE_WHAT);
message.obj = "I am message from work thread";
//发送
handler.sendMessage(message);
/**
* 源码分析:Handler的构造方法
* 作用:初始化Handler对象 & 绑定线程
* a. Handler需绑定线程才能使用;绑定后,Handler的消息处理会在绑定的线程中执行
* b. 绑定方式为先指定Looper对象,从而绑定了Looper对象所绑定的线程(因为Looper对象本已绑定了对应线程),即指定了Handler对象的Looper对象绑定到了Looper对象所在的线程
*/
public Handler() {
this(null, false);
}
/**
* 分析:this(null, false) = Handler(null,false)
*/
public Handler(Callback callback, boolean async) {
// 1、 指定Looper对象
// Looper.myLooper()作用:获取当前线程的Looper对象;若线程无Looper对象则抛出异常
// 若线程中无创建Looper对象,则也无法创建Handler对象
// 故若需在子线程中创建Handler对象,则需先创建Looper对象
mLooper = Looper.myLooper();
if (mLooper == null) {
throw new RuntimeException(
"Can't create handler inside thread that has not called Looper.prepare()");
}
// 2、绑定消息队列对象(MessageQueue),获取该Looper对象中保存的消息队列对象(MessageQueue),保证了handler对象关联上Looper对象中MessageQueue
mQueue = mLooper.mQueue;
}
- 创建主线程时,会自动调用ActivityThread的1个静态的main();而main()内则会调用Looper.prepareMainLooper()为主线程生成1个Looper对象,同时也会生成其对应的MessageQueue对象
//主线程(ActivityThread)创建Looper对象和对应的消息队列对象(MessageQueue)
public static void main(String[] args) {
//为主线程创建1个Looper对象,同时生成1个消息队列对象(MessageQueue),该方法在主线程(UI线程)创建时自动调用,即主线程的Looper对象自动生成,不需手动生成
Looper.prepareMainLooper();
//创建主线程
ActivityThread thread = new ActivityThread();
//自动开启 消息循环
Looper.loop();
}
/**
* 分析:prepareMainLooper()
*/
public static void prepareMainLooper() {
prepare(false);
//主线程只能有一个looper
synchronized (Looper.class) {
if (sMainLooper != null) {
throw new IllegalStateException("The main Looper has already been prepared.");
}
sMainLooper = myLooper();
}
}
/**
* 分析:prepare(false)
*/
private static void prepare(boolean quitAllowed) {
// 判断sThreadLocal是否为null,否则抛出异常
//即 Looper.prepare()方法不能被调用两次,1个线程中只能对应1个Looper实例
//注:sThreadLocal是ThreadLocal(线程本地存储区)对象,用于保存线程共享变量
if (sThreadLocal.get() != null) {
throw new RuntimeException("Only one Looper may be created per thread");
}
sThreadLocal.set(new Looper(quitAllowed));
}
/**
* 分析:Looper()
*/
private Looper(boolean quitAllowed) {
//生成消息队列(MessageQueue,绑定对应线程)
mQueue = new MessageQueue(quitAllowed);
mThread = Thread.currentThread();
}
- 创建消息对象
Message message = handler.obtainMessage(MESSAGE_WHAT);
message.obj = "I am message from work thread";
/**
* 分析:handler.obtainMessage(MESSAGE_WHAT);
* 作用:创建消息对象
* 注:创建Message对象可用关键字new 或 Message.obtain()
*/
public final Message obtainMessage(int what)
{
return Message.obtain(this, what);
}
public static Message obtain() {
// Message内部维护了1个Message池,用于Message消息对象的复用
// 使用obtain()则是直接从池内获取
synchronized (sPoolSync) {
if (sPool != null) {
Message m = sPool;
sPool = m.next;
m.next = null;
m.flags = 0; // clear in-use flag
sPoolSize--;
return m;
}
// 建议使用obtain()创建消息对象,避免每次都使用new重新分配内存
}
// 若池内无消息对象可复用,则还是用关键字new创建
return new Message();
}
- 发送消息
mHandler.sendMessage(msg);
或者
handler.post(new Runnable() {
@Override public void run() {
...
}
});
/**
* 分析:sendMessage(Message msg)
*/
public final boolean sendMessage(Message msg)
{
return sendMessageDelayed(msg, 0);
}
/**
* 分析:post(Runnable r)
*/
public final boolean post(Runnable r)
{
return sendMessageDelayed(getPostMessage(r), 0);
}
/**
* 分析:getPostMessage(r)
* 作用:将传入的Runable对象封装成1个消息对象
*/
private static Message getPostMessage(Runnable r) {
//创建1个消息对象(Message)
Message m = Message.obtain();
//将Runable对象赋值给消息对象(message)的callback属性
m.callback = r;
//返回该消息对象
return m;
}
/**
* 分析:sendMessageDelayed(msg, 0)
*/
public final boolean sendMessageDelayed(Message msg, long delayMillis)
{
if (delayMillis < 0) {
delayMillis = 0;
}
return sendMessageAtTime(msg, SystemClock.uptimeMillis() + delayMillis);
}
/**
* 分析sendMessageAtTime(msg, SystemClock.uptimeMillis() + delayMillis)
*/
public boolean sendMessageAtTime(Message msg, long uptimeMillis) {
// 1. 获取对应的消息队列对象(MessageQueue)
MessageQueue queue = mQueue;
// 2. 调用了enqueueMessage方法
return enqueueMessage(queue, msg, uptimeMillis);
}
/**
* 分析:enqueueMessage(queue, msg, uptimeMillis)
*/
private boolean enqueueMessage(MessageQueue queue, Message msg, long uptimeMillis) {
// 1. 将msg.target赋值为this,即把当前的Handler实例对象作为msg的target属性
msg.target = this;
// 2. 调用消息队列的enqueueMessage(),即:Handler发送的消息,最终是保存到消息队列
return queue.enqueueMessage(msg, uptimeMillis);
}
/**
* 定义:属于消息队列类(MessageQueue)的方法
* 作用:入队,即将消息根据时间放入到消息队列中(Message ->> MessageQueue)
* 采用单链表实现:提高插入消息、删除消息的效率
*/
boolean enqueueMessage(Message msg, long when) {
...// 仅贴出关键代码
synchronized (this) {
msg.markInUse();
msg.when = when;
Message p = mMessages;
boolean needWake;
// 判断消息队列里有无消息
// a. 若无,则将当前插入的消息作为队头,若此时消息队列处于等待状态,则唤醒
if (p == null || when == 0 || when < p.when) {
msg.next = p;
mMessages = msg;
needWake = mBlocked;
} else {
needWake = mBlocked && p.target == null && msg.isAsynchronous();
Message prev;
// b. 判断消息队列里有消息,则根据消息(Message)触发的时间插入到队列中
for (;;) {
prev = p;
p = p.next;
if (p == null || when < p.when) {
break;
}
if (needWake && p.isAsynchronous()) {
needWake = false;
}
}
msg.next = p; // invariant: p == prev.next
prev.next = msg;
}
// We can assume mPtr != 0 because mQuitting is false.
if (needWake) {
nativeWake(mPtr);
}
}
return true;
}
- Looper从消息队列取消息并分发给Handler
/**
* 源码分析: Looper.loop()
* 作用:消息循环,即从消息队列中获取消息、分发消息到Handler
*/
public static void loop() {
...// 仅贴出关键代码
// 获取当前Looper的消息队列,myLooper()作用:返回sThreadLocal存储的Looper实例;若me为null 则抛出异常,即loop()执行前必须执行prepare(),从而创建1个Looper实例
final Looper me = myLooper();
if (me == null) {
throw new RuntimeException("No Looper; Looper.prepare() wasn't called on this thread.");
}
// 获取Looper实例中的消息队列对象(MessageQueue)
final MessageQueue queue = me.mQueue;
// 消息循环
for (;;) {
//从消息队列中取出消息,next():取出消息队列里的消息,若取出的消息为空,则线程阻塞
Message msg = queue.next();
if (msg == null) {
return;
}
// 派发消息到对应的Handler,把消息Message派发给消息对象msg的target属性,target属性实际是1个handler对象
msg.target.dispatchMessage(msg);
// 释放消息占据的资源
msg.recycleUnchecked();
}
}
/**
* 分析:queue.next()
* 定义:属于消息队列类(MessageQueue)中的方法
* 作用:出队消息,即从 消息队列中 移出该消息
*/
Message next() {
...// 仅贴出关键代码
// 该参数用于确定消息队列中是否还有消息,从而决定消息队列应处于出队消息状态 or 等待状态
int nextPollTimeoutMillis = 0;
for (;;) {
if (nextPollTimeoutMillis != 0) {
Binder.flushPendingCommands();
}
// nativePollOnce方法在native层,若是nextPollTimeoutMillis为-1,此时消息队列处于等待状态
nativePollOnce(ptr, nextPollTimeoutMillis);
synchronized (this) {
final long now = SystemClock.uptimeMillis();
Message prevMsg = null;
Message msg = mMessages;
// 出队消息,即从消息队列中取出消息:按创建Message对象的时间顺序
if (msg != null) {
if (now < msg.when) {
nextPollTimeoutMillis = (int) Math.min(msg.when - now, Integer.MAX_VALUE);
} else {
// 取出了消息
mBlocked = false;
if (prevMsg != null) {
prevMsg.next = msg.next;
} else {
mMessages = msg.next;
}
msg.next = null;
msg.markInUse();
return msg;
}
} else {
// 若 消息队列中已无消息,则将nextPollTimeoutMillis参数设为-1
// 下次循环时,消息队列则处于等待状态
nextPollTimeoutMillis = -1;
}
......
}
.....
}
}
/**
* 分析:dispatchMessage(msg)
* 定义:属于处理者类(Handler)中的方法
* 作用:派发消息到对应的Handler实例 & 根据传入的msg作出对应的操作
*/
public void dispatchMessage(Message msg) {
// 若msg.callback属性不为空,则代表使用了post(Runnable r)发送消息,则执行handleCallback(msg),即回调Runnable对象里复写的run()
if (msg.callback != null) {
handleCallback(msg);
} else {
if (mCallback != null) {
if (mCallback.handleMessage(msg)) {
return;
}
}
// 若msg.callback属性为空,则代表使用了sendMessage(Message msg)发送消息,则执行handleMessage(msg),即回调复写的handleMessage(msg)
handleMessage(msg);
}
}
/**
* 分析:handleMessage(msg)
* 在创建Handler实例时复写
*/
public void handleMessage(Message msg) {
... // 创建Handler实例时复写
}
- MessageQueue native中nativeWake(long ptr)跟nativePollOnce(long ptr, int timeoutMillis)
/**
* 分析:android_os_MessageQueue.cpp中nativeWake(long ptr)
*/
static void android_os_MessageQueue_nativeWake(JNIEnv* env, jclass clazz, jlong ptr) {
NativeMessageQueue* nativeMessageQueue = reinterpret_cast<NativeMessageQueue*>(ptr);
nativeMessageQueue->wake();
}
/**
* 指向了Looper.cpp的wake()
*/
void NativeMessageQueue::wake() {
mLooper->wake();
}
/**
* Looper.cpp中wake()
*/
void Looper::wake() {
#if DEBUG_POLL_AND_WAKE
ALOGD("%p ~ wake", this);
#endif
ssize_t nWrite;
do {
// 向管道mWakeWritePipeFd写入字符,用来唤醒线程
nWrite = write(mWakeWritePipeFd, "W", 1);
} while (nWrite == -1 && errno == EINTR);
if (nWrite != 1) {
if (errno != EAGAIN) {
ALOGW("Could not write wake signal, errno=%d", errno);
}
}
}
/**
* 分析:android_os_MessageQueue.cpp中nativePollOnce(long ptr, int timeoutMillis)
*/
static void android_os_MessageQueue_nativePollOnce(JNIEnv* env, jobject obj,
jlong ptr, jint timeoutMillis) {
NativeMessageQueue* nativeMessageQueue = reinterpret_cast<NativeMessageQueue*>(ptr);
nativeMessageQueue->pollOnce(env, obj, timeoutMillis);
}
/**
* 指向了Looper.cpp中的pollOnce()
*/
void NativeMessageQueue::pollOnce(JNIEnv* env, jobject pollObj, int timeoutMillis) {
mPollEnv = env;
mPollObj = pollObj;
mLooper->pollOnce(timeoutMillis);
mPollObj = NULL;
mPollEnv = NULL;
if (mExceptionObj) {
env->Throw(mExceptionObj);
env->DeleteLocalRef(mExceptionObj);
mExceptionObj = NULL;
}
}
/**
* Looper.cpp中pollOnce
*/
int Looper::pollOnce(int timeoutMillis, int* outFd, int* outEvents, void** outData) {
int result = 0;
for (;;) {
// 先处理没有Callback方法的 Response事件
while (mResponseIndex < mResponses.size()) {
const Response& response = mResponses.itemAt(mResponseIndex++);
int ident = response.request.ident;
if (ident >= 0) {
int fd = response.request.fd;
int events = response.events;
void* data = response.request.data;
#if DEBUG_POLL_AND_WAKE
if (outFd != NULL) *outFd = fd;
if (outEvents != NULL) *outEvents = events;
if (outData != NULL) *outData = data;
return ident;
}
}
if (result != 0) {
if (outFd != NULL) *outFd = 0;
if (outEvents != NULL) *outEvents = 0;
if (outData != NULL) *outData = NULL;
return result;
}
//处理内部轮询
result = pollInner(timeoutMillis);
}
}
/**
* pollInner(int timeoutMillis)
*/
int Looper::pollInner(int timeoutMillis) {
// 调整超时时间根据下一条消息
if (timeoutMillis != 0 && mNextMessageUptime != LLONG_MAX) {
nsecs_t now = systemTime(SYSTEM_TIME_MONOTONIC);
int messageTimeoutMillis = toMillisecondTimeoutDelay(now, mNextMessageUptime);
if (messageTimeoutMillis >= 0
&& (timeoutMillis < 0 || messageTimeoutMillis < timeoutMillis)) {
timeoutMillis = messageTimeoutMillis;
}
int result = ALOOPER_POLL_WAKE;
mResponses.clear();
mResponseIndex = 0;
struct epoll_event eventItems[EPOLL_MAX_EVENTS];
//等待事件发生或者超时,向管道写端写入字符,则该方法会返回
int eventCount = epoll_wait(mEpollFd, eventItems, EPOLL_MAX_EVENTS, timeoutMillis);
// Acquire lock.
mLock.lock();
// Check for poll error.
if (eventCount < 0) {
if (errno == EINTR) {
goto Done;
}
result = ALOOPER_POLL_ERROR;
goto Done;
}
// Check for poll timeout.
if (eventCount == 0) {
result = ALOOPER_POLL_TIMEOUT;
goto Done;
}
// 循环遍历,处理所有的事件
for (int i = 0; i < eventCount; i++) {
int fd = eventItems[i].data.fd;
uint32_t epollEvents = eventItems[i].events;
//通过管道读端被唤醒
if (fd == mWakeReadPipeFd) {
if (epollEvents & EPOLLIN) {
//去把管道数据中数据读完
awoken();
} else {
ALOGW("Ignoring unexpected epoll events 0x%x on wake read pipe.", epollEvents);
}
} else {
ssize_t requestIndex = mRequests.indexOfKey(fd);
if (requestIndex >= 0) {
int events = 0;
if (epollEvents & EPOLLIN) events |= ALOOPER_EVENT_INPUT;
if (epollEvents & EPOLLOUT) events |= ALOOPER_EVENT_OUTPUT;
if (epollEvents & EPOLLERR) events |= ALOOPER_EVENT_ERROR;
if (epollEvents & EPOLLHUP) events |= ALOOPER_EVENT_HANGUP;
//处理request,生成对应的respone对象,push到响应数组
pushResponse(events, mRequests.valueAt(requestIndex));
} else {
ALOGW("Ignoring unexpected epoll events 0x%x on fd %d that is "
"no longer registered.", epollEvents, fd);
}
}
}
Done: ;
// 再处理Native的Message,调用相应回调方法
mNextMessageUptime = LLONG_MAX;
while (mMessageEnvelopes.size() != 0) {
nsecs_t now = systemTime(SYSTEM_TIME_MONOTONIC);
const MessageEnvelope& messageEnvelope = mMessageEnvelopes.itemAt(0);
if (messageEnvelope.uptime <= now) {
{ // obtain handler
sp<MessageHandler> handler = messageEnvelope.handler;
Message message = messageEnvelope.message;
mMessageEnvelopes.removeAt(0);
mSendingMessage = true;
mLock.unlock();
handler->handleMessage(message);
} // release handler
mLock.lock();
mSendingMessage = false;
result = ALOOPER_POLL_CALLBACK;
} else {
// The last message left at the head of the queue determines the next wakeup time.
mNextMessageUptime = messageEnvelope.uptime;
break;
}
}
// Release lock.
mLock.unlock();
//处理带有Callback()方法的Response事件,执行Reponse相应的回调方法
for (size_t i = 0; i < mResponses.size(); i++) {
Response& response = mResponses.editItemAt(i);
if (response.request.ident == ALOOPER_POLL_CALLBACK) {
int fd = response.request.fd;
int events = response.events;
void* data = response.request.data;
int callbackResult = response.request.callback->handleEvent(fd, events, data);
if (callbackResult == 0) {
removeFd(fd);
}
//清除reponse引用的回调方法
response.request.callback.clear();
result = ALOOPER_POLL_CALLBACK;
}
}
return result;
}
/**
* 读取管道数据
*/
void Looper::awoken() {
char buffer[16];
ssize_t nRead;
do {
nRead = read(mWakeReadPipeFd, buffer, sizeof(buffer));
} while ((nRead == -1 && errno == EINTR) || nRead == sizeof(buffer));
}
总结
- 应用程序主线程创建时,则自动为主线程
- 创建一个处理器对象Looper
- 创建消息队列对象(MessageQueue)
- 进入消息循环
- 创建Handler实例,绑定所在的当前线程和对应的消息队列
- 创建消息对象(Message、Runnable)
- 随着Looper对象的无限消息循环,从消息队列(MessageQueue)中取出Handler发送的消息,不断将消息发送到对应的Handler,根据回调方法进行相关的处理。