RxJava 创建操作符

ReactiveX 系列文章目录


just

内部触发对 Observer 的 onNext 方法的调用,just 中传递的参数将直接在 onNext 方法中接收到,参数的类型要和 Observer 的泛型保持一致。

共有 10 个重载方法,其中 2 个以上参数方法的内部直接调用了 fromArray。

private val observerStr = object : Observer<String> {
    override fun onNext(t: String) {
        textView.text = "${textView.text}\nonNext $t"
    }

    override fun onError(e: Throwable) {
        textView.text = "${textView.text}\nonError "
    }

    override fun onComplete() {
        textView.text = "${textView.text}\nonComplete "
        disposableStr?.dispose() // 解除订阅
    }

    override fun onSubscribe(d: Disposable) {
        disposableStr = d
    }
}

// 调用 observer 的 onNext("just")
Observable.just("just").subscribe(observerStr)

fromArray/fromIterable

1.x 的 from 方法没有了。遍历集合,每个元素调用一次观察者的 onNext,最后调用 onComplete。

var list = listOf("a", "b", "c")
// 依次调用
// onNext("a")
// onNext("b")
// onNext("c")
// onComplete
Observable.fromIterable(list).subscribe(observerStr)
Observable.fromArray("a","b","c").subscribe(observerStr)

fromCallable

public static <T> Observable<T> fromCallable(Callable<? extends T> supplier)

当有观察者订阅时,从 Callable 的回调方法里获取要发射的数据。

val callable = Callable {
   Thread.sleep(1000)
   "callable"
}
Executors.newSingleThreadExecutor().submit(callable)
Log.e("RX", "start subscribe")
Observable.fromCallable(callable).subscribe({
   Log.e("RX", "accept: $it")
})

日志输出:

05-07 14:16:15.531 1131-1131/pot.ner347.androiddemo E/RX: start subscribe
05-07 14:16:16.544 1131-1131/pot.ner347.androiddemo E/RX: accept: callable

订阅时是 14:16:15.531,Callable 返回结果前先休眠了 1000ms,所以发射出再收到的时间在 1000ms 后。所以 14:16:16.544 时观察者收到了数据。

fromFuture

public static <T> Observable<T> fromFuture(Future<? extends T> future, long timeout, TimeUnit unit)

// 相当于 fromFuture(future, 0L, null)
public static <T> Observable<T> fromFuture(Future<? extends T> future)

// 相当于 fromFuture(future, timeout, unit).subscribeOn(scheduler)
public static <T> Observable<T> fromFuture(Future<? extends T> future, long timeout, TimeUnit unit, Scheduler scheduler)

// 相当于 fromFuture(future, 0L, null, scheduler)
public static <T> Observable<T> fromFuture(Future<? extends T> future, Scheduler scheduler)

共 4 个重载方法,本质差别不大,所以只看第一个即可。timeout 最终是传到了 Future 的 get 方法里。

val callable = Callable {
    Thread.sleep(1000)
    "callable"
}
val future = Executors.newSingleThreadExecutor().submit(callable)
Log.e("RX", "start subscribe")
Observable.fromFuture(future, 500L, TimeUnit.MILLISECONDS).subscribe(observerStr)

超时时间只有 500ms,所以进了 onError,当把超时改成 5000 时进了 onNext,收到了 "callable",然后 onComplete。

fromPublisher

从响应式数据流获取要发射的数据。如果可以,尽可能使用 ObservableOnSubscribe。不支持背压。

public static <T> Observable<T> fromPublisher(Publisher<? extends T> publisher)

public interface Publisher<T> {
    public void subscribe(Subscriber<? super T> s);
}

虽然 Publisher 看起来像个接口,但不建议通过无状态的 Lambda 实现它。注释不太看得明白。应该和其它配合,从其它地方的数据流里返回,单独用没什么意义。

看 Flowable 实现了 Publisher,写了个例子,现在不太清楚。

val flowable = Flowable.create(FlowableOnSubscribe<String> { emitter ->
    emitter.onNext("abc")
    emitter.onNext("def")
}, BackpressureStrategy.ERROR)
Observable.fromPublisher(flowable).subscribe(object : Observer<String> {
    override fun onSubscribe(d: Disposable) {
        Log.e("RX", "onSubscribe")
    }

    override fun onNext(str: String) {
        Log.e("RX", "onNext $str")
    }

    override fun onError(e: Throwable) {
        Log.e("RX", "onError")
    }

    override fun onComplete() {
        Log.e("RX", "onComplete")
    }
})

defer

没有立刻创建被观察者,只有当观察者订阅时才创建,并且针对每个观察者创建都是一个新的 Observable。在回调里决定如何创建这个 Observable。不订阅就不创建。

Observable.defer {
    // 订阅后才创建这个 Observable,使用了 just,就又调了 Observer 的 onNext
    Observable.just("hello")
}.subscribe(observerStr)

empty

不需要发射数据,但又需要告知观察者事件结束,即需要调 onComplete。

private val observerAny = object: Observer<Any> {
  override fun onSubscribe(d: Disposable) { }

  override fun onNext(t: Any) {
      Log.e("RX", "onNext")
  }

  override fun onError(e: Throwable) {
      Log.e("RX", "onError:" + e.message)
  }

  override fun onComplete() {
      Log.e("RX", "onComplete")
  }
}
      
// 输出 onComplete
Observable.empty<Any>().subscribe(observerAny)

never

相比 empty,不仅不发 onNext,也不会发 onComplete 或 onError,什么都不发射而且也不终止。

Observable.never<String>().subscribe(object : Observer<String> {
   override fun onComplete() { Log.e("RX", "onComplete") }
   override fun onSubscribe(d: Disposable) { Log.e("RX", "onSubscribe") }
   override fun onNext(t: String) { Log.e("RX", "onNext") }
   override fun onError(e: Throwable) {  Log.e("RX", "onError") }
})

看日志只回调了 onSubscribe,主要用于测试。

error

直接发射 onError。两个重载方法

// 通过 errorSupplier 提供一个 Throwable 的子类
public static <T> Observable<T> error(Callable<? extends Throwable> errorSupplier)

// 参数直接就是一个 Throwable
public static <T> Observable<T> error(final Throwable exception)
// 输出 onError:abc
Observable.error<Any> { Throwable("abc") }.subscribe(observerAny)

// 输出 onError:edf
Observable.error<Any>(Throwable("edf")).subscribe(observerAny)

generate

public static <T, S> Observable<T> generate(
          final Callable<S> initialState,
          final BiConsumer<S, Emitter<T>> generator,
          Consumer<? super S> disposeState)
// 相当于上一个方法的第三个参数的 accept 方法是空实现
public static <T, S> Observable<T> generate(Callable<S> initialState, final BiConsumer<S, Emitter<T>> generator)

// 第二个参数是 BiFunction,最后一个泛型是 apply 方法返回值的类型,
// 表示新的 state,下面这个两个方法的 state 是可变的,而上面两个方法的 state 在第一个参数 initialState 指定后就不变了
public static <T, S> Observable<T> generate(Callable<S> initialState, BiFunction<S, Emitter<T>, S> generator)
public static <T, S> Observable<T> generate(Callable<S> initialState, BiFunction<S, Emitter<T>, S> generator,
          Consumer<? super S> disposeState)

// 无状态的,相当于 initialState 的回调提供了一个 null
public static <T> Observable<T> generate(final Consumer<Emitter<T>> generator)

create 每次可以发射多个事件,而 generate 是每次只能发送一个事件,连续调用多次 onNext 会抛出 IllegalStateException 异常。


无状态:

是一个无限循环,只要条件满足就一直发射事件。下面的代码将不停的收到数据。

Observable.generate(Consumer<Emitter<String>>{
   it.onNext("abc")
}).subscribe({ Log.e("RX", "收到: $it") })

一次连续发射 onNext 和 onComplete/onError 是可以的。

Observable.generate(Consumer<Emitter<String>>{
   it.onNext("abc")
   it.onComplete()
}).subscribe({ Log.e("RX", "收到: $it") })

如果连续调用多次 onNext 就崩溃了。

Observable.generate(Consumer<Emitter<String>>{
   // 崩溃
   it.onNext("abc")
   it.onNext("ab")
}).subscribe({ Log.e("RX", "收到: $it") })

不可变状态:

Observable.generate(Callable<Int> { 0 }, BiConsumer<Int, Emitter<String>> { t1, t2 ->
   Log.e("RX", "当前状态 $t1")
   t2.onNext("$t1")
}) .subscribe({
   Log.e("RX", "收到: $it")
})

状态不变,一直是 0,不停的发射不停的接收。


可变状态:

Observable.generate(Callable<Int> { 0 }, BiFunction<Int, Emitter<String>, Int> { currentState, t2 ->
    if (currentState < 5) {
        Log.e("RX", "onNext $currentState")
        t2.onNext("emitter $currentState")
    } else {
        Log.e("RX", "onComplete $currentState")
        t2.onComplete()
    }
    currentState + 1 // 返回一个新的状态
}, Consumer<Int> {
    Log.e("RX", "dispose 时的 state $it")
})

日志:

05-10 15:41:06.922 20685-20685/pot.ner347.androiddemo E/RX: onNext 0
05-10 15:41:06.923 20685-20685/pot.ner347.androiddemo E/RX: 收到: 0
    onNext 1
    收到: 1
    onNext 2
    收到: 2
    onNext 3
    收到: 3
    onNext 4
05-10 15:41:06.924 20685-20685/pot.ner347.androiddemo E/RX: 收到: 4
    onComplete 5
    dispose 时的 state 6

关于在 Flowable 里的应用看这篇文章 RxJava: Generating Backpressure-Aware Streams,和背压有关,没有彻底弄明白。

Flowable.generate(Callable<Int> { 0 }, BiFunction<Int, Emitter<String>, Int> { t1, t2 ->
    if (t1 < 130) {
        Log.e("RX", "发送 $t1")
        t2.onNext("$t1")
    } else {
        t2.onComplete()
    }
    ++state
}).subscribeOn(Schedulers.io()).observeOn(Schedulers.computation())
        .doAfterNext({
            Thread.sleep(10)
            Log.e("RX", "观察者 $it 休眠 10ms")
        })
       .subscribe(object : Subscriber<String> {
           override fun onSubscribe(s: Subscription?) {
               s?.request(130)
           }

           override fun onNext(t: String?) {}

           override fun onError(t: Throwable?) {}

           override fun onComplete() {}
       })

测试发现,假设要求 130 个数据,被观察者总共发 140 个事件,在发完 128 个后,观察者消费事件,当观察者接收了一些数据(有一次是已经收到 95 了)后,被观察者继续发送第 129-140 个事件,然后观察者接收到第 130 个。

interval

每隔固定时长调用 onNext 发送一个 long 值,从 0 开始。默认在计算线程发射。

public static Observable<Long> interval(long initialDelay, long period, TimeUnit unit, Scheduler scheduler)
// 相当于 interval(initialDelay, period, unit, Schedulers.computation())
public static Observable<Long> interval(long initialDelay, long period, TimeUnit unit)

// 相当于 interval(period, period, unit, scheduler)
public static Observable<Long> interval(long period, TimeUnit unit, Scheduler scheduler)
// 相当于 interval(period, unit, Schedulers.computation())
public static Observable<Long> interval(long period, TimeUnit unit)
private val observerLong = object : Observer<Long> {
    override fun onNext(t: Long) {
        Log.e("RX", "onNext thread: ${Thread.currentThread().name}, t:$t")
    }

    override fun onError(e: Throwable) {
        textView.text = "${textView.text}\nonError "
    }

    override fun onComplete() {
        Log.e("RX", "complete thread: ${Thread.currentThread().name}")
        disposableLong?.dispose() // 解除订阅
    }

    override fun onSubscribe(d: Disposable) {
        disposableLong = d
    }
}

Observable.interval(2, 2, TimeUnit.SECONDS).subscribe(observerLong)

日志结果:

05-03 17:59:52.649 25191-26740/pot.ner347.androiddemo E/RX: onNext thread: RxComputationThreadPool-1, t:0
05-03 17:59:54.647 25191-26740/pot.ner347.androiddemo E/RX: onNext thread: RxComputationThreadPool-1, t:1
05-03 17:59:56.648 25191-26740/pot.ner347.androiddemo E/RX: onNext thread: RxComputationThreadPool-1, t:2
05-03 17:59:58.647 25191-26740/pot.ner347.androiddemo E/RX: onNext thread: RxComputationThreadPool-1, t:3
05-03 18:00:00.645 25191-26740/pot.ner347.androiddemo E/RX: onNext thread: RxComputationThreadPool-1, t:4
05-03 18:00:02.647 25191-26740/pot.ner347.androiddemo E/RX: onNext thread: RxComputationThreadPool-1, t:5
...

intervalRange

public static Observable<Long> intervalRange(long start, long count, long initialDelay, long period, TimeUnit unit, Scheduler scheduler)

// 在 Schedulers.computation() 线程
public static Observable<Long> intervalRange(long start, long count, long initialDelay, long period, TimeUnit unit)

看日志,从 1 开始,发了 5 个数据。从 click 开始 500ms 开始发射,然后大约 1000ms 发一次。

05-10 16:21:23.231 26568-26568/pot.ner347.androiddemo E/RX: click
05-10 16:21:23.760 26568-26867/pot.ner347.androiddemo E/RX: onNext thread: RxComputationThreadPool-1, t:1
05-10 16:21:24.759 26568-26867/pot.ner347.androiddemo E/RX: onNext thread: RxComputationThreadPool-1, t:2
05-10 16:21:25.760 26568-26867/pot.ner347.androiddemo E/RX: onNext thread: RxComputationThreadPool-1, t:3
05-10 16:21:26.760 26568-26867/pot.ner347.androiddemo E/RX: onNext thread: RxComputationThreadPool-1, t:4
05-10 16:21:27.760 26568-26867/pot.ner347.androiddemo E/RX: onNext thread: RxComputationThreadPool-1, t:5
05-10 16:21:27.761 26568-26867/pot.ner347.androiddemo E/RX: complete thread: RxComputationThreadPool-1

range/rangeLong

发射一个整数序列。区别一个是 int,一个是 long。

private val observerInt = object : Observer<Int> {
    override fun onNext(t: Int) {
        textView.text = "${textView.text}\nonNext $t"
    }

    override fun onError(e: Throwable) {
        textView.text = "${textView.text}\nonError "
    }

    override fun onComplete() {
        textView.text = "${textView.text}\nonComplete "
        disposableInt?.dispose() // 解除订阅
    }

    override fun onSubscribe(d: Disposable) {
        disposableInt = d
    }
}

// 第一个参数是起始值,第二个参数是发射的个数,将依次调用
// onNext(10)
// onNext(11)
// onNext(12)
// onNext(13)
// onNext(14)
// onComplete
Observable.range(10, 5).subscribe(observerInt)

timer

在一个给定的延迟后发射一个特殊的值。默认在 Schedulers.computation() 线程。

Observable.timer(2, TimeUnit.SECONDS).subscribe(observerLong)

onNext 中收到的是 0:

05-03 17:51:01.143 22986-24694/pot.ner347.androiddemo E/RX: onNext thread: RxComputationThreadPool-1, t:0
05-03 17:51:01.145 22986-24694/pot.ner347.androiddemo E/RX: complete thread: RxComputationThreadPool-1

unsafeCreate

create 的参数是 ObservableOnSubscribe,发射器是 ObservableEmitter。

unsafeCreate 的参数是 ObservableSource,参数是 Observer。相当于 1.x 时代的 create,没什么用。

public static <T> Observable<T> unsafeCreate(ObservableSource<T> onSubscribe) {
    ObjectHelper.requireNonNull(onSubscribe, "source is null");
    ObjectHelper.requireNonNull(onSubscribe, "onSubscribe is null");
    if (onSubscribe instanceof Observable) {
        throw new IllegalArgumentException("unsafeCreate(Observable) should be upgraded");
    }
    return RxJavaPlugins.onAssembly(new ObservableFromUnsafeSource<T>(onSubscribe));
}

wrap

public static <T> Observable<T> wrap(ObservableSource<T> source) {
    ObjectHelper.requireNonNull(source, "source is null");
    if (source instanceof Observable) {
        return RxJavaPlugins.onAssembly((Observable<T>)source);
    }
    return RxJavaPlugins.onAssembly(new ObservableFromUnsafeSource<T>(source));
}

和 unsafe 的区别,主要在 source instanceof Observable,一个抛出异常,一个返回,返回时做了个类型转换,不知道怎么用。

compose

对一个 Observable 做一系列的链式调用。

val transformer = ObservableTransformer<Int, String> {
    upstream -> upstream.map { it.toString() }.take(2)
}
     Observable.just(1,2,3,4).compose(transformer).subscribe(observerStr)

创建了一个 ObservableTransformer 对象,它里面封装了一些调用,以后可以统一复用。

repeat

重复调用。

// 调用
// onNext("repeat just")
// onNext("repeat just")
// onNext("repeat just")
// onComplete
Observable.just("repeat just").repeat(3).subscribe(observerStr)
// 无限重复
Observable.just("repeat just").repeat().subscribe{Log.e("RX", "$it")}

repeatUntil

无限循环,直到参数返回的布尔值为 true,表明满足停止条件。

// i 为 11 时停止
var i = 0
Observable.just("repeat just").repeatUntil {
   Log.e("RX", "i:$i")
   i++ > 10
}.subscribe{Log.e("RX", "$it")}

repeatWhen

repeatWhen 的参数接收原始 Observable 的 complete 和 error 通知,且决定是否要重新订阅和发射原来的 Observable。

Observable.just("repeat").repeatWhen {
    Observable.interval(1000, TimeUnit.MILLISECONDS).take(5)
}.subscribe{Log.e("RX", "$it")}

重复了 6 次,1 秒重复一次。

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 220,548评论 6 513
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 94,069评论 3 396
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 166,985评论 0 357
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 59,305评论 1 295
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 68,324评论 6 397
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 52,030评论 1 308
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,639评论 3 420
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 39,552评论 0 276
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 46,081评论 1 319
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 38,194评论 3 340
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 40,327评论 1 352
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 36,004评论 5 347
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,688评论 3 332
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 32,188评论 0 23
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 33,307评论 1 272
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 48,667评论 3 375
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 45,337评论 2 358

推荐阅读更多精彩内容

  • 本篇文章介主要绍RxJava中操作符是以函数作为基本单位,与响应式编程作为结合使用的,对什么是操作、操作符都有哪些...
    嘎啦果安卓兽阅读 2,864评论 0 10
  • 创建操作 用于创建Observable的操作符Create通过调用观察者的方法从头创建一个ObservableEm...
    rkua阅读 1,831评论 0 1
  • 作者: maplejaw本篇只解析标准包中的操作符。对于扩展包,由于使用率较低,如有需求,请读者自行查阅文档。 创...
    maplejaw_阅读 45,688评论 8 93
  • 前言:上篇文章我们讲解了RxJava最基本的基础知识原理,这篇呢我打算讲解下怎么来创建一个observable被观...
    六_六阅读 239评论 0 0
  • 计划经济和市场经济从来都是分配资源的两种模式,在信息完备的情况下计划经济的重要性的日益显现。随着大数据云计算人工智...
    揣在右边口袋的温柔阅读 333评论 0 1