Flutter 的 Stream 探究

感觉文章还行,请留下你的赞。

一、Stream 是什么?

Stream 是异步数据事件的源。Stream 提供了一种接收事件序列的方法,可以通过 listen 进行数据监听;通过 error 接收失败状态;通过 done 接收事件的结束状态。

注意: Stream 只有当被监听后,才能使其产生事件。同时会产生一个 StreamSubscription 的对象,该对象是提供事件的活动对象,可用于停止再次监听或者临时暂停订阅事件。

二、 Stream 的订阅对象 StreamSubscription

  1. StreamSubscription 的种类

    Stream 的订阅流有两种,分别是:单订阅流(Single-subscription)和多订阅流(broadcast)。

  2. 单订阅流(Single-subscription)
    • 单订阅流只允许有一个监听器(listen);只有监听后才会产生事件;取消监听器时也停止事件发送,即是 Stream 还有更多事件。
    • 单订阅流即是取消了第一次订阅,也不允许再次订阅。
    • 单订阅流常用于流较大的连续数据事件,如文件 I/O,
  3. 多订阅流(broadcast, 广播)
    • 广播流可以有多个监听器,广播事件就绪时就触发其事件,无论有无监听器。

    • 广播流常用于独立的事件或者观察者。

    • 广播流可以取消监听,然后再次监听。

      注意: 如果多个监听器想订阅单个订阅流,请使用 asBroadcastStream 在单订阅流顶部创建广播流。

三、 Stream 的构建

  • empty
    /// 空的广播流
    void emptyStream() {
      var stream = Stream.empty();
      stream.listen((event) {
          print("empty -- listen");
      }, onDone: () {
          print("empty -- onDone"); // flutter: empty -- onDone
      }, onError: (e) {
          print("empty -- onError");
      }, cancelOnError: true);
    }
    

    这是一个流,它在监听后,只发出一个 onDone 事件,其他什么都不做。

  • error
    // 创建一个错误的流
    void createErrorStream() {
        var stream = Stream.error("错误流");
        stream.listen((event) {
        print("error -- listen");
        }, onError: (e) {
        print("error -- onError"); // flutter: error -- onError
        }, onDone: () {
        print("error -- onDone");
        }, cancelOnError: true);
    }
    

    这是创建在事件完成前发出的单个错误事件流。

  • fromFuture
    // 从Future创建一个单事件流
    void createStreamFormFuture() {
        var stream = Stream.fromFuture(
        Future.delayed(Duration(seconds: 1), () {
            print("Future 延迟事件");
        }),
        );
        stream.listen((event) {
        print("fromFuture -- listen-- ${event.toString()}"); // flutter: fromFuture -- listen-- null
        }, onError: (e) {
        print("fromFuture -- onError");
        }, onDone: () {
        print("fromFuture -- onDone"); // flutter: fromFuture -- onDone
        }, cancelOnError: true);
    }
    

    这是有 Future 创建的一个单订阅流。

    注意:

    1.这是单订阅流在未监听时,它的事件将有 _SyncStreamController 的缓存区保留,当被监听后,再次发出事件。
    2.对于一个单一的值来说,在 Future 做 then之前等待一个侦听器是不值得的。

  • fromFutures
    // 从一组 Future 中创建单订阅流
    void createStreamFromMoreFuture() {
        var stream = Stream.fromFutures([
        Future(() {
            print("Future - 1");
        }),
        Future(() {
            print("Future - 2");
        }),
        ]);
        stream.listen((event) {
        print("fromFutures -- listen-- ${event.toString()}");
        }, onError: (e) {
        print("fromFutures -- onError");
        }, onDone: () {
        print("fromFutures -- onDone");
        }, cancelOnError: true);
    }
    

    日志输出:

    flutter: Future - 1
    flutter: fromFutures -- listen-- null
    flutter: Future - 2
    flutter: fromFutures -- listen-- null
    flutter: fromFutures -- onDone
    

    从一组 Future 中创建一个单订阅流。该流有几个 Future 则 listen 方法将会被调用几次,同时 onDone 方法最后调用。

  • fromIterable
    // 创建一个可以从改流中获取数据的单订阅流
    void createStreamFromIterable() {
        var stream = Stream.fromIterable([1, 2, 3, 4]);
        stream.listen((event) {
        print("fromIterable -- listen-- ${event.toString()}");
        }, onError: (e) {
        print("fromIterable -- onError");
        }, onDone: () {
        print("fromIterable -- onDone");
        }, cancelOnError: true);
    }
    

    日志输出:

    flutter: fromIterable -- listen-- 1
    flutter: fromIterable -- listen-- 2
    flutter: fromIterable -- listen-- 3
    flutter: fromIterable -- listen-- 4
    flutter: fromIterable -- onDone
    

    创建一个可以从流中获取值的单订阅流。该流有几个元素,则 listen 方法将会被调用几次,同时 onDone 方法最后调用。

  • multi
    /// 创建一个多订阅流
    void createMutilStream() {
        var stream = Stream.multi((control) {
        control.addSync("multi--1");
        control.addSync("multi--2");
        control.close();
        });
        stream.listen((event) {
        print("multi -- listen-- ${event.toString()}");
        }, onError: (e) {
        print("multi -- onError");
        }, onDone: () {
        print("multi -- onDone");
        }, cancelOnError: false);
    
        Future.delayed(Duration(seconds: 2), () {
        stream.listen((event) {
            print("multi -- listen-- 2-- ${event.toString()}");
        }, onError: (e) {
            print("multi -- onError -- 2");
        }, onDone: () {
            print("multi -- onDone-- 2");
        }, cancelOnError: false);
        });
    }
    

    日志输出:

    flutter: multi -- listen-- multi--1
    flutter: multi -- listen-- multi--2
    flutter: multi -- onDone
    flutter: multi -- listen-- 2-- multi--1
    flutter: multi -- listen-- 2-- multi--2
    flutter: multi -- onDone-- 2
    

    创建一个广播流,通过 control 发出响应的事件。注意在 control 不调用 close 方法,则监听中的 onDone 方法也不会调用。

  • periodic
    // 创建一定间隔事件发出事件的流
    void createPeriodicStream() {
        var stream = Stream.periodic(Duration(seconds: 2), (value) {
        return value;
        });
        stream.listen((event) {
        print("periodic -- listen-- 2-- ${event.toString()}");
        }, onError: (e) {
        print("periodic -- onError -- 2");
        }, onDone: () {
        print("periodic -- onDone-- 2");
        }, cancelOnError: false);
    }
    

    日志输出:

    flutter: periodic -- listen-- 2-- 0
    flutter: periodic -- listen-- 2-- 1
    flutter: periodic -- listen-- 2-- 2
    flutter: periodic -- listen-- 2-- 3
    flutter: periodic -- listen-- 2-- 4
    flutter: periodic -- listen-- 2-- 5
    ...
    

    这是创建一个指定间隔时间内发出事件的单订阅流。

  • value
    // 创建一个在完成前发出事件的单订阅流
    void createSingleStream() {
        var stream = Stream.value("单值流");
        stream.listen((event) {
        print("value -- listen-- ${event.toString()}");
        }, onError: (e) {
        print("value -- onError");
        }, onDone: () {
        print("value -- onDone");
        }, cancelOnError: false);
    }
    

    日志输出:

    flutter: value -- listen-- 单值流
    flutter: value -- onDone
    

    这是创建一个在完成前发出事件的单订阅流。

  • eventTransformed
    /// 创建一个将现有流的所有事件通过接收器转换通过管道发出
    void createevEntTransformed() {
        var stream = Stream.eventTransformed(Stream.fromIterable([1, 2]), (sink) {
        sink.add(2);
        return sink;
        });
    
        stream.listen((event) {
        print("eventTransformed -- listen-- ${event.toString()}");
        }, onError: (e) {
        print("eventTransformed -- onError");
        }, onDone: () {
        print("eventTransformed -- onDone");
        }, cancelOnError: false);
    }
    

    日志输出:

    flutter: eventTransformed -- listen-- 2
    flutter: eventTransformed -- listen-- 1
    flutter: eventTransformed -- listen-- 2
    flutter: eventTransformed -- onDone
    

    这是创建一个将现有事件通过接收器转换通过通道发出。

四、Stream 属性

  • first

    first 是事件流的第一个元素,是 Futurt<T> 的对象。
    实现原理核心代码是:

    Future<T> get first {
      _Future<T> future = new _Future<T>();
      StreamSubscription<T> subscription =
          this.listen(null, onError: future._completeError, onDone: () {
        try {
          throw IterableElementError.noElement();
        } catch (e, s) {
          _completeWithErrorCallback(future, e, s);
        }
      }, cancelOnError: true);
      subscription.onData((T value) {
        _cancelAndValue(subscription, future, value);
      });
      return future;
    }
    
    void _cancelAndValue(StreamSubscription subscription, _Future future, value) {
      var cancelFuture = subscription.cancel();
      if (cancelFuture != null && !identical(cancelFuture, Future._nullFuture)) {
          cancelFuture.whenComplete(() => future._complete(value));
      } else {
          future._complete(value);
      }
    }
    

    上面解释: 实现原理就是事件流发出第一个事件时 subscription.onData((T value) { _cancelAndValue(subscription, future, value); }),然后取消订阅流subscription.cancel()

    实例:

    void first() {
        var stream1 = Stream.value("first 测试");
        stream1.first.then((value) => print(value));
        var stream2 = Stream.fromIterable([1, 2, 3]);
        stream2.first.then((value) => print(value));
    }
    

    日志输出:

    flutter: first 测试
    flutter: 1
    

    注意: 当我们的流是单订阅流时,调用 frist 之后,就不能被再次监听。因为 stream.first 包含一次监听(源码)。

  • isEmpty

    isEmpty 是判断订阅流是否为空。核心代码如下:

    Future<bool> get isEmpty {
        _Future<bool> future = new _Future<bool>();
        StreamSubscription<T> subscription =
            this.listen(null, onError: future._completeError, onDone: () {
            future._complete(true);
        }, cancelOnError: true);
        subscription.onData((_) {
            _cancelAndValue(subscription, future, false);
        });
        return future;
    }
    

    上面代码可知如果订阅流能回调 onDone 方法,则判定其不为空,否则为空。
    实例如下:

    void isEmpty() {
        var stream = Stream.empty();
        stream.isEmpty.then((value) => print(value));
    
        var stream1 = Stream.fromIterable(["发生错误"]);
        stream1.isEmpty.then((value) => print(value));
    
        var stream2 = Stream.multi((control) {
        control.add(1);
        });
        stream2.isEmpty.then((value) => print(value));
    }
    

    日志输出:

    flutter: true
    flutter: false
    flutter: false
    
  • last

    last 是获取订阅流发出事件的最后一个,是 Future<T> 的类型。核心代码:

    Future<T> get last {
      _Future<T> future = new _Future<T>();
      late T result;
      bool foundResult = false;
      listen(
          (T value) {
            foundResult = true;
            result = value;
          },
          onError: future._completeError,
          onDone: () {
            if (foundResult) {
              future._complete(result);
              return;
            }
            try {
              throw IterableElementError.noElement();
            } catch (e, s) {
              _completeWithErrorCallback(future, e, s);
            }
          },
          cancelOnError: true);
      return future;
    }
    

    从上面代码的 onDone 方法看出,这是把订阅流的最后一个事件,通过 Future 返回。

    注意:

    1. 如果订阅流事件发生错误,这 Future 完成错误并停止处理。
    2. 如果订阅流是空,则 onDone 函数调用,并抛出无事件的异常。

    实例:

    void last() {
      var stream = Stream.fromIterable([1, 2, 3]);
      stream.last.then((value) => print(value)); // 3
    }
    
  • length

    length 是获取订阅流发出事件的个数。核心代码:

    Future<int> get length {
        _Future<int> future = new _Future<int>();
        int count = 0;
        this.listen(
            (_) {
              count++;
            },
            onError: future._completeError,
            onDone: () {
              future._complete(count);
            },
            cancelOnError: true);
        return future;
    }
    

    从上面代码,可以看到是通过声明 count 来记录流发出的事件。

    实例:

    void length() {
      var stream = Stream.fromIterable([1, 2, 3]);
      stream.length.then((value) => print(value)); // 3
    }
    
  • single

    single 是获取只能发出一次事件的订阅流的事件。核心代码:

    Future<T> get single {
      _Future<T> future = new _Future<T>();
      late T result;
      bool foundResult = false;
      StreamSubscription<T> subscription =
          this.listen(null, onError: future._completeError, onDone: () {
        if (foundResult) {
          future._complete(result);
          return;
        }
        try {
          throw IterableElementError.noElement();
        } catch (e, s) {
          _completeWithErrorCallback(future, e, s);
        }
      }, cancelOnError: true);
      subscription.onData((T value) {
        if (foundResult) {
          // This is the second element we get.
          try {
            throw IterableElementError.tooMany();
          } catch (e, s) {
            _cancelAndErrorWithReplacement(subscription, future, e, s);
          }
          return;
        }
        foundResult = true;
        result = value;
      });
      return future;
    }
    

    从上面代码知道: 首先使用 foundResultonData 方法中为 false 跳过检查,然后 foundResult 变为 trueresult 获取发送事件。在 onDone 方法中 foundResulttrue 完成 Future_complete方法结束。

    注意: 使用single 的订阅流必须是只能包含一个事件的订阅流,否则会抛出 tooMany 的异常。

    实例:

    void single() {
      var stream = Stream.fromIterable([1]);
      stream.single.then((value) => print(value));// 1
    }
    

五、Stream 的方法

  • any

    any 是检查订阅流中是否有符合test 条件的事件。核心代码:

    Future<bool> any(bool test(T element)) {
      _Future<bool> future = new _Future<bool>();
      StreamSubscription<T> subscription =
          this.listen(null, onError: future._completeError, onDone: () {
        future._complete(false);
      }, cancelOnError: true);
      subscription.onData((T element) {
        _runUserCode(() => test(element), (bool isMatch) {
          if (isMatch) {
            _cancelAndValue(subscription, future, true);
          }
        }, _cancelAndErrorClosure(subscription, future));
      });
      return future;
    }
    
    void _cancelAndValue(StreamSubscription subscription, _Future future, value) {
      var cancelFuture = subscription.cancel();
      if (cancelFuture != null && !identical(cancelFuture, Future._nullFuture)) {
        cancelFuture.whenComplete(() => future._complete(value));
      } else {
        future._complete(value);
      }
    }
    

    从上面代码可以看到: 通过 onData 函数获取 element ,然后经过 _runUserCode 方法检查并返回结果 isMatch ,如果 isMatchtrue 则调用 _cancelAndValue 方法取消订阅 subscription.cancel() ,然后完成 Future。

    注意: any 是检查订阅流中事件是否符合条件的事件,如果有则返回 true ,否则返回 false。any 只要检查到有一个符合则检查结束,后面的就不在检查。

    实例:

    void any() {
      var stream = Stream.fromIterable([1, 2, 3]);
      stream.any((element) => element > 2).then((value) => print(value)); // true
    }
    
  • asBroadcastStream

    asBroadcastStream 是将一个单订阅流转换为一个广播流。核心代码:

    Stream<T> asBroadcastStream(
        {void onListen(StreamSubscription<T> subscription)?,
        void onCancel(StreamSubscription<T> subscription)?}) {
      return new _AsBroadcastStream<T>(this, onListen, onCancel);
    }
    

    从上面代码可知,原有的订阅流经过 asBroadcastStream 后,生成新的 _AsBroadcastStream 的订阅流。

    实例代码:

    void asBroadcastStream() {
      var stream = Stream.fromFuture(
        Future(() {
          return 110;
        }),
      );
    
      var stream1 = stream.asBroadcastStream();
      stream1
        ..listen((event) {
          print("event -- 1 -- $event");
        });
      stream1.listen((event) {
        print("event -- 2 -- $event");
      });
    }
    

    日志输出:

    flutter: event -- 1 -- 110
    flutter: event -- 2 -- 110
    

    注意:上面的实例不能写成下面形式:

    void asBroadcastStream() {
      var stream = Stream.fromFuture(
        Future(() {
          return 110;
        }),
      );
      stream.asBroadcastStream().listen((event) {
          print("event -- 1 -- $event");
        });
      stream.asBroadcastStream().listen((event) {
        print("event -- 2 -- $event");
      });
    }
    

    因为 asBroadcastStream 本身对 stream 就是订阅,而stream 两次 asBroadcastStream 就造成单订阅多次被订阅的错误。

  • asyncExpand

    asyncExpand 是将一个订阅流的事件全部转化为一系列的异步事件的广播流。核心代码:

    Stream<E> asyncExpand<E>(Stream<E>? convert(T event)) {
      _StreamControllerBase<E> controller;
      if (isBroadcast) {
        controller = _SyncBroadcastStreamController<E>(null, null);
      } else {
        controller = _SyncStreamController<E>(null, null, null, null);
      }
    
      controller.onListen = () {
        StreamSubscription<T> subscription = this.listen(null,
            onError: controller._addError, // Avoid Zone error replacement.
            onDone: controller.close);
        subscription.onData((T event) {
          Stream<E>? newStream;
          try {
            newStream = convert(event);
          } catch (e, s) {
            controller.addError(e, s);
            return;
          }
          if (newStream != null) {
            subscription.pause();
            controller.addStream(newStream).whenComplete(subscription.resume);
          }
        });
        controller.onCancel = subscription.cancel;
        if (!isBroadcast) {
          controller
            ..onPause = subscription.pause
            ..onResume = subscription.resume;
        }
      };
      return controller.stream;
    }
    

    从上面代码可知,是将订阅流事件经过 asyncExpand 生成 _StreamControllerBase 对象,将 onData 获取的数据生成新的Stream ,然后在由 _StreamControllerBase 通过 addStream 对外提供新的 Stream

    实例代码:

    void asyncExpand() {
      var stream = Stream.fromIterable([1, 2, 3]);
      stream
          .asyncExpand((event) => Stream.fromFuture(
                Future(() {
                  return event * 2;
                }),
              ))
          .listen((event) {
        print("asyncExpand-- $event");
      });
    
      var stream1 = Stream.fromIterable([1, 2, 3]).asBroadcastStream();
      var stream2 = stream1.asyncExpand((event) => Stream.fromFuture(
            Future(() {
              return event;
            }),
          ));
      stream2.listen((event) {
        print("asyncExpand--1- $event");
      });
      stream2.listen((event) {
        print("asyncExpand--2- $event");
      });
    }
    

    日志输出:

    flutter: asyncExpand-- 2
    flutter: asyncExpand--1- 1
    flutter: asyncExpand--2- 1
    flutter: asyncExpand-- 4
    flutter: asyncExpand--1- 2
    flutter: asyncExpand--2- 2
    flutter: asyncExpand-- 6
    flutter: asyncExpand--1- 3
    flutter: asyncExpand--2- 3
    

    注意: 什么类型的订阅流经过 asyncExpand 生成对应类型的订阅流。

  • asyncMap

    asyncMap 是创建一个新流,并将该流的每个事件都转化为一个新的异步事件。核心代码:

    Stream<E> asyncMap<E>(FutureOr<E> convert(T event)) {
      _StreamControllerBase<E> controller;
      if (isBroadcast) {
        controller = _SyncBroadcastStreamController<E>(null, null);
      } else {
        controller = _SyncStreamController<E>(null, null, null, null);
      }
    
      controller.onListen = () {
        StreamSubscription<T> subscription = this.listen(null,
            onError: controller._addError, // Avoid Zone error replacement.
            onDone: controller.close);
        FutureOr<Null> add(E value) {
          controller.add(value);
        }
    
        final addError = controller._addError;
        final resume = subscription.resume;
        subscription.onData((T event) {
          FutureOr<E> newValue;
          try {
            newValue = convert(event);
          } catch (e, s) {
            controller.addError(e, s);
            return;
          }
          if (newValue is Future<E>) {
            subscription.pause();
            newValue.then(add, onError: addError).whenComplete(resume);
          } else {
            // TODO(40014): Remove cast when type promotion works.
            controller.add(newValue as dynamic);
          }
        });
        controller.onCancel = subscription.cancel;
        if (!isBroadcast) {
          controller
            ..onPause = subscription.pause
            ..onResume = resume;
        }
      };
      return controller.stream;
    }
    

    从上面代码知道,订阅流经过 ansyMap 后生成新的 _StreamControllerBase 并返回 _StreamControllerBaseStream, 然后通过 订阅流的 onData 方法获取事件,然后经过 convert 获得 newValue 对象。然后在判断 newValue 的类型,分别执行不同的方法,最后通过 _StreamControllerBaseadd 方法进行发出事件。

    实例代码:

    void asyncMap() {
      var stream = Stream.fromIterable([1, 2]);
      stream.asyncMap((event) => event > 1).listen((event) {
        print("asyncMap -1- $event");
      });
    
      var stream1 = Stream.fromIterable([1, 2]);
      stream1
          .asyncMap((event) => Future(() {
                return event > 1;
              }))
          .listen((event) {
        print("asyncMap -2- $event");
      });
    }
    

    日志输出:

    flutter: asyncMap -1- false
    flutter: asyncMap -1- true
    flutter: asyncMap -2- false
    flutter: asyncMap -2- true
    
  • cast

    cast 是将订阅流转化 Stream<R> 的订阅流,作用是检查订阅流发出是事件是否是 R 类型。核心代码:

    Stream<R> cast<R>() => Stream.castFrom<T, R>(this);
    
    static Stream<T> castFrom<S, T>(Stream<S> source) =>
        new CastStream<S, T>(source);
    
    class CastStream<S, T> extends Stream<T> {
      final Stream<S> _source;
      CastStream(this._source);
      bool get isBroadcast => _source.isBroadcast;
    
      StreamSubscription<T> listen(void Function(T data)? onData,
          {Function? onError, void Function()? onDone, bool? cancelOnError}) {
        return new CastStreamSubscription<S, T>(
            _source.listen(null, onDone: onDone, cancelOnError: cancelOnError))
          ..onData(onData)
          ..onError(onError);
      }
    
      Stream<R> cast<R>() => new CastStream<S, R>(_source);
    }
    

    实例代码:

    void cast() {
      var stream = Stream.fromIterable([11, 22]);
      stream.cast<int>().listen((event) {
        print("cast--$event");
      });
    }
    

    日志输出:

    flutter: cast--11
    flutter: cast--22
    

    注意: Streamcast 方法能够检查 Stream 发出事件的类型是否是指定类型。

  • contains

    contains 是判断订阅流中事件是否有指定的事件, 返回一个 Future<bool> 对象。 核心代码:

    Future<bool> contains(Object? needle) {
      _Future<bool> future = new _Future<bool>();
      StreamSubscription<T> subscription =
          this.listen(null, onError: future._completeError, onDone: () {
        future._complete(false);
      }, cancelOnError: true);
      subscription.onData((T element) {
        _runUserCode(() => (element == needle), (bool isMatch) {
          if (isMatch) {
            _cancelAndValue(subscription, future, true);
          }
        }, _cancelAndErrorClosure(subscription, future));
      });
      return future;
    }
    

    从上面代码知,订阅流经过 onData 方法获取 element 事件,然后又 _runUserCode 判断 element 事件是否和指定的事件相等,返回 isMatch, 如果 isMatchtrue 则有 _cancelAndValue 方法以 Future 返回。

    实例代码:

    void contains() {
      var stream = Stream.fromIterable([2, 3, 4]);
      stream.contains(4).then((value) => print(value)); // true
    }
    
  • distinct

    distinct 是去除订阅流中相同的事件只发出一次。核心代码:

    Stream<T> distinct([bool equals(T previous, T next)?]) {
      return new _DistinctStream<T>(this, equals);
    }
    
    class _DistinctStream<T> extends _ForwardingStream<T, T> {
      static final _SENTINEL = new Object();
    
      final bool Function(T, T)? _equals;
    
      _DistinctStream(Stream<T> source, bool equals(T a, T b)?)
          : _equals = equals,
            super(source);
    
      StreamSubscription<T> _createSubscription(void onData(T data)?,
          Function? onError, void onDone()?, bool cancelOnError) {
        return new _StateStreamSubscription<Object?, T>(
            this, onData, onError, onDone, cancelOnError, _SENTINEL);
      }
    
      void _handleData(T inputEvent, _EventSink<T> sink) {
        var subscription = sink as _StateStreamSubscription<Object?, T>;
        var previous = subscription._subState;
        if (identical(previous, _SENTINEL)) {
          // First event. Cannot use [_equals].
          subscription._subState = inputEvent;
          sink._add(inputEvent);
        } else {
          T previousEvent = previous as T;
          var equals = _equals;
          bool isEqual;
          try {
            if (equals == null) {
              isEqual = (previousEvent == inputEvent);
            } else {
              isEqual = equals(previousEvent, inputEvent);
            }
          } catch (e, s) {
            _addErrorWithReplacement(sink, e, s);
            return;
          }
          if (!isEqual) {
            sink._add(inputEvent);
            subscription._subState = inputEvent;
          }
        }
      }
    }
    

    从上面代码知,订阅流调用 distinct 方法生 _DistinctStream 订阅流,然后 _DistinctStream 中创建 _StateStreamSubscription 的订阅者,然后再有 _handleData 方法中首先使用 identical 方法规避第一次调用的判断,并将事件赋值给 _StateStreamSubscription_subState 属性。下次 _handleData 调用经过 equals 判断得到 isEqual 对象,然后 isEqual 是否为 true 决定是否发出事件。

    实例代码:

    void distinct() {
      var stream = Stream.fromIterable([1, 3, 3, 6]);
      stream.distinct().listen((event) {
        print("distinct -- $event");
      });
    }
    

    日志输出:

    flutter: distinct -- 1
    flutter: distinct -- 3
    flutter: distinct -- 6
    
  • drain

    drain 是清除订阅流所有的事件,自定义自己的数据事件以 Future 返回。核心代码:

    Future<E> drain<E>([E? futureValue]) {
      if (futureValue == null) {
        futureValue = futureValue as E;
      }
      return listen(null, cancelOnError: true).asFuture<E>(futureValue);
    }
    

    从上面代码知道订阅流经过 drain 方法,将 StreamSubscription 通过 asFuture 转化为 Future 对象并带 futureValue 初始值。

    实例代码:

    void drain() {
      var stream = Stream.fromIterable([11, 33, 44]);
      stream.drain(2).then((value) => print(value)); // 2
    }
    
  • elementAt

    elementAt 是获取订阅流中第 index 事件, 返回一个 Future 对象。核心代码:

    Future<T> elementAt(int index) {
      RangeError.checkNotNegative(index, "index");
      _Future<T> result = new _Future<T>();
      int elementIndex = 0;
      StreamSubscription<T> subscription;
      subscription =
          this.listen(null, onError: result._completeError, onDone: () {
        result._completeError(
            new RangeError.index(index, this, "index", null, elementIndex),
            StackTrace.empty);
      }, cancelOnError: true);
      subscription.onData((T value) {
        if (index == elementIndex) {
          _cancelAndValue(subscription, result, value);
          return;
        }
        elementIndex += 1;
      });
    
      return result;
    }
    

    从上面代码知:订阅流经过 elementAt(int index) 方法,首先调用 checkNotNegative 检查 index 的值是否小于零;然后生成 subscription 订阅对象和elementIndex 记录广播次数 ,然后在 onData 方法中判断 index 是否和 elementIndex 相等。如果相等,再有 _cancelAndValue 取消订阅对象,然后调用 onDone 方法,返回 Future 对象。

    实例代码:

    void elementAt() {
      var stream = Stream.fromIterable([11, 33, 44]);
      stream.elementAt(2).then((value) => print(value)); // 44
    }
    
  • every

    every 是判断订阅流事件是否符合指定的条件,结果以 Future<bool> 返回。核心代码:

    Future<bool> every(bool test(T element)) {
      _Future<bool> future = new _Future<bool>();
      StreamSubscription<T> subscription =
          this.listen(null, onError: future._completeError, onDone: () {
        future._complete(true);
      }, cancelOnError: true);
      subscription.onData((T element) {
        _runUserCode(() => test(element), (bool isMatch) {
          if (!isMatch) {
            _cancelAndValue(subscription, future, false);
          }
        }, _cancelAndErrorClosure(subscription, future));
      });
      return future;
    }
    

    从上面代码知:订阅流调用every 函数,生成 subscriptionfuture 对象。然后 subscriptiononData 方法中调用 _runUserCode 方法检查是否符合条件,并返回isMatch,然后在根据 isMatchfalse 调用 _cancelAndValue 方法取消 subscription 取消监听,以 Future<bool> 返回结果。

    实例代码:

    void every() {
      var stream = Stream.fromIterable([110, 220, 330]);
      stream.every((element) => element > 100).then((value) => print(value)); // true
    }
    
  • expand

    expand 是将订阅流事件扩展为事件序列。核心代码:

    Stream<S> expand<S>(Iterable<S> convert(T element)) {
      return new _ExpandStream<T, S>(this, convert);
    }
    
    class _ExpandStream<S, T> extends _ForwardingStream<S, T> {
      final _Transformation<S, Iterable<T>> _expand;
    
      _ExpandStream(Stream<S> source, Iterable<T> expand(S event))
          : this._expand = expand,
            super(source);
    
      void _handleData(S inputEvent, _EventSink<T> sink) {
        try {
          for (T value in _expand(inputEvent)) {
            sink._add(value);
          }
        } catch (e, s) {
          // If either _expand or iterating the generated iterator throws,
          // we abort the iteration.
          _addErrorWithReplacement(sink, e, s);
        }
      }
    }
    

    从上面代码知道,订阅流调用 expand 方法生成 _ExpandStream 订阅流,然后_ExpandStream_handleData 方法中通过 for...in... 通过 sink 将事件发出。

    实例代码:

    void expand() {
      var stream = Stream.fromIterable([110, 220, 330]);
      stream.expand((element) => [element, element * 2]).listen((event) {
        print(event);
      });
    }
    

    日志输出:

    flutter: 110
    flutter: 220
    flutter: 220
    flutter: 440
    flutter: 330
    flutter: 660
    
  • map

    map 是将订阅流转化为另一种订阅流。核心代码:

    class _MapStream<S, T> extends _ForwardingStream<S, T> {
      final _Transformation<S, T> _transform;
    
      _MapStream(Stream<S> source, T transform(S event))
          : this._transform = transform,
            super(source);
    
      void _handleData(S inputEvent, _EventSink<T> sink) {
        T outputEvent;
        try {
          outputEvent = _transform(inputEvent);
        } catch (e, s) {
          _addErrorWithReplacement(sink, e, s);
          return;
        }
        sink._add(outputEvent);
      }
    }
    

    从上面代码知: 订阅流通过 map 生成 _MapStream 对象,然后有 _MapStream_handleData 方法中经过 _Transformation 将结果给 outputEvent,最后通过 sink_add 方法发出 outputEvent 事件。

    实例代码:

    void map() {
      var stream = Stream.fromIterable([110, 220, 330]);
      stream.map((event) => event > 200).listen((event) {
        print(event);
      });
    }
    

    日志输出:

    flutter: false
    flutter: true
    flutter: true
    
  • skip(int count)

    skip 是跳过 count 次订阅流中数据事件。核心代码:

    class _SkipStream<T> extends _ForwardingStream<T, T> {
      final int _count;
    
      _SkipStream(Stream<T> source, int count)
          : this._count = count,
            super(source) {
        // This test is done early to avoid handling an async error
        // in the _handleData method.
        RangeError.checkNotNegative(count, "count");
      }
    
      StreamSubscription<T> _createSubscription(void onData(T data)?,
          Function? onError, void onDone()?, bool cancelOnError) {
        return new _StateStreamSubscription<int, T>(
            this, onData, onError, onDone, cancelOnError, _count);
      }
    
      void _handleData(T inputEvent, _EventSink<T> sink) {
        var subscription = sink as _StateStreamSubscription<int, T>;
        int count = subscription._subState;
        if (count > 0) {
          subscription._subState = count - 1;
          return;
        }
        sink._add(inputEvent);
      }
    }
    

    从上面代码知,事件源经过 skip 函数生成 _SkipStream事件源; _SkipStream 中生成 StreamSubscription 订阅对象;再有 _handleData 方法中通过 subscription._subState 获取指定跳过的次数判断是否大于零,直至subscription._subState 小于零,然后通过sink._add(inputEvent) 发出后续数据事件。

    实例代码:

    void skip() {
      var stream = Stream.fromIterable([11, 22, 33]);
      stream.skip(2).listen((event) {
        print(event); // 33
      });
    }
    

    注意: 跳过次数 count ,只能大于等于零; count 可以大于事件源中发送事件的个数。

  • toList

    toList 是将事件源发出的所有数据事件存放到 List 中,并以 Future 形式返回。核心代码:

    Future<List<T>> toList() {
      List<T> result = <T>[];
      _Future<List<T>> future = new _Future<List<T>>();
      this.listen(
          (T data) {
            result.add(data);
          },
          onError: future._completeError,
          onDone: () {
            future._complete(result);
          },
          cancelOnError: true);
      return future;
    }
    

    从上面知道,事件源由 toList 创建 List<T> result 数组,然后事件源订阅将发出事件存放到 List<T> result 中,然后在 onDone 方法中以 Future 返回。

    实例代码:

    void toList() {
      var stream = Stream.fromIterable([11, 22, 33]);
      stream.toList().then((value) => print(value)); // [11,22,33]
    }
    
  • toSet

    toSet 是将事件源发出的事件排出重复的添加到 Set 中。核心代码:

    Future<Set<T>> toSet() {
      Set<T> result = new Set<T>();
      _Future<Set<T>> future = new _Future<Set<T>>();
      this.listen(
          (T data) {
            result.add(data);
          },
          onError: future._completeError,
          onDone: () {
            future._complete(result);
          },
          cancelOnError: true);
      return future;
    }
    

    从上面代码知:发出事件的排重是有Set 完成。

    实例代码:

    void toSet() {
      var stream = Stream.fromIterable([11, 22, 33, 33]);
      stream.toSet().then((value) => print(value)); // {11,22,33}
    }
    
  • take(int count)

    take 是获取事件源中 count 次的事件并生成新的事件源。核心代码:

    class _TakeStream<T> extends _ForwardingStream<T, T> {
      final int _count;
    
      _TakeStream(Stream<T> source, int count)
          : this._count = count,
            super(source);
    
      StreamSubscription<T> _createSubscription(void onData(T data)?,
          Function? onError, void onDone()?, bool cancelOnError) {
        if (_count == 0) {
          _source.listen(null).cancel();
          return new _DoneStreamSubscription<T>(onDone);
        }
        return new _StateStreamSubscription<int, T>(
            this, onData, onError, onDone, cancelOnError, _count);
      }
    
      void _handleData(T inputEvent, _EventSink<T> sink) {
        var subscription = sink as _StateStreamSubscription<int, T>;
        int count = subscription._subState;
        if (count > 0) {
          sink._add(inputEvent);
          count -= 1;
          subscription._subState = count;
          if (count == 0) {
            // Closing also unsubscribes all subscribers, which unsubscribes
            // this from source.
            sink._close();
          }
        }
      }
    }
    

    从上面代码可知,事件源经过take生成 _TakeStream事件源,然后由 _handleData 方法获取指定次数事件,经 sink._add(inputEvent) 发出。

    实例代码:

    void take() {
      var stream = Stream.fromIterable([22, 22, 33, 33, 22, 11, 33, 11]);
      stream.take(2).listen((event) {
        print(event); //22 22
      });
    }
    
  • reduce

    reduce 是减少事件源发出事件,将事件组合将结果以Future 输出。核心代码:

    Future<T> reduce(T combine(T previous, T element)) {
      _Future<T> result = new _Future<T>();
      bool seenFirst = false;
      late T value;
      StreamSubscription<T> subscription =
          this.listen(null, onError: result._completeError, onDone: () {
        if (!seenFirst) {
          try {
            // Throw and recatch, instead of just doing
            //  _completeWithErrorCallback, e, theError, StackTrace.current),
            // to ensure that the stackTrace is set on the error.
            throw IterableElementError.noElement();
          } catch (e, s) {
            _completeWithErrorCallback(result, e, s);
          }
        } else {
          result._complete(value);
        }
      }, cancelOnError: true);
      subscription.onData((T element) {
        if (seenFirst) {
          _runUserCode(() => combine(value, element), (T newValue) {
            value = newValue;
          }, _cancelAndErrorClosure(subscription, result));
        } else {
          value = element;
          seenFirst = true;
        }
      });
      return result;
    }
    

    从上面代码知事件源有reduce 生成 StreamSubscription<T> subscription 对象,在 onData 中有combine 将结果合并为 newValue 并赋值给 valueFuture 返回。

    实例代码:

    void reduce() {
      var stream = Stream.fromIterable([11, 22, 33, 33]);
      stream
          .reduce((previous, element) => previous + element)
          .then((value) => print(value)); // 99
    }
    
  • join

    join 将事件源发出的事件以字符串的形式按指定字符窜拼接以 Futuren 输出。核心代码:

    Future<String> join([String separator = ""]) {
      _Future<String> result = new _Future<String>();
      StringBuffer buffer = new StringBuffer();
      bool first = true;
      StreamSubscription<T> subscription =
          this.listen(null, onError: result._completeError, onDone: () {
        result._complete(buffer.toString());
      }, cancelOnError: true);
      subscription.onData(separator.isEmpty
          ? (T element) {
              try {
                buffer.write(element);
              } catch (e, s) {
                _cancelAndErrorWithReplacement(subscription, result, e, s);
              }
            }
          : (T element) {
              if (!first) {
                buffer.write(separator);
              }
              first = false;
              try {
                buffer.write(element);
              } catch (e, s) {
                _cancelAndErrorWithReplacement(subscription, result, e, s);
              }
            });
      return result;
    }
    

    从上面代码知,事件源经过 join 函数生成 resultbuffersubscription 。由 onData 方法将事件写入到 buffer 中,然后在 onDone 中将 buffer.toString() 的结果以Future 返回。

    实例代码:

    void join() {
      var stream = Stream.fromIterable([11, 22, 33, 33]);
      stream.join("-").then((value) => print(value)); // 11-22-33-33
    }
    
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
禁止转载,如需转载请通过简信或评论联系作者。
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 216,240评论 6 498
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,328评论 3 392
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 162,182评论 0 353
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,121评论 1 292
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,135评论 6 388
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,093评论 1 295
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,013评论 3 417
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,854评论 0 273
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,295评论 1 310
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,513评论 2 332
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,678评论 1 348
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,398评论 5 343
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,989评论 3 325
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,636评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,801评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,657评论 2 368
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,558评论 2 352