在 Android 的应用开发中,线程之间的通信机制主要是以 Handler 机制为主。我们都知道 Android 中的 UI 更新只能在主线程中执行,通常的做法是,我们在 Activity 中直接 new Handler,通过 handler 发送消息,在 handlerMessage 处理即可。本篇博文就根据源码来具体分析其中工作机制以及源码架构。
Java 层的三巨头
上层的 Java 代码,集中于以下三个类,Handler,Looper 以及 MessageQueue,下面我们集中分析它们。
Handler
我们通常都会在 Activity 中直接新建 Handler 来实现主线程下 UI 的更新。我们思考一下为什么直接新建的 handler 便可以更新 UI,默认为主线程呢?(直接新建 Handler 可以更新 UI,但是涉及到 Activity 生命周期以及复杂的逻辑延时处理等,此处建议以弱引用、虚引用方式持有对象,否则在延时处理中,Activity 已经销毁了,才反过来处理消息必定会发生崩溃)。
Handler 与 Looper 的绑定
我们都知道 Activity 由 ActivityThread 启动,之后会调用 ActivityThread 的 main 函数,其内部有如下代码:
public static void main(String[] args) {
Trace.traceBegin(Trace.TRACE_TAG_ACTIVITY_MANAGER, "ActivityThreadMain");
SamplingProfilerIntegration.start();
// CloseGuard defaults to true and can be quite spammy. We
// disable it here, but selectively enable it later (via
// StrictMode) on debug builds, but using DropBox, not logs.
CloseGuard.setEnabled(false);
Environment.initForCurrentUser();
// Set the reporter for event logging in libcore
EventLogger.setReporter(new EventLoggingReporter());
AndroidKeyStoreProvider.install();
// Make sure TrustedCertificateStore looks in the right place for CA certificates
final File configDir = Environment.getUserConfigDirectory(UserHandle.myUserId());
TrustedCertificateStore.setDefaultUserDirectory(configDir);
Process.setArgV0("<pre-initialized>");
// 创建主线程 looper,只创建一次
Looper.prepareMainLooper();
ActivityThread thread = new ActivityThread();
// attach到系统进程
thread.attach(false);
if (sMainThreadHandler == null) {
sMainThreadHandler = thread.getHandler();
}
if (false) {
Looper.myLooper().setMessageLogging(new
LogPrinter(Log.DEBUG, "ActivityThread"));
}
// End of event ActivityThreadMain.
Trace.traceEnd(Trace.TRACE_TAG_ACTIVITY_MANAGER);
// 主线程进入循环状态
Looper.loop();
throw new RuntimeException("Main thread loop unexpectedly exited");
}
由此可见,Activity 中已经默认给我们创建了 Looper 轮训器。此时我们直接可以使用 Handler 机制进行消息通信了。
下面我们来看 prepareMainLooper
public static void prepareMainLooper() {
prepare(false);
synchronized (Looper.class) {
if (sMainLooper != null) {
throw new IllegalStateException("The main Looper has already been prepared.");
}
sMainLooper = myLooper();
}
}
此处我们看到,直接调用 prepare,注意参数 false,之后调用 sMainLooper = myLooper(),将内部 sMainLooper 赋值,及主线程 looper 循环器。
private static void prepare(boolean quitAllowed) {
if (sThreadLocal.get() != null) {
throw new RuntimeException("Only one Looper may be created per thread");
}
sThreadLocal.set(new Looper(quitAllowed));
}
此处最后通过线程本地存储 sThreadLocal 新建 Looper 对象并设置进去。sMainLooper 获取就是这个对象。
private Looper(boolean quitAllowed) {
mQueue = new MessageQueue(quitAllowed);
mThread = Thread.currentThread();
}
此处在 Looper 的构造函数中,新建了 MessageQueue 对象。这样,在应用 Activity 启动之时,Looper 与 MessageQueue就已经被系统分配完毕,后续我们直接使用即可,此处的分析暂时告一段落。
Handler 实例化
Handler 拥有多个构造函数,我们在 Activity 中直接使用的 Handler 为默认的构造函数,如下:
// 默认的构造函数
public Handler() {
this(null, false);
}
// 参数为 null,false
public Handler(Callback callback, boolean async) {
if (FIND_POTENTIAL_LEAKS) {
final Class<? extends Handler> klass = getClass();
if ((klass.isAnonymousClass() || klass.isMemberClass() || klass.isLocalClass()) &&
(klass.getModifiers() & Modifier.STATIC) == 0) {
Log.w(TAG, "The following Handler class should be static or leaks might occur: " +
klass.getCanonicalName());
}
}
// 直接从 Looper 中获取 mLooper 对象
mLooper = Looper.myLooper();
if (mLooper == null) {
throw new RuntimeException(
"Can't create handler inside thread that has not called Looper.prepare()");
}
// Looper 创建即产生了 MessageQueue
mQueue = mLooper.mQueue;
mCallback = callback;
mAsynchronous = async;
}
有上面分析,此处直接获取了 mLooper 对象,由于在 Activity 之前调用了 Looper.prepareMainLooper() 函数,之前分析可以得知,此处获取的便是 sMainLooper 轮训器。Looper 创建之时便已经 与 MessageQueue 建立联系,此处直接可以获取消息队列 mQueue。
Handler 获取消息
Handler 内部维持了一个消息池,我们通过 obtainMessage 便可以获取一个消息实例 Message。obtainMessage 重载了很多方法,最终走向 Message.obtain 中去,Message.obtain 中根据多种参数的重载,最终指向了内部方法 obtain()。
public static Message obtain() {
synchronized (sPoolSync) {
if (sPool != null) {
Message m = sPool;
sPool = m.next;
m.next = null;
m.flags = 0; // clear in-use flag
sPoolSize--;
return m;
}
}
return new Message();
}
此方法原译注释
Return a new Message instance from the global pool. Allows us to avoid allocating new objects in many cases.
译文为:从全局池返回新的消息实例。允许我们在许多情况下避免分配新对象。
由此可见我们通过此方法可以避免自己 new Message,直接是使用提供的接口获取。减少分配对象造成的内存消耗。
关于 Message 内更多具体的分析,可以参考一下博文
Handler 发送消息、Runnable以及取消
通过 Handler 对象 sendMessage,或者 post* runnable 都可以,其内部方法重载众多,我们直接追溯根源,其最终调用到 enqueueMessage 方法,期间涉及延迟等不作过多分析。
private boolean enqueueMessage(MessageQueue queue, Message msg, long uptimeMillis) {
msg.target = this;
if (mAsynchronous) {
msg.setAsynchronous(true);
}
return queue.enqueueMessage(msg, uptimeMillis);
}
代码可以看出,最终还是通过内部持有对象 MessageQueue queue 的 enqueueMessage 方法来实现。其内部具体逻辑由后面的 MessageQueue 章节具体分析。
通过 removeMessages 方法即可取消一个消息,通过 removeCallbacksAndMessages 可取消一个 Callback,其最终实现代码再后面章节中具体分析。
public final void removeMessages(int what, Object object) {
mQueue.removeMessages(this, what, object);
}
public final void removeCallbacksAndMessages(Object token) {
mQueue.removeCallbacksAndMessages(this, token);
}
从上面分析我们可以得知为什么我们可以拿来主义,直接使用了 Handler,原因是在应用启动之后,系统已经自动帮我们完成了 Looper 以及 MessageQueue 的创建。
Handler 处理消息
Handler 内部对消息的处理进行了一定程度的划分,如下代码:
public void handleMessage(Message msg) {
}
public void dispatchMessage(Message msg) {
// 首先调用消息中的 callback 回调
if (msg.callback != null) {
handleCallback(msg);
} else {
// 然后检查 Handler 中是否存在 mCallback,回调
if (mCallback != null) {
if (mCallback.handleMessage(msg)) {
return;
}
}
// 最后调用 Handler 自身的 handleMessage
handleMessage(msg);
}
}
其接口 handleMessage 由用户自己实现,dispatchMessage 为系统内部默认的处理方式,根据消息内容作不同的处理。
Looper
Looper 我们可以称其为消息轮训器,其主要的工作是不断的读取 MessageQueue 内部的消息,取到之后便交给 Handler 来处理。
之前在 handler 中我们分析了其绑定过程,其调用的 prepareMainLooper 方法只能在主线程中调用一个,我们开发者使用时候不可以直接调用,下面我们用代码来说明。
Looper 的使用
Looper 内部有一个静态变量 sThreadLocal,其具有单例唯一性。
static final ThreadLocal<Looper> sThreadLocal = new ThreadLocal<Looper>();
此时,如果你在主线程中调用了 prepare,那么你将会收到一个RuntimeException,代码中如下:
public static void prepare() {
prepare(true);
}
private static void prepare(boolean quitAllowed) {
if (sThreadLocal.get() != null) {
throw new RuntimeException("Only one Looper may be created per thread");
}
sThreadLocal.set(new Looper(quitAllowed));
}
我们如果要在一个线程中使用 Looper,我们只需要做以下几步即可:
- 调用 prepare 初始化 Looper 对象
- 调用 loop 方法,启动 Looper 消息轮训
- 处理 Handler 的 handlerMessage 方法
- 调用 quit,停止 Lopper 轮训工作
Looper 构造
Looper 的构造函数,在 prepare 方法中就已经实现了,sThreadLocal.set(new Looper(quitAllowed)),具体代码如下:
private Looper(boolean quitAllowed) {
mQueue = new MessageQueue(quitAllowed);
mThread = Thread.currentThread();
}
prepare
prepare 方法用于初始化创建一个 Looper,内部新建 Looper 对象,其还会创建 MessageQueue 对象,与其绑定,完成消息队列与轮训的双重工作。具体代码见上。
loop
loop 使得 Looper 轮训器开始工作,不断检测 MessageQueue 内部是否存在消息需要处理,代码如下:
public static void loop() {
final Looper me = myLooper();
if (me == null) {
throw new RuntimeException("No Looper; Looper.prepare() wasn't called on this thread.");
}
// 获取 loop 轮训器的消息队列 MessageQueue
final MessageQueue queue = me.mQueue;
// Make sure the identity of this thread is that of the local process,
// and keep track of what that identity token actually is.
Binder.clearCallingIdentity();
final long ident = Binder.clearCallingIdentity();
// 无限循环
for (;;) {
// 调用 MessageQueue 的 next 获取下一条消息
Message msg = queue.next(); // might block
if (msg == null) {
// No message indicates that the message queue is quitting.
return;
}
// This must be in a local variable, in case a UI event sets the logger
Printer logging = me.mLogging;
if (logging != null) {
logging.println(">>>>> Dispatching to " + msg.target + " " +
msg.callback + ": " + msg.what);
}
// 分发消息,msg.target 为 msg 所依赖的 Handler对象
msg.target.dispatchMessage(msg);
if (logging != null) {
logging.println("<<<<< Finished to " + msg.target + " " + msg.callback);
}
// Make sure that during the course of dispatching the
// identity of the thread wasn't corrupted.
final long newIdent = Binder.clearCallingIdentity();
if (ident != newIdent) {
Log.wtf(TAG, "Thread identity changed from 0x"
+ Long.toHexString(ident) + " to 0x"
+ Long.toHexString(newIdent) + " while dispatching to "
+ msg.target.getClass().getName() + " "
+ msg.callback + " what=" + msg.what);
}
// 回收消息资源
msg.recycleUnchecked();
}
}
代码中我们可以看出,这里在无限循环中,通过 queue.next() 不断获取消息,得到的消息便调用 msg.target.dispatchMessage(msg) 将消息交与其 handler 进行处理,最终在将 msg 消息资源进行回收。
quit
通过 quit 即可停止 Looper 的工作,逻辑很简单,调用 MessageQueue 的 quit,其包含普通退出和安全退出
public void quit() {
mQueue.quit(false);
}
public void quitSafely() {
mQueue.quit(true);
}
MessageQueue
MessageQueue 为消息队列,承载 Message 对象的结构。其可以安排 Messgae 的插入,删除,查找,退出等。我们分别来看一下其内部结构。
MessageQueue 的初始化
Java 层的 MessageQueue 初始化,直接走向 Native 层,通过 JNI 指向 Native 层,后续分析 Native 层的逻辑。
MessageQueue(boolean quitAllowed) {
mQuitAllowed = quitAllowed;
// 调用 native 层,对应于 android_os_MessageQueue.cpp
mPtr = nativeInit();
}
enqueueMessage 发送消息
Java 层的 Handler 通过 send*Mesage 等通过 MessageQueue 内部方法 enqueueMessage 来发送消息,代码如下:
boolean enqueueMessage(Message msg, long when) {
if (msg.target == null) {
throw new IllegalArgumentException("Message must have a target.");
}
if (msg.isInUse()) {
throw new IllegalStateException(msg + " This message is already in use.");
}
synchronized (this) {
// 退出轮训
if (mQuitting) {
IllegalStateException e = new IllegalStateException(
msg.target + " sending message to a Handler on a dead thread");
Log.w(TAG, e.getMessage(), e);
// 回收资源
msg.recycle();
return false;
}
msg.markInUse();
msg.when = when;
Message p = mMessages;
boolean needWake;
if (p == null || when == 0 || when < p.when) {
// New head, wake up the event queue if blocked.
msg.next = p;
mMessages = msg;
needWake = mBlocked;
} else {
// Inserted within the middle of the queue. Usually we don't have to wake
// up the event queue unless there is a barrier at the head of the queue
// and the message is the earliest asynchronous message in the queue.
needWake = mBlocked && p.target == null && msg.isAsynchronous();
Message prev;
// 找到合适的位置,添加 msg 到队列中去
for (;;) {
prev = p;
p = p.next;
if (p == null || when < p.when) {
break;
}
if (needWake && p.isAsynchronous()) {
needWake = false;
}
}
msg.next = p; // invariant: p == prev.next
prev.next = msg;
}
// We can assume mPtr != 0 because mQuitting is false.
// 需要唤醒操作,Native 操作
if (needWake) {
nativeWake(mPtr);
}
}
return true;
}
MessageQueue 通过 enqueueMessage 将 msg 封装到队列中去,之后如果需要唤醒,便会调用 nativeWake 进行唤醒。
next 获取消息
Looper 消息轮训器通过无限循环调用 MessageQueue 的 next方法,不断获取消息并处理,代码如下:
Message next() {
// Return here if the message loop has already quit and been disposed.
// This can happen if the application tries to restart a looper after quit
// which is not supported.
final long ptr = mPtr;
if (ptr == 0) {
return null;
}
int pendingIdleHandlerCount = -1; // -1 only during first iteration
int nextPollTimeoutMillis = 0;
// 无限循环
for (;;) {
if (nextPollTimeoutMillis != 0) {
Binder.flushPendingCommands();
}
// 调用 Native 层的 nativePollOnce
nativePollOnce(ptr, nextPollTimeoutMillis);
synchronized (this) {
// Try to retrieve the next message. Return if found.
final long now = SystemClock.uptimeMillis();
Message prevMsg = null;
Message msg = mMessages;
// 异步消息
if (msg != null && msg.target == null) {
// Stalled by a barrier. Find the next asynchronous message in the queue.
do {
prevMsg = msg;
msg = msg.next;
} while (msg != null && !msg.isAsynchronous());
}
if (msg != null) {
// 延迟消息处理
if (now < msg.when) {
// Next message is not ready. Set a timeout to wake up when it is ready.
nextPollTimeoutMillis = (int) Math.min(msg.when - now, Integer.MAX_VALUE);
} else {
// Got a message.
// 获取消息并返回
mBlocked = false;
if (prevMsg != null) {
prevMsg.next = msg.next;
} else {
mMessages = msg.next;
}
msg.next = null;
if (DEBUG) Log.v(TAG, "Returning message: " + msg);
msg.markInUse();
return msg;
}
} else {
// No more messages.
nextPollTimeoutMillis = -1;
}
// Process the quit message now that all pending messages have been handled.
if (mQuitting) {
dispose();
return null;
}
// If first time idle, then get the number of idlers to run.
// Idle handles only run if the queue is empty or if the first message
// in the queue (possibly a barrier) is due to be handled in the future.
if (pendingIdleHandlerCount < 0
&& (mMessages == null || now < mMessages.when)) {
pendingIdleHandlerCount = mIdleHandlers.size();
}
if (pendingIdleHandlerCount <= 0) {
// No idle handlers to run. Loop and wait some more.
mBlocked = true;
continue;
}
if (mPendingIdleHandlers == null) {
// 最大空闲线程
mPendingIdleHandlers = new IdleHandler[Math.max(pendingIdleHandlerCount, 4)];
}
mPendingIdleHandlers = mIdleHandlers.toArray(mPendingIdleHandlers);
}
// Run the idle handlers.
// We only ever reach this code block during the first iteration.
for (int i = 0; i < pendingIdleHandlerCount; i++) {
final IdleHandler idler = mPendingIdleHandlers[i];
mPendingIdleHandlers[i] = null; // release the reference to the handler
boolean keep = false;
try {
// 调用空闲线程 handler 的 queueIdle
keep = idler.queueIdle();
} catch (Throwable t) {
Log.wtf(TAG, "IdleHandler threw exception", t);
}
if (!keep) {
synchronized (this) {
mIdleHandlers.remove(idler);
}
}
}
// Reset the idle handler count to 0 so we do not run them again.
pendingIdleHandlerCount = 0;
// While calling an idle handler, a new message could have been delivered
// so go back and look again for a pending message without waiting.
nextPollTimeoutMillis = 0;
}
}
从上面可以看出,next 优先调用 nativePollOnce 处理 Native 层的消息,其次在获取到 Java 层 MessageQueue 中的消息并返回。
quit 退出
MessageQueue 通过 quit 方法,完成 Looper轮训器的退出操作。
void quit(boolean safe) {
if (!mQuitAllowed) {
throw new IllegalStateException("Main thread not allowed to quit.");
}
synchronized (this) {
if (mQuitting) {
return;
}
mQuitting = true;
if (safe) {
removeAllFutureMessagesLocked();
} else {
removeAllMessagesLocked();
}
// We can assume mPtr != 0 because mQuitting was previously false.
nativeWake(mPtr);
}
}
其上层 Looper 通过普通模式和安全模式调用到此,最终通过 nativeWake 指向 Natvie。
消息以及回调的删除
首先简单看一下消息的删除,方法涉及多个重载,我们这里只看其中一个,代码如下:
// 根据 handler,what,以及 object 查找到 msg 并删除
void removeMessages(Handler h, int what, Object object) {
if (h == null) {
return;
}
synchronized (this) {
// 获取当前消息
Message p = mMessages;
// Remove all messages at front.
// 便利找到对应的消息
while (p != null && p.target == h && p.what == what
&& (object == null || p.obj == object)) {
Message n = p.next;
mMessages = n;
p.recycleUnchecked();
p = n;
}
// Remove all messages after front.
// 删除消息,链接前后消息
while (p != null) {
Message n = p.next;
if (n != null) {
if (n.target == h && n.what == what
&& (object == null || n.obj == object)) {
Message nn = n.next;
n.recycleUnchecked();
p.next = nn;
continue;
}
}
p = n;
}
}
}
删除消息回调
// 根据 handler 以及 object 删除回调
void removeCallbacksAndMessages(Handler h, Object object) {
if (h == null) {
return;
}
synchronized (this) {
Message p = mMessages;
// Remove all messages at front.
while (p != null && p.target == h
&& (object == null || p.obj == object)) {
Message n = p.next;
mMessages = n;
p.recycleUnchecked();
p = n;
}
// Remove all messages after front.
while (p != null) {
Message n = p.next;
if (n != null) {
if (n.target == h && (object == null || n.obj == object)) {
Message nn = n.next;
n.recycleUnchecked();
p.next = nn;
continue;
}
}
p = n;
}
}
}
JNI 层分析
Java 通过 JNI 方式,链接到 Native 层代码 android_os_MessageQueue.cpp,具体代码如下:
static JNINativeMethod gMessageQueueMethods[] = {
/* name, signature, funcPtr */
{ "nativeInit", "()J", (void*)android_os_MessageQueue_nativeInit },
{ "nativeDestroy", "(J)V", (void*)android_os_MessageQueue_nativeDestroy },
{ "nativePollOnce", "(JI)V", (void*)android_os_MessageQueue_nativePollOnce },
{ "nativeWake", "(J)V", (void*)android_os_MessageQueue_nativeWake },
{ "nativeIsPolling", "(J)Z", (void*)android_os_MessageQueue_nativeIsPolling },
{ "nativeSetFileDescriptorEvents", "(JII)V",
(void*)android_os_MessageQueue_nativeSetFileDescriptorEvents },
};
int register_android_os_MessageQueue(JNIEnv* env) {
int res = RegisterMethodsOrDie(env, "android/os/MessageQueue", gMessageQueueMethods,
NELEM(gMessageQueueMethods));
jclass clazz = FindClassOrDie(env, "android/os/MessageQueue");
gMessageQueueClassInfo.mPtr = GetFieldIDOrDie(env, clazz, "mPtr", "J");
gMessageQueueClassInfo.dispatchEvents = GetMethodIDOrDie(env, clazz,
"dispatchEvents", "(II)I");
return res;
}
nativeInit
Java 层的 nativeInit 在 MessageQueue 初始化时调用,代码如下:
static jlong android_os_MessageQueue_nativeInit(JNIEnv* env, jclass clazz) {
// 新建 NativeMessageQueue,对应与 Java 层
NativeMessageQueue* nativeMessageQueue = new NativeMessageQueue();
if (!nativeMessageQueue) {
jniThrowRuntimeException(env, "Unable to allocate native queue");
return 0;
}
nativeMessageQueue->incStrong(env);
return reinterpret_cast<jlong>(nativeMessageQueue);
}
其对象 NativeMessageQueue 为内部类,其构造函数如下:
NativeMessageQueue::NativeMessageQueue() :
mPollEnv(NULL), mPollObj(NULL), mExceptionObj(NULL) {
mLooper = Looper::getForThread();
// 创建 Native 层的 Looper 对象
if (mLooper == NULL) {
mLooper = new Looper(false);
Looper::setForThread(mLooper);
}
}
Native 层的 NativeMessageQueue 在构造时候回依赖 Looper 对象,此时回新建 Native 层的 Looper 对象。
nativeWake
Java 层通过 enqueueMessage 发消息,quit 退出以及 removeSyncBarrier 方法都会调用到 nativeWake,其指向函数 android_os_MessageQueue_nativeWake,代码如下:
static void android_os_MessageQueue_nativeWake(JNIEnv* env, jclass clazz, jlong ptr) {
NativeMessageQueue* nativeMessageQueue = reinterpret_cast<NativeMessageQueue*>(ptr);
nativeMessageQueue->wake();
}
同样依赖内部成员 nativeMessageQueue->wake() 方法,最终指向 mLooper->wake()。
void NativeMessageQueue::wake() {
mLooper->wake();
}
nativePollOnce
Java 层的 nativePollOnce 调用于 上层 MessageQueue 的 next 方法,用于不断获取数据处理操作,这里是优先处理 Native 的逻辑。在这里指向函数 android_os_MessageQueue_nativePollOnce,,如下:
static void android_os_MessageQueue_nativePollOnce(JNIEnv* env, jobject obj,
jlong ptr, jint timeoutMillis) {
NativeMessageQueue* nativeMessageQueue = reinterpret_cast<NativeMessageQueue*>(ptr);
nativeMessageQueue->pollOnce(env, obj, timeoutMillis);
}
函数最终指向了 nativeMessageQueue->pollOnce,代码如下:
void NativeMessageQueue::pollOnce(JNIEnv* env, jobject pollObj, int timeoutMillis) {
mPollEnv = env;
mPollObj = pollObj;
// 调用持有对象 looper 的 pollOnce
mLooper->pollOnce(timeoutMillis);
mPollObj = NULL;
mPollEnv = NULL;
if (mExceptionObj) {
env->Throw(mExceptionObj);
env->DeleteLocalRef(mExceptionObj);
mExceptionObj = NULL;
}
}
此处通过持有对象 Native 层的 looper,调用到 pollOnce 方法,后面具体分析。
分析到这里,我们之是大概的分析了其内部调用逻辑,JNI 部分也只是衔接 Java 和 Native 部分,具体的逻辑还是在下面的 Native 层章节具体分析。
Native 层三巨头
Java 中存在的三巨头,Native 也同样存在,下面我们来具体分析。
NativeMessageQueue
Java 层建议对象 MessageQueue 时,便和 Native 曾建立联系,NativeMessageQueue 声明在 android_os_MessageQueue.h 中,其依赖两个父类,如下:
class MessageQueue : public virtual RefBase {
public:
inline sp<Looper> getLooper() const {
return mLooper;
}
bool raiseAndClearException(JNIEnv* env, const char* msg);
virtual void raiseException(JNIEnv* env, const char* msg, jthrowable exceptionObj) = 0;
protected:
MessageQueue();
virtual ~MessageQueue();
protected:
sp<Looper> mLooper;
};
class LooperCallback : public virtual RefBase {
protected:
virtual ~LooperCallback() { }
public:
virtual int handleEvent(int fd, int events, void* data) = 0;
};
class NativeMessageQueue : public MessageQueue, public LooperCallback {
public:
NativeMessageQueue();
virtual ~NativeMessageQueue();
virtual void raiseException(JNIEnv* env, const char* msg, jthrowable exceptionObj);
void pollOnce(JNIEnv* env, jobject obj, int timeoutMillis);
void wake();
void setFileDescriptorEvents(int fd, int events);
virtual int handleEvent(int fd, int events, void* data);
private:
JNIEnv* mPollEnv;
jobject mPollObj;
jthrowable mExceptionObj;
};
上面可以看出,父类 MessageQueue 主要提供 raiseAndClearException 和
raiseException 方法,同时持有成员变量 mLooper 轮训器。父类 LooperCallback 则更简单,提供一个共有接口 handleEvent。
NativeMessageQueue 构造函数
NativeMessageQueue 的构造函数如下:
NativeMessageQueue::NativeMessageQueue() :
mPollEnv(NULL), mPollObj(NULL), mExceptionObj(NULL) {
mLooper = Looper::getForThread();
if (mLooper == NULL) {
mLooper = new Looper(false);
Looper::setForThread(mLooper);
}
}
创建 Native 层的 Looper 对象,设置 Looper 线程的 looper 对象。
wake
NativeMessageQueue 的 wake 函数相对简单,代码如下:
void NativeMessageQueue::wake() {
mLooper->wake();
}
pollOnce
NativeMessageQueue 的 pollOnce 实现对消息队列的轮训检查,具体代码如下:
void NativeMessageQueue::pollOnce(JNIEnv* env, jobject pollObj, int timeoutMillis) {
mPollEnv = env;
mPollObj = pollObj;
// 调用持有对象 looper 的 pollOnce
mLooper->pollOnce(timeoutMillis);
mPollObj = NULL;
mPollEnv = NULL;
if (mExceptionObj) {
env->Throw(mExceptionObj);
env->DeleteLocalRef(mExceptionObj);
mExceptionObj = NULL;
}
}
最终实现方法还是依赖 Looper 的 pollOnce,看来 Native 层的 Looper 才是最终要干活的人啊!
Looper(Native)
上面分析,均已 Looper 截断,此处我们具体分析 Looper 的逻辑。
Looper 的构造函数
Native 层的 Looper 类相对最复杂,我们首先分析其工作流程,剩下的其他部分后续再补充说明。代码如下:
Looper::Looper(bool allowNonCallbacks) :
mAllowNonCallbacks(allowNonCallbacks), mSendingMessage(false),
mPolling(false), mEpollFd(-1), mEpollRebuildRequired(false),
mNextRequestSeq(0), mResponseIndex(0), mNextMessageUptime(LLONG_MAX) {
// 创建事件对象 mWakeEventFd,不阻塞方式
mWakeEventFd = eventfd(0, EFD_NONBLOCK);
LOG_ALWAYS_FATAL_IF(mWakeEventFd < 0, "Could not make wake event fd. errno=%d", errno);
AutoMutex _l(mLock);
rebuildEpollLocked();
}
Looper 初始化将创建一个时间对象用于唤醒,eventfd 方式的具体解释,可以参考一下博文
构造函数最后,调用 rebuildEpollLocked 重新构建 epoll 下文件描述符的监听任务,如下代码:
void Looper::Request::initEventItem(struct epoll_event* eventItem) const {
int epollEvents = 0;
if (events & EVENT_INPUT) epollEvents |= EPOLLIN;
if (events & EVENT_OUTPUT) epollEvents |= EPOLLOUT;
memset(eventItem, 0, sizeof(epoll_event)); // zero out unused members of data field union
eventItem->events = epollEvents;
eventItem->data.fd = fd;
}
void Looper::rebuildEpollLocked() {
// Close old epoll instance if we have one.
if (mEpollFd >= 0) {
#if DEBUG_CALLBACKS
ALOGD("%p ~ rebuildEpollLocked - rebuilding epoll set", this);
#endif
close(mEpollFd);
}
// Allocate the new epoll instance and register the wake pipe.
// 创建 epoll 监听文件描述符,监数目 8
mEpollFd = epoll_create(EPOLL_SIZE_HINT);
LOG_ALWAYS_FATAL_IF(mEpollFd < 0, "Could not create epoll instance. errno=%d", errno);
struct epoll_event eventItem;
memset(& eventItem, 0, sizeof(epoll_event)); // zero out unused members of data field union
eventItem.events = EPOLLIN;
eventItem.data.fd = mWakeEventFd;
// 将 mWakeEventFd 文件描述符加入监听列表中
int result = epoll_ctl(mEpollFd, EPOLL_CTL_ADD, mWakeEventFd, & eventItem);
LOG_ALWAYS_FATAL_IF(result != 0, "Could not add wake event fd to epoll instance. errno=%d",
errno);
// mRequests 为 KeyedVector<int, Request> 类型,通过 index 和 Request 进行绑定,后续进行结构体的具体分析
for (size_t i = 0; i < mRequests.size(); i++) {
// 获取对应的 Request
const Request& request = mRequests.valueAt(i);
struct epoll_event eventItem;
// 初始化 eventItem
request.initEventItem(&eventItem);
// 加入监听中
int epollResult = epoll_ctl(mEpollFd, EPOLL_CTL_ADD, request.fd, & eventItem);
if (epollResult < 0) {
ALOGE("Error adding epoll events for fd %d while rebuilding epoll set, errno=%d",
request.fd, errno);
}
}
}
如此一来,Looper 便完成了相关文件描述符的监听,其中包括事件通知对象 mWakeEventFd。至于 mRequests 内部对象的添加,后续分析。
wake
Looper 的 wake 函数实际上是最终调用的函数,代码如下:
void Looper::wake() {
#if DEBUG_POLL_AND_WAKE
ALOGD("%p ~ wake", this);
#endif
uint64_t inc = 1;
// 想文件描述符中写入数据,以便唤醒
ssize_t nWrite = TEMP_FAILURE_RETRY(write(mWakeEventFd, &inc, sizeof(uint64_t)));
if (nWrite != sizeof(uint64_t)) {
if (errno != EAGAIN) {
ALOGW("Could not write wake signal, errno=%d", errno);
}
}
}
wake 函数通过向 mWakeEventFd 写入数据,此时发生了数据写入操作,在 pollOnce 中监听便得到响应。
pollOnce
Java 层的 Looper 在无限循环中,通过 next 获取消息并处理,优先处理 Native 的消息,机调用 pollOnce 走到次数,代码如下:
int Looper::pollOnce(int timeoutMillis, int* outFd, int* outEvents, void** outData) {
int result = 0;
// 无限循环
for (;;) {
// 优先处理 response,此处的 mResponses 为一个 Vector<Response> 类型
while (mResponseIndex < mResponses.size()) {
// 取出对应的 response
const Response& response = mResponses.itemAt(mResponseIndex++);
int ident = response.request.ident;
if (ident >= 0) {
int fd = response.request.fd;
int events = response.events;
void* data = response.request.data;
#if DEBUG_POLL_AND_WAKE
ALOGD("%p ~ pollOnce - returning signalled identifier %d: "
"fd=%d, events=0x%x, data=%p",
this, ident, fd, events, data);
#endif
// 赋值
if (outFd != NULL) *outFd = fd;
if (outEvents != NULL) *outEvents = events;
if (outData != NULL) *outData = data;
// 直接返回 ident,此时的 ident 为大于 0 的值
return ident;
}
}
// 一次轮训结束后,从下面返回的 result
if (result != 0) {
#if DEBUG_POLL_AND_WAKE
ALOGD("%p ~ pollOnce - returning result %d", this, result);
#endif
if (outFd != NULL) *outFd = 0;
if (outEvents != NULL) *outEvents = 0;
if (outData != NULL) *outData = NULL;
return result;
}
// 调用 pollInner 再次处理,依然在无限循环内部
result = pollInner(timeoutMillis);
}
}
代码逻辑中,优先处理 mResponses 中对应的 Response,存在赋值并直接返回 ident,否则调用 pollInner 进入下层逻辑。代码如下:
int Looper::pollInner(int timeoutMillis) {
#if DEBUG_POLL_AND_WAKE
ALOGD("%p ~ pollOnce - waiting: timeoutMillis=%d", this, timeoutMillis);
#endif
// Adjust the timeout based on when the next message is due.
if (timeoutMillis != 0 && mNextMessageUptime != LLONG_MAX) {
nsecs_t now = systemTime(SYSTEM_TIME_MONOTONIC);
int messageTimeoutMillis = toMillisecondTimeoutDelay(now, mNextMessageUptime);
if (messageTimeoutMillis >= 0
&& (timeoutMillis < 0 || messageTimeoutMillis < timeoutMillis)) {
timeoutMillis = messageTimeoutMillis;
}
#if DEBUG_POLL_AND_WAKE
ALOGD("%p ~ pollOnce - next message in %" PRId64 "ns, adjusted timeout: timeoutMillis=%d",
this, mNextMessageUptime - now, timeoutMillis);
#endif
}
// Poll.
int result = POLL_WAKE;
// mResponses 处理完成,清理工作
mResponses.clear();
mResponseIndex = 0;
// We are about to idle.
mPolling = true;
// epoll_event 结构体,容纳 16 个文件描述符事件单位的数组
struct epoll_event eventItems[EPOLL_MAX_EVENTS];
// epoll_wait 等待在 mEpollFd 文件描述符上发生事件
int eventCount = epoll_wait(mEpollFd, eventItems, EPOLL_MAX_EVENTS, timeoutMillis);
// No longer idling.
mPolling = false;
// Acquire lock.
mLock.lock();
// Rebuild epoll set if needed.
// 是否有需要重新构建 epoll 监听列表
if (mEpollRebuildRequired) {
mEpollRebuildRequired = false;
rebuildEpollLocked();
goto Done;
}
// Check for poll error.
// epoll 监听发生错误
if (eventCount < 0) {
if (errno == EINTR) {
goto Done;
}
ALOGW("Poll failed with an unexpected error, errno=%d", errno);
result = POLL_ERROR;
goto Done;
}
// Check for poll timeout.
// epoll 监听超时
if (eventCount == 0) {
#if DEBUG_POLL_AND_WAKE
ALOGD("%p ~ pollOnce - timeout", this);
#endif
result = POLL_TIMEOUT;
goto Done;
}
// Handle all events.
#if DEBUG_POLL_AND_WAKE
ALOGD("%p ~ pollOnce - handling events from %d fds", this, eventCount);
#endif
// 处理文件节点发生的事件,eventCount 为监听文件描述符发生事件的个数
for (int i = 0; i < eventCount; i++) {
// 获取发生事件的具体文件描述符
int fd = eventItems[i].data.fd;
// 获取对应发生事件类型
uint32_t epollEvents = eventItems[i].events;
// 唤醒操作,即客户端调用 wake 向 mWakeEventFd 写入数据,追溯上层 Java 层可以调用 nativeWake 引起
if (fd == mWakeEventFd) {
if (epollEvents & EPOLLIN) {
// 唤醒操作
awoken();
} else {
ALOGW("Ignoring unexpected epoll events 0x%x on wake event fd.", epollEvents);
}
} else {
// 其他文件描述符使用了 <index,Request> 来存储,这里从 mRequests 中获取获取 index,返回值 requestIndex 大于 0 即为获取成功,存在
ssize_t requestIndex = mRequests.indexOfKey(fd);
if (requestIndex >= 0) {
int events = 0;
if (epollEvents & EPOLLIN) events |= EVENT_INPUT;
if (epollEvents & EPOLLOUT) events |= EVENT_OUTPUT;
if (epollEvents & EPOLLERR) events |= EVENT_ERROR;
if (epollEvents & EPOLLHUP) events |= EVENT_HANGUP;
// 调用 pushResponse 继续处理,从 mRequests 中 <int,Request> 获取fd对应的 Request,并添加到 mResponses 中
pushResponse(events, mRequests.valueAt(requestIndex));
} else {
ALOGW("Ignoring unexpected epoll events 0x%x on fd %d that is "
"no longer registered.", epollEvents, fd);
}
}
}
Done: ;
// Invoke pending message callbacks.
mNextMessageUptime = LLONG_MAX;
while (mMessageEnvelopes.size() != 0) {
nsecs_t now = systemTime(SYSTEM_TIME_MONOTONIC);
const MessageEnvelope& messageEnvelope = mMessageEnvelopes.itemAt(0);
if (messageEnvelope.uptime <= now) {
{ // obtain handler
sp<MessageHandler> handler = messageEnvelope.handler;
Message message = messageEnvelope.message;
mMessageEnvelopes.removeAt(0);
mSendingMessage = true;
mLock.unlock();
#if DEBUG_POLL_AND_WAKE || DEBUG_CALLBACKS
ALOGD("%p ~ pollOnce - sending message: handler=%p, what=%d",
this, handler.get(), message.what);
#endif
// 调用对应的 msg 的 handleMessage 方法,具体分析见后面
handler->handleMessage(message);
} // release handler
mLock.lock();
mSendingMessage = false;
result = POLL_CALLBACK;
} else {
// The last message left at the head of the queue determines the next wakeup time.
mNextMessageUptime = messageEnvelope.uptime;
break;
}
}
// Release lock.
mLock.unlock();
// Invoke all response callbacks.
// 处理 mResponses 中具体的 response
for (size_t i = 0; i < mResponses.size(); i++) {
Response& response = mResponses.editItemAt(i);
// response 中元素 request.ident 为 POLL_CALLBACK,即含有回调函数
if (response.request.ident == POLL_CALLBACK) {
int fd = response.request.fd;
int events = response.events;
void* data = response.request.data;
#if DEBUG_POLL_AND_WAKE || DEBUG_CALLBACKS
ALOGD("%p ~ pollOnce - invoking fd event callback %p: fd=%d, events=0x%x, data=%p",
this, response.request.callback.get(), fd, events, data);
#endif
// 回调 callback 的 handleEvent 函数
int callbackResult = response.request.callback->handleEvent(fd, events, data);
if (callbackResult == 0) {
// 成功,移除工作
removeFd(fd, response.request.seq);
}
response.request.callback.clear();
result = POLL_CALLBACK;
}
}
return result;
}
void Looper::pushResponse(int events, const Request& request) {
Response response;
response.events = events;
response.request = request;
// 再次使用 mResponses 进行处理,调用 push 后,mResponses 中便添加了新元素 response
mResponses.push(response);
}
以上代码众多,主要涉及到了 epoll 机制,如果读者对 epoll 不了解的话,可能需要重新学习一下 epoll 机制了,这里不做过多说明了,有需要请参考之前的博文,这里对上述代码做一个总结:
- pollOnce 中主要对 mResponses 信息进行处理,如果 response.request.ident 大于0,就取出信息直接返回,否则再调用 pollInner 继续处理
- pollInner 中首先针对过期的消息设置超时时间,然后再清理 Responses 数据,因为之前已经处理了 Responses 中的数据,后续要对发生事件的文件描述符进行统计,再次集中记录到 Responses 处理
- 接下来便是基本的 epoll 模型,epoll_wait 监听文件描述符事件发生
- 分别处理 epoll 监听错误,监听超时等错误
- 接下来处理文件节点发生的事件,eventCount 返回发生事件文件描述符个数
- 如果 fd 为 mWakeEventFd,即进行唤醒操作,执行 awoken
- 对于其他文件描述符,Looper 使用了 <int,Request> 来存储,这里从 mRequests 中获取获取 index,返回值 requestIndex 大于 0 即为获取成功,之后调用 pushResponse 集中添加到 mResponses 中
- 遍历完成后,便开始对消息进行处理,优先处理 mMessageEnvelopes 中的消息回调
- 再处理 mResponses 中具体的 response,这里的是之前文件描述符发生事件写入的集合列表
- 最后执行相应的清理任务并返回执行结果
上述代码中,需要读者对 epoll 机制有一定的了解,否则真的难以看懂,之前博主已经对 epoll 进行了相关的介绍,请自行查看,这里主要是处在两个数据集合,一个是 mResponses,另一个是 mMessageEnvelopes,下面我们具体分析一下这两个数据集合。
mResponses 数据分析
Looper.h 的头文件中声明了 Vector
struct Response {
int events;
Request request;
};
结构体 Response 内部有两个元素,events 即位事件,那么 Request 又是什么?
struct Request {
int fd;
int ident;
int events;
int seq;
sp<LooperCallback> callback;
void* data;
void initEventItem(struct epoll_event* eventItem) const;
};
到这里就清晰明了了,Request 集中封装了事件发生的文件结构,例如文件描述符,事件类型,事件回调,以及额外数据,同时提供函数指针 initEventItem 用于 Looper 中实现快捷的将 int events 类型转换到 epoll 原型监听文件结构,如下:
void Looper::Request::initEventItem(struct epoll_event* eventItem) const {
int epollEvents = 0;
// 根据 events 转换到 epoll 下的事件类型
if (events & EVENT_INPUT) epollEvents |= EPOLLIN;
if (events & EVENT_OUTPUT) epollEvents |= EPOLLOUT;
memset(eventItem, 0, sizeof(epoll_event));
// 赋值操作
eventItem->events = epollEvents;
eventItem->data.fd = fd;
}
由上可以看出,mResponses 中存放了 Vector
数据添加
到这里,数据结构分析完毕了,那么文件监听是如何添加进去的呢?Looper 中有一个函数如下:
int Looper::addFd(int fd, int ident, int events, Looper_callbackFunc callback, void* data) {
return addFd(fd, ident, events, callback ? new SimpleLooperCallback(callback) : NULL, data);
}
SimpleLooperCallback 继承自 LooperCallback,如下:
class SimpleLooperCallback : public LooperCallback {
protected:
virtual ~SimpleLooperCallback();
public:
SimpleLooperCallback(Looper_callbackFunc callback);
virtual int handleEvent(int fd, int events, void* data);
private:
Looper_callbackFunc mCallback;
};
SimpleLooperCallback::SimpleLooperCallback(Looper_callbackFunc callback) :
mCallback(callback) {
}
int SimpleLooperCallback::handleEvent(int fd, int events, void* data
{
return mCallback(fd, events, data);
}
子实现类 SimpleLooperCallback, 构造函数维持有 Looper_callbackFunc 对象,需要实现 handleEvent 方法,下面继续看 addFd 函数:
int Looper::addFd(int fd, int ident, int events, const sp<LooperCallback>& callback, void* data) {
#if DEBUG_CALLBACKS
ALOGD("%p ~ addFd - fd=%d, ident=%d, events=0x%x, callback=%p, data=%p", this, fd, ident,
events, callback.get(), data);
#endif
// 存在为空
if (!callback.get()) {
if (! mAllowNonCallbacks) {
ALOGE("Invalid attempt to set NULL callback but not allowed for this looper.");
return -1;
}
if (ident < 0) {
ALOGE("Invalid attempt to set NULL callback with ident < 0.");
return -1;
}
} else {
ident = POLL_CALLBACK;
}
{ // acquire lock
AutoMutex _l(mLock);
// 使用 Request 结构体承载数据
Request request;
request.fd = fd;
request.ident = ident;
request.events = events;
request.seq = mNextRequestSeq++;
request.callback = callback;
request.data = data;
if (mNextRequestSeq == -1) mNextRequestSeq = 0; // reserve sequence number -1
struct epoll_event eventItem;
// 将 Request 转化成 epoll 需要的 eventItem 数据结构
request.initEventItem(&eventItem);
ssize_t requestIndex = mRequests.indexOfKey(fd);
// 没有添加过,直接添加监听
if (requestIndex < 0) {
int epollResult = epoll_ctl(mEpollFd, EPOLL_CTL_ADD, fd, & eventItem);
if (epollResult < 0) {
ALOGE("Error adding epoll events for fd %d, errno=%d", fd, errno);
return -1;
}
// mRequests 为一个 KeyedVector<int, Request>,没有记录项,添加
mRequests.add(fd, request);
} else {
// 监听存在,修改属性
int epollResult = epoll_ctl(mEpollFd, EPOLL_CTL_MOD, fd, & eventItem);
// 添加出错
if (epollResult < 0) {
if (errno == ENOENT) {
#if DEBUG_CALLBACKS
ALOGD("%p ~ addFd - EPOLL_CTL_MOD failed due to file descriptor "
"being recycled, falling back on EPOLL_CTL_ADD, errno=%d",
this, errno);
#endif
epollResult = epoll_ctl(mEpollFd, EPOLL_CTL_ADD, fd, & eventItem);
if (epollResult < 0) {
ALOGE("Error modifying or adding epoll events for fd %d, errno=%d",
fd, errno);
return -1;
}
scheduleEpollRebuildLocked();
} else {
ALOGE("Error modifying epoll events for fd %d, errno=%d", fd, errno);
return -1;
}
}
// 替换对应元素
mRequests.replaceValueAt(requestIndex, request);
}
} // release lock
return 1;
}
Native 层提供的 addfd 函数,Java 层通过函数 nativeSetFileDescriptorEvents 连接到 Native,JNI 接口函数如下:
static void android_os_MessageQueue_nativeSetFileDescriptorEvents(JNIEnv* env, jclass clazz,
jlong ptr, jint fd, jint events) {
NativeMessageQueue* nativeMessageQueue = reinterpret_cast<NativeMessageQueue*>(ptr);
nativeMessageQueue->setFileDescriptorEvents(fd, events);
}
至于 Java 层添加等详细流程,后续的分析这里就不展开了。
数据删除
既然有了添加,那么肯定有移除操作了,如下;
int Looper::removeFd(int fd, int seq) {
#if DEBUG_CALLBACKS
ALOGD("%p ~ removeFd - fd=%d, seq=%d", this, fd, seq);
#endif
{ // acquire lock
AutoMutex _l(mLock);
// 从 mRequests 集合中获取 fd 对应的 index
ssize_t requestIndex = mRequests.indexOfKey(fd);
if (requestIndex < 0) {
return 0;
}
if (seq != -1 && mRequests.valueAt(requestIndex).seq != seq) {
#if DEBUG_CALLBACKS
ALOGD("%p ~ removeFd - sequence number mismatch, oldSeq=%d",
this, mRequests.valueAt(requestIndex).seq);
#endif
return 0;
}
// 移除
mRequests.removeItemsAt(requestIndex);
// 将 fd 从 epoll 监听节点 mEpollFd 中删除
int epollResult = epoll_ctl(mEpollFd, EPOLL_CTL_DEL, fd, NULL);
if (epollResult < 0) {
if (seq != -1 && (errno == EBADF || errno == ENOENT)) {
#if DEBUG_CALLBACKS
ALOGD("%p ~ removeFd - EPOLL_CTL_DEL failed due to file descriptor "
"being closed, errno=%d", this, errno);
#endif
scheduleEpollRebuildLocked();
} else {
ALOGE("Error removing epoll events for fd %d, errno=%d", fd, errno);
scheduleEpollRebuildLocked();
return -1;
}
}
} // release lock
return 1;
}
同样,Java 层也可以通过 JNI 进行回调。
数据回调
mResponses 中的数据发生改变是如何回调的呢,之前我们在函数 pollInner 中,有如下代码
for (size_t i = 0; i < mResponses.size(); i++) {
Response& response = mResponses.editItemAt(i);
// response 中元素 request.ident 为 POLL_CALLBACK,即含有回调函数
if (response.request.ident == POLL_CALLBACK) {
int fd = response.request.fd;
int events = response.events;
void* data = response.request.data;
// 回调 callback 的 handleEvent 函数
int callbackResult = response.request.callback->handleEvent(fd, events, data);
if (callbackResult == 0) {
// 成功,移除工作
removeFd(fd, response.request.seq);
}
response.request.callback.clear();
result = POLL_CALLBACK;
}
}
具体对应的函数如下:
int NativeMessageQueue::handleEvent(int fd, int looperEvents, void* data) {
int events = 0;
// 事件类型转换
if (looperEvents & Looper::EVENT_INPUT) {
events |= CALLBACK_EVENT_INPUT;
}
if (looperEvents & Looper::EVENT_OUTPUT) {
events |= CALLBACK_EVENT_OUTPUT;
}
if (looperEvents & (Looper::EVENT_ERROR | Looper::EVENT_HANGUP | Looper::EVENT_INVALID)) {
events |= CALLBACK_EVENT_ERROR;
}
int oldWatchedEvents = reinterpret_cast<intptr_t>(data);
// 反过来回调 Java 层的 dispatchEvents 方法
int newWatchedEvents = mPollEnv->CallIntMethod(mPollObj,
gMessageQueueClassInfo.dispatchEvents, fd, events);
if (!newWatchedEvents) {
return 0; // unregister the fd
}
if (newWatchedEvents != oldWatchedEvents) {
// 更改 Native 层的 fd
setFileDescriptorEvents(fd, newWatchedEvents);
}
return 1;
}
void NativeMessageQueue::setFileDescriptorEvents(int fd, int events) {
if (events) {
int looperEvents = 0;
if (events & CALLBACK_EVENT_INPUT) {
looperEvents |= Looper::EVENT_INPUT;
}
if (events & CALLBACK_EVENT_OUTPUT) {
looperEvents |= Looper::EVENT_OUTPUT;
}
mLooper->addFd(fd, Looper::POLL_CALLBACK, looperEvents, this,
reinterpret_cast<void*>(events));
} else {
mLooper->removeFd(fd);
}
}
函数中 mPollEnv->CallIntMethod(mPollObj,gMessageQueueClassInfo.dispatchEvents, fd, events) 中指明了函数的出处,此处通过 JNI 指向了 Java 层接口回调,如下:
private int dispatchEvents(int fd, int events) {
// Get the file descriptor record and any state that might change.
final FileDescriptorRecord record;
final int oldWatchedEvents;
final OnFileDescriptorEventListener listener;
final int seq;
synchronized (this) {
record = mFileDescriptorRecords.get(fd);
if (record == null) {
return 0; // spurious, no listener registered
}
oldWatchedEvents = record.mEvents;
events &= oldWatchedEvents; // filter events based on current watched set
if (events == 0) {
return oldWatchedEvents; // spurious, watched events changed
}
listener = record.mListener;
seq = record.mSeq;
}
// Invoke the listener outside of the lock.
// 通过 listener 监听接口,回调函数 onFileDescriptorEvents
int newWatchedEvents = listener.onFileDescriptorEvents(
record.mDescriptor, events);
if (newWatchedEvents != 0) {
newWatchedEvents |= OnFileDescriptorEventListener.EVENT_ERROR;
}
if (newWatchedEvents != oldWatchedEvents) {
synchronized (this) {
int index = mFileDescriptorRecords.indexOfKey(fd);
if (index >= 0 && mFileDescriptorRecords.valueAt(index) == record
&& record.mSeq == seq) {
record.mEvents = newWatchedEvents;
if (newWatchedEvents == 0) {
mFileDescriptorRecords.removeAt(index);
}
}
}
}
return newWatchedEvents;
}
至此,基本分析完了 mResponses 中的数据结构,包括数据的添加,回调以及删除,下面我们来看 mMessageEnvelopes 数据结构。
mMessageEnvelopes 数据分析
首先来看其声明部分,如下:
Vector<MessageEnvelope> mMessageEnvelopes;
// 承载消息队列的信封
struct MessageEnvelope {
MessageEnvelope() : uptime(0) { }
MessageEnvelope(nsecs_t uptime, const sp<MessageHandler> handler,
const Message& message) : uptime(uptime), handler(handler), message(message) {
}
nsecs_t uptime;
sp<MessageHandler> handler;
Message message;
};
mMessageEnvelopes 为元素为 MessageEnvelope 类型的 Vector 容器,容器内每一个元素都代表着一个 消息 msg,MessageEnvelope 通过 sp 强引用与 handler 建立链接,每封信封封装了消息的信息,事件,handler接受人等。
MessageEnvelope 的发送
Native 层通过 send*Message 接口发送消息,最终走向 sendMessageAtTime 如下代码:
void Looper::sendMessageAtTime(nsecs_t uptime, const sp<MessageHandler>& handler,
const Message& message) {
#if DEBUG_CALLBACKS
ALOGD("%p ~ sendMessageAtTime - uptime=%" PRId64 ", handler=%p, what=%d",
this, uptime, handler.get(), message.what);
#endif
size_t i = 0;
{ // acquire lock
AutoMutex _l(mLock);
// 获取 mMessageEnvelopes 的大小
size_t messageCount = mMessageEnvelopes.size();
// 根据时间,找到合适位置进行插入
while (i < messageCount && uptime >= mMessageEnvelopes.itemAt(i).uptime) {
i += 1;
}
// 新建信封单位 messageEnvelope 并加入到 mMessageEnvelopes
MessageEnvelope messageEnvelope(uptime, handler, message);
mMessageEnvelopes.insertAt(messageEnvelope, i, 1);
if (mSendingMessage) {
return;
}
} // release lock
// 信封没有信件,进入 wake 状态
if (i == 0) {
wake();
}
}
代码注释很详细,下面来看信封中信息的删除。
MessageEnvelope 的删除
Looper 中的函数 removeMessages 有多个重载,逻辑差不太多,我们直接看其中一个的一个,根据 handler 与 what 类型进行删除
void Looper::removeMessages(const sp<MessageHandler>& handler, int what) {
#if DEBUG_CALLBACKS
ALOGD("%p ~ removeMessages - handler=%p, what=%d", this, handler.get(), what);
#endif
{ // acquire lock
AutoMutex _l(mLock);
// 遍历 mMessageEnvelopes 数据结构,找到对应的并删除
for (size_t i = mMessageEnvelopes.size(); i != 0; ) {
const MessageEnvelope& messageEnvelope = mMessageEnvelopes.itemAt(--i);
if (messageEnvelope.handler == handler
&& messageEnvelope.message.what == what) {
mMessageEnvelopes.removeAt(i);
}
}
} // release lock
}
MessageEnvelope 的回调
对于 MessageEnvelope 的添加和删除操作,同样,在函数 pollOnce 的子函数 pollInner 中,有如下代码实现了信息的回调:
while (mMessageEnvelopes.size() != 0) {
nsecs_t now = systemTime(SYSTEM_TIME_MONOTONIC);
const MessageEnvelope& messageEnvelope = mMessageEnvelopes.itemAt(0);
if (messageEnvelope.uptime <= now) {
{ // obtain handler
sp<MessageHandler> handler = messageEnvelope.handler;
Message message = messageEnvelope.message;
mMessageEnvelopes.removeAt(0);
mSendingMessage = true;
mLock.unlock();
// 调用对应的 msg 的 handleMessage 方法
handler->handleMessage(message);
} // release handler
mLock.lock();
mSendingMessage = false;
result = POLL_CALLBACK;
} else {
mNextMessageUptime = messageEnvelope.uptime;
break;
}
}
通览 pollOnce 函数,可以看出,MessageEnvelope 的信息处理优先级高于 Response 类型的 mResponses 回调。对于 MessageHandler 到下一节中介绍。
MessageHandler
Native 层的 Handler 对应于 MessageHandler,其在 Looper 中声明如下:
class MessageHandler : public virtual RefBase {
protected:
virtual ~MessageHandler() { }
public:
/**
* Handles a message.
*/
virtual void handleMessage(const Message& message) = 0;
};
基类 MessageHandler 只声明了 handleMessage 接口,而实际上实现的确实子类 WeakMessageHandler,如下:
class WeakMessageHandler : public MessageHandler {
protected:
virtual ~WeakMessageHandler();
public:
WeakMessageHandler(const wp<MessageHandler>& handler);
virtual void handleMessage(const Message& message);
private:
wp<MessageHandler> mHandler;
};
WeakMessageHandler::WeakMessageHandler(const wp<MessageHandler>& handler) :
mHandler(handler) {
}
WeakMessageHandler::~WeakMessageHandler() {
}
子类 WeakMessageHandler 通过弱引用方式维持了 MessageHandler 类型的 handler 对象,其 handleMessage 方法如下:
void WeakMessageHandler::handleMessage(const Message& message) {
sp<MessageHandler> handler = mHandler.promote();
if (handler != NULL) {
handler->handleMessage(message);
}
}
代码中并未直接处理 handleMessage,因为弱饮用的关系,可能饮用对象不存在,需要尝试升级到强引用 mHandler.promote,然后再处理 handleMessage。关于智能指针的引用计数,有需要请参考之前的博文。
总结
通过以上内容的介绍,相信读者对安卓中的 Handler 通信机制有更深的了解。本文通过源码分析的方式,从 Java 到 JNI 到 Native,分别分析了 Handler、Looper、MessageQueue三者的关系,网上很多博主对其进行了总结,感觉也十分出众,这里本人也画了一幅小图来总结一下其之间的关系,如下:

在分析 MessageQueue 的过程中,本人使用 Xmind 思维导图将函数之间的调用关系进行了相关的汇总,图片如下:

文章内容涉及 epoll 模型,强烈推荐读者了解一下 select,poll,epoll等高并发模型,在后续代码分析中将会如鱼得水。
文件从邓凡平前辈的相关博文中汲取灵感,推荐博文如下: