rxjs

rxjs使用观察者模式、迭代器模式以及函数式编程实现一种理想的、管理序列事件的方式
rxjs的基础概念

  1. Observable:是一个包含多个值的集合,这些值都是懒推送(lazy push)进集合中的
  2. Observer
  3. Subscription
  4. Operators
  5. Subject
  6. Schedulers

pull push system

先搞清楚两个角色:

  1. 生产者(producer): 数据产生的地方
  2. 消费者(consumer): 数据使用的地方

pull系统:consumer决定什么时候接受producer生产的数据,比如函数,就是一个pull system,它只生产数据,并不知道什么时候这些数据会被使用。
push系统:producer决定什么时候把生产的数据传递给消费者,如promise,promise决定什么时候把生产的值“push”给callback函数

Observable

Observable的核心概念:

  1. creating Observables:可以通过Rx.Observable.create创建,或者通过所谓的创建操作如:of、from、interval等创建
  2. Subscribing Observables:Observables的注册就像调用一个函数,这个函数提供一个回调函数,数据最终会在这个回调函数中使用
  3. Executing the Observable:Observable.create(function subscribe(observer) {...})中的一段代码,在Excution中,如果error或者complete执行了,那么后续的observer就不会执行
  4. Disposing Observables:处理Observables。Executing Observables可能是循环的,需要一个unsubscribe()去终止这个无限循环

概念很不好理解,下面是一个js写的简易版Observale,仅帮助理解,注释表明了1,2,3的含义


var observerOrigin = function(nextSelf) {
  this.nextSelf = nextSelf ? nextSelf : null
}
observerOrigin.closed = false
observerOrigin.prototype.next = function(val) {
  if (observerOrigin.closed) {
    return
  }
  this.nextSelf ? this.nextSelf.call(this, val) : console.log(val)
}
observerOrigin.prototype.error = function(error) {
  if (observerOrigin.closed) {
    return
  }
  observerOrigin.closed = true
  console.error(error)
}
observerOrigin.prototype.complete = function(val) {
  if (observerOrigin.closed) {
    return
  }
  observerOrigin.closed = true
  console.log('complete')
}

var observable = function(subscribeFun) {
  this.subscribeFun = subscribeFun
}
observable.prototype.subscribe = function(observer) {
  let observerInner = new observerOrigin(observer) // a observer
  this.subscribeFun.call(this, observerInner)
  return this
}
/* subscribeFunEx: Executing Observables
   Executing Observables执行的过程中,如果执行了observer的error或者complete,后续的其他操作就不会执行
*/
var subscribeFunEx = function(observer) {
  observer.next(1)
  observer.next(2)
  // observer.error('throw a error')
  // observer.complete()
  observer.next(3)
}
// Creating Observables
var observableEx = new observable(subscribeFunEx)
observableEx
  .subscribe()   // Subscribing to Observables
  .subscribe((val) => console.log('next: ', val))   // Subscribing to Observables

对于第四点,官方示例如下

function subscribe(observer) {
  var intervalID = setInterval(() => {
    observer.next('hi');
  }, 1000);

  return function unsubscribe() {
    clearInterval(intervalID);
  };
}

var unsubscribe = subscribe({next: (x) => console.log(x)});

// Later:
unsubscribe(); // dispose the resources

Observer

Observer是一个对象,这个对象有三个回调函数(next,error,complete),任何一个回调函数都可能调用

Subscription

var subscription = observable.subscribe(x => console.log(x)),subscription有一个unsubscribe()方法释放所有的资源并且取消Observable的执行,也可以通过add()将多个subscription放在一起(个人感觉类似数组的unshift),这个时候调用一个subscription的unsubscribe()方法可能会将多个Subscriptionunsubscribe()

var observable1 = Rx.Observable.interval(400);
var observable2 = Rx.Observable.interval(300);

var subscription = observable1.subscribe(x => console.log('first: ' + x));
var childSubscription = observable2.subscribe(x => console.log('second: ' + x));

subscription.add(childSubscription);

setTimeout(() => {
  // Unsubscribes BOTH subscription and childSubscription
  subscription.unsubscribe();
}, 1000);

Subscription还有一个remove(otherSubscription)方法

Subject

一个Subject就是一个Observable,和Observable的区别是,Subject可以多播多个observers,它就是一个注册器,订阅者将自己想要订阅的事件注册到注册中心。Subject的subscribe并不会立即执行传递过来的值,它只是将订阅的事件放到一个observers的list中,类似addListener的作用
一个Subject也是一个Observer,通过next(value)可以将值多播至注册在Subject中的订阅事件

var subject = new Rx.Subject();  // 一个Observables

subject.subscribe({  // subscribe类似于别的语言中的addListener
  next: (v) => console.log('observerA: ' + v)  // 订阅的事件
});
subject.subscribe({
  next: (v) => console.log('observerB: ' + v) // 订阅的事件
});

subject.next(1); // 将value值1多播至上面的订阅事件中
subject.next(2);// 将value值2多播至上面的订阅事件中

打印结果:

observerA: 1
observerB: 1
observerA: 2
observerB: 2

subject也是一个observer,所以也可以observable.subscribe(subject)

Muticasted Observables

multicast返回的的Observable的subscribe方法和Subject的subscribe方法作用相同(即类似其他语言的addListener),connect方法调用的是observable的subscribe

var source = Rx.Observable.from([1, 2, 3]);
var subject = new Rx.Subject();
var multicasted = source.multicast(subject);

// These are, under the hood, `subject.subscribe({...})`:
multicasted.subscribe({
  next: (v) => console.log('observerA: ' + v)
});
multicasted.subscribe({
  next: (v) => console.log('observerB: ' + v)
});

// This is, under the hood, `source.subscribe(subject)`:
multicasted.connect();

BehaviorSubject

BehaviorSubject表示“随着时间变化的值”,例如人的生日是不变的时间流,使用Subject,那么人的年龄就是随着时间变化的事件流,用BehaviorSubject表示

var subject = new Rx.BehaviorSubject(0); // 0 is the initial value

subject.subscribe({
  next: (v) => console.log('observerA: ' + v)
});

subject.next(1);
subject.next(2);

subject.subscribe({
  next: (v) => console.log('observerB: ' + v)
});

subject.next(3);

输出

observerA: 0
observerA: 1
observerA: 2
observerB: 2
observerA: 3
observerB: 3

如果将上述的BehaviorSubject换成Subject,输出将变为

observerA: 1
observerA: 2
observerA: 3
observerB: 3

ReplaySubject

ReplaySubject记录多个来自Observable excution的值,并将它们分配给新的subscribes

var subject = new Rx.ReplaySubject(3); // buffer 3 values for new subscribers

subject.subscribe({
  next: (v) => console.log('observerA: ' + v)
});

subject.next(1);
subject.next(2);
subject.next(3);
subject.next(4);

subject.subscribe({
  next: (v) => console.log('observerB: ' + v)
});

subject.next(5);

输出:

observerA: 1
observerA: 2
observerA: 3
observerA: 4
observerB: 2
observerB: 3
observerB: 4
observerA: 5
observerB: 5

设置一个windowTime来决定,分配多少个并且最近windowTime时间内来自Observable excution的值

// Rx.ReplaySubject(10, 1000),observerB记录1开始至结束的值
// Rx.ReplaySubject(10, 500),observerB记录3开始至结束(500ms~1000ms之前的buffer)的值
// Rx.ReplaySubject(3, 1000),observerB记录3开始至结束(只subscribe3个buffer)的值
var subject = new Rx.ReplaySubject(10, 1000 /* windowTime */);

subject.subscribe({
  next: (v) => console.log('observerA: ' + v)
});

var i = 1;
setInterval(() => subject.next(i++), 200);

setTimeout(() => {
  subject.subscribe({
    next: (v) => console.log('observerB: ' + v)
  });
}, 1000);

setTimeout(() => {
  subject.unsubscribe();
}, 5000);

AsyncSubject

AsyncSubject中,只有最后一个值会传递给observers

var subject = new Rx.AsyncSubject();

subject.subscribe({
  next: (v) => console.log('observerA: ' + v)
});

subject.next(1);
subject.next(2);
subject.next(3);
subject.next(4);

subject.subscribe({
  next: (v) => console.log('observerB: ' + v)
});

subject.next(5);
subject.complete();

输出

observerA: 5
observerB: 5

Operator

Operator是函数,这个函数会根据原来的Observable创建一个新的Observable,并且不会改变原来的Observable。.map(...), .filter(...), .merge(...)都是Operator

function multiplyByTen(input) {
  var output = Rx.Observable.create(function subscribe(observer) {
    input.subscribe({
      next: (v) => observer.next(10 * v),
      error: (err) => observer.error(err),
      complete: () => observer.complete()
    });
  });
  return output;
}

var input = Rx.Observable.from([1, 2, 3, 4]);
var output = multiplyByTen(input);
output.subscribe(x => console.log(x));

输出

10
20
30
40

转化成js(简易版,很多漏洞,望指教)


var observerOrigin = function(nextSelf) {
  this.nextSelf = nextSelf ? nextSelf : null
}
observerOrigin.closed = false
observerOrigin.prototype.next = function(val) {
  if (observerOrigin.closed) {
    return
  }
  this.nextSelf ? this.nextSelf.call(this, val) : console.log(val)
}
observerOrigin.prototype.error = function(error) {
  if (observerOrigin.closed) {
    return
  }
  observerOrigin.closed = true
  console.error(error)
}
observerOrigin.prototype.complete = function() {
  if (observerOrigin.closed) {
    return
  }
  observerOrigin.closed = true
  console.log('complete')
}

var observable = function(subscribeFun) {
  this.subscribeFun = subscribeFun
}
observable.prototype.subscribe = function(observer) {
  let observerInner = new observerOrigin(observer) // a observer
  this.subscribeFun.call(this, observerInner)
  return this
}
/* subscribeFunEx: Executing Observables
   Executing Observables执行的过程中,如果执行了observer的error或者complete,后续的其他操作就不会执行
*/
var subscribeFunEx = function(observer) {
  observer.next(1)
  observer.next(2)
  observer.next(3)
  observer.next(4)
}

function multiplyByTen(input) {
  var output = new observable(function subscribe(observer) {
    input.subscribe(v => observer.next(10 * v))
  });
  return output;
}
// Creating Observables
var input = new observable(subscribeFunEx)
var output = multiplyByTen(input);
output.subscribe(v => console.log(v))

由上可以看出:output的subscribe会导致input的subscribe,这叫做“operator subscription chain”

Instance operators versus static operators(实例运算符与静态运算符)

在Instance operators中,this关键字是输入的Observable,通过input
observable创建一个observable。static operators是通过Observable对象从头开始创建一个Observable

Scheduler

一个scheduler可以定义在什么样的执行环境中,observable会把通知传递给observer

var observable = Rx.Observable.create(function (proxyObserver) {
  proxyObserver.next(1);
  proxyObserver.next(2);
  proxyObserver.next(3);
  proxyObserver.complete();
})
.observeOn(Rx.Scheduler.async);

var finalObserver = {
  next: x => console.log('got value ' + x),
  error: err => console.error('something wrong occurred: ' + err),
  complete: () => console.log('done'),
};
console.log('just before subscribe');
observable.subscribe(finalObserver);
console.log('just after subscribe');

proxyObserver是在observeOn(Rx.Scheduler.async)中创建的,scheduler在Observable.create和最终的Observer之间创建了一个proxyObserver,proxyObserver实质上通过setTimeout或者setInterval操作来实现一个延迟执行(delay)

Scheduler种类

Scheduler 目的
null 消息同步的递归的传递,
Rx.Scheduler.queue 在当前时间框架的队列中按时间表传递,用于迭代操作
Rx.Scheduler.asap 在微型任务队列中按时间表传递,例如NodeJs的nextTick()、Web Worker的MessageChannel、setTimeout()或者其他的,用于转换成异步操作
Rx.Scheduler.async Scheduler通过setInterval工作,用于基于事件的操作

使用 Schedulers

Static creation operators通常有一个Scheduler作为最后一个函数参数,如 from(array, scheduler)
Scheduler.subscribeOn决定subscribe()在什么环境中执行
Scheduler.observeOn决定在什么环境中传递通知
Instance operators有一个Scheduler作为函数参数

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 218,607评论 6 507
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 93,239评论 3 395
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 164,960评论 0 355
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,750评论 1 294
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,764评论 6 392
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,604评论 1 305
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,347评论 3 418
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 39,253评论 0 276
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,702评论 1 315
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,893评论 3 336
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 40,015评论 1 348
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,734评论 5 346
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,352评论 3 330
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,934评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 33,052评论 1 270
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 48,216评论 3 371
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,969评论 2 355

推荐阅读更多精彩内容

  • 介绍 RxJS是一个异步编程的库,同时它通过observable序列来实现基于事件的编程。它提供了一个核心的类型:...
    泓荥阅读 16,602评论 0 12
  • 一.背景介绍 Rx(Reactive Extension -- 响应式扩展 http://reactivex.io...
    爱上Shu的小刺猬阅读 2,045评论 1 3
  • 本文结构: 什么是RxJS RxJS有什么特点 RxJS核心概念 什么是RxJS 在javaScript中,我们可...
    stevemoon阅读 11,263评论 0 8
  • 晨起,收拾居所。看電影一部。未讀書。下午,踢球至深夜,飲酒,歸。與吾父談天。余之不能奮發,何也。
    寒窗寄傲生阅读 179评论 0 0
  • 亲爱的孟唯: 你好。 一年级结束了,暑假也即将结束。妈妈看着这一年中你的种种表现,你的进步,你的不足,还有爸...
    阳光满屋_8fc9阅读 467评论 3 4