Rxjava

参考 http://gank.io/post/560e15be2dca930e00da1083

Rxjava的基本用法:

基础使用

    Observable observable = Observable.create(new Observable.OnSubscribe<String>() {
        @Override
        public void call(Subscriber<? super String> o) {
              subscriber.onNext("hello,Rx");
              subscriber.onCompleted();
        }
    });
    Observer observer = new Observer() {
        @Override
        public void onCompleted() {
        }

        @Override
        public void onError(Throwable e) {
        }

        @Override
        public void onNext(Object o) {
        }
    };
    observable.subscribe(observer);

Observable observable = Observable.create(new onSubscribe);

public interface OnSubscribe<T> extends Action1<Subscriber<? super T>> {
}
public interface Action1<T> extends Action {
    void call(T t);
}
public interface Action extends Function {
}

public class Observable<T> {
    final OnSubscribe<T> onSubscribe;
    private static final RxJavaObservableExecutionHook hook = RxJavaPlugins.getInstance().getObservableExecutionHook();

    public final static <T> Observable<T> create(OnSubscribe<T> f) {
    return new Observable<T>(hook.onCreate(f));
    }
    protected Observable(OnSubscribe<T> f) {
    this.onSubscribe = f;
    }
}
public abstract class RxJavaObservableExecutionHook {
     public <T> OnSubscribe<T> onCreate(OnSubscribe<T> f) {
     return f;
}

Observable类里存在成员变量onSubscribe。
Observable.create方法的参数是OnSubscribe的实现类。方法里调用了构造方法Observable(OnSubscribe)。参数是hook.oncreate(OnSubscribe)的返回值,追入hook类,hook.onCreate(OnSubscribe)方法返回传入的onSubscribe,所以构造方法的参数就是我们传入的onSubscribe对象。
所以,create方法的作用是创建一个Observable,并将传入的onSubscribe赋值给自己的成员变量。

Observer observer = new Observer() ;

public interface Observer<T> {
void onCompleted();
void onError(Throwable e);
void onNext(T t);
}

创建一个Observer接口的实现类。

observable.subscribe(observer);

public final Subscription subscribe(final Observer<? super T> observer) {
    if (observer instanceof Subscriber) {
        return subscribe((Subscriber<? super T>)observer);
    }
    return subscribe(new Subscriber<T>() {
        @Override
        public void onCompleted() {
            observer.onCompleted();
        }
        @Override
        public void onError(Throwable e) {
            observer.onError(e);
        }
        @Override
        public void onNext(T t) {
            observer.onNext(t);
        }
    });
}

Observable类的subscribe方法是将传入的Observer对象封装成Subscriber对象。

public abstract class Subscriber<T> implements Observer<T>, Subscription {

private static final Long NOT_SET = Long.MIN_VALUE;
private final SubscriptionList subscriptions;
private final Subscriber<?> subscriber;
private Producer producer;
private long requested = NOT_SET; // default to not set

protected Subscriber() {
    this(null, false);
}

//...其他构造方法和成员方法

public final void add(Subscription s) {
    subscriptions.add(s);
}
@Override
public final void unsubscribe() {
    subscriptions.unsubscribe();
}
@Override
public final boolean isUnsubscribed() {
    return subscriptions.isUnsubscribed();
}
public void onStart() {
    // 空实现
} 
}
public interface Subscription {
void unsubscribe();
boolean isUnsubscribed();
}

subscriber实现了Observer和Subscription的抽象类,Subcriptions暂且用不到,我也不知道是干啥的。

Observable的subscribe方法将传入的Observer对象封装成Subscriber对象之后,调用subscribe(Subscriber)方法,且传入的第二个参数是Observable本身。

public final Subscription subscribe(Subscriber<? super T> subscriber) {
    return Observable.subscribe(subscriber, this);
}

调用重载方法subscribe(Subscriber, Observable)

private static <T> Subscription subscribe(Subscriber<? super T> subscriber, Observable<T> observable) {
    if (subscriber == null) {
        throw new IllegalArgumentException("observer can not be null");
    }
    if (observable.onSubscribe == null) {
        throw new IllegalStateException("onSubscribe function can not be null.");
    }
    subscriber.onStart();   
    if (!(subscriber instanceof SafeSubscriber)) { //包装成安全的subscriber
        subscriber = new SafeSubscriber<T>(subscriber);
    }
    try {
        hook.onSubscribeStart(observable, observable.onSubscribe).call(subscriber);
        return hook.onSubscribeReturn(subscriber);
    } catch (Throwable e) {       
        Exceptions.throwIfFatal(e);
        try {
            subscriber.onError(hook.onSubscribeError(e));
        } catch (Throwable e2) {
            Exceptions.throwIfFatal(e2);   
            RuntimeException r = new RuntimeException("Error occurred attempting to subscribe [" + e.getMessage() + "] and then again while trying to pass to onError.", e2);
            hook.onSubscribeError(r);
            throw r;
        }
        return Subscriptions.unsubscribed();
    }
}

方法里首先判空,然后调用了subscriber的onstart方法,再将subscriber对象包装成安全的subscriber,调用hook.onSubscribeStart(observable, observable.onSubscribe).call(subscriber);

public <T> OnSubscribe<T> onSubscribeStart(Observable<? extends T> observableInstance, final OnSubscribe<T> onSubscribe) {
    // pass-thru by default
    return onSubscribe;
}

hook.onSubscribeStart返回observable.onSubcribe对象。调用call()方法并将subscriber作为参数传入。call方法是在第一步创建Observable对象时实现的。hook.onSubscribeReturn(subscriber)返回subscriber对象。

Just && From

 Observable.just("hello").subscribe(observer);
 Observable.from(new String[]{"hello","Rx"}).subscribe(observer);

just

public final static <T> Observable<T> just(final T value) {
    return ScalarSynchronousObservable.create(value);
}
 public static final <T> ScalarSynchronousObservable<T> create(T t) {
    return new ScalarSynchronousObservable<T>(t);
}

protected ScalarSynchronousObservable(final T t) {
    super(new OnSubscribe<T>() {
        @Override
        public void call(Subscriber<? super T> s) { 
             s.onNext(t);
            s.onCompleted();
        }
    });
    this.t = t;
}

from

public final static <T> Observable<T> from(Iterable<? extends T> iterable) {
    return create(new OnSubscribeFromIterable<T>(iterable));
}
public final class OnSubscribeFromIterable<T> implements OnSubscribe<T> {

final Iterable<? extends T> is;

public OnSubscribeFromIterable(Iterable<? extends T> iterable) {
    if (iterable == null) {
        throw new NullPointerException("iterable must not be null");
    }
    this.is = iterable;
}
@Override
public void call(final Subscriber<? super T> o) {
    final Iterator<? extends T> it = is.iterator();
    if (!it.hasNext() && !o.isUnsubscribed())
        o.onCompleted();
    else 
        // 调用subscriber的setProducer方法,内部是迭代Iterator获取参数并调用onNext方法
        o.setProducer(new IterableProducer<T>(o, it)); 
}
}
public void setProducer(Producer p) {
    long toRequest;
    boolean passToSubscriber = false;
    synchronized (this) {
        toRequest = requested;
        producer = p;
        if (subscriber != null) { //假
            if (toRequest == NOT_SET) {
                // we pass-thru to the next producer as nothing has been requested
                passToSubscriber = true;
            }
        }
    }
    if (passToSubscriber) { //假
        subscriber.setProducer(producer);
    } else {
        if (toRequest == NOT_SET) {
            producer.request(Long.MAX_VALUE); //走这里 调用IterableProducer的request方法
        } else {
            producer.request(toRequest);
        }
    }
}
private static final class IterableProducer<T> extends AtomicLong implements Producer {   
public void request(long n) {
        if (get() == Long.MAX_VALUE) { // get是AtomicLong的方法。此处是0。
            return;
        }
        // 比较对象的Value是否等于0,如果是就为Value赋值Long.MAX_VALUE并返回true。此处为true
        // 调用fastpath();
        if (n == Long.MAX_VALUE && compareAndSet(0, Long.MAX_VALUE)) { 
            fastpath();
        } else 
        if (n > 0 && BackpressureUtils.getAndAddRequest(this, n) == 0L) {
            slowpath(n);
        }
    }
}
 // 迭代调用onNext方法
void fastpath() {
        final Subscriber<? super T> o = this.o;
        final Iterator<? extends T> it = this.it;

        while (true) {
            if (o.isUnsubscribed()) {
                return;
            } else if (it.hasNext()) {
                o.onNext(it.next());
            } else if (!o.isUnsubscribed()) {
                o.onCompleted();
                return;
            } else {
                // is unsubscribed
                return;
            }
        }
    }

ActionX

  Action1<String> action1 = new Action1<String>() {
        @Override
        public void call(String str) {
            Log.d(tag,str);
        }
    };
 Observable.just("hello").subscribe(action1);

以上是简单用法。ActionX的X代表call方法里参数的个数。

public final Subscription subscribe(final Action1<? super T> onNext) {
    if (onNext == null) {
        throw new IllegalArgumentException("onNext can not be null");
    }
    return subscribe(new Subscriber<T>() {
        @Override
        public final void onCompleted() {
            // do nothing
        }
        @Override
        public final void onError(Throwable e) {
            throw new OnErrorNotImplementedException(e);
        }
        @Override
        public final void onNext(T args) {
            onNext.call(args);
        }
    });
}

subscribe方法把Action1包装成一个Subscriber对象。并且在onNext方法里调用action1的call方法。
类似的方法有:

    subscribe(final Action1<? super T> onNext, final Action1<Throwable> onError)  
    subscribe(final Action1<? super T> onNext, final Action1<Throwable> onError, final Action0 onComplete)
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
【社区内容提示】社区部分内容疑似由AI辅助生成,浏览时请结合常识与多方信息审慎甄别。
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

相关阅读更多精彩内容

友情链接更多精彩内容