[译] 关于 RxJS 中的 Subject

原文链接: https://medium.com/@benlesh/on-the-subject-of-subjects-in-rxjs-2b08b7198b93
本文为 RxJS 中文社区 翻译文章,如需转载,请注明出处,谢谢合作!
如果你也想和我们一起,翻译更多优质的 RxJS 文章以奉献给大家,请点击【这里】

rxjs-subject-header-post.png

RxJS 中的 Subjects 经常被误解。因为它们允许你命令式地向 Observable 流中推送值,当人们不太清楚如何将某个东西变成 Observable 时,他们倾向于滥用 Subjects 。此模式看起来有点像这样...

// 当人们发现不太清楚如何做时,通常会首先使用 Subjects
// (不要这样做)
const subject = new Subject();
button.addEventListener('click', () => subject.next('click');
subject.subscribe(x => console.log(x));

虽然这对于 RxJS 新手 (对于这个阶段来说太正常不过了) 很有帮助,但这不是以 “Rx 的方式”在处理问题。理想的是在 Observable 中包装事件注册,既可以监听事件,又可以取消事件监听。看起来像这样:

// 这样好些了,但请使用 Observable.fromEvent(button, 'click') 来替代
const clicks = new Observable(observer => {
  const handler = (e) => observer.next(e);
  button.addEventListener('click', handler);
  return () => button.removeEventListener('click', handler);
});

为什么展示这个跟 Subjects 没半点关系的示例?好吧,一点是它展示了为什么不总是需要使用 Subject,另外一点这有个隐藏的 subject... (某种程度上可以说是 subject )。这里要注意的一点是,Observable 是通过 addEventListener 来包装按钮的处理函数的注册,而 addEventListener 本身就是一个 subject 。…至少根据 “Gang Of Four” 的观察者模式来说是这样的。

观察者模式

你可能知道,RxJS 主要是关于 Observables 和 Observers 的,但它也是 Subjects 相关的。然而在 Gof 的 设计模式中是找不到 observables 的,Subjects 和 Observers 是 观察者模式中的根本。

observer-pattern.png

模式本身很简单。Observers 是具有通知方法的类,Subject 也是类,它具有向内部观察者列表添加或删除观察者的方法和通知观察者列表的方法。

RxJS 中的 Subjects 并没有太差区别。当使用 observer 对 Rx Subject 调用 subscribe 时,Subject 会将该 observer 添加到内部的观察者列表中。同样的,如果使用一到三个函数来调用 subscribe,Subject 会将它们包装成一个 observer,然后添加到观察者列表中。当调用 Subject 的 next(value) 时,它会遍历观察者列表并将 value 传递给 next 方法。对于 errorcomplete 也是同样的。要想从 subject 的观察者列表中移除 observer,只需简单调用 subscription 的 unsubscribe 方法即可,subscription 是将 observer 添加到观察者列表中时返回的。

const subject = new Subject();
// 将 observer1 添加到观察者列表
const sub1 = subject.subscribe(observer1);
// 将 observer2 添加到观察者列表
const sub2 = subject.subscribe(observer2);
// 使用 "hi there" 来通知列表中的所有观察者
subject.next('hi there');
// 将 observer1 从观察者列表中移除
sub1.unsubscribe();

Subject 相比较于 Observable

实际上,RxJS 中的 Subjects 不同于 GoF 观察者模式中的 Subjects,但它们的 API 是 Observable 的鸭子类型。实际上,在 RxJS 中 Subjects 更甚,它们继承自 Observable 。优点是所有 Subjects 都具有与 Observable 相同的操作符和方法。

大概 Subject 和 Observable 之间一个很重要的区别就是 Subject 是有状态的,它维护观察者列表。另一方面,Observable 真的只是一个函数,它建立了观察本身。

虽然 Subjects 是 Observables,但 Subjects 还实现了 Observer 接口。也就是说,它们拥有 nexterrorcomplete 方法。这些方法用来通知 subject 内部观察者列表中的 observers 。这意味着 subject 可以用作订阅任何 observable 的 observer 。

// 为了使两个观察者 observer1 和 observer2 “共享” tick$,
// 我们可以通过 Subject 来传输所有通知,像这样
const tick$ = Observable.interval(1000);
const subject = new Subject();
subject.subscribe(observer1);
subject.subscribe(observer2);
tick$.subscribe(subject);

上面的示例是将 observable tick$ “多播” 给两个观察者: observer1observer2 。这其实就是 RxJS 中大多数多播操作符内部所做的事情。例如 publishpublishReplaymulticastshare,等等。真的,这才是 RxJS 中 Subjects 的主要用法。

Subjects 是不可重用的

在 RxJS 中,Subjects 不能重用。也就是说,当一个 Subject 完成或报错时,便不可再使用了。如果你尝试在已关闭的 Subject (调用过 completeerror 方法)上调用 next,它会默认忽略通知。如果想 Subject 在完成后调用 next 时进行显示地报错,你可以在 subject 实例上直接调用 unsubscribe

// Subject 之死
const subject = new Subject();
subject.subscribe(x => console.log(x));
subject.next(1);    // 1
subject.next(2);    // 2
subject.complete();
subject.next(3);    // 悄悄地忽略
subject.unsubscribe();
subject.next(4);    // Unhandled ObjectUnsubscribedError

RxJS 中的陷阱

但是在 RxJS 的当前版本中,会带来了一些令人困惑的痛点。因为 Rx observables 不会“捕获”错误,我们会遭遇一些奇怪的行为。我曾经嘲笑过 Promises 实现了错误“捕获”,但在多播场景中它或许是正确的。我的意思是当我说 Rx observable 不“捕获”错误时,是表示当错误渗透到观察者链末端并的末端时,如果错误未被处理,它会被重新抛出。

// 演示缺少错误处理时会进行重新抛出
const badObservable = Observable.throw(new Error('haha'));
try {
  badObservable.subscribe({
    next: x => console.log(x),
    error: null,
    complete: () => console.log('done')
  });
} catch (err) {
  console.error(err); // 输出自定义错误: "haha"
}

现在我们来想想,当你循环观察者列表并通知它们时会发生什么(正如 subject 所做的)。

for (let observer of observers) {
  observer.next('notify'); // 如果在这里调用 throw 会发生什么?
}
// 提示: 会报错会打破循环
// 注意: 好吧,这不仅仅是个提示

假设一些操作符是同步处理的(mapfilterscan 等等),如果其中一个或任何其它同步操作符报错了,你会在多播(使用 Subject 来循环观察者列表并通知它们)的下游得到一些诡异的行为:

// 会发生奇怪的行为
const source$ = Observable.interval(1000).share();
const mapped$ = source$.map(x => {
  if (x === 1) {
    throw new Error('oops');
  }
  return x;
});
source$.subscribe(x => console.log('A', x));
mapped$.subscribe(x => console.log('B', x));
source$.subscribe(x => console.log('C', x));
// "A" 0
// "B" 0
// "C" 0
// "A" 1
// Uncaught Error: "oops"

在上面的示例中,大多数用户会期望 A 和 C 能继续通知。输出 B 的 observable 死了是可以理解的,它报错了,但其他流和源流也死了令人想当困惑。任意的第三方都可以杀掉共享的 observable 流以及未知数量的兄弟流,不应该是这样的。这是一个脆弱的抽象,我们需要在 RxJS 接下来的版本中修复它。

临时解决上述场景中的问题很简单,感谢调度器( schedulers )。你可以在多播后使用 observeOn,这样就可以解决此问题,因为错误不再是同步抛出的。

const source$ = Observable.interval(1000)
  .share()
  .observeOn(Rx.Scheduler.asap); // 点睛之笔
const mapped$ = source$.map(x => {
  if (x === 1) {
    throw new Error('oops');
  }
  return x;
});
source$.subscribe(x => console.log('A', x));
mapped$.subscribe(x => console.log('B', x));
source$.subscribe(x => console.log('C', x));
// "A" 0
// "B" 0
// "C" 0
// "A" 1
// Uncaught Error: "oops"
// "C" 1
// "A" 2
// "C" 2 
// "A" 3
// "C" 3
// ... 等等

还有另一种临时解决方案,如果你可以管理它的话,它的性能会更好一些,方法是只需为所有的 subscriptions 添加错误处理方法。

const source$ = Observable.interval(1000)
  .share()
  .observeOn(Rx.Scheduler.asap); // 点睛之笔
const mapped$ = source$.map(x => {
  if (x === 1) {
    throw new Error('oops');
  }
  return x;
});
source$.subscribe(x => console.log('A', x));
mapped$.subscribe(
  x => console.log('B', x),
  err => console.log('Error handled: ' + err.message)
);
source$.subscribe(x => console.log('C', x));
// "A" 0
// "B" 0
// "C" 0
// "A" 1
// "Error handled: oops"
// "C" 1
// "A" 2
// "C" 2 
// "A" 3
// "C" 3
// ... 等等

Observable 的未来

TC39 Observable 提议 新的化身,不包括 CancelToken 业务,提议本身就是一整篇文章,它可能会在没有错误处理方法的情况下通过“捕获”错误来解决这个问题。也就是说,它不再会到达观察者链末端并重新抛出错误。在 RxJS 未来的版本中,我想我们也会做同样的事情,因为这才是正确的。当然,这个问题是公开讨论的,但我个人认为这不会有太多阻力。

总结

  1. Subjects 既是 observer,又是 observable
  2. Subjects 对内部的观察者列表进行“多播”
  3. Observables 只是建立观察的函数
  4. Observables 当前不会捕获错误,但它们应该捕获
  5. 向 Subject 的下游同步抛出错误会杀掉此 Subject
  6. 你可以使用错误处理方法或 observeOn 来解决 #4 中的问题
  7. 关于 Promise 的错误捕获,我错了。这是个好主意,因为 promises 是多播的 *
  8. 未来版本的 RxJS 很可能会捕获错误
  • 虽然可能不是完全必要的,因为 promises 永远是异步的。 (耸肩脸)
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 216,258评论 6 498
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,335评论 3 392
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 162,225评论 0 353
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,126评论 1 292
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,140评论 6 388
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,098评论 1 295
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,018评论 3 417
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,857评论 0 273
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,298评论 1 310
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,518评论 2 332
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,678评论 1 348
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,400评论 5 343
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,993评论 3 325
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,638评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,801评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,661评论 2 368
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,558评论 2 352

推荐阅读更多精彩内容