Core Stream types
There are two kinds of streams: "Single-subscription" streams and "broadcast" streams.
Single-subscription
- a single-subscription stream allows only a single listener during the whole lifetime of the stream.
- it doesn't start generating events until it has a listener, and it stops sending events when the listener is unsubscribed, even if the source of events could still provide more.
- Listening twice on a single-subscription stream is not allowed, even after the first subscription has been canceled.
- single-subscription streams are generally used for streaming chunks of larger contiguous data, like file I/O.
Broadcast
- a broadcast stream allows any number of listeners, and it fires its events when they are ready, whether there are listeners or not.
- broadcast streams are used for independent events/observers.
- if several listeners want to listen to a single-subscription stream, use [asBroadcastStream] to create a broadcast stream on top of the non-broadcast stream.
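The difference between the two kinds can be sketched as follows. This is a minimal illustration, not part of the original notes; the function names `secondListenThrows` and `twoListeners` are hypothetical, and a plain `StreamController` is assumed as the single-subscription source.

```dart
import 'dart:async';

/// Returns true if a second listen on a single-subscription stream throws.
Future<bool> secondListenThrows() async {
  final controller = StreamController<int>(); // single-subscription by default
  final first = controller.stream.listen((_) {});
  var threw = false;
  try {
    controller.stream.listen((_) {}); // second listener is not allowed
  } on StateError {
    threw = true;
  }
  await first.cancel();
  return threw;
}

/// Wraps a single-subscription stream so that two listeners can share it.
Future<List<List<int>>> twoListeners() {
  final controller = StreamController<int>();
  final broadcast = controller.stream.asBroadcastStream();
  final a = broadcast.toList();
  final b = broadcast.toList();
  controller
    ..add(1)
    ..add(2)
    ..add(3)
    ..close();
  return Future.wait([a, b]);
}

Future<void> main() async {
  print(await secondListenThrows()); // true
  print(await twoListeners()); // [[1, 2, 3], [1, 2, 3]]
}
```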
creates a new single-subscription stream from the future.
when the future completes, the stream will fire one event, either data or error, and then close with a done-event.
factory Stream.fromFuture(Future<T> future){}
creates a single-subscription stream from a group of futures.
- the stream reports the results for the futures on the stream in the order in which the futures complete.
- each future provides either a data event or an error event, depending on how the future completes.
- if some futures have already completed when "Stream.fromFutures" is called, their results will be emitted in some unspecified order.
- when all futures have completed, the stream is closed.
- if [futures] is empty, the stream closes as soon as possible.
factory Stream.fromFutures(Iterable<Future<T>> futures) {}
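A small sketch of the completion-order rule, assuming two delayed futures with arbitrary example durations. The helper name `completionOrder` is hypothetical.

```dart
import 'dart:async';

/// Emits results in completion order, not in list order.
Future<List<String>> completionOrder() {
  final slow = Future.delayed(const Duration(milliseconds: 60), () => 'slow');
  final fast = Future.delayed(const Duration(milliseconds: 10), () => 'fast');
  return Stream.fromFutures([slow, fast]).toList();
}

Future<void> main() async {
  print(await completionOrder()); // [fast, slow]
}
```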
creates a single-subscription stream that gets its data from [elements].
- the iterable is iterated when the stream receives a listener, and stops iterating if the listener cancels the subscription, or if the [Iterator.moveNext] method returns "false" or throws. Iteration is suspended while the stream subscription is paused.
- if calling [Iterator.moveNext] on "elements.iterator" throws, the stream emits that error and then it closes.
- if reading [Iterator.current] on "elements.iterator" throws, the stream emits that error, but keeps iterating.
factory Stream.fromIterable(Iterable<T> elements){}
creates a stream that repeatedly emits events at [period] intervals.
the event values are computed by invoking [computation]. The argument to this callback is an integer that starts with 0 and is incremented for every event.
- the [period] must be a non-negative [Duration].
- if [computation] is omitted, the event values will all be "null".
- the [computation] must not be omitted if the event type [T] does not allow "null" as a value.
factory Stream.periodic(Duration period, [T computation(int computationCount)?]) {}
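As a rough illustration of the counter passed to [computation], the following sketch squares it and stops after four events. The 10 ms period and the name `firstSquares` are arbitrary choices for the example.

```dart
import 'dart:async';

/// Takes the first four periodic events; the callback receives 0, 1, 2, 3.
Future<List<int>> firstSquares() {
  return Stream<int>.periodic(
    const Duration(milliseconds: 10),
    (count) => count * count,
  ).take(4).toList();
}

Future<void> main() async {
  print(await firstSquares()); // [0, 1, 4, 9]
}
```

A periodic stream never closes by itself, so the example relies on `take` to end it.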
creates a stream where all events of an existing stream are piped through a sink-transformation.
factory Stream.eventTransformed(Stream<dynamic> source, EventSink<dynamic> mapSink(EventSink<T> sink)) {
return new _BoundSinkStream(source,mapSink);
}
adapts [source] to be a "Stream<T>".
this allows [source] to be used at the new type, but at run-time it must satisfy the requirements of both the new type and its original type.
data events created by the source stream must also be instances of [T].
static Stream<T> castFrom<S,T>(Stream<S> source) => new CastStream<S,T>(source);
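returns a multi-subscription stream that produces the same events as this stream.
the returned stream will subscribe to this stream when its first subscriber is added, and will stay subscribed until this stream ends, or a callback cancels the subscription.
Stream<T> asBroadcastStream({void onListen(StreamSubscription<T> subscription)?, void onCancel(StreamSubscription<T> subscription)?}) {}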
adds a subscription to this stream.
StreamSubscription<T> listen(void onData(T event)?, {Function? onError, void onDone()?, bool? cancelOnError});
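A minimal sketch of the three callbacks of `listen`, assuming a `StreamController` as the event source. The name `logEvents` is hypothetical. Because `cancelOnError` is left at its default, the subscription keeps going after the error event.

```dart
import 'dart:async';

/// Records every event kind delivered to a subscription.
Future<List<String>> logEvents() {
  final log = <String>[];
  final finished = Completer<List<String>>();
  final controller = StreamController<int>();
  controller.stream.listen(
    (value) => log.add('data $value'),
    onError: (Object error) => log.add('error $error'),
    onDone: () => finished.complete(log),
  );
  controller
    ..add(1)
    ..addError('oops')
    ..add(2)
    ..close();
  return finished.future;
}

Future<void> main() async {
  print(await logEvents()); // [data 1, error oops, data 2]
}
```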
creates a new stream from this stream that discards some elements.
- the new stream sends the same error and done events as this stream, but it only sends the data events that satisfy the [test].
- if the [test] function throws, the data event is dropped and the error is emitted on the returned stream instead.
- the returned stream is a broadcast stream if this stream is.
- if a broadcast stream is listened to more than once, each subscription will individually perform the "test".
Stream<T> where(bool test(T event)) {
return new _WhereStream<T>(this, test);
}
Transforms each element of this stream into a new stream event.
- creates a new stream that converts each element of this stream to a new value using the [convert] function ,and emits the result.
- if [convert] throws, the returned stream reports it as an error event instead.
- error and done events are passed through unchanged to the returned stream.
- the returned stream is a broadcast stream if this stream is.
Stream<S> map<S>(S convert(T event)) {
return new _MapStream<T,S>(this,convert);
}
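Filtering and converting are often chained. The sketch below combines `where` and `map` on a small in-memory stream; the name `evenLabels` and the sample values are made up for illustration.

```dart
import 'dart:async';

/// Keeps the even numbers, then converts each one to a label.
Future<List<String>> evenLabels() {
  return Stream.fromIterable([1, 2, 3, 4, 5, 6])
      .where((n) => n.isEven)
      .map((n) => 'item $n')
      .toList();
}

Future<void> main() async {
  print(await evenLabels()); // [item 2, item 4, item 6]
}
```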
Creates a new stream with each data event of this stream asynchronously mapped to a new event.
Stream<E> asyncMap<E>(FutureOr<E> convert(T event)) {
}
Transforms each element into a sequence of asynchronous events.
returns a new stream and, for each event of this stream, does the following:
- if the event is an error event or a done event, it is emitted directly by the returned stream.
- otherwise it is an element, and the [convert] function is called with the element as argument to produce a convert-stream for the element.
- if that call throws, the error is emitted on the returned stream.
- if the call returns "null", no further action is taken for the element. Otherwise, this stream is paused and the convert-stream is listened to.
- every data and error event of the convert-stream is emitted on the returned stream in the order it is produced.
- when the convert-stream ends, this stream is resumed.
- the returned stream is a broadcast stream if this stream is.
Stream<E> asyncExpand<E>(Stream<E>? convert(T event)){
}
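The steps above can be sketched with a [convert] function that returns "null" for one element and a two-event stream for the others. The name `expandAsync` and the sample data are assumptions for the example.

```dart
import 'dart:async';

/// Replaces each element with the events of its convert-stream.
Future<List<String>> expandAsync() {
  return Stream.fromIterable([1, 2, 3]).asyncExpand<String>((n) {
    if (n == 2) return null; // no convert-stream, so element 2 yields nothing
    return Stream.fromIterable(['$n-a', '$n-b']);
  }).toList();
}

Future<void> main() async {
  print(await expandAsync()); // [1-a, 1-b, 3-a, 3-b]
}
```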
Creates a wrapper Stream that intercepts some errors from this stream.
- the returned stream is a broadcast stream if this stream is.
- if a broadcast stream is listened to more than once, each subscription will individually perform the test and handle the error.
Stream<T> handleError(Function onError, {bool test(error)?}) {}
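A small sketch of intercepting one kind of error, assuming the errors of interest are `FormatException`s. The function name `ignoreFormatErrors` is hypothetical; errors that fail the `test` would still reach the listener.

```dart
import 'dart:async';

/// Swallows FormatException errors and lets data events through.
Future<List<int>> ignoreFormatErrors() {
  final controller = StreamController<int>();
  controller
    ..add(1)
    ..addError(const FormatException('bad input'))
    ..add(2)
    ..close();
  return controller.stream
      .handleError((Object error) {}, test: (e) => e is FormatException)
      .toList();
}

Future<void> main() async {
  print(await ignoreFormatErrors()); // [1, 2]
}
```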
Transforms each element of this stream into a sequence of elements.
- returns a new stream where each element of this stream is replaced by zero or more data events.
- error events and the done event of this stream are forwarded directly to the returned stream.
- the returned stream is a broadcast stream if this stream is.
- if a broadcast stream is listened to more than once, each subscription will individually call convert and expand the events.
Stream<S> expand<S>(Iterable<S> convert(T element)) {
return new _ExpandStream<T,S>(this,convert);
}
Pipes the events of this stream into [streamConsumer].
- all events of this stream are added to "streamConsumer" using [StreamConsumer.addStream].
- the "streamConsumer" is closed when this stream has been successfully added to it, that is, when the future returned by "addStream" completes without an error.
- returns a future which completes when this stream has been consumed and the consumer has been closed.
- the returned future completes with the same result as the future returned by [StreamConsumer.close].
- if the call to [streamConsumer.addStream] fails in some way, this method fails in the same way.
Future pipe(StreamConsumer<T> streamConsumer) {
return streamConsumer.addStream(this).then((_)=>streamConsumer.close());
}
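As an illustration, a `StreamController` can serve as the consumer, since it implements `StreamSink` and therefore `StreamConsumer`. The name `pipeIntoController` is hypothetical.

```dart
import 'dart:async';

/// Pipes a stream into a controller and collects what the controller emits.
Future<List<int>> pipeIntoController() async {
  final controller = StreamController<int>();
  final collected = controller.stream.toList();
  // pipe() calls addStream and then close on the consumer.
  await Stream.fromIterable([1, 2, 3]).pipe(controller);
  return collected;
}

Future<void> main() async {
  print(await pipeIntoController()); // [1, 2, 3]
}
```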
Applies [streamTransformer] to this stream.
This method should always be used for transformations which treat
the entire stream as representing a single value
which has perhaps been split into several parts for transport,
like a file being read from disk or being fetched over a network.
The transformation will then produce a new stream which
transforms the stream's value incrementally (perhaps using
[Converter.startChunkedConversion]). The resulting stream
may again be chunks of the result, but does not have to
correspond to specific events from the source string.
(chunked stream loading)
Stream<S> transform<S>(StreamTransformer<T,S> streamTransformer) {
return streamTransformer.bind(this);
}
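A sketch of the chunked case described above: byte chunks that split the text at arbitrary points are decoded and regrouped into lines. It assumes the standard `utf8.decoder` and `LineSplitter` transformers from `dart:convert`; the name `linesFromChunks` and the sample text are made up.

```dart
import 'dart:async';
import 'dart:convert';

/// Decodes byte chunks and regroups them into lines.
Future<List<String>> linesFromChunks() {
  // Chunk boundaries do not line up with line boundaries.
  final chunks = Stream<List<int>>.fromIterable(<List<int>>[
    utf8.encode('first li'),
    utf8.encode('ne\nsecond'),
    utf8.encode(' line\n'),
  ]);
  return chunks
      .transform(utf8.decoder)
      .transform(const LineSplitter())
      .toList();
}

Future<void> main() async {
  print(await linesFromChunks()); // [first line, second line]
}
```

Three input events produce two output events, which shows that the result does not have to correspond one-to-one to the source events.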
Combines a sequence of values by repeatedly applying [combine].
Similar to [Iterable.reduce], this function maintains a value, starting with the first element of this stream and updated for each further element.
- for each element after the first, the value is updated to the result of calling [combine] with the previous value and the element.
- when this stream is done, the returned future is completed with the value at that time.
- if this stream is empty, the returned future is completed with an error.
- if this stream emits an error, or the call to [combine] throws, the returned future is completed with that error, and processing is stopped.
Future<T> reduce( T combine(T previous,T element)) {
}
Combines a sequence of values by repeatedly applying [combine].
Similar to [Iterable.fold], this function maintains a value,starting with [initialValue] and updated for each element of this stream.
- for each element, the value is updated to the result of calling [combine] with the previous value and the element
- when this stream is done, the returned future is completed with the value at that time.
- for an empty stream, the future is completed with [initialValue].
- if this stream emits an error, or the call to [combine] throws, the returned future is completed with that error, and processing is stopped.
Future<S> fold<S>(S initialValue,S combine(S previous,T element)) {
}
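The difference between `reduce` and `fold` can be sketched like this. The function names are hypothetical; the empty-stream case assumes the error is a `StateError`, which is what the current SDK reports for "no element".

```dart
import 'dart:async';

/// reduce: the result type equals the element type.
Future<int> sumWithReduce() =>
    Stream.fromIterable([1, 2, 3, 4]).reduce((a, b) => a + b);

/// fold: the result type may differ from the element type.
Future<String> digitsWithFold() =>
    Stream.fromIterable([1, 2, 3]).fold<String>('', (text, n) => '$text$n');

/// reduce has no initial value, so an empty stream completes with an error.
Future<bool> reduceOnEmptyFails() async {
  try {
    await Stream<int>.empty().reduce((a, b) => a + b);
    return false;
  } on StateError {
    return true;
  }
}

Future<void> main() async {
  print(await sumWithReduce()); // 10
  print(await digitsWithFold()); // 123
  print(await reduceOnEmptyFails()); // true
}
```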
combines the string representation of elements into a single string.
- each element is converted to a string using its [Object.toString] method.
- if [separator] is provided, it is inserted between element string representations.
- the returned future is completed with the combined string when this stream is done.
- if this stream emits an error, or the call to [Object.toString] throws, the returned future is completed with that error, and processing stops.
Future<String> join([String separator = ""]) {}
Returns whether [needle] occurs in the elements provided by this stream.
- compares each element of this stream to [needle] using [Object.==]. If an equal element is found, the returned future is completed with "true". If this stream ends without finding a match, the future is completed with "false".
- if this stream emits an error, or the call to [Object.==] throws, the returned future is completed with that error, and processing stops.
Future<bool> contains(Object? needle){}
Executes [action] on each element of this stream.
- completes the returned [Future] when all elements of this stream have been processed.
- if this stream emits an error, or if the call to [action] throws, the returned future completes with that error, and processing stops.
Future forEach(void action(T element)){}
checks whether [test] accepts all elements provided by this stream.
- calls [test] on each element of this stream.
- if the call returns "false", the returned future is completed with "false" and processing stops.
- if this stream ends without finding an element that [test] rejects, the returned future is completed with "true".
- if this stream emits an error, or if the call to [test] throws, the returned future is completed with that error, and processing stops.
Future<bool> every(bool test(T element)){}
checks whether [test] accepts any element provided by this stream.
- calls [test] on each element of this stream. If the call returns "true", the returned future is completed with "true" and processing stops.
- if this stream ends without finding an element that [test] accepts, the returned future is completed with "false".
- if this stream emits an error, or if the call to [test] throws, the returned future is completed with that error, and processing stops.
Future<bool> any( bool test(T element)) {}
the number of elements in this stream
Future<int> get length {}
whether this stream contains any elements.
Future<bool> get isEmpty{}
adapts this stream to be a "Stream<R>".
this stream is wrapped as a "Stream<R>" which checks at run-time that each data event emitted by this stream is also an instance of [R].
Stream<R> cast<R>() => Stream.castFrom<T,R>(this);
collects all elements of this stream in a [List].
- creates a "List<T>" and adds all elements of this stream to the list in the order they arrive.
- when this stream ends, the returned future is completed with that list.
- if this stream emits an error, the returned future is completed with that error, and processing stops.
Future<List<T>> toList() {}
collects the data of this stream in a [Set].
- creates a "Set<T>" and adds all elements of this stream to the set in the order they arrive.
- when this stream ends, the returned future is completed with that set.
- the returned set is the same type as created by "<T>{}". If another type of set is needed, either use [forEach] to add each element to the set, or use "toList().then((list) => new SomeOtherSet.from(list))" to create the set.
- if this stream emits an error, the returned future is completed with that error, and processing stops.
Future<Set<T>> toSet(){}
discards all data on this stream, but signals when it is done or an error occurred.
- when subscribing using [drain], cancelOnError will be true. This means that the future will complete with the first error on this stream and then cancel the subscription.
- if this stream emits an error, the returned future is completed with that error, and processing is stopped.
- in case of a "done" event the future completes with the given [futureValue].
- the [futureValue] must not be omitted if "null" is not assignable to [E].
Future<E> drain<E>([E? futureValue]) {}
provides at most the first [count] data events of this stream.
- if this is a single-subscription (non-broadcast) stream, it cannot be reused after the returned stream has been listened to.
- if this is a broadcast stream, the returned stream is a broadcast stream.
- in that case, the events are only counted from the time the returned stream is listened to.
Stream<T> take(int count) {
return new _TakeStream<T>(this, count);
}
forwards data events while [test] is successful.
- returns a stream that provides the same events as this stream until [test] fails for a data event.
- the returned stream is done when either this stream is done, or when this stream first emits a data event that fails [test].
- the "test" call is considered failing if it returns a non-"true" value or if it throws. If the "test" call throws, the error is emitted as the last event on the returned stream.
- stops listening to this stream after the accepted elements.
- internally the method cancels its subscription after these elements. This means that single-subscription (non-broadcast) streams are closed and cannot be reused after a call to this method.
- the returned stream is a broadcast stream if this stream is.
- for a broadcast stream, the events are only tested from the time the returned stream is listened to.
Stream<T> takeWhile(bool test(T element)) {
return new _TakeWhileStream<T>(this, test);
}
Skips the first [count] data events from this stream
Stream<T> skip(int count) {
return new _SkipStream<T>(this, count);
}
skips data events from this stream while they are matched by [test].
- the test fails when called with a data event if it returns a non-"true" value or if the call to "test" throws.
- if the call throws, the error is emitted as an error event on the returned stream instead of the data event, otherwise the event that made "test" return non-true is emitted as the first data event.
- error and done events are provided by the returned stream unmodified.
- the returned stream is a broadcast stream if this stream is.
- for a broadcast stream, the events are only tested from the time the returned stream is listened to.
Stream<T> skipWhile(bool test(T element)) {
return new _SkipWhileStream<T>(this, test);
}
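`takeWhile` and `skipWhile` split a stream at the first element that fails the test. The sketch below runs both with the same test over the same sample data; the function names are hypothetical.

```dart
import 'dart:async';

bool small(int n) => n < 3;

/// Events before the first failing element.
Future<List<int>> leadingSmall() =>
    Stream.fromIterable([1, 2, 5, 1, 7]).takeWhile(small).toList();

/// The first failing element and everything after it.
Future<List<int>> afterLeadingSmall() =>
    Stream.fromIterable([1, 2, 5, 1, 7]).skipWhile(small).toList();

Future<void> main() async {
  print(await leadingSmall()); // [1, 2]
  print(await afterLeadingSmall()); // [5, 1, 7]
}
```

Note that the later `1` is kept by `skipWhile`: the test is no longer applied once an element has failed it.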
skips data events if they are equal to the previous data event (consecutive duplicate values are skipped).
- the returned stream provides the same events as this stream, except that it never provides two consecutive data events that are equal. That is, errors are passed through to the returned stream, and data events are passed through if they are distinct from the most recently emitted data event.
- equality is determined by the provided [equals] method. If that is omitted, the "==" operator on the last provided data element is used.
- if [equals] throws, the data event is replaced by an error event containing the thrown error. The behavior is equivalent to the original stream emitting the error event, and it doesn't change what the most recently emitted data event is.
- the returned stream is a broadcast stream if this stream is.
- if a broadcast stream is listened to more than once, each subscription will individually perform the equals test.
Stream<T> distinct([bool equals(T previous,T next)?]) {
return new _DistinctStream<T>(this, equals);
}
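Only consecutive duplicates are dropped, as the following sketch shows with the default "==" comparison and with a custom [equals]. The function names and the case-insensitive comparison are illustrative assumptions.

```dart
import 'dart:async';

/// Default equality: a value may reappear after a different one.
Future<List<int>> withoutRepeats() =>
    Stream.fromIterable([1, 1, 2, 2, 1, 3, 3]).distinct().toList();

/// Custom equality: compares strings while ignoring case.
Future<List<String>> withoutCaseRepeats() =>
    Stream.fromIterable(['a', 'A', 'b', 'B', 'a'])
        .distinct((prev, next) => prev.toLowerCase() == next.toLowerCase())
        .toList();

Future<void> main() async {
  print(await withoutRepeats()); // [1, 2, 1, 3]
  print(await withoutCaseRepeats()); // [a, b, a]
}
```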
the first element of this stream
Future<T> get first {}
the last element of this stream
if this stream is empty (the done event is the first event), the returned future completes with an error.
Future<T> get last{}
the single element of this stream
- if this stream emits an error event, the returned future is completed with that error and processing stops.
- if this stream is empty or has more than one element, the returned future completes with an error.
Future<T> get single{}
Finds the first element of this stream matching [test].
- returns a future that is completed with the first element of this stream for which [test] returns "true".
- if no such element is found before this stream is done, and an [orElse] function is provided, the result of calling [orElse] becomes the value of the future. If [orElse] throws, the returned future is completed with that error.
Future<T> firstWhere(bool test(T element),{T orElse()?} ) {}
Finds the last element in this stream matching [test].
Future<T> lastWhere(bool test(T element), {T orElse()?}) {}
Finds the single element in this stream matching [test].
like [lastWhere], except that it is an error if more than one matching element occurs in this stream.
Future<T> singleWhere(bool test(T element),{T orElse()?}) {
}
returns the value of the [index]th data event of this stream.
Future<T> elementAt(int index) {
}
creates a new stream with the same events as this stream
- whenever more than [timeLimit] passes between two events from this stream, the [onTimeout] function is called, which can emit further events on the returned stream.
- the countdown doesn't start until the returned stream is listened to. The countdown is reset every time an event is forwarded from this stream, or when this stream is paused and resumed.
- if [onTimeout] is omitted, a timeout will just put a [TimeoutException] into the error channel of the returned stream.
- if the call to [onTimeout] throws, the error is emitted on the returned stream.
- the returned stream is a broadcast stream if this stream is.
- if a broadcast stream is listened to more than once, each subscription will have its individual timer that starts counting on listen, and the subscriptions' timers can be paused individually.
Stream<T> timeout(Duration timeLimit, {void onTimeout(EventSink<T> sink)?}) {}
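A rough sketch of a timeout callback that injects a placeholder event. The durations, the `-1` marker and the name `withPlaceholder` are arbitrary assumptions: the source delivers one event at once and a second one long after the 20 ms limit.

```dart
import 'dart:async';

/// Emits -1 on the returned stream when the source stays silent too long.
Future<List<int>> withPlaceholder() {
  final controller = StreamController<int>();
  controller.add(1);
  // The second event arrives well after the 20 ms limit has passed.
  Timer(const Duration(milliseconds: 100), () {
    controller
      ..add(2)
      ..close();
  });
  return controller.stream
      .timeout(
        const Duration(milliseconds: 20),
        onTimeout: (sink) => sink.add(-1),
      )
      .toList();
}

Future<void> main() async {
  print(await withPlaceholder()); // [1, -1, 2]
}
```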
A subscription on events from a [Stream].
when you listen to a [Stream] using [Stream.listen] a [StreamSubscription] object is returned.
abstract class StreamSubscription<T> {}
A [Sink] that supports adding errors.
- this makes it suitable for capturing the results of asynchronous computations, which can complete with a value or an error.
- the [EventSink] has been designed to handle asynchronous events from [Stream]s. See, for example, [Stream.eventTransformed], which uses "EventSink" to transform events.
[Stream] wrapper that only exposes the [Stream] interface.
class StreamView<T> extends Stream<T> {}
Abstract interface for a "Sink" accepting multiple entire streams.
- a consumer can accept a number of consecutive streams using [addStream], and when no further data needs to be added, the [close] method tells the consumer to complete its work and shut down.
- the [Stream.pipe] method accepts a "StreamConsumer" and will pass the stream to the consumer's [addStream] method. When that completes, it will call [close] and then complete its own returned future.
abstract class StreamConsumer<S> {}
An object that accepts stream events both synchronously and asynchronously.
- A [StreamSink] combines the methods from [StreamConsumer] and [EventSink].
- The [EventSink] methods can't be used while [addStream] is called. As soon as the [addStream]'s [Future] completes with a value, the [EventSink] methods can be used again.
- if [addStream] is called after any of the [EventSink] methods, it'll be delayed until the underlying system has consumed the data added by the [EventSink] methods.
- when [EventSink] methods are used, the [done] [Future] can be used to catch any errors.
- when [close] is called, it will return the [done] [Future].
abstract class StreamSink<S> implements EventSink<S>,StreamConsumer<S>{}
Transforms a Stream
- when a stream's [Stream.transform] method is invoked with a [StreamTransformer], the stream calls the [bind] method on the provided transformer. The resulting stream is then returned from the [Stream.transform] method.
- conceptually, a transformer is simply a function from [Stream] to [Stream] that is encapsulated into a class.
- it is good practice to write transformers that can be used multiple times.
- all other transforming methods on [Stream], such as [Stream.map], [Stream.where] and [Stream.expand], can be implemented using [Stream.transform].
- A [StreamTransformer] is thus very powerful but often also a bit more complicated to use.
abstract class StreamTransformer<S,T> {}
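A reusable transformer can be sketched with the `StreamTransformer.fromHandlers` constructor. The `doubler` transformer and `doubled` helper are hypothetical names; the same transformer instance is bound to two different streams.

```dart
import 'dart:async';

/// A transformer that doubles every data event; errors and done pass through.
final doubler = StreamTransformer<int, int>.fromHandlers(
  handleData: (value, sink) => sink.add(value * 2),
);

Future<List<int>> doubled(List<int> values) =>
    Stream.fromIterable(values).transform(doubler).toList();

Future<void> main() async {
  print(await doubled([1, 2, 3])); // [2, 4, 6]
  print(await doubled([10, 20])); // [20, 40]
}
```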
An [Iterator]-like interface for the values of a [Stream].
- this wraps a [Stream] and a subscription on the stream. It listens on the stream, and completes the future returned by [moveNext] when the next value becomes available.
- the stream may be paused between calls to [moveNext].
- the [current] value must only be used after a future returned by [moveNext] has completed with "true", and only until [moveNext] is called again.
abstract class StreamIterator<T>{}
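Pulling values one at a time can be sketched as follows. The function `sumUntilNegative` is a made-up example that stops early and cancels the underlying subscription.

```dart
import 'dart:async';

/// Adds up values until the stream ends or a negative value appears.
Future<int> sumUntilNegative(Stream<int> source) async {
  final iterator = StreamIterator<int>(source);
  var sum = 0;
  try {
    while (await iterator.moveNext()) {
      final value = iterator.current; // valid until the next moveNext call
      if (value < 0) break;
      sum += value;
    }
  } finally {
    await iterator.cancel(); // releases the subscription on early exit
  }
  return sum;
}

Future<void> main() async {
  print(await sumUntilNegative(Stream.fromIterable([3, 4, -1, 10]))); // 7
}
```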
An enhanced stream controller provided by [Stream.multi].
- Acts like a normal asynchronous controller, but also allows adding events synchronously.
- as with any synchronous event delivery, the sender should be very careful to not deliver events at times when a new listener might not be ready to receive them.
- that generally means only delivering events synchronously in response to other asynchronous events,because that is a time when an asynchronous event could happen.
abstract class MultiStreamController<T> implements StreamController<T>{}