Dart Stream

Stream

Dart中的流概念和Java8中的流概念非常相似。

Java:A sequence of elements supporting sequential and parallel aggregate operations。
      一个支持串行和并行聚合操作的 元素序列。
Dart:A stream is a sequence of asynchronous data。
      一个异步的数据序列

在同步的世界里:一个数据可以用一个对象表示,众多数据 就需要使用Iterable承载。在异步的世界里:一个数据对应一个Future,众多的异步数据 就是Stream。

--- 单个 多个
Syn* int Iterable<int>
Asy* Future<int> Stream<int>

我们知道异步的世界里,最为不确定的因素就是时间,我们不确定数据在什么时候准备好。List集合里面的数据是我们已经知道的,因此我们可以方便的对数据做一些操作。但是Stream数据是不确定的,因此Dart提供了一种监听机制,类似于观察者模式,当数据来临时,去通知注册监听的观察者。在观察者模式中对数据的监听者,既可以称为 观察者、也可以称为监听者,因此下文的两者语义相同。

除此之外,程序中除了对数据的操作,其他的都是数据,事件可以是数据、消息可以是数据、数据更可以是数据,并且Dart中一切都是object,因此,下文中的数据、事件等是一回事。

我们可以将Stream想象成水管,水管里面的水就是数据。如下图所示:

image

水管的一边是数据源、另一边是数据的监听。
数据分为三种类型:值数据、标志事件完成的标识位数据、标志事件异常的标识位数据。当所有的数据发送完成之后,数据源会给监听者发送 数据完成的标识位,结束本次流过程。 当发生某种错误时,数据源会发出 错误事件数据,虽然发出了错误事件,但是可以根据开发者对错误的事件的处理 去决定是否还发送后续事件。


那么问题来了:如何产生呢、如何建立监听关系呢?

数据源

往水管里添加水的方法大概有三种:Stream的构造方法、StreamController和语法糖。它们都会往sink中添加数据,当监听建立后,sink的数据就会流动起来。

方式 特点
Stream构造方法 使用方便、数据较为固定
StreamController 数据灵活动态、使用较为不方便
语法糖 使用灵活

Stream构造方法

Stream为我们提供了8中工厂构造方法去创建所需要的Stream。

方法名 作用
Stream.empty() 创建一个空的 广播Stream,只发送完成事件
Stream.value(T value) 只发送value和完成事件的流
Stream.error(Object error, [StackTrace stackTrace]) 只发送error事件和完成事件的流
Stream.fromFuture(Future<T> future) 当future完成后发送一个数据事件或错误事件和一个完成事件的 单订阅者流
Stream.fromFutures(Iterable<Future<T>> futures) 根据一组future生成一个Stream,事件是无序的。
Stream.fromIterable(Iterable<T> elements) 根据集合的数据生成一个但订阅者的流
Stream.eventTransformed(Stream source, EventSink mapSink(EventSink<T> sink)) 根据soure的数据和map函数,生成一个流,是转换流。
Stream.periodic(Duration period,[T computation(int computationCount)]) 生成一个周期性发射数据的流。
Stream.empty()

事件的订阅 是 使用listen方法,下面会详谈,这里暂且使用。三个参数分别是:
onData(T event) 数据事件的监听

Function onError 错误事件的监听

void onDone() 完成事件的监听

void main() {
  Stream.empty().listen(
    (value) {
    print('${value}');
  }, onError: (obj,strace) {
    print('error');
  }, onDone: () {
    print('done');
  });
}

//打印 done,只发射了完成事件

Stream.value(T value)
void main() {
  Stream.value("ss").listen((value) {
    print('${value}');
  }, onError: (obj, str) {
    print('error');
  }, onDone: () {
    print('done');
  });
}

//打印 ss 和 done,发射了数据和完成事件
Stream.error(Object error, [StackTrace stackTrace])
Stream.error("error").listen((value) {
    print('${value}');
  }, onError: (obj, str) {
    print('error');
  }, onDone: () {
    print('done');
  });
  
  //打印 error 和 done,发射了错误和完成事件
  
Stream.fromFuture(Future<T> future)
Stream.fromFuture(Future.delayed(Duration(seconds: 1), () {
    return 1;
  })).listen((value) {
    print('${value}');
  }, onError: (obj, str) {
    print('error');
  }, onDone: () {
    print('done');
  });
}
// 一秒后打印1 和done ,将future的结果和完成事件发射出去
注意这里是 将等待future完成之后 才发射数据
Stream.fromFutures(Iterable<Future<T>> futures)
var list = List.generate(10, (value) {
    return Future.delayed(Duration(seconds: 10 - value), () {
      return value; 
    });
  });
  Stream.fromFutures(list).listen((value) {
    print('${value}');
  }, onError: (obj, str) {
    print('error');
  }, onDone: () {
    print('done');
  });
  
//一次打印了 9、8、7、6、5、4、3、2、1、0、done
注意:虽然我们添加的顺序的 是正序,但是打印的顺序是 谁先计算好 Stream中就有谁 ,并把它发射出去。
当所有的都计算 好之后 就会发射出done事件

factory Stream.fromIterable(Iterable<T> elements)
void main() {
  var list = List.generate(10, (value) {
    return Future.delayed(Duration(seconds: 10 - value), () {
      return value;
    });
  });
  Stream.fromIterable(list).listen((value) {
    print('${value}');
  }, onError: (obj, str) {
    print('error');
  }, onDone: () {
    print('done');
  });
}

//立即打印了 十次 Instance of 'Future<int>' 和 done
注意:这里是将数组的元素 当成 数据 发送出去,即使参数是Future没有 等待 参数的计算。

Stream.periodic(Duration period,[T computation(int computationCount)])
void main() {
  Stream.periodic(Duration(seconds: 5), (count) {
    return count;
  }).listen((value) {
    print('${value}');
  }, onError: (obj, str) {
    print('error');
  }, onDone: () {
    print('done');
  });
}
//每间隔 五秒钟 都会打印处 count,count是从0开始 ++;

Stream.eventTransformed(Stream source, EventSink mapSink(EventSink<T> sink))
void main() {
  var list = List.generate(10, (value) {
    return value;
  });

  Stream.eventTransformed(Stream.fromIterable(list), (eventSink) {
    return MapSink(eventSink);
  }).listen((value) {
    print('${value}');
  }, onError: (obj, str) {
    print('error');
  }, onDone: () {
    print('done');
  });
}

class MapSink implements EventSink<int> {
  EventSink _outputSink;

  MapSink(this._outputSink);

  @override
  void add(int event) {
    _outputSink.add('$event');
  }

  void addError(e, [st]) {
    _outputSink.addError(e, st);
  }

  void close() {
    _outputSink.close();
  }
}

//一次 打印 0-9 和 done事件。

以上就是通过构造方法去 为Stream添加数据。我们可以发现Stream的一个特点就是:数据什么时候来 我什么时候发射出去,不管你什么时候发射,监听者都可以收到数据。

StreamController

通过构造方法 去构造数据源,更像是我买来的 水管本身就是有水的,相比较起来StreamController就更纯粹一点,它是水管的管理者,可以去为水管添加 水(数据),也可以调整 监听者的行为,也可以看看水管当前的状态。

StreamController里有一些重要的字段和方法,这些字段和方法就是 对 水管及其两端的监听。
如下所示:

  Stream<T> get stream; //水管

  //构造方法:参数分别是建立监听、暂停发数据、
  //恢复发数据、取消发送的回调以及是否是异步
  //这里的异步 和 Stream的数据的异步没有关系
  factory StreamController(
      {void onListen(),
      void onPause(),
      void onResume(),
      onCancel(),
      bool sync: false}) {
    return sync
        ? new _SyncStreamController<T>(onListen, onPause, onResume, onCancel)
        : new _AsyncStreamController<T>(onListen, onPause, onResume, onCancel);
  }

  //构造方法:用于广播式 流订阅者
  factory StreamController.broadcast(
      {void onListen(), void onCancel(), bool sync: false}) {
    return sync
        ? new _SyncBroadcastStreamController<T>(onListen, onCancel)
        : new _AsyncBroadcastStreamController<T>(onListen, onCancel);
  }

  //监听 建立的回调---》就是我们调用了Stream的Listen的回调
  ControllerCallback get onListen;
  void set onListen(void onListenHandler());

  //数据暂定发送的回调
  ControllerCallback get onPause;
  void set onPause(void onPauseHandler());

  //数据恢复发送的回调
  ControllerCallback get onResume;
  void set onResume(void onResumeHandler());

  //流 取消的回调
  ControllerCancelCallback get onCancel;
  void set onCancel(onCancelHandler());

  //这个属性 就是 这个Stream的 sink,
  //就是水管的 水源一端
  StreamSink<T> get sink;

 
  bool get isClosed;

  bool get isPaused;
  
  //判断水管 是否有监听者
  bool get hasListener;

   //发送一个数据event
  //紧接着监听者会收到 事件
  void add(T event);

  void addError(Object error, [StackTrace stackTrace]);

  //关闭流通道,也就是立即发送一个done事件
  Future close();

  Future addStream(Stream<T> source, {bool cancelOnError});

上面就是StreamController的属性和方法,所谓的控制 就是 通过这些API实现的。我们发现控制 侧重于水源测,比如add方法,而对于监听测大多数是回调,比如暂停、恢复的回调等。

void main() {
  // 创建一个控制器 
  var streamController = StreamController(onListen: () {
    print('listen');
  }, onPause: () {
    print('pause');
  }, onResume: () {
    print('resume');
  }, onCancel: () {
    print('cancel');
  });

  var count = 0;

  Timer.periodic(Duration(seconds: 1), (value) {
    每隔一秒水槽 添加一个数据
    streamController.sink.add(count++);
  });
  
  print('${streamController.hasListener}');

  // 建立监听关系
  var subscription = streamController.stream.listen((value) {
    print('${value}');
  }, onError: (obj, str) {
    print('error');
  }, onDone: () {
    print('done');
  },cancelOnError: true);
  print('${streamController.hasListener}');

  Future.delayed(Duration(seconds: 5),(){
    //暂定
    subscription.pause();
  });

  Future.delayed(Duration(seconds: 10),(){
    //恢复
    subscription.resume();
  });

  Future.delayed(Duration(seconds: 15),(){
    //取消
    subscription.cancel();
  });
}

语法糖

一个有趣的语法糖就是async*关键字和yiel 关键字

Stream<int> countStream(int max) async* {
    for (int i = 0; i < max; i++) {
        yield i;
    }
}

//水管中有了0-max的数据

上面说完了数据源侧,下面说一下Stream本身。

Stream数据通道

类似于Java的Stream,Dart的Stream也提供了一些针对Stream的操作。

再说操作之前我们先说一个点就是Stream的类型。上面提到过 单订阅式和广播式订阅。下面我们介绍一下这两种类型。

Stream类型

A single-subscription stream

单订阅 流,在整个流的生命存活期内,只允许存在一个监听者。它的特点就是:在没有监听者之前,Stream是不会发送数据的。 当监听者 解除订阅时,Stream就会停止数据的发送。虽然Stream会停止数据的发送,但是并不应影响数据的产生。

如果对一个单一订阅的流,建两个或者更多的订阅者
会怎么样呢?-----》会报错。

即使我们把第一个订阅取消了之后,再去订阅也还是会报错。

void main() {
  var stream = Stream.value(2);
  stream.first;
  stream.last;
}
//报错
Unhandled exception:
Bad state: Stream has already been listened to.
//因为first和last 都调用了listen

A broadcast stream

广播式流,特点:第一:允许任意数量的订阅者。第二:无论是否订阅,它会立即发送数据。因此晚于数据发送的时机的订阅者 并不会收到 前面发送过的数据。

void main() {
  var count = 0;
  var controller = StreamController();
  Timer.periodic(Duration(seconds: 1), (value) {
    controller.sink.add(count++);
  });

  var asBroadcastStream = controller.stream.asBroadcastStream();
  asBroadcastStream.listen((value) {
    print(value);
  });
  Future.delayed(Duration(seconds: 3), () {
    asBroadcastStream.listen((value) {
      print('$value + second');
    });
  });
}

//打印
0
1
2
3
3 + second
4
4 + second
...

//首先 订阅了两次也没有报错
//其次 我们发现第二次没有收到 订阅之前的数据,但是订阅之后,同一个数据会被两个订阅者 收到。

以上就是关于流的类型

流的操作

流的操作和Java非常相似:聚合、转换、过滤、判断等。

Stream<T> where(bool test(T event)): 创建一个新流,流中元素是 旧流中符合test条件的元素。

Stream<M> map <M>(M convert(T event)) : 创建一个新流,流中元素是 旧流元素 执行 convert的结果。

Stream<T> handleError(Function onError, {bool test(error)}): 创建一个新流,用于拦截满足test的error事件。

Stream<M> expand<M>(Iterable<M> convert(T element)):创建一个新流,新流的元素是 旧流元素执行convert的结果,注意新流的每一个元素 是一个数据序列。

Future pipe(StreamConsumer<T> streamConsumer):返回一个Future,当流被消费
且被关闭时,future会自动计算。

Stream<M> transform<M>(StreamTransformer<T, M> streamTransformer): 在旧流的基础上做streamTransformer转换,注意这里返回的还是旧流,只不过旧流的元素 已经 做了转换。

Future<T> reduce(T combine(T previous, T element)):聚合流中元素,对流中元素迭代调用combine函数。

Future<M> fold<M>(M initialValue, M combine(M previous, T element)):和reduce相似,只是多了一个初始值。

Future<String> join([String separator = ""]):拼接流中元素的字符串形式,separator为分隔符。

Future<bool> contains(Object needle):是否包含needle。

Future forEach(void action(T element)) :流中元素迭代执行action。

Future<bool> every(bool test(T element)) : 检查 流种元素是否 全都满足test。

Future<bool> any(bool test(T element)) : 检查流中元素 是否 存在满足test 的元素。

Future<List<T>> toList() :将流中元素 收集到 集合中。

Future<Set<T>> toSet() : 将流中元素 收集到 Set中。

Stream<T> take(int count) : 返回一个新的流,流中元素的 旧流元素的前count个。

Stream<T> skip(int count) :返回以新流,新流的第一个元素为 旧流元素的第count个。

Stream<T> distinct([bool equals(T previous, T next)]):流中元素去重。

....

以上是 一些常用的操作符,由于Stream是异步的数据序列,所以返回值要么是 异步的Future,要么是异步的数据序列。

数据通道监听

前面我们说了 数据源sink、数据通道stream,现在我们说一下数据的监听,之前我们提到了 对于单订阅的Stream的来说,只有订阅关系建立之后,stream才会发送数据,stream不关心你sink中是否有数据,如果没有那就 等你有了 我在发送,如果有数据,那我就顺序的取数据。这个订阅关系的建立就是listen方法的调用。

listen

StreamSubscription<T> listen(void onData(T event),
{Function onError, void onDone(), bool cancelOnError})

该方法接受一个必填参数 和 三个可选的命名参数。返回一个StreamSubscription。

该方法的作用就是:为流添加一个订阅者,这个订阅者就是其返回值。因为发送的数据有 数据事件、完成事件和错误事件,所以对应的订阅者就有处理三种事件的能力:分别是参数中的onData、onError、onDone。 如果发送了错误事件是否还继续还订阅后续事件呢?这个是否就是参数cancelOnError的值。

void main() {
  var count = 0;
  var controller = StreamController();
  Timer.periodic(Duration(seconds: 1), (value) {
    controller.sink.add(count++);
    if (count == 3) {
      controller.sink.addError(Object());
    }
    print('add');
  });

  controller.stream.listen((value){
    print('$value');
  },onError: (error){
    print('$error');
  });
}

//一次打印
add
0
add
1
add
2
Instance of 'Object'
add
3
···

我们可以看到:

1、listen的回调方法都执行了,并分别对应这事件类型

2、即使发送了并收到了错误事件,并没有解除订阅关系,因为cancelOnError默认是false。

下面我们将cancelOnError设置为true。

void main() {
  var count = 0;
  var controller = StreamController();
  Timer.periodic(Duration(seconds: 1), (value) {
    controller.sink.add(count++);
    if (count == 3) {
      controller.sink.addError(Object());
    }
    print('add');
  });

  controller.stream.listen((value){
    print('$value');
  },onError: (error){
    print('$error');
  },cancelOnError: true);
}

//依次打印
add
0
add
1
add
2
Instance of 'Object'
add
add
···

我们发现:

1、当收到error事件后,解除了订阅关系(并没有打印value值)。

2、虽然解除了订阅关系,但是并不影响 往sink中添加数据。

StreamSubscription

上面我们提到了订阅者就是StreamSubscription,listen方法中的参数只是返回的StreamSubscription的默认实现。

初次之外,StreamSubscription提供了更为强大的功能:控制是否继续接受数据。

我们可以这么理解:水管一只流水,我可以选择是否要你流出来的水,什么我可以暂定一会儿。

StreamSubscription对订阅的控制是通过他对暴露的方法接口实现的。

Future cancel():取消订阅,取消之后订阅者就不会在收到任何消息。

void main() {
  var count = 0;
  var controller = StreamController();
  Timer.periodic(Duration(seconds: 1), (value) {
    controller.sink.add(count++);
    print('add');
  });

  var subscription = controller.stream.listen((value) {
    print('$value');
  }, onError: (error) {
    print('$error');
  },onDone: (){
    print('done');
  });

  Future.delayed(Duration(seconds: 3),(){
    subscription.cancel();
  });

}

//一次打印:
add
0
add
1
add
2
add
···
可以发现:
取消之后,就不会在收到事件。
但是并没有影响 sink的添加

void onData(void handleData(T data)):替换listen的onData参数,所有的数据事件 都会流到 这个方法中。

void main() {
  var count = 0;
  var controller = StreamController();
  Timer.periodic(Duration(seconds: 1), (value) {
    controller.sink.add(count++);
  });

  var subscription = controller.stream.listen((value) {
    print('$value');
  }, onError: (error) {
    print('$error');
  }, onDone: () {
    print('done');
  });

  subscription.onData((value) {
    print('value');
  });
}
//一次打印:
value
value
value
···
可以看到onData的替换,并且如果指定了subscription.onData,那么listen的onData参数可以为null。

同理: void onError(Function handleError)、void onDone(void handleDone())分别是对错误和完成事件的替换。<br

void pause([Future resumeSignal]):暂定一下订阅

void resume():将订阅从暂定恢复到活动状态

我们在StreamContoller提到过对Stream的暂定和恢复的监听回调,那个回调就是 这里的方法的调用。

void main() {
  // 创建一个控制器
  var streamController = StreamController(onPause: () {
    print('pause');
  }, onResume: () {
    print('resume');
  }, onCancel: () {
    print('cancel');
  });

  var count = 0;
  Timer.periodic(Duration(seconds: 1), (value) {
    streamController.sink.add(count++);
  });

  // 建立监听关系
  var subscription = streamController.stream.listen((value) {
    print('${value}');
  }, onError: (obj, str) {
    print('error');
  }, onDone: () {
    print('done');
  });

  Future.delayed(Duration(seconds: 3), () {
    //暂定
    subscription.pause();
  });

  Future.delayed(Duration(seconds: 7), () {
    //恢复
    subscription.resume();
  });
}

依次打印:
0
1
2
pause
3
4
5
6
resume
7
···

可以发现:
调用了暂定和恢复之后,回调执行了。
调用了暂定之后,有数据onData也不会调用
调用了恢复之后,数据瞬间发送了好几条。

因为:暂停之后,订阅者会缓存 恢复时间点之前 Stream发送出来的数据。所以出现了瞬间发送了好几条。

总结

以上就是流的三个方面:数据源、Stream通道和数据订阅。总体来说流就是异步的数据序列,只要订阅了数据源,流就会将数据送到订阅处。从这一方面来看,流与集合相比,更符合响应式,因为它是一个推数据的过程。

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 203,937评论 6 478
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,503评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 150,712评论 0 337
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,668评论 1 276
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,677评论 5 366
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,601评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,975评论 3 396
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,637评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,881评论 1 298
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,621评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,710评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,387评论 4 319
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,971评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,947评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,189评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 44,805评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,449评论 2 342

推荐阅读更多精彩内容