内容

隔离区

本页讨论一些使用 Isolate API 实现隔离区的示例。

当应用程序处理足够大的计算以暂时阻塞其他计算时,您应该使用隔离区。最常见的示例是在 Flutter 应用程序中,当您需要执行可能会导致 UI 变得无响应的大型计算时。

没有关于何时必须使用隔离区的规则,但以下是一些可以利用隔离区的其他情况。

  • 解析和解码异常大的 JSON 块。
  • 处理和压缩照片、音频和视频。
  • 转换音频和视频文件。
  • 对大型列表或文件系统执行复杂的搜索和过滤。
  • 执行 I/O,例如与数据库通信。
  • 处理大量网络请求。

实现简单的 worker 隔离区

#

这些示例实现了一个主隔离区,该隔离区会生成一个简单的 worker 隔离区。 Isolate.run() 简化了设置和管理 worker 隔离区背后的步骤。

  1. 生成(启动和创建)隔离区。
  2. 在生成的隔离区上运行函数。
  3. 捕获结果。
  4. 将结果返回到主隔离区。
  5. 在工作完成后终止隔离区。
  6. 检查、捕获并将异常和错误抛回主隔离区。

在新的隔离区中运行现有方法

#
  1. 调用 run() 以在 主隔离区 中直接生成一个新的隔离区(一个 后台 worker),同时 main() 等待结果。
dart
const String filename = 'with_keys.json';

void main() async {
  // Read some data.
  final jsonData = await Isolate.run(_readAndParseJson);

  // Use that data.
  print('Number of JSON keys: ${jsonData.length}');
}
  1. 将您希望 worker 隔离区执行的函数作为第一个参数传递给 worker 隔离区。在本例中,它是现有的函数 _readAndParseJson()
dart
Future<Map<String, dynamic>> _readAndParseJson() async {
  final fileData = await File(filename).readAsString();
  final jsonData = jsonDecode(fileData) as Map<String, dynamic>;
  return jsonData;
}
  1. Isolate.run() 获取 _readAndParseJson() 返回的结果,并将该值发送回主隔离区,并关闭 worker 隔离区。

  2. worker 隔离区将保存结果的内存转移到主隔离区。它不会复制数据。worker 隔离区执行验证过程以确保允许转移对象。

_readAndParseJson() 是一个现有的异步函数,可以轻松地直接在主隔离区中运行。使用 Isolate.run() 来运行它,而是启用并发。worker 隔离区完全抽象了 _readAndParseJson() 的计算。它可以在不阻塞主隔离区的情况下完成。

Isolate.run() 的结果始终是 Future,因为主隔离区中的代码会继续运行。worker 隔离区执行的计算是同步还是异步,都不会影响主隔离区,因为它始终并行运行。

有关完整程序,请查看 send_and_receive.dart 示例。

使用隔离区发送闭包

#

您还可以使用 run() 在主隔离区中直接使用函数字面量或闭包创建一个简单的 worker 隔离区。

dart
const String filename = 'with_keys.json';

void main() async {
  // Read some data.
  final jsonData = await Isolate.run(() async {
    final fileData = await File(filename).readAsString();
    final jsonData = jsonDecode(fileData) as Map<String, dynamic>;
    return jsonData;
  });

  // Use that data.
  print('Number of JSON keys: ${jsonData.length}');
}

此示例与前一个示例的作用相同。生成一个新的隔离区,计算一些内容,并发送回结果。

但是,现在隔离区发送一个 闭包。闭包在功能和写入代码的方式方面比典型的命名函数限制更少。在本例中,Isolate.run() 执行看起来像是本地代码的内容,并行运行。从这个意义上说,您可以想象 run() 像一个控制流运算符,用于“并行运行”。

使用端口在隔离区之间发送多个消息

#

短暂的隔离区使用起来很方便,但需要额外的性能开销来生成新的隔离区,并将对象从一个隔离区复制到另一个隔离区。如果您的代码依赖于使用 Isolate.run 重复运行相同的计算,您可能会通过创建不会立即退出的长生命周期隔离区来提高性能。

为此,您可以使用 Isolate.run 抽象的某些低级隔离区 API。

本节介绍在新建的隔离区和 主隔离区 之间建立双向通信所需的步骤。第一个示例 基本端口 介绍了该过程的高级概述。第二个示例 健壮端口 逐渐向第一个示例添加更多实用的现实世界功能。

ReceivePortSendPort

#

在隔离区之间建立长生命周期通信需要两个类(除了 Isolate):ReceivePortSendPort。这些端口是隔离区之间唯一可以相互通信的方式。

ReceivePort 是一个对象,它处理从其他隔离区发送的消息。这些消息是通过 SendPort 发送的。

端口的行为类似于 Stream 对象(实际上,接收端口实现了 Stream!)。您可以将 SendPortReceivePort 分别视为 Stream 的 StreamController 和侦听器。SendPort 就像 StreamController,因为您使用 SendPort.send() 方法 向其“添加”消息,而这些消息由侦听器处理,在本例中为 ReceivePort。然后,ReceivePort 通过将接收到的消息作为参数传递给您提供的回调来处理这些消息。

设置端口

#

新建的隔离区只有通过 Isolate.spawn 调用接收的信息。如果您需要主隔离区在生成的隔离区创建后继续与其通信,您必须设置一个通信通道,以便生成的隔离区可以向主隔离区发送消息。隔离区只能通过消息传递进行通信。它们不能“看到”彼此的内存内部,这就是“隔离区”名称的由来。

要设置这种双向通信,首先在主隔离区中创建一个 ReceivePort,然后在使用 Isolate.spawn 生成隔离区时,将其 SendPort 作为参数传递给新隔离区。然后,新隔离区会创建自己的 ReceivePort,并将其 SendPort 发送回它从主隔离区接收到的 SendPort。主隔离区会接收此 SendPort,现在双方都有一个开放的通道来发送和接收消息。

A figure showing events being fed, one by one, into the event loop

  1. 在主隔离区中创建一个 ReceivePortSendPort 会自动作为 ReceivePort 上的属性创建。
  2. 使用 Isolate.spawn() 生成 worker 隔离区
  3. 将对 ReceivePort.sendPort 的引用作为第一条消息传递给工作进程隔离。
  4. 在工作进程隔离中创建另一个新的 ReceivePort
  5. 将对工作进程隔离的 ReceivePort.sendPort 的引用作为第一条消息回传到主隔离。

除了创建端口并设置通信之外,你还需要告诉端口在收到消息时该怎么做。这是通过在每个相应的 ReceivePort 上使用 listen 方法来完成的。

A figure showing events being fed, one by one, into the event loop

  1. 通过主隔离对工作进程隔离的 SendPort 的引用发送消息。
  2. 通过工作进程隔离的 ReceivePort 上的监听器接收并处理消息。这就是你想从主隔离中移出的计算执行的地方。
  3. 通过工作进程隔离对主隔离的 SendPort 的引用发送返回消息。
  4. 通过主隔离的 ReceivePort 上的监听器接收消息。

基本端口示例

#

此示例演示了如何使用与主隔离之间的双向通信来设置一个长期存在的 worker 进程隔离。该代码使用将 JSON 文本发送到新隔离的示例,其中 JSON 将被解析和解码,然后发送回主隔离。

步骤 1:定义 worker 类

#

首先,为你的后台 worker 进程隔离创建一个类。此类包含你需要的全部功能

  • 生成一个隔离。
  • 向该隔离发送消息。
  • 让隔离解码一些 JSON。
  • 将解码后的 JSON 发送回主隔离。

该类公开两个公共方法:一个用于生成 worker 进程隔离的方法,一个用于处理向该 worker 进程隔离发送消息的方法。

本示例中的剩余部分将逐一向你展示如何填充类方法。

dart
class Worker {
  Future<void> spawn() async {
    // TODO: Add functionality to spawn a worker isolate.
  }

  void _handleResponsesFromIsolate(dynamic message) {
    // TODO: Handle messages sent back from the worker isolate.
  }

  static void _startRemoteIsolate(SendPort port) {
    // TODO: Define code that should be executed on the worker isolate.
  }

  Future<void> parseJson(String message) async {
    // TODO: Define a public method that can
    // be used to send messages to the worker isolate.
  }
}

步骤 2:生成一个 worker 进程隔离

#

Worker.spawn 方法是将代码分组以创建 worker 进程隔离并确保它可以接收和发送消息的地方。

  • 首先,创建一个 ReceivePort。这允许主隔离接收从新生成的 worker 进程隔离发送的消息。
  • 接下来,向接收端口添加一个监听器以处理 worker 进程隔离将发送回来的消息。传递给监听器的回调 _handleResponsesFromIsolate 将在步骤 4中介绍。
  • 最后,使用 Isolate.spawn 生成 worker 进程隔离。它需要两个参数:在 worker 进程隔离上执行的函数(在步骤 3中介绍),以及接收端口的 sendPort 属性。
dart
Future<void> spawn() async {
  final receivePort = ReceivePort();
  receivePort.listen(_handleResponsesFromIsolate);
  await Isolate.spawn(_startRemoteIsolate, receivePort.sendPort);
}

receivePort.sendPort 参数将在工作进程隔离上调用时作为参数传递给回调 (_startRemoteIsolate)。这是确保工作进程隔离能够将消息发送回主隔离的第一步。

步骤 3:在工作进程隔离上执行代码

#

在此步骤中,你将定义发送到工作进程隔离以在它生成时执行的方法 _startRemoteIsolate。此方法类似于工作进程隔离的“main”方法。

  • 首先,创建一个新的 ReceivePort。此端口接收来自主隔离的未来消息。
  • 接下来,将该端口的 SendPort 发送回主隔离。
  • 最后,向新的 ReceivePort 添加一个监听器。此监听器处理主隔离发送给工作进程隔离的消息。
dart
static void _startRemoteIsolate(SendPort port) {
  final receivePort = ReceivePort();
  port.send(receivePort.sendPort);

  receivePort.listen((dynamic message) async {
    if (message is String) {
      final transformed = jsonDecode(message);
      port.send(transformed);
    }
  });
}

工作进程的 ReceivePort 上的监听器解码从主隔离传递的 JSON,然后将解码后的 JSON 发送回主隔离。

此监听器是主隔离发送给工作进程隔离的消息的入口点。这是你告诉工作进程隔离将来执行什么代码的唯一机会。

步骤 4:处理主隔离上的消息

#

最后,你需要告诉主隔离如何处理从工作进程隔离发送回主隔离的消息。为此,你需要填写 _handleResponsesFromIsolate 方法。回想一下,此方法传递给 receivePort.listen 方法,如步骤 2中所述

dart
Future<void> spawn() async {
  final receivePort = ReceivePort();
  receivePort.listen(_handleResponsesFromIsolate);
  await Isolate.spawn(_startRemoteIsolate, receivePort.sendPort);
}

还记得你在步骤 3中将 SendPort 发送回主隔离吗?此方法处理接收该 SendPort,以及处理未来的消息(将是解码后的 JSON)。

  • 首先,检查消息是否为 SendPort。如果是,将该端口分配给类的 _sendPort 属性,以便以后使用它来发送消息。
  • 接下来,检查消息是否为 Map<String, dynamic> 类型,这是解码后的 JSON 的预期类型。如果是,请使用你的应用程序特定逻辑处理该消息。在此示例中,将打印消息。
dart
void _handleResponsesFromIsolate(dynamic message) {
  if (message is SendPort) {
    _sendPort = message;
    _isolateReady.complete();
  } else if (message is Map<String, dynamic>) {
    print(message);
  }
}

步骤 5:添加一个完成器以确保你的隔离已设置

#

要完成该类,请定义一个名为 parseJson 的公共方法,该方法负责向工作进程隔离发送消息。它还需要确保在隔离完全设置之前可以发送消息。要处理此问题,请使用Completer

  • 首先,添加一个名为 _isolateReady 的类级属性,该属性称为 Completer
  • 接下来,在 _handleResponsesFromIsolate 方法(在步骤 4中创建)中,如果消息为 SendPort,则在完成器上添加对 complete() 的调用。
  • 最后,在 parseJson 方法中,在添加 _sendPort.send 之前添加 await _isolateReady.future。这确保了在隔离生成并且SendPort 发送回主隔离之前,无法向工作进程隔离发送任何消息。
dart
Future<void> parseJson(String message) async {
  await _isolateReady.future;
  _sendPort.send(message);
}

完整示例

#
展开以查看完整示例
dart
import 'dart:async';
import 'dart:convert';
import 'dart:isolate';

void main() async {
  final worker = Worker();
  await worker.spawn();
  await worker.parseJson('{"key":"value"}');
}

class Worker {
  late SendPort _sendPort;
  final Completer<void> _isolateReady = Completer.sync();

  Future<void> spawn() async {
    final receivePort = ReceivePort();
    receivePort.listen(_handleResponsesFromIsolate);
    await Isolate.spawn(_startRemoteIsolate, receivePort.sendPort);
  }

  void _handleResponsesFromIsolate(dynamic message) {
    if (message is SendPort) {
      _sendPort = message;
      _isolateReady.complete();
    } else if (message is Map<String, dynamic>) {
      print(message);
    }
  }

  static void _startRemoteIsolate(SendPort port) {
    final receivePort = ReceivePort();
    port.send(receivePort.sendPort);

    receivePort.listen((dynamic message) async {
      if (message is String) {
        final transformed = jsonDecode(message);
        port.send(transformed);
      }
    });
  }

  Future<void> parseJson(String message) async {
    await _isolateReady.future;
    _sendPort.send(message);
  }
}

健壮端口示例

#

上一个示例解释了设置与主隔离之间的双向通信的长期存在隔离所需的构建块。如前所述,该示例缺少一些重要功能,例如错误处理、在不再使用时关闭端口的能力,以及某些情况下围绕消息排序的不一致性。

此示例通过创建一个具有这些附加功能(甚至更多)的长期存在 worker 进程隔离,并遵循更好的设计模式,扩展了第一个示例中的信息。虽然此代码与第一个示例有相似之处,但它不是该示例的扩展。

步骤 1:定义 worker 类

#

首先,为你的后台 worker 进程隔离创建一个类。此类包含你需要的全部功能

  • 生成一个隔离。
  • 向该隔离发送消息。
  • 让隔离解码一些 JSON。
  • 将解码后的 JSON 发送回主隔离。

该类公开三个公共方法:一个用于创建 worker 进程隔离的方法,一个用于处理向该 worker 进程隔离发送消息的方法,以及一个用于在不再使用时关闭端口的方法。

dart
class Worker {
  final SendPort _commands;
  final ReceivePort _responses;

  Future<Object?> parseJson(String message) async {
    // TODO: Ensure the port is still open.
    _commands.send(message);
  }

  static Future<Worker> spawn() async {
    // TODO: Add functionality to create a new Worker object with a
    //  connection to a spawned isolate.
    throw UnimplementedError();
  }

  Worker._(this._responses, this._commands) {
    // TODO: Initialize main isolate receive port listener.
  }

  void _handleResponsesFromIsolate(dynamic message) {
    // TODO: Handle messages sent back from the worker isolate.
  }

  static void _handleCommandsToIsolate(ReceivePort rp, SendPort sp) async {
    // TODO: Handle messages sent back from the worker isolate.
  }

  static void _startRemoteIsolate(SendPort sp) {
    // TODO: Initialize worker isolate's ports.
  }
}

步骤 2:在 Worker.spawn 方法中创建一个 RawReceivePort

#

在生成隔离之前,你需要创建一个RawReceivePort,它是一个更低级的 ReceivePort。使用 RawReceivePort 是首选模式,因为它允许你将隔离启动逻辑与处理隔离上的消息传递的逻辑分开。

Worker.spawn 方法中

  • 首先,创建 RawReceivePort。此 ReceivePort 仅负责接收来自工作进程隔离的初始消息,该消息将是 SendPort
  • 接下来,创建一个 Completer,它将指示隔离何时准备好接收消息。完成后,它将返回一条记录,其中包含 ReceivePortSendPort
  • 接下来,定义 RawReceivePort.handler 属性。此属性是一个 Function?,其行为类似于 ReceivePort.listener。当此端口收到消息时,将调用该函数。
  • 在处理程序函数内,调用 connection.complete()。此方法需要一个包含 ReceivePortSendPort记录作为参数。SendPort 是来自工作进程隔离的初始消息,它将在下一步分配给名为 _commands 的类级 SendPort
  • 然后,使用 ReceivePort.fromRawReceivePort 构造函数创建一个新的 ReceivePort,并将 initPort 传递进去。
dart
class Worker {
  final SendPort _commands;
  final ReceivePort _responses;

  static Future<Worker> spawn() async {
    // Create a receive port and add its initial message handler.
    final initPort = RawReceivePort();
    final connection = Completer<(ReceivePort, SendPort)>.sync();
    initPort.handler = (initialMessage) {
      final commandPort = initialMessage as SendPort;
      connection.complete((
        ReceivePort.fromRawReceivePort(initPort),
        commandPort,
      ));
    };
// ···
  }

通过首先创建一个 RawReceivePort,然后创建一个 ReceivePort,你将能够稍后在 ReceivePort.listen 上添加新的回调。相反,如果你直接创建一个 ReceivePort,你将只能添加一个 listener,因为 ReceivePort 实现的是Stream,而不是BroadcastStream

实际上,这使你能够将隔离启动逻辑与处理设置完通信后接收消息的逻辑分开。当其他方法中的逻辑增长时,这种好处将更加明显。

步骤 3:使用 Isolate.spawn 生成 worker 进程隔离

#

此步骤将继续填充 Worker.spawn 方法。你将添加生成隔离所需的代码,并从此类返回一个 Worker 实例。在此示例中,对 Isolate.spawn 的调用包装在try/catch中,它确保如果隔离无法启动,initPort 将被关闭,并且不会创建 Worker 对象。

  • 首先,在 try/catch 块中尝试生成 worker 进程隔离。如果生成 worker 进程隔离失败,请关闭在先前步骤中创建的接收端口。传递给 Isolate.spawn 的方法将在后面的步骤中介绍。
  • 接下来,等待 connection.future,并将发送端口和接收端口从它返回的记录中解构。
  • 最后,通过调用其私有构造函数并传入来自该完成器的端口,返回一个 Worker 实例。
dart
class Worker {
  final SendPort _commands;
  final ReceivePort _responses;

  static Future<Worker> spawn() async {
    // Create a receive port and add its initial message handler
    final initPort = RawReceivePort();
    final connection = Completer<(ReceivePort, SendPort)>.sync();
    initPort.handler = (initialMessage) {
      final commandPort = initialMessage as SendPort;
      connection.complete((
        ReceivePort.fromRawReceivePort(initPort),
        commandPort,
      ));
    };
    // Spawn the isolate.
    try {
      await Isolate.spawn(_startRemoteIsolate, (initPort.sendPort));
    } on Object {
      initPort.close();
      rethrow;
    }

    final (ReceivePort receivePort, SendPort sendPort) =
        await connection.future;

    return Worker._(receivePort, sendPort);
  }

请注意,在此示例中(与上一个示例相比),Worker.spawn 充当此类的异步静态构造函数,并且是创建 Worker 实例的唯一方法。这简化了 API,使创建 Worker 实例的代码更简洁。

步骤 4:完成隔离设置过程

#

在此步骤中,你将完成基本的隔离设置过程。这几乎完全与上一个示例相关,并且没有新的概念。有一个细微的改变,即代码被分解为更多的方法,这是一种设计实践,可以让你在示例的其余部分中添加更多功能。有关设置隔离的基本过程的深入介绍,请参阅基本端口示例

首先,创建从 Worker.spawn 方法返回的私有构造函数。在构造函数主体中,向主隔离使用的接收端口添加一个监听器,并将一个尚未定义的方法传递给该监听器,称为 _handleResponsesFromIsolate

dart
class Worker {
  final SendPort _commands;
  final ReceivePort _responses;
// ···
  Worker._(this._responses, this._commands) {
    _responses.listen(_handleResponsesFromIsolate);
  }

接下来,添加 _startRemoteIsolate 中的代码,该代码负责初始化工作进程隔离上的端口。回想一下,此方法传递给 Worker.spawn 方法中的 Isolate.spawn,它将接收主隔离的 SendPort 作为参数。

  • 创建一个新的 ReceivePort
  • 将该端口的 SendPort 发送回主隔离。
  • 调用一个名为 _handleCommandsToIsolate 的新方法,并将主隔离区中的新 ReceivePortSendPort 作为参数传递。
dart
static void _startRemoteIsolate(SendPort sendPort) {
  final receivePort = ReceivePort();
  sendPort.send(receivePort.sendPort);
  _handleCommandsToIsolate(receivePort, sendPort);
}

接下来,添加 _handleCommandsToIsolate 方法,该方法负责接收来自主隔离区的消息,在工作隔离区上解码 JSON,并将解码后的 JSON 作为响应发送回。

  • 首先,在工作隔离区 ReceivePort 上声明一个监听器。
  • 在监听器中添加的回调函数内,尝试在 try/catch 中解码从主隔离区传递的 JSON。如果解码成功,则将解码后的 JSON 发送回主隔离区。
  • 如果出现错误,则发送一个 RemoteError
dart
static void _handleCommandsToIsolate(
    ReceivePort receivePort, SendPort sendPort) {
  receivePort.listen((message) {
    try {
      final jsonData = jsonDecode(message as String);
      sendPort.send(jsonData);
    } catch (e) {
      sendPort.send(RemoteError(e.toString(), ''));
    }
  });
}

接下来,添加 _handleResponsesFromIsolate 方法的代码。

  • 首先,检查消息是否为 RemoteError,如果是,则应 throw 该错误。
  • 否则,打印消息。在后续步骤中,您将更新此代码以返回消息而不是打印它们。
dart
void _handleResponsesFromIsolate(dynamic message) {
  if (message is RemoteError) {
    throw message;
  } else {
    print(message);
  }
}

最后,添加 parseJson 方法,该方法是一个公共方法,允许外部代码将 JSON 发送到工作隔离区以进行解码。

dart
Future<Object?> parseJson(String message) async {
  _commands.send(message);
}

您将在下一步中更新此方法。

步骤 5:同时处理多个消息

#

当前,如果您快速将消息发送到工作隔离区,隔离区将以完成的顺序发送解码后的 JSON 响应,而不是发送的顺序。您无法确定哪个响应对应于哪个消息。

在此步骤中,您将通过为每个消息分配一个 ID,并使用 Completer 对象来解决此问题,以确保当外部代码调用 parseJson 时,返回给该调用者的响应是正确的响应。

首先,在 Worker 中添加两个类级属性。

  • Map<int, Completer<Object?>> _activeRequests
  • int _idCounter
dart
class Worker {
  final SendPort _commands;
  final ReceivePort _responses;
  final Map<int, Completer<Object?>> _activeRequests = {};
  int _idCounter = 0;

_activeRequests 地图将发送到工作隔离区的消息与 Completer 关联。_activeRequests 中使用的键取自 _idCounter,当发送更多消息时,_idCounter 将会增加。

接下来,更新 parseJson 方法,使其在将消息发送到工作隔离区之前创建完成器。

  • 首先创建一个 Completer
  • 接下来,递增 _idCounter,以便每个 Completer 与一个唯一的数字相关联。
  • _activeRequests 地图中添加一个条目,其中键是 _idCounter 的当前数字,值是完成器。
  • 将消息发送到工作隔离区,以及 ID。因为您只能通过 SendPort 发送一个值,所以将 ID 和消息包装在一个 记录 中。
  • 最后,返回完成器的 future,该 future 最终将包含来自工作隔离区的响应。
dart
Future<Object?> parseJson(String message) async {
  final completer = Completer<Object?>.sync();
  final id = _idCounter++;
  _activeRequests[id] = completer;
  _commands.send((id, message));
  return await completer.future;
}

您还需要更新 _handleResponsesFromIsolate_handleCommandsToIsolate 以处理此系统。

_handleCommandsToIsolate 中,您需要考虑到 message 是具有两个值的记录,而不仅仅是 JSON 文本。通过从 message 中解构值来实现这一点。

然后,在解码 JSON 之后,更新对 sendPort.send 的调用,以将 ID 和解码后的 JSON 同时传递回主隔离区,再次使用记录。

dart
static void _handleCommandsToIsolate(
    ReceivePort receivePort, SendPort sendPort) {
  receivePort.listen((message) {
    final (int id, String jsonText) = message as (int, String); // New
    try {
      final jsonData = jsonDecode(jsonText);
      sendPort.send((id, jsonData)); // Updated
    } catch (e) {
      sendPort.send((id, RemoteError(e.toString(), '')));
    }
  });
}

最后,更新 _handleResponsesFromIsolate

  • 首先,再次从消息参数中解构 ID 和响应。
  • 然后,从 _activeRequests 地图中删除与该请求相对应的完成器。
  • 最后,不要抛出错误或打印解码后的 JSON,而是完成完成器,传入响应。完成此操作后,响应将返回到在主隔离区上调用 parseJson 的代码。
dart
void _handleResponsesFromIsolate(dynamic message) {
  final (int id, Object? response) = message as (int, Object?); // New
  final completer = _activeRequests.remove(id)!; // New

  if (response is RemoteError) {
    completer.completeError(response); // Updated
  } else {
    completer.complete(response); // Updated
  }
}

步骤 6:添加关闭端口的功能

#

当您的代码不再使用隔离区时,您应该关闭主隔离区和工作隔离区上的端口。

  • 首先,添加一个类级布尔值,用于跟踪端口是否已关闭。
  • 然后,添加 Worker.close 方法。在此方法内
    • _closed 更新为 true。
    • 向工作隔离区发送最后一条消息。此消息是一个 String,内容为“shutdown”,但它可以是您想要的任何对象。您将在下一段代码中使用它。
  • 最后,检查 _activeRequests 是否为空。如果为空,则关闭主隔离区的 ReceivePort,名为 _responses
dart
class Worker {
  bool _closed = false;
// ···
  void close() {
    if (!_closed) {
      _closed = true;
      _commands.send('shutdown');
      if (_activeRequests.isEmpty) _responses.close();
      print('--- port closed --- ');
    }
  }
  • 接下来,您需要在工作隔离区中处理“shutdown”消息。在 _handleCommandsToIsolate 方法中添加以下代码。此代码将检查消息是否是一个读取为“shutdown”的 String。如果是,它将关闭工作隔离区的 ReceivePort,然后返回。
dart
static void _handleCommandsToIsolate(
  ReceivePort receivePort,
  SendPort sendPort,
) {
  receivePort.listen((message) {
    // New if-block.
    if (message == 'shutdown') {
      receivePort.close();
      return;
    }
    final (int id, String jsonText) = message as (int, String);
    try {
      final jsonData = jsonDecode(jsonText);
      sendPort.send((id, jsonData));
    } catch (e) {
      sendPort.send((id, RemoteError(e.toString(), '')));
    }
  });
}
  • 最后,您应该添加代码以在尝试发送消息之前检查端口是否已关闭。在 Worker.parseJson 方法中添加一行代码。
dart
Future<Object?> parseJson(String message) async {
  if (_closed) throw StateError('Closed'); // New
  final completer = Completer<Object?>.sync();
  final id = _idCounter++;
  _activeRequests[id] = completer;
  _commands.send((id, message));
  return await completer.future;
}

完整示例

#
展开此处以查看完整示例
dart
import 'dart:async';
import 'dart:convert';
import 'dart:isolate';

void main() async {
  final worker = await Worker.spawn();
  print(await worker.parseJson('{"key":"value"}'));
  print(await worker.parseJson('"banana"'));
  print(await worker.parseJson('[true, false, null, 1, "string"]'));
  print(
      await Future.wait([worker.parseJson('"yes"'), worker.parseJson('"no"')]));
  worker.close();
}

class Worker {
  final SendPort _commands;
  final ReceivePort _responses;
  final Map<int, Completer<Object?>> _activeRequests = {};
  int _idCounter = 0;
  bool _closed = false;

  Future<Object?> parseJson(String message) async {
    if (_closed) throw StateError('Closed');
    final completer = Completer<Object?>.sync();
    final id = _idCounter++;
    _activeRequests[id] = completer;
    _commands.send((id, message));
    return await completer.future;
  }

  static Future<Worker> spawn() async {
    // Create a receive port and add its initial message handler
    final initPort = RawReceivePort();
    final connection = Completer<(ReceivePort, SendPort)>.sync();
    initPort.handler = (initialMessage) {
      final commandPort = initialMessage as SendPort;
      connection.complete((
        ReceivePort.fromRawReceivePort(initPort),
        commandPort,
      ));
    };

    // Spawn the isolate.
    try {
      await Isolate.spawn(_startRemoteIsolate, (initPort.sendPort));
    } on Object {
      initPort.close();
      rethrow;
    }

    final (ReceivePort receivePort, SendPort sendPort) =
        await connection.future;

    return Worker._(receivePort, sendPort);
  }

  Worker._(this._responses, this._commands) {
    _responses.listen(_handleResponsesFromIsolate);
  }

  void _handleResponsesFromIsolate(dynamic message) {
    final (int id, Object? response) = message as (int, Object?);
    final completer = _activeRequests.remove(id)!;

    if (response is RemoteError) {
      completer.completeError(response);
    } else {
      completer.complete(response);
    }

    if (_closed && _activeRequests.isEmpty) _responses.close();
  }

  static void _handleCommandsToIsolate(
    ReceivePort receivePort,
    SendPort sendPort,
  ) {
    receivePort.listen((message) {
      if (message == 'shutdown') {
        receivePort.close();
        return;
      }
      final (int id, String jsonText) = message as (int, String);
      try {
        final jsonData = jsonDecode(jsonText);
        sendPort.send((id, jsonData));
      } catch (e) {
        sendPort.send((id, RemoteError(e.toString(), '')));
      }
    });
  }

  static void _startRemoteIsolate(SendPort sendPort) {
    final receivePort = ReceivePort();
    sendPort.send(receivePort.sendPort);
    _handleCommandsToIsolate(receivePort, sendPort);
  }

  void close() {
    if (!_closed) {
      _closed = true;
      _commands.send('shutdown');
      if (_activeRequests.isEmpty) _responses.close();
      print('--- port closed --- ');
    }
  }
}