在 Dart 中创建流
作者:Lasse Nielsen
2013 年 4 月 (2021 年 5 月更新)
dart:async 库包含两种对许多 Dart API 很重要的类型:Stream 和 Future。 Future 表示单个计算的结果,而 stream 则是结果的序列。你可以监听流来获取结果(数据和错误)以及流关闭的通知。在流完成之前,你也可以在监听时暂停或停止监听。
但本文不是关于使用流的,而是关于创建自己的流。你可以通过几种方式创建流:
- 转换现有流。
- 使用
async*
函数从头创建流。 - 使用
StreamController
创建流。
本文展示了每种方法的代码,并提供了提示,帮助你正确实现流。
关于使用流的帮助,请参见异步编程:流。
转换现有流
#创建流的常见情况是你已经有一个流,并希望基于原始流的事件创建一个新流。例如,你可能有一个字节流,希望通过 UTF-8 解码输入将其转换为字符串流。最通用的方法是创建一个新流,该新流等待原始流上的事件,然后输出新事件。示例:
/// Splits a stream of consecutive strings into lines.
///
/// The input string is provided in smaller chunks through
/// the `source` stream.
Stream<String> lines(Stream<String> source) async* {
// Stores any partial line from the previous chunk.
var partial = '';
// Wait until a new chunk is available, then process it.
await for (final chunk in source) {
var lines = chunk.split('\n');
lines[0] = partial + lines[0]; // Prepend partial line.
partial = lines.removeLast(); // Remove new partial line.
for (final line in lines) {
yield line; // Add lines to output stream.
}
}
// Add final partial line to output stream, if any.
if (partial.isNotEmpty) yield partial;
}
对于许多常见的转换,你可以使用 Stream
提供的转换方法,例如 map()
、where()
、expand()
和 take()
。
例如,假设你有一个流 counterStream
,它每秒发出一个递增的计数器。它的实现可能如下:
var counterStream = Stream<int>.periodic(
const Duration(seconds: 1),
(x) => x,
).take(15);
要快速查看事件,可以使用如下代码:
counterStream.forEach(print); // Print an integer every second, 15 times.
要转换流事件,可以在监听流之前在其上调用转换方法,例如 map()
。该方法返回一个新流。
// Double the integer in each event.
var doubleCounterStream = counterStream.map((int x) => x * 2);
doubleCounterStream.forEach(print);
除了 map()
,你还可以使用任何其他转换方法,例如以下方法:
.where((int x) => x.isEven) // Retain only even integer events.
.expand((var x) => [x, x]) // Duplicate each event.
.take(5) // Stop after the first five events.
通常,一个转换方法就足够了。但是,如果你需要对转换进行更多控制,可以使用 Stream
的 transform()
方法指定一个 StreamTransformer。平台库为许多常见任务提供了流转换器。例如,以下代码使用 dart:convert 库提供的 utf8.decoder
和 LineSplitter
转换器。
Stream<List<int>> content = File('someFile.txt').openRead();
List<String> lines = await content
.transform(utf8.decoder)
.transform(const LineSplitter())
.toList();
从头创建流
#创建新流的一种方法是使用异步生成器(async*
)函数。流在函数调用时创建,函数体在流被监听时开始运行。函数返回时,流关闭。在函数返回之前,它可以使用 yield
或 yield*
语句在流上发出事件。
这是一个以固定间隔发出数字的简单示例:
Stream<int> timedCounter(Duration interval, [int? maxCount]) async* {
int i = 0;
while (true) {
await Future.delayed(interval);
yield i++;
if (i == maxCount) break;
}
}
此函数返回一个 Stream
。当该流被监听时,函数体开始运行。它会重复延迟指定的间隔,然后 yield 下一个数字。如果省略 maxCount
参数,循环就没有停止条件,流将永远输出递增的数字 - 或者直到监听器取消其订阅。
当监听器取消(通过在 listen()
方法返回的 StreamSubscription
对象上调用 cancel()
)时,下一次函数体到达 yield
语句时,yield
将充当 return
语句。任何包含的 finally
块都将执行,函数退出。如果函数在退出前尝试 yield 一个值,那将失败并充当 return。
当函数最终退出时,cancel()
方法返回的 Future 完成。如果函数因错误退出,则 Future 以该错误完成;否则,它以 null
完成。
另一个更有用的示例是将 Future 序列转换为流的函数:
Stream<T> streamFromFutures<T>(Iterable<Future<T>> futures) async* {
for (final future in futures) {
var result = await future;
yield result;
}
}
此函数向 futures
可迭代对象请求一个新的 Future,等待该 Future 完成,发出结果值,然后循环。如果某个 Future 完成时出现错误,则流也会以该错误完成。
async*
函数从零开始构建流的情况很少见。它需要从某个地方获取数据,而这个地方通常是另一个流。在某些情况下,例如上面 Future 的序列,数据来自其他异步事件源。然而,在许多情况下,async*
函数过于简单,难以轻松处理多个数据源。这就是 StreamController
类发挥作用的地方。
使用 StreamController
#如果你的流事件来自程序的各个不同部分,而不仅仅是来自可以通过 async
函数遍历的流或 Future,那么可以使用 StreamController 来创建和填充流。
StreamController
为你提供了一个新的流,以及一种在任何时间、从任何地方向流添加事件的方法。该流具备处理监听器和暂停所需的所有逻辑。你返回流,并将控制器保留给自己。
以下示例(来自 stream_controller_bad.dart)展示了 StreamController
的基本(但有缺陷)用法,用于实现先前示例中的 timedCounter()
函数。此代码创建要返回的流,然后根据计时器事件向其提供数据,这些事件既不是 Future 也不是流事件。
// NOTE: This implementation is FLAWED!
// It starts before it has subscribers, and it doesn't implement pause.
Stream<int> timedCounter(Duration interval, [int? maxCount]) {
var controller = StreamController<int>();
int counter = 0;
void tick(Timer timer) {
counter++;
controller.add(counter); // Ask stream to send counter values as event.
if (maxCount != null && counter >= maxCount) {
timer.cancel();
controller.close(); // Ask stream to shut down and tell listeners.
}
}
Timer.periodic(interval, tick); // BAD: Starts before it has subscribers.
return controller.stream;
}
和之前一样,你可以像这样使用 timedCounter()
返回的流:
var counterStream = timedCounter(const Duration(seconds: 1), 15);
counterStream.listen(print); // Print an integer every second, 15 times.
timedCounter()
的这个实现有两个问题:
- 在有订阅者之前就开始产生事件。
- 即使订阅者请求暂停,它也继续产生事件。
正如接下来的部分所示,你可以在创建 StreamController
时指定 onListen
和 onPause
等回调来解决这两个问题。
等待订阅
#通常,流在开始工作之前应等待订阅者。async*
函数会自动执行此操作,但使用 StreamController
时,你可以完全控制,即使不应该添加事件时也可以添加。当流没有订阅者时,其 StreamController
会缓冲事件,如果流从未获得订阅者,这可能导致内存泄漏。
尝试将使用流的代码更改为以下内容:
void listenAfterDelay() async {
var counterStream = timedCounter(const Duration(seconds: 1), 15);
await Future.delayed(const Duration(seconds: 5));
// After 5 seconds, add a listener.
await for (final n in counterStream) {
print(n); // Print an integer every second, 15 times.
}
}
当这段代码运行时,前 5 秒没有输出任何内容,尽管流正在工作。然后添加了监听器,由于事件被 StreamController
缓冲,前 5 个左右的事件会一次性打印出来。
要在订阅时收到通知,请在创建 StreamController
时指定 onListen
参数。onListen
回调在流获得第一个订阅者时调用。如果指定 onCancel
回调,则在控制器失去最后一个订阅者时调用。在前面的示例中,Timer.periodic()
应该移至 onListen
处理程序中,如下一节所示。
遵守暂停状态
#当监听器请求暂停时,避免产生事件。当流订阅暂停时,async*
函数会在 yield
语句处自动暂停。另一方面,StreamController
在暂停期间会缓冲事件。如果提供事件的代码不遵守暂停,缓冲区的大小可能会无限增长。此外,如果监听器在暂停后不久停止监听,那么用于创建缓冲区的工作就被浪费了。
要查看没有暂停支持时会发生什么,请尝试将使用流的代码更改为以下内容:
void listenWithPause() {
var counterStream = timedCounter(const Duration(seconds: 1), 15);
late StreamSubscription<int> subscription;
subscription = counterStream.listen((int counter) {
print(counter); // Print an integer every second.
if (counter == 5) {
// After 5 ticks, pause for five seconds, then resume.
subscription.pause(Future.delayed(const Duration(seconds: 5)));
}
});
}
当五秒暂停结束后,在此期间触发的事件会一次性全部接收到。这是因为流的源不遵守暂停,并持续向流添加事件。因此流会缓冲事件,并在流恢复非暂停状态时清空其缓冲区。
以下版本的 timedCounter()
(来自 stream_controller.dart)通过使用 StreamController
上的 onListen
、onPause
、onResume
和 onCancel
回调来实现暂停。
Stream<int> timedCounter(Duration interval, [int? maxCount]) {
late StreamController<int> controller;
Timer? timer;
int counter = 0;
void tick(_) {
counter++;
controller.add(counter); // Ask stream to send counter values as event.
if (counter == maxCount) {
timer?.cancel();
controller.close(); // Ask stream to shut down and tell listeners.
}
}
void startTimer() {
timer = Timer.periodic(interval, tick);
}
void stopTimer() {
timer?.cancel();
timer = null;
}
controller = StreamController<int>(
onListen: startTimer,
onPause: stopTimer,
onResume: startTimer,
onCancel: stopTimer,
);
return controller.stream;
}
使用上面提供的 listenWithPause()
函数运行这段代码。你会看到它在暂停时停止计数,并在之后顺利恢复。
你必须使用所有监听器——onListen
、onCancel
、onPause
和 onResume
——才能收到暂停状态变化的通知。原因是如果订阅和暂停状态同时改变,只会调用 onListen
或 onCancel
回调。
最后提示
#在不使用 async* 函数创建流时,请记住以下提示:
使用同步控制器时要小心——例如,使用
StreamController(sync: true)
创建的控制器。当你在未暂停的同步控制器上发送事件时(例如,使用 EventSink 定义的add()
、addError()
或close()
方法),事件会立即发送给流上的所有监听器。Stream
监听器绝不应在添加监听器的代码完全返回之前被调用,在错误的时间使用同步控制器可能会违反此约定并导致正常代码失败。避免使用同步控制器。如果你使用
StreamController
,onListen
回调会在listen
调用返回StreamSubscription
之前被调用。不要让onListen
回调依赖于订阅已经存在。例如,在以下代码中,onListen
事件会在subscription
变量具有有效值之前触发(并调用handler
)。dartsubscription = stream.listen(handler);
StreamController
定义的onListen
、onPause
、onResume
和onCancel
回调在流的监听器状态改变时由流调用,但绝不会在事件触发期间或调用另一个状态改变处理程序期间调用。在这些情况下,状态改变回调会延迟到先前的回调完成之后。不要尝试自己实现
Stream
接口。很容易在事件、回调以及添加和移除监听器之间的交互上出现细微的错误。始终使用现有流(可能是来自StreamController
的流)来实现新流的listen
调用。尽管可以通过扩展
Stream
类并实现listen
方法及额外的功能来创建具有更多功能的类,但通常不推荐这样做,因为它引入了一个用户必须考虑的新类型。与其创建一个是Stream
(及更多)的类,你通常可以创建一个拥有Stream
(及更多)的类。