Rxjava

参考 http://gank.io/post/560e15be2dca930e00da1083

Rxjava的基本用法:

基础使用

    Observable observable = Observable.create(new Observable.OnSubscribe<String>() {
        @Override
        public void call(Subscriber<? super String> o) {
              subscriber.onNext("hello,Rx");
              subscriber.onCompleted();
        }
    });
    Observer observer = new Observer() {
        @Override
        public void onCompleted() {
        }

        @Override
        public void onError(Throwable e) {
        }

        @Override
        public void onNext(Object o) {
        }
    };
    observable.subscribe(observer);

Observable observable = Observable.create(new onSubscribe);

public interface OnSubscribe<T> extends Action1<Subscriber<? super T>> {
}
public interface Action1<T> extends Action {
    void call(T t);
}
public interface Action extends Function {
}

public class Observable<T> {
    final OnSubscribe<T> onSubscribe;
    private static final RxJavaObservableExecutionHook hook = RxJavaPlugins.getInstance().getObservableExecutionHook();

    public final static <T> Observable<T> create(OnSubscribe<T> f) {
    return new Observable<T>(hook.onCreate(f));
    }
    protected Observable(OnSubscribe<T> f) {
    this.onSubscribe = f;
    }
}
public abstract class RxJavaObservableExecutionHook {
     public <T> OnSubscribe<T> onCreate(OnSubscribe<T> f) {
     return f;
}

Observable类里存在成员变量onSubscribe。
Observable.create方法的参数是OnSubscribe的实现类。方法里调用了构造方法Observable(OnSubscribe)。参数是hook.oncreate(OnSubscribe)的返回值,追入hook类,hook.onCreate(OnSubscribe)方法返回传入的onSubscribe,所以构造方法的参数就是我们传入的onSubscribe对象。
所以,create方法的作用是创建一个Observable,并将传入的onSubscribe赋值给自己的成员变量。

Observer observer = new Observer() ;

public interface Observer<T> {
void onCompleted();
void onError(Throwable e);
void onNext(T t);
}

创建一个Observer接口的实现类。

observable.subscribe(observer);

public final Subscription subscribe(final Observer<? super T> observer) {
    if (observer instanceof Subscriber) {
        return subscribe((Subscriber<? super T>)observer);
    }
    return subscribe(new Subscriber<T>() {
        @Override
        public void onCompleted() {
            observer.onCompleted();
        }
        @Override
        public void onError(Throwable e) {
            observer.onError(e);
        }
        @Override
        public void onNext(T t) {
            observer.onNext(t);
        }
    });
}

Observable类的subscribe方法是将传入的Observer对象封装成Subscriber对象。

public abstract class Subscriber<T> implements Observer<T>, Subscription {

private static final Long NOT_SET = Long.MIN_VALUE;
private final SubscriptionList subscriptions;
private final Subscriber<?> subscriber;
private Producer producer;
private long requested = NOT_SET; // default to not set

protected Subscriber() {
    this(null, false);
}

//...其他构造方法和成员方法

public final void add(Subscription s) {
    subscriptions.add(s);
}
@Override
public final void unsubscribe() {
    subscriptions.unsubscribe();
}
@Override
public final boolean isUnsubscribed() {
    return subscriptions.isUnsubscribed();
}
public void onStart() {
    // 空实现
} 
}
public interface Subscription {
void unsubscribe();
boolean isUnsubscribed();
}

subscriber实现了Observer和Subscription的抽象类,Subcriptions暂且用不到,我也不知道是干啥的。

Observable的subscribe方法将传入的Observer对象封装成Subscriber对象之后,调用subscribe(Subscriber)方法,且传入的第二个参数是Observable本身。

public final Subscription subscribe(Subscriber<? super T> subscriber) {
    return Observable.subscribe(subscriber, this);
}

调用重载方法subscribe(Subscriber, Observable)

private static <T> Subscription subscribe(Subscriber<? super T> subscriber, Observable<T> observable) {
    if (subscriber == null) {
        throw new IllegalArgumentException("observer can not be null");
    }
    if (observable.onSubscribe == null) {
        throw new IllegalStateException("onSubscribe function can not be null.");
    }
    subscriber.onStart();   
    if (!(subscriber instanceof SafeSubscriber)) { //包装成安全的subscriber
        subscriber = new SafeSubscriber<T>(subscriber);
    }
    try {
        hook.onSubscribeStart(observable, observable.onSubscribe).call(subscriber);
        return hook.onSubscribeReturn(subscriber);
    } catch (Throwable e) {       
        Exceptions.throwIfFatal(e);
        try {
            subscriber.onError(hook.onSubscribeError(e));
        } catch (Throwable e2) {
            Exceptions.throwIfFatal(e2);   
            RuntimeException r = new RuntimeException("Error occurred attempting to subscribe [" + e.getMessage() + "] and then again while trying to pass to onError.", e2);
            hook.onSubscribeError(r);
            throw r;
        }
        return Subscriptions.unsubscribed();
    }
}

方法里首先判空,然后调用了subscriber的onstart方法,再将subscriber对象包装成安全的subscriber,调用hook.onSubscribeStart(observable, observable.onSubscribe).call(subscriber);

public <T> OnSubscribe<T> onSubscribeStart(Observable<? extends T> observableInstance, final OnSubscribe<T> onSubscribe) {
    // pass-thru by default
    return onSubscribe;
}

hook.onSubscribeStart返回observable.onSubcribe对象。调用call()方法并将subscriber作为参数传入。call方法是在第一步创建Observable对象时实现的。hook.onSubscribeReturn(subscriber)返回subscriber对象。

Just && From

 Observable.just("hello").subscribe(observer);
 Observable.from(new String[]{"hello","Rx"}).subscribe(observer);

just

public final static <T> Observable<T> just(final T value) {
    return ScalarSynchronousObservable.create(value);
}
 public static final <T> ScalarSynchronousObservable<T> create(T t) {
    return new ScalarSynchronousObservable<T>(t);
}

protected ScalarSynchronousObservable(final T t) {
    super(new OnSubscribe<T>() {
        @Override
        public void call(Subscriber<? super T> s) { 
             s.onNext(t);
            s.onCompleted();
        }
    });
    this.t = t;
}

from

public final static <T> Observable<T> from(Iterable<? extends T> iterable) {
    return create(new OnSubscribeFromIterable<T>(iterable));
}
public final class OnSubscribeFromIterable<T> implements OnSubscribe<T> {

final Iterable<? extends T> is;

public OnSubscribeFromIterable(Iterable<? extends T> iterable) {
    if (iterable == null) {
        throw new NullPointerException("iterable must not be null");
    }
    this.is = iterable;
}
@Override
public void call(final Subscriber<? super T> o) {
    final Iterator<? extends T> it = is.iterator();
    if (!it.hasNext() && !o.isUnsubscribed())
        o.onCompleted();
    else 
        // 调用subscriber的setProducer方法,内部是迭代Iterator获取参数并调用onNext方法
        o.setProducer(new IterableProducer<T>(o, it)); 
}
}
public void setProducer(Producer p) {
    long toRequest;
    boolean passToSubscriber = false;
    synchronized (this) {
        toRequest = requested;
        producer = p;
        if (subscriber != null) { //假
            if (toRequest == NOT_SET) {
                // we pass-thru to the next producer as nothing has been requested
                passToSubscriber = true;
            }
        }
    }
    if (passToSubscriber) { //假
        subscriber.setProducer(producer);
    } else {
        if (toRequest == NOT_SET) {
            producer.request(Long.MAX_VALUE); //走这里 调用IterableProducer的request方法
        } else {
            producer.request(toRequest);
        }
    }
}
private static final class IterableProducer<T> extends AtomicLong implements Producer {   
public void request(long n) {
        if (get() == Long.MAX_VALUE) { // get是AtomicLong的方法。此处是0。
            return;
        }
        // 比较对象的Value是否等于0,如果是就为Value赋值Long.MAX_VALUE并返回true。此处为true
        // 调用fastpath();
        if (n == Long.MAX_VALUE && compareAndSet(0, Long.MAX_VALUE)) { 
            fastpath();
        } else 
        if (n > 0 && BackpressureUtils.getAndAddRequest(this, n) == 0L) {
            slowpath(n);
        }
    }
}
 // 迭代调用onNext方法
void fastpath() {
        final Subscriber<? super T> o = this.o;
        final Iterator<? extends T> it = this.it;

        while (true) {
            if (o.isUnsubscribed()) {
                return;
            } else if (it.hasNext()) {
                o.onNext(it.next());
            } else if (!o.isUnsubscribed()) {
                o.onCompleted();
                return;
            } else {
                // is unsubscribed
                return;
            }
        }
    }

ActionX

  Action1<String> action1 = new Action1<String>() {
        @Override
        public void call(String str) {
            Log.d(tag,str);
        }
    };
 Observable.just("hello").subscribe(action1);

以上是简单用法。ActionX的X代表call方法里参数的个数。

public final Subscription subscribe(final Action1<? super T> onNext) {
    if (onNext == null) {
        throw new IllegalArgumentException("onNext can not be null");
    }
    return subscribe(new Subscriber<T>() {
        @Override
        public final void onCompleted() {
            // do nothing
        }
        @Override
        public final void onError(Throwable e) {
            throw new OnErrorNotImplementedException(e);
        }
        @Override
        public final void onNext(T args) {
            onNext.call(args);
        }
    });
}

subscribe方法把Action1包装成一个Subscriber对象。并且在onNext方法里调用action1的call方法。
类似的方法有:

    subscribe(final Action1<? super T> onNext, final Action1<Throwable> onError)  
    subscribe(final Action1<? super T> onNext, final Action1<Throwable> onError, final Action0 onComplete)
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 217,406评论 6 503
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,732评论 3 393
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 163,711评论 0 353
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,380评论 1 293
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,432评论 6 392
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,301评论 1 301
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,145评论 3 418
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 39,008评论 0 276
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,443评论 1 314
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,649评论 3 334
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,795评论 1 347
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,501评论 5 345
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,119评论 3 328
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,731评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,865评论 1 269
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,899评论 2 370
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,724评论 2 354

推荐阅读更多精彩内容