内容

本页面讨论了一些使用 Isolate API 来实现隔离的示例。

每当你的应用程序处理的计算量大到足以暂时阻塞其他计算时,你都应该使用隔离。Flutter 应用程序是最常见的示例,当你需要执行可能导致 UI 无响应的大量计算时。

没有关于何时必须使用隔离的规则,但以下是一些它们可能派上用场的情况

  • 解析和解码特别大的 JSON 数据块。
  • 处理和压缩照片、音频和视频。
  • 转换音频和视频文件。
  • 对大型列表或文件系统内执行复杂的搜索和筛选。
  • 执行 I/O,例如与数据库通信。
  • 处理大量网络请求。

实现一个简单的工作者隔离

#

这些示例实现了一个主隔离,它生成一个简单的辅助隔离。Isolate.run() 简化了设置和管理辅助隔离的步骤

  1. 生成(启动和创建)一个隔离。
  2. 在生成的隔离上运行一个函数。
  3. 捕获结果。
  4. 将结果返回给主隔离。
  5. 工作完成后终止隔离。
  6. 检查、捕获并向主隔离抛出异常和错误。

在新隔离中运行现有方法

#
  1. 主隔离 中直接调用 run() 来生成一个新隔离(一个 后台辅助进程),同时 main() 等待结果
dart
const String filename = 'with_keys.json';

void main() async {
  // Read some data.
  final jsonData = await Isolate.run(_readAndParseJson);

  // Use that data.
  print('Number of JSON keys: ${jsonData.length}');
}
  1. 将你希望辅助隔离执行的函数作为其第一个参数传递给辅助隔离。在此示例中,它是现有的函数 _readAndParseJson()
dart
Future<Map<String, dynamic>> _readAndParseJson() async {
  final fileData = await File(filename).readAsString();
  final jsonData = jsonDecode(fileData) as Map<String, dynamic>;
  return jsonData;
}
  1. Isolate.run() 获取 _readAndParseJson() 返回的结果,并将其值发送回主隔离,关闭辅助隔离。

  2. 辅助隔离将保存结果的内存传输到主隔离。它不会复制数据。辅助隔离执行验证传递,以确保允许传输对象。

_readAndParseJson() 是一个现有的异步函数,它可以轻松地在主隔离中直接运行。使用 Isolate.run() 来运行它可以实现并发。辅助隔离完全抽象了 _readAndParseJson() 的计算。它可以在不阻塞主隔离的情况下完成。

Isolate.run() 的结果始终是 Future,因为主隔离中的代码会继续运行。辅助隔离执行的计算是同步还是异步不会影响主隔离,因为无论哪种方式它都会并发运行。

如需了解完整程序,请查看 send_and_receive.dart 示例。

使用隔离发送闭包

#

您还可以使用函数文字或闭包直接在主隔离区中使用 run() 创建一个简单的工作者隔离区。

dart
const String filename = 'with_keys.json';

void main() async {
  // Read some data.
  final jsonData = await Isolate.run(() async {
    final fileData = await File(filename).readAsString();
    final jsonData = jsonDecode(fileData) as Map<String, dynamic>;
    return jsonData;
  });

  // Use that data.
  print('Number of JSON keys: ${jsonData.length}');
}

此示例与上一个示例实现相同的功能。一个新的隔离区会生成并计算一些内容,然后将结果发送回来。

但是,现在隔离区会发送一个 闭包。与典型的命名函数相比,闭包的限制更少,无论是其功能还是在代码中书写的方式。在此示例中,Isolate.run() 执行看起来像本地代码的内容,同时执行。从这个意义上说,您可以将 run() 想象成一个用于“并行运行”的控制流运算符。

使用端口在隔离之间发送多条消息

#

使用短生命周期隔离区很方便,但需要性能开销来生成新的隔离区并将对象从一个隔离区复制到另一个隔离区。如果您的代码依赖于使用 Isolate.run 重复运行相同的计算,则可以通过创建不会立即退出的长生命周期隔离区来提高性能。

为此,您可以使用 Isolate.run 抽象的一些底层隔离区 API

本部分介绍在新建的隔离区和 主隔离区 之间建立双向通信所需的步骤。第一个示例 基本端口 以高级别介绍了该过程。第二个示例 健壮端口 逐渐向第一个示例添加更多实际的现实世界功能。

ReceivePortSendPort

#

在隔离区之间建立长生命周期通信需要两个类(除了 Isolate):ReceivePortSendPort。这些端口是隔离区之间进行通信的唯一方式。

ReceivePort 是一个处理从其他隔离区发送的消息的对象。这些消息通过 SendPort 发送。

端口的行为类似于 Stream 对象(事实上,接收端口实现了 Stream!)您可以将 SendPortReceivePort 分别视为 Stream 的 StreamController 和侦听器。SendPort 类似于 StreamController,因为您可以使用 SendPort.send() 方法 向其中“添加”消息,并且这些消息由侦听器(在本例中为 ReceivePort)处理。然后,ReceivePort 通过将接收到的消息作为参数传递给您提供的回调函数来处理这些消息。

设置端口

#

新生成的隔离进程仅具有通过 Isolate.spawn 调用接收到的信息。如果您需要主隔离进程在最初创建后继续与生成的隔离进程通信,则必须设置一个通信通道,以便生成的隔离进程可以向主隔离进程发送消息。隔离进程只能通过消息传递进行通信。它们无法“看到”彼此的内存,这就是“隔离”一词的由来。

要设置这种双向通信,首先在主隔离进程中创建一个 ReceivePort,然后在使用 Isolate.spawn 生成新隔离进程时,将其 SendPort 作为参数传递给新隔离进程。然后,新隔离进程创建自己的 ReceivePort,并将 SendPort 通过主隔离进程传递的 SendPort 发送回去。主隔离进程接收此 SendPort,现在双方都有一个开放的通道来发送和接收消息。

A figure showing events being fed, one by one, into the event loop

  1. 在主隔离进程中创建一个 ReceivePortSendPort 会自动创建为 ReceivePort 上的属性。
  2. 使用 Isolate.spawn() 生成工作进程隔离进程
  3. 将对 ReceivePort.sendPort 的引用作为第一条消息传递给工作进程隔离进程。
  4. 在工作进程隔离进程中创建另一个新的 ReceivePort
  5. 将对工作进程隔离进程的 ReceivePort.sendPort 的引用作为第一条消息返回给主隔离进程。

除了创建端口和设置通信之外,您还需要告诉端口在收到消息时执行什么操作。这是通过在每个相应的 ReceivePort 上使用 listen 方法来完成的。

A figure showing events being fed, one by one, into the event loop

  1. 通过主隔离进程对工作进程隔离进程的 SendPort 的引用发送消息。
  2. 通过工作进程隔离进程的 ReceivePort 上的侦听器接收和处理消息。这是您想要从主隔离进程中移出的计算执行的位置。
  3. 通过工作者隔离区对主隔离区的SendPort的引用发送返回消息。
  4. 通过主隔离区的ReceivePort上的侦听器接收消息。

基本端口示例

#

此示例演示了如何设置一个长期工作者隔离区,并使其与主隔离区之间实现双向通信。该代码使用向新隔离区发送 JSON 文本的示例,其中 JSON 将被解析和解码,然后再发送回主隔离区。

步骤 1:定义工作者类

#

首先,为您的后台工作者隔离区创建一个类。此类包含执行以下所有操作所需的所有功能

  • 生成隔离区。
  • 向该隔离区发送消息。
  • 让隔离区解码一些 JSON。
  • 将解码后的 JSON 发送回主隔离区。

该类公开了两个公共方法:一个生成工作者隔离区的方法,另一个处理向该工作者隔离区发送消息的方法。

本示例中的其余部分将向您展示如何逐个填写类方法。

dart
class Worker {
  Future<void> spawn() async {
    // TODO: Add functionality to spawn a worker isolate.
  }

  void _handleResponsesFromIsolate(dynamic message) {
    // TODO: Handle messages sent back from the worker isolate.
  }

  static void _startRemoteIsolate(SendPort port) {
    // TODO: Define code that should be executed on the worker isolate.
  }

  Future<void> parseJson(String message) async {
    // TODO: Define a public method that can
    // be used to send messages to the worker isolate.
  }
}

步骤 2:生成工作者隔离区

#

Worker.spawn 方法是您将用于将创建工作者隔离区的代码分组在一起并确保它可以接收和发送消息的位置。

  • 首先,创建一个ReceivePort。这允许主隔离区接收从新生成的 worker 隔离区发送的消息。
  • 接下来,向接收端口添加一个侦听器来处理工作者隔离区将发送回的消息。传递给侦听器的回调 _handleResponsesFromIsolate 将在步骤 4中介绍。
  • 最后,使用Isolate.spawn生成工作者隔离区。它需要两个参数:在工作者隔离区上执行的函数(在步骤 3中介绍)和接收端口的sendPort属性。
dart
Future<void> spawn() async {
  final receivePort = ReceivePort();
  receivePort.listen(_handleResponsesFromIsolate);
  await Isolate.spawn(_startRemoteIsolate, receivePort.sendPort);
}

当在工作者隔离区上调用时,receivePort.sendPort参数将作为参数传递给回调 (_startRemoteIsolate)。这是确保工作者隔离区有办法向主隔离区发送回消息的第一步。

步骤 3:在工作者隔离区上执行代码

#

在此步骤中,您定义了在生成时发送到工作者隔离区以执行的方法 _startRemoteIsolate。此方法类似于工作者隔离区的“main”方法。

  • 首先,创建一个新的 ReceivePort。此端口接收来自主隔离区中的未来消息。
  • 接下来,将该端口的 SendPort 发送回主隔离区。
  • 最后,向新的 ReceivePort 添加一个侦听器。此侦听器处理主隔离区发送到工作隔离区的消息。
dart
static void _startRemoteIsolate(SendPort port) {
  final receivePort = ReceivePort();
  port.send(receivePort.sendPort);

  receivePort.listen((dynamic message) async {
    if (message is String) {
      final transformed = jsonDecode(message);
      port.send(transformed);
    }
  });
}

工作隔离区中的 ReceivePort 上的侦听器解码从主隔离区传递的 JSON,然后将解码后的 JSON 发送回主隔离区。

此侦听器是从主隔离区向工作隔离区发送消息的入口点。这是您唯一的机会来告诉工作隔离区将来要执行什么代码。

步骤 4:在主隔离区处理消息

#

最后,您需要告诉主隔离区如何处理从工作隔离区发送回主隔离区的消息。为此,您需要填写 _handleResponsesFromIsolate 方法。请回想一下,此方法传递给 receivePort.listen 方法,如 步骤 2 中所述

dart
Future<void> spawn() async {
  final receivePort = ReceivePort();
  receivePort.listen(_handleResponsesFromIsolate);
  await Isolate.spawn(_startRemoteIsolate, receivePort.sendPort);
}

还要回想一下,您在 步骤 3 中将 SendPort 发送回主隔离区。此方法处理该 SendPort 的接收,以及处理未来消息(这些消息将是解码后的 JSON)。

  • 首先,检查消息是否为 SendPort。如果是,则将该端口分配给类的 _sendPort 属性,以便稍后可用于发送消息。
  • 接下来,检查消息是否为 Map<String, dynamic> 类型,即解码后的 JSON 的预期类型。如果是,则使用特定于应用程序的逻辑处理该消息。在此示例中,打印该消息。
dart
void _handleResponsesFromIsolate(dynamic message) {
  if (message is SendPort) {
    _sendPort = message;
    _isolateReady.complete();
  } else if (message is Map<String, dynamic>) {
    print(message);
  }
}

步骤 5:添加一个完成器以确保您的隔离区已设置

#

要完成该类,请定义一个名为 parseJson 的公共方法,该方法负责向工作隔离区发送消息。它还需要确保在隔离区完全设置之前可以发送消息。要处理此问题,请使用 Completer

  • 首先,添加一个名为 Completer 的类级属性,并将其命名为 _isolateReady
  • 接下来,如果消息为 SendPort,则在 _handleResponsesFromIsolate 方法(在 步骤 4 中创建)中对完成器调用 complete()
  • 最后,在 parseJson 方法中,在添加 _sendPort.send 之前添加 await _isolateReady.future。这确保在生成工作隔离区将其 SendPort 发送回主隔离区之前,不会向工作隔离区发送任何消息。
dart
Future<void> parseJson(String message) async {
  await _isolateReady.future;
  _sendPort.send(message);
}

完整示例

#
展开以查看完整示例
dart
import 'dart:async';
import 'dart:convert';
import 'dart:isolate';

void main() async {
  final worker = Worker();
  await worker.spawn();
  await worker.parseJson('{"key":"value"}');
}

class Worker {
  late SendPort _sendPort;
  final Completer<void> _isolateReady = Completer.sync();

  Future<void> spawn() async {
    final receivePort = ReceivePort();
    receivePort.listen(_handleResponsesFromIsolate);
    await Isolate.spawn(_startRemoteIsolate, receivePort.sendPort);
  }

  void _handleResponsesFromIsolate(dynamic message) {
    if (message is SendPort) {
      _sendPort = message;
      _isolateReady.complete();
    } else if (message is Map<String, dynamic>) {
      print(message);
    }
  }

  static void _startRemoteIsolate(SendPort port) {
    final receivePort = ReceivePort();
    port.send(receivePort.sendPort);

    receivePort.listen((dynamic message) async {
      if (message is String) {
        final transformed = jsonDecode(message);
        port.send(transformed);
      }
    });
  }

  Future<void> parseJson(String message) async {
    await _isolateReady.future;
    _sendPort.send(message);
  }
}

可靠端口示例

#

上一个示例 解释了设置具有双向通信的长生命周期隔离区所需的基本构建块。如前所述,该示例缺少一些重要功能,例如错误处理、在不再使用时关闭端口的能力以及在某些情况下消息排序不一致。

此示例通过创建具有以下附加功能等更多功能且遵循更佳设计模式的长驻工作者隔离来扩展第一个示例中的信息。尽管此代码与第一个示例有相似之处,但它并不是该示例的扩展。

步骤 1:定义工作者类

#

首先,为您的后台工作者隔离区创建一个类。此类包含执行以下所有操作所需的所有功能

  • 生成隔离区。
  • 向该隔离区发送消息。
  • 让隔离区解码一些 JSON。
  • 将解码后的 JSON 发送回主隔离区。

该类公开了三个公共方法:一个用于创建工作者隔离,一个用于处理向该工作者隔离发送消息,一个用于在不再使用时关闭端口。

dart
class Worker {
  final SendPort _commands;
  final ReceivePort _responses;

  Future<Object?> parseJson(String message) async {
    // TODO: Ensure the port is still open.
    _commands.send(message);
  }

  static Future<Worker> spawn() async {
    // TODO: Add functionality to create a new Worker object with a
    //  connection to a spawned isolate.
    throw UnimplementedError();
  }

  Worker._(this._commands, this._responses) {
    // TODO: Initialize main isolate receive port listener.
  }

  void _handleResponsesFromIsolate(dynamic message) {
    // TODO: Handle messages sent back from the worker isolate.
  }

  static void _handleCommandsToIsolate(ReceivePort rp, SendPort sp) async {
    // TODO: Handle messages sent back from the worker isolate.
  }

  static void _startRemoteIsolate(SendPort sp) {
    // TODO: Initialize worker isolate's ports.
  }
}

步骤 2:在 Worker.spawn 方法中创建 RawReceivePort

#

在生成隔离之前,您需要创建一个 RawReceivePort,它是一个较低级别的 ReceivePort。使用 RawReceivePort 是首选模式,因为它允许您将隔离启动逻辑与处理隔离上消息传递的逻辑分开。

Worker.spawn 方法中

  • 首先,创建 RawReceivePort。此 ReceivePort 仅负责接收来自工作者隔离的初始消息,该消息将是 SendPort
  • 接下来,创建一个 Completer,它将指示隔离何时准备好接收消息。完成后,它将返回包含 ReceivePortSendPort 的记录。
  • 接下来,定义 RawReceivePort.handler 属性。此属性是一个 Function?,其行为类似于 ReceivePort.listener。当此端口收到消息时,将调用该函数。
  • 在处理程序函数中,调用 connection.complete()。此方法期望一个 记录,其中包含 ReceivePortSendPort 作为参数。SendPort 是从工作者隔离发送的初始消息,它将在下一步中分配给名为 _commands 的类级别 SendPort
  • 然后,使用 ReceivePort.fromRawReceivePort 构造函数创建一个新的 ReceivePort,并传入 initPort
dart
class Worker {
  final SendPort _commands;
  final ReceivePort _responses;

  static Future<Worker> spawn() async {
    // Create a receive port and add its initial message handler.
    final initPort = RawReceivePort();
    final connection = Completer<(ReceivePort, SendPort)>.sync();
    initPort.handler = (initialMessage) {
      final commandPort = initialMessage as SendPort;
      connection.complete((
        ReceivePort.fromRawReceivePort(initPort),
        commandPort,
      ));
    };
// ···
  }

通过先创建一个 RawReceivePort,再创建一个 ReceivePort,您以后可以向 ReceivePort.listen 添加一个新的回调。相反,如果您直接创建一个 ReceivePort,则只能添加一个 listener,因为 ReceivePort 实现的是 Stream,而不是 BroadcastStream

实际上,这允许您将隔离启动逻辑与在完成通信设置后处理接收消息的逻辑分开。随着其他方法中的逻辑增长,此优势将变得更加明显。

步骤 3:使用 Isolate.spawn 生成一个工作者隔离

#

此步骤继续填写 Worker.spawn 方法。您将添加生成隔离所需的代码,并从此类返回 Worker 的一个实例。在此示例中,对 Isolate.spawn 的调用被包装在一个 try/catch 中,该块确保如果隔离启动失败,则 initPort 将被关闭,并且 Worker 对象不会被创建。

  • 首先,尝试在 try/catch 块中生成一个工作者隔离。如果生成工作者隔离失败,则关闭在上一步中创建的接收端口。传递给 Isolate.spawn 的方法将在后面的步骤中介绍。
  • 接下来,等待 connection.future,并从它返回的记录中解构发送端口和接收端口。
  • 最后,通过调用其私有构造函数并传入来自该完成器的端口来返回 Worker 的一个实例。
dart
class Worker {
  final SendPort _commands;
  final ReceivePort _responses;

  static Future<Worker> spawn() async {
    // Create a receive port and add its initial message handler
    final initPort = RawReceivePort();
    final connection = Completer<(ReceivePort, SendPort)>.sync();
    initPort.handler = (initialMessage) {
      final commandPort = initialMessage as SendPort;
      connection.complete((
        ReceivePort.fromRawReceivePort(initPort),
        commandPort,
      ));
    };
    // Spawn the isolate.
    try {
      await Isolate.spawn(_startRemoteIsolate, (initPort.sendPort));
    } on Object {
      initPort.close();
      rethrow;
    }

    final (ReceivePort receivePort, SendPort sendPort) =
        await connection.future;

    return Worker._(sendPort, receivePort);
  }

请注意,在此示例中(与 上一个示例 相比),Worker.spawn 充当此类的异步静态构造函数,并且是创建 Worker 实例的唯一方法。这简化了 API,使创建 Worker 实例的代码更简洁。

步骤 4:完成隔离设置过程

#

在此步骤中,您将完成基本的隔离设置过程。这几乎完全与 上一个示例 相关,并且没有新概念。有一个细微的变化,即代码被分解为更多的方法,这是一种设计实践,它使您能够通过本示例的其余部分添加更多功能。有关设置隔离的基本过程的深入演练,请参见 基本端口示例

首先,创建从 Worker.spawn 方法返回的私有构造函数。在构造函数主体中,向主隔离使用的接收端口添加一个侦听器,并向该侦听器传递一个尚未定义的方法,称为 _handleResponsesFromIsolate

dart
class Worker {
  final SendPort _commands;
  final ReceivePort _responses;
// ···
  Worker._(this._responses, this._commands) {
    _responses.listen(_handleResponsesFromIsolate);
  }

接下来,将代码添加到 _startRemoteIsolate,该代码负责初始化工作进程隔离中的端口。回想一下,此方法已在 Worker.spawn 方法中传递给 Isolate.spawn,并且它将作为参数传递主隔离的 SendPort

  • 创建一个新的 ReceivePort
  • 将该端口的 SendPort 发送回主隔离。
  • 调用名为 _handleCommandsToIsolate 的新方法,并将来自主隔离的新 ReceivePortSendPort 作为参数传递。
dart
static void _startRemoteIsolate(SendPort sendPort) {
  final receivePort = ReceivePort();
  sendPort.send(receivePort.sendPort);
  _handleCommandsToIsolate(receivePort, sendPort);
}

接下来,添加 _handleCommandsToIsolate 方法,该方法负责接收来自主隔离的消息,在工作进程隔离中解码 json,并将解码后的 json 作为响应发送回。

  • 首先,在工作进程隔离的 ReceivePort 上声明一个侦听器。
  • 在添加到侦听器的回调中,尝试在 try/catch 中解码从主隔离传递的 JSON。如果解码成功,将解码后的 JSON 发送回主隔离。
  • 如果出现错误,请发送回 RemoteError
dart
static void _handleCommandsToIsolate(
    ReceivePort receivePort, SendPort sendPort) {
  receivePort.listen((message) {
    try {
      final jsonData = jsonDecode(message as String);
      sendPort.send(jsonData);
    } catch (e) {
      sendPort.send(RemoteError(e.toString(), ''));
    }
  });
}

接下来,添加 _handleResponsesFromIsolate 方法的代码。

  • 首先,检查消息是否为 RemoteError,如果是,则应 throw 该错误。
  • 否则,打印消息。在以后的步骤中,你将更新此代码以返回消息,而不是打印它们。
dart
void _handleResponsesFromIsolate(dynamic message) {
  if (message is RemoteError) {
    throw message;
  } else {
    print(message);
  }
}

最后,添加 parseJson 方法,这是一个公共方法,允许外部代码将 JSON 发送到工作进程隔离以进行解码。

dart
Future<Object?> parseJson(String message) async {
  _commands.send(message);
}

你将在下一步中更新此方法。

步骤 5:同时处理多条消息

#

目前,如果你快速向工作进程隔离发送消息,则隔离将以 完成的顺序 发送解码后的 json 响应,而不是发送的顺序。你无法确定哪个响应对应于哪个消息。

在此步骤中,你将通过为每条消息分配一个 ID,并使用 Completer 对象来确保当外部代码调用 parseJson 时,返回给该调用者的响应是正确的响应,从而解决此问题。

首先,向 Worker 添加两个类级属性

  • Map<int, Completer<Object?>> _activeRequests
  • int _idCounter
dart
class Worker {
  final SendPort _commands;
  final ReceivePort _responses;
  final Map<int, Completer<Object?>> _activeRequests = {};
  int _idCounter = 0;

_activeRequests 映射将发送到工作进程隔离的消息与 Completer 关联起来。_activeRequests 中使用的键取自 _idCounter,随着发送更多消息,_idCounter 将增加。

接下来,更新 parseJson 方法以在向工作进程隔离发送消息之前创建完成器。

  • 首先创建一个 Completer
  • 接下来,递增 _idCounter,以便每个 Completer 与一个唯一数字关联。
  • _activeRequests 映射中添加一个条目,其中键是 _idCounter 的当前数字,而完成器是值。
  • 将消息连同 ID 一起发送到工作者隔离区。因为你只能通过 SendPort 发送一个值,所以将 ID 和消息封装在一个 记录 中。
  • 最后,返回完成器的 future,它最终将包含来自工作者隔离区的响应。
dart
Future<Object?> parseJson(String message) async {
  final completer = Completer<Object?>.sync();
  final id = _idCounter++;
  _activeRequests[id] = completer;
  _commands.send((id, message));
  return await completer.future;
}

你还需要更新 _handleResponsesFromIsolate_handleCommandsToIsolate 来处理此系统。

_handleCommandsToIsolate 中,你需要将 message 视为具有两个值的记录,而不仅仅是 json 文本。通过从 message 解构值来执行此操作。

然后,在解码 json 后,更新对 sendPort.send 的调用,以使用记录将 ID 和解码后的 json 传回主隔离区。

dart
static void _handleCommandsToIsolate(
    ReceivePort receivePort, SendPort sendPort) {
  receivePort.listen((message) {
    final (int id, String jsonText) = message as (int, String); // New
    try {
      final jsonData = jsonDecode(jsonText);
      sendPort.send((id, jsonData)); // Updated
    } catch (e) {
      sendPort.send((id, RemoteError(e.toString(), '')));
    }
  });
}

最后,更新 _handleResponsesFromIsolate

  • 首先,再次从消息参数中解构 ID 和响应。
  • 然后,从 _activeRequests 映射中删除与此请求相对应的完成器。
  • 最后,不要抛出错误或打印解码后的 json,而是完成完成器,并传入响应。完成此操作后,响应将返回给在主隔离区上调用 parseJson 的代码。
dart
void _handleResponsesFromIsolate(dynamic message) {
  final (int id, Object? response) = message as (int, Object?); // New
  final completer = _activeRequests.remove(id)!; // New

  if (response is RemoteError) {
    completer.completeError(response); // Updated
  } else {
    completer.complete(response); // Updated
  }
}

步骤 6:添加关闭端口的功能

#

当你的代码不再使用隔离区时,你应该关闭主隔离区和工作者隔离区上的端口。

  • 首先,添加一个类级布尔值来跟踪端口是否已关闭。
  • 然后,添加 Worker.close 方法。在此方法中
    • _closed 更新为 true。
    • 向工作者隔离区发送最后一条消息。此消息是一个读取“shutdown”的 String,但它可以是你想要的任何对象。你将在下一个代码段中使用它。
  • 最后,检查 _activeRequests 是否为空。如果是,则关闭名为 _responses 的主隔离区的 ReceivePort
dart
class Worker {
  bool _closed = false;
// ···
  void close() {
    if (!_closed) {
      _closed = true;
      _commands.send('shutdown');
      if (_activeRequests.isEmpty) _responses.close();
      print('--- port closed --- ');
    }
  }
  • 接下来,你需要在工作者隔离区中处理“shutdown”消息。将以下代码添加到 _handleCommandsToIsolate 方法。此代码将检查消息是否为读取“shutdown”的 String。如果是,它将关闭工作者隔离区的 ReceivePort 并返回。
dart
static void _handleCommandsToIsolate(
  ReceivePort receivePort,
  SendPort sendPort,
) {
  receivePort.listen((message) {
    // New if-block.
    if (message == 'shutdown') {
      receivePort.close();
      return;
    }
    final (int id, String jsonText) = message as (int, String);
    try {
      final jsonData = jsonDecode(jsonText);
      sendPort.send((id, jsonData));
    } catch (e) {
      sendPort.send((id, RemoteError(e.toString(), '')));
    }
  });
}
  • 最后,你应该添加代码来检查端口是否已关闭,然后再尝试发送消息。在 Worker.parseJson 方法中添加一行。
dart
Future<Object?> parseJson(String message) async {
  if (_closed) throw StateError('Closed'); // New
  final completer = Completer<Object?>.sync();
  final id = _idCounter++;
  _activeRequests[id] = completer;
  _commands.send((id, message));
  return await completer.future;
}

完整示例

#
在此展开以查看完整示例
dart
import 'dart:async';
import 'dart:convert';
import 'dart:isolate';

void main() async {
  final worker = await Worker.spawn();
  print(await worker.parseJson('{"key":"value"}'));
  print(await worker.parseJson('"banana"'));
  print(await worker.parseJson('[true, false, null, 1, "string"]'));
  print(
      await Future.wait([worker.parseJson('"yes"'), worker.parseJson('"no"')]));
  worker.close();
}

class Worker {
  final SendPort _commands;
  final ReceivePort _responses;
  final Map<int, Completer<Object?>> _activeRequests = {};
  int _idCounter = 0;
  bool _closed = false;

  Future<Object?> parseJson(String message) async {
    if (_closed) throw StateError('Closed');
    final completer = Completer<Object?>.sync();
    final id = _idCounter++;
    _activeRequests[id] = completer;
    _commands.send((id, message));
    return await completer.future;
  }

  static Future<Worker> spawn() async {
    // Create a receive port and add its initial message handler
    final initPort = RawReceivePort();
    final connection = Completer<(ReceivePort, SendPort)>.sync();
    initPort.handler = (initialMessage) {
      final commandPort = initialMessage as SendPort;
      connection.complete((
        ReceivePort.fromRawReceivePort(initPort),
        commandPort,
      ));
    };

    // Spawn the isolate.
    try {
      await Isolate.spawn(_startRemoteIsolate, (initPort.sendPort));
    } on Object {
      initPort.close();
      rethrow;
    }

    final (ReceivePort receivePort, SendPort sendPort) =
        await connection.future;

    return Worker._(receivePort, sendPort);
  }

  Worker._(this._responses, this._commands) {
    _responses.listen(_handleResponsesFromIsolate);
  }

  void _handleResponsesFromIsolate(dynamic message) {
    final (int id, Object? response) = message as (int, Object?);
    final completer = _activeRequests.remove(id)!;

    if (response is RemoteError) {
      completer.completeError(response);
    } else {
      completer.complete(response);
    }

    if (_closed && _activeRequests.isEmpty) _responses.close();
  }

  static void _handleCommandsToIsolate(
    ReceivePort receivePort,
    SendPort sendPort,
  ) {
    receivePort.listen((message) {
      if (message == 'shutdown') {
        receivePort.close();
        return;
      }
      final (int id, String jsonText) = message as (int, String);
      try {
        final jsonData = jsonDecode(jsonText);
        sendPort.send((id, jsonData));
      } catch (e) {
        sendPort.send((id, RemoteError(e.toString(), '')));
      }
    });
  }

  static void _startRemoteIsolate(SendPort sendPort) {
    final receivePort = ReceivePort();
    sendPort.send(receivePort.sendPort);
    _handleCommandsToIsolate(receivePort, sendPort);
  }

  void close() {
    if (!_closed) {
      _closed = true;
      _commands.send('shutdown');
      if (_activeRequests.isEmpty) _responses.close();
      print('--- port closed --- ');
    }
  }
}