RxJava的另一个好处在于,我们可以清楚地看到数据是如何在一系列操作符之间进行转换的。
ReactiveX/RxJava文档中文版
可观察对象(Observables)
观察者(observers)
桥梁或者代理(Subject)
AsyncSubject
BehaviorSubject
PublishSubject
ReplaySubject
串行化
如果你把
Subject当作一个Subscriber使用,注意不要从多个线程中调用它的onNext方法(包括其它的on系列方法),这可能导致同时(非顺序)调用,这会违反Observable协议,给Subject的结果增加了不确定性。
要避免此类问题,你可以将 Subject 转换为一个 SerializedSubject ,类似于这样:
mySafeSubject = new SerializedSubject( myUnsafeSubject );
调度器 (Scheduler)
RxJava示例
调度器的种类
下表展示了RxJava中可用的调度器种类:
| 调度器类型 | 效果 |
|---|---|
| Schedulers.computation( ) | 用于计算任务,如事件循环或和回调处理,不要用于IO操作(IO操作请使用Schedulers.io());默认线程数等于处理器的数量 |
| Schedulers.from(executor) | 使用指定的Executor作为调度器 |
| Schedulers.immediate( ) | 在当前线程立即开始执行任务 |
| Schedulers.io( ) | 用于IO密集型任务,如异步阻塞IO操作,这个调度器的线程池会根据需要增长;对于普通的计算任务,请使用 Schedulers.computation(); |
| Schedulers.io( ) | 默认是一个CachedThreadScheduler,很像一个有线程缓存的新线程调度器 |
| Schedulers.newThread( ) | 为每个任务创建一个新线程 |
| Schedulers.trampoline( ) | 当其它排队的任务完成后,在当前线程排队开始执行 |
操作符 (Operators)
- [Creating 创建型]
Create,Defer,Empty/Never/Throw,From,Interval,Just,Range,Repeat,Start,Timer - [Transforming 变换型]
Buffer,FlatMap,GroupBy,Map,Scan,Window - [Filtering 过滤型]
Debounce,Distinct,ElementAt,Filter,First,IgnoreElements,Last,Sample,Skip,SkipLast,Take,TakeLast - [Combining 组合型]
And/Then/When,CombineLatest,Join,Merge,StartWith,Switch,Zip - [Error Handling 容错型]
Catch,Retry - [Utility 辅助操作 工具型]
Delay,Do,Materialize/Dematerialize,ObserveOn,Serialize,Subscribe,SubscribeOn,TimeInterval,Timeout,Timestamp,Using - [Conditional and Boolean 条件与布尔操作 条件型]
All/Amb/Contains,DefaultIfEmpty,SequenceEqual,SkipUntil/SkipWhile,TakeUntil/TakeWhile - [Mathematical and Aggregate 算术与聚合操作 聚合型]
Average/Concat/Reduce,Max/Min/Count/Sum - [Async 异步操作]
Start,ToAsync,StartFuture,FromAction,FromCallable,RunAsync - [Connect 连接操作]
Connect,Publish,RefCount,Replay - [Convert 转换型]
ToFuture,ToList,ToMap,ToIterable,toMultiMap - [Blocking 阻塞操作]
ForEach,First,Last,MostRecent,Next,Single,Latest - [String 字符串操作]
ByLine,Decode,Encode,From,Join,Split,StringConcat
Transformer
转换器
Observable/Flowable/Single/Completable/Maybe 对象转换成另一个Observable/Flowable/Single/Completable/Maybe 对象
- RxJava1.x 版本就有了Observable.Transformer、Single.Transformer和Completable.Transformer
- RxJava2.x版本中变成了ObservableTransformer、SingleTransformer、CompletableTransformer、FlowableTransformer和MaybeTransformer。其中,FlowableTransformer和MaybeTransformer是新增的。
compose
compose()是唯一一个能够从数据流中得到原始Observable<T>的操作符,所以,那些需要对整个数据流产生作用的操作(比如,subscribeOn()和observeOn())需要使用compose()来实现。
flatMap()中使用subscribeOn()或者observeOn(),那么它仅仅对在flatMap()中创建的Observable起作用,而不会对剩下的流产生影响
当创建Observable流的时候,compose()会立即执行,犹如已经提前写好了一个操作符一样,
而flatMap()则是在onNext()被调用后执行,onNext()的每一次调用都会触发flatMap(),
也就是说,flatMap()转换每一个事件,而compose()转换的是整个数据流。
因为每一次调用onNext()后,都不得不新建一个Observable,所以flatMap()的效率较低。事实上,compose()操作符只在主干数据流上执行操作。