接着ALooper、AMessage、AHandler的简述,现在来分析下ALooper的声明以及定义。
声明
这篇重在细节,废话不多说,各位看官搬好小板凳,且请看代码(已加注释)。
#include <media/stagefright/foundation/ABase.h>
#include <media/stagefright/foundation/AString.h>
#include <utils/Errors.h>
#include <utils/KeyedVector.h>
#include <utils/List.h>
#include <utils/RefBase.h>
#include <utils/threads.h>
namespace android {
//前置声明,在ALooper中只用到了这三个结构体的指针或者引用。至于为什么要使用前置声明,我在C/C++专栏会写
struct AHandler;
struct AMessage;
struct AReplyToken;
struct ALooper : public RefBase {
typedef int32_t event_id;
typedef int32_t handler_id;
//构造函数
ALooper();
// Takes effect in a subsequent call to start().
//给这个Looper起名字
void setName(const char *name);
//将一个Handler注册其中
handler_id registerHandler(const sp<AHandler> &handler);
//注销对应ID的AHandler
void unregisterHandler(handler_id handlerID);
//启动该Looper的独立线程
status_t start(
bool runOnCallingThread = false,
bool canCallJava = false,
int32_t priority = PRIORITY_DEFAULT
);
//停止该Looper的独立线程
status_t stop();
//获得当前的系统时间
static int64_t GetNowUs();
//获取当前Looper的名字
const char *getName() const {
return mName.c_str();
}
protected:
virtual ~ALooper();
private:
friend struct AMessage; // post()
//包装Message
struct Event {
int64_t mWhenUs;
sp<AMessage> mMessage;
};
Mutex mLock;
Condition mQueueChangedCondition;
AString mName;
List<Event> mEventQueue;
//继承于Thread
struct LooperThread;
sp<LooperThread> mThread;
bool mRunningLocally;
// use a separate lock for reply handling, as it is always on another thread
// use a central lock, however, to avoid creating a mutex for each reply
Mutex mRepliesLock;
Condition mRepliesCondition;
// START --- methods used only by AMessage
// posts a message on this looper with the given timeout
void post(const sp<AMessage> &msg, int64_t delayUs);
// creates a reply token to be used with this looper
sp<AReplyToken> createReplyToken();
// waits for a response for the reply token. If status is OK, the response
// is stored into the supplied variable. Otherwise, it is unchanged.
status_t awaitResponse(const sp<AReplyToken> &replyToken, sp<AMessage> *response);
// posts a reply for a reply token. If the reply could be successfully posted,
// it returns OK. Otherwise, it returns an error value.
status_t postReply(const sp<AReplyToken> &replyToken, const sp<AMessage> &msg);
// END --- methods used only by AMessage
//如果有队列中有消息,且未超过等待时间,则将队列头的消息交付给对应的handler处理
bool loop();
DISALLOW_EVIL_CONSTRUCTORS(ALooper);
};
下面,将就代码中和结构相关的部分简要分析一下。
struct Event {
int64_t mWhenUs;
sp<AMessage> mMessage;
};
List<Event> mEventQueue;
可以看到,Event结构体对Message以及该Message所对应的入队时间进行的组装。而后,又将这些进入该Looper的消息进行的一个排队(用一个List包起来)。如果大家还记得之前《ALooper、AMessage、AHandler的简述》一篇的图1的话,就能很好的理解,为什么称ALooper为消息队列了。
定义
下面,将就代码中和重要功能相关的部分简要分析一下。
1. void ALooper::post(const sp<AMessage> &msg, int64_t delayUs)
168 void ALooper::post(const sp<AMessage> &msg, int64_t delayUs) {
169 Mutex::Autolock autoLock(mLock);
170
171 int64_t whenUs;
172 if (delayUs > 0) {
173 whenUs = GetNowUs() + delayUs;
174 } else {
175 whenUs = GetNowUs();
176 }
177
178 List<Event>::iterator it = mEventQueue.begin();
179 while (it != mEventQueue.end() && (*it).mWhenUs <= whenUs) {
180 ++it;
181 }
182
183 Event event;
184 event.mWhenUs = whenUs;
185 event.mMessage = msg;
186
187 if (it == mEventQueue.begin()) {
188 mQueueChangedCondition.signal();
189 }
190
191 mEventQueue.insert(it, event);
192}
这个函数很简单。
- 根据设置的delayUS(延时交付时间)和NowUS(系统时间)来计算真正这条Message需要交付的时间。
- 之后再根据这个时间计算当前Message在消息队列中的位置。
- 如果该消息是队列头消息,就通知一下(等待中的线程中只有一个会被唤醒执行)。
- 如果不是就加入到队列的相应位置中。
2. status_t postReply(const sp<AReplyToken> &replyToken, const sp<AMessage> &msg);
252 status_t ALooper::postReply(const sp<AReplyToken> &replyToken, const sp<AMessage> &reply) {
253 Mutex::Autolock autoLock(mRepliesLock);
254 status_t err = replyToken->setReply(reply);
255 if (err == OK) {
256 mRepliesCondition.broadcast();
257 }
258 return err;
259}
先看看replyToken是个什么鬼(他住在AMessage的家里)
33 struct AReplyToken : public RefBase {
34 AReplyToken(const sp<ALooper> &looper)
35 : mLooper(looper),
36 mReplied(false) {
37 }
再看看setReply做了什么事情。
40 status_t AReplyToken::setReply(const sp<AMessage> &reply) {
41 if (mReplied) {
42 ALOGE("trying to post a duplicate reply");
43 return -EBUSY;
44 }
45 CHECK(mReply == NULL);
46 mReply = reply;
47 mReplied = true;
48 return OK;
49}
现在就很清楚了
- 创建一个和当前Looper相关联的AReplyToken ,并且初始化reply状态为false,就是还没有回复。
- 之后将reply的消息和reply的状态(变为true,意为这个时候已经回复了)给这个ReplyToken
- 如果成功返回OK,就广播一下(等待的线程都会被唤醒)
3. status_t ALooper::start(bool runOnCallingThread, bool canCallJava, int32_t priority)
96 status_t ALooper::start(
97 bool runOnCallingThread, bool canCallJava, int32_t priority) {
98 if (runOnCallingThread) {
99 {
100 Mutex::Autolock autoLock(mLock);
101
102 if (mThread != NULL || mRunningLocally) {
103 return INVALID_OPERATION;
104 }
105
106 mRunningLocally = true;
107 }
108
109 do {
110 } while (loop());
111
112 return OK;
113 }
114
115 Mutex::Autolock autoLock(mLock);
116
117 if (mThread != NULL || mRunningLocally) {
118 return INVALID_OPERATION;
119 }
120
121 mThread = new LooperThread(this, canCallJava);
122
123 status_t err = mThread->run(
124 mName.empty() ? "ALooper" : mName.c_str(), priority);
125 if (err != OK) {
126 mThread.clear();
127 }
128
129 return err;
130}
首先,我们可以知道这是该Looper线程启动的函数. 当设置了runOnCallingThread
为true的时候, 对应逻辑里面有一个重要的代码段:
do {
} while (loop());
那这个loop() 是干嘛用的? 我们来看一看.
194 bool ALooper::loop() {
195 Event event;
196
197 {
198 Mutex::Autolock autoLock(mLock);
199 if (mThread == NULL && !mRunningLocally) {
200 return false;
201 }//如果没有分到线程且没有在本地运行,返回错误
202 if (mEventQueue.empty()) {
203 mQueueChangedCondition.wait(mLock);
204 return true;
205 }
206 int64_t whenUs = (*mEventQueue.begin()).mWhenUs;
207 int64_t nowUs = GetNowUs();
208
209 if (whenUs > nowUs) {
210 int64_t delayUs = whenUs - nowUs;
211 mQueueChangedCondition.waitRelative(mLock, delayUs * 1000ll);
212
213 return true;
214 }
215
216 event = *mEventQueue.begin();
217 mEventQueue.erase(mEventQueue.begin());
218 }
219
220 event.mMessage->deliver();
221
222 // NOTE: It's important to note that at this point our "ALooper" object
223 // may no longer exist (its final reference may have gone away while
224 // delivering the message). We have made sure, however, that loop()
225 // won't be called again.
226
227 return true;
228}
重要的有以下几点
if (mEventQueue.empty()) {
mQueueChangedCondition.wait(mLock);
return true;
}
不知道大家是否还记得,之前讲异步消息机制简述那一篇中, 有提到过说, ALooper会检查消息队列中消息是否为空, 如果为空, 则等待事件, 降低CPU资源的消耗. 那么这段代码就是上面所说的具体实现了.
之后, 会判断whenUS和nowUS, 根据这两个值的比较来判断是否还需要等待一段时间处理Event.
紧接着, 用上面定义了的Event event, 拿到消息队列的列头, 并清除原队列的列头.
最后, 交付这个消息. (这个交付的实现, 会在AMessage中讲解)
好回到start()
代码的接下来的流程中. 接下来就很简单了:
- 创建一个LooperThread的实例;
- 把这个实例线程跑起来.
接下来,讲两个和AMessage::postAndAwaitResponse
方法紧密相连的两个方法. 一个是ALooper::createReplyToken()
, 而另外一个是ALooper::awaitResponse(const sp<AReplyToken> &replyToken, sp<AMessage> *response)
首先, 来了解下
4. ALooper::createReplyToken()
这个玩意儿的代码如下:
230 // to be called by AMessage::postAndAwaitResponse only
231 sp<AReplyToken> ALooper::createReplyToken() {
232 return new AReplyToken(this);
233 }
没错, 没干什么事情, 只是用当前的Looper, 实例化了一个AReplyToken(在第2点钟有提到这个东东)并返回.
接着,来看这个
5. ALooper::awaitResponse(const sp<AReplyToken> &replyToken, sp<AMessage> *response)
它长这个样子:
236 status_t ALooper::awaitResponse(const sp<AReplyToken> &replyToken, sp<AMessage> *response) {
237 // return status in case we want to handle an interrupted wait
238 Mutex::Autolock autoLock(mRepliesLock);
239 CHECK(replyToken != NULL);
240 while (!replyToken->retrieveReply(response)) {
241 {
242 Mutex::Autolock autoLock(mLock);
243 if (mThread == NULL) {
244 return -ENOENT;
245 }
246 }
247 mRepliesCondition.wait(mRepliesLock);
248 }
249 return OK;
250}
我们遇到了陌生的retrieveReply
, 他长这个样子:
49 // if reply is not set, returns false; otherwise, it retrieves the reply and returns true
50 bool retrieveReply(sp<AMessage> *reply) {
51 if (mReplied) {
52 *reply = mReply;
53 mReply.clear();
54 }
55 return mReplied;
56 }
原生注释已经说得很清楚了. 如果replay没有设置,就返回false; 否则, 就获得这个replay的Message并返回true.
回到awaitResponse
中. 当retrieveReply
返回false, 也就是说没有拿到需要replay的Message的时候, 等待着, 一直等待到拿到这个消息为止(各位看官, 这里就是为了实现postAndAwaitResponse
的同步的目的了哦)
总结
这篇讲述了一个叫ALooper的履带。它将AMessage根据交付时间(whenUs)进行排队(List<mEvent>)。然后简单介绍了它是怎么将消息(AMessage交付的(有同步的, 也有异步的))。关于交付这一块是不完整的,大部分的逻辑在AMessage中,今天先介绍到这里,有空再写最复杂的AMessage和相对简单的AHandler。