初识 Thread/Looper/MessageQueue/Handler

学习 Android 中定时器实现方式时候,发现有下面这些方式。

实现方式 优点 缺点 使用场景 所用的API
普通线程sleep的方式 简单易用,可用于一般的轮询Polling 不精确,不可靠,容易被系统杀死或者休眠 需要在App内部执行短时间的定时任务 Thread.sleep(long)
Timer定时器 简单易用,可以设置固定周期或者延迟执行的任务 不精确,不可靠,容易被系统杀死或者休眠 需要在App内部执行短时间的定时任务 Timer.schedule(TimerTask,long)
ScheduledExecutorService 灵活强大,可以设置固定周期或者延迟执行的任务,并支持多线程并发 不精确,不可靠,容易被系统杀死或者休眠 需要在App内部执行短时间且需要多线程并发的定时任务 Executors.newScheduledThreadPool(int).schedule(Runnable,long,TimeUnit)
Handler中的postDelayed方法 简单易用,可以设置延迟执行的任务,并与UI线程交互 不精确,不可靠,容易被系统杀死或者休眠 需要在App内部执行短时间且需要与UI线程交互的定时任务 Handler.postDelayed(Runnable,long)
Service + AlarmManger + BroadcastReceiver 可靠稳定,可以设置精确或者不精确的闹钟,并在后台长期运行 需要声明相关权限,并受系统时间影响 需要在App外部执行长期且对时间敏感的定时任务 AlarmManager.set(int,PendingIntent), BroadcastReceiver.onReceive(Context,Intent), Service.onStartCommand(Intent,int,int)
WorkManager 可靠稳定,不受系统时间影响,并可以设置多种约束条件来执行任务 需要添加依赖,并不能保证准时执行 需要在App外部执行长期且对时间不敏感且需要满足特定条件才能执行的定时任务 WorkManager.enqueue(WorkRequest), Worker.doWork()
RxJava 简洁、灵活、支持多线程、支持背压、支持链式操作 学习曲线较高、内存占用较大 需要处理复杂的异步逻辑或数据流 io.reactivex:rxjava:2.2.21
CountDownTimer 简单易用、不需要额外的线程或handler 不支持取消或重置倒计时、精度受系统时间影响 需要实现简单的倒计时功能 android.os.CountDownTimer
协程+Flow 语法简洁、支持协程作用域管理生命周期、支持流式操作和背压 需要引入额外的依赖库、需要熟悉协程和Flow的概念和用法 需要处理异步数据流或响应式编程 kotlinx-coroutines-core:1.5.0
使用downTo关键字和Flow实现一个定时任务 1、可以使用简洁的语法创建一个倒数的范围 2 、可以使用Flow异步地发射和收集倒数的值3、可以使用onEach等操作符对倒数的值进行处理或转换 1、需要注意倒数的范围是否包含0,否则可能会出现偏差 2、需要注意倒数的间隔是否与delay函数的参数一致,否则可能会出现不准确 3、需要注意取消或停止Flow的时机,否则可能会出现内存泄漏或资源浪费 1、适合于需要实现简单的倒计时功能,例如显示剩余时间或进度 2、适合于需要在倒计时过程中执行一些额外的操作,例如播放声音或更新UI 3、适合于需要在倒计时结束后执行一些额外的操作,例如跳转页面或弹出对话框 implementation "org.jetbrains.kotlinx:kotlinx-coroutines-core:1.6.0"
Kotlin 内联函数的协程和 Flow 实现 很容易离开主线程,样板代码最少,协程完全活用了 Kotlin 语言的能力,包括 suspend 方法。可以处理大量的异步数据,而不会阻塞主线程。 可能会导致内存泄漏和性能问题。 处理 I/O 阻塞型操作,而不是计算密集型操作。 kotlinx.coroutines 和 kotlinx.coroutines.flow

在学习其中两种实现方式时候遇到多线程问题。

CountDownTimer

  1. 主线程中使用

本地测试代码如下:

private CountDownTimer countDownTimer;

private void initCountDownTimer() {
    System.out.println("construct and start countDownTimer in thread: " + Thread.currentThread().getName();

    countDownTimer = new CountDownTimer(3000, 1000) {
        @Override
        public void onTick(long millisUntilFinished) {
            System.out.println("count down onTick: " + Thread.currentThread().getName());
        }

        @Override
        public void onFinish() {
            System.out.println("count down onFinish: " + Thread.currentThread().getName());
        }
    };

    countDownTimer.start();
}

public static void main(String[] args) {
    initCountDownTimer();
}

日志如下:

construct and start countDownTimer in thread: main
count down onTick: main
count down onTick: main
count down onFinish: main

跟自己预期的一样,主线程中构造并调用start,能满足回调方法也是在主线程中执行相应的回调方法。

  1. 后台线程中使用
    查了下 Thread 类的简单说明,构造时候传入一个 Runnable 对象,再调用 start 方法,其中就会从系统申请创建一个线程并执行 runnable 对象中的 run 方法。
    于是修改本地测试代码如下(由于只想在新建的线程中一次性构造 CountDownTimer 对象并调用 start 即可,于是不持有 thread 对象,让其执行完成则自己销毁):
public static void main(String[] args) {
    //initCountDownTimer();
    new Thread(new Runnable() {
        @Override
        public void run() {
            initCountDownTimer();
        }
    }).start();
}

日志如下:

construct and start countDownTimer in thread: Thread-121

发现并没有出发定时器的回调方法(自己隐约也有这种感觉,因为这个临时线程只是完成上面的构造和调用 start 则销毁,定时器的实现逻辑如果不在主线程就不可能会回调)

于是看了下 CountDownTimer 的实现,验证下自己的想法。发现其中有个私有的成员变量 mHandler,在其中实现了定时器的处理回调逻辑(主要就是一旦收到消息,就判断时间决定是 onFinish() 还是 onTick(),并计算下一次应该 onTick() / onFinish() 回调的时间点,发送一个 delayMessage 给 handler)。

private Handler mHandler = new Handler() {
    @Override
    public void handleMessage(Message msg) {
        ...
        onTick();
        ...
        onFinish();
        ...
    }
}

CountDownTimer 的 start 方法实现如下:

public synchronized final CountDownTimer start() {
    mCancelled = false;
    if (mMillisInFuture <= 0) {
        onFinish();
        return this;
    }
    mStopTimeInFuture = SystemClock.elapsedRealtime() + mMillisInFuture;
    mHandler.sendMessage(mHandler.obtainMessage(MSG));
    return this;
}

可以看到这要是立即向 mHandler 发送消息,让其开始第一次回调判断处理。
于是产生了一个疑问,这个 handler 的 handleMessage 代码到底会在那个线程中执行呢?
阅读 Handler 源码中构造函数发现,其中通过 Looper.myLooper() 获取并持有为 mLooper,并通过该 looper 对象获取并持有队列 mQueue:

public Handler(@Nullable Callback callback, boolean async) {
    if (FIND_POTENTIAL_LEAKS) {
        final Class<? extends Handler> klass = getClass();
        if ((klass.isAnonymousClass() || klass.isMemberClass() || klass.isLocalClass()) &&
                (klass.getModifiers() & Modifier.STATIC) == 0) {
            Log.w(TAG, "The following Handler class should be static or leaks might occur: " +
                klass.getCanonicalName());
        }
    }

    mLooper = Looper.myLooper();
    if (mLooper == null) {
        throw new RuntimeException(
            "Can't create handler inside thread " + Thread.currentThread()
                    + " that has not called Looper.prepare()");
    }
    mQueue = mLooper.mQueue;
    mCallback = callback;
    mAsynchronous = async;
    mIsShared = false;
}

于是我们需要继续查找
handler.sendMessage()、handler.sendMessageDelayed()、handler.post(runnable)、handler.postDelayed(runnable) 这些方法时候,这些任务会在那个线程执行,跟上述的 mLooper 对象和 mQueue 又有什么关系呢?

public final boolean sendMessageDelayed(@NonNull Message msg, long delayMillis) {
    if (delayMillis < 0) {
        delayMillis = 0;
    }
    return sendMessageAtTime(msg, SystemClock.uptimeMillis() + delayMillis);
}

public boolean sendMessageAtTime(@NonNull Message msg, long uptimeMillis) {
    MessageQueue queue = mQueue;
    if (queue == null) {
        RuntimeException e = new RuntimeException(
                this + " sendMessageAtTime() called with no mQueue");
        Log.w("Looper", e.getMessage(), e);
        return false;
    }
    return enqueueMessage(queue, msg, uptimeMillis);
}

private boolean enqueueMessage(@NonNull MessageQueue queue, @NonNull Message msg,
        long uptimeMillis) {
    msg.target = this;
    msg.workSourceUid = ThreadLocalWorkSource.getUid();

    if (mAsynchronous) {
        msg.setAsynchronous(true);
    }
    return queue.enqueueMessage(msg, uptimeMillis);
}

public final boolean post(@NonNull Runnable r) {
   return  sendMessageDelayed(getPostMessage(r), 0);
}

public final boolean postDelayed(@NonNull Runnable r, long delayMillis) {
    return sendMessageDelayed(getPostMessage(r), delayMillis);
}

private static Message getPostMessage(Runnable r) {
    Message m = Message.obtain();
    m.callback = r;
    return m;
}

通过 Handler 的源码发现上述调用最终都是往 mQueue 中加入一个 msg 消息(Message 有个全局的复用对象池,使用双向链表)

msg.target = handler
msg.callback = r;  // 如果是 post(runnable) 相关方法,callback 即为该 runnable 对象,否则为 NULL
msg.what = 用户传入;
msg.args1 = 用户传入;
msg.args2 = 用户传入;
msg.obj = 用户传入;

handler 中的 handleMessage() 方法会在 dispatchMessage() 中被调用:

public void dispatchMessage(@NonNull Message msg) {
    if (msg.callback != null) {
        handleCallback(msg);
    } else {
        if (mCallback != null) {
            if (mCallback.handleMessage(msg)) {
                return;
            }
        }
        handleMessage(msg);
    }
}

接着查找是谁调用了 handler.dispatchMessage() 方法,就能知道为什么上述的 CountDownTimer 为什么不能按照设想的在后台线程正常回调,怎样才能让其正常回调。通过搜索发现 Looper 中的关键源码如下:

public static void prepare() {
    prepare(true);
}

private static void prepare(boolean quitAllowed) {
    if (sThreadLocal.get() != null) {
        throw new RuntimeException("Only one Looper may be created per thread");
    }
    sThreadLocal.set(new Looper(quitAllowed));
}

public static @Nullable Looper myLooper() {
    return sThreadLocal.get();
}

public static @NonNull MessageQueue myQueue() {
    return myLooper().mQueue;
}

private Looper(boolean quitAllowed) {
    mQueue = new MessageQueue(quitAllowed);
    mThread = Thread.currentThread();
}

public static void loop() {
    final Looper me = myLooper();
    if (me == null) {
        throw new RuntimeException("No Looper; Looper.prepare() wasn't called on this thread.");
    }
    if (me.mInLoop) {
        Slog.w(TAG, "Loop again would have the queued messages be executed"
                + " before this one completed.");
    }

    me.mInLoop = true;

    // Make sure the identity of this thread is that of the local process,
    // and keep track of what that identity token actually is.
    Binder.clearCallingIdentity();
    final long ident = Binder.clearCallingIdentity();

    // Allow overriding a threshold with a system prop. e.g.
    // adb shell 'setprop log.looper.1000.main.slow 1 && stop && start'
    final int thresholdOverride =
            SystemProperties.getInt("log.looper."
                    + Process.myUid() + "."
                    + Thread.currentThread().getName()
                    + ".slow", -1);

    me.mSlowDeliveryDetected = false;

    for (;;) {
        if (!loopOnce(me, ident, thresholdOverride)) {
            return;
        }
    }
}

private static boolean loopOnce(final Looper me,
        final long ident, final int thresholdOverride) {
    Message msg = me.mQueue.next(); // might block
    if (msg == null) {
        // No message indicates that the message queue is quitting.
        return false;
    }

    // This must be in a local variable, in case a UI event sets the logger
    final Printer logging = me.mLogging;
    if (logging != null) {
        logging.println(">>>>> Dispatching to " + msg.target + " "
                + msg.callback + ": " + msg.what);
    }
    // Make sure the observer won't change while processing a transaction.
    final Observer observer = sObserver;

    final long traceTag = me.mTraceTag;
    long slowDispatchThresholdMs = me.mSlowDispatchThresholdMs;
    long slowDeliveryThresholdMs = me.mSlowDeliveryThresholdMs;

    final boolean hasOverride = thresholdOverride >= 0;
    if (hasOverride) {
        slowDispatchThresholdMs = thresholdOverride;
        slowDeliveryThresholdMs = thresholdOverride;
    }
    final boolean logSlowDelivery = (slowDeliveryThresholdMs > 0 || hasOverride)
            && (msg.when > 0);
    final boolean logSlowDispatch = (slowDispatchThresholdMs > 0 || hasOverride);

    final boolean needStartTime = logSlowDelivery || logSlowDispatch;
    final boolean needEndTime = logSlowDispatch;

    if (traceTag != 0 && Trace.isTagEnabled(traceTag)) {
        Trace.traceBegin(traceTag, msg.target.getTraceName(msg));
    }

    final long dispatchStart = needStartTime ? SystemClock.uptimeMillis() : 0;
    final long dispatchEnd;
    Object token = null;
    if (observer != null) {
        token = observer.messageDispatchStarting();
    }
    long origWorkSource = ThreadLocalWorkSource.setUid(msg.workSourceUid);
    try {
        msg.target.dispatchMessage(msg);
        if (observer != null) {
            observer.messageDispatched(token, msg);
        }
        dispatchEnd = needEndTime ? SystemClock.uptimeMillis() : 0;
    } catch (Exception exception) {
        if (observer != null) {
            observer.dispatchingThrewException(token, msg, exception);
        }
        throw exception;
    } finally {
        ThreadLocalWorkSource.restore(origWorkSource);
        if (traceTag != 0) {
            Trace.traceEnd(traceTag);
        }
    }
    if (logSlowDelivery) {
        if (me.mSlowDeliveryDetected) {
            if ((dispatchStart - msg.when) <= 10) {
                Slog.w(TAG, "Drained");
                me.mSlowDeliveryDetected = false;
            }
        } else {
            if (showSlowLog(slowDeliveryThresholdMs, msg.when, dispatchStart, "delivery",
                    msg)) {
                // Once we write a slow delivery log, suppress until the queue drains.
                me.mSlowDeliveryDetected = true;
            }
        }
    }
    if (logSlowDispatch) {
        showSlowLog(slowDispatchThresholdMs, dispatchStart, dispatchEnd, "dispatch", msg);
    }

    if (logging != null) {
        logging.println("<<<<< Finished to " + msg.target + " " + msg.callback);
    }

    // Make sure that during the course of dispatching the
    // identity of the thread wasn't corrupted.
    final long newIdent = Binder.clearCallingIdentity();
    if (ident != newIdent) {
        Log.wtf(TAG, "Thread identity changed from 0x"
                + Long.toHexString(ident) + " to 0x"
                + Long.toHexString(newIdent) + " while dispatching to "
                + msg.target.getClass().getName() + " "
                + msg.callback + " what=" + msg.what);
    }

    msg.recycleUnchecked();

    return true;
}

发现其中的 loop() 方法中会一直轮询自己的 mQueue 的消息,然后调用 msg.target.dispatchMessage() 即为上述的 handler.dispatchMessage()。
但是需要调用 Looper.prepare() 方法才会为当前线程创建一个 looper 对象(构造时创建唯一的一个 mQueue 对象)并写入到当前线程的线程变量中,key 为:

static final ThreadLocal<Looper> sThreadLocal = new ThreadLocal<Looper>();

经过上面的代码阅读,CountDownTimer 的 handler 中 handleMessage 方法会在创建 Handler 时候,其中获取到的当前线程的 looper 中执行,往该 handler 发送的消息也都是放到了 looper 对应的 mQueue 中,该线程在 looper 轮询到消息后通过调用 handler.dispatchMessage() 分发给消息对应的 handler。

所以上面放在后台线程构造和调用 start 为什么不能正常定时器回调的原因,就是这个临时后台线程没有创建对应的 Looper 对象并在创建好后、调用loop() 之前调用该 CountDownTimer 的构造方法和 start(主要是构造,因为其中有 mHandler 的初始化会获取当前线程的 looper),即在后台线程使用 CountDownTimer 的前提是需要后台线程的 Looper 已创建,并且在该后台线程中调用 CountDownTimer 的构造方法。

线程开启 Looper 并执行 loop() 方法才会一直轮询其 messageQueue。然后在该线程中处理注册到这个线程 looper 的 handler 中 handler.post(runnable) 或 handler.sendMessage() 等放入的消息。通过那个 handler 发送到 messageQueue 的消息则调用相应的 handler.dispatchMessage 回调(如果是 post(runnable) 则直接执行 runnable.run 方法;否则执行其委托者或者自己的 handleMessage() 实现)。
注意:每个线程只能有 0 或者 1 个 looper 对象/messageQueue对象,但是一个线程的 looper 可以创建多个 handler,相当于任务分组。
所以上述的代码修改成下面这样,就可以使 CountDownTimer 在后台线程也能正常回调了。

new Thread(new Runnable() {
    @Override
    public void run() {
        Looper.prepare();

        initCountDownTimer();

        Looper.loop();
    }
}).start();

日志如下:

construct and start countDownTimer in thread: Thread-4875
count down onTick: Thread-4875
count down onTick: Thread-4875
count down onTick: Thread-4875
count down onFinish: Thread-4875

但是这个定时器即使 onFinish(),该后台线程也不会停止,要注意防止线程泄漏(详见下面总结中第 9 条)。

Timer

在使用 Timer 实现中发现,schedule 的 TimerTask 的 run 方法会在一个后台线程中执行,不管是不是在主线中调用 schedule 方法,这点需要注意一下。

查了下 Timer 的源码实现,找到的原因:

  1. Timer 构造时候会自己创建一个自定义的优先队列容器 TaskQueue 类对象(其中的任务抽象为 TimerTask),然后创建一个自定义的线程类 TimerThread(重写了其中的 run,自己实现了一个 loop 轮询 queue 中下一个到达的定时任务,然后处理);
  2. 上面的 TaskQueue 类不是线程安全的,是 Timer 和 TimerThread 中通过 synchronized(queue) 实现线程安全的;
  3. timer.schedule 系列方法只是构造一个 TimerTask 任务放入该优先队列中;

Thread/Looper/MessageQueue/Handler 一些总结

综上,Thread/Looper/MessageQueue/Handler 的关系类图和一些知识点总结如下:

类图关系
  1. 主线程默认开启 looper,可以通过 Looper.getMainLooper() 获取。操作放入主线程执行,常见使用如下:
// 1
new Handler(Looper.getMainLooper()).post(new Runnable() {
    @Override
    public void run() {
        xxxx;
    }
});

// 2
new Handler(Looper.getMainLooper()).post(() -> { xxxx; });

// 3、调用已有的工具方法,例如 Activity 中的
public final void runOnUiThread(Runnable action) {
    if (Thread.currentThread() != mUiThread) {
        mHandler.post(action);
    } else {
        action.run();
    }
}
  1. LiveData 中 postValue() 方法中则是放到主线程中执行,方便订阅者(一般是 View 层)直接操作 UI。

  2. 自己创建的后台线程默认不开启 looper,需要自己在线程的 run 实现中调用 Looper.prepare() 和 Looper.loop() 来开启 looper,注意 loop() 方法会一直循环轮询消息队列中的消息分发给相应的 handler 处理直到线程异常中止或相应中断退出机制退出循环。在该线程中 Looper.prepare() 后则可以通过 Looper.myLooper() 获取到该线程已经启用的 looper(在主线程调用 Looper.myLooper() 则获取到的是主线程的 looper)。

  3. 一个线程只有启用了 looper,才能构造相应的 handler 在这个后台线程通过 handler 发消息和处理消息。一个 looper 可用于创建多个 handler,类似于任务分组。

  4. handler 的 sendMessage、sendMessageDelayed、post(runnable)、postDelayed(runnable) 都是构造一个消息体往相应的线程的消息队列中插入。消息各属性如下:

msg.target = handler
msg.callback = r;  // 如果是 post(runnable) 相关方法,callback 即为该 runnable 对象,否则为 NULL
msg.what = 用户传入;
msg.args1 = 用户传入;
msg.args2 = 用户传入;
msg.obj = 用户传入;

已经通过上述方法插入消息队列的消息,如果还未调度执行,则可以通过调用 handler 相应的 removeCallbacks()、removeMessages() 等接口从队列中移除。

线程 looper 轮询 messageQueue 后调用相应的 handler.dispatchMessage(msg) 方法,如果是 Runnable 的任务则直接调用,否则调用委托对象或者重载的 handleMessage 来处理消息(注意:messageQueue 还支持屏障消息和异步消息,详见下面总结的第 10 条)

  1. handler.post(runnable)、sendMessage() 接口与 iOS 开发中的 dispatch(queue, block) 接口不同,handler 只能关联某个线程的 queue(开启looper后),但是 handler 不仅支持 runnable 对象,也支持定制消息体和处理逻辑,方便用户定制一些共同的处理(变化的地方用消息的参数区分),想要并发调度执行请使用 android 中提供的 ExecutorService 等线程池相关类。

  2. 可以使用 HandlerThread 方便创建一个开启了 looper 的线程,并提供方法方便获取通过该 looper 构造的 handler。

  3. 为了方便的创建一个后台线程并开启其 looper 后,创建一个用其 looper 构造的 handler 并且可以自定义 handleMessage。可以实现一个基类 BackgroundHandler() 其中创建一个 HandlerThread 后台线程,然后使用该线程创建好的 looper 去构造一个内部类 InnerHandler 对象(继承自 Handler,重写其 handleMessage 为调用 BackgroundHandler 中的 handleBackgroundMessage 抽象方法)(总结:即对 HandlerThread 和 Handler 做了一层封装,方便构造和使用)。
    这样我们只需要重写一个子类继承自 BackgroundHandler,并自定义 handleBackgroundMessage 实现即可快速实现一个后台线程创建并定制消息处理。

  4. 自己创建的线程开启 looper 一定要注意其生命周期,在适当的时候(例如:页面 onDestroy、onStop 时候)考虑 handler 撤销已发送未处理消息、线程销毁,否则可能会出现 handler 所引用的对象、创建的线程的内存泄漏。

  • Thread 持有 looper,looper 持有 queue,多个 handler 可以持有 looper/queue,已经放入队列中的消息会持有 handler,而 handler 所引用的对象(通常写成内部类,还会引用其所在的类对象)则不能得到及时释放造成内存泄漏。
  • 另外这个线程如果只是用于某个页面,则页面退出时候也需要注意销毁该线程,否则该开启 looper 的线程会一直不会停止轮询,需要退出机制让线程停止,避免内存泄漏。
  • Looper 中提供了一些退出线程的方法。
    。quitSafely 方法:实际上执行了 MessageQueue 中的 removeAllFutureMessagesLocked 方法,消息队列会清除 when 晚于当前时间的所有同步/异步消息与同步障碍器,留下本应处理完的消息继续处理。
    。quit 方法:实际上执行了 MessageQueue 中的 removeAllMessageLocked 方法,消息队列中会清除所有的消息。
    。无论是调用了 quitSafely 还是 quit 方法,Looper 就不再接收新的消息。即在调用了 Looper 的 quitSafely 或 quit 方法之后,消息循环就终结了,这时候再通过 Handler 调用 sendMessage 或 post 等方法发送消息时均返回 false,表示消息没有成功放入消息队列 MessageQueue 中,因为消息队列已经退出了。
  1. MessageQueue 出了支持同步消息,还支持屏障消息和异步消息,可以满足优先调度一些比较急迫的任务。

  2. Looper 还支持一些比较特殊的 IdleHander,用于在线程空闲的时候执行一些不是很急迫的任务,感觉可以用于主线程性能优化、启动过程中一些性能优化。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容