Dart - Stream

Core Stream types

There are two kinds of streams: "single-subscription" streams and "broadcast" streams.

Single-subscription

  1. A single-subscription stream allows only a single listener during the whole lifetime of the stream.
  2. It doesn't start generating events until it has a listener, and it stops sending events when the listener is unsubscribed, even if the source of events could still provide more.
  3. Listening twice on a single-subscription stream is not allowed, even after the first subscription has been canceled.
  4. Single-subscription streams are generally used for streaming chunks of larger contiguous data, like file I/O.

A broadcast stream allows any number of listeners, and it fires its events when they are ready, whether there are listeners or not.

  1. Broadcast streams are used for independent events/observers.
  2. If several listeners want to listen to a single-subscription stream, use [asBroadcastStream] to create a broadcast stream on top of the non-broadcast stream.
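The difference between the two kinds can be sketched as follows. This is a minimal illustration, assuming a plain `StreamController` as the event source; the function names `secondListenThrows` and `shareWithTwoListeners` are hypothetical helpers, not part of the SDK.

```dart
import 'dart:async';

/// Returns true if a second listen() on a single-subscription stream throws.
bool secondListenThrows() {
  final controller = StreamController<int>(); // single-subscription by default
  final first = controller.stream.listen((_) {});
  var threw = false;
  try {
    controller.stream.listen((_) {}); // second listener is rejected
  } on StateError {
    threw = true;
  }
  first.cancel();
  controller.close();
  return threw;
}

/// Lets two listeners share one source through asBroadcastStream().
Future<List<String>> shareWithTwoListeners() async {
  final controller = StreamController<int>();
  final shared = controller.stream.asBroadcastStream();
  final log = <String>[];
  final a = shared.forEach((v) => log.add('a$v'));
  final b = shared.forEach((v) => log.add('b$v'));
  controller
    ..add(1)
    ..add(2)
    ..close();
  await Future.wait([a, b]);
  return log;
}

Future<void> main() async {
  print(secondListenThrows());
  print(await shareWithTwoListeners());
}
```

Both listeners are attached before any event is added, so each of them sees every event.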

Creates a new single-subscription stream from the future.

When the future completes, the stream will fire one event, either data or error, and then close with a done-event.

factory Stream.fromFuture(Future<T> future){}

Creates a single-subscription stream from a group of futures.

  1. The stream reports the results of the futures on the stream in the order in which the futures complete.
  2. Each future provides either a data event or an error event, depending on how the future completes.
  3. If some futures have already completed when "Stream.fromFutures" is called, their results will be emitted in some unspecified order.
  4. When all futures have completed, the stream is closed.
  5. If [futures] is empty, the stream closes as soon as possible.
factory Stream.fromFutures(Iterable<Future<T>> futures) {}
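A small sketch of both future-based constructors, assuming two delayed futures with arbitrarily chosen delays; `completionOrder` and `errorFromFuture` are hypothetical helper names.

```dart
/// Results arrive in completion order, not in list order.
Future<List<String>> completionOrder() {
  final slow = Future.delayed(const Duration(milliseconds: 60), () => 'slow');
  final fast = Future.delayed(const Duration(milliseconds: 10), () => 'fast');
  return Stream.fromFutures([slow, fast]).toList();
}

/// A failed future becomes a single error event followed by done.
Future<String> errorFromFuture() async {
  final stream = Stream<int>.fromFuture(Future<int>.error(StateError('boom')));
  try {
    await stream.toList();
    return 'no error';
  } on StateError catch (e) {
    return e.message;
  }
}

Future<void> main() async {
  print(await completionOrder()); // [fast, slow]
  print(await errorFromFuture()); // boom
}
```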

Creates a single-subscription stream that gets its data from [elements].

  1. The iterable is iterated when the stream receives a listener, and stops iterating if the listener cancels the subscription, or if the [Iterator.moveNext] method returns "false" or throws. Iteration is suspended while the stream subscription is paused.
  2. If calling [Iterator.moveNext] on "elements.iterator" throws, the stream emits that error and then it closes.
  3. If reading [Iterator.current] on "elements.iterator" throws, the stream emits that error, but keeps iterating.
factory Stream.fromIterable(Iterable<T> elements){}

Creates a stream that repeatedly emits events at [period] intervals ("Stream.periodic").

The event values are computed by invoking [computation]. The argument to this callback is an integer that starts at 0 and is incremented for every event.

  1. The [period] must be a non-negative [Duration].
  2. If [computation] is omitted, the event values will all be "null".
  3. The [computation] must not be omitted if the event type [T] does not allow "null" as a value.
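As an illustration of the counter passed to [computation], here is a short sketch using `Stream.periodic`. The 5 ms period and the helper name `firstSquares` are arbitrary choices for the example.

```dart
/// Emits i * i for i = 0, 1, 2, ... and stops after four events.
Future<List<int>> firstSquares() {
  return Stream<int>.periodic(const Duration(milliseconds: 5), (i) => i * i)
      .take(4) // cancels the periodic timer after the fourth event
      .toList();
}

Future<void> main() async {
  print(await firstSquares()); // [0, 1, 4, 9]
}
```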

Creates a stream where all events of an existing stream are piped through a sink-transformation.

factory Stream.eventTransformed(Stream<dynamic> source, EventSink<dynamic> mapSink(EventSink<T> sink)) {
  return new _BoundSinkStream(source, mapSink);
}

Adapts [source] to be a "Stream<T>".

This allows [source] to be used at the new type, but at run-time it must satisfy the requirements of both the new type and its original type.
Data events created by the source stream must also be instances of [T].

static Stream<T> castFrom<S,T>(Stream<S> source) => new CastStream<S,T>(source);

Returns a multi-subscription stream that produces the same events as this stream ("asBroadcastStream").

The returned stream will subscribe to this stream when its first subscriber is added, and will stay subscribed until this stream ends, or a callback cancels the subscription.

Adds a subscription to this stream.

StreamSubscription<T> listen(void onData(T event)?, {Function? onError, void onDone()?, bool? cancelOnError});
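The callbacks of `listen` can be sketched like this. It is a minimal example, assuming a `StreamController` as the source; `listenDemo` is a hypothetical helper that records what each callback receives.

```dart
import 'dart:async';

Future<List<String>> listenDemo() {
  final controller = StreamController<int>();
  final log = <String>[];
  final finished = Completer<List<String>>();

  controller.stream.listen(
    (event) => log.add('data $event'),
    onError: (Object error) => log.add('error $error'),
    onDone: () {
      log.add('done');
      finished.complete(log);
    },
    cancelOnError: false, // keep listening after an error event
  );

  controller
    ..add(1)
    ..addError('oops')
    ..add(2)
    ..close();
  return finished.future;
}

Future<void> main() async {
  print(await listenDemo()); // [data 1, error oops, data 2, done]
}
```

With `cancelOnError: true`, the subscription would be canceled at the first error and `data 2` would not be delivered.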

Creates a new stream from this stream that discards some elements.

  1. The new stream sends the same error and done events as this stream, but it only sends the data events that satisfy the [test].
  2. If the [test] function throws, the data event is dropped and the error is emitted on the returned stream instead.
  3. The returned stream is a broadcast stream if this stream is.
  4. If a broadcast stream is listened to more than once, each subscription will individually perform the "test".
Stream<T> where(bool test(T event)) {
  return new _WhereStream<T>(this, test);
}

Transforms each element of this stream into a new stream event.

  1. Creates a new stream that converts each element of this stream to a new value using the [convert] function, and emits the result.
  2. If [convert] throws, the returned stream reports it as an error event instead.
  3. Error and done events are passed through unchanged to the returned stream.
  4. The returned stream is a broadcast stream if this stream is.
Stream<S> map<S>(S convert(T event)) {
 return new _MapStream<T,S>(this,convert);
}
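A brief sketch of `where` and `map` chained together; the input values and the helper name `evenSquares` are made up for the example.

```dart
/// Keeps the even numbers, then squares them.
Future<List<int>> evenSquares() {
  return Stream<int>.fromIterable([1, 2, 3, 4])
      .where((n) => n.isEven)
      .map((n) => n * n)
      .toList();
}

Future<void> main() async {
  print(await evenSquares()); // [4, 16]
}
```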

Creates a new stream with each data event of this stream asynchronously mapped to a new event.

Stream<E> asyncMap<E>(FutureOr<E> convert(T event)) {
}

Transforms each element into a sequence of asynchronous events.

Returns a new stream and, for each event of this stream, does the following:

  1. If the event is an error event or a done event, it is emitted directly by the returned stream.
  2. Otherwise it is an element, and the [convert] function is called with the element as argument to produce a convert-stream for the element.
  3. If that call throws, the error is emitted on the returned stream.
  4. If the call returns "null", no further action is taken for the element. Otherwise, this stream is paused and the convert-stream is listened to.
  5. Every data and error event of the convert-stream is emitted on the returned stream in the order it is produced.
  6. When the convert-stream ends, this stream is resumed.
  7. The returned stream is a broadcast stream if this stream is.
Stream<E> asyncExpand<E>(Stream<E>? convert(T event)){
}
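The pause-and-resume behavior means the sub-streams are emitted one after another, never interleaved. A minimal sketch, where `repeatLabel` and `expandInOrder` are hypothetical helpers:

```dart
/// Produces n labelled events for the element n.
Stream<String> repeatLabel(int n) =>
    Stream<String>.fromIterable(List.generate(n, (i) => '$n.$i'));

/// Each element is replaced by its convert-stream, in source order.
Future<List<String>> expandInOrder() {
  return Stream<int>.fromIterable([1, 2, 3]).asyncExpand(repeatLabel).toList();
}

Future<void> main() async {
  print(await expandInOrder()); // [1.0, 2.0, 2.1, 3.0, 3.1, 3.2]
}
```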

Creates a wrapper Stream that intercepts some errors from this stream.

  1. The returned stream is a broadcast stream if this stream is.
  2. If a broadcast stream is listened to more than once, each subscription will individually perform the test and handle the error.
Stream<T> handleError(Function onError, {bool test(error)?}) {}
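A sketch of intercepting only some errors, assuming the source occasionally emits a `FormatException`; the helper name `skipFormatErrors` is hypothetical.

```dart
import 'dart:async';

Future<List<String>> skipFormatErrors() {
  final controller = StreamController<String>();
  final log = <String>[];
  final done = controller.stream
      .handleError(
        (Object e) => log.add('handled'),
        test: (e) => e is FormatException, // other errors pass through
      )
      .forEach((v) => log.add(v));
  controller
    ..add('a')
    ..addError(const FormatException('bad input'))
    ..add('b')
    ..close();
  return done.then((_) => log);
}

Future<void> main() async {
  print(await skipFormatErrors()); // [a, handled, b]
}
```

An error that does not satisfy `test` would reach `forEach` and complete its future with that error.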

Transforms each element of this stream into a sequence of elements.

  1. Returns a new stream where each element of this stream is replaced by zero or more data events.
  2. Error events and the done event of this stream are forwarded directly to the returned stream.
  3. The returned stream is a broadcast stream if this stream is.
  4. If a broadcast stream is listened to more than once, each subscription will individually call convert and expand the events.
Stream<S> expand<S>(Iterable<S> convert(T element)) {
 return new _ExpandStream<T,S>(this,convert);
}
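For example, splitting lines into words shows the "zero or more" behavior. The input strings and the name `splitWords` are illustrative only.

```dart
/// Each line expands to its words; the empty line expands to nothing.
Future<List<String>> splitWords() {
  return Stream<String>.fromIterable(['a b', '', 'c'])
      .expand((line) => line.split(' ').where((w) => w.isNotEmpty))
      .toList();
}

Future<void> main() async {
  print(await splitWords()); // [a, b, c]
}
```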

Pipes the events of this stream into [streamConsumer].

  1. All events of this stream are added to "streamConsumer" using [StreamConsumer.addStream].
  2. The "streamConsumer" is closed when this stream has been successfully added to it, that is, when the future returned by "addStream" completes without an error.
  3. Returns a future which completes when this stream has been consumed and the consumer has been closed.
  4. The returned future completes with the same result as the future returned by [StreamConsumer.close].
  5. If the call to [StreamConsumer.addStream] fails in some way, this method fails in the same way.
Future pipe(StreamConsumer<T> streamConsumer) {
return streamConsumer.addStream(this).then((_)=>streamConsumer.close());
}
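A `StreamController` is a `StreamSink`, and therefore a `StreamConsumer`, so it can serve as a pipe target. The sketch below assumes that setup; `pipeIntoController` is a hypothetical helper name.

```dart
import 'dart:async';

Future<List<int>> pipeIntoController() async {
  final target = StreamController<int>();
  // Start listening first so the piped events have somewhere to go.
  final received = target.stream.toList();
  // pipe() calls target.addStream(...) and then target.close().
  await Stream<int>.fromIterable([1, 2, 3]).pipe(target);
  return received;
}

Future<void> main() async {
  print(await pipeIntoController()); // [1, 2, 3]
}
```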

Applies [streamTransformer] to this stream.

This method should always be used for transformations which treat
the entire stream as representing a single value
which has perhaps been split into several parts for transport,
like a file being read from disk or being fetched over a network.
The transformation will then produce a new stream which
transforms the stream's value incrementally (perhaps using
[Converter.startChunkedConversion]). The resulting stream
may again be chunks of the result, but does not have to
correspond to specific events from the source stream.
(Chunked stream loading.)

Stream<S> transform<S>(StreamTransformer<T,S> streamTransformer) {
return streamTransformer.bind(this);
}
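A typical case is decoding bytes that arrive in arbitrary chunks. The sketch below uses `utf8.decoder` and `LineSplitter` from `dart:convert`; the byte chunks are made up so that one line is split across two events, and `decodeLines` is a hypothetical helper name.

```dart
import 'dart:convert';

Future<List<String>> decodeLines() {
  // Two chunks that cut the text in the middle of the first line.
  final chunks = Stream<List<int>>.fromIterable([
    utf8.encode('first li'),
    utf8.encode('ne\nsecond line\n'),
  ]);
  return chunks
      .transform(utf8.decoder) // bytes -> string chunks
      .transform(const LineSplitter()) // string chunks -> whole lines
      .toList();
}

Future<void> main() async {
  print(await decodeLines()); // [first line, second line]
}
```

The output events correspond to lines, not to the original chunk boundaries.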

Combines a sequence of values by repeatedly applying [combine].

Similar to [Iterable.reduce], this function maintains a value, starting with the first element of this stream and updated for each further element of this stream.

  1. For each element after the first, the value is updated to the result of calling [combine] with the previous value and the element.
  2. When this stream is done, the returned future is completed with the value at that time.
  3. If this stream is empty, the returned future is completed with an error.
  4. If this stream emits an error, or the call to [combine] throws, the returned future is completed with that error, and processing is stopped.
Future<T> reduce( T combine(T previous,T element)) {
}

Combines a sequence of values by repeatedly applying [combine].

Similar to [Iterable.fold], this function maintains a value, starting with [initialValue] and updated for each element of this stream.

  1. For each element, the value is updated to the result of calling [combine] with the previous value and the element.
  2. When this stream is done, the returned future is completed with the value at that time.
  3. For an empty stream, the future is completed with [initialValue].
  4. If this stream emits an error, or the call to [combine] throws, the returned future is completed with that error, and processing is stopped.
Future<S> fold<S>(S initialValue,S combine(S previous,T element)) {
}
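The difference between `reduce` and `fold` shows up in the result type and in how an empty stream is handled. A small sketch with hypothetical helper names:

```dart
/// reduce: the accumulator has the element type and starts at the first element.
Future<int> sumWithReduce() =>
    Stream<int>.fromIterable([1, 2, 3, 4]).reduce((a, b) => a + b);

/// fold: the accumulator may have a different type and starts at initialValue.
Future<String> foldToString() => Stream<int>.fromIterable([1, 2, 3])
    .fold<String>('', (acc, n) => '$acc$n');

/// reduce on an empty stream completes with an error.
Future<bool> reduceOnEmptyFails() async {
  try {
    await Stream<int>.empty().reduce((a, b) => a + b);
    return false;
  } on StateError {
    return true;
  }
}

Future<void> main() async {
  print(await sumWithReduce()); // 10
  print(await foldToString()); // 123
  print(await reduceOnEmptyFails()); // true
}
```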

Combines the string representation of elements into a single string.

  1. Each element is converted to a string using its [Object.toString] method.
  2. If [separator] is provided, it is inserted between element string representations.
  3. The returned future is completed with the combined string when this stream is done.
  4. If this stream emits an error, or the call to [Object.toString] throws, the returned future is completed with that error, and processing stops.
Future<String> join([String separator = ""]) {}

Returns whether [needle] occurs in the elements provided by this stream.

  1. Compares each element of this stream to [needle] using [Object.==]. If an equal element is found, the returned future is completed with "true". If this stream ends without finding a match, the future is completed with "false".
  2. If this stream emits an error, or the call to [Object.==] throws, the returned future is completed with that error, and processing stops.
Future<bool> contains(Object? needle){}

Executes [action] on each element of this stream.

  1. Completes the returned [Future] when all elements of this stream have been processed.
  2. If this stream emits an error, or if the call to [action] throws, the returned future completes with that error, and processing stops.
Future forEach(void action(T element)){}

Checks whether [test] accepts all elements provided by this stream.

  1. Calls [test] on each element of this stream.
  2. If the call returns "false", the returned future is completed with "false" and processing stops.
  3. If this stream ends without finding an element that [test] rejects, the returned future is completed with "true".
  4. If this stream emits an error, or if the call to [test] throws, the returned future is completed with that error, and processing stops.
Future<bool> every(bool test(T element)) {}

Checks whether [test] accepts any element provided by this stream.

  1. Calls [test] on each element of this stream. If the call returns "true", the returned future is completed with "true" and processing stops.
  2. If this stream ends without finding an element that [test] accepts, the returned future is completed with "false".
  3. If this stream emits an error, or if the call to [test] throws, the returned future is completed with that error, and processing stops.
Future<bool> any( bool test(T element)) {}
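The three boolean queries (`every`, `any`, `contains`) can be compared side by side. The sample data and the helper name `checks` are illustrative.

```dart
Future<List<bool>> checks() async {
  // A fresh stream per query, since each query consumes its stream.
  Stream<int> numbers() => Stream<int>.fromIterable([2, 4, 6]);
  return [
    await numbers().every((n) => n.isEven), // all elements pass
    await numbers().any((n) => n > 5), // 6 passes
    await numbers().contains(3), // no element equals 3
  ];
}

Future<void> main() async {
  print(await checks()); // [true, true, false]
}
```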

The number of elements in this stream.

Future<int> get length {}

Whether this stream contains any elements.

Future<bool> get isEmpty {}

Adapts this stream to be a "Stream<R>".

This stream is wrapped as a "Stream<R>" which checks at run-time that each data event emitted by this stream is also an instance of [R].

Stream<R> cast<R>() => Stream.castFrom<T,R>(this);

Collects all elements of this stream in a [List].

  1. Creates a "List<T>" and adds all elements of this stream to the list in the order they arrive.
  2. When this stream ends, the returned future is completed with that list.
  3. If this stream emits an error, the returned future is completed with that error, and processing stops.
Future<List<T>> toList() {}

Collects the data of this stream in a [Set].

  1. Creates a "Set<T>" and adds all elements of this stream to the set in the order they arrive.
  2. When this stream ends, the returned future is completed with that set.
  3. The returned set is the same type as created by "<T>{}". If another type of set is needed, either use [forEach] to add each element to the set, or use "toList().then((list) => new SomeOtherSet.from(list))" to create the set.
  4. If this stream emits an error, the returned future is completed with that error, and processing stops.
Future<Set<T>> toSet(){}

Discards all data on this stream, but signals when it is done or an error occurred.

  1. When subscribing using [drain], cancelOnError will be true. This means that the future will complete with the first error on this stream and then cancel the subscription.
  2. If this stream emits an error, the returned future is completed with that error, and processing is stopped.
  3. In case of a "done" event, the future completes with the given [futureValue].
  4. The [futureValue] must not be omitted if "null" is not assignable to [E].
Future<E> drain<E>([E? futureValue]) {}

Provides at most the first [count] data events of this stream.

  1. If this is a single-subscription (non-broadcast) stream, it cannot be reused after the returned stream has been listened to.
  2. If this is a broadcast stream, the returned stream is a broadcast stream.
  3. In that case, the events are only counted from the time the returned stream is listened to.
Stream<T> take(int count) {
return new _TakeStream<T>(this, count);
}

Forwards data events while [test] is successful.

  1. Returns a stream that provides the same events as this stream until [test] fails for a data event.
  2. The returned stream is done when either this stream is done, or when this stream first emits a data event that fails [test].
  3. The "test" call is considered failing if it returns a non-"true" value or if it throws. If the "test" call throws, the error is emitted as the last event on the returned stream.
  4. Stops listening to this stream after the accepted elements.
  5. Internally the method cancels its subscription after these elements. This means that single-subscription (non-broadcast) streams are closed and cannot be reused after a call to this method.
  6. The returned stream is a broadcast stream if this stream is.
  7. For a broadcast stream, the events are only tested from the time the returned stream is listened to.
Stream<T> takeWhile(bool test(T element)) {
return new _TakeWhileStream<T>(this, test);
}

Skips the first [count] data events from this stream.

Stream<T> skip(int count) {
return new _SkipStream<T>(this, count);
}

Skips data events from this stream while they are matched by [test].

  1. The test fails for a data event if it returns a non-"true" value or if the call to "test" throws.
  2. If the call throws, the error is emitted as an error event on the returned stream instead of the data event. Otherwise, the event that made "test" return non-true is emitted as the first data event.
  3. Error and done events are provided by the returned stream unmodified.
  4. The returned stream is a broadcast stream if this stream is.
  5. For a broadcast stream, the events are only tested from the time the returned stream is listened to.
Stream<T> skipWhile(bool test(T element)) {
  return new _SkipWhileStream<T>(this, test);
}
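The four slicing methods (`take`, `takeWhile`, `skip`, `skipWhile`) can be compared on the same input. This is a sketch with made-up data; `numbers` and `slices` are hypothetical helpers. Note that the trailing 1 is kept by `skipWhile`, because the test is no longer applied once it has failed.

```dart
Stream<int> numbers() => Stream<int>.fromIterable([1, 2, 3, 4, 5, 1]);

Future<Map<String, List<int>>> slices() async => {
      'take': await numbers().take(2).toList(),
      'takeWhile': await numbers().takeWhile((n) => n < 3).toList(),
      'skip': await numbers().skip(4).toList(),
      'skipWhile': await numbers().skipWhile((n) => n < 3).toList(),
    };

Future<void> main() async {
  final result = await slices();
  print(result['take']); // [1, 2]
  print(result['takeWhile']); // [1, 2]
  print(result['skip']); // [5, 1]
  print(result['skipWhile']); // [3, 4, 5, 1]
}
```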

Skips data events if they are equal to the previous data event (that is, it drops consecutive duplicate values).

  1. The returned stream provides the same events as this stream, except that it never provides two consecutive data events that are equal. That is, errors are passed through to the returned stream, and data events are passed through if they are distinct from the most recently emitted data event.
  2. Equality is determined by the provided [equals] method. If that is omitted, the "==" operator on the last provided data element is used.
  3. If [equals] throws, the data event is replaced by an error event containing the thrown error. The behavior is equivalent to the original stream emitting the error event, and it doesn't change what the most recently emitted data event is.
  4. The returned stream is a broadcast stream if this stream is.
  5. If a broadcast stream is listened to more than once, each subscription will individually perform the equals test.
Stream<T> distinct([bool equals(T previous,T next)?]) {
 return new _DistinctStream<T>(this, equals);
}
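Only consecutive repeats are dropped; a value may reappear later. A minimal sketch, with the helper name `dropRepeats` chosen for the example:

```dart
Future<List<int>> dropRepeats() =>
    Stream<int>.fromIterable([1, 1, 2, 2, 1, 3, 3]).distinct().toList();

Future<void> main() async {
  print(await dropRepeats()); // [1, 2, 1, 3]
}
```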

The first element of this stream.

Future<T> get first {}

The last element of this stream.

If this stream is empty (the done event is the first event), the returned future completes with an error.

Future<T> get last {}

The single element of this stream.

  1. If this stream emits an error event, the returned future is completed with that error and processing stops.
  2. If this stream is empty or has more than one element, the returned future completes with an error.
Future<T> get single{}

Finds the first element of this stream matching [test].

  1. Returns a future that is completed with the first element of this stream for which [test] returns "true".
  2. If no such element is found before this stream is done, and an [orElse] function is provided, the result of calling [orElse] becomes the value of the future. If [orElse] throws, the returned future is completed with that error.
Future<T> firstWhere(bool test(T element), {T orElse()?}) {}

Finds the last element in this stream matching [test].

Future<T> lastWhere(bool test(T element), {T orElse()?}) {}

Finds the single element in this stream matching [test].

Like [lastWhere], except that it is an error if more than one matching element occurs in this stream.

Future<T> singleWhere(bool test(T element),{T orElse()?}) {
}
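The three finders differ mainly in which match they return and in how they treat multiple matches. A sketch with sample data; `finders` is a hypothetical helper.

```dart
Future<List<Object>> finders() async {
  Stream<int> numbers() => Stream<int>.fromIterable([1, 2, 3, 4]);

  final first = await numbers().firstWhere((n) => n.isEven);
  final last = await numbers().lastWhere((n) => n.isEven);
  // No element matches, so the orElse result becomes the value.
  final fallback = await numbers().firstWhere((n) => n > 10, orElse: () => -1);

  // Two elements match, so singleWhere completes with an error.
  var tooMany = false;
  try {
    await numbers().singleWhere((n) => n.isEven);
  } on StateError {
    tooMany = true;
  }
  return [first, last, fallback, tooMany];
}

Future<void> main() async {
  print(await finders()); // [2, 4, -1, true]
}
```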

Returns the value of the [index]th data event of this stream.

Future<T> elementAt(int index) {
}

Creates a new stream with the same events as this stream.

  1. Whenever more than [timeLimit] passes between two events from this stream, the [onTimeout] function is called, which can emit further events on the returned stream.
  2. The countdown doesn't start until the returned stream is listened to. The countdown is reset every time an event is forwarded from this stream, or when this stream is paused and resumed.
  3. If [onTimeout] is omitted, a timeout will just put a [TimeoutException] into the error channel of the returned stream.
  4. If the call to [onTimeout] throws, the error is emitted on the returned stream.
  5. The returned stream is a broadcast stream if this stream is.
  6. If a broadcast stream is listened to more than once, each subscription will have its individual timer that starts counting on listen, and the subscriptions' timers can be paused individually.
Stream<T> timeout(Duration timeLimit, {void onTimeout(EventSink<T> sink)?}) {}
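A sketch of a custom `onTimeout` handler, assuming a source that emits one event and then goes silent. The 20 ms limit and the helper name `timeoutDemo` are arbitrary choices for the example.

```dart
import 'dart:async';

Future<List<String>> timeoutDemo() {
  final controller = StreamController<String>(); // never closed by the source
  final result = controller.stream
      .timeout(
        const Duration(milliseconds: 20),
        onTimeout: (sink) {
          sink.add('timed out'); // emit a replacement event
          sink.close(); // and end the returned stream
        },
      )
      .toList();
  controller.add('hello');
  return result;
}

Future<void> main() async {
  print(await timeoutDemo()); // [hello, timed out]
}
```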

A subscription on events from a [Stream].

When you listen on a [Stream] using [Stream.listen], a [StreamSubscription] object is returned.

abstract class StreamSubscription<T> {}

A [Sink] that supports adding errors.

  1. This makes it suitable for capturing the results of asynchronous computations, which can complete with a value or an error.
  2. The [EventSink] has been designed to handle asynchronous events from [Stream]s. See, for example, [Stream.eventTransformed], which uses "EventSink" to transform events.

[Stream] wrapper that only exposes the [Stream] interface.

class StreamView<T> extends Stream<T> {}

Abstract interface for a "Sink" accepting multiple entire streams.

  1. A consumer can accept a number of consecutive streams using [addStream], and when no further data needs to be added, the [close] method tells the consumer to complete its work and shut down.
  2. The [Stream.pipe] method accepts a "StreamConsumer" and will pass the stream to the consumer's [addStream] method. When that completes, it will call [close] and then complete its own returned future.
abstract class StreamConsumer<S> {}

An object that accepts stream events both synchronously and asynchronously.

  1. A [StreamSink] combines the methods from [StreamConsumer] and [EventSink].
  2. The [EventSink] methods can't be used while [addStream] is in progress. As soon as the [addStream]'s [Future] completes with a value, the [EventSink] methods can be used again.
  3. If [addStream] is called after any of the [EventSink] methods, it'll be delayed until the underlying system has consumed the data added by the [EventSink] methods.
  4. When [EventSink] methods are used, the [done] [Future] can be used to catch any errors.
  5. When [close] is called, it will return the [done] [Future].
abstract class StreamSink<S> implements EventSink<S>,StreamConsumer<S>{}

Transforms a Stream.

  1. When a stream's [Stream.transform] method is invoked with a [StreamTransformer], the stream calls the [bind] method on the provided transformer. The resulting stream is then returned from the [Stream.transform] method.
  2. Conceptually, a transformer is simply a function from [Stream] to [Stream] that is encapsulated into a class.
  3. It is good practice to write transformers that can be used multiple times.
  4. All other transforming methods on [Stream], such as [Stream.map], [Stream.where] or [Stream.expand], can be implemented using [Stream.transform].
  5. A [StreamTransformer] is thus very powerful but often also a bit more complicated to use.
abstract class StreamTransformer<S,T> {}

An [Iterator]-like interface for the values of a [Stream].

  1. This wraps a [Stream] and a subscription on the stream. It listens on the stream, and completes the future returned by [moveNext] when the next value becomes available.
  2. The stream may be paused between calls to [moveNext].
  3. The [current] value must only be used after a future returned by [moveNext] has completed with "true", and only until [moveNext] is called again.
abstract class StreamIterator<T>{}
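The pull-style loop this interface enables can be sketched as below; `sumWithIterator` is a hypothetical helper and the input values are arbitrary.

```dart
import 'dart:async';

Future<int> sumWithIterator() async {
  final iterator = StreamIterator<int>(Stream.fromIterable([1, 2, 3]));
  var sum = 0;
  try {
    // current is only read after moveNext() has completed with true.
    while (await iterator.moveNext()) {
      sum += iterator.current;
    }
  } finally {
    await iterator.cancel(); // releases the subscription if the loop exits early
  }
  return sum;
}

Future<void> main() async {
  print(await sumWithIterator()); // 6
}
```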

An enhanced stream controller provided by [Stream.multi].

  1. Acts like a normal asynchronous controller, but also allows adding events synchronously.
  2. As with any synchronous event delivery, the sender should be very careful to not deliver events at times when a new listener might not be ready to receive them.
  3. That generally means only delivering events synchronously in response to other asynchronous events, because that is a time when an asynchronous event could happen.
abstract class MultiStreamController<T> implements StreamController<T>{}
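With `Stream.multi`, the callback runs once per listener and receives that listener's own controller. A minimal sketch using only the ordinary asynchronous `add`; the listener counter and the name `multiDemo` are illustrative.

```dart
Future<List<List<int>>> multiDemo() async {
  var listeners = 0;
  final stream = Stream<int>.multi((controller) {
    // Called for every new listener, each with its own controller.
    final id = ++listeners;
    controller
      ..add(id)
      ..add(id * 10)
      ..close();
  });
  // The same stream object can be listened to more than once.
  return [await stream.toList(), await stream.toList()];
}

Future<void> main() async {
  print(await multiDemo()); // [[1, 10], [2, 20]]
}
```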