Flutter中的Stream初探

Stream: 超级抽象的一个XXX

Stream 的分类

(1) "Single-subscription" -- 单订阅流
(2) "broadcast" -- 广播式的流(可多订阅)

  • 单订阅流只能被订阅一次,重复订阅会报错, 直到设置listen 后才会发送。单订阅流通常用于流式数据块较大的连续数据,如文件I/O。
  • 广播式的 可以订阅多次,在listen之前的数据会丢失。
    - 如果一个流是单订阅模式 却想多次订阅,可以通过asBroadcastStream ()方法来修改。

Stream 的创建

(1) 从集合中创建一个新的单订阅流, Stream.fromIterable
Stream stream1 = Stream.fromIterable([11, 22, 33]);
(2) 从Future中创建一个新的单订阅流, Stream.fromFuture
Stream stream = Stream.fromFuture(Future(()=> 1));
(3) 通过Stream.fromFutures创建
Stream stream3 = Stream.fromFutures([
Future(() => 111),
Future(() => 111),
]);
(4) 创建一个每隔自定义时间发送一个数据的流
Stream stream2 = Stream.periodic(Duration(seconds: 2), (a) {
print("a------>$a");
if (a < 3)
return a;
else
return a *10;
});
它有2 个参数, 第一个是间隔的时间, 第二个是每次发送数据前的回调方法,可以通过这个方法的返回值来修改流的值。 (返回值即向流中添加的内容)
上面创建的流是一个固定间隔时间无限发送的流,这就有问题了。 正常情况下谁会去搞一个无限发送数据流的功能呢? 怎样控制它的结束?-----------
用stream的take 方法。 stream2 = stream2.take(5); 这样就只会发送5次了。
升级版的takeWhile () : 可以用来做筛选

stream2 = stream2.takeWhile((data) {
 return data < 10; // 设置一个上限为10 
});
image.png

为了方操作 Stream ,官方提供了StreamController;如上图所示,他提供StreamSink来添加流 (入口),同时又提供 stream 属性用于对外的监听和变换。 stream.listen的返回时一个StreamSubscription,可以通过它的pause(),resume(),cancel()等方法来操作流的订阅。

StreamController:
StreamController controller = StreamController<String>(); // 创建一个单订阅流
StreamController controller = StreamController.broadcast(); // 创建一个广播式的订阅流
参数sync用来指定是同步还是异步。

listen : 用来设置监听, 它的返回值是 StreamSubscribe。
StreamSubscribe:
pause() : 暂停监听(是立即暂停),暂停后的事件流不会丢失,会在resume后一起回调
resume(): 唤醒pause的流
cancel(): 取消
举个栗子🌰

 // 1. StreamControl
StreamController controller = StreamController<String>();
// 2. StreamSink
StreamSink sink = controller.sink;
// 3. Stream
Stream stream = controller.stream;
stream.transform(StreamTransformer<String, String>.fromHandlers(
    handleData: (String data, EventSink<String> sink) {
  // 在这里设置transform 是没有用的,不会走这里; 除非在stream.transform返回的stream上加listen监听。
  if (!data.contains("数据2")) {
    sink.add(data);
  }
}));
sink.add("3秒后才设置监听。");
// 4. subscribe
Timer(Duration(seconds: 3), () {
  StreamSubscription subscription = controller.stream.transform(
      StreamTransformer<String, String>.fromHandlers(
          handleData: (String data, EventSink<String> sink) {
    print("transform");
    if (!data.contains("数据3")) {
      sink.add(data);
    }
  })).listen((event) {
    print("接收到新的消息: " + event);
  });
  sink.add("我是一条新的数据"); 
  Timer(Duration(milliseconds: 100), () {
    sink.add("pause...");
    subscription.pause(); // 暂停
    sink.add("我是一条新的数据pause"); 
  });

  Timer(Duration(seconds: 5), () {
    subscription.resume();

    sink.add("我是一条新的数据2"); 
  });
});

输出结果: 绿色的先输出,过5秒后黄色的才输出


image1.png

那么问题来了, 为什么pause 之前的add的那一个流没有输出呢? 跟进去看源码就明白了,pause 期间是不会分发事件的。


image2.png

schedule 的实现
image3.png

这样也就明白了, Stream最终是 想microtask queue 中添加了一个microtask 来实现异步的功能。

说点题外的: flutter是单线程,他的异步实现是通过Event Looper 来实现的。Event looper 中包含2个队列: (1)MicorTask Queue (2) Event Queue , MicroTask 的优先级是大于Event Queue的,只有所有的MicroTask Queue中的任务都完成以后才会去执行Event Queue中的内容。

当然 StreamController 可以是同步的,只要在创建的时候将参数sync设置为true即可,sync: true

如何通过Stream来实现响应式的组件 ?

通过StreamBuilder

看个例子:

class StreamModel {
  StreamController _controller;

  StreamSink<List<BookResponseData>> _sink;

  Stream<List<BookResponseData>> stream;

  StreamModel() {
    // 构造方法中初始化流相关的对象
    _controller = StreamController<List<BookResponseData>>.broadcast();
    _sink = _controller.sink;
    stream = _controller.stream;
  }
  /// 获取书本列表
  getBookList() async {
    var httpClient = new HttpClient();
    var uri = new Uri.https('www.apiopen.top', '/novelApi');
    var request = await httpClient.getUrl(uri);
    var response = await request.close();
    var responseBody = await response.transform(utf8.decoder).join();
    // 将获取到的字符串转换成定义好的Book实体类
    BookResponseEntity entity =
        BookResponseEntity.fromJson(json.decode(responseBody));
    // 接口中拿到数据之后,通过sink.add 添加一条流即可, 这样在StreamBuild中就会有回调。
    _sink.add(entity.data);
  }
  /// 资源
  dispose() {
    _sink.close();
    _controller.close();
  }
}

组件类:

class BookList extends StatefulWidget {
  @override
  State<StatefulWidget> createState() {
    return _BookListState();
  }
}

//https://api.apiopen.top/getSingleJoke?sid=28654780
//https://www.apiopen.top/novelApi

class _BookListState extends State<BookList> {
  StreamModel streamModel;

  @override
  void initState() {
    super.initState();
    streamModel = StreamModel();
  }

  @override
  Widget build(BuildContext context) {
//    print("build");
    return Scaffold(
      appBar: AppBar(
        title: Text("stream demo"),
      ),
      body: Container(
        child: StreamBuilder<List<BookResponseData>>(
          stream: streamModel.stream,  // 要监听的流 
          initialData: [], // 初始值,可以不设
          builder: (context, a) { // sink.add 后,就会回调这个方法。
            List<Widget> views = [];
            if (a.data != null && a.data.length > 0) {
              a.data.forEach((BookResponseData data) {
                views.add(Container(
                  padding: EdgeInsets.all(10.0),
                  child: Column(
                    children: <Widget>[
                      Text(
                        data.bookname,
                        style: TextStyle(fontWeight: FontWeight.w600),
                      ),
                      Text(data.bookInfo),
                    ],
                  ),
                ));
              });
            }
            return ListView(
              children: views,
            );
          },
        ),
      ),
      floatingActionButton: FloatingActionButton(
          child: Text("获取数据"),
          onPressed: () {
            streamModel.getBookList();
          }),
    );
  }
  @override
  void dispose() {
    streamModel.dispose(); 
    super.dispose();
  }
}

以上便可以实现基于Stream的响应式组件。

又有问题了。。。。 没办法问题就是这么多 (来打我... )

为什么StreamBuilder能够监听到Stream的变化来刷新UI?
跟进去看一下源码

void _subscribe() {
  if (widget.stream != null) {
    _subscription = widget.stream.listen((T data) {
      setState(() {
        _summary = widget.afterData(_summary, data);
      });
    }, onError: (Object error) {
      setState(() {
        _summary = widget.afterError(_summary, error);
      });
    }, onDone: () {
      setState(() {
        _summary = widget.afterDone(_summary);
      });
    });
    _summary = widget.afterConnected(_summary);
  }
}

你会发现 StreamBuilder是一个StatefulWidget, 本质还是在stream.listen中通过setState 来实现响应数据刷新View。

这样,对Stream及基于Stream的响应式组件就有个大致的了解了... ...

over。。。

©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容