Rxjava_map源代码分析

ObservableA:
Observable.create(new Observable.OnSubscribe<Integer>() {
@Override
public void call(Subscriber<? super Integer> subscriber) {
subscriber.onNext(1);
subscriber.onCompleted();
}
});

ObservableB:
.map(new Func1<Integer, String>() {
@Override
public String call(Integer integer) {
return "This is " + integer;
}
})

Subscriber_One:
new Subscriber<String>() {
@Override
public void onCompleted() {
System.out.println("onCompleted!");
}
@Override
public void onError(Throwable e) {
System.out.println(e.getMessage());
}
@Override
public void onNext(String s) {
System.out.println(s);
}
}

ObservableB.subscribe(Subscriber_One)

分析:
ObservableB:
call
this.source = source; // ObservableA
this.transformer = transformer;
call(final Subscriber<? super R> o) {
MapSubscriber<T, R> parent = new MapSubscriber<T, R>(o, transformer);
//MapSubscriber的构造函数
//this.actual = actual; Subscriber_One 对象
// this.mapper = mapper; 就是真是的map方法
o.add(parent);
source.unsafeSubscribe(parent);// 调用ObservableA的call
}

ObservableA:
call
call(Subscriber<? super Integer> subscriber) {
subscriber.onNext(1);
subscriber.onCompleted();
}
看MapSubscriber的onNext方法:
onNext(T t) {
R result;
try {
result = mapper.call(t); //调用 mapper的call: "This is " + integer
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
unsubscribe();
onError(OnErrorThrowable.addValueAsLastCause(ex, t));
return;
}
actual.onNext(result); //actual Subscriber_One
}

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容

  • 怎么如此平静, 感觉像是走错了片场.为什么呢, 因为上下游工作在同一个线程呀骚年们! 这个时候上游每次调用emit...
    Young1657阅读 5,468评论 2 1
  • 我从去年开始使用 RxJava ,到现在一年多了。今年加入了 Flipboard 后,看到 Flipboard 的...
    Jason_andy阅读 10,916评论 7 62
  • 转一篇文章 原地址:http://gank.io/post/560e15be2dca930e00da1083 前言...
    jack_hong阅读 4,461评论 0 2
  • 1.移除现有的Ruby默认源 1).gem sources --remove https://rubygems....
    9e5f2143c765阅读 3,064评论 0 0
  • 2010-02-10 宵夜只两桌客人,弹完《茉莉花》,有客人隔空喊话。 我抬头。他用广东话说:“笑傲江湖。” 我点...
    万马堂中花飞扬阅读 1,514评论 0 3