本篇,我们来实现ReactiveX中的操作符,即数据传递者Deliver。这些操作符一般包括,过滤、组合、数学运算、转换等几个大类。
Deliver既是Observable又是Observer,它接受一个或者多个Observable作为上一级的数据源,又可被订阅一次或者多次。
实现要点
- 传递数据、complete事件、error事件
- 订阅和退订上级数据源
- 可被下一级观察者订阅和退订
订阅上级数据源
现在假设我们有一个Observable就是前一篇文章中的FromArray
array:=int[]{1,2,3}
observable:=FromArray(array)
这个observable会依次发出1,2,3三个整数,然后complete。作为Deliver,我们可能需要去订阅这个observable,那么如何订阅呢?
根据上篇所述,订阅行为就是传入一个Next和一个Stop。
next:=make(Next)
stop:=make(Stop)
observable(next,stop)
我们完成了订阅,但我们还需要对订阅后采集来自observable的数据。
next:=make(Next)
stop:=make(Stop)
go observable(next,stop)
for d:= range next{
//处理数据
}
由于observable里面的逻辑会阻塞当前‘线程’,所以我们加了关键字go。
退订上级数据源
close(stop)
随时都可以调用这个方法进行退订。
传递数据
真实的Deliver是这样定义的
Deliver func(source Observable) Observable
它是一个函数,接受一个Observable作为参数,返回一个Observable。
展开出来就是这样的
func deliver(source Observable) Observable {
return func(next Next, stop Stop) {
//deliver被订阅的时候就会执行这里面的逻辑
}
}
我们可以在被订阅的时候,去订阅source,然后获取数据后传递给next管道
func deliver(source Observable) Observable {
return func(next Next, stop Stop) {
dnext:= make(Next)
go source(dnext,stop)
for d:= range next{
next<-d
}
close(next)
}
}
这样我就做好了一个什么也没用的数据传递者了。下面我们来实现一个有一点作用的filter
Filter的实现
func Filter(f func(interface{}) bool) Deliver {
return func(source Observable) Observable {
return func(next Next, stop Stop) {
sNext := make(Next)
go source(sNext, stop)
for {
select {
case d, ok := <-sNext:
if !ok {
close(next)
return
}
if _, ok = d.(error); ok {
next <- d
close(next)
return
} else if f(d) {
next <- d
}
case <-stop:
return
}
}
}
}
}
这里,我们用for select代替了for range,这样方便我们的接收到stop被close的时候发来的信息。我们判断了source是否complete,如果complete我们就close(next)——向下级发送complete信号。然后我们判断了数据是否是error类型,然后执行了filter函数来过滤数据。
其他的Deliver都是沿用Filter这套模板来实现的。这是个死循环结构,所以订阅deliver也需要用go关键字,这个和Observable是一脉相承的。
最后我们再看一个startwith操作符,也是一个十分常用的功能,用于在source前面加塞数据。如果有更好的表达方式,欢迎留言。
func StartWith(xs ...interface{}) Deliver {
return func(source Observable) Observable {
return func(next Next, s Stop) {
stopped := false
go func() {
<-s
stopped = true
}()
for d := range xs {
if stopped {
return
}
next <- d
}
if !stopped {
source(next, s)
}
}
}
}