跳到主要内容

异步编程: 流

Dart 中的异步编程以 FutureStream 类为特征。

Future 表示不会立即完成的计算。普通函数返回结果,而异步函数返回一个 Future,它最终会包含结果。Future 会告诉您结果何时准备就绪。

流是一系列异步事件。它类似于异步的 Iterable——您不是在请求时获得下一个事件,而是流在事件准备就绪时通知您。

接收流事件

#

流可以通过多种方式创建,这是另一篇文章的主题,但它们都可以通过相同的方式使用:异步 for 循环(通常简称为 await for)迭代流中的事件,就像 for 循环迭代 Iterable 一样。例如

dart
Future<int> sumStream(Stream<int> stream) async {
  var sum = 0;
  await for (final value in stream) {
    sum += value;
  }
  return sum;
}

此代码简单地接收整数事件流中的每个事件,将它们相加,并返回总和(一个 future)。当循环体结束时,函数会暂停,直到下一个事件到来或流完成。

该函数使用 async 关键字标记,在使用 await for 循环时这是必需的。

以下示例通过使用 async* 函数生成一个简单的整数流来测试前面的代码

Future<int> sumStream(Stream<int> stream) async {
  var sum = 0;
  await for (final value in stream) {
    sum += value;
  }
  return sum;
}

Stream<int> countStream(int to) async* {
  for (int i = 1; i <= to; i++) {
    yield i;
  }
}

void main() async {
  var stream = countStream(10);
  var sum = await sumStream(stream);
  print(sum); // 55
}

错误事件

#

当流中没有更多事件时,流就完成了,接收事件的代码会收到完成通知,就像收到新事件到来通知一样。使用 await for 循环读取事件时,当流完成时循环会停止。

在某些情况下,错误会在流完成之前发生;例如从远程服务器获取文件时网络失败,或者创建事件的代码有 bug,需要有人知道。

流也可以像传递数据事件一样传递错误事件。大多数流在第一个错误后会停止,但也可能存在传递多个错误的流,以及在错误事件后继续传递更多数据的流。本文档只讨论最多传递一个错误的流。

使用 await for 读取流时,错误由循环语句抛出。这也会结束循环。您可以使用 try-catch 捕获错误。以下示例在循环迭代器等于 4 时抛出错误

Future<int> sumStream(Stream<int> stream) async {
  var sum = 0;
  try {
    await for (final value in stream) {
      sum += value;
    }
  } catch (e) {
    return -1;
  }
  return sum;
}

Stream<int> countStream(int to) async* {
  for (int i = 1; i <= to; i++) {
    if (i == 4) {
      throw Exception('Intentional exception');
    } else {
      yield i;
    }
  }
}

void main() async {
  var stream = countStream(10);
  var sum = await sumStream(stream);
  print(sum); // -1
}

使用流

#

Stream 类包含许多辅助方法,可以帮助您对流执行常见操作,类似于 Iterable 上的方法。例如,您可以使用 Stream API 中的 lastWhere() 方法查找流中最后一个正整数。

dart
Future<int> lastPositive(Stream<int> stream) =>
    stream.lastWhere((x) => x >= 0);

两种流

#

流有两种类型。

单订阅流

#

最常见的流包含一系列事件,这些事件是更大整体的一部分。事件需要按正确顺序传递,并且不能丢失任何一个。当您读取文件或接收 Web 请求时,就会得到这种流。

这种流只能监听一次。稍后再次监听可能意味着错过了初始事件,然后流的其余部分就没有意义了。当您开始监听时,数据将被获取并以块的形式提供。

广播流

#

另一种流适用于可以一次处理一个的独立消息。例如,这种流可用于浏览器中的鼠标事件。

您可以随时开始监听这种流,并且在监听期间会收到触发的事件。多个监听器可以同时监听,并且在取消之前的订阅后,您可以稍后再次监听。

处理流的方法

#

Stream<T> 上的以下方法处理流并返回结果

dart
Future<T> get first;
Future<bool> get isEmpty;
Future<T> get last;
Future<int> get length;
Future<T> get single;
Future<bool> any(bool Function(T element) test);
Future<bool> contains(Object? needle);
Future<E> drain<E>([E? futureValue]);
Future<T> elementAt(int index);
Future<bool> every(bool Function(T element) test);
Future<T> firstWhere(bool Function(T element) test, {T Function()? orElse});
Future<S> fold<S>(S initialValue, S Function(S previous, T element) combine);
Future forEach(void Function(T element) action);
Future<String> join([String separator = '']);
Future<T> lastWhere(bool Function(T element) test, {T Function()? orElse});
Future pipe(StreamConsumer<T> streamConsumer);
Future<T> reduce(T Function(T previous, T element) combine);
Future<T> singleWhere(bool Function(T element) test, {T Function()? orElse});
Future<List<T>> toList();
Future<Set<T>> toSet();

所有这些函数,除了 drain()pipe(),都对应于 Iterable 上的类似函数。每个函数都可以通过使用带有 await for 循环的 async 函数(或仅使用其他方法之一)轻松编写。例如,一些实现可以是

dart
Future<bool> contains(Object? needle) async {
  await for (final event in this) {
    if (event == needle) return true;
  }
  return false;
}

Future forEach(void Function(T element) action) async {
  await for (final event in this) {
    action(event);
  }
}

Future<List<T>> toList() async {
  final result = <T>[];
  await forEach(result.add);
  return result;
}

Future<String> join([String separator = '']) async =>
    (await toList()).join(separator);

(实际实现略微复杂,但主要是出于历史原因。)

修改流的方法

#

Stream 上的以下方法根据原始流返回一个新的流。每个方法都会等待新的流被监听后,才开始监听原始流。

dart
Stream<R> cast<R>();
Stream<S> expand<S>(Iterable<S> Function(T element) convert);
Stream<S> map<S>(S Function(T event) convert);
Stream<T> skip(int count);
Stream<T> skipWhile(bool Function(T element) test);
Stream<T> take(int count);
Stream<T> takeWhile(bool Function(T element) test);
Stream<T> where(bool Function(T event) test);

前面的方法对应于 Iterable 上的类似方法,它们将一个可迭代对象转换为另一个可迭代对象。所有这些都可以使用带有 await for 循环的 async 函数轻松编写。

dart
Stream<E> asyncExpand<E>(Stream<E>? Function(T event) convert);
Stream<E> asyncMap<E>(FutureOr<E> Function(T event) convert);
Stream<T> distinct([bool Function(T previous, T next)? equals]);

asyncExpand()asyncMap() 函数类似于 expand()map(),但允许其函数参数是异步函数。distinct() 函数在 Iterable 上不存在,但可以有。

dart
Stream<T> handleError(Function onError, {bool Function(dynamic error)? test});
Stream<T> timeout(
  Duration timeLimit, {
  void Function(EventSink<T> sink)? onTimeout,
});
Stream<S> transform<S>(StreamTransformer<T, S> streamTransformer);

最后三个函数更具特殊性。它们涉及 await for 循环无法直接处理的错误处理;遇到的第一个错误将终止循环及其流订阅,并且没有内置的恢复机制。

以下代码演示了如何使用 handleError()await for 循环消费流之前从中过滤掉错误。

dart
Stream<S> mapLogErrors<S, T>(
  Stream<T> stream,
  S Function(T event) convert,
) async* {
  var streamWithoutErrors = stream.handleError((e) => log(e));

  await for (final event in streamWithoutErrors) {
    yield convert(event);
  }
}

在前面的示例中,如果流没有发出任何事件,则永远不会返回到 await for 循环。为了避免这种情况,请使用 timeout() 函数创建一个新流。timeout() 使您能够设置时间限制并在返回的流上继续发出事件。

以下代码修改了前面的示例。它添加了一个两秒的超时,并且如果在两秒或更长时间内没有事件发生,则会产生相关的错误。

dart
Stream<S> mapLogErrors<S, T>(
  Stream<T> stream,
  S Function(T event) convert,
) async* {
  var streamWithoutErrors = stream.handleError((e) => log(e));
  var streamWithTimeout = streamWithoutErrors.timeout(
    const Duration(seconds: 2),
    onTimeout: (eventSink) {
      eventSink.addError('Timed out after 2 seconds');
      eventSink.close();
    },
  );

  await for (final event in streamWithTimeout) {
    yield convert(event);
  }
}

transform() 函数

#

transform() 函数不仅用于错误处理;它是一种更通用的流的“映射”。普通的 map 对于每个输入的事件只需要一个值。然而,特别是对于 I/O 流,可能需要多个输入的事件才能产生一个输出事件。StreamTransformer 可以处理这种情况。例如,像 Utf8Decoder 这样的解码器就是转换器。转换器只需要一个函数 bind(),可以使用 async 函数轻松实现。

读取和解码文件

#

以下代码读取文件,并在流上运行两次转换。它首先将数据从 UTF8 转换,然后通过 LineSplitter 进行处理。所有行都被打印出来,除了以井号 # 开头的行。

dart
import 'dart:convert';
import 'dart:io';

void main(List<String> args) async {
  var file = File(args[0]);
  var lines = utf8.decoder
      .bind(file.openRead())
      .transform(const LineSplitter());
  await for (final line in lines) {
    if (!line.startsWith('#')) print(line);
  }
}

listen() 方法

#

Stream 上的最后一个方法是 listen()。这是一个“低级”方法——所有其他流函数都基于 listen() 定义。

dart
StreamSubscription<T> listen(
  void Function(T event)? onData, {
  Function? onError,
  void Function()? onDone,
  bool? cancelOnError,
});

要创建一个新的 Stream 类型,您只需扩展 Stream 类并实现 listen() 方法——Stream 上的所有其他方法都会调用 listen() 才能工作。

listen() 方法允许您开始监听流。在此之前,流只是一个描述您希望看到哪些事件的惰性对象。当您监听时,会返回一个 StreamSubscription 对象,它代表正在生成事件的活动流。这类似于 Iterable 只是对象的集合,而迭代器才是实际执行迭代的对象。

流订阅允许您暂停订阅、在暂停后恢复订阅以及完全取消订阅。您可以设置回调函数,以便在每个数据事件或错误事件发生时以及流关闭时调用。

其他资源

#

阅读以下文档,了解更多关于在 Dart 中使用流和异步编程的详细信息。