Flutter throttle 和 debounce的实现



import 'dart:async';
import 'dart:collection';

/// The strategy that is used to determine how and when a new window is created.
enum WindowStrategy {
  /// Cancels the open window (if any) and immediately opens a fresh one
  /// for every source event (debounce-style).
  everyEvent,

  /// Waits until the current open window completes, then opens a new window
  /// when the source [Stream] emits its next event (throttle-style).
  eventAfterLastWindow,

  /// Opens a recurring window right after the very first event on
  /// the source [Stream] is emitted.
  firstEventOnly,

  /// Does not open any windows; all events are buffered and emitted
  /// whenever the handler triggers, after which the buffer is cleared.
  onHandler
}

/// Sink that buffers events of type [S] from the source stream in [queue]
/// and works with windows whose creation is governed by [_strategy].
///
/// NOTE(review): this class is truncated in the file — no constructor is
/// present to initialise the final fields, several members have lost lines,
/// and the closing brace is missing. Restore them from the upstream
/// implementation before relying on this code.
class _BackpressureStreamSink<S, T> implements ForwardingSink<S, T> {
  /// Strategy that [maybeCreateWindow] switches on to decide when a window
  /// opens.
  final WindowStrategy _strategy;

  /// Builds the window [Stream] for the event that opens a window; called
  /// from [buildStream].
  final Stream<dynamic> Function(S event) _windowStreamFactory;

  /// Optional handler consulted by [resolveWindowStart]; may be `null`.
  final T Function(S event) _onWindowStart;

  /// Optional handler consulted by [resolveWindowEnd]; may be `null`.
  final T Function(List<S> queue) _onWindowEnd;

  /// When greater than zero, [resolveWindowEnd] uses it to compute [skip]
  /// for the next buffer.
  final int _startBufferEvery;

  /// Optional predicate evaluated by [maybeCloseWindow] against a snapshot of
  /// the buffer; may be `null`.
  final bool Function(List<S> queue) _closeWindowWhen;

  /// When `true`, a window that ends with an empty buffer is not processed
  /// by [resolveWindowEnd].
  final bool _ignoreEmptyWindows;

  /// Whether buffered events are still handled when the source closes.
  final bool _dispatchOnClose;

  /// Buffer of source events.
  final Queue<S> queue = DoubleLinkedQueue<S>();

  /// Upper bound for the length of [queue]; `null` disables trimming in [add].
  final int maxLengthQueue;

  /// Skip counter assigned by [resolveWindowEnd]; [add] only trims the
  /// buffer while it is zero.
  var skip = 0;

  /// Becomes `true` once [add] has been called at least once.
  var _hasData = false;

  /// Becomes `true` once [close] has been called.
  var _mainClosed = false;

  /// Subscription to the currently open window stream, or `null` when no
  /// window is open.
  StreamSubscription<dynamic> _windowSubscription;


  /// Handles a data event coming from the source stream.
  ///
  /// Records that data has been seen and lets [maybeCreateWindow] decide
  /// whether [data] opens a window. While [skip] is zero, the oldest entries
  /// are dropped so that [queue] holds at most [maxLengthQueue] elements
  /// (when a limit is configured).
  ///
  /// NOTE(review): this method is truncated. Presumably the statement that
  /// buffers [data], the body of the `skip > 0` branch, any trailing call
  /// and all closing braces are missing — confirm against the upstream
  /// implementation.
  void add(EventSink<T> sink, S data) {
    _hasData = true;
    maybeCreateWindow(data, sink);

    if (skip == 0) {

      // Drop the oldest events once the buffer exceeds its configured cap.
      if (maxLengthQueue != null && queue.length > maxLengthQueue) {
        queue.removeFirstElements(queue.length - maxLengthQueue);

    if (skip > 0) {


  /// Forwards an error from the source stream straight to [sink], leaving
  /// the window buffer untouched.
  @override
  void addError(EventSink<T> sink, Object e, StackTrace st) =>
      sink.addError(e, st);

  /// Handles the close event of the source stream.
  ///
  /// Flags the source as closed. When [_dispatchOnClose] is set and the
  /// buffer is not empty, the last buffered event is passed to
  /// [resolveWindowStart]; afterwards [resolveWindowEnd] is invoked with
  /// `isControllerClosing` set to `true`.
  ///
  /// NOTE(review): this method is truncated. The body of the
  /// `eventAfterLastWindow` branch, the closing braces and any statements
  /// after the `resolveWindowEnd` call are missing — confirm against the
  /// upstream implementation.
  void close(EventSink<T> sink) {
    _mainClosed = true;

    if (_strategy == WindowStrategy.eventAfterLastWindow) {

    // Treat the final event as a window that opens
    // and immediately closes again.
    if (_dispatchOnClose && queue.isNotEmpty) {
      resolveWindowStart(queue.last, sink);

    resolveWindowEnd(sink, true);



  /// Cancels the open window subscription, if there is one, when the
  /// listener cancels, and returns the result of that cancellation
  /// (`null` when no window is open).
  @override
  FutureOr onCancel(EventSink<T> sink) => _windowSubscription?.cancel();

  /// Does nothing: no work is required when a listener subscribes.
  @override
  void onListen(EventSink<T> sink) {}

  /// Pauses the open window subscription, if there is one, while the
  /// listener is paused.
  @override
  void onPause(EventSink<T> sink) => _windowSubscription?.pause();

  /// Resumes the open window subscription, if there is one, when the
  /// listener resumes.
  @override
  void onResume(EventSink<T> sink) => _windowSubscription?.resume();

  /// Opens a window for [event] when the configured [_strategy] calls for it.
  ///
  /// For `eventAfterLastWindow` and `firstEventOnly` nothing happens while a
  /// window subscription already exists; otherwise a single-shot or a
  /// recurring window is opened respectively. For `everyEvent` a new
  /// single-shot window is opened for every event. Each opened window is
  /// followed by a call to [resolveWindowStart].
  ///
  /// NOTE(review): this method is truncated. The statements terminating each
  /// case, possibly further statements in the `everyEvent` case, the body of
  /// the `onHandler` case and the closing braces are missing — confirm
  /// against the upstream implementation.
  void maybeCreateWindow(S event, EventSink<T> sink) {
    switch (_strategy) {
    // for example throttle
      case WindowStrategy.eventAfterLastWindow:
        // A window is still open: the event belongs to it.
        if (_windowSubscription != null) return;

        _windowSubscription = singleWindow(event, sink);

        resolveWindowStart(event, sink);

    // for example scan
      case WindowStrategy.firstEventOnly:
        // The recurring window is opened only once.
        if (_windowSubscription != null) return;

        _windowSubscription = multiWindow(event, sink);

        resolveWindowStart(event, sink);

    // for example debounce
      case WindowStrategy.everyEvent:

        _windowSubscription = singleWindow(event, sink);

        resolveWindowStart(event, sink);

      case WindowStrategy.onHandler:

  /// Evaluates the optional [_closeWindowWhen] predicate against an
  /// unmodifiable snapshot of the buffer.
  ///
  /// NOTE(review): the body of the `if` and the closing braces are missing;
  /// presumably the window is ended when the predicate returns `true` —
  /// confirm against the upstream implementation.
  void maybeCloseWindow(EventSink<T> sink) {
    if (_closeWindowWhen != null && _closeWindowWhen(unmodifiableQueue)) {

  /// Opens a window that ends after the first event of its window stream.
  ///
  /// Only the first event of the stream built for [event] is awaited
  /// (`take(1)`) and its value is ignored. Errors are forwarded to [sink].
  /// When the window stream is done, [resolveWindowEnd] is called, flagged
  /// with whether the source stream has already closed.
  StreamSubscription<dynamic> singleWindow(S event, EventSink<T> sink) =>
      buildStream(event, sink).take(1).listen(
        // No data handler: only the completion of the window matters.
        null,
        onError: sink.addError,
        onDone: () => resolveWindowEnd(sink, _mainClosed),
      );

  /// Opens a recurring window which is kept open until the main [Stream]
  /// closes.
  ///
  /// Every event of the window stream built for [event] ends the current
  /// window through [resolveWindowEnd], and so does the completion of the
  /// window stream. Errors are forwarded to [sink].
  StreamSubscription<dynamic> multiWindow(S event, EventSink<T> sink) =>
      buildStream(event, sink).listen(
        (dynamic _) => resolveWindowEnd(sink),
        onError: sink.addError,
        onDone: () => resolveWindowEnd(sink),
      );

  /// Creates the window stream for [event] by calling
  /// [_windowStreamFactory] and returns it.
  ///
  /// NOTE(review): this method is truncated. Statements appear to be missing
  /// before and after the factory call (for example cleanup of a previous
  /// window or validation of the factory result — unverified), as is the
  /// closing brace. Confirm against the upstream implementation.
  Stream<dynamic> buildStream(S event, EventSink<T> sink) {
    Stream stream;


    stream = _windowStreamFactory(event);

    return stream;

  /// Runs the window-start handling for [event] when an [_onWindowStart]
  /// handler is configured.
  ///
  /// NOTE(review): the body of the `if` and the closing braces are missing;
  /// presumably the handler's result for [event] is added to [sink] —
  /// confirm against the upstream implementation.
  void resolveWindowStart(S event, EventSink<T> sink) {
    if (_onWindowStart != null) {

  /// Ends the current window and prepares the buffer for the next one.
  ///
  /// [isControllerClosing] is `true` when the window ends because the source
  /// stream is closing. The visible logic:
  ///
  /// * clears [_windowSubscription] when the controller is closing or the
  ///   strategy is `eventAfterLastWindow` or `everyEvent`;
  /// * only processes the window when data has been seen and the buffer is
  ///   non-empty, or empty windows are not ignored;
  /// * when [_startBufferEvery] is positive and the controller is not
  ///   closing, computes how many upcoming events must be skipped.
  ///
  /// NOTE(review): this method is heavily truncated. The bodies of most
  /// branches (dispatching through [_onWindowEnd], early returns, rebuilding
  /// or clearing the buffer) and all closing braces are missing — confirm
  /// against the upstream implementation.
  void resolveWindowEnd(EventSink<T> sink, [bool isControllerClosing = false]) {
    // Special case: the source closes while using the throttle-like strategy.
    if (isControllerClosing &&
        _strategy == WindowStrategy.eventAfterLastWindow) {
      if (_dispatchOnClose &&
          _hasData &&
          queue.length > 1 &&
          _onWindowEnd != null) {

      _windowSubscription = null;


    // Single-shot windows (and a closing controller) leave no open window.
    if (isControllerClosing ||
        _strategy == WindowStrategy.eventAfterLastWindow ||
        _strategy == WindowStrategy.everyEvent) {
      _windowSubscription = null;

    if (isControllerClosing && !_dispatchOnClose) {

    // An empty buffer is only processed when empty windows are not ignored.
    if (_hasData && (queue.isNotEmpty || !_ignoreEmptyWindows)) {
      if (_onWindowEnd != null) {

      // Prepare the buffer for the next window.
      // By default, this is just a cleared buffer...
      if (!isControllerClosing && _startBufferEvery > 0) {
        skip = _startBufferEvery > queue.length
            ? _startBufferEvery - queue.length
            : 0;

        // ...unless startBufferEvery is provided.
        // Here we backtrack to the first event of the last buffer
        // and count forward using startBufferEvery until we reach
        // the next event.
        // If the next event is found inside the current buffer,
        // then this event and any later events in the buffer
        // become the starting values of the next buffer.
        // If the next event is not yet available, then a skip
        // count is calculated.
        // This count will skip the next n events.
        // When skip is reset to 0, we start adding events
        // again into the new buffer.
        // Example:
        // startBufferEvery = 2
        // last buffer: [0, 1, 2, 3, 4]
        // 0 is the first event,
        // 2 is the n-th event
        // new buffer starts with [2, 3, 4]
        // Example:
        // startBufferEvery = 3
        // last buffer: [0, 1]
        // 0 is the first event,
        // the n-th event is not yet dispatched at this point
        // skip becomes 1
        // event 2 is skipped, skip becomes 0
        // event 3 is now added to the buffer
        if (_startBufferEvery < queue.length) {
        } else {
      } else {

  /// An unmodifiable snapshot of the events currently held in [queue].
  List<S> get unmodifiableQueue {
    return List<S>.unmodifiable(queue);
  }

/// A highly customizable [StreamTransformer] which can be configured
/// to serve any of the common rx backpressure operators.
///
/// The [StreamTransformer] works by creating windows, during which it
/// buffers events to a [Queue]. It uses a [WindowStrategy] to determine
/// how and when a new window is created.
///
/// [onWindowStart] and [onWindowEnd] are handlers that fire when a window
/// opens and closes, right before emitting the transformed event.
///
/// [startBufferEvery] allows skipping events coming from the source
/// [Stream].
///
/// [ignoreEmptyWindows] controls what happens when a window ends while the
/// buffer is empty: when `true` (the default) the empty window is not
/// processed, when `false` the end of the window is processed even though
/// the buffer is empty.
///
/// [dispatchOnClose] will cause the remaining values in the buffer to be
/// emitted when the source [Stream] closes.
/// When false, the remaining buffer is discarded on close.
///
/// NOTE(review): this class is truncated in the file. The first line(s) of
/// the constructor, some of its named parameters, the argument list passed
/// to the sink in [bind] and the closing braces are missing — restore them
/// from the upstream implementation.
class BackpressureStreamTransformer<S, T> extends StreamTransformerBase<S, T> {
  /// Determines how the window is created
  final WindowStrategy strategy;

  /// Factory method used to create the [Stream] which will be buffered
  final Stream<dynamic> Function(S event) windowStreamFactory;

  /// Handler which fires when the window opens
  final T Function(S event) onWindowStart;

  /// Handler which fires when the window closes
  final T Function(List<S> queue) onWindowEnd;

  /// Maximum length of the buffer.
  /// Specify this value to avoid running out of memory when adding too many events to the buffer.
  /// If it's `null`, maximum length of the buffer is unlimited.
  final int maxLengthQueue;

  /// Used to skip an amount of events
  final int startBufferEvery;

  /// Predicate which determines when the current window should close
  final bool Function(List<S> queue) closeWindowWhen;

  /// Toggle to prevent, or allow windows that contain
  /// no events to be dispatched
  final bool ignoreEmptyWindows;

  /// Toggle to prevent, or allow the final set of events to be dispatched
  /// when the source [Stream] closes
  final bool dispatchOnClose;

  /// Constructs a [StreamTransformer] which buffers events emitted by the
  /// [Stream] that is created by [windowStreamFactory].
  /// Use the various optional parameters to precisely determine how and when
  /// this buffer should be created.
  /// For more info on the parameters, see [BackpressureStreamTransformer],
  /// or see the various back pressure [StreamTransformer]s for examples.
  // NOTE(review): the constructor header (name and leading positional
  // parameter) and the remaining named parameters are missing here.
      this.windowStreamFactory, {
        this.startBufferEvery = 0,
        this.ignoreEmptyWindows = true,
        this.dispatchOnClose = true,

  /// Binds this transformer to [stream] by wrapping it with a backpressure
  /// sink and forwarding the events through [forwardStream].
  // NOTE(review): the arguments of the sink constructor call are missing.
  Stream<T> bind(Stream<S> stream) {
    final sink = _BackpressureStreamSink(
    return forwardStream(stream, sink);

/// Adds bulk removal from the front of a [Queue].
extension _RemoveFirstNQueueExtension<T> on Queue<T> {
  /// Removes the first [count] elements of this queue.
  ///
  /// Does nothing when [count] is zero or negative. As with
  /// [Queue.removeFirst], removing from an empty queue throws a
  /// [StateError], so [count] must not exceed [length].
  void removeFirstElements(int count) {
    for (var i = 0; i < count; i++) {
      removeFirst();
    }
  }
}

/// A [BackpressureStreamTransformer] configured to throttle a [Stream].
///
/// NOTE(review): this class is truncated in the file. The constructor name
/// line, the positional arguments passed to `super` (presumably the window
/// strategy and [window] — confirm) and the closing parentheses/braces are
/// missing.
class ThrottleStreamTransformer<T> extends BackpressureStreamTransformer<T, T> {
  /// Construct a [StreamTransformer] that emits a value from the source [Stream],
  /// then ignores subsequent source values while the window [Stream] is open,
  /// then repeats this process.
  /// If [leading] is true, then the first item in each window is emitted.
  /// If [trailing] is true, then the last item in each window is emitted.
      Stream Function(T event) window, {
        bool trailing = false,
        bool leading = true,
      }) : super(
    // Emit the event that opens the window only for leading throttling.
    onWindowStart: leading ? (event) => event : null,
    // Emit the last buffered event at window end only for trailing throttling.
    onWindowEnd: trailing ? (queue) => queue.last : null,
    dispatchOnClose: trailing,
    // Trailing mode keeps at most two buffered events; otherwise none.
    maxLengthQueue: trailing ? 2 : 0,

/// A [Stream] backed by an internal [StreamController] that is driven by a
/// [Timer].
///
/// NOTE(review): this class is truncated in the file. The `onData` argument
/// of the delegated `listen` call, the timer callbacks, the bodies that stop
/// the timer/stopwatch and the closing parentheses/braces are missing —
/// restore them from the upstream implementation.
class TimerStream<T> extends Stream<T> {
  /// Controller whose stream all listeners are delegated to.
  final StreamController<T> _controller;

  /// Constructs a [Stream] which emits [value] after the specified [Duration].
  TimerStream(T value, Duration duration)
      : _controller = _buildController(value, duration);

  /// Subscribes to the stream of the internal controller, passing the
  /// handlers through.
  StreamSubscription<T> listen(void Function(T event) onData,
      {Function onError, void Function() onDone, bool cancelOnError}) {
    return _controller.stream.listen(
      onError: onError,
      onDone: onDone,
      cancelOnError: cancelOnError,

  /// Builds the synchronous controller that starts a [Timer] of [duration]
  /// when it is listened to and tracks elapsed time across pause/resume.
  static StreamController<T> _buildController<T>(T value, Duration duration) {
    final watch = Stopwatch();
    Timer timer;
     StreamController<T> controller;
    // Time consumed before the most recent resume; `null` once cancelled.
    Duration totalElapsed = Duration.zero;

    void onResume() {
      // Already cancelled or is not paused.
      if (totalElapsed == null || timer != null) return;

      totalElapsed = totalElapsed + watch.elapsed;

      // Restart the timer for the part of the duration that is still left.
      timer = Timer(duration - totalElapsed, () {

    controller = StreamController(
      sync: true,
      onListen: () {
        timer = Timer(duration, () {
      onPause: () {
        timer = null;
      onResume: onResume,
      onCancel: () {
        timer = null;
        totalElapsed = null;
    return controller;

/// Contract for a sink that receives the events of a source [Stream] of [T]
/// together with the [EventSink] of the stream exposed to listeners, to
/// which it can write events of type [R].
///
/// It is also notified about the listener's lifecycle (listen, pause,
/// resume and cancel).
abstract class ForwardingSink<T, R> {
  /// Handle data event
  void add(EventSink<R> sink, T data);

  /// Handle error event
  void addError(EventSink<R> sink, Object error, StackTrace st);

  /// Handle close event
  void close(EventSink<R> sink);

  /// Fires when a listener subscribes on the underlying [Stream].
  void onListen(EventSink<R> sink);

  /// Fires when a subscriber pauses.
  void onPause(EventSink<R> sink);

  /// Fires when a subscriber resumes after a pause.
  void onResume(EventSink<R> sink);

  /// Fires when a subscriber cancels.
  ///
  /// May return a [Future] that completes once cleanup has finished.
  FutureOr onCancel(EventSink<R> sink);
}

/// @private
/// Helper method which forwards the events from an incoming [Stream]
/// to a new [StreamController].
///
/// It captures events such as onListen, onPause, onResume and onCancel,
/// which can be used in pair with a [ForwardingSink].
///
/// The returned stream is a broadcast stream when [stream] is one, and a
/// single-subscription stream otherwise. Anything thrown by a
/// [connectedSink] callback is reported through [ForwardingSink.addError]
/// instead of escaping.
///
/// Throws an [ArgumentError] when [stream] or [connectedSink] is `null`.
Stream<R> forwardStream<T, R>(
    Stream<T> stream, ForwardingSink<T, R> connectedSink) {
  ArgumentError.checkNotNull(stream, 'stream');
  ArgumentError.checkNotNull(connectedSink, 'connectedSink');

  StreamController<R> controller;
  StreamSubscription<T> subscription;

  // Runs [block] and routes whatever it throws to the sink's error handler.
  void runCatching(void Function() block) {
    try {
      block();
    } catch (e, s) {
      connectedSink.addError(controller, e, s);
    }
  }

  // Subscribes to the source only once somebody listens to the result.
  final onListen = () {
    runCatching(() => connectedSink.onListen(controller));

    subscription = stream.listen(
      (data) => runCatching(() => connectedSink.add(controller, data)),
      onError: (Object e, StackTrace st) =>
          runCatching(() => connectedSink.addError(controller, e, st)),
      onDone: () => runCatching(() => connectedSink.close(controller)),
    );
  };

  // Cancels the source subscription and the sink, waiting for both when
  // they clean up asynchronously.
  final onCancel = () {
    final onCancelSelfFuture = subscription.cancel();
    final onCancelConnectedFuture = connectedSink.onCancel(controller);
    final futures = <Future>[
      if (onCancelSelfFuture is Future) onCancelSelfFuture,
      if (onCancelConnectedFuture is Future) onCancelConnectedFuture,
    ];
    return Future.wait<dynamic>(futures);
  };

  final onPause = () {
    // Propagate the pause upstream so the source stops producing events
    // while the listener is paused.
    subscription.pause();
    runCatching(() => connectedSink.onPause(controller));
  };

  final onResume = () {
    // Undo the upstream pause before notifying the sink.
    subscription.resume();
    runCatching(() => connectedSink.onResume(controller));
  };

  // Create a new Controller, which will serve as a trampoline for
  // forwarded events.
  if (stream.isBroadcast) {
    // Broadcast controllers have no pause/resume callbacks.
    controller = StreamController<R>.broadcast(
      onListen: onListen,
      onCancel: onCancel,
      sync: true,
    );
  } else {
    controller = StreamController<R>(
      onListen: onListen,
      onPause: onPause,
      onResume: onResume,
      onCancel: onCancel,
      sync: true,
    );
  }

  return controller.stream;
}

/// A [BackpressureStreamTransformer] configured to debounce a [Stream].
///
/// NOTE(review): this class is truncated in the file. The positional
/// arguments passed to `super` (presumably the window strategy and [window]
/// — confirm) and the closing parentheses/braces are missing.
class DebounceStreamTransformer<T> extends BackpressureStreamTransformer<T, T> {
  /// Constructs a [StreamTransformer] which keeps only the most recent
  /// source event (the buffer is capped at one element) and emits it
  /// whenever the current [window] fires.
  /// The [window] is reset whenever the [Stream] that is being transformed
  /// emits an event.
  DebounceStreamTransformer(Stream Function(T event) window)
      : super(
    // Emit the latest buffered event when the window ends.
    onWindowEnd: (Iterable<T> queue) => queue.last,
    maxLengthQueue: 1,

/// Rate-limiting operators for any [Stream], with windows timed by a
/// [TimerStream] of the given duration.
extension Throttle<T> on Stream<T> {
  /// Emits a value from this stream, then ignores subsequent values until
  /// [duration] has elapsed, then repeats this process.
  ///
  /// If [leading] is true (the default), the first item in each window is
  /// emitted. If [trailing] is true, the last item in each window is
  /// emitted.
  Stream<T> throttle(Duration duration,
      {bool trailing = false, bool leading = true}) {
    return transform(ThrottleStreamTransformer<T>(
      (_) => TimerStream<bool>(true, duration),
      trailing: trailing,
      leading: leading,
    ));
  }

  /// Emits the most recent value of this stream once the [duration] window
  /// fires; the window is reset whenever this stream emits an event.
  Stream<T> debounce(Duration duration) => transform(
      DebounceStreamTransformer<T>((_) => TimerStream<void>(null, duration)));
}

