RxJS Observable

前言

RxJS 的 Observable 有点难理解,其实 RxJS 相关的概念都有点难理解。毕竟 RxJS 引入了响应式编程这种新的模式,会不习惯是正常的。不过总得去理解嘛,而认识新的事物时,如果能够参照一个合适的已知事物比对着,会比较容易理解吧。对于 Observable,类比 JS 中的函数,还是比较好的。

开始

封装

先来看一个普通函数调用的例子:

function foo() {
  console.log('process...')
}

foo()
// 输出:
// process...

很简单,函数 foo() 封装了一段逻辑(这里只是向控制台输出),然后通过调用函数,函数执行内部的逻辑。

再来看 RxJS Observable 的一个例子:

var foo = Rx.Observable.create(() => {
  console.log('process...')
})

foo.subscribe()
// 输出:
// process...

上例中,通过 Rx.Observable.create() 来创建 Observable 对象,将同样将一段代码逻辑封装到 Observable 对象 foo 中,然后通过 foo.subscribe() 来执行封装的代码逻辑。

对于普通函数和 Observable 对象,封装的代码逻辑在每次调用时都会重新执行一次。从这一点来看,Observable 能够和普通函数一样实现封装代码进行复用。

返回值

函数调用后可以有返回值:

function foo() {
  console.log('process...')
  return 42
}

console.log(foo())
// 输出:
// process...
// 42

Observable 执行后也会产生值,不过和函数直接返回的方式不同,要通过回调函数方式获取:

var foo = Rx.Observable.create((observer) => {
  console.log('process...')
  observer.next(42)
})

foo.subscribe(value => console.log(value))
// 输出:
// process...
// 42

Observable 对象内部是通过 observer.next(42) 这种方式返回值,而调用方则通过回调函数来接收返回的数据。形式上比普通函数直接返回值啰嗦一些。

从调用方的角度来看,两个过程分别是:

  • 普通函数:调用 > 执行逻辑 > 返回数据
  • Observable:订阅(subscribe) > 执行逻辑 > 返回数据

从获取返回值方式来看,调用函数是一种直接获取数据的模式,从函数那里“拿”(pull)数据;而 Observable 订阅后,是要由 Observable 通过间接调用回调函数的方式,将数据“推”(push)给调用方。

这里 pull 和 push 的重要区别在于,push 模式下,Observable 可以决定什么时候返回值,以及返回几个值(即调用回调函数的次数)。

var foo = Rx.Observable.create((observer) => {
  console.log('process...')
  observer.next(1)
  setTimeout(() => observer.next(2), 1000)
})

console.log('before')
foo.subscribe(value => console.log(value))
console.log('after')
// 输出:
// before
// process...
// 1
// after
// 2

上面例子中,Observable 返回了两个值,第1个值同步返回,第2个值则是过了1秒后异步返回。

也就是说,从返回值来说,Observable 相比普通函数区别在于:

  • 可以返回多个值
  • 可以异步返回值

异常处理

函数执行可能出现异常情况,例如:

function foo() {
  console.log('process...')
  throw new Error('BUG!')
}

我们可以捕获到异常状态进行处理:

try {
   foo()
} catch(e) {
  console.log('error: ' + e)
}

对于 Observable,也有错误处理的机制:

var foo = Rx.Observable.create((observer) => {
  console.log('process...')
  observer.error(new Error('BUG!'))
})

foo.subscribe(
  value => console.log(value),
  e => console.log('error: ' + e)
)

Observable 的 subscribe() 方法支持传入额外的回调函数,用于处理异常情况。和函数执行类似,出现错误之后,Observable 就不再继续返回数据了。

subscribe() 方法还支持另一种形式传入回调函数:

foo.subscribe({
  next(value) { console.log(value) },
  error(e) { console.log('error: ' + e) }
})

而这种形式下,传入的对象和 Observable 内部执行函数中的 observer 参数在形式上就比较一致了。

中止执行

Observable 内部的逻辑可以异步多个返回值,甚至返回无数个值:

var foo = Rx.Observable.create((observer) => {
  let i = 0
  setInterval(() => observer.next(i++), 1000)
})

foo.subscribe(i => console.log(i))
// 输出:
// 0
// 1
// 2
// ...

上面例子中,Observable 对象每隔 1 秒会返回一个值给调用方。即使调用方不再需要数据,仍旧会继续通过回调函数向调用推送数据。

RxJS 提供了中止 Observable 执行的机制:

var foo = Rx.Observable.create((observer) => {
  console.log('start')
  let i = 0
  let timer = setInterval(() => observer.next(i++), 1000)
  return () => {
    clearInterval(timer)
    console.log('end')
  }
})

var subscription = foo.subscribe(i => console.log(i))
setTimeout(() => subscription.unsubscribe(), 2500)
// 输出:
// start
// 0
// 1
// 2
// end

subscribe() 方法返回一个订阅对象(subscription),该对象上的 unsubscribe() 方法用于取消订阅,也就是中止 Observable 内部逻辑的执行,停止返回新的数据。

对于具体的 Observable 对象是如何中止执行,则要由 Observable 在执行后返回一个用于中止执行的函数,像上面例子中的这种方式。

Observable 执行结束后,会触发观察者的 complete 回调,所以可以这样:

foo.subscribe({
  next(value) { console.log(value) },
  complete() { console.log('completed') }
})

Observable 的观察者共有上面三种回调:

  • next:获得数据
  • error:处理异常
  • complete:执行结束

其中 next 可以被多次调用,error 和 complete 最多只有一个被调用一次(任意一个被调用后不再触发其他回调)。

数据转换

对于函数返回值,有时候我们要转换后再使用,例如:

function foo() {
  return 1
}

console.log(f00() * 2)
// 输出:
// 2

对于 Observable 返回的值,也会有类似的情况,不过通常采用下面的方式:

var foo = Rx.Observable.create((observer) => {
  let i = 0
  setInterval(() => observer.next(i++), 1000)
})

foo.map(i => i * 2).subscribe(i => console.log(i))
// 输出:
// 0
// 2
// 4
// ...

其实 foo.map() 返回了新的 Observable 对象,上面代码等价于:

var foo2 = foo.map(i => i * 2)
foo2.subscribe(i => console.log(i))

Observable 对象 foo2 被订阅时执行的内部逻辑可以简单视为:

function subscribe(observer) {
  let mapFn = v => v * 2
  foo.subscribe(v => {
    observer.next(mapFn(v))
  })
}

将这种对数据的处理和数组进行比较看看:

var array = [0, 1, 2, 3, 4, 5]
array.map(i => i * 2).forEach(i => console.log(i))

是不是有点像?

除了 map() 方法,Observable 还提供了多种转换方法,如 filter() 用于过滤数据,find() 值返回第一个满足条件的数据,reduce() 对数据进行累积处理,在执行结束后返回最终的数据。这些方法和数组方法功能是类似的,只不过是对异步返回的数据进行处理。还有一些转换方法更加强大,例如可以 debounceTime() 可以在时间维度上对数据进行拦截等等。

Observable 的转换方法,本质不过是创建了一个新的 Observable,新的 Observable 基于一定的逻辑对原 Observable 的返回值进行转换处理,然后再推送给观察者。

总结

Observable 就是一个奇怪的函数,它有和函数类似的东西,例如封装了一段逻辑,每次调用时都会重新执行逻辑,执行有返回数据等;也有更特殊的特性,例如数据是推送(push)的方式返回给调用方法,返回值可以是异步,可以返回多个值等。

不过将 Observable 视作特殊函数,至少对于理解 Observable 上是比较有帮助的。

Observable 也被视为 data stream(数据流),这是从 Observable 可以返回多个值的角度来看的,而数据转换则是基于当前数据流创建新的数据流,例如:

[图片上传失败...(image-bdfa20-1528252834964)]

不过上图看到的只是数据,而将 Observable 视为特殊函数时,不应该忘了其内部逻辑,不然数据是怎么产生的呢。

作者:luobo_tang
链接:https://www.jianshu.com/p/b05aa5d152c8
來源:简书
著作权归作者所有。商业转载请联系作者获得授权,非商业转载请注明出处。

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 212,222评论 6 493
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,455评论 3 385
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 157,720评论 0 348
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,568评论 1 284
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 65,696评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 49,879评论 1 290
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,028评论 3 409
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,773评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,220评论 1 303
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,550评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,697评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,360评论 4 332
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,002评论 3 315
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,782评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,010评论 1 266
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,433评论 2 360
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,587评论 2 350

推荐阅读更多精彩内容