加强版异步任务方案

一、前言

为了提高流畅性,耗时任务放后台线程运行,已是APP开发的常识了。
关于异步有很多方案,当前最流行的,莫过于RxJava了;
更早一些时候,还有AsyncTask(骨灰级的API)。

总的来说,AsyncTask构思精巧,代码简洁,使用方便,有不少地方值得借鉴。
当然问题也有不少,比如不能随Activity销毁而销毁导致的内存泄漏,还有不适合做长时间的任务等。

笔者以AsyncTask为范本,写了一个“AsyncTaskPlus”:
保留了AsyncTask的所有用法,解决了其中的一些问题,同时引入了一些新特性。
接下来给大家介绍一下这“加强版”的框架,希望对各位有所启发。

二、任务调度

2.1 AsyncTask的Executor

AsyncTask的任务调度主要依赖两个Executor:ThreadPoolExecutor 和 SerialExecutor。
代码如下:

private static final int CPU_COUNT = Runtime.getRuntime().availableProcessors();
private static final int CORE_POOL_SIZE = Math.max(2, Math.min(CPU_COUNT - 1, 4));
private static final int MAXIMUM_POOL_SIZE = CPU_COUNT * 2 + 1;

public static final Executor THREAD_POOL_EXECUTOR;
public static final Executor SERIAL_EXECUTOR = new SerialExecutor();

static {
    ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
            CORE_POOL_SIZE, MAXIMUM_POOL_SIZE, 30, TimeUnit.SECONDS,
            new LinkedBlockingQueue<Runnable>(128), sThreadFactory);
    threadPoolExecutor.allowCoreThreadTimeOut(true);
    THREAD_POOL_EXECUTOR = threadPoolExecutor;
}

private static class SerialExecutor implements Executor {
    final ArrayDeque<Runnable> mTasks = new ArrayDeque<Runnable>();
    Runnable mActive;

    public synchronized void execute(final Runnable r) {
        mTasks.offer(new Runnable() {
            public void run() {
                try {
                    r.run();
                } finally {
                    scheduleNext();
                }
            }
        });
        if (mActive == null) {
            scheduleNext();
        }
    }

    protected synchronized void scheduleNext() {
        if ((mActive = mTasks.poll()) != null) {
            THREAD_POOL_EXECUTOR.execute(mActive);
        }
    }
}

关于线程池,估计大家都很熟悉了,参数就不多作解释了。
如果不是很熟悉,推荐阅读笔者的另一篇文章《速读Java线程池》

上面代码中,通过巧用“装饰者模式”,增加“串行调度”的功能。
装饰者模式有以下特点:

  1. 装饰对象和真实对象有相同的接口,这样客户端对象就能以和真实对象相同的方式和装饰对象交互。
  2. 装饰对象包含一个真实对象的引用。
  3. 装饰对象接受所有来自客户端的请求,它把这些请求转发给真实的对象。
  4. 装饰对象可以在转发这些请求以前或以后增加一些附加功能。

SerialExecutor只有二十来行代码,却用了两次装饰者模式:Runnable和Executor。

  • Runnable部分,往队列添加的匿名Runnable对象(装饰对象),当被Executor调用run()方法时,先执行“真实对象”的run()方法,然后再调用scheduleNext();
  • Executor部分,通过增加一个任务队列,实现串行调度的功能,而具体的任务执行转发给“真实对象”THREAD_POOL_EXECUTOR。

想要串行调度,为什么不多加一个coreSize=1的ThreadPoolExecutor呢?
两个ThreadPoolExecutor,彼此线程不可复用。

虽然SerialExecutor的方案很不错,但是THREAD_POOL_EXECUTOR的coreSize太小了(不超过4),
这导致AsyncTask不适合执行长时间运行的任务,否则多几个任务就会堵塞。
因此,如果要改进AsyncTask,首先要改进Executor。

2.2 通用版Executor

实现思路和 SerialExecutor 差不多,加一个队列, 实现另一层调度控制。
首先,把 RunnablescheduleNext 两部分都抽象出来:

interface Trigger {
    fun next()
}

class RunnableWrapper constructor(
        private val r: Runnable,
        private val trigger: Trigger) : Runnable {
    override fun run() {
        try {
            r.run()
        } finally {
            trigger.next()
        }
    }
}

接下来的实现和SerialExecutor类似:

class PipeExecutor @JvmOverloads constructor(
        windowSize: Int,
        private val capacity: Int = -1,
        private val rejectedHandler: RejectedExecutionHandler = defaultHandler) : TaskExecutor {

    private val tasks = PriorityQueue<RunnableWrapper>()
    private val windowSize: Int = if (windowSize > 0) windowSize else 1
    private var count = 0

    private val trigger : Trigger = object : Trigger {
        override fun next() {
            scheduleNext()
        }
    }

    fun execute(r: Runnable, priority: Int) {
        schedule(RunnableWrapper(r, trigger), priority)
    }

    @Synchronized
    internal fun scheduleNext() {
        count--
        if (count < windowSize) {
            startTask(tasks.poll())
        }
    }

    @Synchronized
    internal fun schedule(r: RunnableWrapper, priority: Int) {
        if (capacity > 0 && tasks.size() >= capacity) {
            rejectedHandler.rejectedExecution(r, TaskCenter.poolExecutor)
        }
        if (count < windowSize || priority == Priority.IMMEDIATE) {
            startTask(r)
        } else {
            tasks.offer(r, priority)
        }
    }

    private fun startTask(active: Runnable?) {
        if (active != null) {
            count++
            TaskCenter.poolExecutor.execute(active)
        }
    }
}

解析一下代码中的参数和变量:

  • tasks:任务缓冲区
  • count:正在执行的任务的数量
  • windowSize:并发窗口,控制Executor的并发
  • capacity:任务缓冲区容量,小于等于0时为不限容量,超过容量触发rejectedHandler
  • rejectedHandler:默认为AbortPolicy(抛出异常)
  • priority:调度优先级

当count>=windowSize时,priority高者先被调度;
优先级相同的任务,遵循先进先出(FIFO)的调度规则。

需要注意的是,调度优先级不同于线程优先级,线程优先级更底层一些。
比如AsyncTask的doInBackground()中就调用了:
Process.setThreadPriority(Process.THREAD_PRIORITY_BACKGROUND)
这可以使得后台线程的线程优先级低于UI线程。

以下是PipeExecutor的流程图:


定义了PipeExecutor了之后,我们可以实现多个实例。
例如,可以仿照 RxJava 的 Schedulers,定义适用于“IO密集型”任务和“计算密集型”任务的Executor。

val io = PipeExecutor(20, 512)
val computation = PipeExecutor(Math.min(Math.max(2, cpuCount), 4), 512)

也可以定义串行调度的Executor:

val single = PipeExecutor(1)

不过我们不建议定义全局的串行调度Executor,因为会有相互阻塞的风险。
但是可以根据场景定义专属的串行调度Executor,比如给日志收集创建一个,给数据上报创建一个……

不同实例,犹如不同的水管,往同一个池子进水,故而命名为PipeExecutor。

2.3 去重版Executor

我们项目中,页面更新用的是“发布订阅模式”:
数据层有变更,发布更新消息;
上层收到消息,异步加载数据,刷新页面。

然后就碰到一个问题:若短时间内有多次数据更新,就会有多个消息发往上层。
不做特殊处理,就会几乎同时启动多个异步任务,浪费计算资源;
多个线程对并发读取同一数据,多线程问题也随之而来,若处理不好,结果不可预知。

用串行执行器?所有任务串行的话,无法利用任务并发的优势。

所以经过比较多种方案,最终的结论是:

  • 1、任务分组,不同组并行,同组串行
  • 2、同组的任务,如果有任务在执行,最多只能有一个在等待,丢弃后面的任务

所谓分组,就是给任务打tag, 比如刷新A数据的任务叫ATask, 刷新B任务的叫BTask。

关于第2点,其实有考虑过其他一些方案,比如下面两个:

  • 取消正在执行的任务
    • 首先不是所有任务都可以中断的,可以不接收其结果,但是不一定能中断其执行
    • 即使能取消(比如中断网络请求),也不是最佳方案。
      比方说当前线程或许已经快要下载完了,在等一会后面的任务就可以读缓存去结果了;
      任务2取消任务1,任务3取消任务2……等到最后一个任务执行,用户可能已经不耐烦了。
  • 如果有任务在执行,丢弃后面的任务
    比方说任务1读取了数据,在计算的时候,数据源变更,然后发送事件,启动任务2……
    直接丢弃后面的任务,最终页面显示的是旧的数据。

我们定义了一个LaneExecutor来实现这个方案,示意图如下:

discard=true

各组任务就像一个个车道(Lane), 故而命名为LaneExecutor。

洋葱似地一层包一层,很明显,也是装饰者模式。
职责分配:
LaneExecutor负责任务去重;
PipeExecutor负责任务并发控制和调度优先级;
ThreadPoolExecutor负责分配线程来执行任务。

但后来又遇到另一个问题:
有多个控件要加载同一个URL的数据,然后很自然地我们就以 URL作为tag了,以避免重复下载(做有缓存,第一任务下载完成之后,后面的任务可以读取缓存)。
但是用LaneExecutor来执行时,只保留一个任务在等待,然后最终只有两个控件能显示数据。
查到问题后,笔者给LaneExecutor加了一种模式,该模式下,不丢弃任务。

discard=false

如此,所有任务都会被执行,但是只有第一个需要下载数据,后面任务读缓存就好了。

2.4 统一管理Executor

当项目复杂度到了一定程度,如果没有统一的公共定义,可能会出现各种冗余实例。
分散的Executor无法较好地控制并发;
如果各自创建的是ThreadPoolExecutor,则还要加上一条:降低线程复用。
故此,可以集中定义Executor,各模块统一调用。
代码如下:

object TaskCenter {
    internal val poolExecutor: ThreadPoolExecutor = ThreadPoolExecutor(
            0, 256,
            60L, TimeUnit.SECONDS,
            SynchronousQueue(),
            threadFactory)
    
    // 常规的任务调度器,可控制任务并发,支持任务优先级
    val io = PipeExecutor(20, 512)
    val computation = PipeExecutor(Math.min(Math.max(2, cpuCount), 4), 512)

    // 带去重策略的 Executor,可用于数据刷新等任务
    val laneIO = LaneExecutor(io, true)
    val laneCP = LaneExecutor(computation, true)

    // 相同的tag的任务会被串行执行,相当于串行的Executor
    // 可用于写日志,上报统计信息等任务
    val serial = LaneExecutor(PipeExecutor(Math.min(Math.max(2, cpuCount), 4), 1024))
}

2.5 Executor的使用

    TaskCenter.io.execute{
        // do something
    }

    TaskCenter.laneIO.execute("laneIO", {
        // do something
    }, Priority.HIGH)

    val serialExecutor = PipeExecutor(1)
    serialExecutor.execute{
        // do something
    }

    TaskCenter.serial.execute ("your tag", {
        // do something
    })
  • PipeExecutor的使用和常规的Executor是一样的,execute中传入Runnable即可,
    然后由于Runnable只有一个方法,也没有参数,lambda的形式就显得更加简洁了。
  • LaneExecutor由于要给任务打tag, 所以要传入tag参数;
    如果不传,则没有分组的效果,也就是回退到PipeExecutor的特性;
  • 两种Executor都可以传入优先级。

很多开源项目都设计了API来使用外部的Executor,比如RxJava可以这样用:

object TaskSchedulers {
    val io: Scheduler by lazy { Schedulers.from(TaskCenter.io) }
    val computation: Scheduler by lazy { Schedulers.from(TaskCenter.computation) }
    val single by lazy { Schedulers.from(PipeExecutor(1)) }
}
Observable.range(1, 8)
       .subscribeOn(TaskSchedulers.computation)
       .subscribe { Log.d(tag, "number:$it") }

这样有一个好处,各种任务都在一个线程池上执行任务,可复用彼此创建的线程。

三、流程控制

3.1 AsyncTask的执行流

上一章我们分析了任务调度,构造了一系列Executor,增强任务处理方面的通用性。
不过任务调度只是AsyncTask的一部分,AsyncTask的精髓其实在于流程控制:在任务执行的不同阶段,回调相应的方法
下面是AsyncTask的流程图:

通过使用FutureTask和Callable,使得AsyncTask具备对任务执行更强的控制力,比如cancel任务。
有的文章说cancel()不一定的立即中断任务,但其实Futuret.cancel()确实已经是最好的方案了,
如果强行调用Thread.stop(),则犹如关掉空中飞机的引擎,后果不堪设想。

通过与Handler的配合,AsyncTask可以在任务执行过程中和执行结束后发布数据到UI线程,
这使得AsyncTask尤其适用于“数据加载+界面刷新”的场景。
而这类场景在APP开发中较为常见,这也是AsyncTask一度被广泛使用的原因之一。

3.2 生命周期

AsyncTask其中一个广为诟病的问题就是内存泄漏:
若AsyncTask持有Activity引用,且生命周期比Activity的长,则Activity无法被及时回收。
这个问题其实不是AsyncTask独有,Handler,RxJava等都存在类似问题。
解决方案有多种,静态类、弱引用、Activity销毁时取消等。
RxJava提供了dispose方法来取消任务,同时也有很多集成生命周期的开源方案,比如RxLifecycleAutoDispose等。

AsyncTask也提供了cancel方法,但是比较命苦,吐槽者众,助力者寡。
其实要实现自动cancel不难,建立和Activity/Fragment的关系即可,可通过观察者模式来实现。


UITask是参考AsyncTask写的一个类, 使用了上一章介绍的Executor。
结构上,UITask为观察者,Activity/Fragment为被观察者,LifecycleManager为 UITask 和 Activity/Fragment 构建关系的桥梁。
实现上需要两个数据结构:一个SparseArray,一个List。
SparseArray的key为被观察者的identityHashCode, value为观察者列表。

UITask提供了host()方法,方法中获取宿主(也就是Activity/Fragment)的identityHashCode,
通过register()方法,添加 “Activity->UITask” 到SparseArray中。

abstract class UITask<Params, Progress, Result> : LifeListener {
    fun host(host: Any): UITask<Params, Progress, Result> {
        LifecycleManager.register(System.identityHashCode(host), this)
        return this
    }

    override fun onEvent(event: Int) {
        if (event == LifeEvent.DESTROY) {
            cancel(true)
        } else if (event == LifeEvent.SHOW) {
            changePriority(+1)
        } else if (event == LifeEvent.HIDE) {
            changePriority(-1)
        }
    }
}
override fun onCreate(savedInstanceState: Bundle?) {
    TestTask().host(this).execute("hello")
}

需要在BaseActivity中通知事件:

abstract class BaseActivity : Activity() {
    override fun onDestroy() {
        super.onDestroy()
        LifecycleManager.notify(this, LifeEvent.DESTROY)
    }

    override fun onPause() {
        super.onPause()
        LifecycleManager.notify(this, LifeEvent.HIDE)
    }

    override fun onResume() {
        super.onResume()
        LifecycleManager.notify(this, LifeEvent.SHOW)
    }
}

调用notify()方法时,会根据Activity索引到对应观察者列表,然后遍历列表,回调观察者onEvent()方法。
其中,当通知的事件为DESTROY时,UITask执行cancel()方法,从而取消任务。

3.3 动态调整优先级

上一节,我们看到UITask除了关注DESTROY事件,还关注 Activity/Fragment 的HIDESHOW,
并根据可见状态调整优先级。

调整优先级有什么用呢? 下面先看两张图感受一下。
为了凸显效果,我们把加载任务的并发量控制为1(串行)。

第一张是不会自动调整优先级的,完全的先进先出:

不改变优先级

可以看到,切换到第二个页面,由于上一页的任务还没执行完,
所以要一直等到上一页的任务都完成了才轮到第二个页面加载。
很显然这样体验不太好。

接下来我们看下动态调整优先级是什么效果:

动态调整优先级

切换到第二个页面之后,第一个页面的任务的“调度优先级”被降低了,所以会优先加载第二个页面的图片;
再次切换回第一个页面,第二个页面的优先级被降低,第一个页面的优先级恢复,所以优先加载第一个页面的图片。

那可否进入第二个页面的时暂停第一个页面的任务?
暂停的方案不太友好,比方说用户在第二个页面停留很久,第二个页面的任务都完成了,然后切换回第一个页面,发现只有部分图片(其他被暂停了)。
而如果只是调整优先级,则第二个页面的任务都执行完之后,会接着执行第一个页面的任务,返回第一个页面时就能够看到所有图片了。
这就好比赶车,让其他人给插个队,没有问题,但是不能不给别人排队了吧。

3.4 链式调用

UITask的用法和AsyncTask大同小异,回调方法和参数泛型都是一样的,所以就不多作介绍了。
如今很多开源库都提供了链式API,使用起来确实灵活方便,视觉上也比较连贯。
喜欢冰糖葫芦一样的链式调用?
项目中提供了一个ChainTask类,拓展了UITask,提供链式调用的API。

override fun onCreate(savedInstanceState: Bundle?) {
    val task = ChainTask<Double, Int, String>()
    task.tag("ChainTest")
        .preExecute { result_tv.text = "running" }
        .background { params ->
            for (i in 0..100 step 2) {
                // do something
                task.publishProgress(i)
            }
            "result is:" + (params[0] * 100)
        }
        .progressUpdate { values ->
            val progress = values[0]
            progress_bar.progress = progress
            progress_tv.text = "$progress%"
        }
        .postExecute { result_tv.text = it }
        .cancel { showTips("ChainTask cancel") }
        .priority(Priority.IMMEDIATE)
        .host(this)
        .execute(3.14)
}

四、总结

最后,可能会这样的疑问:
既然已经有 RxJava 这样好用的开源库来实现异步了, 为什么还要写这个项目呢?
首先,RxJava 不仅仅是异步而已:“ReactiveX是一个通过使用可观察序列来编写异步和基于事件的程序的库。”
“可观察序列 - 事件 - 异步”加起来才使得 RxJava 如此富有魅力。
有所得,必有所付出,为了实现这些丰富的特性,代码量也是比较可观的(当前版本jar包约2.2M)。

AsyncTask则比较简单,除去注释只有三百多行代码;
功能也比较纯粹:执行异步任务,在任务执行的不同阶段,回调相应的方法。
Task参考了AsyncTask,功能类似,只是做了一些完善;
jar包大小45K,也算是比较轻量的。

这个年头,apk动辄几十M甚至上百M,2.2M的库并非不可接受。
但是也有一些场景,比方说给第三方写SDK的时候,对包大小和依赖比较敏感,而且也不需要这么大而全的特性,这时一些轻量级的方案就比较合适了。
而且,除了包大小之外,Task所实现的功能和RxJava也不尽相同。

如果说AsyncTask是自行车,RxJava是汽车,则Task是摩托车。
各有各的用途,各有各的灵魂。

项目地址:
https://github.com/No89757/Task

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 212,185评论 6 493
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,445评论 3 385
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 157,684评论 0 348
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,564评论 1 284
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 65,681评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 49,874评论 1 290
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,025评论 3 408
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,761评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,217评论 1 303
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,545评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,694评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,351评论 4 332
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,988评论 3 315
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,778评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,007评论 1 266
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,427评论 2 360
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,580评论 2 349

推荐阅读更多精彩内容

  • 简介 1. 线程分类 主线程(UI线程) : 处理和界面相关的事情. 子线程 : 处理耗时操作. Android中...
    王世军Steven阅读 910评论 0 2
  • 第5章 多线程编程 5.1 线程基础 5.1.1 如何创建线程 在java要创建线程,一般有==两种方式==:1)...
    AndroidMaster阅读 1,789评论 0 11
  • 不同形式的线程虽然都是线程,但是它们仍然具有不同的特性和实用场景: AsyncTask封装了线程池和Handler...
    胡二囧阅读 1,115评论 0 4
  • 前段时间遇到这样一个问题,有人问微信朋友圈的上传图片的功能怎么做才能让用户的等待时间较短,比如说一下上传9张图片,...
    加油码农阅读 1,190评论 0 2
  • 其实所谓理财的话题,就是赚钱、花钱、攒钱的问题。 人这一辈子都跟钱脱不了干系,网上有一本书,叫《钱定江山——世界是...
    晓霞出东方阅读 1,270评论 0 0