Stream
Dart中的流概念和Java8中的流概念非常相似。
Java:A sequence of elements supporting sequential and parallel aggregate operations。
一个支持串行和并行聚合操作的 元素序列。
Dart:A stream is a sequence of asynchronous data。
一个异步的数据序列
在同步的世界里:一个数据可以用一个对象表示,众多数据 就需要使用Iterable承载。在异步的世界里:一个数据对应一个Future,众多的异步数据 就是Stream。
--- | 单个 | 多个 |
---|---|---|
Syn* | int | Iterable<int> |
Asy* | Future<int> | Stream<int> |
我们知道异步的世界里,最为不确定的因素就是时间,我们不确定数据在什么时候准备好。List集合里面的数据是我们已经知道的,因此我们可以方便的对数据做一些操作。但是Stream数据是不确定的,因此Dart提供了一种监听机制,类似于观察者模式,当数据来临时,去通知注册监听的观察者。在观察者模式中对数据的监听者,既可以称为 观察者、也可以称为监听者,因此下文的两者语义相同。
除此之外,程序中除了对数据的操作,其他的都是数据,事件可以是数据、消息可以是数据、数据更可以是数据,并且Dart中一切都是object,因此,下文中的数据、事件等是一回事。
我们可以将Stream想象成水管,水管里面的水就是数据。如下图所示:
水管的一边是数据源、另一边是数据的监听。
数据分为三种类型:值数据、标志事件完成的标识位数据、标志事件异常的标识位数据。当所有的数据发送完成之后,数据源会给监听者发送 数据完成的标识位,结束本次流过程。 当发生某种错误时,数据源会发出 错误事件数据,虽然发出了错误事件,但是可以根据开发者对错误的事件的处理 去决定是否还发送后续事件。
那么问题来了:如何产生呢、如何建立监听关系呢?
数据源
往水管里添加水的方法大概有三种:Stream的构造方法、StreamController和语法糖。它们都会往sink中添加数据,当监听建立后,sink的数据就会流动起来。
方式 | 特点 |
---|---|
Stream构造方法 | 使用方便、数据较为固定 |
StreamController | 数据灵活动态、使用较为不方便 |
语法糖 | 使用灵活 |
Stream构造方法
Stream为我们提供了8中工厂构造方法去创建所需要的Stream。
方法名 | 作用 |
---|---|
Stream.empty() | 创建一个空的 广播Stream,只发送完成事件 |
Stream.value(T value) | 只发送value和完成事件的流 |
Stream.error(Object error, [StackTrace stackTrace]) | 只发送error事件和完成事件的流 |
Stream.fromFuture(Future<T> future) | 当future完成后发送一个数据事件或错误事件和一个完成事件的 单订阅者流 |
Stream.fromFutures(Iterable<Future<T>> futures) | 根据一组future生成一个Stream,事件是无序的。 |
Stream.fromIterable(Iterable<T> elements) | 根据集合的数据生成一个但订阅者的流 |
Stream.eventTransformed(Stream source, EventSink mapSink(EventSink<T> sink)) | 根据soure的数据和map函数,生成一个流,是转换流。 |
Stream.periodic(Duration period,[T computation(int computationCount)]) | 生成一个周期性发射数据的流。 |
Stream.empty()
事件的订阅 是 使用listen方法,下面会详谈,这里暂且使用。三个参数分别是:
onData(T event) 数据事件的监听
Function onError 错误事件的监听
void onDone() 完成事件的监听
void main() {
Stream.empty().listen(
(value) {
print('${value}');
}, onError: (obj,strace) {
print('error');
}, onDone: () {
print('done');
});
}
//打印 done,只发射了完成事件
Stream.value(T value)
void main() {
Stream.value("ss").listen((value) {
print('${value}');
}, onError: (obj, str) {
print('error');
}, onDone: () {
print('done');
});
}
//打印 ss 和 done,发射了数据和完成事件
Stream.error(Object error, [StackTrace stackTrace])
Stream.error("error").listen((value) {
print('${value}');
}, onError: (obj, str) {
print('error');
}, onDone: () {
print('done');
});
//打印 error 和 done,发射了错误和完成事件
Stream.fromFuture(Future<T> future)
Stream.fromFuture(Future.delayed(Duration(seconds: 1), () {
return 1;
})).listen((value) {
print('${value}');
}, onError: (obj, str) {
print('error');
}, onDone: () {
print('done');
});
}
// 一秒后打印1 和done ,将future的结果和完成事件发射出去
注意这里是 将等待future完成之后 才发射数据
Stream.fromFutures(Iterable<Future<T>> futures)
var list = List.generate(10, (value) {
return Future.delayed(Duration(seconds: 10 - value), () {
return value;
});
});
Stream.fromFutures(list).listen((value) {
print('${value}');
}, onError: (obj, str) {
print('error');
}, onDone: () {
print('done');
});
//一次打印了 9、8、7、6、5、4、3、2、1、0、done
注意:虽然我们添加的顺序的 是正序,但是打印的顺序是 谁先计算好 Stream中就有谁 ,并把它发射出去。
当所有的都计算 好之后 就会发射出done事件
factory Stream.fromIterable(Iterable<T> elements)
void main() {
var list = List.generate(10, (value) {
return Future.delayed(Duration(seconds: 10 - value), () {
return value;
});
});
Stream.fromIterable(list).listen((value) {
print('${value}');
}, onError: (obj, str) {
print('error');
}, onDone: () {
print('done');
});
}
//立即打印了 十次 Instance of 'Future<int>' 和 done
注意:这里是将数组的元素 当成 数据 发送出去,即使参数是Future没有 等待 参数的计算。
Stream.periodic(Duration period,[T computation(int computationCount)])
void main() {
Stream.periodic(Duration(seconds: 5), (count) {
return count;
}).listen((value) {
print('${value}');
}, onError: (obj, str) {
print('error');
}, onDone: () {
print('done');
});
}
//每间隔 五秒钟 都会打印处 count,count是从0开始 ++;
Stream.eventTransformed(Stream source, EventSink mapSink(EventSink<T> sink))
void main() {
var list = List.generate(10, (value) {
return value;
});
Stream.eventTransformed(Stream.fromIterable(list), (eventSink) {
return MapSink(eventSink);
}).listen((value) {
print('${value}');
}, onError: (obj, str) {
print('error');
}, onDone: () {
print('done');
});
}
class MapSink implements EventSink<int> {
EventSink _outputSink;
MapSink(this._outputSink);
@override
void add(int event) {
_outputSink.add('$event');
}
void addError(e, [st]) {
_outputSink.addError(e, st);
}
void close() {
_outputSink.close();
}
}
//一次 打印 0-9 和 done事件。
以上就是通过构造方法去 为Stream添加数据。我们可以发现Stream的一个特点就是:数据什么时候来 我什么时候发射出去,不管你什么时候发射,监听者都可以收到数据。
StreamController
通过构造方法 去构造数据源,更像是我买来的 水管本身就是有水的,相比较起来StreamController就更纯粹一点,它是水管的管理者,可以去为水管添加 水(数据),也可以调整 监听者的行为,也可以看看水管当前的状态。
StreamController里有一些重要的字段和方法,这些字段和方法就是 对 水管及其两端的监听。
如下所示:
Stream<T> get stream; //水管
//构造方法:参数分别是建立监听、暂停发数据、
//恢复发数据、取消发送的回调以及是否是异步
//这里的异步 和 Stream的数据的异步没有关系
factory StreamController(
{void onListen(),
void onPause(),
void onResume(),
onCancel(),
bool sync: false}) {
return sync
? new _SyncStreamController<T>(onListen, onPause, onResume, onCancel)
: new _AsyncStreamController<T>(onListen, onPause, onResume, onCancel);
}
//构造方法:用于广播式 流订阅者
factory StreamController.broadcast(
{void onListen(), void onCancel(), bool sync: false}) {
return sync
? new _SyncBroadcastStreamController<T>(onListen, onCancel)
: new _AsyncBroadcastStreamController<T>(onListen, onCancel);
}
//监听 建立的回调---》就是我们调用了Stream的Listen的回调
ControllerCallback get onListen;
void set onListen(void onListenHandler());
//数据暂定发送的回调
ControllerCallback get onPause;
void set onPause(void onPauseHandler());
//数据恢复发送的回调
ControllerCallback get onResume;
void set onResume(void onResumeHandler());
//流 取消的回调
ControllerCancelCallback get onCancel;
void set onCancel(onCancelHandler());
//这个属性 就是 这个Stream的 sink,
//就是水管的 水源一端
StreamSink<T> get sink;
bool get isClosed;
bool get isPaused;
//判断水管 是否有监听者
bool get hasListener;
//发送一个数据event
//紧接着监听者会收到 事件
void add(T event);
void addError(Object error, [StackTrace stackTrace]);
//关闭流通道,也就是立即发送一个done事件
Future close();
Future addStream(Stream<T> source, {bool cancelOnError});
上面就是StreamController的属性和方法,所谓的控制 就是 通过这些API实现的。我们发现控制 侧重于水源测,比如add方法,而对于监听测大多数是回调,比如暂停、恢复的回调等。
void main() {
// 创建一个控制器
var streamController = StreamController(onListen: () {
print('listen');
}, onPause: () {
print('pause');
}, onResume: () {
print('resume');
}, onCancel: () {
print('cancel');
});
var count = 0;
Timer.periodic(Duration(seconds: 1), (value) {
每隔一秒水槽 添加一个数据
streamController.sink.add(count++);
});
print('${streamController.hasListener}');
// 建立监听关系
var subscription = streamController.stream.listen((value) {
print('${value}');
}, onError: (obj, str) {
print('error');
}, onDone: () {
print('done');
},cancelOnError: true);
print('${streamController.hasListener}');
Future.delayed(Duration(seconds: 5),(){
//暂定
subscription.pause();
});
Future.delayed(Duration(seconds: 10),(){
//恢复
subscription.resume();
});
Future.delayed(Duration(seconds: 15),(){
//取消
subscription.cancel();
});
}
语法糖
一个有趣的语法糖就是async*关键字和yiel 关键字
Stream<int> countStream(int max) async* {
for (int i = 0; i < max; i++) {
yield i;
}
}
//水管中有了0-max的数据
上面说完了数据源侧,下面说一下Stream本身。
Stream数据通道
类似于Java的Stream,Dart的Stream也提供了一些针对Stream的操作。
再说操作之前我们先说一个点就是Stream的类型。上面提到过 单订阅式和广播式订阅。下面我们介绍一下这两种类型。
Stream类型
A single-subscription stream
单订阅 流,在整个流的生命存活期内,只允许存在一个监听者。它的特点就是:在没有监听者之前,Stream是不会发送数据的。 当监听者 解除订阅时,Stream就会停止数据的发送。虽然Stream会停止数据的发送,但是并不应影响数据的产生。
如果对一个单一订阅的流,建两个或者更多的订阅者
会怎么样呢?-----》会报错。
即使我们把第一个订阅取消了之后,再去订阅也还是会报错。
void main() {
var stream = Stream.value(2);
stream.first;
stream.last;
}
//报错
Unhandled exception:
Bad state: Stream has already been listened to.
//因为first和last 都调用了listen
A broadcast stream
广播式流,特点:第一:允许任意数量的订阅者。第二:无论是否订阅,它会立即发送数据。因此晚于数据发送的时机的订阅者 并不会收到 前面发送过的数据。
void main() {
var count = 0;
var controller = StreamController();
Timer.periodic(Duration(seconds: 1), (value) {
controller.sink.add(count++);
});
var asBroadcastStream = controller.stream.asBroadcastStream();
asBroadcastStream.listen((value) {
print(value);
});
Future.delayed(Duration(seconds: 3), () {
asBroadcastStream.listen((value) {
print('$value + second');
});
});
}
//打印
0
1
2
3
3 + second
4
4 + second
...
//首先 订阅了两次也没有报错
//其次 我们发现第二次没有收到 订阅之前的数据,但是订阅之后,同一个数据会被两个订阅者 收到。
以上就是关于流的类型
流的操作
流的操作和Java非常相似:聚合、转换、过滤、判断等。
Stream<T> where(bool test(T event)): 创建一个新流,流中元素是 旧流中符合test条件的元素。
Stream<M> map <M>(M convert(T event)) : 创建一个新流,流中元素是 旧流元素 执行 convert的结果。
Stream<T> handleError(Function onError, {bool test(error)}): 创建一个新流,用于拦截满足test的error事件。
Stream<M> expand<M>(Iterable<M> convert(T element)):创建一个新流,新流的元素是 旧流元素执行convert的结果,注意新流的每一个元素 是一个数据序列。
Future pipe(StreamConsumer<T> streamConsumer):返回一个Future,当流被消费
且被关闭时,future会自动计算。
Stream<M> transform<M>(StreamTransformer<T, M> streamTransformer): 在旧流的基础上做streamTransformer转换,注意这里返回的还是旧流,只不过旧流的元素 已经 做了转换。
Future<T> reduce(T combine(T previous, T element)):聚合流中元素,对流中元素迭代调用combine函数。
Future<M> fold<M>(M initialValue, M combine(M previous, T element)):和reduce相似,只是多了一个初始值。
Future<String> join([String separator = ""]):拼接流中元素的字符串形式,separator为分隔符。
Future<bool> contains(Object needle):是否包含needle。
Future forEach(void action(T element)) :流中元素迭代执行action。
Future<bool> every(bool test(T element)) : 检查 流种元素是否 全都满足test。
Future<bool> any(bool test(T element)) : 检查流中元素 是否 存在满足test 的元素。
Future<List<T>> toList() :将流中元素 收集到 集合中。
Future<Set<T>> toSet() : 将流中元素 收集到 Set中。
Stream<T> take(int count) : 返回一个新的流,流中元素的 旧流元素的前count个。
Stream<T> skip(int count) :返回以新流,新流的第一个元素为 旧流元素的第count个。
Stream<T> distinct([bool equals(T previous, T next)]):流中元素去重。
....
以上是 一些常用的操作符,由于Stream是异步的数据序列,所以返回值要么是 异步的Future,要么是异步的数据序列。
数据通道监听
前面我们说了 数据源sink、数据通道stream,现在我们说一下数据的监听,之前我们提到了 对于单订阅的Stream的来说,只有订阅关系建立之后,stream才会发送数据,stream不关心你sink中是否有数据,如果没有那就 等你有了 我在发送,如果有数据,那我就顺序的取数据。这个订阅关系的建立就是listen方法的调用。
listen
StreamSubscription<T> listen(void onData(T event),
{Function onError, void onDone(), bool cancelOnError})
该方法接受一个必填参数 和 三个可选的命名参数。返回一个StreamSubscription。
该方法的作用就是:为流添加一个订阅者,这个订阅者就是其返回值。因为发送的数据有 数据事件、完成事件和错误事件,所以对应的订阅者就有处理三种事件的能力:分别是参数中的onData、onError、onDone。 如果发送了错误事件是否还继续还订阅后续事件呢?这个是否就是参数cancelOnError的值。
void main() {
var count = 0;
var controller = StreamController();
Timer.periodic(Duration(seconds: 1), (value) {
controller.sink.add(count++);
if (count == 3) {
controller.sink.addError(Object());
}
print('add');
});
controller.stream.listen((value){
print('$value');
},onError: (error){
print('$error');
});
}
//一次打印
add
0
add
1
add
2
Instance of 'Object'
add
3
···
我们可以看到:
1、listen的回调方法都执行了,并分别对应这事件类型
2、即使发送了并收到了错误事件,并没有解除订阅关系,因为cancelOnError默认是false。
下面我们将cancelOnError设置为true。
void main() {
var count = 0;
var controller = StreamController();
Timer.periodic(Duration(seconds: 1), (value) {
controller.sink.add(count++);
if (count == 3) {
controller.sink.addError(Object());
}
print('add');
});
controller.stream.listen((value){
print('$value');
},onError: (error){
print('$error');
},cancelOnError: true);
}
//依次打印
add
0
add
1
add
2
Instance of 'Object'
add
add
···
我们发现:
1、当收到error事件后,解除了订阅关系(并没有打印value值)。
2、虽然解除了订阅关系,但是并不影响 往sink中添加数据。
StreamSubscription
上面我们提到了订阅者就是StreamSubscription,listen方法中的参数只是返回的StreamSubscription的默认实现。
初次之外,StreamSubscription提供了更为强大的功能:控制是否继续接受数据。
我们可以这么理解:水管一只流水,我可以选择是否要你流出来的水,什么我可以暂定一会儿。
StreamSubscription对订阅的控制是通过他对暴露的方法接口实现的。
Future cancel():取消订阅,取消之后订阅者就不会在收到任何消息。
void main() {
var count = 0;
var controller = StreamController();
Timer.periodic(Duration(seconds: 1), (value) {
controller.sink.add(count++);
print('add');
});
var subscription = controller.stream.listen((value) {
print('$value');
}, onError: (error) {
print('$error');
},onDone: (){
print('done');
});
Future.delayed(Duration(seconds: 3),(){
subscription.cancel();
});
}
//一次打印:
add
0
add
1
add
2
add
···
可以发现:
取消之后,就不会在收到事件。
但是并没有影响 sink的添加
void onData(void handleData(T data)):替换listen的onData参数,所有的数据事件 都会流到 这个方法中。
void main() {
var count = 0;
var controller = StreamController();
Timer.periodic(Duration(seconds: 1), (value) {
controller.sink.add(count++);
});
var subscription = controller.stream.listen((value) {
print('$value');
}, onError: (error) {
print('$error');
}, onDone: () {
print('done');
});
subscription.onData((value) {
print('value');
});
}
//一次打印:
value
value
value
···
可以看到onData的替换,并且如果指定了subscription.onData,那么listen的onData参数可以为null。
同理: void onError(Function handleError)、void onDone(void handleDone())分别是对错误和完成事件的替换。<br
void pause([Future resumeSignal]):暂定一下订阅
void resume():将订阅从暂定恢复到活动状态
我们在StreamContoller提到过对Stream的暂定和恢复的监听回调,那个回调就是 这里的方法的调用。
void main() {
// 创建一个控制器
var streamController = StreamController(onPause: () {
print('pause');
}, onResume: () {
print('resume');
}, onCancel: () {
print('cancel');
});
var count = 0;
Timer.periodic(Duration(seconds: 1), (value) {
streamController.sink.add(count++);
});
// 建立监听关系
var subscription = streamController.stream.listen((value) {
print('${value}');
}, onError: (obj, str) {
print('error');
}, onDone: () {
print('done');
});
Future.delayed(Duration(seconds: 3), () {
//暂定
subscription.pause();
});
Future.delayed(Duration(seconds: 7), () {
//恢复
subscription.resume();
});
}
依次打印:
0
1
2
pause
3
4
5
6
resume
7
···
可以发现:
调用了暂定和恢复之后,回调执行了。
调用了暂定之后,有数据onData也不会调用
调用了恢复之后,数据瞬间发送了好几条。
因为:暂停之后,订阅者会缓存 恢复时间点之前 Stream发送出来的数据。所以出现了瞬间发送了好几条。
总结
以上就是流的三个方面:数据源、Stream通道和数据订阅。总体来说流就是异步的数据序列,只要订阅了数据源,流就会将数据送到订阅处。从这一方面来看,流与集合相比,更符合响应式,因为它是一个推数据的过程。