#1 Subject 的基本概念

通常我们碰到的Observable 和 observer 都是一一对应的,多个observers 即使订阅相同的observable,它们之间也是独立的,不会共享数据或者事件,比如下例

示例1·

var observable = Rx.Observable.interval(1000).take(3);

# 观察者A
var observerA = {
  next: function(x) { console.log('观察者A ' + x);},
  error: function(err) { console.log(err); },
  complete: function() { console.log('A完成'); }
}

observable.subscribe(observerA); # 观察者A订阅observable 相当于执行函数

# 观察者B
var observerB = {
  next: function(x) { console.log('观察者B ' + x);},
  error: function(err) { console.log(err); },
  complete: function() { console.log('B完成'); }
}

setTimeout(() => {
  observable.subscribe(observerB); # 观察者B订阅相同的observable 相当于执行函数
}, 2000)

打印结果

"观察者A 0"
"观察者A 1"
"观察者B 0"
"观察者A 2"
"A完成"
"观察者B 1"
"观察者B 2"
"B完成"

可以看出观察者B虽然2秒后订阅相同的observable,但是完全不受观察者A的影响,仍然从0开始然后结束。

但是有时候我们希望流(即Observable)的数据或者事件在多个观察者之间是共享的,这个时候 Subject 就该登场了

Subject 的特点,它既是一个Observable(拥有所有Observable该拥有的操作符和方法,以及能够被别的观察者订阅的能力),又是Observer(拥有next, error, complete等方法),就是这么强悍的能力,可以把它看成一个数据的桥梁

Subject 的基本实现

上面的示例中我们看到因为订阅相当于函数执行,上面订阅了2次,所以函数执行了2次,如果想要函数只执行1次,我们可以通过一个桥梁,将所有订阅者收集到一个集合里面,然后再订阅1次,即函数只执行一次

示例2

var observable = Rx.Observable.interval(1000).take(3);

var bridgeObserver = {
  next: function(x) {
    this.observers.forEach(o => o.next(x));
  },
  error: function(err) {
    this.observers.forEach(o => o.error(err));
  },
  complete: function() {
    this.observers.forEach(o => o.complete());
  },
  observers: [],  // 观察者集合
  addObserver: function(observer) { // 将观察者添加到观察者集合中
    this.observers.push(observer)
  }
}

var observerA = {
  next: function(x) { console.log('观察者A ' + x);},
  error: function(err) { console.log(err); },
  complete: function() { console.log('A完成'); }
}

observable.subscribe(bridgeObserver); // 只订阅一次 所以相当于函数只执行一次

bridgeObserver.addObserver(observerA); // observerA添加到观察者集合中

var observerB = {
  next: function(x) { console.log('观察者B ' + x);},
  error: function(err) { console.log(err); },
  complete: function() { console.log('B完成'); }
}

setTimeout(() => {
  bridgeObserver.addObserver(observerB); // observerB添加到观察者集合中
}, 2000)

打印结果


Console Run  Clear
"观察者A 0"
"观察者A 1"
"观察者B 1"
"观察者A 2"
"观察者B 2"
"A完成"
"B完成"

可以看出观察者B共享了A中的事件,并没有从头开始执行

将上面的桥梁bridgeObserve换成 subject, addObserver 换成 subscribe 即为我们通常所见的Subject的基本骨架了

var observable = Rx.Observable.interval(1000).take(3);

var subject = {  // **`bridgeObserve`**换成 **`subject`**, 
  next: function(x) {
    this.observers.forEach(o => o.next(x));
  },
  error: function(err) {
    this.observers.forEach(o => o.error(err));
  },
  complete: function() {
    this.observers.forEach(o => o.complete());
  },
  observers: [],
  subscribe: function(observer) { // **`addObserver`** 换成 **`subscribe`** 
    this.observers.push(observer)
  }
}

var observerA = {
  next: function(x) { console.log('观察者A ' + x);},
  error: function(err) { console.log(err); },
  complete: function() { console.log('A完成'); }
}

observable.subscribe(subject); // subject作为observer的功能 能够订阅observable

subject.subscribe(observerA); // subject作为observable的功能 能够被其它observer订阅

var observerB = {
  next: function(x) { console.log('观察者B ' + x);},
  error: function(err) { console.log(err); },
  complete: function() { console.log('B完成'); }
}

setTimeout(() => {
  subject.subscribe(observerB)  // subject作为observable的功能 能够被其它observer订阅
}, 2000)

最后Rx为我们提供了这个桥梁称之为 Subject, 所以上面的代码等同于

var observable = Rx.Observable.interval(1000).take(3);

// var subject = {
//   next: function(x) {
//     this.observers.forEach(o => o.next(x));
//   },
//   error: function(err) {
//     this.observers.forEach(o => o.error(err));
//   },
//   complete: function() {
//     this.observers.forEach(o => o.complete());
//   },
//   observers: [],
//   subscribe: function(o) {
//     this.observers.push(o)
//   }
// }

// RxJS给我们提供了这样一个接口
var subject = new Rx.Subject(); 

var observerA = {
  next: function(x) { console.log('观察者A ' + x);},
  error: function(err) { console.log(err); },
  complete: function() { console.log('A完成'); }
}

observable.subscribe(subject);

subject.subscribe(observerA)

var observerB = {
  next: function(x) { console.log('观察者B ' + x);},
  error: function(err) { console.log(err); },
  complete: function() { console.log('B完成'); }
}

setTimeout(() => {
  subject.subscribe(observerB)
}, 2000)

©著作权归作者所有,转载或内容合作请联系作者
【社区内容提示】社区部分内容疑似由AI辅助生成,浏览时请结合常识与多方信息审慎甄别。
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

相关阅读更多精彩内容

  • 介绍 RxJS是一个异步编程的库,同时它通过observable序列来实现基于事件的编程。它提供了一个核心的类型:...
    泓荥阅读 16,725评论 0 12
  • 推荐我的Rxjs教程:Rxjs系列教程目录 前言 随着开发中项目的越来越大,代码的要求越来越高,于是开始四处搜找各...
    侬姝沁儿阅读 13,479评论 1 6
  • 发现 关注 消息 RxSwift入坑解读-你所需要知道的各种概念 沸沸腾关注 2016.11.27 19:11*字...
    枫叶1234阅读 7,858评论 0 2
  • Spring Cloud为开发人员提供了快速构建分布式系统中一些常见模式的工具(例如配置管理,服务发现,断路器,智...
    卡卡罗2017阅读 136,095评论 19 139
  • 作者:夏汐蕊☞想看其他作品请点击这里简书连载风云录前情回顾《我的爱只属于你(21)》 【第二十二章】走进唐风斋,一...
    夏汐蕊阅读 2,662评论 0 8

友情链接更多精彩内容