在 Dart
中的异步函数返回 Future
或 Stream
对象, await
和 async
关键字用于异步编程, 使得编写异步代码就像同步代码一样
使用 async
关键字标记一个函数为异步函数, 如:
Future<String> fetchVersion() async {
return "1.0.0";
}
获取异步函数 Future 的值
通过 await
关键获取异步函数 Future
的结果:
main() async {
var v = await fetchVersion();
print(v);
}
除了通过 await
关键字获取 Future
的值, 还可以通过 then
函数来获取:
fetchVersion().then((version){
print(version);
});
捕获异步函数异常
使用 try/catch/finally
来捕获异步函数 Future
的异常信息:
try {
var v = await fetchVersion();
print(v);
} catch(e) {
print(e);
}
如果是通过 then
函数来获取的 Future
的结果, 可以通过 catchError
函数来捕获异常:
fetchVersion().then((version) {
print(version);
}).catchError((e) {
print(e);
});
异步函数链
通过 then
函数来实现异步函数的调用链:
Future result = costlyQuery(url);
result
.then((value) => expensiveWork(value))
.then((_) => lengthyComputation())
.then((_) => print('Done!'))
.catchError((exception) {
//Handle exception...
});
也可以通过 await
关键字来实现上面的功能:
try {
final value = await costlyQuery(url);
await expensiveWork(value);
await lengthyComputation();
print('Done!');
} catch (e) {
//Handle exception...
}
等待多个异步Future
可以使用 Future.wait
函数来等待多个异步函数返回, 该函数返回一个集合, 集合元素就是异步函数的返回结果, 集合元素的顺序就是异步函数的顺序:
main() async {
Future deleteLotsOfFiles() async {
return Future.delayed(Duration(seconds: 5), () {
return "deleteLotsOfFiles";
});
}
Future copyLotsOfFiles() async {
return ("copyLotsOfFiles");
};
Future checksumLotsOfOtherFiles() async {
return ("checksumLotsOfOtherFiles");
};
var result = await Future.wait([
deleteLotsOfFiles(),
copyLotsOfFiles(),
checksumLotsOfOtherFiles(),
]);
print(result);
print('Done with all the long steps!');
}
// 输出结果
[deleteLotsOfFiles, copyLotsOfFiles, checksumLotsOfOtherFiles]
Done with all the long steps!
await for
异步函数返回 Future
, 也可以返回 Stream
, Stream
代表的是数据序列
可以通过 await for
来获取 stream
里的值, 如:
await for (varOrType identifier in stream) {
// Executes each time the stream emits a value.
}
同理, 使用了 await for
的函数也必须是 async
:
Future main() async {
await for (var request in requestServer) {
handleRequest(request);
}
}
在 await for
中可以使用 return
或 break
来中断流
除了使用 await for
还可以使用 listen
函数来监听 stream
里的值:
main() {
new HttpClient().getUrl(Uri.parse('http://www.baidu.com'))
.then((HttpClientRequest request) => request.close())
.then((HttpClientResponse response) => response.transform(new Utf8Decoder()).listen(print));
}
stream transform
有的时候我们还需要对流数据进行转换, 例如下面的代码读取文件的内容案例, 对内容进行 utf8 的反编码, 加上行分割, 这样就能原样输出内容:
main() async {
var config = File('hello.dart');
var inputStream = config.openRead();
var lines = inputStream.transform(utf8.decoder).transform(LineSplitter());
await for (var line in lines) {
print(line);
}
}
监听流的异常和关闭
如何要捕获上面读取文件的例子的异常, 如果是使用 await for
, 可以使用 try catch
main() async {
var config = File('config.txt');
Stream<List<int>> inputStream = config.openRead();
var lines = inputStream.transform(utf8.decoder).transform(LineSplitter());
try {
await for (var line in lines) {
print(line);
}
print('file is now closed');
} catch (e) {
print(e);
}
}
如果使用的是 then
函数来实现的, 可以使用 onDone
和 onError
来监听:
main() async {
var config = File('config.txt');
Stream<List<int>> inputStream = config.openRead();
inputStream.transform(utf8.decoder).transform(LineSplitter()).listen(
(String line) {
print(line);
}, onDone: () {
print('file is now closed');
}, onError: (e) {
print(e);
});
}
create stream
上面的案例都是使用 SDK
为我们提供好的 Stream
, 那么我们如何自己创建 Stream
呢?
新建一个返回 Stream
的异步函数需要使用 async*
来标记, 使用 yield
或 yield*
来发射数据:
Stream<int> timedCounter(Duration interval, [int maxCount]) async* {
int i = 0;
while (true) {
await Future.delayed(interval);
yield i++;
if (i == maxCount) break;
}
}
main() async {
timedCounter(Duration(seconds: 2), 5).listen(print);
}
上面的代码大概的意思就是每隔 interval = 2
秒发射一次数据, 数据从 0
开始累加, 直到数据等于 maxCount=5
时停止发射. 需要注意的是只有调用了 listen
异步函数体才会被执行, 该函数返回的是一个 Subscription
可以通过 pause
函数来暂停一个 Subscription
发射数据:
main() async {
var r = timedCounter(Duration(seconds: 2), 5).listen(print);
var resumeSignal = Future.delayed(Duration(seconds: 2));
r.pause(resumeSignal);
}
执行了 pause
函数会导致 Subscription
暂停发射数据, resumeSignal
执行完毕后 Subscription
将继续发射数据
pause
函数参数是可以选的, resumeSignal
参数可以不传递, 如:
main() async {
var r = timedCounter(Duration(seconds: 2), 5).listen(print);
r.pause();
}
这个时候程序 2
秒后就会退出, 因为 执行完 listen
函数后,异步函数体就会被执行, 当执行完
await Future.delayed(interval);
程序便退出了, 因为 main 函数已经执行完毕. 我们可以通过 resume
函数来恢复 Subscription
数据的发射:
main() async {
var r = timedCounter(Duration(seconds: 2), 5).listen(print);
r.pause();
await Future.delayed(Duration(seconds: 3));
r.resume();
}
还可以通过 cancel
函数来取消一个 Subscription
:
main() async {
var r = timedCounter(Duration(seconds: 2), 5).listen(print);
//上面的代码意思4秒之后取消stream
Future.delayed(Duration(seconds: 4),(){
r.cancel();
});
}
StreamController
有的时候 Stream
来程序的不同地方, 不仅仅是来自异步函数
我们可以通过 StreamController
来创建流, 并且往这个流里发射数据. 也就是说 StreamController
会为我们创建一个新的 Stream
, 可以在任何时候任何地方为 Stream
添加 Event
例如我们使用 StreamController
来改造下 timedCounter
函数 :
Stream<int> timedCounter(Duration interval, [int maxCount]) {
var controller = StreamController<int>();
int counter = 0;
void tick(Timer timer) {
counter++;
// Ask stream to send counter values as event.
controller.add(counter);
if (maxCount != null && counter >= maxCount) {
timer.cancel();
// Ask stream to shut down and tell listeners.
controller.close();
}
}
// BAD: Starts before it has subscribers.
Timer.periodic(interval, tick);
return controller.stream;
}
但是这个改造程序有两个问题:
- 尽管没有调用
listen
函数,timedCounter
函数体会被执行, 也就是定时器Timer.periodic()
会被执行, 因为timedCounter
并没有被async*
标记, 如果被标记程序会自动帮我们处理.
- 尽管没有调用
- 没有处理
pause
情况, 哪怕执行了pause
函数, 程序也会不断的产生事件
- 没有处理
所以当我们使用 StreamController
的时候一定要避免出现上面的两个问题, 否则会导致内存泄漏问题
对于第一个问题, 读者可能会问:我们没有执行 listen
函数, 程序也没有输出数据啊. 这是因为这些事件被 StreamController
缓冲起来了:
void listenAfterDelay() async {
var counterStream = timedCounter(const Duration(seconds: 1), 15);
await Future.delayed(const Duration(seconds: 5));
// After 5 seconds, add a listener.
await for (int n in counterStream) {
print(n); // Print an integer every second, 15 times.
}
}
调用完 timedCounter
函数后, 我们没有调用 listen
函数, 而是 delay
了 5
秒
也就是说 StreamController
在这 5
秒里缓冲了 5
个 Event
然后通过 await for
来获取值, 会立刻输出 1 ~ 5
的数字, 因为这些数据已经缓冲好了, 等待被获取.
最后每隔一秒输出一次. 所以这个程序证明了哪怕没有调用 listen
函数 发送的 Event
会被 StreamController
缓冲起来
我们再来看下第二个问题: 没有处理 pause
事件, 哪怕调用管理 pause
函数, 程序也会发送 Event , 这些 Event
会被 StreamController
buffer 起来. 我们通过一个程序来验证下:
void listenWithPause() {
var counterStream = timedCounter(const Duration(seconds: 1), 15);
StreamSubscription<int> subscription;
subscription = counterStream.listen((int counter) {
print(counter); // Print an integer every second.
if (counter == 5) {
// After 5 ticks, pause for five seconds, then resume.
subscription.pause(Future.delayed(const Duration(seconds: 5)));
}
});
}
这个程序会输出 1 ~ 5
的数, 然后会触发 pause
函数, 暂停 5
秒. 其实在暂停的期间 Event
依然会发送, 并没有真正的暂停. 5
秒后 Subscription
恢复, 会立刻输出 6 ~ 10
, 说明在这 5
秒产生了 5
个 Event
, 这些事件被 StreamController
buffer 起来了.
那么在使用 StreamController
的时候如何避免上述问题呢?
Stream<int> timedCounter(Duration interval, [int maxCount]) {
StreamController<int> controller;
Timer timer;
int counter = 0;
void tick(_) {
counter++;
controller.add(counter); // Ask stream to send counter values as event.
if (counter == maxCount) {
timer.cancel();
controller.close(); // Ask stream to shut down and tell listeners.
}
}
void startTimer() {
timer = Timer.periodic(interval, tick);
}
void stopTimer() {
if (timer != null) {
timer.cancel();
timer = null;
}
}
controller = StreamController<int>(
onListen: startTimer,
onPause: stopTimer,
onResume: startTimer,
onCancel: stopTimer);
return controller.stream;
}
也就是说需要把 pause/resume
等回调交给 StreamController
, 它会自动帮我们处理好生命周期
联系我
所有关于 Retrofit 的使用案例都在我的 AndroidAll GitHub 仓库中。该仓库除了 Retrofit,还有其他 Android 其他常用的开源库源码分析,如「RxJava」「Glide」「LeakCanary」「Dagger2」「Retrofit」「OkHttp」「ButterKnife」「Router」等。除此之外,还有完整的 Android 程序员所需要的技术栈思维导图,欢迎享用。
下面是我的公众号,干货文章不错过,有需要的可以关注下,有任何问题可以联系我:
Reference
https://dart.dev/guides/libraries/library-tour#dartasync---asynchronous-programming
https://dart.dev/tutorials/language/futures
https://dart.dev/guides/language/language-tour#asynchrony-support
http://wiki.jikexueyuan.com/project/dart-language-tour/asynchrony-support.html
https://dart.dev/articles/libraries/creating-streams