跳到主要内容

在 Dart 中创建 Stream

作者:Lasse Nielsen
2013 年 4 月(2021 年 5 月更新)

dart:async 库包含两种对许多 Dart API 很重要的类型:StreamFuture。 Future 表示单个计算的结果,而 stream 是一系列结果。您监听 stream 以获取结果(数据和错误)以及 stream 关闭的通知。您还可以在监听时暂停或在 stream 完成之前停止监听 stream。

但是本文不是关于使用 stream 的。它是关于创建您自己的 stream。您可以通过几种方式创建 stream:

  • 转换现有的 stream。
  • 通过使用 async* 函数从头开始创建 stream。
  • 通过使用 StreamController 创建 stream。

本文展示了每种方法的代码,并提供提示以帮助您正确实现 stream。

有关使用 stream 的帮助,请参阅异步编程:Stream

转换现有的 stream

#

创建 stream 的常见情况是您已经有一个 stream,并且想要基于原始 stream 的事件创建一个新的 stream。例如,您可能有一个字节 stream,您想通过 UTF-8 解码输入将其转换为字符串 stream。最通用的方法是创建一个新的 stream,该 stream 等待原始 stream 上的事件,然后输出新的事件。示例:

dart
/// Splits a stream of consecutive strings into lines.
///
/// The input string is provided in smaller chunks through
/// the `source` stream.
Stream<String> lines(Stream<String> source) async* {
  // Stores any partial line from the previous chunk.
  var partial = '';
  // Wait until a new chunk is available, then process it.
  await for (final chunk in source) {
    var lines = chunk.split('\n');
    lines[0] = partial + lines[0]; // Prepend partial line.
    partial = lines.removeLast(); // Remove new partial line.
    for (final line in lines) {
      yield line; // Add lines to output stream.
    }
  }
  // Add final partial line to output stream, if any.
  if (partial.isNotEmpty) yield partial;
}

对于许多常见的转换,您可以使用 Stream 提供的转换方法,例如 map()where()expand()take()

例如,假设您有一个 stream,counterStream,它每秒发出一个递增的计数器。以下是如何实现它的方法:

dart
var counterStream = Stream<int>.periodic(
  const Duration(seconds: 1),
  (x) => x,
).take(15);

要快速查看事件,您可以使用如下代码:

dart
counterStream.forEach(print); // Print an integer every second, 15 times.

要转换 stream 事件,您可以在监听 stream 之前调用 stream 上的转换方法,例如 map()。该方法返回一个新的 stream。

dart
// Double the integer in each event.
var doubleCounterStream = counterStream.map((int x) => x * 2);
doubleCounterStream.forEach(print);

除了 map() 之外,您还可以使用任何其他转换方法,例如以下方法:

dart
.where((int x) => x.isEven) // Retain only even integer events.
.expand((var x) => [x, x]) // Duplicate each event.
.take(5) // Stop after the first five events.

通常,转换方法是您所需要的全部。但是,如果您需要对转换进行更多控制,则可以使用 StreamTransformer 以及 Streamtransform() 方法。平台库为许多常见任务提供了 stream 转换器。例如,以下代码使用 dart:convert 库提供的 utf8.decoderLineSplitter 转换器。

dart
Stream<List<int>> content = File('someFile.txt').openRead();
List<String> lines =
    await content
        .transform(utf8.decoder)
        .transform(const LineSplitter())
        .toList();

从头开始创建 stream

#

创建新 stream 的一种方法是使用异步生成器 (async*) 函数。stream 在函数被调用时创建,函数的 body 在 stream 被监听时开始运行。当函数返回时,stream 关闭。在函数返回之前,它可以通过使用 yieldyield* 语句在 stream 上发出事件。

这是一个原始示例,它以固定的时间间隔发出数字:

dart
Stream<int> timedCounter(Duration interval, [int? maxCount]) async* {
  int i = 0;
  while (true) {
    await Future.delayed(interval);
    yield i++;
    if (i == maxCount) break;
  }
}

此函数返回一个 Stream。当该 stream 被监听时,body 开始运行。它重复延迟请求的时间间隔,然后 yield 下一个数字。如果省略了 maxCount 参数,则循环没有停止条件,因此 stream 将永远输出越来越大的数字 - 或者直到监听器取消其订阅为止。

当监听器取消(通过在 listen() 方法返回的 StreamSubscription 对象上调用 cancel())时,则 body 下次到达 yield 语句时,yield 将充当 return 语句。任何封闭的 finally 代码块都会被执行,并且函数退出。如果函数在退出之前尝试 yield 一个值,则会失败并充当 return。

当函数最终退出时,cancel() 方法返回的 future 完成。如果函数以错误退出,则 future 以该错误完成;否则,它以 null 完成。

另一个更有用的示例是一个将 future 序列转换为 stream 的函数:

dart
Stream<T> streamFromFutures<T>(Iterable<Future<T>> futures) async* {
  for (final future in futures) {
    var result = await future;
    yield result;
  }
}

此函数向 futures 可迭代对象请求一个新的 future,等待该 future,发出结果值,然后循环。如果 future 以错误完成,则 stream 以该错误完成。

很少有 async* 函数从无到有构建 stream。它需要从某处获取其数据,而最常见的情况是某处是另一个 stream。在某些情况下,例如上面的 future 序列,数据来自其他异步事件源。然而,在许多情况下,async* 函数过于简单,无法轻松处理多个数据源。这就是 StreamController 类发挥作用的地方。

使用 StreamController

#

如果您的 stream 的事件来自程序的各个部分,而不仅仅来自可以通过 async 函数遍历的 stream 或 future,则使用 StreamController 来创建和填充 stream。

StreamController 为您提供了一个新的 stream 以及一种在任何时候、从任何地方向 stream 添加事件的方法。stream 具有处理监听器和暂停所需的所有逻辑。您返回 stream 并自己保留控制器。

以下示例(来自 stream_controller_bad.dart)展示了 StreamController 的基本但有缺陷的用法,以实现先前示例中的 timedCounter() 函数。此代码创建一个 stream 来返回,然后根据定时器事件(既不是 future 也不是 stream 事件)向其馈送数据。

baddart
// NOTE: This implementation is FLAWED!
// It starts before it has subscribers, and it doesn't implement pause.
Stream<int> timedCounter(Duration interval, [int? maxCount]) {
  var controller = StreamController<int>();
  int counter = 0;
  void tick(Timer timer) {
    counter++;
    controller.add(counter); // Ask stream to send counter values as event.
    if (maxCount != null && counter >= maxCount) {
      timer.cancel();
      controller.close(); // Ask stream to shut down and tell listeners.
    }
  }

  Timer.periodic(interval, tick); // BAD: Starts before it has subscribers.
  return controller.stream;
}

与之前一样,您可以使用 timedCounter() 返回的 stream,如下所示:

dart
var counterStream = timedCounter(const Duration(seconds: 1), 15);
counterStream.listen(print); // Print an integer every second, 15 times.

timedCounter() 的这种实现存在几个问题:

  • 它在有订阅者之前就开始生成事件。
  • 即使订阅者请求暂停,它也会继续生成事件。

如下一节所示,您可以通过在创建 StreamController 时指定回调(例如 onListenonPause)来解决这两个问题。

等待订阅

#

作为规则,stream 应该在开始工作之前等待订阅者。async* 函数会自动执行此操作,但是当使用 StreamController 时,您完全可以控制,甚至可以在不应该添加事件时添加事件。当 stream 没有订阅者时,其 StreamController 会缓冲事件,如果 stream 永远没有订阅者,则可能导致内存泄漏。

尝试将使用 stream 的代码更改为以下内容:

dart
void listenAfterDelay() async {
  var counterStream = timedCounter(const Duration(seconds: 1), 15);
  await Future.delayed(const Duration(seconds: 5));

  // After 5 seconds, add a listener.
  await for (final n in counterStream) {
    print(n); // Print an integer every second, 15 times.
  }
}

当此代码运行时,前 5 秒钟没有任何内容打印出来,尽管 stream 正在工作。然后添加了监听器,并且前 5 个左右的事件一次性打印出来,因为它们被 StreamController 缓冲了。

要获得订阅通知,请在创建 StreamController 时指定 onListen 参数。当 stream 获得其第一个订阅者时,将调用 onListen 回调。如果您指定 onCancel 回调,则当控制器失去其最后一个订阅者时,将调用该回调。在前面的示例中,Timer.periodic() 应该移动到 onListen 处理程序,如下一节所示。

遵守暂停状态

#

避免在监听器请求暂停时生成事件。当 stream 订阅暂停时,async* 函数会在 yield 语句处自动暂停。另一方面,StreamController 在暂停期间缓冲事件。如果提供事件的代码不遵守暂停,则缓冲区的大小可能会无限增长。此外,如果监听器在暂停后不久停止监听,则花费在创建缓冲区上的工作将被浪费。

要查看在没有暂停支持的情况下会发生什么,请尝试将使用 stream 的代码更改为以下内容:

dart
void listenWithPause() {
  var counterStream = timedCounter(const Duration(seconds: 1), 15);
  late StreamSubscription<int> subscription;

  subscription = counterStream.listen((int counter) {
    print(counter); // Print an integer every second.
    if (counter == 5) {
      // After 5 ticks, pause for five seconds, then resume.
      subscription.pause(Future.delayed(const Duration(seconds: 5)));
    }
  });
}

当五秒暂停结束后,在此期间触发的事件将一次性全部接收。发生这种情况是因为 stream 的源不遵守暂停,并且继续向 stream 添加事件。因此,stream 缓冲了事件,然后在 stream 取消暂停时清空其缓冲区。

以下版本的 timedCounter()(来自 stream_controller.dart)通过使用 StreamController 上的 onListenonPauseonResumeonCancel 回调来实现暂停。

dart
Stream<int> timedCounter(Duration interval, [int? maxCount]) {
  late StreamController<int> controller;
  Timer? timer;
  int counter = 0;

  void tick(_) {
    counter++;
    controller.add(counter); // Ask stream to send counter values as event.
    if (counter == maxCount) {
      timer?.cancel();
      controller.close(); // Ask stream to shut down and tell listeners.
    }
  }

  void startTimer() {
    timer = Timer.periodic(interval, tick);
  }

  void stopTimer() {
    timer?.cancel();
    timer = null;
  }

  controller = StreamController<int>(
    onListen: startTimer,
    onPause: stopTimer,
    onResume: startTimer,
    onCancel: stopTimer,
  );

  return controller.stream;
}

使用上面的 listenWithPause() 函数运行此代码。您会看到它在暂停时停止计数,并在之后很好地恢复。

您必须使用所有监听器——onListenonCancelonPauseonResume——才能收到暂停状态更改的通知。原因是,如果订阅和暂停状态同时更改,则只会调用 onListenonCancel 回调。

最后的提示

#

在不使用 async* 函数创建 stream 时,请记住以下提示:

  • 使用同步控制器时要小心——例如,使用 StreamController(sync: true) 创建的控制器。当您在未暂停的同步控制器上发送事件时(例如,使用 EventSink 定义的 add()addError()close() 方法),事件会立即发送到 stream 上的所有监听器。在添加监听器的代码完全返回之前,绝不能调用 Stream 监听器,而在错误的时间使用同步控制器可能会破坏此承诺并导致良好的代码失败。避免使用同步控制器。

  • 如果您使用 StreamController,则在 listen 调用返回 StreamSubscription 之前,会调用 onListen 回调。不要让 onListen 回调依赖于订阅已经存在。例如,在以下代码中,在 subscription 变量具有有效值之前,onListen 事件会触发(并调用 handler)。

    dart
    subscription = stream.listen(handler);
  • StreamController 定义的 onListenonPauseonResumeonCancel 回调由 stream 在 stream 的监听器状态更改时调用,但绝不会在事件触发期间或在调用另一个状态更改处理程序期间调用。在这些情况下,状态更改回调会延迟到先前的回调完成为止。

  • 不要尝试自己实现 Stream 接口。事件、回调以及添加和删除监听器之间的交互很容易出现细微的错误。始终使用现有的 stream,可能来自 StreamController,来实现新 stream 的 listen 调用。

  • 虽然可以创建通过扩展 Stream 类并实现 listen 方法以及顶部的额外功能来扩展 Stream 功能的类,但通常不建议这样做,因为它引入了用户必须考虑的新类型。与其创建一个 Stream(以及更多)的类,不如创建一个具有 Stream(以及更多)的类。