目录

隔离

本页面讨论一些使用 Isolate API 来实现隔离的示例。

当您的应用程序处理足够大的计算以暂时阻塞其他计算时,您应该使用隔离。最常见的例子是在 Flutter 应用程序中,当您需要执行可能导致 UI 无响应的大型计算时。

关于何时必须使用隔离没有任何规则,但以下是一些更有用的情况:

  • 解析和解码非常大的 JSON 数据块。
  • 处理和压缩照片、音频和视频。
  • 转换音频和视频文件。
  • 对大型列表或文件系统执行复杂的搜索和过滤。
  • 执行 I/O 操作,例如与数据库通信。
  • 处理大量的网络请求。

实现简单的 worker 隔离

#

这些示例实现了一个主隔离,它会生成一个简单的 worker 隔离。 Isolate.run() 简化了设置和管理 worker 隔离的步骤

  1. 生成(启动并创建)一个隔离。
  2. 在生成的隔离上运行一个函数。
  3. 捕获结果。
  4. 将结果返回到主隔离。
  5. 工作完成后终止隔离。
  6. 检查、捕获并将异常和错误抛回主隔离。

在新隔离中运行现有方法

#
  1. 调用 run() 以在 主隔离中直接生成一个新的隔离(后台 worker),同时 main() 等待结果
dart
const String filename = 'with_keys.json';

void main() async {
  // Read some data.
  final jsonData = await Isolate.run(_readAndParseJson);

  // Use that data.
  print('Number of JSON keys: ${jsonData.length}');
}
  1. 将您希望 worker 隔离执行的函数作为其第一个参数传递。在本例中,它是现有函数 _readAndParseJson()
dart
Future<Map<String, dynamic>> _readAndParseJson() async {
  final fileData = await File(filename).readAsString();
  final jsonData = jsonDecode(fileData) as Map<String, dynamic>;
  return jsonData;
}
  1. Isolate.run() 获取 _readAndParseJson() 返回的结果,并将值发送回主隔离,从而关闭 worker 隔离。

  2. worker 隔离转移存储结果的内存到主隔离。它不复制数据。worker 隔离执行验证过程,以确保允许传输对象。

_readAndParseJson() 是一个现有的异步函数,它可以很容易地直接在主隔离中运行。使用 Isolate.run() 来运行它反而实现了并发。worker 隔离完全抽象了 _readAndParseJson() 的计算。它可以在不阻塞主隔离的情况下完成。

Isolate.run() 的结果始终是一个 Future,因为主隔离中的代码会继续运行。worker 隔离执行的计算是同步还是异步不会影响主隔离,因为无论如何它都在并发运行。

有关完整的程序,请查看 send_and_receive.dart 示例。

使用隔离发送闭包

#

您还可以使用函数字面量或闭包,直接在主隔离中使用 run() 创建简单的 worker 隔离。

dart
const String filename = 'with_keys.json';

void main() async {
  // Read some data.
  final jsonData = await Isolate.run(() async {
    final fileData = await File(filename).readAsString();
    final jsonData = jsonDecode(fileData) as Map<String, dynamic>;
    return jsonData;
  });

  // Use that data.
  print('Number of JSON keys: ${jsonData.length}');
}

此示例实现了与上一个示例相同的功能。生成一个新的隔离,计算一些内容,并将结果发送回去。

但是,现在隔离发送一个闭包。闭包在功能和如何编写到代码中方面都比典型的命名函数更少限制。在此示例中,Isolate.run() 并发执行看起来像本地代码的内容。从这个意义上讲,您可以想象 run() 的工作方式类似于“并行运行”的控制流运算符。

使用端口在隔离之间发送多个消息

#

短时隔离使用方便,但需要性能开销来生成新的隔离并将对象从一个隔离复制到另一个隔离。如果您的代码依赖于使用 Isolate.run 重复运行相同的计算,那么您可以通过创建不立即退出的长时隔离来提高性能。

为此,您可以使用 Isolate.run 抽象的一些低级隔离 API

本节介绍了在新生成的隔离和 主隔离之间建立双向通信所需的步骤。第一个示例,基本端口,从高层次介绍了该过程。第二个示例,健壮端口,逐渐为第一个示例添加更多实用、真实的的功能。

ReceivePortSendPort

#

在隔离之间设置长时通信需要两个类(除了 Isolate 之外):ReceivePortSendPort。这些端口是隔离之间相互通信的唯一方式。

ReceivePort 是一个处理从其他隔离发送的消息的对象。这些消息通过 SendPort 发送。

端口的行为类似于 Stream 对象(实际上,接收端口实现了 Stream!)。您可以将 SendPortReceivePort 分别看作是 Stream 的 StreamController 和监听器。SendPort 类似于 StreamController,因为您可以使用 SendPort.send() 方法向它们“添加”消息,而这些消息由监听器处理,在本例中是 ReceivePort。然后 ReceivePort 通过将它们作为参数传递给您提供的回调来处理它接收到的消息。

设置端口

#

新生成的隔离只具有它通过 Isolate.spawn 调用接收的信息。如果您需要主隔离在初始创建之后继续与生成的隔离通信,您必须设置一个通信通道,以便生成的隔离可以将消息发送到主隔离。隔离只能通过消息传递进行通信。它们无法“看到”彼此内存的内部,这也是“隔离”这个名称的由来。

要建立这种双向通信,首先在主隔离区中创建一个 ReceivePort,然后在用 Isolate.spawn 生成新的隔离区时,将其 SendPort 作为参数传递给新的隔离区。新的隔离区然后创建自己的 ReceivePort,并将其自身的 SendPort 发送回主隔离区传递给它的 SendPort。主隔离区接收到这个 SendPort,现在双方都有一个开放的通道来发送和接收消息。

A figure showing events being fed, one by one, into the event loop

  1. 在主隔离区中创建一个 ReceivePortSendPort 会自动创建为 ReceivePort 的属性。
  2. 使用 Isolate.spawn() 生成工作隔离区。
  3. ReceivePort.sendPort 的引用作为第一条消息传递给工作隔离区。
  4. 在工作隔离区中创建另一个新的 ReceivePort
  5. 将工作隔离区的 ReceivePort.sendPort 的引用作为第一条消息返回给主隔离区。

除了创建端口和设置通信外,您还需要告诉端口在接收到消息时该怎么做。这是通过在每个相应的 ReceivePort 上使用 listen 方法完成的。

A figure showing events being fed, one by one, into the event loop

  1. 通过主隔离区对工作隔离区的 SendPort 的引用发送消息。
  2. 通过工作隔离区的 ReceivePort 上的监听器接收并处理消息。 这是执行您想要从主隔离区移出的计算的地方。
  3. 通过工作隔离区对主隔离区的 SendPort 的引用发送返回消息。
  4. 通过主隔离区的 ReceivePort 上的监听器接收消息。

基本端口示例

#

此示例演示了如何设置一个长期的工作隔离区,该隔离区与主隔离区之间进行双向通信。该代码使用将 JSON 文本发送到新隔离区的示例,在该隔离区中,JSON 将被解析和解码,然后再发送回主隔离区。

步骤 1:定义工作类

#

首先,为您的后台工作隔离区创建一个类。此类包含您需要的所有功能:

  • 生成一个隔离区。
  • 向该隔离区发送消息。
  • 让隔离区解码一些 JSON。
  • 将解码后的 JSON 发送回主隔离区。

该类公开了两个公共方法:一个用于生成工作隔离区,另一个用于处理向该工作隔离区发送消息。

本示例的其余部分将向您展示如何逐步填充类方法。

dart
class Worker {
  Future<void> spawn() async {
    // TODO: Add functionality to spawn a worker isolate.
  }

  void _handleResponsesFromIsolate(dynamic message) {
    // TODO: Handle messages sent back from the worker isolate.
  }

  static void _startRemoteIsolate(SendPort port) {
    // TODO: Define code that should be executed on the worker isolate.
  }

  Future<void> parseJson(String message) async {
    // TODO: Define a public method that can
    // be used to send messages to the worker isolate.
  }
}

步骤 2:生成工作隔离区

#

Worker.spawn 方法是您将用于创建工作隔离区并确保它可以接收和发送消息的代码分组的位置。

  • 首先,创建一个 ReceivePort。这允许主隔离区接收从新生成的工作隔离区发送的消息。
  • 接下来,向接收端口添加一个监听器,以处理工作隔离区将发送回的消息。传递给监听器的回调 _handleResponsesFromIsolate 将在 步骤 4 中介绍。
  • 最后,使用 Isolate.spawn 生成工作隔离区。它需要两个参数:一个要在工作隔离区上执行的函数(在 步骤 3 中介绍)和接收端口的 sendPort 属性。
dart
Future<void> spawn() async {
  final receivePort = ReceivePort();
  receivePort.listen(_handleResponsesFromIsolate);
  await Isolate.spawn(_startRemoteIsolate, receivePort.sendPort);
}

当在工作隔离区上调用回调 (_startRemoteIsolate) 时,receivePort.sendPort 参数将作为参数传递给该回调。这是确保工作隔离区能够将消息发送回主隔离区的第一步。

步骤 3:在工作隔离区上执行代码

#

在此步骤中,您定义了发送到工作隔离区以在生成时执行的方法 _startRemoteIsolate。此方法类似于工作隔离区的“main”方法。

  • 首先,创建另一个新的 ReceivePort。此端口接收来自主隔离区的未来消息。
  • 接下来,将该端口的 SendPort 发送回主隔离区。
  • 最后,向新的 ReceivePort 添加一个监听器。此监听器处理主隔离区发送给工作隔离区的消息。
dart
static void _startRemoteIsolate(SendPort port) {
  final receivePort = ReceivePort();
  port.send(receivePort.sendPort);

  receivePort.listen((dynamic message) async {
    if (message is String) {
      final transformed = jsonDecode(message);
      port.send(transformed);
    }
  });
}

工作区的 ReceivePort 上的监听器解码从主隔离区传递的 JSON,然后将解码后的 JSON 发送回主隔离区。

此监听器是从主隔离区发送到工作隔离区的消息的入口点。这是您告诉工作隔离区将来要执行什么代码的唯一机会。

步骤 4:处理主隔离区上的消息

#

最后,您需要告诉主隔离区如何处理从工作隔离区发送回主隔离区的消息。为此,您需要填写 _handleResponsesFromIsolate 方法。回想一下,此方法被传递给 receivePort.listen 方法,如 步骤 2 中所述。

dart
Future<void> spawn() async {
  final receivePort = ReceivePort();
  receivePort.listen(_handleResponsesFromIsolate);
  await Isolate.spawn(_startRemoteIsolate, receivePort.sendPort);
}

还记得您在 步骤 3 中将 SendPort 发送回了主隔离区。此方法处理该 SendPort 的接收以及处理未来的消息(这将是解码后的 JSON)。

  • 首先,检查消息是否为 SendPort。如果是,则将该端口分配给该类的 _sendPort 属性,以便以后可以使用它来发送消息。
  • 接下来,检查消息的类型是否为 Map<String, dynamic>,这是解码后的 JSON 的预期类型。如果是,请使用特定于您的应用程序的逻辑处理该消息。在此示例中,将打印该消息。
dart
void _handleResponsesFromIsolate(dynamic message) {
  if (message is SendPort) {
    _sendPort = message;
    _isolateReady.complete();
  } else if (message is Map<String, dynamic>) {
    print(message);
  }
}

步骤 5:添加一个 Completer 以确保您的隔离区已设置

#

要完成此类,请定义一个名为 parseJson 的公共方法,该方法负责向工作隔离区发送消息。它还需要确保在隔离区完全设置好之前可以发送消息。为了处理这种情况,请使用 Completer

  • 首先,添加一个名为 Completer 的类级别属性,并将其命名为 _isolateReady
  • 接下来,如果在 _handleResponsesFromIsolate 方法(在 步骤 4 中创建)中,消息是 SendPort,则在该 Completer 上添加一个对 complete() 的调用。
  • 最后,在 parseJson 方法中,在添加 _sendPort.send 之前添加 await _isolateReady.future。这确保了在工作隔离区生成并且将其 SendPort 发送回主隔离区之前,无法向工作隔离区发送任何消息。
dart
Future<void> parseJson(String message) async {
  await _isolateReady.future;
  _sendPort.send(message);
}

完整示例

#
展开以查看完整示例
dart
import 'dart:async';
import 'dart:convert';
import 'dart:isolate';

void main() async {
  final worker = Worker();
  await worker.spawn();
  await worker.parseJson('{"key":"value"}');
}

class Worker {
  late SendPort _sendPort;
  final Completer<void> _isolateReady = Completer.sync();

  Future<void> spawn() async {
    final receivePort = ReceivePort();
    receivePort.listen(_handleResponsesFromIsolate);
    await Isolate.spawn(_startRemoteIsolate, receivePort.sendPort);
  }

  void _handleResponsesFromIsolate(dynamic message) {
    if (message is SendPort) {
      _sendPort = message;
      _isolateReady.complete();
    } else if (message is Map<String, dynamic>) {
      print(message);
    }
  }

  static void _startRemoteIsolate(SendPort port) {
    final receivePort = ReceivePort();
    port.send(receivePort.sendPort);

    receivePort.listen((dynamic message) async {
      if (message is String) {
        final transformed = jsonDecode(message);
        port.send(transformed);
      }
    });
  }

  Future<void> parseJson(String message) async {
    await _isolateReady.future;
    _sendPort.send(message);
  }
}

健壮端口示例

#

前面的示例解释了建立具有双向通信的长期隔离区所需的基本构建块。如前所述,该示例缺少一些重要功能,例如错误处理、在不再使用时关闭端口的能力以及在某些情况下消息排序方面的不一致。

此示例通过创建一个具有这些附加功能且遵循更好设计模式的长期工作隔离区,扩展了第一个示例中的信息。尽管此代码与第一个示例有相似之处,但它不是该示例的扩展。

步骤 1:定义工作类

#

首先,为您的后台工作隔离区创建一个类。此类包含您需要的所有功能:

  • 生成一个隔离区。
  • 向该隔离区发送消息。
  • 让隔离区解码一些 JSON。
  • 将解码后的 JSON 发送回主隔离区。

该类公开了三个公共方法:一个用于创建工作隔离区,一个用于处理向该工作隔离区发送消息,另一个用于在不再使用时关闭端口。

dart
class Worker {
  final SendPort _commands;
  final ReceivePort _responses;

  Future<Object?> parseJson(String message) async {
    // TODO: Ensure the port is still open.
    _commands.send(message);
  }

  static Future<Worker> spawn() async {
    // TODO: Add functionality to create a new Worker object with a
    //  connection to a spawned isolate.
    throw UnimplementedError();
  }

  Worker._(this._responses, this._commands) {
    // TODO: Initialize main isolate receive port listener.
  }

  void _handleResponsesFromIsolate(dynamic message) {
    // TODO: Handle messages sent back from the worker isolate.
  }

  static void _handleCommandsToIsolate(ReceivePort rp, SendPort sp) async {
    // TODO: Handle messages sent back from the worker isolate.
  }

  static void _startRemoteIsolate(SendPort sp) {
    // TODO: Initialize worker isolate's ports.
  }
}

步骤 2:在 Worker.spawn 方法中创建一个 RawReceivePort

#

在生成隔离区之前,您需要创建一个 RawReceivePort,这是一个较低级别的 ReceivePort。使用 RawReceivePort 是一种首选模式,因为它允许您将隔离区启动逻辑与处理隔离区消息传递的逻辑分开。

Worker.spawn 方法中

  • 首先,创建 RawReceivePort。 此 ReceivePort 仅负责接收来自工作隔离区的初始消息,该消息将是一个 SendPort
  • 接下来,创建一个 Completer,它将指示隔离区何时准备好接收消息。 完成此操作后,它将返回一个带有 ReceivePortSendPort 的记录。
  • 接下来,定义 RawReceivePort.handler 属性。 此属性是一个 Function?,其行为类似于 ReceivePort.listener。 当此端口接收到消息时,将调用该函数。
  • 在处理程序函数中,调用 connection.complete()。此方法需要一个带有 ReceivePortSendPort 作为参数的 记录SendPort 是从工作隔离区发送的初始消息,该消息将在下一步中分配给名为 _commands 的类级别 SendPort
  • 然后,使用 ReceivePort.fromRawReceivePort 构造函数创建一个新的 ReceivePort,并传入 initPort
dart
class Worker {
  final SendPort _commands;
  final ReceivePort _responses;

  static Future<Worker> spawn() async {
    // Create a receive port and add its initial message handler.
    final initPort = RawReceivePort();
    final connection = Completer<(ReceivePort, SendPort)>.sync();
    initPort.handler = (initialMessage) {
      final commandPort = initialMessage as SendPort;
      connection.complete((
        ReceivePort.fromRawReceivePort(initPort),
        commandPort,
      ));
    };
// ···
  }

通过首先创建一个 RawReceivePort,然后再创建一个 ReceivePort,你可以在之后向 ReceivePort.listen 添加新的回调。相反,如果你直接创建一个 ReceivePort,你将只能添加一个 listener,因为 ReceivePort 实现的是 Stream,而不是 BroadcastStream

实际上,这允许你将 isolate 的启动逻辑与设置通信完成后处理消息的逻辑分开。当其他方法中的逻辑增长时,这种好处将变得更加明显。

步骤 3:使用 Isolate.spawn 派生一个 worker isolate

#

此步骤继续完善 Worker.spawn 方法。你将添加派生 isolate 所需的代码,并从此类返回一个 Worker 实例。在此示例中,对 Isolate.spawn 的调用被包装在一个 try/catch 代码块中,这确保了如果 isolate 启动失败,initPort 将被关闭,并且不会创建 Worker 对象。

  • 首先,尝试在一个 try/catch 代码块中派生一个 worker isolate。如果派生 worker isolate 失败,则关闭在上一步中创建的接收端口。传递给 Isolate.spawn 的方法将在稍后的步骤中介绍。
  • 接下来,等待 connection.future,并从它返回的记录中解构发送端口和接收端口。
  • 最后,通过调用其私有构造函数并传入来自该 Completer 的端口,返回一个 Worker 实例。
dart
class Worker {
  final SendPort _commands;
  final ReceivePort _responses;

  static Future<Worker> spawn() async {
    // Create a receive port and add its initial message handler
    final initPort = RawReceivePort();
    final connection = Completer<(ReceivePort, SendPort)>.sync();
    initPort.handler = (initialMessage) {
      final commandPort = initialMessage as SendPort;
      connection.complete((
        ReceivePort.fromRawReceivePort(initPort),
        commandPort,
      ));
    };
    // Spawn the isolate.
    try {
      await Isolate.spawn(_startRemoteIsolate, (initPort.sendPort));
    } on Object {
      initPort.close();
      rethrow;
    }

    final (ReceivePort receivePort, SendPort sendPort) =
        await connection.future;

    return Worker._(receivePort, sendPort);
  }

请注意,在此示例中(与之前的示例相比),Worker.spawn 充当该类的异步静态构造函数,并且是创建 Worker 实例的唯一方法。这简化了 API,使创建 Worker 实例的代码更加简洁。

步骤 4:完成 isolate 设置过程

#

在此步骤中,你将完成基本的 isolate 设置过程。这几乎完全与之前的示例相关,并且没有新的概念。一个细微的变化是,代码被分解成更多的方法,这是一种设计实践,可以让你在本次示例的其余部分添加更多功能。有关设置 isolate 的基本过程的深入讲解,请参阅基本端口示例

首先,创建从 Worker.spawn 方法返回的私有构造函数。在构造函数主体中,向主 isolate 使用的接收端口添加一个侦听器,并将一个尚未定义的方法(名为 _handleResponsesFromIsolate)传递给该侦听器。

dart
class Worker {
  final SendPort _commands;
  final ReceivePort _responses;
// ···
  Worker._(this._responses, this._commands) {
    _responses.listen(_handleResponsesFromIsolate);
  }

接下来,将负责初始化 worker isolate 上的端口的代码添加到 _startRemoteIsolate回想一下,此方法在 Worker.spawn 方法中传递给 Isolate.spawn,并且它将接收主 isolate 的 SendPort 作为参数。

  • 创建一个新的 ReceivePort
  • 将该端口的 SendPort 发回主 isolate。
  • 调用一个名为 _handleCommandsToIsolate 的新方法,并将新的 ReceivePort 和来自主 isolate 的 SendPort 作为参数传递。
dart
static void _startRemoteIsolate(SendPort sendPort) {
  final receivePort = ReceivePort();
  sendPort.send(receivePort.sendPort);
  _handleCommandsToIsolate(receivePort, sendPort);
}

接下来,添加 _handleCommandsToIsolate 方法,该方法负责接收来自主 isolate 的消息,在 worker isolate 上解码 json,并将解码后的 json 作为响应发回。

  • 首先,在 worker isolate 的 ReceivePort 上声明一个侦听器。
  • 在添加到侦听器的回调中,尝试在 try/catch 代码块中解码从主 isolate 传递的 JSON。如果解码成功,则将解码后的 JSON 发回主 isolate。
  • 如果出现错误,则发回一个 RemoteError
dart
static void _handleCommandsToIsolate(
    ReceivePort receivePort, SendPort sendPort) {
  receivePort.listen((message) {
    try {
      final jsonData = jsonDecode(message as String);
      sendPort.send(jsonData);
    } catch (e) {
      sendPort.send(RemoteError(e.toString(), ''));
    }
  });
}

接下来,添加 _handleResponsesFromIsolate 方法的代码。

  • 首先,检查消息是否为 RemoteError,如果是,则应该 throw 该错误。
  • 否则,打印消息。在未来的步骤中,你将更新此代码以返回消息而不是打印它们。
dart
void _handleResponsesFromIsolate(dynamic message) {
  if (message is RemoteError) {
    throw message;
  } else {
    print(message);
  }
}

最后,添加 parseJson 方法,这是一个公共方法,允许外部代码将 JSON 发送到 worker isolate 进行解码。

dart
Future<Object?> parseJson(String message) async {
  _commands.send(message);
}

你将在下一步中更新此方法。

步骤 5:同时处理多个消息

#

目前,如果你快速向 worker isolate 发送消息,isolate 将按它们完成的顺序而不是按它们发送的顺序发送解码后的 json 响应。你无法确定哪个响应对应于哪个消息。

在此步骤中,你将通过为每个消息提供一个 id 并使用 Completer 对象来解决此问题,以确保当外部代码调用 parseJson 时,返回给该调用者的响应是正确的响应。

首先,向 Worker 添加两个类级别的属性

  • Map<int, Completer<Object?>> _activeRequests
  • int _idCounter
dart
class Worker {
  final SendPort _commands;
  final ReceivePort _responses;
  final Map<int, Completer<Object?>> _activeRequests = {};
  int _idCounter = 0;

_activeRequests 映射将发送到 worker isolate 的消息与 Completer 相关联。_activeRequests 中使用的键取自 _idCounter,随着发送更多消息,它将递增。

接下来,更新 parseJson 方法,以在向 worker isolate 发送消息之前创建 completer。

  • 首先创建一个 Completer
  • 接下来,递增 _idCounter,以便每个 Completer 都与一个唯一的数字关联。
  • _activeRequests 映射添加一个条目,其中键是 _idCounter 的当前数字,而 completer 是值。
  • 将消息连同 id 一起发送到 worker isolate。因为你只能通过 SendPort 发送一个值,所以将 id 和消息包装在一个记录中。
  • 最后,返回 completer 的 future,它最终将包含来自 worker isolate 的响应。
dart
Future<Object?> parseJson(String message) async {
  final completer = Completer<Object?>.sync();
  final id = _idCounter++;
  _activeRequests[id] = completer;
  _commands.send((id, message));
  return await completer.future;
}

你还需要更新 _handleResponsesFromIsolate_handleCommandsToIsolate 以处理此系统。

_handleCommandsToIsolate 中,你需要考虑 message 是一个具有两个值的记录,而不仅仅是 json 文本。通过解构 message 中的值来实现。

然后,在解码 json 之后,更新对 sendPort.send 的调用,以将 id 和解码后的 json 都发回主 isolate,再次使用记录。

dart
static void _handleCommandsToIsolate(
    ReceivePort receivePort, SendPort sendPort) {
  receivePort.listen((message) {
    final (int id, String jsonText) = message as (int, String); // New
    try {
      final jsonData = jsonDecode(jsonText);
      sendPort.send((id, jsonData)); // Updated
    } catch (e) {
      sendPort.send((id, RemoteError(e.toString(), '')));
    }
  });
}

最后,更新 _handleResponsesFromIsolate

  • 首先,再次从消息参数中解构 id 和响应。
  • 然后,从 _activeRequests 映射中删除与此请求对应的 completer。
  • 最后,不是抛出错误或打印解码后的 json,而是完成 completer,传入响应。当完成此操作时,响应将返回给在主 isolate 上调用 parseJson 的代码。
dart
void _handleResponsesFromIsolate(dynamic message) {
  final (int id, Object? response) = message as (int, Object?); // New
  final completer = _activeRequests.remove(id)!; // New

  if (response is RemoteError) {
    completer.completeError(response); // Updated
  } else {
    completer.complete(response); // Updated
  }
}

步骤 6:添加关闭端口的功能

#

当你的代码不再使用 isolate 时,你应该关闭主 isolate 和 worker isolate 上的端口。

  • 首先,添加一个类级别的布尔值,以跟踪端口是否已关闭。
  • 然后,添加 Worker.close 方法。在此方法中
    • _closed 更新为 true。
    • 向 worker isolate 发送最后一条消息。此消息是一个读取为“shutdown”的 String,但它可以是你想要的任何对象。你将在下一个代码片段中使用它。
  • 最后,检查 _activeRequests 是否为空。如果是,则关闭主 isolate 的名为 _responsesReceivePort
dart
class Worker {
  bool _closed = false;
// ···
  void close() {
    if (!_closed) {
      _closed = true;
      _commands.send('shutdown');
      if (_activeRequests.isEmpty) _responses.close();
      print('--- port closed --- ');
    }
  }
  • 接下来,你需要在 worker isolate 中处理“shutdown”消息。将以下代码添加到 _handleCommandsToIsolate 方法。此代码将检查消息是否为读取为“shutdown”的 String。如果是,它将关闭 worker isolate 的 ReceivePort 并返回。
dart
static void _handleCommandsToIsolate(
  ReceivePort receivePort,
  SendPort sendPort,
) {
  receivePort.listen((message) {
    // New if-block.
    if (message == 'shutdown') {
      receivePort.close();
      return;
    }
    final (int id, String jsonText) = message as (int, String);
    try {
      final jsonData = jsonDecode(jsonText);
      sendPort.send((id, jsonData));
    } catch (e) {
      sendPort.send((id, RemoteError(e.toString(), '')));
    }
  });
}
  • 最后,你应该添加代码来检查端口是否已关闭,然后再尝试发送消息。在 Worker.parseJson 方法中添加一行。
dart
Future<Object?> parseJson(String message) async {
  if (_closed) throw StateError('Closed'); // New
  final completer = Completer<Object?>.sync();
  final id = _idCounter++;
  _activeRequests[id] = completer;
  _commands.send((id, message));
  return await completer.future;
}

完整示例

#
展开此处以查看完整示例
dart
import 'dart:async';
import 'dart:convert';
import 'dart:isolate';

void main() async {
  final worker = await Worker.spawn();
  print(await worker.parseJson('{"key":"value"}'));
  print(await worker.parseJson('"banana"'));
  print(await worker.parseJson('[true, false, null, 1, "string"]'));
  print(
      await Future.wait([worker.parseJson('"yes"'), worker.parseJson('"no"')]));
  worker.close();
}

class Worker {
  final SendPort _commands;
  final ReceivePort _responses;
  final Map<int, Completer<Object?>> _activeRequests = {};
  int _idCounter = 0;
  bool _closed = false;

  Future<Object?> parseJson(String message) async {
    if (_closed) throw StateError('Closed');
    final completer = Completer<Object?>.sync();
    final id = _idCounter++;
    _activeRequests[id] = completer;
    _commands.send((id, message));
    return await completer.future;
  }

  static Future<Worker> spawn() async {
    // Create a receive port and add its initial message handler
    final initPort = RawReceivePort();
    final connection = Completer<(ReceivePort, SendPort)>.sync();
    initPort.handler = (initialMessage) {
      final commandPort = initialMessage as SendPort;
      connection.complete((
        ReceivePort.fromRawReceivePort(initPort),
        commandPort,
      ));
    };

    // Spawn the isolate.
    try {
      await Isolate.spawn(_startRemoteIsolate, (initPort.sendPort));
    } on Object {
      initPort.close();
      rethrow;
    }

    final (ReceivePort receivePort, SendPort sendPort) =
        await connection.future;

    return Worker._(receivePort, sendPort);
  }

  Worker._(this._responses, this._commands) {
    _responses.listen(_handleResponsesFromIsolate);
  }

  void _handleResponsesFromIsolate(dynamic message) {
    final (int id, Object? response) = message as (int, Object?);
    final completer = _activeRequests.remove(id)!;

    if (response is RemoteError) {
      completer.completeError(response);
    } else {
      completer.complete(response);
    }

    if (_closed && _activeRequests.isEmpty) _responses.close();
  }

  static void _handleCommandsToIsolate(
    ReceivePort receivePort,
    SendPort sendPort,
  ) {
    receivePort.listen((message) {
      if (message == 'shutdown') {
        receivePort.close();
        return;
      }
      final (int id, String jsonText) = message as (int, String);
      try {
        final jsonData = jsonDecode(jsonText);
        sendPort.send((id, jsonData));
      } catch (e) {
        sendPort.send((id, RemoteError(e.toString(), '')));
      }
    });
  }

  static void _startRemoteIsolate(SendPort sendPort) {
    final receivePort = ReceivePort();
    sendPort.send(receivePort.sendPort);
    _handleCommandsToIsolate(receivePort, sendPort);
  }

  void close() {
    if (!_closed) {
      _closed = true;
      _commands.send('shutdown');
      if (_activeRequests.isEmpty) _responses.close();
      print('--- port closed --- ');
    }
  }
}