跳到主要内容

隔离区

本页讨论了一些使用 Isolate API 实现隔离区的示例。

每当您的应用程序处理的计算量大到足以暂时阻塞其他计算时,您都应该使用隔离区。最常见的例子是在 Flutter 应用程序中,当您需要执行大量计算时,否则可能会导致 UI 无响应。

没有关于您*必须*何时使用隔离区的规定,但以下是一些隔离区可能有所帮助的更多情况:

  • 解析和解码特大的 JSON 数据块。
  • 处理和压缩照片、音频和视频。
  • 转换音频和视频文件。
  • 在大型列表或文件系统中执行复杂的搜索和过滤。
  • 执行 I/O 操作,例如与数据库通信。
  • 处理大量的网络请求。

实现一个简单的辅助隔离区

#

这些示例实现了一个主隔离区,它会生成一个简单的辅助隔离区。Isolate.run() 简化了设置和管理辅助隔离区的步骤:

  1. 生成(启动并创建)一个隔离区。
  2. 在生成的隔离区上运行一个函数。
  3. 捕获结果。
  4. 将结果返回给主隔离区。
  5. 工作完成后终止该隔离区。
  6. 检查、捕获并将异常和错误抛回给主隔离区。

在新隔离区中运行现有方法

#
  1. 直接在主隔离区中调用 run() 来生成一个新的隔离区(一个后台工作者),同时 main() 等待结果:
dart
const String filename = 'with_keys.json';

void main() async {
  // Read some data.
  final jsonData = await Isolate.run(_readAndParseJson);

  // Use that data.
  print('Number of JSON keys: ${jsonData.length}');
}
  1. 将您希望辅助隔离区执行的函数作为其第一个参数传递。在本例中,它是现有函数 _readAndParseJson()
dart
Future<Map<String, dynamic>> _readAndParseJson() async {
  final fileData = await File(filename).readAsString();
  final jsonData = jsonDecode(fileData) as Map<String, dynamic>;
  return jsonData;
}
  1. Isolate.run() 获取 _readAndParseJson() 返回的结果,并将该值发送回主隔离区,然后关闭辅助隔离区。

  2. 辅助隔离区将保存结果的内存*转移*到主隔离区。它*不会复制*数据。辅助隔离区会执行一次验证,以确保对象可以被转移。

_readAndParseJson() 是一个现有的异步函数,它同样可以很方便地直接在主隔离区中运行。但是,使用 Isolate.run() 来运行它会启用并发。辅助隔离区完全抽象了 _readAndParseJson() 的计算。它可以在不阻塞主隔离区的情况下完成。

Isolate.run() 的结果始终是一个 Future,因为主隔离区中的代码会继续运行。无论辅助隔离区执行的计算是同步还是异步的,都不会影响主隔离区,因为它无论如何都在并发运行。

有关完整的程序,请查看 send_and_receive.dart 示例。

通过隔离区发送闭包

#

您也可以直接在主隔离区中使用函数字面量或闭包,通过 run() 创建一个简单的辅助隔离区。

dart
const String filename = 'with_keys.json';

void main() async {
  // Read some data.
  final jsonData = await Isolate.run(() async {
    final fileData = await File(filename).readAsString();
    final jsonData = jsonDecode(fileData) as Map<String, dynamic>;
    return jsonData;
  });

  // Use that data.
  print('Number of JSON keys: ${jsonData.length}');
}

此示例与上一个示例实现相同的功能。一个新的隔离区生成、计算并返回结果。

然而,现在隔离区发送的是一个闭包。闭包在功能和代码编写方式上都比典型的命名函数限制更少。在此示例中,Isolate.run() 并发执行看起来像本地代码的部分。从这个意义上讲,您可以将 run() 想象成一个用于“并行运行”的控制流运算符。

使用端口在隔离区之间发送多条消息

#

短生命周期的隔离区使用起来很方便,但生成新的隔离区并将对象从一个隔离区复制到另一个隔离区需要性能开销。如果您的代码依赖于使用 Isolate.run 重复运行相同的计算,您可以通过创建不立即退出的长生命周期隔离区来提高性能。

为此,您可以使用 Isolate.run 抽象的一些低级隔离区 API:

本节将介绍在新生成的隔离区和主隔离区之间建立双向通信所需的步骤。第一个示例基本端口在高层次上介绍了该过程。第二个示例健壮端口在第一个示例的基础上逐步添加了更多实用的、真实世界的功能。

ReceivePortSendPort

#

在隔离区之间建立长生命周期通信需要两个类(除了 Isolate):ReceivePortSendPort。这些端口是隔离区之间相互通信的唯一方式。

ReceivePort 是一个处理从其他隔离区发送的消息的对象。这些消息通过 SendPort 发送。

端口的行为类似于 Stream 对象(事实上,接收端口实现了 Stream!)。您可以将 SendPortReceivePort 分别看作 Stream 的 StreamController 和监听器。SendPort 就像一个 StreamController,因为您使用 SendPort.send() 方法向它们“添加”消息,而这些消息由监听器(在本例中是 ReceivePort)处理。然后,ReceivePort 通过将其作为参数传递给您提供的回调函数来处理接收到的消息。

设置端口

#

新生成的隔离区只拥有通过 Isolate.spawn 调用接收到的信息。如果您需要主隔离区在初始创建之后继续与生成的隔离区通信,则必须建立一个通信通道,以便生成的隔离区可以向主隔离区发送消息。隔离区只能通过消息传递进行通信。它们无法“看到”彼此的内存内部,这就是“isolate”这个名称的由来。

要设置这种双向通信,首先在主隔离区中创建一个 ReceivePort,然后在使用 Isolate.spawn 生成新隔离区时,将其 SendPort 作为参数传递给新隔离区。新隔离区随后创建自己的 ReceivePort,并通过主隔离区传递给它的 SendPort 将*其自身*的 SendPort 发送回去。主隔离区收到此 SendPort 后,双方现在都拥有了一个开放的通道来发送和接收消息。

A figure showing events being fed, one by one, into the event loop

  1. 在主隔离区中创建一个 ReceivePortSendPort 作为 ReceivePort 上的一个属性自动创建。
  2. 使用 Isolate.spawn() 生成辅助隔离区
  3. ReceivePort.sendPort 的引用作为第一条消息传递给辅助隔离区。
  4. 在辅助隔离区中创建另一个新的 ReceivePort
  5. 将辅助隔离区的 ReceivePort.sendPort 的引用作为第一条消息*返回*给主隔离区。

除了创建端口和设置通信之外,您还需要告知端口在接收到消息时该做什么。这可以通过在每个各自的 ReceivePort 上使用 listen 方法来完成。

A figure showing events being fed, one by one, into the event loop

  1. 通过主隔离区对辅助隔离区的 SendPort 的引用发送消息。
  2. 通过辅助隔离区的 ReceivePort 上的监听器接收和处理消息。这是您希望从主隔离区移出的计算执行的地方。
  3. 通过辅助隔离区对主隔离区的 SendPort 的引用发送返回消息。
  4. 通过主隔离区的 ReceivePort 上的监听器接收消息。

基本端口示例

#

本示例演示了如何设置一个长生命周期的辅助隔离区,并实现其与主隔离区之间的双向通信。代码使用将 JSON 文本发送到新隔离区,并在那里解析和解码 JSON,然后将其发送回主隔离区的示例。

步骤 1:定义工作器类

#

首先,为您的后台辅助隔离区创建一个类。这个类包含您需要的所有功能,用于:

  • 生成一个隔离区。
  • 向该隔离区发送消息。
  • 让隔离区解码一些 JSON。
  • 将解码后的 JSON 发送回主隔离区。

该类暴露了两个公共方法:一个用于生成辅助隔离区,另一个用于处理向该辅助隔离区发送消息。

本示例的其余部分将逐一向您展示如何填充类方法。

dart
class Worker {
  Future<void> spawn() async {
    // TODO: Add functionality to spawn a worker isolate.
  }

  void _handleResponsesFromIsolate(dynamic message) {
    // TODO: Handle messages sent back from the worker isolate.
  }

  static void _startRemoteIsolate(SendPort port) {
    // TODO: Define code that should be executed on the worker isolate.
  }

  Future<void> parseJson(String message) async {
    // TODO: Define a public method that can
    // be used to send messages to the worker isolate.
  }
}

步骤 2:生成一个辅助隔离区

#

Worker.spawn 方法是您将用于创建辅助隔离区并确保其能够接收和发送消息的代码分组的地方。

  • 首先,创建一个 ReceivePort。这允许主隔离区接收从新生成的辅助隔离区发送的消息。
  • 接下来,向接收端口添加一个监听器,以处理辅助隔离区将发送回的消息。传递给监听器的回调函数 _handleResponsesFromIsolate 将在步骤 4 中介绍。
  • 最后,使用 Isolate.spawn 生成辅助隔离区。它需要两个参数:一个将在辅助隔离区上执行的函数(在步骤 3 中介绍),以及接收端口的 sendPort 属性。
dart
Future<void> spawn() async {
  final receivePort = ReceivePort();
  receivePort.listen(_handleResponsesFromIsolate);
  await Isolate.spawn(_startRemoteIsolate, receivePort.sendPort);
}

receivePort.sendPort 参数在辅助隔离区上调用回调函数(_startRemoteIsolate)时,它将作为参数传递给该回调函数。这是确保辅助隔离区能够将消息发送回主隔离区的第一步。

步骤 3:在辅助隔离区上执行代码

#

在此步骤中,您将定义 _startRemoteIsolate 方法,该方法被发送到辅助隔离区,在它生成时执行。此方法类似于辅助隔离区的“main”方法。

  • 首先,创建另一个新的 ReceivePort。此端口将接收来自主隔离区的未来消息。
  • 接下来,将该端口的 SendPort 发送回主隔离区。
  • 最后,向新的 ReceivePort 添加一个监听器。此监听器处理主隔离区发送给辅助隔离区的消息。
dart
static void _startRemoteIsolate(SendPort port) {
  final receivePort = ReceivePort();
  port.send(receivePort.sendPort);

  receivePort.listen((dynamic message) async {
    if (message is String) {
      final transformed = jsonDecode(message);
      port.send(transformed);
    }
  });
}

辅助隔离区 ReceivePort 上的监听器解码从主隔离区传递过来的 JSON,然后将解码后的 JSON 发送回主隔离区。

此监听器是主隔离区发送到辅助隔离区消息的入口点。**这是您唯一一次有机会告诉辅助隔离区将来要执行的代码。**

步骤 4:在主隔离区上处理消息

#

最后,您需要告诉主隔离区如何处理从辅助隔离区发送回主隔离区的消息。为此,您需要填充 _handleResponsesFromIsolate 方法。回想一下,如步骤 2 所述,此方法已传递给 receivePort.listen 方法。

dart
Future<void> spawn() async {
  final receivePort = ReceivePort();
  receivePort.listen(_handleResponsesFromIsolate);
  await Isolate.spawn(_startRemoteIsolate, receivePort.sendPort);
}

还请记住,您在步骤 3 中将一个 SendPort 发送回了主隔离区。此方法处理该 SendPort 的接收,以及处理未来的消息(这些消息将是解码后的 JSON)。

  • 首先,检查消息是否为 SendPort。如果是,则将该端口赋值给类的 _sendPort 属性,以便将来可以用来发送消息。
  • 接下来,检查消息是否为 Map<String, dynamic> 类型,这是解码 JSON 的预期类型。如果是,则使用您的应用程序特定逻辑处理该消息。在此示例中,消息被打印出来。
dart
void _handleResponsesFromIsolate(dynamic message) {
  if (message is SendPort) {
    _sendPort = message;
    _isolateReady.complete();
  } else if (message is Map<String, dynamic>) {
    print(message);
  }
}

步骤 5:添加 Completer 以确保您的隔离区已设置完成

#

为了完善该类,定义一个名为 parseJson 的公共方法,它负责向辅助隔离区发送消息。它还需要确保在隔离区完全设置好之前,消息不会被发送。为了处理这种情况,请使用 Completer

  • 首先,添加一个名为 Completer 的类级别属性,并将其命名为 _isolateReady
  • 接下来,如果消息是 SendPort,则在 _handleResponsesFromIsolate 方法(在步骤 4 中创建)中添加对 completer 的 complete() 调用。
  • 最后,在 parseJson 方法中,在添加 _sendPort.send 之前添加 await _isolateReady.future。这确保了在辅助隔离区生成*并且*已将其 SendPort 发送回主隔离区之前,不会向其发送任何消息。
dart
Future<void> parseJson(String message) async {
  await _isolateReady.future;
  _sendPort.send(message);
}

完整示例

#
展开以查看完整示例
dart
import 'dart:async';
import 'dart:convert';
import 'dart:isolate';

void main() async {
  final worker = Worker();
  await worker.spawn();
  await worker.parseJson('{"key":"value"}');
}

class Worker {
  late SendPort _sendPort;
  final Completer<void> _isolateReady = Completer.sync();

  Future<void> spawn() async {
    final receivePort = ReceivePort();
    receivePort.listen(_handleResponsesFromIsolate);
    await Isolate.spawn(_startRemoteIsolate, receivePort.sendPort);
  }

  void _handleResponsesFromIsolate(dynamic message) {
    if (message is SendPort) {
      _sendPort = message;
      _isolateReady.complete();
    } else if (message is Map<String, dynamic>) {
      print(message);
    }
  }

  static void _startRemoteIsolate(SendPort port) {
    final receivePort = ReceivePort();
    port.send(receivePort.sendPort);

    receivePort.listen((dynamic message) async {
      if (message is String) {
        final transformed = jsonDecode(message);
        port.send(transformed);
      }
    });
  }

  Future<void> parseJson(String message) async {
    await _isolateReady.future;
    _sendPort.send(message);
  }

}

健壮端口示例

#

上一个示例解释了设置具有双向通信的长期隔离区所需的基本构建块。如前所述,该示例缺少一些重要功能,例如错误处理、在不再使用时关闭端口的能力,以及在某些情况下消息排序的不一致性。

本示例在第一个示例的信息基础上进行了扩展,通过创建一个具有这些附加功能和更多功能的长期辅助隔离区,并遵循更好的设计模式。尽管此代码与第一个示例有相似之处,但它不是该示例的扩展。

步骤 1:定义工作器类

#

首先,为您的后台辅助隔离区创建一个类。这个类包含您需要的所有功能,用于:

  • 生成一个隔离区。
  • 向该隔离区发送消息。
  • 让隔离区解码一些 JSON。
  • 将解码后的 JSON 发送回主隔离区。

该类暴露了三个公共方法:一个用于创建辅助隔离区,一个用于处理向该辅助隔离区发送消息,另一个用于在端口不再使用时关闭它们。

dart
class Worker {
  final SendPort _commands;
  final ReceivePort _responses;

  Future<Object?> parseJson(String message) async {
    // TODO: Ensure the port is still open.
    _commands.send(message);
  }

  static Future<Worker> spawn() async {
    // TODO: Add functionality to create a new Worker object with a
    //  connection to a spawned isolate.
    throw UnimplementedError();
  }

  Worker._(this._responses, this._commands) {
    // TODO: Initialize main isolate receive port listener.
  }

  void _handleResponsesFromIsolate(dynamic message) {
    // TODO: Handle messages sent back from the worker isolate.
  }

  static void _handleCommandsToIsolate(ReceivePort rp, SendPort sp) async {
    // TODO: Handle messages sent back from the worker isolate.
  }

  static void _startRemoteIsolate(SendPort sp) {
    // TODO: Initialize worker isolate's ports.
  }
}

步骤 2:在 Worker.spawn 方法中创建 RawReceivePort

#

在生成隔离区之前,您需要创建一个 RawReceivePort,它是 ReceivePort 的一个低级版本。使用 RawReceivePort 是一种首选模式,因为它允许您将隔离区启动逻辑与处理隔离区上的消息传递逻辑分开。

Worker.spawn 方法中:

  • 首先,创建 RawReceivePort。这个 ReceivePort 只负责接收来自辅助隔离区的初始消息,该消息将是一个 SendPort
  • 接下来,创建一个 Completer,它将指示隔离区何时准备好接收消息。当它完成时,它将返回一个包含 ReceivePortSendPort 的记录。
  • 接下来,定义 RawReceivePort.handler 属性。此属性是一个 Function?,其行为类似于 ReceivePort.listener。当此端口接收到消息时,将调用该函数。
  • 在处理函数中,调用 connection.complete()。此方法需要一个包含 ReceivePortSendPort记录作为参数。SendPort 是从辅助隔离区发送的初始消息,它将在下一步中赋值给类级别的 SendPort,名为 _commands
  • 然后,使用 ReceivePort.fromRawReceivePort 构造函数创建一个新的 ReceivePort,并传入 initPort
dart
class Worker {
  final SendPort _commands;
  final ReceivePort _responses;

  static Future<Worker> spawn() async {
    // Create a receive port and add its initial message handler.
    final initPort = RawReceivePort();
    final connection = Completer<(ReceivePort, SendPort)>.sync();
    initPort.handler = (initialMessage) {
      final commandPort = initialMessage as SendPort;
      connection.complete((
        ReceivePort.fromRawReceivePort(initPort),
        commandPort,
      ));
    };
  }
}

通过首先创建 RawReceivePort,然后再创建 ReceivePort,您稍后就可以向 ReceivePort.listen 添加新的回调。相反,如果您直接创建 ReceivePort,则只能添加一个 listener,因为 ReceivePort 实现了 Stream,而不是 BroadcastStream

实际上,这允许您将隔离区启动逻辑与在通信设置完成后处理接收消息的逻辑分开。随着其他方法中逻辑的增长,这种好处将变得更加明显。

步骤 3:使用 Isolate.spawn 生成辅助隔离区

#

此步骤继续填充 Worker.spawn 方法。您将添加生成隔离区所需的代码,并从该类返回一个 Worker 实例。在此示例中,对 Isolate.spawn 的调用被包装在 try/catch中,这确保了如果隔离区启动失败,initPort 将被关闭,并且不会创建 Worker 对象。

  • 首先,尝试在 try/catch 块中生成一个辅助隔离区。如果生成辅助隔离区失败,则关闭上一步中创建的接收端口。传递给 Isolate.spawn 的方法将在稍后的步骤中介绍。
  • 接下来,await connection.future,并从它返回的记录中解构发送端口和接收端口。
  • 最后,通过调用其私有构造函数,并传入该 completer 中的端口,返回一个 Worker 实例。
dart
class Worker {
  final SendPort _commands;
  final ReceivePort _responses;

  static Future<Worker> spawn() async {
    // Create a receive port and add its initial message handler
    final initPort = RawReceivePort();
    final connection = Completer<(ReceivePort, SendPort)>.sync();
    initPort.handler = (initialMessage) {
      final commandPort = initialMessage as SendPort;
      connection.complete((
        ReceivePort.fromRawReceivePort(initPort),
        commandPort,
      ));
    };
    // Spawn the isolate.
    try {
      await Isolate.spawn(_startRemoteIsolate, (initPort.sendPort));
    } on Object {
      initPort.close();
      rethrow;
    }

    final (ReceivePort receivePort, SendPort sendPort) =
        await connection.future;

    return Worker._(receivePort, sendPort);
  }
}

请注意,在此示例中(与上一个示例相比),Worker.spawn 充当此类的异步静态构造函数,并且是创建 Worker 实例的唯一方式。这简化了 API,使创建 Worker 实例的代码更简洁。

步骤 4:完成隔离区设置过程

#

在此步骤中,您将完成基本的隔离区设置过程。这与上一个示例几乎完全相关,并且没有新的概念。有一个细微的变化是代码被拆分成了更多的方法,这是一种设计实践,可以为您在示例的其余部分添加更多功能做好准备。有关设置隔离区基本过程的深入讲解,请参见基本端口示例

首先,创建从 Worker.spawn 方法返回的私有构造函数。在构造函数体中,向主隔离区使用的接收端口添加一个监听器,并向该监听器传递一个尚未定义的方法,名为 _handleResponsesFromIsolate

dart
class Worker {
  final SendPort _commands;
  final ReceivePort _responses;

  Worker._(this._responses, this._commands) {
    _responses.listen(_handleResponsesFromIsolate);
  }
}

接下来,向 _startRemoteIsolate 添加负责初始化辅助隔离区上端口的代码。回想一下,此方法已在 Worker.spawn 方法中传递给 Isolate.spawn,它将把主隔离区的 SendPort 作为参数传递给它。

  • 创建一个新的 ReceivePort
  • 将该端口的 SendPort 发送回主隔离区。
  • 调用一个名为 _handleCommandsToIsolate 的新方法,并将新的 ReceivePort 和来自主隔离区的 SendPort 都作为参数传递。
dart
static void _startRemoteIsolate(SendPort sendPort) {
  final receivePort = ReceivePort();
  sendPort.send(receivePort.sendPort);
  _handleCommandsToIsolate(receivePort, sendPort);
}

接下来,添加 _handleCommandsToIsolate 方法,它负责接收来自主隔离区的消息,在辅助隔离区上解码 JSON,并将解码后的 JSON 作为响应发送回去。

  • 首先,在辅助隔离区的 ReceivePort 上声明一个监听器。
  • 在添加到监听器的回调函数中,尝试在 try/catch中解码从主隔离区传递的 JSON。如果解码成功,则将解码后的 JSON 发送回主隔离区。
  • 如果发生错误,则返回一个 RemoteError
dart
static void _handleCommandsToIsolate(
  ReceivePort receivePort,
  SendPort sendPort,
) {
  receivePort.listen((message) {
    try {
      final jsonData = jsonDecode(message as String);
      sendPort.send(jsonData);
    } catch (e) {
      sendPort.send(RemoteError(e.toString(), ''));
    }
  });
}

接下来,添加 _handleResponsesFromIsolate 方法的代码。

  • 首先,检查消息是否为 RemoteError,如果是,则应 throw 该错误。
  • 否则,打印消息。在未来的步骤中,您将更新此代码以返回消息而不是打印它们。
dart
void _handleResponsesFromIsolate(dynamic message) {
  if (message is RemoteError) {
    throw message;
  } else {
    print(message);
  }
}

最后,添加 parseJson 方法,这是一个公共方法,允许外部代码将 JSON 发送到辅助隔离区进行解码。

dart
Future<Object?> parseJson(String message) async {
  _commands.send(message);
}

您将在下一步中更新此方法。

步骤 5:同时处理多条消息

#

目前,如果您快速向辅助隔离区发送消息,隔离区将以*它们完成的顺序*发送解码后的 JSON 响应,而不是它们被发送的顺序。您无法确定哪个响应对应哪条消息。

在此步骤中,您将通过为每条消息分配一个 ID,并使用 Completer 对象来解决此问题,以确保当外部代码调用 parseJson 时,返回给该调用者的响应是正确的响应。

首先,向 Worker 添加两个类级别属性:

  • Map<int, Completer<Object?>> _activeRequests
  • int _idCounter
dart
class Worker {
  final SendPort _commands;
  final ReceivePort _responses;
  final Map<int, Completer<Object?>> _activeRequests = {};
  int _idCounter = 0;
  // ···
}

_activeRequests 映射将发送到辅助隔离区的消息与一个 Completer 相关联。_activeRequests 中使用的键取自 _idCounter,它将随着更多消息的发送而增加。

接下来,更新 parseJson 方法,使其在向辅助隔离区发送消息之前创建 completer。

  • 首先创建一个 Completer
  • 接下来,递增 _idCounter,以便每个 Completer 都与一个唯一的数字相关联。
  • _activeRequests 映射中添加一个条目,其中键是当前的 _idCounter 值,值是 completer。
  • 将消息连同 ID 一起发送给辅助隔离区。因为您只能通过 SendPort 发送一个值,所以将 ID 和消息包装在记录中。
  • 最后,返回 completer 的 future,它最终将包含来自辅助隔离区的响应。
dart
Future<Object?> parseJson(String message) async {
  final completer = Completer<Object?>.sync();
  final id = _idCounter++;
  _activeRequests[id] = completer;
  _commands.send((id, message));
  return await completer.future;
}

您还需要更新 _handleResponsesFromIsolate_handleCommandsToIsolate 以处理此系统。

_handleCommandsToIsolate 中,您需要考虑 message 是一个包含两个值的记录,而不仅仅是 JSON 文本。通过从 message 解构值来实现这一点。

然后,在解码 JSON 后,更新对 sendPort.send 的调用,以再次使用记录将 ID 和解码后的 JSON 都传回主隔离区。

dart
static void _handleCommandsToIsolate(
  ReceivePort receivePort,
  SendPort sendPort,
) {
  receivePort.listen((message) {
    final (int id, String jsonText) = message as (int, String); // New
    try {
      final jsonData = jsonDecode(jsonText);
      sendPort.send((id, jsonData)); // Updated
    } catch (e) {
      sendPort.send((id, RemoteError(e.toString(), '')));
    }
  });
}

最后,更新 _handleResponsesFromIsolate

  • 首先,再次从消息参数中解构出 ID 和响应。
  • 然后,从 _activeRequests 映射中移除与此请求对应的 completer。
  • 最后,不要抛出错误或打印解码后的 JSON,而是完成 completer,并传入响应。当它完成时,响应将返回给在主隔离区上调用 parseJson 的代码。
dart
void _handleResponsesFromIsolate(dynamic message) {
  final (int id, Object? response) = message as (int, Object?); // New
  final completer = _activeRequests.remove(id)!; // New

  if (response is RemoteError) {
    completer.completeError(response); // Updated
  } else {
    completer.complete(response); // Updated
  }
}

步骤 6:添加关闭端口的功能

#

当您的代码不再使用隔离区时,您应该关闭主隔离区和辅助隔离区上的端口。

  • 首先,添加一个类级别的布尔值,用于跟踪端口是否已关闭。
  • 然后,添加 Worker.close 方法。在该方法中:
    • _closed 更新为 true。
    • 向辅助隔离区发送最后一条消息。此消息是一个读取“shutdown”的 String,但它可以是您喜欢的任何对象。您将在下一个代码片段中使用它。
  • 最后,检查 _activeRequests 是否为空。如果为空,则关闭主隔离区中名为 _responsesReceivePort
dart
class Worker {
  bool _closed = false;
  // ···
  void close() {
    if (!_closed) {
      _closed = true;
      _commands.send('shutdown');
      if (_activeRequests.isEmpty) _responses.close();
      print('--- port closed --- ');
    }
  }
}
  • 接下来,您需要在辅助隔离区中处理“shutdown”消息。将以下代码添加到 _handleCommandsToIsolate 方法中。此代码将检查消息是否为读取“shutdown”的 String。如果是,它将关闭辅助隔离区的 ReceivePort 并返回。
dart
static void _handleCommandsToIsolate(
  ReceivePort receivePort,
  SendPort sendPort,
) {
  receivePort.listen((message) {
    // New if-block.
    if (message == 'shutdown') {
      receivePort.close();
      return;
    }
    final (int id, String jsonText) = message as (int, String);
    try {
      final jsonData = jsonDecode(jsonText);
      sendPort.send((id, jsonData));
    } catch (e) {
      sendPort.send((id, RemoteError(e.toString(), '')));
    }
  });
}
  • 最后,您应该添加代码,在尝试发送消息之前检查端口是否已关闭。在 Worker.parseJson 方法中添加一行。
dart
Future<Object?> parseJson(String message) async {
  if (_closed) throw StateError('Closed'); // New
  final completer = Completer<Object?>.sync();
  final id = _idCounter++;
  _activeRequests[id] = completer;
  _commands.send((id, message));
  return await completer.future;
}

完整示例

#
点击此处展开查看完整示例
dart
import 'dart:async';
import 'dart:convert';
import 'dart:isolate';

void main() async {
  final worker = await Worker.spawn();
  print(await worker.parseJson('{"key":"value"}'));
  print(await worker.parseJson('"banana"'));
  print(await worker.parseJson('[true, false, null, 1, "string"]'));
  print(
    await Future.wait([worker.parseJson('"yes"'), worker.parseJson('"no"')]),
  );
  worker.close();
}

class Worker {
  final SendPort _commands;
  final ReceivePort _responses;
  final Map<int, Completer<Object?>> _activeRequests = {};
  int _idCounter = 0;
  bool _closed = false;

  Future<Object?> parseJson(String message) async {
    if (_closed) throw StateError('Closed');
    final completer = Completer<Object?>.sync();
    final id = _idCounter++;
    _activeRequests[id] = completer;
    _commands.send((id, message));
    return await completer.future;
  }

  static Future<Worker> spawn() async {
    // Create a receive port and add its initial message handler
    final initPort = RawReceivePort();
    final connection = Completer<(ReceivePort, SendPort)>.sync();
    initPort.handler = (initialMessage) {
      final commandPort = initialMessage as SendPort;
      connection.complete((
        ReceivePort.fromRawReceivePort(initPort),
        commandPort,
      ));
    };

    // Spawn the isolate.
    try {
      await Isolate.spawn(_startRemoteIsolate, (initPort.sendPort));
    } on Object {
      initPort.close();
      rethrow;
    }

    final (ReceivePort receivePort, SendPort sendPort) =
        await connection.future;

    return Worker._(receivePort, sendPort);
  }

  Worker._(this._responses, this._commands) {
    _responses.listen(_handleResponsesFromIsolate);
  }

  void _handleResponsesFromIsolate(dynamic message) {
    final (int id, Object? response) = message as (int, Object?);
    final completer = _activeRequests.remove(id)!;

    if (response is RemoteError) {
      completer.completeError(response);
    } else {
      completer.complete(response);
    }

    if (_closed && _activeRequests.isEmpty) _responses.close();
  }

  static void _handleCommandsToIsolate(
    ReceivePort receivePort,
    SendPort sendPort,
  ) {
    receivePort.listen((message) {
      if (message == 'shutdown') {
        receivePort.close();
        return;
      }
      final (int id, String jsonText) = message as (int, String);
      try {
        final jsonData = jsonDecode(jsonText);
        sendPort.send((id, jsonData));
      } catch (e) {
        sendPort.send((id, RemoteError(e.toString(), '')));
      }
    });
  }

  static void _startRemoteIsolate(SendPort sendPort) {
    final receivePort = ReceivePort();
    sendPort.send(receivePort.sendPort);
    _handleCommandsToIsolate(receivePort, sendPort);
  }

  void close() {
    if (!_closed) {
      _closed = true;
      _commands.send('shutdown');
      if (_activeRequests.isEmpty) _responses.close();
      print('--- port closed --- ');
    }
  }
}