https://blog.csdn.net/u013318615/article/details/82390992
前言
既然RxJava是基于观察者模式,那么就需要我们操作观察者(Observer)和被观察者(Observable),那么怎么创建Observable便是第一步。
创建Observable操作符
just():创建一个直接发射数据的Observable
from():从一个数组或列表中转换成Observable
create():创建一个Observable
defer():当订阅者订阅时才创建Observable,为每一个订阅创建一个新的Observable
range():创建一个指定范围的序列Observable
interval():创建一个按照规定时间间隔发射数据的Observable
timer():延时发射数据的Observable
empty():直接完成的Observable
error():直接发射错误的Observable
never():不发射数据的Observable
just()
// 发射数据
Observable.just("abc")
.subscribe({
JLog.d("onNext:it")
}, {
JLog.d("onComplete")
}, {
JLog.d("onSubscribe")
})
1
2
3
4
5
6
7
8
9
10
11
打印:
onSubscribe
onNext:abc
onComplete
1
2
3
当然,你可以使用Just发射一组数据:
Observable.just("1", "2", "3", "4", "5")
.subscribe({
JLog.d("onNext:it")
}, {
JLog.d("onComplete")
}, {
JLog.d("onSubscribe")
})
1
2
3
4
5
6
7
8
9
10
打印:
onSubscribe
onNext:1
onNext:2
onNext:3
onNext:4
onNext:5
onComplete
1
2
3
4
5
6
7
From()
当你使用Just发射一组数据时,你会发现它和From很相似:
Observable.fromArray("a", "b", "c")
.subscribe({
JLog.d(it)
})
val items = arrayListOf("1", "2", "3")
Observable.fromIterable(items)
.subscribe({
JLog.d(it)
})
1
2
3
4
5
6
7
8
9
打印:
a
b
c
1
2
3
1
2
3
4
5
6
create()
Observable.create(ObservableOnSubscribe<Int> {
try
{
if (!it.isDisposed)
{
for (i in 0..5)
{
it.onNext(i)
}
it.onComplete()
}
} catch (e: Exception)
{
it.onError(e)
}
}).subscribe({
JLog.d("onNext->it")
}, {
JLog.d("onComplete")
}, {
JLog.d("onSubscribe")
})
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
打印:
onSubscribe
onNext->0
onNext->1
onNext->2
onNext->3
onNext->4
onNext->5
onComplete
1
2
3
4
5
6
7
8
Defer()
defer操作符,直到有订阅者订阅它的时候,它才会创建一个Observable,以确保获取的数据是最新的
var a = 1
val defer = Observable.defer {
Observable.just(a)
}
val just = Observable.just(a)
a++
just.subscribe({ JLog.d("just:" + it.toString()) })
defer.subscribe({
JLog.d("defer:" + it.toString())
})
1
2
3
4
5
6
7
8
9
10
我在a=1的时候用defer和just分别创建了一个Observable,在a++之后才消费他们,首先看一下结果:
just:1
defer:2
1
2
可以看到,just在创建的时候发射数据已经确定,而defer在有订阅的时候才会创建Observable,保证了a是最新值
Timer()
延时操作符,默认在computation调度器上执行,会在延时一段时间后发射数据
Observable.timer(2, TimeUnit.SECONDS)
.subscribe(object : Observer<Long>
{
override fun onComplete()
{
}
override fun onSubscribe(d: Disposable)
{
JLog.d("onSubscribe")
}
override fun onNext(t: Long)
{
JLog.d("onNext")
JLog.d("onNext发射结果:$t")
}
override fun onError(e: Throwable)
{
JLog.e(e.toString())
}
})
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
打印结果:
10-15 17:40:22.734 19180-19180/com.jzd.jutils.app D/JLog: onSubscribe
10-15 17:40:24.735 19180-19627/com.jzd.jutils.app D/JLog: onNext
onNext发射结果:0
1
2
3
timer在延时2s后反射了一个0
Interval()
间隔操作符,默认在computation调度器上执行,会在固定时间间隔后发射数据
Observable.interval(1, TimeUnit.SECONDS)
.subscribe({ JLog.d(it.toString()) })
1
2
打印结果
10-15 17:44:15.022 21811-21902/com.jzd.jutils.app D/JLog: 0
10-15 17:44:16.021 21811-21902/com.jzd.jutils.app D/JLog: 1
10-15 17:44:17.021 21811-21902/com.jzd.jutils.app D/JLog: 2
10-15 17:44:18.021 21811-21902/com.jzd.jutils.app D/JLog: 3
10-15 17:44:19.021 21811-21902/com.jzd.jutils.app D/JLog: 4
10-15 17:44:20.021 21811-21902/com.jzd.jutils.app D/JLog: 5
1
2
3
4
5
6
interval也有3个参数的创建方法interval(long initialDelay, long period, TimeUnit unit)先指定延时时间,然后再轮询发送。。。不过,如果你第一次用interval,你会发现这样写,interval创建的发射器根本停不下来啊,有木有~坑爹啊。所以正确的操作方式应该是这样的:
Observable.intervalRange(10, 5, 1, 1, TimeUnit.SECONDS)
.subscribe({ JLog.d(it.toString()) })
1
2
对应的方法就是intervalRange(long start, long count, long initialDelay, long period, TimeUnit unit)
Repeat
Repeat操作符可以配合创建操作符使用,在创建时配置重复策略
使用repeat重复发射一组数据
Observable.just("1", "2", "3")
.repeat(2)
.subscribe({ JLog.d(it) })
1
2
3
打印
1
2
3
1
2
3
1
2
3
4
5
6
使用repeatWhen
Observable.just("1")
.repeatWhen { Observable.timer(3, TimeUnit.SECONDS) }.subscribe({ JLog.d(it) })
JLog.d("---------------------------")
1
2
3
打印
10-15 19:49:01.646 5814-5814/com.jzd.jutils.app D/JLog: 1
---------------------------
10-15 19:49:04.646 5814-6133/com.jzd.jutils.app D/JLog: 1
1
2
3
可以看到3秒后重复发送了一次数据
使用repeatUntil
var count = 0
Observable.just("测试")
.repeatUntil { count >= 5 }
.subscribe({
JLog.d("第it")
})
1
2
3
4
5
6
打印结果:
第0次:测试
第1次:测试
第2次:测试
第3次:测试
第4次:测试
1
2
3
4
5
可以看到,repeatUntil(BooleanSupplier stop)中,当getAsBoolean返回false时会重复发射数据,当返回true时,会终止发射数据。
常用的创建操作符基本介绍完了,使用这些操作符,我们就可以愉快的创建各式各样的Observable了。
————————————————
版权声明:本文为CSDN博主「Jzd_dev」的原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接及本声明。
原文链接:https://blog.csdn.net/u013318615/article/details/82390992