Rxjs源码解读之Observable

Observable

Observable是一个类,是多个值的推送集合。

Observable类通过静态方法create创建Observable实例

用法


const observable = Observable.create(function (observer) {

  observer.next(1);

  observer.next(2);

  observer.next(3);

  setTimeout(() => {

    observer.next(4);

    observer.complete();

  }, 1000);

});

源码


  /**

  * 创建一个Observable实例

  * @static true

  * @owner Observable

  * @method create

  * @param {Function} subscribe函数,函数参数为observer对象

  * @return {Observable} 返回一个Observable实例

  **/

  static create: Function = <T>(subscribe?: (subscriber: Subscriber<T>) => TeardownLogic) => {

    return new Observable<T>(subscribe);

  }

Observable实例通过subscribe方法订阅事件,返回Subscriber实例

用法


observable.subscribe({

  next: x => console.log('got value ' + x),

  error: err => console.error('something wrong occurred: ' + err),

  complete: () => console.log('done'),

});

observable.subscribe(function (x) {

  console.log(x);

});

源码


  /** 

  * 参数为observer对象或者next函数,返回Subscriber实例subscription(执行subscription.unsubscribe()订阅取消)

  **/

subscribe(observerOrNext?: PartialObserver<T> | ((value: T) => void),

            error?: (error: any) => void,

            complete?: () => void): Subscription {

    const { operator } = this;//operator操作对象,包含map,scan等方法

    const sink = toSubscriber(observerOrNext, error, complete);//生成Subscriber实例

    if (operator) {//如果有operator,执行operator,source指Observable实例

      operator.call(sink, this.source);

    } else {//如果没有,订阅Subscriber实例

      sink.add(

        this.source || (config.useDeprecatedSynchronousErrorHandling && !sink.syncErrorThrowable) ?

        this._subscribe(sink) :

        this._trySubscribe(sink)

      );

    }

    if (config.useDeprecatedSynchronousErrorHandling) {

      if (sink.syncErrorThrowable) {

        sink.syncErrorThrowable = false;

        if (sink.syncErrorThrown) {

          throw sink.syncErrorValue;

        }

      }

    }

    return sink;

}

_subscribe(subscriber: Subscriber<any>): TeardownLogic {

    const { source } = this;

    return source && source.subscribe(subscriber);//订阅

}

Observable实例通过forEach方法也可订阅事件,返回一个Promise实例

还有另外一个函式可以达到跟subscribe一样的结果,forEach只接受一个函式,这个函式只负责处理next阶段的行为,且返回的是一个Promise实例,而不是 subscription。返回Promise实例的好处是方便使用await处理异步,例如


//输出1,2,3,finish,不加await会先输出finish

async function execute() {

  await Observable.from([1, 2, 3]).delay(1000).forEach(v => console.log(v));

  console.log('finish');

}

源码


  /**

  * @method forEach

  * @param {Function} next a handler for each value emitted by the observable

  * @param {PromiseConstructor} [promiseCtor] a constructor function used to instantiate the Promise

  * @return {Promise} a promise that either resolves on observable completion or

  *  rejects with the handled error

  */

  forEach(next: (value: T) => void, promiseCtor?: PromiseConstructorLike): Promise<void> {

    promiseCtor = getPromiseCtor(promiseCtor);

    //返回一个Promise实例

    return new promiseCtor<void>((resolve, reject) => {

      // Must be declared in a separate statement to avoid a RefernceError when

      // accessing subscription below in the closure due to Temporal Dead Zone.

      let subscription: Subscription;

      subscription = this.subscribe((value) => {

        try {

          next(value);//执行next方法

        } catch (err) {

          reject(err);

          if (subscription) {

            subscription.unsubscribe();

          }

        }

      }, reject, resolve);

    }) as Promise<void>;

  }

Observable实例通过toPromise将实例转为promise

用法


Observable.of('foo').toPromise().then(res => console.log(res));

源码


  toPromise(promiseCtor?: PromiseConstructorLike): Promise<T> {

    promiseCtor = getPromiseCtor(promiseCtor);

    //执行订阅,并返回一个promise

    return new promiseCtor((resolve, reject) => {

      let value: any;

      this.subscribe((x: T) => value = x, (err: any) => reject(err), () => resolve(value));

    }) as Promise<T>;

  }

Observable实例通过pipe来组合操作符,返回observable实例

用法


range(0, 10).pipe(

  filter(x => x % 2 === 0),

  map(x => x + x),

  scan((acc, x) => acc + x, 0)

)

.subscribe(x => console.log(x))

源码


function pipeFromArray<T, R>(fns: Array<UnaryFunction<T, R>>): UnaryFunction<T, R> {

  if (!fns) {

    return noop as UnaryFunction<any, any>;

  }

  if (fns.length === 1) {

    return fns[0];

  }

  return function piped(input: T): R {// 通过reduce来组合操作符

    return fns.reduce((prev: any, fn: UnaryFunction<T, R>) => fn(prev), input as any);

  };

}


  pipe(...operations: OperatorFunction<any, any>[]): Observable<any> {

    if (operations.length === 0) {

      return this as any;

    }

    //返回observable实例

    return pipeFromArray(operations)(this);

  }

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 215,384评论 6 497
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,845评论 3 391
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 161,148评论 0 351
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,640评论 1 290
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,731评论 6 388
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,712评论 1 294
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,703评论 3 415
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,473评论 0 270
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,915评论 1 307
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,227评论 2 331
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,384评论 1 345
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,063评论 5 340
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,706评论 3 324
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,302评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,531评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,321评论 2 368
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,248评论 2 352

推荐阅读更多精彩内容