Video source: Exploring RxJava 2 for Android ·Jake Wharton
RxJava provide:
- A set of classes for representing sources of data.
- A set of classes for listening to data sources.
- A set of methods for modifying and composing the data.
Sources
-
Observable<T>
- Emits 0 to n items
- Terminates with complete or error
- Does not have backpressure
-
Flowable<T>
- Emits 0 to n items
- Terminates with complete or error
- Has backpressure
-
Flowable vs. Observable
- Backpressure allows you to control how fast (slow them down) a source emits items.
- RxJava 1.x added backpressure late in the design process.
- All types exposed backpressure but not all sources respected it.
- Backpressure likes inheritance, must be designed for.
Observable<MotionEvent> events = RxView.touches(paintView);
Flowable<Row> rows = db.createQuery("SELECT * ...").
-
Single
- Either succeeds with an item or errors.
- No backpressure support
- Think "reactive scalar"
-
Completable
- Either completes or errors. Has no items!
- No backpressure support
- Think "reactive runnable"
-
Maybe
- Either succeeds with an item, completes with no items, or errors.
- No backpressure support
- Think "reactive optional"
Consumers
interface Publisher<T> {
void subscribe(Subscriber<? super T> s);
}
interface Subscriber<T> {
void onNext(T t);
void onComplete();
void onError();
void onSubscriber(Subscription s);
}
interface Subscription {
void request(long n);
void cancel();
}
interface Processor<T, R> extends Subscriber<T>, Publisher<R> {
}
Creating Sources
Observable.fromCallable(new Callable<String>() {
@Override public String call() throws Exception {
return getName();
}
});
Flowable.fromCallable(() -> "Hello");
Maybe.fromCallable(() -> "Hello");
Maybe.fromAction(() -> System.out.println("Hello"));
Maybe.fromRunnable(() -> System.out.println("Hello"));
Single.fromCallable(() -> "Hello");
Completable.fromCallable(() -> "Hello");
Completable.fromAction(() -> System.out.println("Hello"));
Completable.fromRunnable(() -> System.out.println("Hello"));
- RxJava 2 has a fixed Observable.create() method:
Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> e) throws Exception {
e.onNext("Hello");
e.onComplete();
}
});
Observing Sources
Observable<String> o = Observable.just("Hello");
o.subscribe(new Observer<String>() {
@Override public void onNext(String s) {...}
@Override public void onComplete() {...}
@Override public void onError(Throwable t) {...}
@Override public void onSubscribe(Disposable d) {
...
}
});
// DisposableObserver help to handle unsubscribe logic
DisposableObserver observer = new DisposableObserver<String>() {
@Override public void onNext(String s) {...}
@Override public void onComplete() {...}
@Override public void onError(Throwable t) {...}
}
o.subscribe(observer);
observer.dispose();
// RxJava2 provide new subscribeWith() method, return a Disposable object, like RxJava1 Subscription object.
Disposable d = o.subscribeWith(new DisposableObserver<String>() {
@Override public void onNext(String s) {...}
@Override public void onComplete() {...}
@Override public void onError(Throwable t) {...}
});
d.dispose();
// RxJava2 also provide CompositeDisposable to handle composite disposable.
CompositeDisposable disposables = new CompositeDisposable();
disposables.add(o.subscribeWith(new DisposableObserver<String>() {
@Override public void onNext(String s) {...}
@Override public void onComplete() {...}
@Override public void onError(Throwable t) {...}
}));
disposables.dispose();
Operators
- Manipulate or combine data in some way.
- Manipulate threading in some way.
- Manipulate emissions in some way.
first()
In RxJava1, Observable object use first()
return an Observable object.
In RxJava2, those return a Single object, if the Observable is empty, it will throw NoSuchElementException, because a Single either has an item, or errors.
firstElement()
In RxJava2, When the Observable is empty, Maybe can actually model that by completing without an error.
ignoreElements()
If you are just ignoring the elements, all you care about is whether it completes or fails,that now returns Completable.