Android 异步编程探索 — 最喜爱 RxJava 2

本文参考自 Asynchronous Android Programming – the Good, the Bad, and the Ugly

有关 RxJava 2 的例子 RxJava2-Android-Samples

有关 RxSearchObservable 的参考博文 Implement Search Using RxJava Operators

编写 Android 代码时,很多情况下你需要进行网络调用,目前有几种不同的异步编程方式可以满足我们的需求。我们对这几种方式进行了比较,通过代码示例,将选出我们认为在 Android 中进行网络调用的最佳方法。

一、The Ugly:在 UI 线程上进行网络调用

进行网络调用的最简单方法是直接在 UI 线程上进行调用:

    @Override
    protected void onCreate(@Nullable Bundle savedInstanceState) {
        super.onCreate(savedInstanceState);

        // Insert network call here!
    }

这可能适用于 Android 早期版本上的短网络调用,但这种方式是非常 ugly (丑陋的)。当你在 UI 线程上运行耗时的代码段时,它同时会阻塞其他操作。对于用户来说,应用程序似乎停止运行了。如果用户在 UI 线程被阻塞时执行操作,并且应用在 5 秒内没有响应,则 Android 系统将显示 “应用程序无响应 (ANR)” 对话框:

如果你尝试在较新版本的 Android(3.0 及以上版本)上以这种方式进行网络调用,应用程序会崩溃,因为这些新版本对于 UI 线程的使用要求更为严格。

二、The Bad Part 1:使用 Runnable 和 Handler

为了避免 Android 应用程序停止运行甚至崩溃,我们需要在后台执行网络调用。一种方法是使用 Runnable,它包含需要在后台运行的代码,并且在工作线程上运行该代码。

        Runnable runnable = new Runnable() {
            public void run() {
                // Insert network call here!
            }
        };
        Thread mythread = new Thread(runnable);
        mythread.start();

现在可以放心地进行网络调用了,但如果你想在这之后更新用户界面,你是无法在 Runnable 中执行此操作的,因为它是在 UI 线程以外的线程上运行。但是,你可以在 UI 线程上创建一个 Handler,并在 Runnable 中使用它来触发 HandlerhandleMessage() 回调。

    private Handler handler = new Handler() {
        @Override
        public void handleMessage(Message msg) {
            // Update user interface
        }
    };


        Runnable runnable = new Runnable() {
            public void run() {
                // Insert network call here!
                handler.sendEmptyMessage(0);
            }
        };
        Thread mythread = new Thread(runnable);
        mythread.start();

RunnableHandler 的主要风险是内存泄漏。如果 Activity 创建了一个 Handler,此时在工作线程中执行耗时任务,并在 Runnable 仍在工作时离开 Activity,Handler 会持有创建它的 Activity 的引用,该引用将阻止 Activity 被垃圾回收。为了避免这种情况,需要静态创建 Handler,因为它不包含对 Activity 的引用,并且使用 WeakReference 持有 Activity 的引用,这样就可以调用其方法来更新 UI 而不会发生内存泄漏。还有一个问题,就是如果同时执行多个更新用户界面的网络调用,则无法保证 Handler 按照你想要的顺序接收回调。

以下是优化过的 Handler:
    private final MyHandler mHandler = new MyHandler(this);


    private static class MyHandler extends Handler {
        
        private final WeakReference<MainActivity> mActivity;

        private MyHandler(MainActivity activity) {
            mActivity = new WeakReference<MainActivity>(activity);
        }

        @Override
        public void handleMessage(Message msg) {
            MainActivity activity = mActivity.get();

            if (activity != null) {
                // Update user interface
            }
        }
    }


        Runnable runnable = new Runnable() {
            public void run() {
                // Insert network call here!
                mHandler.sendEmptyMessage(0);
            }
        };
        Thread mythread = new Thread(runnable);
        mythread.start();

更多有关 Handler 导致内存泄漏的介绍请看这里 Android Handler 导致内存泄漏原因及解决方案

三、The Bad Part 2:使用 AsyncTask

在后台进行网络调用的另一种不太理想的方法是使用 AsyncTask。它们的用法相当简单:创建 AsyncTask 的子类,覆盖 doInBackground() 方法,可能还有 onPreExecute()onProgressUpdate()onPostExecute() 方法。

    private class NetworkCallTask extends AsyncTask<String, Void, Integer> {
        @Override
        protected Integer doInBackground(String... strings) {
            // Insert network call here!
            return null;
        }

        @Override
        protected void onPreExecute() {
            // Perform setup - runs on user interface thread
            super.onPreExecute();
        }

        @Override
        protected void onProgressUpdate(Void... values) {
            // Update user with progress bar or similar - runs on user interface thread
            super.onProgressUpdate(values);
        }

        @Override
        protected void onPostExecute(Integer integer) {
            // Update user interface
            super.onPostExecute(integer);
        }
    }

        new NetworkCallTask().execute();

onPreExecute() 在 UI 线程上运行,用于在启动后台任务之前执行任何需要的设置,例如显示进度条。接下来,在后台调用 doInBackground() 来执行网络任务。此时,如果从 doInBackground() 内部调用 publishProgress() 方法,则将在 UI 线程上调用 onProgressUpdate() 方法,执行更新进度条等与用户相关的任务。最后,一旦后台任务完成,将在 UI 线程上调用 onPostExecute(),执行诸如取消进度条和显示网络调用结果等操作。

这比在 UI 上进行网络调用要好得多,但仍有一些问题需要避免:如果要取消 AsyncTask,需要等到 doInBackground() 方法完成后才能进行。这意味着即使你在异步任务上调用 cancel() 方法,它也会等到后台工作完成后再取消任务,除非你在 doInBackground() 方法中不断地检查 isCancelled() 的结果。

AsyncTask 的另一个问题是:如果你同时运行多个 AsyncTask,你无法保证它们会以怎样的顺序完成,导致需要复杂的逻辑代码来检查。更糟糕的是你假设 A AsyncTask 将在 B AsyncTask 之前完成,直到遇到一个边缘情况使 A AsyncTask 调用变慢,使得它们以错误的顺序完成并且产生不希望的结果。

由于 AsyncTask 可以部分在 UI 线程上运行,部分在后台线程上运行,因此它们可能导致内存泄漏。因为 AsyncTask 经常在 onPostExecute() 方法中更新 UI,因此很容易使 AsyncTask 成为 Activity 的内部类,并且可以访问 Activity 的 UI 元素。这会创建一个对 Activity 的隐式引用,当 AsyncTask 仍在运行时,该引用会阻止对 Activity 进行垃圾回收,即使 Activity 已被销毁。

四、The Good:使用 RxJava

最后,终于到了我们最喜欢的后台网络调用方法:RxJava 库,一个 ReactiveX 的 Java 实现,这里我们使用的是专门为 Android 开发所设计的基于 RxJava 2 的 RxAndroid 库以及开源的网络请求库 OkHttp 3。使用 RxJava 的 Observable,可以创建一个覆写了 onNext()onError()onComplete() 方法的 ObserverObservable 会将数据项发射给 Observer,导致调用 onNext() 方法并访问所发射的数据项。如果 Observable 已发射完数据项,则调用 onCompleted()。如果在任何时候抛出异常,则会利用该异常调用 onError(),从而轻松处理错误。

以下代码是从网络加载一张图片并设置给 ImageView 显示出来:
    private CompositeDisposable mCompositeDisposable = new CompositeDisposable();

    private void setBitmapFromNet() {
        getObservable()
                .subscribeOn(Schedulers.newThread())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(getObserver());
    }


    private Observable<Bitmap> getObservable() {
        return Observable.create(new ObservableOnSubscribe<Bitmap>() {
            @Override
            public void subscribe(final ObservableEmitter<Bitmap> emitter) throws Exception {
                if (!emitter.isDisposed()) {
                    Request request = new Request.Builder()
                            .url("https://img3.doubanio.com/img/musician/large/35000.jpg")
                            .build();

                    client.newCall(request).enqueue(new Callback() {
                        @Override
                        public void onFailure(Call call, IOException e) {
                            Log.d(TAG, "onFailure: " + e.getMessage());
                        }

                        @Override
                        public void onResponse(Call call, Response response) throws IOException {
                            InputStream inputStream = response.body().byteStream();
                            emitter.onNext(BitmapFactory.decodeStream(inputStream));
                            emitter.onComplete();
                        }
                    });
                }
            }
        });
    }


    private Observer<Bitmap> getObserver() {
        return new Observer<Bitmap>() {
            @Override
            public void onSubscribe(Disposable d) {
                Log.d(TAG, "onSubscribe: 1 ");
                mCompositeDisposable.add(d);
            }

            @Override
            public void onNext(Bitmap bitmap) {
                Log.d(TAG, "onNext: 2 ");
                mImageView.setImageBitmap(bitmap);
            }

            @Override
            public void onError(Throwable e) {
                Log.d(TAG, "onError: 3 " + e.getMessage());
            }

            @Override
            public void onComplete() {
                Log.d(TAG, "onComplete: 3 ");
            }
        };
    }

Observable 还允许使用操作符来修改 Observable。例如,subscribeOn() 声明 Observable 在哪个线程上执行工作,以及 observeOn() 声明观察者使用的线程。有许多操作符可用于修改 Observable,例如 map,它将发射的数据项转换为其它类型的数据项。

  • Map — 映射,通过对序列的每一项都应用一个函数变换 Observable 发射的数据,实质是对序列中的每一项执行一个函数,函数的参数就是这个数据项。
    /*
    * Here we are getting ApiUser Object from api server
    * then we are converting it into User Object because
    * may be our database support User Not ApiUser Object
    * Here we are using Map Operator to do that
    */
    private void doSomeWork() {
        getObservable()
                // Run on a background thread
                .subscribeOn(Schedulers.io())
                // Be notified on the main thread
                .observeOn(AndroidSchedulers.mainThread())
                .map(new Function<List<ApiUser>, List<User>>() {

                    @Override
                    public List<User> apply(List<ApiUser> apiUsers) {
                        return Utils.convertApiUserListToUserList(apiUsers);
                    }
                })
                .subscribe(getObserver());
    }

    private Observable<List<ApiUser>> getObservable() {
        return Observable.create(new ObservableOnSubscribe<List<ApiUser>>() {
            @Override
            public void subscribe(ObservableEmitter<List<ApiUser>> e) {
                if (!e.isDisposed()) {
                    e.onNext(Utils.getApiUserList());
                    e.onComplete();
                }
            }
        });
    }

    private Observer<List<User>> getObserver() {
        return new Observer<List<User>>() {

            @Override
            public void onSubscribe(Disposable d) {
                Log.d(TAG, " onSubscribe : " + d.isDisposed());
            }

            @Override
            public void onNext(List<User> userList) {
                textView.append(" onNext");
                textView.append(AppConstant.LINE_SEPARATOR);
                for (User user : userList) {
                    textView.append(" firstname : " + user.firstname);
                    textView.append(AppConstant.LINE_SEPARATOR);
                }
                Log.d(TAG, " onNext : " + userList.size());
            }

            @Override
            public void onError(Throwable e) {
                textView.append(" onError : " + e.getMessage());
                textView.append(AppConstant.LINE_SEPARATOR);
                Log.d(TAG, " onError : " + e.getMessage());
            }

            @Override
            public void onComplete() {
                textView.append(" onComplete");
                textView.append(AppConstant.LINE_SEPARATOR);
                Log.d(TAG, " onComplete");
            }
        };
    }

另一个有用的操作符是 merge,它使用多个 Observable,并将所有 Observable 中的数据项发送到同一个 Observer,为每个 Observable 发射的每个数据项调用 onNext()。如果在此过程中 Observable 出现错误,则调用 onError() 并且不再发射任何数据项。如果不想这样,可以使用 mergeDelayError,它与 merge 相同,但如果有错误,它会等到所有 Observable完成或出现多次错误之后才会触发 onError()

  • Merge — 将两个 Observable 发射的数据组合并成一个。
    /*
     * Using merge operator to combine Observable : merge does not maintain
     * the order of Observable.
     * It will emit all the 7 values may not be in order
     * Ex - "A1", "B1", "A2", "A3", "A4", "B2", "B3" - may be anything
     */
    private void doSomeWork() {
        final String[] aStrings = {"A1", "A2", "A3", "A4"};
        final String[] bStrings = {"B1", "B2", "B3"};

        final Observable<String> aObservable = Observable.fromArray(aStrings);
        final Observable<String> bObservable = Observable.fromArray(bStrings);

        Observable.merge(aObservable, bObservable)
                .subscribe(getObserver());
    }


    private Observer<String> getObserver() {
        return new Observer<String>() {

            @Override
            public void onSubscribe(Disposable d) {
                Log.d(TAG, " onSubscribe : " + d.isDisposed());
            }

            @Override
            public void onNext(String value) {
                textView.append(" onNext : value : " + value);
                textView.append(AppConstant.LINE_SEPARATOR);
                Log.d(TAG, " onNext : value : " + value);
            }

            @Override
            public void onError(Throwable e) {
                textView.append(" onError : " + e.getMessage());
                textView.append(AppConstant.LINE_SEPARATOR);
                Log.d(TAG, " onError : " + e.getMessage());
            }

            @Override
            public void onComplete() {
                textView.append(" onComplete");
                textView.append(AppConstant.LINE_SEPARATOR);
                Log.d(TAG, " onComplete");
            }
        };
    }

也可以组合操作符,对发射对象进行各种修改。组合运算符没有限制,使得使用 RxJava 完成复杂任务变得更加简单。以下是一个响应式搜索框的实现,用到了 DebounceFilterDistinctUntilChangedSwitchMap

  • Debounce — 只有在空闲了一段时间后才发射数据,通俗的说,就是如果一段时间没有操作,就执行一次操作,俗称去抖。Debounce 操作符与时间常数一起使用。当用户在非常短的时间内键入 “a”,“ab”,“abc” 时,会导致多次网络调用,但是用户最终只对搜索 “abc” 的结果感兴趣。所以,你必须丢弃 “a” 和 “ab” 的结果。理想情况下,当用户在非常短的时间内输入“a” 和 “ab” 时,不应该发生网络调用。所以,Debounce 操作符就是为此而生的。它使 Observable 等待所提供的时间,如果在该时间内进行搜索,它将忽略之前的搜索并使用最新的搜索进行查询,然后再次等待相同的时间。如果在给定的时间内没有任何新输入,它将继续使用最新的搜索项进行搜索。

  • Filter:过滤器运算符用于过滤不需要的字符串,如空字符串,以避免不必要的网络调用。

  • DistinctUntilChangedDistinctUntilChanged 操作符用于避免重复的网络调用。假设最后一个正在进行的搜索是 “abc”,用户删除了 “c” 并再次键入 “c”。所以它又是 “abc”。因此,如果已经在搜索 “abc” 了,则不会再次进行搜索。因此,distinctUntilChanged 会去除 Observable 发射的重复连续项。

  • SwitchMapSwitchMap 操作符用于避免那些结果对于用户不再需要的网络调用。假设最后一个搜索是 “ab”,并且正在进行 “ab” 的网络调用,此时用户键入 “abc”。那用户就不再对 “ab” 的结果感兴趣了,只对 “abc” 的结果感兴趣。SwitchMap 就是为此而生的,它仅提供最后一次搜索的结果并忽略其余的搜索。

public class RxSearchObservable {

    public static Observable<String> fromView(SearchView searchView) {

        final PublishSubject<String> subject = PublishSubject.create();

        searchView.setOnQueryTextListener(new SearchView.OnQueryTextListener() {
            @Override
            public boolean onQueryTextSubmit(String s) {
                subject.onComplete();
                return true;
            }

            @Override
            public boolean onQueryTextChange(String text) {
                subject.onNext(text);
                return true;
            }
        });

        return subject;
    }
}


    private void setUpSearchObservable() {
        RxSearchObservable.fromView(searchView)
                .debounce(300, TimeUnit.MILLISECONDS)
                .filter(new Predicate<String>() {
                    @Override
                    public boolean test(String text) {
                        if (text.isEmpty()) {
                            textViewResult.setText("");
                            return false;
                        } else {
                            return true;
                        }
                    }
                })
                .distinctUntilChanged()
                .switchMap(new Function<String, ObservableSource<String>>() {
                    @Override
                    public ObservableSource<String> apply(String query) {
                        return dataFromNetwork(query);
                    }
                })
                .subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Consumer<String>() {
                    @Override
                    public void accept(String result) {
                        textViewResult.setText(result);
                    }
                });
    }

使用 Observable 时需要注意的地方是确保在 Activity 的 onDestroy() 方法中取消订阅,可以使用 CompositeDisposable 来统一管理。这是一种防止内存泄漏的简单方法,并且是有意义的,因为它遵循 Android 的生命周期。

    private final CompositeDisposable disposables = new CompositeDisposable();

    @Override
    protected void onDestroy() {
        super.onDestroy();
        disposables.clear(); // do not send event after activity has been destroyed
    }

    /*
     * Example to understand how to use disposables.
     * disposables is cleared in onDestroy of this activity.
     */
    void doSomeWork() {
        disposables.add(sampleObservable()
                // Run on a background thread
                .subscribeOn(Schedulers.io())
                // Be notified on the main thread
                .observeOn(AndroidSchedulers.mainThread())
                .subscribeWith(new DisposableObserver<String>() {
                    @Override
                    public void onComplete() {
                        textView.append(" onComplete");
                        textView.append(AppConstant.LINE_SEPARATOR);
                        Log.d(TAG, " onComplete");
                    }

                    @Override
                    public void onError(Throwable e) {
                        textView.append(" onError : " + e.getMessage());
                        textView.append(AppConstant.LINE_SEPARATOR);
                        Log.d(TAG, " onError : " + e.getMessage());
                    }

                    @Override
                    public void onNext(String value) {
                        textView.append(" onNext : value : " + value);
                        textView.append(AppConstant.LINE_SEPARATOR);
                        Log.d(TAG, " onNext value : " + value);
                    }
                }));
    }

    static Observable<String> sampleObservable() {
        return Observable.defer(new Callable<ObservableSource<? extends String>>() {
            @Override
            public ObservableSource<? extends String> call() {
                // Do some long running operation
                SystemClock.sleep(2000);
                return Observable.just("one", "two", "three", "four", "five");
            }
        });
    }
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 215,012评论 6 497
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,628评论 3 389
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 160,653评论 0 350
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,485评论 1 288
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,574评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,590评论 1 293
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,596评论 3 414
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,340评论 0 270
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,794评论 1 307
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,102评论 2 330
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,276评论 1 344
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,940评论 5 339
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,583评论 3 322
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,201评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,441评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,173评论 2 366
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,136评论 2 352

推荐阅读更多精彩内容

  • Android 自定义View的各种姿势1 Activity的显示之ViewRootImpl详解 Activity...
    passiontim阅读 172,008评论 25 707
  • 用两张图告诉你,为什么你的 App 会卡顿? - Android - 掘金 Cover 有什么料? 从这篇文章中你...
    hw1212阅读 12,710评论 2 59
  • 滚滚红尘,茫茫人海,无数次遇见,千百次擦肩,而你我的邂逅,是我今生最美的情缘。那初次相见的浅笑嫣然,那蓦然回首的灯...
    风情公子阅读 236评论 0 1
  • 《世说新语·德行》 【原文】 王安丰遭艰,至性过人。裴令往吊之,曰:“若使一恸果能伤人,濬冲必不免灭性之讥。” 【...
    哑巢父阅读 585评论 0 0
  • 公司刚上架的app,活动当天因为web崩溃的bug邮件一大堆,就是这孙子啦 原因就是:我们自己的web页不会有二级...
    CoderFarmer阅读 7,205评论 4 51