感觉文章还行,请留下你的赞。
一、Stream 是什么?
Stream 是异步数据事件的源。Stream 提供了一种接收事件序列的方法,可以通过 listen 进行数据监听;通过 error 接收失败状态;通过 done 接收事件的结束状态。
注意: Stream 只有当被监听后,才能使其产生事件。同时会产生一个 StreamSubscription 的对象,该对象是提供事件的活动对象,可用于停止再次监听或者临时暂停订阅事件。
二、 Stream 的订阅对象 StreamSubscription
-
StreamSubscription 的种类
Stream 的订阅流有两种,分别是:单订阅流(Single-subscription)和多订阅流(broadcast)。
-
单订阅流(Single-subscription)
- 单订阅流只允许有一个监听器(listen);只有监听后才会产生事件;取消监听器时也停止事件发送,即是 Stream 还有更多事件。
- 单订阅流即是取消了第一次订阅,也不允许再次订阅。
- 单订阅流常用于流较大的连续数据事件,如文件 I/O,
-
多订阅流(broadcast, 广播)
广播流可以有多个监听器,广播事件就绪时就触发其事件,无论有无监听器。
广播流常用于独立的事件或者观察者。
-
广播流可以取消监听,然后再次监听。
注意: 如果多个监听器想订阅单个订阅流,请使用 asBroadcastStream 在单订阅流顶部创建广播流。
三、 Stream 的构建
-
empty
/// 空的广播流 void emptyStream() { var stream = Stream.empty(); stream.listen((event) { print("empty -- listen"); }, onDone: () { print("empty -- onDone"); // flutter: empty -- onDone }, onError: (e) { print("empty -- onError"); }, cancelOnError: true); }
这是一个流,它在监听后,只发出一个 onDone 事件,其他什么都不做。
-
error
// 创建一个错误的流 void createErrorStream() { var stream = Stream.error("错误流"); stream.listen((event) { print("error -- listen"); }, onError: (e) { print("error -- onError"); // flutter: error -- onError }, onDone: () { print("error -- onDone"); }, cancelOnError: true); }
这是创建在事件完成前发出的单个错误事件流。
-
fromFuture
// 从Future创建一个单事件流 void createStreamFormFuture() { var stream = Stream.fromFuture( Future.delayed(Duration(seconds: 1), () { print("Future 延迟事件"); }), ); stream.listen((event) { print("fromFuture -- listen-- ${event.toString()}"); // flutter: fromFuture -- listen-- null }, onError: (e) { print("fromFuture -- onError"); }, onDone: () { print("fromFuture -- onDone"); // flutter: fromFuture -- onDone }, cancelOnError: true); }
这是有 Future 创建的一个单订阅流。
注意:
1.这是单订阅流在未监听时,它的事件将有 _SyncStreamController 的缓存区保留,当被监听后,再次发出事件。
2.对于一个单一的值来说,在 Future 做 then之前等待一个侦听器是不值得的。 -
fromFutures
// 从一组 Future 中创建单订阅流 void createStreamFromMoreFuture() { var stream = Stream.fromFutures([ Future(() { print("Future - 1"); }), Future(() { print("Future - 2"); }), ]); stream.listen((event) { print("fromFutures -- listen-- ${event.toString()}"); }, onError: (e) { print("fromFutures -- onError"); }, onDone: () { print("fromFutures -- onDone"); }, cancelOnError: true); }
日志输出:
flutter: Future - 1 flutter: fromFutures -- listen-- null flutter: Future - 2 flutter: fromFutures -- listen-- null flutter: fromFutures -- onDone
从一组 Future 中创建一个单订阅流。该流有几个 Future 则 listen 方法将会被调用几次,同时 onDone 方法最后调用。
-
fromIterable
// 创建一个可以从改流中获取数据的单订阅流 void createStreamFromIterable() { var stream = Stream.fromIterable([1, 2, 3, 4]); stream.listen((event) { print("fromIterable -- listen-- ${event.toString()}"); }, onError: (e) { print("fromIterable -- onError"); }, onDone: () { print("fromIterable -- onDone"); }, cancelOnError: true); }
日志输出:
flutter: fromIterable -- listen-- 1 flutter: fromIterable -- listen-- 2 flutter: fromIterable -- listen-- 3 flutter: fromIterable -- listen-- 4 flutter: fromIterable -- onDone
创建一个可以从流中获取值的单订阅流。该流有几个元素,则 listen 方法将会被调用几次,同时 onDone 方法最后调用。
-
multi
/// 创建一个多订阅流 void createMutilStream() { var stream = Stream.multi((control) { control.addSync("multi--1"); control.addSync("multi--2"); control.close(); }); stream.listen((event) { print("multi -- listen-- ${event.toString()}"); }, onError: (e) { print("multi -- onError"); }, onDone: () { print("multi -- onDone"); }, cancelOnError: false); Future.delayed(Duration(seconds: 2), () { stream.listen((event) { print("multi -- listen-- 2-- ${event.toString()}"); }, onError: (e) { print("multi -- onError -- 2"); }, onDone: () { print("multi -- onDone-- 2"); }, cancelOnError: false); }); }
日志输出:
flutter: multi -- listen-- multi--1 flutter: multi -- listen-- multi--2 flutter: multi -- onDone flutter: multi -- listen-- 2-- multi--1 flutter: multi -- listen-- 2-- multi--2 flutter: multi -- onDone-- 2
创建一个广播流,通过 control 发出响应的事件。注意在 control 不调用 close 方法,则监听中的 onDone 方法也不会调用。
-
periodic
// 创建一定间隔事件发出事件的流 void createPeriodicStream() { var stream = Stream.periodic(Duration(seconds: 2), (value) { return value; }); stream.listen((event) { print("periodic -- listen-- 2-- ${event.toString()}"); }, onError: (e) { print("periodic -- onError -- 2"); }, onDone: () { print("periodic -- onDone-- 2"); }, cancelOnError: false); }
日志输出:
flutter: periodic -- listen-- 2-- 0 flutter: periodic -- listen-- 2-- 1 flutter: periodic -- listen-- 2-- 2 flutter: periodic -- listen-- 2-- 3 flutter: periodic -- listen-- 2-- 4 flutter: periodic -- listen-- 2-- 5 ...
这是创建一个指定间隔时间内发出事件的单订阅流。
-
value
// 创建一个在完成前发出事件的单订阅流 void createSingleStream() { var stream = Stream.value("单值流"); stream.listen((event) { print("value -- listen-- ${event.toString()}"); }, onError: (e) { print("value -- onError"); }, onDone: () { print("value -- onDone"); }, cancelOnError: false); }
日志输出:
flutter: value -- listen-- 单值流 flutter: value -- onDone
这是创建一个在完成前发出事件的单订阅流。
-
eventTransformed
/// 创建一个将现有流的所有事件通过接收器转换通过管道发出 void createevEntTransformed() { var stream = Stream.eventTransformed(Stream.fromIterable([1, 2]), (sink) { sink.add(2); return sink; }); stream.listen((event) { print("eventTransformed -- listen-- ${event.toString()}"); }, onError: (e) { print("eventTransformed -- onError"); }, onDone: () { print("eventTransformed -- onDone"); }, cancelOnError: false); }
日志输出:
flutter: eventTransformed -- listen-- 2 flutter: eventTransformed -- listen-- 1 flutter: eventTransformed -- listen-- 2 flutter: eventTransformed -- onDone
这是创建一个将现有事件通过接收器转换通过通道发出。
四、Stream 属性
-
first
first 是事件流的第一个元素,是 Futurt<T> 的对象。
实现原理核心代码是:Future<T> get first { _Future<T> future = new _Future<T>(); StreamSubscription<T> subscription = this.listen(null, onError: future._completeError, onDone: () { try { throw IterableElementError.noElement(); } catch (e, s) { _completeWithErrorCallback(future, e, s); } }, cancelOnError: true); subscription.onData((T value) { _cancelAndValue(subscription, future, value); }); return future; } void _cancelAndValue(StreamSubscription subscription, _Future future, value) { var cancelFuture = subscription.cancel(); if (cancelFuture != null && !identical(cancelFuture, Future._nullFuture)) { cancelFuture.whenComplete(() => future._complete(value)); } else { future._complete(value); } }
上面解释: 实现原理就是事件流发出第一个事件时
subscription.onData((T value) { _cancelAndValue(subscription, future, value); })
,然后取消订阅流subscription.cancel()
。实例:
void first() { var stream1 = Stream.value("first 测试"); stream1.first.then((value) => print(value)); var stream2 = Stream.fromIterable([1, 2, 3]); stream2.first.then((value) => print(value)); }
日志输出:
flutter: first 测试 flutter: 1
注意: 当我们的流是单订阅流时,调用 frist 之后,就不能被再次监听。因为
stream.first
包含一次监听(源码)。 -
isEmpty
isEmpty 是判断订阅流是否为空。核心代码如下:
Future<bool> get isEmpty { _Future<bool> future = new _Future<bool>(); StreamSubscription<T> subscription = this.listen(null, onError: future._completeError, onDone: () { future._complete(true); }, cancelOnError: true); subscription.onData((_) { _cancelAndValue(subscription, future, false); }); return future; }
上面代码可知如果订阅流能回调 onDone 方法,则判定其不为空,否则为空。
实例如下:void isEmpty() { var stream = Stream.empty(); stream.isEmpty.then((value) => print(value)); var stream1 = Stream.fromIterable(["发生错误"]); stream1.isEmpty.then((value) => print(value)); var stream2 = Stream.multi((control) { control.add(1); }); stream2.isEmpty.then((value) => print(value)); }
日志输出:
flutter: true flutter: false flutter: false
-
last
last 是获取订阅流发出事件的最后一个,是 Future<T> 的类型。核心代码:
Future<T> get last { _Future<T> future = new _Future<T>(); late T result; bool foundResult = false; listen( (T value) { foundResult = true; result = value; }, onError: future._completeError, onDone: () { if (foundResult) { future._complete(result); return; } try { throw IterableElementError.noElement(); } catch (e, s) { _completeWithErrorCallback(future, e, s); } }, cancelOnError: true); return future; }
从上面代码的 onDone 方法看出,这是把订阅流的最后一个事件,通过 Future 返回。
注意:
- 如果订阅流事件发生错误,这 Future 完成错误并停止处理。
- 如果订阅流是空,则 onDone 函数调用,并抛出无事件的异常。
实例:
void last() { var stream = Stream.fromIterable([1, 2, 3]); stream.last.then((value) => print(value)); // 3 }
-
length
length 是获取订阅流发出事件的个数。核心代码:
Future<int> get length { _Future<int> future = new _Future<int>(); int count = 0; this.listen( (_) { count++; }, onError: future._completeError, onDone: () { future._complete(count); }, cancelOnError: true); return future; }
从上面代码,可以看到是通过声明 count 来记录流发出的事件。
实例:
void length() { var stream = Stream.fromIterable([1, 2, 3]); stream.length.then((value) => print(value)); // 3 }
-
single
single 是获取只能发出一次事件的订阅流的事件。核心代码:
Future<T> get single { _Future<T> future = new _Future<T>(); late T result; bool foundResult = false; StreamSubscription<T> subscription = this.listen(null, onError: future._completeError, onDone: () { if (foundResult) { future._complete(result); return; } try { throw IterableElementError.noElement(); } catch (e, s) { _completeWithErrorCallback(future, e, s); } }, cancelOnError: true); subscription.onData((T value) { if (foundResult) { // This is the second element we get. try { throw IterableElementError.tooMany(); } catch (e, s) { _cancelAndErrorWithReplacement(subscription, future, e, s); } return; } foundResult = true; result = value; }); return future; }
从上面代码知道: 首先使用 foundResult 在 onData 方法中为 false 跳过检查,然后 foundResult 变为 true 和 result 获取发送事件。在 onDone 方法中 foundResult 为 true 完成 Future 的
_complete
方法结束。注意: 使用single 的订阅流必须是只能包含一个事件的订阅流,否则会抛出 tooMany 的异常。
实例:
void single() { var stream = Stream.fromIterable([1]); stream.single.then((value) => print(value));// 1 }
五、Stream 的方法
-
any
any 是检查订阅流中是否有符合test 条件的事件。核心代码:
Future<bool> any(bool test(T element)) { _Future<bool> future = new _Future<bool>(); StreamSubscription<T> subscription = this.listen(null, onError: future._completeError, onDone: () { future._complete(false); }, cancelOnError: true); subscription.onData((T element) { _runUserCode(() => test(element), (bool isMatch) { if (isMatch) { _cancelAndValue(subscription, future, true); } }, _cancelAndErrorClosure(subscription, future)); }); return future; } void _cancelAndValue(StreamSubscription subscription, _Future future, value) { var cancelFuture = subscription.cancel(); if (cancelFuture != null && !identical(cancelFuture, Future._nullFuture)) { cancelFuture.whenComplete(() => future._complete(value)); } else { future._complete(value); } }
从上面代码可以看到: 通过 onData 函数获取 element ,然后经过
_runUserCode
方法检查并返回结果 isMatch ,如果 isMatch 为 true 则调用_cancelAndValue
方法取消订阅subscription.cancel()
,然后完成 Future。注意: any 是检查订阅流中事件是否符合条件的事件,如果有则返回 true ,否则返回 false。any 只要检查到有一个符合则检查结束,后面的就不在检查。
实例:
void any() { var stream = Stream.fromIterable([1, 2, 3]); stream.any((element) => element > 2).then((value) => print(value)); // true }
-
asBroadcastStream
asBroadcastStream 是将一个单订阅流转换为一个广播流。核心代码:
Stream<T> asBroadcastStream( {void onListen(StreamSubscription<T> subscription)?, void onCancel(StreamSubscription<T> subscription)?}) { return new _AsBroadcastStream<T>(this, onListen, onCancel); }
从上面代码可知,原有的订阅流经过 asBroadcastStream 后,生成新的 _AsBroadcastStream 的订阅流。
实例代码:
void asBroadcastStream() { var stream = Stream.fromFuture( Future(() { return 110; }), ); var stream1 = stream.asBroadcastStream(); stream1 ..listen((event) { print("event -- 1 -- $event"); }); stream1.listen((event) { print("event -- 2 -- $event"); }); }
日志输出:
flutter: event -- 1 -- 110 flutter: event -- 2 -- 110
注意:上面的实例不能写成下面形式:
void asBroadcastStream() { var stream = Stream.fromFuture( Future(() { return 110; }), ); stream.asBroadcastStream().listen((event) { print("event -- 1 -- $event"); }); stream.asBroadcastStream().listen((event) { print("event -- 2 -- $event"); }); }
因为 asBroadcastStream 本身对 stream 就是订阅,而stream 两次 asBroadcastStream 就造成单订阅多次被订阅的错误。
-
asyncExpand
asyncExpand 是将一个订阅流的事件全部转化为一系列的异步事件的广播流。核心代码:
Stream<E> asyncExpand<E>(Stream<E>? convert(T event)) { _StreamControllerBase<E> controller; if (isBroadcast) { controller = _SyncBroadcastStreamController<E>(null, null); } else { controller = _SyncStreamController<E>(null, null, null, null); } controller.onListen = () { StreamSubscription<T> subscription = this.listen(null, onError: controller._addError, // Avoid Zone error replacement. onDone: controller.close); subscription.onData((T event) { Stream<E>? newStream; try { newStream = convert(event); } catch (e, s) { controller.addError(e, s); return; } if (newStream != null) { subscription.pause(); controller.addStream(newStream).whenComplete(subscription.resume); } }); controller.onCancel = subscription.cancel; if (!isBroadcast) { controller ..onPause = subscription.pause ..onResume = subscription.resume; } }; return controller.stream; }
从上面代码可知,是将订阅流事件经过
asyncExpand
生成 _StreamControllerBase 对象,将onData
获取的数据生成新的Stream ,然后在由 _StreamControllerBase 通过addStream
对外提供新的 Stream。实例代码:
void asyncExpand() { var stream = Stream.fromIterable([1, 2, 3]); stream .asyncExpand((event) => Stream.fromFuture( Future(() { return event * 2; }), )) .listen((event) { print("asyncExpand-- $event"); }); var stream1 = Stream.fromIterable([1, 2, 3]).asBroadcastStream(); var stream2 = stream1.asyncExpand((event) => Stream.fromFuture( Future(() { return event; }), )); stream2.listen((event) { print("asyncExpand--1- $event"); }); stream2.listen((event) { print("asyncExpand--2- $event"); }); }
日志输出:
flutter: asyncExpand-- 2 flutter: asyncExpand--1- 1 flutter: asyncExpand--2- 1 flutter: asyncExpand-- 4 flutter: asyncExpand--1- 2 flutter: asyncExpand--2- 2 flutter: asyncExpand-- 6 flutter: asyncExpand--1- 3 flutter: asyncExpand--2- 3
注意: 什么类型的订阅流经过
asyncExpand
生成对应类型的订阅流。 -
asyncMap
asyncMap 是创建一个新流,并将该流的每个事件都转化为一个新的异步事件。核心代码:
Stream<E> asyncMap<E>(FutureOr<E> convert(T event)) { _StreamControllerBase<E> controller; if (isBroadcast) { controller = _SyncBroadcastStreamController<E>(null, null); } else { controller = _SyncStreamController<E>(null, null, null, null); } controller.onListen = () { StreamSubscription<T> subscription = this.listen(null, onError: controller._addError, // Avoid Zone error replacement. onDone: controller.close); FutureOr<Null> add(E value) { controller.add(value); } final addError = controller._addError; final resume = subscription.resume; subscription.onData((T event) { FutureOr<E> newValue; try { newValue = convert(event); } catch (e, s) { controller.addError(e, s); return; } if (newValue is Future<E>) { subscription.pause(); newValue.then(add, onError: addError).whenComplete(resume); } else { // TODO(40014): Remove cast when type promotion works. controller.add(newValue as dynamic); } }); controller.onCancel = subscription.cancel; if (!isBroadcast) { controller ..onPause = subscription.pause ..onResume = resume; } }; return controller.stream; }
从上面代码知道,订阅流经过
ansyMap
后生成新的_StreamControllerBase
并返回 _StreamControllerBase 的 Stream, 然后通过 订阅流的onData
方法获取事件,然后经过convert
获得newValue
对象。然后在判断 newValue 的类型,分别执行不同的方法,最后通过 _StreamControllerBase 的add
方法进行发出事件。实例代码:
void asyncMap() { var stream = Stream.fromIterable([1, 2]); stream.asyncMap((event) => event > 1).listen((event) { print("asyncMap -1- $event"); }); var stream1 = Stream.fromIterable([1, 2]); stream1 .asyncMap((event) => Future(() { return event > 1; })) .listen((event) { print("asyncMap -2- $event"); }); }
日志输出:
flutter: asyncMap -1- false flutter: asyncMap -1- true flutter: asyncMap -2- false flutter: asyncMap -2- true
-
cast
cast 是将订阅流转化 Stream<R> 的订阅流,作用是检查订阅流发出是事件是否是 R 类型。核心代码:
Stream<R> cast<R>() => Stream.castFrom<T, R>(this); static Stream<T> castFrom<S, T>(Stream<S> source) => new CastStream<S, T>(source); class CastStream<S, T> extends Stream<T> { final Stream<S> _source; CastStream(this._source); bool get isBroadcast => _source.isBroadcast; StreamSubscription<T> listen(void Function(T data)? onData, {Function? onError, void Function()? onDone, bool? cancelOnError}) { return new CastStreamSubscription<S, T>( _source.listen(null, onDone: onDone, cancelOnError: cancelOnError)) ..onData(onData) ..onError(onError); } Stream<R> cast<R>() => new CastStream<S, R>(_source); }
实例代码:
void cast() { var stream = Stream.fromIterable([11, 22]); stream.cast<int>().listen((event) { print("cast--$event"); }); }
日志输出:
flutter: cast--11 flutter: cast--22
注意: Stream 的
cast
方法能够检查 Stream 发出事件的类型是否是指定类型。 -
contains
contains 是判断订阅流中事件是否有指定的事件, 返回一个 Future<bool> 对象。 核心代码:
Future<bool> contains(Object? needle) { _Future<bool> future = new _Future<bool>(); StreamSubscription<T> subscription = this.listen(null, onError: future._completeError, onDone: () { future._complete(false); }, cancelOnError: true); subscription.onData((T element) { _runUserCode(() => (element == needle), (bool isMatch) { if (isMatch) { _cancelAndValue(subscription, future, true); } }, _cancelAndErrorClosure(subscription, future)); }); return future; }
从上面代码知,订阅流经过
onData
方法获取 element 事件,然后又_runUserCode
判断 element 事件是否和指定的事件相等,返回 isMatch, 如果 isMatch 为 true 则有_cancelAndValue
方法以 Future 返回。实例代码:
void contains() { var stream = Stream.fromIterable([2, 3, 4]); stream.contains(4).then((value) => print(value)); // true }
-
distinct
distinct 是去除订阅流中相同的事件只发出一次。核心代码:
Stream<T> distinct([bool equals(T previous, T next)?]) { return new _DistinctStream<T>(this, equals); } class _DistinctStream<T> extends _ForwardingStream<T, T> { static final _SENTINEL = new Object(); final bool Function(T, T)? _equals; _DistinctStream(Stream<T> source, bool equals(T a, T b)?) : _equals = equals, super(source); StreamSubscription<T> _createSubscription(void onData(T data)?, Function? onError, void onDone()?, bool cancelOnError) { return new _StateStreamSubscription<Object?, T>( this, onData, onError, onDone, cancelOnError, _SENTINEL); } void _handleData(T inputEvent, _EventSink<T> sink) { var subscription = sink as _StateStreamSubscription<Object?, T>; var previous = subscription._subState; if (identical(previous, _SENTINEL)) { // First event. Cannot use [_equals]. subscription._subState = inputEvent; sink._add(inputEvent); } else { T previousEvent = previous as T; var equals = _equals; bool isEqual; try { if (equals == null) { isEqual = (previousEvent == inputEvent); } else { isEqual = equals(previousEvent, inputEvent); } } catch (e, s) { _addErrorWithReplacement(sink, e, s); return; } if (!isEqual) { sink._add(inputEvent); subscription._subState = inputEvent; } } } }
从上面代码知,订阅流调用
distinct
方法生 _DistinctStream 订阅流,然后 _DistinctStream 中创建 _StateStreamSubscription 的订阅者,然后再有_handleData
方法中首先使用identical
方法规避第一次调用的判断,并将事件赋值给 _StateStreamSubscription的_subState 属性。下次_handleData
调用经过equals
判断得到 isEqual 对象,然后 isEqual 是否为 true 决定是否发出事件。实例代码:
void distinct() { var stream = Stream.fromIterable([1, 3, 3, 6]); stream.distinct().listen((event) { print("distinct -- $event"); }); }
日志输出:
flutter: distinct -- 1 flutter: distinct -- 3 flutter: distinct -- 6
-
drain
drain 是清除订阅流所有的事件,自定义自己的数据事件以 Future 返回。核心代码:
Future<E> drain<E>([E? futureValue]) { if (futureValue == null) { futureValue = futureValue as E; } return listen(null, cancelOnError: true).asFuture<E>(futureValue); }
从上面代码知道订阅流经过
drain
方法,将 StreamSubscription 通过asFuture
转化为 Future 对象并带 futureValue 初始值。实例代码:
void drain() { var stream = Stream.fromIterable([11, 33, 44]); stream.drain(2).then((value) => print(value)); // 2 }
-
elementAt
elementAt 是获取订阅流中第 index 事件, 返回一个 Future 对象。核心代码:
Future<T> elementAt(int index) { RangeError.checkNotNegative(index, "index"); _Future<T> result = new _Future<T>(); int elementIndex = 0; StreamSubscription<T> subscription; subscription = this.listen(null, onError: result._completeError, onDone: () { result._completeError( new RangeError.index(index, this, "index", null, elementIndex), StackTrace.empty); }, cancelOnError: true); subscription.onData((T value) { if (index == elementIndex) { _cancelAndValue(subscription, result, value); return; } elementIndex += 1; }); return result; }
从上面代码知:订阅流经过
elementAt(int index)
方法,首先调用checkNotNegative
检查 index 的值是否小于零;然后生成 subscription 订阅对象和elementIndex 记录广播次数 ,然后在onData
方法中判断 index 是否和 elementIndex 相等。如果相等,再有_cancelAndValue
取消订阅对象,然后调用onDone
方法,返回 Future 对象。实例代码:
void elementAt() { var stream = Stream.fromIterable([11, 33, 44]); stream.elementAt(2).then((value) => print(value)); // 44 }
-
every
every 是判断订阅流事件是否符合指定的条件,结果以 Future<bool> 返回。核心代码:
Future<bool> every(bool test(T element)) { _Future<bool> future = new _Future<bool>(); StreamSubscription<T> subscription = this.listen(null, onError: future._completeError, onDone: () { future._complete(true); }, cancelOnError: true); subscription.onData((T element) { _runUserCode(() => test(element), (bool isMatch) { if (!isMatch) { _cancelAndValue(subscription, future, false); } }, _cancelAndErrorClosure(subscription, future)); }); return future; }
从上面代码知:订阅流调用
every
函数,生成 subscription 和 future 对象。然后 subscription的onData
方法中调用_runUserCode
方法检查是否符合条件,并返回isMatch,然后在根据 isMatch 为 false 调用_cancelAndValue
方法取消 subscription 取消监听,以 Future<bool> 返回结果。实例代码:
void every() { var stream = Stream.fromIterable([110, 220, 330]); stream.every((element) => element > 100).then((value) => print(value)); // true }
-
expand
expand 是将订阅流事件扩展为事件序列。核心代码:
Stream<S> expand<S>(Iterable<S> convert(T element)) { return new _ExpandStream<T, S>(this, convert); } class _ExpandStream<S, T> extends _ForwardingStream<S, T> { final _Transformation<S, Iterable<T>> _expand; _ExpandStream(Stream<S> source, Iterable<T> expand(S event)) : this._expand = expand, super(source); void _handleData(S inputEvent, _EventSink<T> sink) { try { for (T value in _expand(inputEvent)) { sink._add(value); } } catch (e, s) { // If either _expand or iterating the generated iterator throws, // we abort the iteration. _addErrorWithReplacement(sink, e, s); } } }
从上面代码知道,订阅流调用
expand
方法生成 _ExpandStream 订阅流,然后_ExpandStream 的_handleData
方法中通过for...in...
通过 sink 将事件发出。实例代码:
void expand() { var stream = Stream.fromIterable([110, 220, 330]); stream.expand((element) => [element, element * 2]).listen((event) { print(event); }); }
日志输出:
flutter: 110 flutter: 220 flutter: 220 flutter: 440 flutter: 330 flutter: 660
-
map
map 是将订阅流转化为另一种订阅流。核心代码:
class _MapStream<S, T> extends _ForwardingStream<S, T> { final _Transformation<S, T> _transform; _MapStream(Stream<S> source, T transform(S event)) : this._transform = transform, super(source); void _handleData(S inputEvent, _EventSink<T> sink) { T outputEvent; try { outputEvent = _transform(inputEvent); } catch (e, s) { _addErrorWithReplacement(sink, e, s); return; } sink._add(outputEvent); } }
从上面代码知: 订阅流通过
map
生成 _MapStream 对象,然后有 _MapStream 的_handleData
方法中经过_Transformation
将结果给 outputEvent,最后通过 sink 的_add
方法发出 outputEvent 事件。实例代码:
void map() { var stream = Stream.fromIterable([110, 220, 330]); stream.map((event) => event > 200).listen((event) { print(event); }); }
日志输出:
flutter: false flutter: true flutter: true
-
skip(int count)
skip 是跳过 count 次订阅流中数据事件。核心代码:
class _SkipStream<T> extends _ForwardingStream<T, T> { final int _count; _SkipStream(Stream<T> source, int count) : this._count = count, super(source) { // This test is done early to avoid handling an async error // in the _handleData method. RangeError.checkNotNegative(count, "count"); } StreamSubscription<T> _createSubscription(void onData(T data)?, Function? onError, void onDone()?, bool cancelOnError) { return new _StateStreamSubscription<int, T>( this, onData, onError, onDone, cancelOnError, _count); } void _handleData(T inputEvent, _EventSink<T> sink) { var subscription = sink as _StateStreamSubscription<int, T>; int count = subscription._subState; if (count > 0) { subscription._subState = count - 1; return; } sink._add(inputEvent); } }
从上面代码知,事件源经过
skip
函数生成 _SkipStream事件源; _SkipStream 中生成 StreamSubscription 订阅对象;再有_handleData
方法中通过 subscription._subState 获取指定跳过的次数判断是否大于零,直至subscription._subState 小于零,然后通过sink._add(inputEvent) 发出后续数据事件。实例代码:
void skip() { var stream = Stream.fromIterable([11, 22, 33]); stream.skip(2).listen((event) { print(event); // 33 }); }
注意: 跳过次数 count ,只能大于等于零; count 可以大于事件源中发送事件的个数。
-
toList
toList 是将事件源发出的所有数据事件存放到 List 中,并以 Future 形式返回。核心代码:
Future<List<T>> toList() { List<T> result = <T>[]; _Future<List<T>> future = new _Future<List<T>>(); this.listen( (T data) { result.add(data); }, onError: future._completeError, onDone: () { future._complete(result); }, cancelOnError: true); return future; }
从上面知道,事件源由
toList
创建List<T> result
数组,然后事件源订阅将发出事件存放到 List<T> result 中,然后在onDone
方法中以 Future 返回。实例代码:
void toList() { var stream = Stream.fromIterable([11, 22, 33]); stream.toList().then((value) => print(value)); // [11,22,33] }
-
toSet
toSet 是将事件源发出的事件排出重复的添加到 Set 中。核心代码:
Future<Set<T>> toSet() { Set<T> result = new Set<T>(); _Future<Set<T>> future = new _Future<Set<T>>(); this.listen( (T data) { result.add(data); }, onError: future._completeError, onDone: () { future._complete(result); }, cancelOnError: true); return future; }
从上面代码知:发出事件的排重是有Set 完成。
实例代码:
void toSet() { var stream = Stream.fromIterable([11, 22, 33, 33]); stream.toSet().then((value) => print(value)); // {11,22,33} }
-
take(int count)
take 是获取事件源中 count 次的事件并生成新的事件源。核心代码:
class _TakeStream<T> extends _ForwardingStream<T, T> { final int _count; _TakeStream(Stream<T> source, int count) : this._count = count, super(source); StreamSubscription<T> _createSubscription(void onData(T data)?, Function? onError, void onDone()?, bool cancelOnError) { if (_count == 0) { _source.listen(null).cancel(); return new _DoneStreamSubscription<T>(onDone); } return new _StateStreamSubscription<int, T>( this, onData, onError, onDone, cancelOnError, _count); } void _handleData(T inputEvent, _EventSink<T> sink) { var subscription = sink as _StateStreamSubscription<int, T>; int count = subscription._subState; if (count > 0) { sink._add(inputEvent); count -= 1; subscription._subState = count; if (count == 0) { // Closing also unsubscribes all subscribers, which unsubscribes // this from source. sink._close(); } } } }
从上面代码可知,事件源经过
take
生成_TakeStream
事件源,然后由_handleData
方法获取指定次数事件,经sink._add(inputEvent)
发出。实例代码:
void take() { var stream = Stream.fromIterable([22, 22, 33, 33, 22, 11, 33, 11]); stream.take(2).listen((event) { print(event); //22 22 }); }
-
reduce
reduce 是减少事件源发出事件,将事件组合将结果以Future 输出。核心代码:
Future<T> reduce(T combine(T previous, T element)) { _Future<T> result = new _Future<T>(); bool seenFirst = false; late T value; StreamSubscription<T> subscription = this.listen(null, onError: result._completeError, onDone: () { if (!seenFirst) { try { // Throw and recatch, instead of just doing // _completeWithErrorCallback, e, theError, StackTrace.current), // to ensure that the stackTrace is set on the error. throw IterableElementError.noElement(); } catch (e, s) { _completeWithErrorCallback(result, e, s); } } else { result._complete(value); } }, cancelOnError: true); subscription.onData((T element) { if (seenFirst) { _runUserCode(() => combine(value, element), (T newValue) { value = newValue; }, _cancelAndErrorClosure(subscription, result)); } else { value = element; seenFirst = true; } }); return result; }
从上面代码知事件源有
reduce
生成 StreamSubscription<T> subscription 对象,在onData
中有combine
将结果合并为 newValue 并赋值给 value 以 Future 返回。实例代码:
void reduce() { var stream = Stream.fromIterable([11, 22, 33, 33]); stream .reduce((previous, element) => previous + element) .then((value) => print(value)); // 99 }
-
join
join 将事件源发出的事件以字符串的形式按指定字符窜拼接以 Futuren 输出。核心代码:
Future<String> join([String separator = ""]) { _Future<String> result = new _Future<String>(); StringBuffer buffer = new StringBuffer(); bool first = true; StreamSubscription<T> subscription = this.listen(null, onError: result._completeError, onDone: () { result._complete(buffer.toString()); }, cancelOnError: true); subscription.onData(separator.isEmpty ? (T element) { try { buffer.write(element); } catch (e, s) { _cancelAndErrorWithReplacement(subscription, result, e, s); } } : (T element) { if (!first) { buffer.write(separator); } first = false; try { buffer.write(element); } catch (e, s) { _cancelAndErrorWithReplacement(subscription, result, e, s); } }); return result; }
从上面代码知,事件源经过
join
函数生成 result、buffer、subscription 。由onData
方法将事件写入到 buffer 中,然后在onDone
中将 buffer.toString() 的结果以Future 返回。实例代码:
void join() { var stream = Stream.fromIterable([11, 22, 33, 33]); stream.join("-").then((value) => print(value)); // 11-22-33-33 }