参考 http://gank.io/post/560e15be2dca930e00da1083
Rxjava的基本用法:
基础使用
// Build an Observable whose OnSubscribe emits one greeting and then completes.
// Nothing is emitted here; call(...) only runs once something subscribes.
Observable<String> observable = Observable.create(new Observable.OnSubscribe<String>() {
    /**
     * Pushes a single item followed by the completion signal.
     *
     * @param subscriber the downstream subscriber that receives the events
     */
    @Override
    public void call(Subscriber<? super String> subscriber) {
        subscriber.onNext("hello,Rx");
        subscriber.onCompleted();
    }
});
// Observer for the String items; every callback is intentionally empty in this demo.
Observer<String> observer = new Observer<String>() {
    @Override
    public void onCompleted() {
        // Nothing to do when the stream finishes.
    }

    @Override
    public void onError(Throwable e) {
        // Errors are ignored in this minimal example.
    }

    @Override
    public void onNext(String item) {
        // Items are ignored in this minimal example.
    }
};
// Subscribing is what triggers the OnSubscribe.call(...) supplied to Observable.create.
observable.subscribe(observer);
Observable observable = Observable.create(new OnSubscribe<String>() { ... });
/**
 * Callback that is handed a {@code Subscriber} and is expected to emit to it.
 * It is an {@code Action1} whose single argument is the subscriber.
 *
 * @param <T> the type of item emitted to the subscriber
 */
public interface OnSubscribe<T> extends Action1<Subscriber<? super T>> {
    // Inherits call(Subscriber<? super T>) from Action1; declares nothing of its own.
}
/**
 * A one-argument action with no return value.
 *
 * @param <T> the type of the single argument
 */
public interface Action1<T> extends Action {

    /**
     * Runs the action.
     *
     * @param t the argument passed to the action
     */
    void call(T t);
}
/**
 * Common super-interface of the {@code ActionX} family; declares no methods itself.
 */
public interface Action extends Function {
    // Marker interface: intentionally empty.
}
/**
 * Abridged view of {@code Observable}: it stores the {@code OnSubscribe} callback it was
 * created with so that a later subscribe call can invoke it.
 *
 * @param <T> the type of item this Observable emits
 */
public class Observable<T> {

    /** Plugin hook consulted whenever an Observable is created. */
    private static final RxJavaObservableExecutionHook hook =
            RxJavaPlugins.getInstance().getObservableExecutionHook();

    /** The callback that is run against each subscriber. */
    final OnSubscribe<T> onSubscribe;

    /**
     * Stores the given callback in {@link #onSubscribe}.
     *
     * @param f the callback to keep
     */
    protected Observable(OnSubscribe<T> f) {
        this.onSubscribe = f;
    }

    /**
     * Creates an Observable around the callback returned by {@code hook.onCreate(f)}.
     *
     * @param f   the callback to run when a subscriber subscribes
     * @param <T> the type of item emitted
     * @return a new Observable holding the (possibly hook-decorated) callback
     */
    public static final <T> Observable<T> create(OnSubscribe<T> f) {
        final OnSubscribe<T> hooked = hook.onCreate(f);
        return new Observable<T>(hooked);
    }
}
public abstract class RxJavaObservableExecutionHook {
public <T> OnSubscribe<T> onCreate(OnSubscribe<T> f) {
return f;
}
Observable类里存在成员变量onSubscribe。
Observable.create方法的参数是OnSubscribe的实现类。方法里调用了构造方法Observable(OnSubscribe),参数是hook.onCreate(OnSubscribe)的返回值。追入hook类可以看到,hook.onCreate(OnSubscribe)方法直接返回传入的onSubscribe,所以构造方法的参数就是我们传入的onSubscribe对象。
所以,create方法的作用是创建一个Observable,并将传入的onSubscribe赋值给自己的成员变量。
Observer observer = new Observer() ;
/**
 * Receiver of the three kinds of notification an Observable can send.
 *
 * @param <T> the type of item received
 */
public interface Observer<T> {

    /** Signals that the stream finished normally. */
    void onCompleted();

    /**
     * Signals that the stream ended with an error.
     *
     * @param e the error that occurred
     */
    void onError(Throwable e);

    /**
     * Delivers one item.
     *
     * @param t the item
     */
    void onNext(T t);
}
创建一个Observer接口的实现类。
observable.subscribe(observer);
/**
 * Subscribes an {@code Observer}. If it is already a {@code Subscriber} it is passed through
 * as is; otherwise it is wrapped in a {@code Subscriber} that forwards every callback to it.
 *
 * @param observer the observer to subscribe
 * @return the Subscription returned by {@code subscribe(Subscriber)}
 */
public final Subscription subscribe(final Observer<? super T> observer) {
    if (observer instanceof Subscriber) {
        return subscribe((Subscriber<? super T>) observer);
    }

    // Adapter that delegates each notification to the plain Observer.
    final Subscriber<T> adapter = new Subscriber<T>() {
        @Override
        public void onCompleted() {
            observer.onCompleted();
        }

        @Override
        public void onError(Throwable error) {
            observer.onError(error);
        }

        @Override
        public void onNext(T item) {
            observer.onNext(item);
        }
    };
    return subscribe(adapter);
}
Observable类的subscribe方法是将传入的Observer对象封装成Subscriber对象。
/**
 * Abridged view of Subscriber: an abstract Observer that is also a Subscription.
 * Only the members needed for the walkthrough are shown; other constructors and
 * methods are elided.
 */
public abstract class Subscriber<T> implements Observer<T>, Subscription {
// Sentinel stored in `requested` while no request amount has been set.
private static final Long NOT_SET = Long.MIN_VALUE;
// Backing list that add(), unsubscribe() and isUnsubscribed() delegate to.
private final SubscriptionList subscriptions;
// NOTE(review): presumably another subscriber this one is chained to; setProducer forwards to it when non-null. Confirm against the elided constructor.
private final Subscriber<?> subscriber;
// Producer most recently handed to setProducer.
private Producer producer;
private long requested = NOT_SET; // default to not set
// No-arg constructor; delegates to an elided two-argument constructor with (null, false).
protected Subscriber() {
this(null, false);
}
// ... other constructors and member methods omitted
// Adds the given subscription to the backing list.
public final void add(Subscription s) {
subscriptions.add(s);
}
// Unsubscribes by delegating to the backing list.
@Override
public final void unsubscribe() {
subscriptions.unsubscribe();
}
// Reports the unsubscribed state of the backing list.
@Override
public final boolean isUnsubscribed() {
return subscriptions.isUnsubscribed();
}
// Called by Observable.subscribe before the OnSubscribe callback runs; subclasses may override.
public void onStart() {
// empty implementation
}
}
/**
 * Handle for a subscription: lets the holder cancel it and query whether it is cancelled.
 */
public interface Subscription {

    /** Cancels the subscription. */
    void unsubscribe();

    /**
     * @return {@code true} if the subscription has been cancelled
     */
    boolean isUnsubscribed();
}
Subscriber是实现了Observer和Subscription两个接口的抽象类。Subscription接口只有unsubscribe()和isUnsubscribed()两个方法,用于取消订阅和查询是否已取消订阅,这里暂时用不到,先不深入。
Observable的subscribe(Observer)方法将传入的Observer对象封装成Subscriber对象之后,调用subscribe(Subscriber)方法;该方法再调用静态重载subscribe(Subscriber, Observable),传入的第二个参数是Observable本身(this)。
/**
 * Subscribes the given subscriber to this Observable by delegating to the static
 * two-argument overload.
 *
 * @param subscriber the subscriber to attach
 * @return the Subscription produced by the static overload
 */
public final Subscription subscribe(Subscriber<? super T> subscriber) {
    final Observable<T> self = this;
    return Observable.subscribe(subscriber, self);
}
调用重载方法subscribe(Subscriber, Observable)
/**
 * Core subscribe routine: validates its arguments, calls subscriber.onStart(), wraps the
 * subscriber in a SafeSubscriber when needed, and then runs the observable's OnSubscribe
 * callback against it.
 *
 * @param subscriber the subscriber to attach; must not be null
 * @param observable the observable being subscribed to; its onSubscribe must not be null
 * @return the result of hook.onSubscribeReturn(subscriber), or Subscriptions.unsubscribed()
 *         when the callback threw and the error was delivered through onError
 * @throws IllegalArgumentException if subscriber is null
 * @throws IllegalStateException if observable.onSubscribe is null
 * @throws RuntimeException if the callback threw and onError threw as well
 */
private static <T> Subscription subscribe(Subscriber<? super T> subscriber, Observable<T> observable) {
if (subscriber == null) {
throw new IllegalArgumentException("observer can not be null");
}
if (observable.onSubscribe == null) {
throw new IllegalStateException("onSubscribe function can not be null.");
}
// onStart is invoked on the original subscriber, before any wrapping.
subscriber.onStart();
if (!(subscriber instanceof SafeSubscriber)) { // wrap in a SafeSubscriber
subscriber = new SafeSubscriber<T>(subscriber);
}
try {
// Run the OnSubscribe returned by the hook against the (wrapped) subscriber.
hook.onSubscribeStart(observable, observable.onSubscribe).call(subscriber);
return hook.onSubscribeReturn(subscriber);
} catch (Throwable e) {
// Fatal errors are rethrown; anything else is routed to the subscriber's onError.
Exceptions.throwIfFatal(e);
try {
subscriber.onError(hook.onSubscribeError(e));
} catch (Throwable e2) {
Exceptions.throwIfFatal(e2);
// onError itself failed: report to the hook and throw, keeping e2 as the cause.
RuntimeException r = new RuntimeException("Error occurred attempting to subscribe [" + e.getMessage() + "] and then again while trying to pass to onError.", e2);
hook.onSubscribeError(r);
throw r;
}
return Subscriptions.unsubscribed();
}
}
方法里首先对subscriber和observable.onSubscribe判空,然后调用subscriber的onStart方法,再将subscriber对象包装成SafeSubscriber(如果它还不是SafeSubscriber),最后调用hook.onSubscribeStart(observable, observable.onSubscribe).call(subscriber);
/**
 * Hook invoked just before the OnSubscribe callback runs; the default is a pass-through.
 *
 * @param observableInstance the observable being subscribed to (unused here)
 * @param onSubscribe        the callback about to be executed
 * @param <T>                the type of item emitted
 * @return {@code onSubscribe}, unchanged
 */
public <T> OnSubscribe<T> onSubscribeStart(
        Observable<? extends T> observableInstance,
        final OnSubscribe<T> onSubscribe) {
    // Default behaviour: hand back the callback as is.
    return onSubscribe;
}
hook.onSubscribeStart返回observable.onSubscribe对象,然后调用它的call()方法并将subscriber作为参数传入。call方法就是在第一步创建Observable对象时实现的。最后hook.onSubscribeReturn(subscriber)返回subscriber对象。
Just && From
// just(value): the resulting Observable calls onNext with the single value, then onCompleted.
Observable.just("hello").subscribe(observer);
// from(array): NOTE(review) only the Iterable overload is traced in these notes; the array overload presumably emits each element in order. Confirm.
Observable.from(new String[]{"hello","Rx"}).subscribe(observer);
just
/**
 * Creates an Observable for a single value by delegating to
 * {@code ScalarSynchronousObservable.create}.
 *
 * @param value the item to emit
 * @param <T>   the type of the item
 * @return an Observable wrapping {@code value}
 */
public static final <T> Observable<T> just(final T value) {
    final ScalarSynchronousObservable<T> scalar = ScalarSynchronousObservable.create(value);
    return scalar;
}
/**
 * Static factory for {@code ScalarSynchronousObservable}.
 *
 * @param t   the single value to wrap
 * @param <T> the type of the value
 * @return a new instance holding {@code t}
 */
public static final <T> ScalarSynchronousObservable<T> create(T t) {
    final ScalarSynchronousObservable<T> instance = new ScalarSynchronousObservable<T>(t);
    return instance;
}
/**
 * Builds an Observable whose OnSubscribe emits {@code t} once and then completes,
 * and also keeps {@code t} in a field.
 *
 * @param t the single value to emit
 */
protected ScalarSynchronousObservable(final T t) {
    super(new OnSubscribe<T>() {
        @Override
        public void call(Subscriber<? super T> downstream) {
            // One item, then the completion signal.
            downstream.onNext(t);
            downstream.onCompleted();
        }
    });
    this.t = t;
}
from
/**
 * Creates an Observable backed by an {@code Iterable}, using
 * {@code OnSubscribeFromIterable} as its OnSubscribe callback.
 *
 * @param iterable the source of items
 * @param <T>      the type of item emitted
 * @return an Observable created from the iterable-backed callback
 */
public static final <T> Observable<T> from(Iterable<? extends T> iterable) {
    final OnSubscribe<T> source = new OnSubscribeFromIterable<T>(iterable);
    return create(source);
}
/**
 * OnSubscribe callback that feeds a subscriber from an {@code Iterable}.
 *
 * @param <T> the type of item emitted
 */
public final class OnSubscribeFromIterable<T> implements OnSubscribe<T> {

    /** The source iterable; never null. */
    final Iterable<? extends T> is;

    /**
     * @param iterable the source of items
     * @throws NullPointerException if {@code iterable} is null
     */
    public OnSubscribeFromIterable(Iterable<? extends T> iterable) {
        if (null == iterable) {
            throw new NullPointerException("iterable must not be null");
        }
        this.is = iterable;
    }

    /**
     * Completes immediately when the iterable is empty and the subscriber is still
     * subscribed; in every other case installs an {@code IterableProducer}.
     *
     * @param o the subscriber to feed
     */
    @Override
    public void call(final Subscriber<? super T> o) {
        final Iterator<? extends T> cursor = is.iterator();
        final boolean emptyAndSubscribed = !cursor.hasNext() && !o.isUnsubscribed();
        if (emptyAndSubscribed) {
            o.onCompleted();
        } else {
            // setProducer ends up iterating the Iterator and calling onNext for each element.
            o.setProducer(new IterableProducer<T>(o, cursor));
        }
    }
}
/**
 * Stores the producer and either forwards it to the chained subscriber or requests
 * items from it directly.
 *
 * @param p the producer to install
 */
public void setProducer(Producer p) {
    final long pending;
    final boolean forwardToChained;
    synchronized (this) {
        pending = requested;
        producer = p;
        // Forward only when a chained subscriber exists and nothing has been requested yet.
        // (Per the walkthrough notes, this is false in the from(Iterable) example.)
        forwardToChained = subscriber != null && pending == NOT_SET;
    }

    if (forwardToChained) {
        subscriber.setProducer(producer);
        return;
    }

    if (pending == NOT_SET) {
        // Path taken in the walkthrough: ends up in IterableProducer.request.
        producer.request(Long.MAX_VALUE);
    } else {
        producer.request(pending);
    }
}
/**
 * Abridged view of the producer used by {@code OnSubscribeFromIterable}; the inherited
 * {@code AtomicLong} value serves as the request counter.
 *
 * @param <T> the type of item emitted
 */
private static final class IterableProducer<T> extends AtomicLong implements Producer {

    /**
     * Handles a request for {@code n} items, choosing between the fast and slow paths.
     *
     * @param n the number of items requested
     */
    public void request(long n) {
        // get() comes from AtomicLong; in the walkthrough the counter is still 0 here.
        if (get() == Long.MAX_VALUE) {
            return;
        }

        // compareAndSet sets the counter to Long.MAX_VALUE only if it is currently 0 and
        // returns true in that case; in the walkthrough it succeeds, so fastpath() runs.
        if (n == Long.MAX_VALUE && compareAndSet(0, Long.MAX_VALUE)) {
            fastpath();
            return;
        }

        if (n > 0 && BackpressureUtils.getAndAddRequest(this, n) == 0L) {
            slowpath(n);
        }
    }
}
/**
 * Drains the iterator, calling onNext for each element, and finishes with onCompleted
 * unless the subscriber unsubscribes first.
 */
void fastpath() {
    final Subscriber<? super T> downstream = this.o;
    final Iterator<? extends T> source = this.it;

    for (;;) {
        if (downstream.isUnsubscribed()) {
            return;
        }
        if (source.hasNext()) {
            downstream.onNext(source.next());
            continue;
        }
        // Iterator exhausted: complete only if still subscribed.
        if (!downstream.isUnsubscribed()) {
            downstream.onCompleted();
        }
        return;
    }
}
ActionX
// Action1 that logs each received String; it plays the role of onNext.
Action1<String> action1 = new Action1<String>() {
    @Override
    public void call(String message) {
        Log.d(tag, message);
    }
};
// subscribe(Action1) wraps the action in a Subscriber.
Observable.just("hello").subscribe(action1);
以上是简单用法。ActionX的X代表call方法里参数的个数。
/**
 * Subscribes with only an onNext handler. The action is wrapped in a Subscriber whose
 * onCompleted does nothing and whose onError throws.
 *
 * @param onNext the action invoked for every item
 * @return the Subscription returned by {@code subscribe(Subscriber)}
 * @throws IllegalArgumentException if {@code onNext} is null
 */
public final Subscription subscribe(final Action1<? super T> onNext) {
    if (null == onNext) {
        throw new IllegalArgumentException("onNext can not be null");
    }

    final Subscriber<T> actionSubscriber = new Subscriber<T>() {
        @Override
        public final void onCompleted() {
            // Completion is ignored: no handler was supplied.
        }

        @Override
        public final void onError(Throwable error) {
            // No error handler was supplied, so surface the error as an exception.
            throw new OnErrorNotImplementedException(error);
        }

        @Override
        public final void onNext(T item) {
            onNext.call(item);
        }
    };
    return subscribe(actionSubscriber);
}
subscribe方法把Action1包装成一个Subscriber对象。并且在onNext方法里调用action1的call方法。
类似的方法有:
subscribe(final Action1<? super T> onNext, final Action1<Throwable> onError)
subscribe(final Action1<? super T> onNext, final Action1<Throwable> onError, final Action0 onComplete)