【原汁原味】RxJava 2.0 官方文档中文版

内容翻译自官方文档,水平有限,仅供初学者学习交流。

官网文档英文版

1. 使用 RxJava 2.0 实现响应式编程

1.1 RxJava 和响应式编程是什么?

在响应式编程模式中,消费者仅在数据到达时才进行响应,这就是异步编程为什么也叫作响应式编程的原因。响应式编程允许将事件的变化传递给每一个已注册的观察者。

ReactiveX 为上述概念提供了不同编程语言的实现:

The Observer pattern done right. ReactiveX is a combination of the best ideas from the Observer pattern, the Iterator pattern, and functional programming.

RxJava 就是这个概念的 Java 版实现。RxJava 遵循 Apache 2.0 协议,为具有可观察流的异步编程提供 JAVA API。

2. RxJava 核心概念

RxJava 代码的核心概念如下:

  • 被观察者(Observables),代表数据来源。
  • 订阅者(Subscribers)或观察者(Observers)监听 Observables。
  • 另外,还有一组用于修改和组合数据的方法集。

Observable 发射一组 Items,Subscriber 则消费这些 Items。

2.1 Observables

Observables 是数据的来源。通常,当 Subscriber 启动监听时,Observable 就开始提供数据。一个 Observable 可以发射任意数量的 Items(包括 0 个),当处理成功或出现错误时进行终止。数据源可以永不终止,比如,一个按钮点击的 Observable 可能会产生无限的事件流。

2.2 Subscribers

一个 Observable 可以关联任意数量的 Subscriber。每当 Observable 发射一个新的 Item,每一个 Subscriber 上的 onNext() 方法就会被调用。当 Observable 成功完成它的数据流时,每一个 Subscriber 上的 onComplete() 方法就会被调用。同理,如果 Observable 以出错状态终结了数据流,每一个 Subscriber 上的 onError() 方法就会被调用。

RxJava 样例

一个非常简单的 JUnit4 样例如下

package com.vogella.android.rxjava.simple;

import org.junit.Test;

import io.reactivex.Observable;

import static junit.framework.Assert.assertTrue;


public class RxJavaUnitTest {
    String result="";

    // Simple subscription to a fix value
    @Test
    public void returnAValue(){
        result = "";
        Observable<String> observer = Observable.just("Hello"); // provides datea
        observer.subscribe(s -> result=s); // Callable as subscriber
        assertTrue(result.equals("Hello"));
    }
}

3.1 为何要进行异步编程?

响应式编程提供了一种简单的异步编程方式,允许简化潜在长期运行操作的异步处理。它还定义了一种处理多个事件、异常和事件流终止的方法,支持在多个线程间运行不同任务。例如,SWT 和 Android 中的窗体必须由 UI 线程进行更新,而响应式编程就提供了一种方法,让 Observables 和 Subscribers 可以运行在不同的线程上。

流可以在被观察者接收前进行转换,并且支持链式操作,比如,一个 API 调用依赖于另一个 API 的调用,响应式编程可以减少对中间状态变量的需求,因为它可能引起错误。

3.2 将 RxJava2 添加到 Java 工程

在撰写本文时,2.1.1 版本已经发布,当然,你可以选择自己需要的版本替换它。

要在 Gradle 构建中使用 RxJava,请添加如下依赖:

compile group: 'io.reactivex.rxjava2', name: 'rxjava', version: '2.1.1'

使用 Maven 构建的同学,可以添加如下依赖:

<dependency>
    <groupId>io.reactivex.rxjava2</groupId>
    <artifactId>rxjava</artifactId>
    <version>2.0.4</version>
</dependency>

在 OSGi 环境下,例如 Eclipse RCP 开发,可以使用这个网址作为 p2 更新站点 https://dl.bintray.com/simon-scholz/RxJava-OSGi/

4. 创建 Observables,订阅与注销

4.1 创建 Observables

可以创建多种类型的 Observable,如下表所示

类型 描述
Flowable<T> 发射 0 或 n 个 Items,并以成功或错误事件终止。支持背压,即控制数据源以多快的速率发射 Items
Observable<T> 发射 0 或 n 个 Items,并以成功或错误事件终止
Single<T> 发射单个 Item 或错误事件,是方法调用的响应式版本
Maybe<T> 以 0 或 1 个 Item,或者错误作为终止,是 Optional 的响应式版本
Completable 以成功或错误事件作为结束,永远不会发射 Item,是 Runnable 的响应式版本

一个使用 Flowable 的场景是触发事件,你无法控制正在执行触发事件的用户,但是,你可以告诉数据源,以一个较慢的速率发射事件,以防止自己无法及时处理。

下面是一个创建 Observable 的样例:

 Observable<Todo> todoObservable = Observable.create(new ObservableOnSubscribe<Todo>() {
            @Override
            public void subscribe(ObservableEmitter<Todo> emitter) throws Exception {
                try {
                    List<Todo> todos = RxJavaUnitTest.this.getTodos();
                    for (Todo todo : todos) {
                        emitter.onNext(todo);
                    }
                    emitter.onComplete();
                } catch (Exception e) {
                    emitter.onError(e);
                }
            }
        });

使用 Lambdas 表达式,可以表述为:

Observable<Todo> todoObservable = Observable.create(emitter -> {
    try {
        List<Todo> todos = getTodos();
        for (Todo todo : todos) {
            emitter.onNext(todo);
        }
        emitter.onComplete();
    } catch (Exception e) {
        emitter.onError(e);
    }
});

下面是一个 Maybe 的样例:

Maybe<List<Todo>> todoMaybe = Maybe.create(emitter -> {
    try {
        List<Todo> todos = getTodos();
        if(todos != null && !todos.isEmpty()) {
            emitter.onSuccess(todos); 
        } else {
            emitter.onComplete(); 
        }
    } catch (Exception e) {
        emitter.onError(e); 
    }
});

emitter.onSuccess(tools) 表示 java.util.Optinal 有一个值。
emitter.onComplete() 表示 java.util.Optinal 没有值,即 null。
emitter.onError(e) 表示出现了错误。

4.2 快速创建 Observables

RxJava 提供了几种快速创建 Observables 的方法:

  • Observable.just("Hello") - 允许创建一个 Observable 对象,作为其他数据类型的包装器。
  • Observable.fromIterable() - 获取一个java.lang.Iterable<T>对象,以给定的数据结构按顺序发射它们的值。
  • Observable.fromArray() - 获取一个数组,以给定的数据结构按顺序发射它们的值。
  • Observable.fromCallable() - 允许为java.util.concurrent.Callable<V>创建一个 Observable 对象。
  • Observable.fromFuture() - 允许为java.util.concurrent.Future创建一个 Observable 对象。
  • Observable.interval() - 创建一个 Observable 对象,以给定的时间间隔,定期发射Long对象。

其他类型也具备相似的方法,例如,Flowable.just()Maybe.just()Single.just()

4.3 RxJava 订阅机制

为了接收从 Observable 发射的数据,我们需要对它进行订阅。Observables 提供了各种各样的订阅方法。

Observable<Todo> todoObservable = Observable.create(emitter -> { ... });

// Simply subscribe with a io.reactivex.functions.Consumer<T>, which will be informed onNext()
Disposable disposable = todoObservable.subscribe(t -> System.out.print(t));

// Dispose the subscription when not interested in the emitted data any more
disposable.dispose();

// Also handle the error case with a second io.reactivex.functions.Consumer<T>
Disposable subscribe = todoObservable.subscribe(t -> System.out.print(t), e -> e.printStackTrace());

Observable 提供了subscribeWith方法,使用方式如下:

DisposableObserver<Todo> disposableObserver = todoObservable.subscribeWith(new  DisposableObserver<Todo>() {

    @Override
    public void onNext(Todo t) {
    }

    @Override
    public void onError(Throwable e) {
    }

    @Override
    public void onComplete() {
    }
});

取消订阅

当监听器或订阅者被添加后,通常不需要永远监听。

有些时候,由于状态变更,观察者不再对 Observable 发射的事件感兴趣,需要取消订阅。

import io.reactivex.Single;
import io.reactivex.disposables.Disposable;
import io.reactivex.observers.DisposableSingleObserver;

Single<List<Todo>> todosSingle = getTodos();

Disposable disposable = todosSingle.subscribeWith(new DisposableSingleObserver<List<Todo>>() {

    @Override
    public void onSuccess(List<Todo> todos) {
        // work with the resulting todos
    }

    @Override
    public void onError(Throwable e) {
        // handle the error case
    }
});

// continue working and dispose when value of the Single is not interesting any more
disposable.dispose();

Observable 类提供了各种各样的订阅方法,并返回一个Disposable对象,用于取消订阅。

当处理多个订阅关系时,由于状态变化可能需要全部废弃。这时,使用CompositeDisposable来处理一组订阅关系的注销会非常方便。

import io.reactivex.Single;
import io.reactivex.disposables.Disposable;
import io.reactivex.observers.DisposableSingleObserver;
import io.reactivex.disposables.CompositeDisposable;

CompositeDisposable compositeDisposable = new CompositeDisposable();

Single<List<Todo>> todosSingle = getTodos();

Single<Happiness> happiness = getHappiness();

compositeDisposable.add(todosSingle.subscribeWith(new DisposableSingleObserver<List<Todo>>() {

    @Override
    public void onSuccess(List<Todo> todos) {
        // work with the resulting todos
    }

    @Override
    public void onError(Throwable e) {
        // handle the error case
    }
}));

compositeDisposable.add(happiness.subscribeWith(new DisposableSingleObserver<Happiness>() {

    @Override
    public void onSuccess(Happiness happiness) {
        // celebrate the happiness :-D
    }

    @Override
    public void onError(Throwable e) {
        System.err.println("Don't worry, be happy! :-P");
    }
}));

// continue working and dispose all subscriptions when the values from the Single objects are not interesting any more
compositeDisposable.dispose();

5. 缓存处理后的值

使用 Observable 时,通常不需要对每个订阅执行异步调用。

更常见的情况是,在应用程序中传递 Observable 对象,这样,就无需在每次添加订阅者时都执行如此昂贵的调用。

下面这段代码执行了 4 次开销非常大的 web 请求,其实只需一次请求即可完成,因为同一个Todo对象只是以不同的形式显示。

Single<List<Todo>> todosSingle = Single.create(emitter -> {
    Thread thread = new Thread(() -> {
        try {
            List<Todo> todosFromWeb = // query a webservice

            System.out.println("Called 4 times!");

            emitter.onSuccess(todosFromWeb);
        } catch (Exception e) {
            emitter.onError(e);
        }
    });
    thread.start();
});

todosSingle.subscribe(... " Show todos times in a bar chart " ...);

showTodosInATable(todosSingle);

todosSingle.subscribe(... " Show todos in gant diagram " ...);

anotherMethodThatsSupposedToSubscribeTheSameSingle(todosSingle);

下面这段代码使用了cache方法,以便Single实例在第一次执行成功后保存其结果。

Single<List<Todo>> todosSingle = Single.create(emitter -> {
    Thread thread = new Thread(() -> {
        try {
            List<Todo> todosFromWeb = // query a webservice

            System.out.println("I am only called once!");

            emitter.onSuccess(todosFromWeb);
        } catch (Exception e) {
            emitter.onError(e);
        }
    });
    thread.start();
});

// cache the result of the single, so that the web query is only done once
Single<List<Todo>> cachedSingle = todosSingle.cache();

cachedSingle.subscribe(... " Show todos times in a bar chart " ...);

showTodosInATable(cachedSingle);

cachedSingle.subscribe(... " Show todos in gant diagram " ...);

anotherMethodThatsSupposedToSubscribeTheSameSingle(cachedSingle);

6. 类型转换

RxJava 的类型间转换非常简单。

From/To Flowable Observable Maybe Single Completable
Flowable toObservable() reduce()
elementAt()
firstElement()
lastElement()
singleElement()
scan()
elementAt()
first()/firstOrError()
last()/lastOrError()
single()/singleOrError()
all()/any()/count()
(and more...)
ignoreElements()
Observable toFlowable() reduce()
elementAt()
firstElement()
lastElement()
singleElement()
scan()
elementAt()
first()/firstOrError()
last()/lastOrError()
single()/singleOrError()
all()/any()/count()
(and more...)
ignoreElements()
Maybe toFlowable() toObservable() toSingle()
sequenceEqual()
toCompletable()
Single toFlowable() toObservable() toMaybe() toCompletable()
Completable toFlowable() toObservable() toMaybe() toSingle()
toSingleDefault()

7. RxAndroid

7.1 使用 RxAndroid

RxAndroid 是 RxJava 的一个扩展,它提供了一个调度器,可以在 Android 的主线程中运行代码。它也提供了创建在 Android 处理类上运行的调度器的能力。通过这些调度器,你可以定义一个在后台线程执行的 Observable,并将我们的结果同步回主线程。这有点像 RxJava 中替换AsyncTask的实现。

为了在 Android 中使用 RxJava,你需要在Gradle 构建文件中添加如下依赖:

compile 'io.reactivex.rxjava2:rxandroid:2.0.1'
compile 'io.reactivex.rxjava2:rxjava:2.0.8'

举个例子,你可以通过下面这个 Observable 定义一个长期运行的操作。

final Observable<Integer> serverDownloadObservable = Observable.create(emitter -> {
        SystemClock.sleep(1000); // simulate delay
        emitter.onNext(5);
        emitter.onComplete();
    });

你现在可以订阅这个 Observable,这将触发它的执行,并为订阅提供所需的信息。

例如,假设你将其分配给了一个按钮。

serverDownloadObservable.
                        observeOn(AndroidSchedulers.mainThread()).
                        subscribeOn(Schedulers.io()).  
                        subscribe(integer -> {
                            updateTheUserInterface(integer); // this methods updates the ui
                            view.setEnabled(true); // enables it again
                        });
            }
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread());

由于我们只关心最终的结果,我们可以使用一个Single对象。

Subscription subscription = Single.create(new Single.OnSubscribe() {
           @Override
           public void call(SingleSubscriber singleSubscriber) {
               String result = doSomeLongRunningStuff();
               singleSubscriber.onSuccess(value);
           }
       })
       .subscribeOn(Schedulers.io())
       .observeOn(AndroidSchedulers.mainThread())
       .subscribe(new Action1() {
           @Override
           public void call(String value) {
               // onSuccess
               updateTheUserInterface(); // this methods updates the ui
           }
       }, new Action1() {
           @Override
           public void call(Throwable throwable) {
               // handle onError
           }
       });

7.2 取消订阅以防止内存泄漏

Observable.subsribe()返回一个Subscription(如果你使用了 Flowable)或Disposable对象。为了防止可能(临时)的内存泄漏,在 Activity 或 Fragment 的onStop()方法中取消对 Observable 的订阅。例如,对 Disposable 对象执行如下操作:

@Override
    protected void onDestroy() {
        super.onDestroy();
        if (bookSubscription != null && !bookSubscription.isDisposed()) {
            bookSubscription.dispose();
        }
    }

8. 练习:使用 RxJava 和 RxAndroid 的第一步

使用 com.vogella.android.rxjava.simple 作为顶级包名,创建一个新的工程。

8.1 Gradle 依赖

在你的 app/build.gradle 文件中添加如下依赖:

compile 'com.android.support:recyclerview-v7:23.1.1'
compile 'io.reactivex.rxjava2:rxandroid:2.0.1'
compile 'io.reactivex.rxjava2:rxjava:2.0.8'
compile 'com.squareup.okhttp:okhttp:2.5.0'
testCompile 'junit:junit:4.12'

并且在你的 app/build.gradle 文件中启用 Java 8:

android {
   // more stuff
    compileOptions {
        sourceCompatibility JavaVersion.VERSION_1_8
        targetCompatibility JavaVersion.VERSION_1_8
    }
}

8.2 创建活动

将主布局文件修改为一下内容:

<?xml version="1.0" encoding="utf-8"?>
<LinearLayout xmlns:android="http://schemas.android.com/apk/res/android"
              android:layout_width="match_parent"
              android:layout_height="match_parent"
              android:orientation="vertical"
    >

    <Button
        android:id="@+id/first"
        android:layout_width="match_parent"
        android:layout_height="wrap_content"
        android:onClick="onClick"
        android:text="First"
        />

    <Button
        android:id="@+id/second"
        android:layout_width="match_parent"
        android:layout_height="wrap_content"
        android:onClick="onClick"
        android:text="Second"

        />

    <Button
        android:id="@+id/third"
        android:layout_width="match_parent"
        android:layout_height="wrap_content"
        android:onClick="onClick"
        android:text="Third"

        />
</LinearLayout>

创建三个活动:

  • RxJavaSimpleActivity
  • BooksActivity
  • ColorsActivity

创建 activity_rxjavasimple.xml 布局文件:

<?xml version="1.0" encoding="utf-8"?>
<LinearLayout
    xmlns:android="http://schemas.android.com/apk/res/android"
    android:layout_width="match_parent"
    android:layout_height="match_parent"
    >

    <Button
        android:id="@+id/button"
        android:layout_width="wrap_content"
        android:layout_height="wrap_content"
        android:text="Server"
        />
    <Button
        android:id="@+id/toastbutton"
        android:layout_width="wrap_content"
        android:layout_height="wrap_content"
        android:text="Toast"
        android:onClick="onClick"
        />

    <TextView
        android:id="@+id/resultView"
        android:layout_width="wrap_content"
        android:layout_height="wrap_content"
        android:text="Result"
        />
</LinearLayout>

RxJavaSimpleActivity中创建一个 Observable 用来模拟一个长期运行的操作(10s),然后返回数字 5。通过一个按钮点击来订阅它,禁用按钮。

package com.vogella.android.rxjava.simple;

import android.os.Bundle;
import android.os.SystemClock;
import android.support.v7.app.AppCompatActivity;
import android.support.v7.widget.RecyclerView;
import android.view.View;
import android.widget.TextView;
import android.widget.Toast;

import io.reactivex.Observable;
import io.reactivex.android.schedulers.AndroidSchedulers;
import io.reactivex.disposables.CompositeDisposable;
import io.reactivex.disposables.Disposable;
import io.reactivex.schedulers.Schedulers;


public class RxJavaSimpleActivity extends AppCompatActivity {

    RecyclerView colorListView;
    SimpleStringAdapter simpleStringAdapter;
    CompositeDisposable disposable = new CompositeDisposable();
    public int value =0;

    final Observable<Integer> serverDownloadObservable = Observable.create(emitter -> {
        SystemClock.sleep(10000); // simulate delay
        emitter.onNext(5);
        emitter.onComplete();
    });

    @Override
    protected void onCreate(Bundle savedInstanceState) {
        super.onCreate(savedInstanceState);
        setContentView(R.layout.activity_rxjavasimple);
        View view = findViewById(R.id.button);
        view.setOnClickListener(v -> {
            v.setEnabled(false); // disables the button until execution has finished
            Disposable subscribe = serverDownloadObservable.
                    observeOn(AndroidSchedulers.mainThread()).
                    subscribeOn(Schedulers.io()).
                    subscribe(integer -> {
                        updateTheUserInterface(integer); // this methods updates the ui
                        v.setEnabled(true); // enables it again
                    });
            disposable.add(subscribe);
        });
    }

    private void updateTheUserInterface(int integer) {
        TextView view = (TextView) findViewById(R.id.resultView);
        view.setText(String.valueOf(integer));
    }

    @Override
    protected void onStop() {
        super.onStop();
        if (disposable!=null && !disposable.isDisposed()) {
            disposable.dispose();
        }
    }

    public void onClick(View view) {
        Toast.makeText(this, "Still active " + value++, Toast.LENGTH_SHORT).show();
    }
}

为回收器视图创建适配器:

package com.vogella.android.rxjava.simple;

import android.content.Context;
import android.support.v7.widget.RecyclerView;
import android.view.LayoutInflater;
import android.view.View;
import android.view.ViewGroup;
import android.widget.TextView;
import android.widget.Toast;

import java.util.ArrayList;
import java.util.List;

/**
 * Adapter used to map a String to a text view.
 */
public class SimpleStringAdapter extends RecyclerView.Adapter<SimpleStringAdapter.ViewHolder> {

    private final Context mContext;
    private final List<String> mStrings = new ArrayList<>();

    public SimpleStringAdapter(Context context) {
        mContext = context;
    }

    public void setStrings(List<String> newStrings) {
        mStrings.clear();
        mStrings.addAll(newStrings);
        notifyDataSetChanged();
    }

    @Override
    public ViewHolder onCreateViewHolder(ViewGroup parent, int viewType) {
        View view = LayoutInflater.from(parent.getContext()).inflate(R.layout.string_list_item, parent, false);
        return new ViewHolder(view);
    }

    @Override
    public void onBindViewHolder(ViewHolder holder, final int position) {
        holder.colorTextView.setText(mStrings.get(position));
        holder.itemView.setOnClickListener(new View.OnClickListener() {
            @Override
            public void onClick(View v) {
                Toast.makeText(mContext, mStrings.get(position), Toast.LENGTH_SHORT).show();
            }
        });
    }

    @Override
    public int getItemCount() {
        return mStrings.size();
    }

    public static class ViewHolder extends RecyclerView.ViewHolder {

        public final TextView colorTextView;

        public ViewHolder(View view) {
            super(view);
            colorTextView = (TextView) view.findViewById(R.id.color_display);
        }
    }
}

实现ColorsActivity,它使用一个 Observable 来接收颜色列表。

创建 activity_colors.xml 布局文件:

<?xml version="1.0" encoding="utf-8"?>
<FrameLayout
    xmlns:android="http://schemas.android.com/apk/res/android"
    android:layout_width="match_parent"
    android:layout_height="match_parent"
    >
    <android.support.v7.widget.RecyclerView
        android:id="@+id/color_list"
        android:layout_width="match_parent"
        android:layout_height="match_parent"
        />
</FrameLayout>
package com.vogella.android.rxjava.simple;

import android.os.Bundle;
import android.support.v7.app.AppCompatActivity;
import android.support.v7.widget.LinearLayoutManager;
import android.support.v7.widget.RecyclerView;

import java.util.ArrayList;
import java.util.List;

import io.reactivex.Observable;
import io.reactivex.disposables.Disposable;


public class ColorsActivity extends AppCompatActivity {

    RecyclerView colorListView;
    SimpleStringAdapter simpleStringAdapter;
    private Disposable disposable;

    @Override
    protected void onCreate(Bundle savedInstanceState) {
        super.onCreate(savedInstanceState);
        configureLayout();
        createObservable();
    }

    private void createObservable() {
        Observable<List<String>> listObservable = Observable.just(getColorList());
        disposable = listObservable.subscribe(colors -> simpleStringAdapter.setStrings(colors));

    }

    private void configureLayout() {
        setContentView(R.layout.activity_colors);
        colorListView = (RecyclerView) findViewById(R.id.color_list);
        colorListView.setLayoutManager(new LinearLayoutManager(this));
        simpleStringAdapter = new SimpleStringAdapter(this);
        colorListView.setAdapter(simpleStringAdapter);
    }

    private static List<String> getColorList() {
        ArrayList<String> colors = new ArrayList<>();
        colors.add("red");
        colors.add("green");
        colors.add("blue");
        colors.add("pink");
        colors.add("brown");
        return colors;
    }

    @Override
    protected void onStop() {
        super.onStop();
        if (disposable!=null && !disposable.isDisposed()) {
            disposable.dispose();
        }
    }
}

创建如下(虚拟)服务器实现:

package com.vogella.android.rxjava.simple;

import android.content.Context;
import android.os.SystemClock;

import java.util.ArrayList;
import java.util.List;

/**
 * This is a fake REST client.
 *
 * It simulates making blocking calls to an REST endpoint.
 */
public class RestClient {
    private Context mContext;

    public RestClient(Context context) {
        mContext = context;
    }

    public List<String> getFavoriteBooks() {
        SystemClock.sleep(8000);// "Simulate" the delay of network.
        return createBooks();
    }

    public List<String> getFavoriteBooksWithException() {
        SystemClock.sleep(8000);// "Simulate" the delay of network.
        throw new RuntimeException("Failed to load");
    }

    private List<String> createBooks() {
        List<String> books = new ArrayList<>();
        books.add("Lord of the Rings");
        books.add("The dark elf");
        books.add("Eclipse Introduction");
        books.add("History book");
        books.add("Der kleine Prinz");
        books.add("7 habits of highly effective people");
        books.add("Other book 1");
        books.add("Other book 2");
        books.add("Other book 3");
        books.add("Other book 4");
        books.add("Other book 5");
        books.add("Other book 6");
        return books;
    }
}

创建 activity_books.xml 布局文件:

<?xml version="1.0" encoding="utf-8"?>
<FrameLayout
    xmlns:android="http://schemas.android.com/apk/res/android"
    android:layout_width="match_parent"
    android:layout_height="match_parent"
    >

    <ProgressBar
        android:id="@+id/loader"
        android:layout_width="wrap_content"
        android:layout_height="wrap_content"
        android:layout_gravity="center"
        />

    <android.support.v7.widget.RecyclerView
        android:id="@+id/books_list"
        android:layout_width="match_parent"
        android:layout_height="match_parent"
        android:visibility="gone"
        />

</FrameLayout>

实现BooksActivity

package com.vogella.android.rxjava.simple;

import android.os.Bundle;
import android.support.v7.app.AppCompatActivity;
import android.support.v7.widget.LinearLayoutManager;
import android.support.v7.widget.RecyclerView;
import android.view.View;
import android.widget.ProgressBar;

import java.util.List;

import io.reactivex.Observable;
import io.reactivex.android.schedulers.AndroidSchedulers;
import io.reactivex.disposables.Disposable;
import io.reactivex.schedulers.Schedulers;


public class BooksActivity extends AppCompatActivity {

    private Disposable bookSubscription;
    private RecyclerView booksRecyclerView;
    private ProgressBar progressBar;
    private SimpleStringAdapter stringAdapter;
    private RestClient restClient;

    @Override
    protected void onCreate(Bundle savedInstanceState) {
        super.onCreate(savedInstanceState);
        restClient = new RestClient(this);
        configureLayout();
        createObservable();
    }

    private void createObservable() {
        Observable<List<String>> booksObservable =
                Observable.fromCallable(() -> restClient.getFavoriteBooks());
        bookSubscription = booksObservable.
                subscribeOn(Schedulers.io()).
                observeOn(AndroidSchedulers.mainThread()).
                subscribe(strings -> displayBooks(strings));
    }

    @Override
    protected void onDestroy() {
        super.onDestroy();
        if (bookSubscription != null && !bookSubscription.isDisposed()) {
            bookSubscription.dispose();
        }
    }

    private void displayBooks(List<String> books) {
        stringAdapter.setStrings(books);
        progressBar.setVisibility(View.GONE);
        booksRecyclerView.setVisibility(View.VISIBLE);
    }

    private void configureLayout() {
        setContentView(R.layout.activity_books);
        progressBar = (ProgressBar) findViewById(R.id.loader);
        booksRecyclerView = (RecyclerView) findViewById(R.id.books_list);
        booksRecyclerView.setLayoutManager(new LinearLayoutManager(this));
        stringAdapter = new SimpleStringAdapter(this);
        booksRecyclerView.setAdapter(stringAdapter);
    }

    @Override
    protected void onStop() {
        super.onStop();
        if (bookSubscription!=null && !bookSubscription.isDisposed()) {
            bookSubscription.dispose();
        }
    }
}

8.3 通过 Callable 实现一个长期运行的任务

java.util.Calllable 和 Runnable 有些类似,它可以抛出异常以及返回一个值。

下面的活动实现了一个基于Callable的 Observable,在订阅期间,进度条可见。一旦执行结束,再次隐藏进度条,并更新文本视图。

长期运行的操作将在后台执行,而 UI 的更新将发生在主线程。

下面是 activity_scheduler.xml 的布局文件:

<?xml version="1.0" encoding="utf-8"?>
<RelativeLayout
    xmlns:android="http://schemas.android.com/apk/res/android"
    android:layout_width="match_parent"
    android:layout_height="match_parent"
    >

    <Button
        android:id="@+id/scheduleLongRunningOperation"
        android:layout_width="wrap_content"
        android:layout_height="wrap_content"
        android:lines="3"
        android:text="Start something long"
        android:layout_marginStart="12dp"
        android:layout_alignParentStart="true"
        />

    <TextView
        android:id="@+id/messagearea"
        android:layout_width="match_parent"
        android:layout_height="match_parent"
        android:layout_alignParentStart="true"
        android:text=""
        android:layout_below="@+id/scheduleLongRunningOperation"
        />

    <ProgressBar
        android:id="@+id/progressBar"
        style="?android:attr/progressBarStyle"
        android:layout_width="wrap_content"
        android:layout_height="wrap_content"
        android:visibility="gone"
        android:layout_alignBottom="@+id/scheduleLongRunningOperation"
        android:layout_toEndOf="@+id/scheduleLongRunningOperation"
        />
</RelativeLayout>
package com.vogella.android.rxjava.simple;

import android.os.Bundle;
import android.os.SystemClock;
import android.support.v7.app.AppCompatActivity;
import android.view.View;
import android.widget.ProgressBar;
import android.widget.TextView;

import java.util.concurrent.Callable;

import io.reactivex.Observable;
import io.reactivex.android.schedulers.AndroidSchedulers;
import io.reactivex.disposables.Disposable;
import io.reactivex.observers.DisposableObserver;
import io.reactivex.schedulers.Schedulers;


/** Demonstrates a long running operation of the main thread
 * during which a  progressbar is shown
 *
 */
public class SchedulerActivity extends AppCompatActivity {

    private Disposable subscription;
    private ProgressBar progressBar;
    private TextView messagearea;
    private View button;

    @Override
    protected void onCreate(Bundle savedInstanceState) {
        super.onCreate(savedInstanceState);
        configureLayout();
        createObservable();
    }

    private void createObservable() {
    }

    @Override
    protected void onDestroy() {
        super.onDestroy();
        if (subscription != null && !subscription.isDisposed()) {
            subscription.dispose();
        }
    }

    private void configureLayout() {
        setContentView(R.layout.activity_scheduler);
        progressBar = (ProgressBar) findViewById(R.id.progressBar);
        messagearea = (TextView) findViewById(R.id.messagearea);
        button  = findViewById(R.id.scheduleLongRunningOperation);
        button.setOnClickListener(new View.OnClickListener(){
            @Override
            public void onClick(View v) {
//                progressBar.setVisibility(View.VISIBLE);
                Observable.fromCallable(callable).
                        subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread()).
                        doOnSubscribe(disposable ->
                                {
                                    progressBar.setVisibility(View.VISIBLE);
                                    button.setEnabled(false);
                                    messagearea.setText(messagearea.getText().toString() +"\n" +"Progressbar set visible" );
                                }
                        ).
                        subscribe(getDisposableObserver());
            }
        });
    }

    Callable<String> callable = new Callable<String>() {
        @Override
        public String call() throws Exception {
            return doSomethingLong();
        }
    };

    public String doSomethingLong(){
        SystemClock.sleep(1000);
        return "Hello";
    }

    /**
     * Observer
     * Handles the stream of data:
     */
    private DisposableObserver<String> getDisposableObserver() {
        return new DisposableObserver<String>() {

            @Override
            public void onComplete() {
                messagearea.setText(messagearea.getText().toString() +"\n" +"OnComplete" );
                progressBar.setVisibility(View.INVISIBLE);
                button.setEnabled(true);
                messagearea.setText(messagearea.getText().toString() +"\n" +"Hidding Progressbar" );
            }

            @Override
            public void onError(Throwable e) {
                messagearea.setText(messagearea.getText().toString() +"\n" +"OnError" );
                progressBar.setVisibility(View.INVISIBLE);
                button.setEnabled(true);
                messagearea.setText(messagearea.getText().toString() +"\n" +"Hidding Progressbar" );
            }

            @Override
            public void onNext(String message) {
                messagearea.setText(messagearea.getText().toString() +"\n" +"onNext " + message );
            }
        };
    }
}

9. 测试 RxJava 的 Observables 和 Subcriptions

9.1 测试 Observables

Flowable可以用io.reactivex.subscribers.TestSubscriber进行测试。不支持背压的 Observable,Single,Maybe 和 Completable 可以用io.reactivex.observers.TestObserver进行测试。

@Test
    public void anObservableStreamOfEventsAndDataShouldEmitsEachItemInOrder() {

        Observable<String> pipelineOfData = Observable.just("Foo", "Bar");

        pipelineOfData.subscribe(testObserver);

        List<Object> dataEmitted = testObserver.values();
        assertThat(dataEmitted).hasSize(2);
        assertThat(dataEmitted).containsOnlyOnce("Foo");
        assertThat(dataEmitted).containsOnlyOnce("Bar");
    }

所有基础的响应类型都有一个test()方法,可以很方便的返回 TestSubscriber 或 TestObserver 对象。

TestSubscriber<Integer> ts = Flowable.range(1, 5).test();

TestObserver<Integer> to = Observable.range(1, 5).test();

TestObserver<Integer> tso = Single.just(1).test();

TestObserver<Integer> tmo = Maybe.just(1).test();

TestObserver<Integer> tco = Completable.complete().test();

10. 为 RxJava 编写单元测试

10.1 一个简单的单元测试

package com.vogella.android.rxjava.simple;

import org.junit.Test;

import java.util.List;

import io.reactivex.Observable;
import io.reactivex.ObservableEmitter;
import io.reactivex.ObservableOnSubscribe;
import io.reactivex.observers.TestObserver;

import static junit.framework.Assert.assertTrue;


public class RxJavaUnitTest {
    String result="";

    // Simple subscription to a fix value
    @Test
    public void returnAValue(){
        result = "";
        Observable<String> observer = Observable.just("Hello"); // provides data
        observer.subscribe(s -> result=s); // Callable as subscriber
        assertTrue(result.equals("Hello"));
    }

    @Test
    public void expectNPE(){
        Observable<Todo> todoObservable = Observable.create(new ObservableOnSubscribe<Todo>() {
            @Override
            public void subscribe(ObservableEmitter<Todo> emitter) throws Exception {
                try {
                    List<Todo> todos = RxJavaUnitTest.this.getTodos();
                    if (todos == null){
                        throw new NullPointerException("todos was null");
                    }
                    for (Todo todo : todos) {
                        emitter.onNext(todo);
                    }
                    emitter.onComplete();
                } catch (Exception e) {
                    emitter.onError(e);
                }
            }
        });
        TestObserver<Object> testObserver = new TestObserver<>();
        todoObservable.subscribeWith(testObserver);

        // expect a NPE by using the TestObserver
        testObserver.assertError(NullPointerException.class);
    }

    private List<Todo> getTodos() {
        return null;
    }

    public class Todo {
    }
}

下面这段代码演示了Callable与 OkHttp 和 RxJava 结合用法。

package com.vogella.android.rxjava.simple;

import org.junit.Test;

import java.util.List;

import io.reactivex.Observable;
import io.reactivex.ObservableEmitter;
import io.reactivex.ObservableOnSubscribe;
import io.reactivex.observers.TestObserver;

import static junit.framework.Assert.assertTrue;


public class RxJavaUnitTest {
    String result="";

    // Simple subscription to a fix value
    @Test
    public void returnAValue(){
        result = "";
        Observable<String> observer = Observable.just("Hello"); // provides data
        observer.subscribe(s -> result=s); // Callable as subscriber
        assertTrue(result.equals("Hello"));
    }

    @Test
    public void expectNPE(){
        Observable<Todo> todoObservable = Observable.create(new ObservableOnSubscribe<Todo>() {
            @Override
            public void subscribe(ObservableEmitter<Todo> emitter) throws Exception {
                try {
                    List<Todo> todos = RxJavaUnitTest.this.getTodos();
                    if (todos == null){
                        throw new NullPointerException("todos was null");
                    }
                    for (Todo todo : todos) {
                        emitter.onNext(todo);
                    }
                    emitter.onComplete();
                } catch (Exception e) {
                    emitter.onError(e);
                }
            }
        });
        TestObserver<Object> testObserver = new TestObserver<>();
        todoObservable.subscribeWith(testObserver);

        // expect a NPE by using the TestObserver
        testObserver.assertError(NullPointerException.class);
    }

    private List<Todo> getTodos() {
        return null;
    }

    public class Todo {
    }
}

11. RxJava 资源

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 214,029评论 6 493
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,238评论 3 388
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 159,576评论 0 349
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,214评论 1 287
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,324评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,392评论 1 292
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,416评论 3 412
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,196评论 0 269
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,631评论 1 306
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,919评论 2 328
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,090评论 1 342
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,767评论 4 337
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,410评论 3 322
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,090评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,328评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,952评论 2 365
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,979评论 2 351

推荐阅读更多精彩内容