在 Dart 中创建流
作者:Lasse Nielsen
2013 年 4 月(2021 年 5 月更新)
dart:async 库包含两种对于许多 Dart API 而言很重要的类型:Stream 和 Future. Future 表示单个计算的结果,而流是结果的序列。您监听流以获取结果(数据和错误)以及流关闭的通知。您还可以在监听时暂停或在流完成之前停止监听流。
但本文不是关于使用流的。而是关于创建您自己的流。您可以通过以下几种方式创建流
- 转换现有流。
- 使用
async*
函数从头开始创建流。 - 使用
StreamController
创建流。
本文展示了每种方法的代码,并提供了帮助您正确实现流的提示。
有关使用流的帮助,请参阅 异步编程:流。
转换现有流
#创建流的常见情况是您已经有一个流,并且您想基于原始流的事件创建一个新流。例如,您可能有一个字节流,您希望通过 UTF-8 解码输入将其转换为字符串流。最通用的方法是创建一个新流,等待原始流上的事件,然后输出新事件。示例
/// Splits a stream of consecutive strings into lines.
///
/// The input string is provided in smaller chunks through
/// the `source` stream.
Stream<String> lines(Stream<String> source) async* {
// Stores any partial line from the previous chunk.
var partial = '';
// Wait until a new chunk is available, then process it.
await for (final chunk in source) {
var lines = chunk.split('\n');
lines[0] = partial + lines[0]; // Prepend partial line.
partial = lines.removeLast(); // Remove new partial line.
for (final line in lines) {
yield line; // Add lines to output stream.
}
}
// Add final partial line to output stream, if any.
if (partial.isNotEmpty) yield partial;
}
对于许多常见转换,您可以使用 Stream
提供的转换方法,例如 map()
、where()
、expand()
和 take()
。
例如,假设您有一个流,counterStream
,它每秒发出一个递增的计数器。以下是它的实现方式
var counterStream =
Stream<int>.periodic(const Duration(seconds: 1), (x) => x).take(15);
要快速查看事件,可以使用如下代码
counterStream.forEach(print); // Print an integer every second, 15 times.
要转换流事件,可以在侦听流之前调用流上的转换方法,例如 map()
。该方法返回一个新的流。
// Double the integer in each event.
var doubleCounterStream = counterStream.map((int x) => x * 2);
doubleCounterStream.forEach(print);
除了 map()
,您还可以使用任何其他转换方法,例如以下方法
.where((int x) => x.isEven) // Retain only even integer events.
.expand((var x) => [x, x]) // Duplicate each event.
.take(5) // Stop after the first five events.
通常,转换方法是您所需要的全部。但是,如果您需要对转换有更多控制,可以使用 Stream
的 transform()
方法指定一个 StreamTransformer。平台库为许多常见任务提供了流转换器。例如,以下代码使用 dart:convert 库提供的 utf8.decoder
和 LineSplitter
转换器。
Stream<List<int>> content = File('someFile.txt').openRead();
List<String> lines = await content
.transform(utf8.decoder)
.transform(const LineSplitter())
.toList();
从头开始创建流
#创建新流的一种方法是使用异步生成器 (async*
) 函数。当调用该函数时创建流,当侦听流时函数的主体开始运行。当函数返回时,流关闭。在函数返回之前,它可以通过使用 yield
或 yield*
语句在流上发出事件。
以下是一个以规则间隔发出数字的原始示例
Stream<int> timedCounter(Duration interval, [int? maxCount]) async* {
int i = 0;
while (true) {
await Future.delayed(interval);
yield i++;
if (i == maxCount) break;
}
}
此函数返回一个 Stream
。当侦听该流时,主体开始运行。它重复延迟请求的间隔,然后生成下一个数字。如果省略 maxCount
参数,则循环没有停止条件,因此流会永远输出越来越大的数字 - 或直到侦听器取消其订阅。
当侦听器取消(通过调用 listen()
方法返回的 StreamSubscription
对象上的 cancel()
)时,下次主体到达 yield
语句时,yield
反而充当 return
语句。任何封闭的 finally
块都会被执行,并且函数退出。如果函数在退出之前尝试生成一个值,则会失败并充当返回。
当函数最终退出时,cancel()
方法返回的 future 完成。如果函数以错误退出,则 future 以该错误完成;否则,它以 null
完成。
另一个更有用的示例是将一系列 future 转换为流的函数
Stream<T> streamFromFutures<T>(Iterable<Future<T>> futures) async* {
for (final future in futures) {
var result = await future;
yield result;
}
}
此函数要求 futures
可迭代对象提供一个新的 future,等待该 future,发出结果值,然后循环。如果 future 以错误完成,则流将以该错误完成。
很少有 async*
函数从无中构建流。它需要从某个地方获取其数据,而大多数情况下,该地方是另一个流。在某些情况下(如上面的 future 序列),数据来自其他异步事件源。然而,在许多情况下,async*
函数过于简单,无法轻松处理多个数据源。这就是 StreamController
类发挥作用的地方。
使用 StreamController
#如果流的事件来自程序的不同部分,而不仅仅来自 async
函数可以遍历的流或 future,那么请使用 StreamController 创建和填充流。
StreamController
为您提供一个新流,以及一种在任何时间和任何地方向流中添加事件的方法。该流具有处理侦听器和暂停所需的所有逻辑。您返回流并保留控制器以供自己使用。
以下示例(来自 stream_controller_bad.dart)展示了 StreamController
的一个基本但有缺陷的用法,以实现前面示例中的 timedCounter()
函数。此代码创建了一个要返回的流,然后根据计时器事件(既不是 future 也不是流事件)向其中馈送数据。
// NOTE: This implementation is FLAWED!
// It starts before it has subscribers, and it doesn't implement pause.
Stream<int> timedCounter(Duration interval, [int? maxCount]) {
var controller = StreamController<int>();
int counter = 0;
void tick(Timer timer) {
counter++;
controller.add(counter); // Ask stream to send counter values as event.
if (maxCount != null && counter >= maxCount) {
timer.cancel();
controller.close(); // Ask stream to shut down and tell listeners.
}
}
Timer.periodic(interval, tick); // BAD: Starts before it has subscribers.
return controller.stream;
}
与之前一样,您可以像这样使用 timedCounter()
返回的流
var counterStream = timedCounter(const Duration(seconds: 1), 15);
counterStream.listen(print); // Print an integer every second, 15 times.
timedCounter()
的此实现有一些问题
- 它在拥有订阅者之前就开始生成事件。
- 即使订阅者请求暂停,它也会继续生成事件。
正如下一节所示,您可以在创建 StreamController
时指定回调(如 onListen
和 onPause
)来解决这两个问题。
等待订阅
#根据规则,流应在开始工作之前等待订阅者。async*
函数会自动执行此操作,但当使用 StreamController
时,您可以完全控制,甚至可以在不应添加事件时添加事件。当流没有订阅者时,其 StreamController
会缓冲事件,如果流永远没有订阅者,这可能会导致内存泄漏。
尝试将使用该流的代码更改为以下内容
void listenAfterDelay() async {
var counterStream = timedCounter(const Duration(seconds: 1), 15);
await Future.delayed(const Duration(seconds: 5));
// After 5 seconds, add a listener.
await for (final n in counterStream) {
print(n); // Print an integer every second, 15 times.
}
}
当此代码运行时,在前 5 秒内不会打印任何内容,尽管流正在执行工作。然后添加侦听器,并且前 5 个左右的事件会一次性打印出来,因为它们已由 StreamController
缓冲。
要接收订阅通知,请在创建 StreamController
时指定 onListen
参数。当流获得第一个订阅者时,将调用 onListen
回调。如果你指定了 onCancel
回调,则当控制器失去最后一个订阅者时,将调用该回调。在前面的示例中,Timer.periodic()
应移至 onListen
处理程序,如下一节所示。
尊重暂停状态
#当侦听器请求暂停时,避免产生事件。async*
函数在流订阅暂停时自动在 yield
语句处暂停。另一方面,StreamController
会在暂停期间缓冲事件。如果提供事件的代码不遵守暂停,则缓冲区的大小可能会无限增长。此外,如果侦听器在暂停后不久停止侦听,那么创建缓冲区所花费的工作就白费了。
要了解在没有暂停支持的情况下会发生什么,请尝试将使用流的代码更改为以下内容
void listenWithPause() {
var counterStream = timedCounter(const Duration(seconds: 1), 15);
late StreamSubscription<int> subscription;
subscription = counterStream.listen((int counter) {
print(counter); // Print an integer every second.
if (counter == 5) {
// After 5 ticks, pause for five seconds, then resume.
subscription.pause(Future.delayed(const Duration(seconds: 5)));
}
});
}
当五秒的暂停结束时,在此期间触发的事件将全部同时接收。发生这种情况是因为流的源不遵守暂停并继续向流中添加事件。因此,流会缓冲事件,然后在流取消暂停时清空其缓冲区。
以下版本的 timedCounter()
(来自 stream_controller.dart)通过使用 StreamController
上的 onListen
、onPause
、onResume
和 onCancel
回调来实现暂停。
Stream<int> timedCounter(Duration interval, [int? maxCount]) {
late StreamController<int> controller;
Timer? timer;
int counter = 0;
void tick(_) {
counter++;
controller.add(counter); // Ask stream to send counter values as event.
if (counter == maxCount) {
timer?.cancel();
controller.close(); // Ask stream to shut down and tell listeners.
}
}
void startTimer() {
timer = Timer.periodic(interval, tick);
}
void stopTimer() {
timer?.cancel();
timer = null;
}
controller = StreamController<int>(
onListen: startTimer,
onPause: stopTimer,
onResume: startTimer,
onCancel: stopTimer);
return controller.stream;
}
使用上面的 listenWithPause()
函数运行此代码。你会看到它在暂停时停止计数,然后在暂停后很好地恢复。
你必须使用所有侦听器(onListen
、onCancel
、onPause
和 onResume
)才能收到暂停状态更改的通知。原因是如果订阅和暂停状态同时更改,则只调用 onListen
或 onCancel
回调。
最终提示
#在不使用 async* 函数创建流时,请记住以下提示
使用同步控制器时要小心,例如使用
StreamController(sync: true)
创建的控制器。当您在未暂停的同步控制器上发送事件(例如,使用 EventSink 定义的add()
、addError()
或close()
方法)时,该事件会立即发送给流上的所有侦听器。在添加侦听器的代码完全返回之前,绝不能调用Stream
侦听器,在错误的时间使用同步控制器可能会破坏此承诺并导致正常代码失败。避免使用同步控制器。如果您使用
StreamController
,则在listen
调用返回StreamSubscription
之前,将调用onListen
回调。不要让onListen
回调依赖于已存在的订阅。例如,在以下代码中,在subscription
变量具有有效值之前,将触发onListen
事件(并调用handler
)。dartsubscription = stream.listen(handler);
当流的侦听器状态发生变化时,由流调用
StreamController
定义的onListen
、onPause
、onResume
和onCancel
回调,但绝不会在触发事件期间或在调用其他状态更改处理程序期间调用。在这些情况下,状态更改回调会延迟,直到先前的回调完成。不要尝试自己实现
Stream
接口。很容易错误地理解事件、回调以及添加和移除侦听器之间的交互。始终使用现有流(可能来自StreamController
)来实现新流的listen
调用。虽然可以通过扩展
Stream
类并实现listen
方法和顶部的额外功能来创建扩展Stream
的更多功能的类,但通常不建议这样做,因为它会引入用户必须考虑的新类型。您可以经常创建一个具有Stream
(以及更多)的类,而不是一个是Stream
(以及更多)的类。