async管道的实现

angular的管道很多时候很好用,省了很多事。举例子来说keyvalue可以把键值对变成键值数组,很棒。但是存在一个问题,只能处理同步值,这虽然也不是什么特别的限制。可以自己在ts里把异步值变同步,然后用管道。但是有时图省事,少写代码,还是希望在模板上搞这个。这时就可以用async管道了。

神奇的async

async可以做到处理异步值。例如{{promiseValue | async}}。这是一个很棒的特性,但是从实现上来说,真的想破脑袋也想不出来。那就去看看源码吧。https://github.com/angular/angular/blob/master/packages/common/src/pipes/async_pipe.ts

/**
 * @license
 * Copyright Google Inc. All Rights Reserved.
 *
 * Use of this source code is governed by an MIT-style license that can be
 * found in the LICENSE file at https://angular.io/license
 */

import {ChangeDetectorRef, EventEmitter, OnDestroy, Pipe, PipeTransform, WrappedValue, ɵisObservable, ɵisPromise, ɵlooseIdentical} from '@angular/core';
import {Observable, SubscriptionLike} from 'rxjs';
import {invalidPipeArgumentError} from './invalid_pipe_argument_error';

interface SubscriptionStrategy {
  createSubscription(async: Observable<any>|Promise<any>, updateLatestValue: any): SubscriptionLike
      |Promise<any>;
  dispose(subscription: SubscriptionLike|Promise<any>): void;
  onDestroy(subscription: SubscriptionLike|Promise<any>): void;
}

class ObservableStrategy implements SubscriptionStrategy {
  createSubscription(async: Observable<any>, updateLatestValue: any): SubscriptionLike {
    return async.subscribe({next: updateLatestValue, error: (e: any) => { throw e; }});
  }

  dispose(subscription: SubscriptionLike): void { subscription.unsubscribe(); }

  onDestroy(subscription: SubscriptionLike): void { subscription.unsubscribe(); }
}

class PromiseStrategy implements SubscriptionStrategy {
  createSubscription(async: Promise<any>, updateLatestValue: (v: any) => any): Promise<any> {
    return async.then(updateLatestValue, e => { throw e; });
  }

  dispose(subscription: Promise<any>): void {}

  onDestroy(subscription: Promise<any>): void {}
}

const _promiseStrategy = new PromiseStrategy();
const _observableStrategy = new ObservableStrategy();

/**
 * @ngModule CommonModule
 * @description
 *
 * Unwraps a value from an asynchronous primitive.
 *
 * The `async` pipe subscribes to an `Observable` or `Promise` and returns the latest value it has
 * emitted. When a new value is emitted, the `async` pipe marks the component to be checked for
 * changes. When the component gets destroyed, the `async` pipe unsubscribes automatically to avoid
 * potential memory leaks.
 *
 * @usageNotes
 *
 * ### Examples
 *
 * This example binds a `Promise` to the view. Clicking the `Resolve` button resolves the
 * promise.
 *
 * {@example common/pipes/ts/async_pipe.ts region='AsyncPipePromise'}
 *
 * It's also possible to use `async` with Observables. The example below binds the `time` Observable
 * to the view. The Observable continuously updates the view with the current time.
 *
 * {@example common/pipes/ts/async_pipe.ts region='AsyncPipeObservable'}
 *
 * @publicApi
 */
@Pipe({name: 'async', pure: false})
export class AsyncPipe implements OnDestroy, PipeTransform {
  private _latestValue: any = null;
  private _latestReturnedValue: any = null;

  private _subscription: SubscriptionLike|Promise<any>|null = null;
  private _obj: Observable<any>|Promise<any>|EventEmitter<any>|null = null;
  private _strategy: SubscriptionStrategy = null !;

  constructor(private _ref: ChangeDetectorRef) {}

  ngOnDestroy(): void {
    if (this._subscription) {
      this._dispose();
    }
  }

  transform<T>(obj: null): null;
  transform<T>(obj: undefined): undefined;
  transform<T>(obj: Observable<T>|null|undefined): T|null;
  transform<T>(obj: Promise<T>|null|undefined): T|null;
  transform(obj: Observable<any>|Promise<any>|null|undefined): any {
    if (!this._obj) {
      if (obj) {
        this._subscribe(obj);
      }
      this._latestReturnedValue = this._latestValue;
      return this._latestValue;
    }

    if (obj !== this._obj) {
      this._dispose();
      return this.transform(obj as any);
    }

    if (ɵlooseIdentical(this._latestValue, this._latestReturnedValue)) {
      return this._latestReturnedValue;
    }

    this._latestReturnedValue = this._latestValue;
    return WrappedValue.wrap(this._latestValue);
  }

  private _subscribe(obj: Observable<any>|Promise<any>|EventEmitter<any>): void {
    this._obj = obj;
    this._strategy = this._selectStrategy(obj);
    this._subscription = this._strategy.createSubscription(
        obj, (value: Object) => this._updateLatestValue(obj, value));
  }

  private _selectStrategy(obj: Observable<any>|Promise<any>|EventEmitter<any>): any {
    if (ɵisPromise(obj)) {
      return _promiseStrategy;
    }

    if (ɵisObservable(obj)) {
      return _observableStrategy;
    }

    throw invalidPipeArgumentError(AsyncPipe, obj);
  }

  private _dispose(): void {
    this._strategy.dispose(this._subscription !);
    this._latestValue = null;
    this._latestReturnedValue = null;
    this._subscription = null;
    this._obj = null;
  }

  private _updateLatestValue(async: any, value: Object): void {
    if (async === this._obj) {
      this._latestValue = value;
      this._ref.markForCheck();
    }
  }
}

代码不长就100多行,还是可以看看的。
transform就是处理使用async时,会使用的函数了。

transform(obj: Observable<any>|Promise<any>|null|undefined): any {
// this._obj是null或者undefined,就订阅传来的obj
    if (!this._obj) {     //
      if (obj) {
        this._subscribe(obj); // 订阅入参obj
      }
      this._latestReturnedValue = this._latestValue;
      return this._latestValue;
    }
    //obj换了新值,释放以前订阅,订阅新obj
    if (obj !== this._obj) { 
      this._dispose();
      return this.transform(obj as any);
    }

    // obj没换新值,如果内部存储的新旧值都一样,返回最新值
    if (ɵlooseIdentical(this._latestValue, this._latestReturnedValue)) {
      return this._latestReturnedValue;
    }
    // obj没换新值,如果内部存储的新旧值不一样,返回最新值
    this._latestReturnedValue = this._latestValue;
    return WrappedValue.wrap(this._latestValue);
  }
// 判断值是否完全相同
export function looseIdentical(a: any, b: any): boolean {
  return a === b || typeof a === 'number' && typeof b === 'number' && isNaN(a) && isNaN(b);
}

基本实现就像上面注释一样。但是最后一行的wrapedValue让人很迷惑。为什么要这么写。官方文档上说

表示 Pipe 转换的值已经变化了 —— 即使其引用并没有变。
包装过的值会在变更检测期间自动解包,并保存解包过的值。

所以总的流程就是,来一个异步值,订阅这个异步值,当异步值更新时,触发更新检测,更新新值。其他的操作算是节省性能,避免内存泄漏之类。

©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容