Isolates
本页讨论了一些使用 Isolate
API 实现 isolates 的示例。
当你的应用程序处理的计算量足够大,以至于会暂时阻塞其他计算时,你应该使用 isolates。最常见的例子是在 Flutter 应用程序中,当你需要执行大型计算,否则可能会导致 UI 变得无响应时。
关于何时必须使用 isolates 没有任何规则,但以下是一些它们可能有用的更多情况
- 解析和解码异常大的 JSON blobs。
- 处理和压缩照片、音频和视频。
- 转换音频和视频文件。
- 在大型列表或文件系统中执行复杂的搜索和过滤。
- 执行 I/O 操作,例如与数据库通信。
- 处理大量的网络请求。
实现一个简单的工作 Isolate
#这些示例实现了一个主 isolate,它生成一个简单的工作 isolate。Isolate.run()
简化了设置和管理工作 isolates 背后的步骤
- 生成(启动和创建)一个 isolate。
- 在生成 isolate 上运行一个函数。
- 捕获结果。
- 将结果返回到主 isolate。
- 一旦工作完成,终止 isolate。
- 检查、捕获并将异常和错误抛回主 isolate。
在新 Isolate 中运行现有方法
#const String filename = 'with_keys.json';
void main() async {
// Read some data.
final jsonData = await Isolate.run(_readAndParseJson);
// Use that data.
print('Number of JSON keys: ${jsonData.length}');
}
- 将你想要工作 isolate 执行的函数作为其第一个参数传递给它。在本例中,它是现有的函数
_readAndParseJson()
Future<Map<String, dynamic>> _readAndParseJson() async {
final fileData = await File(filename).readAsString();
final jsonData = jsonDecode(fileData) as Map<String, dynamic>;
return jsonData;
}
Isolate.run()
获取_readAndParseJson()
返回的结果,并将该值发送回主 isolate,关闭工作 isolate。工作 isolate 转移 存储结果的内存到主 isolate。它不复制数据。工作 isolate 执行验证传递,以确保对象被允许转移。
_readAndParseJson()
是一个现有的异步函数,它可以很容易地直接在主 isolate 中运行。使用 Isolate.run()
来运行它反而实现了并发。工作 isolate 完全抽象了 _readAndParseJson()
的计算。它可以完成而不会阻塞主 isolate。
Isolate.run()
的结果始终是一个 Future,因为主 isolate 中的代码继续运行。工作 isolate 执行的计算是同步的还是异步的,都不会影响主 isolate,因为它无论如何都是并发运行的。
对于完整的程序,请查看 send_and_receive.dart 示例。
使用 Isolates 发送闭包
#你也可以使用函数字面量或闭包,直接在主 isolate 中使用 run()
创建一个简单的工作 isolate。
const String filename = 'with_keys.json';
void main() async {
// Read some data.
final jsonData = await Isolate.run(() async {
final fileData = await File(filename).readAsString();
final jsonData = jsonDecode(fileData) as Map<String, dynamic>;
return jsonData;
});
// Use that data.
print('Number of JSON keys: ${jsonData.length}');
}
此示例完成与前一个示例相同的事情。一个新的 isolate 生成,计算一些东西,并将结果发送回来。
然而,现在 isolate 发送一个闭包。闭包比典型的命名函数限制更少,无论是在它们的功能还是在它们如何写入代码方面。在本例中,Isolate.run()
并发执行看起来像本地代码的代码。从这个意义上说,你可以想象 run()
的工作方式类似于“并行运行”的控制流运算符。
使用端口在 Isolates 之间发送多条消息
#短生命周期的 isolates 使用起来很方便,但需要性能开销来生成新的 isolates 并将对象从一个 isolate 复制到另一个 isolate。如果你的代码依赖于重复使用 Isolate.run
运行相同的计算,你可能会通过创建不立即退出的长生命周期 isolates 来提高性能。
要做到这一点,你可以使用 Isolate.run
抽象的一些底层 isolate API
本节介绍在新生成的 isolate 和主 isolate 之间建立双向通信所需的步骤。第一个示例,基本端口,从高层次介绍了该过程。第二个示例,健壮的端口,逐渐为第一个示例添加更实用、真实的实际功能。
ReceivePort
和 SendPort
#在 isolates 之间建立长期通信需要两个类(除了 Isolate
):ReceivePort
和 SendPort
。这些端口是 isolates 之间相互通信的唯一方式。
ReceivePort
是一个对象,用于处理从其他 isolates 发送的消息。这些消息通过 SendPort
发送。
端口的行为类似于 Stream
对象(实际上,接收端口实现了 Stream
!)。你可以将 SendPort
和 ReceivePort
分别视为 Stream 的 StreamController
和监听器。SendPort
就像一个 StreamController
,因为你使用 SendPort.send()
方法 向它们“添加”消息,而这些消息由监听器处理,在本例中是 ReceivePort
。然后,ReceivePort
通过将接收到的消息作为参数传递给你提供的回调来处理这些消息。
设置端口
#新生成的 isolate 仅具有通过 Isolate.spawn
调用接收到的信息。如果你需要主 isolate 在其初始创建之后继续与生成的 isolate 通信,则必须设置一个通信通道,以便生成的 isolate 可以向主 isolate 发送消息。Isolates 只能通过消息传递进行通信。它们无法“看到”彼此的内存内部,这就是“isolate”这个名称的由来。
要设置这种双向通信,首先在主 isolate 中创建一个 ReceivePort
,然后在用 Isolate.spawn
生成新 isolate 时,将其 SendPort
作为参数传递给新 isolate。然后,新的 isolate 创建自己的 ReceivePort
,并在主 isolate 传递给它的 SendPort
上发送它自己的 SendPort
返回。主 isolate 接收到这个 SendPort
,现在双方都有一个开放的通道来发送和接收消息。
- 在主 isolate 中创建一个
ReceivePort
。SendPort
会自动创建为ReceivePort
的属性。 - 使用
Isolate.spawn()
生成工作 isolate - 将对
ReceivePort.sendPort
的引用作为第一条消息传递给工作 isolate。 - 在工作 isolate 中创建另一个新的
ReceivePort
。 - 将对工作 isolate 的
ReceivePort.sendPort
的引用作为第一条消息返回给主 isolate。
除了创建端口和设置通信之外,你还需要告诉端口在接收到消息时要执行的操作。这可以使用每个 ReceivePort
上的 listen
方法来完成。
- 通过主 isolate 对工作 isolate 的
SendPort
的引用发送一条消息。 - 通过工作 isolate 的
ReceivePort
上的监听器接收和处理消息。这是你要从主 isolate 移出的计算执行的地方。 - 通过工作 isolate 对主 isolate 的
SendPort
的引用发送一条返回消息。 - 通过主 isolate 的
ReceivePort
上的监听器接收消息。
基本端口示例
#此示例演示了如何设置一个长生命周期的工作 isolate,它与主 isolate 之间具有双向通信。该代码使用发送 JSON 文本到新 isolate 的示例,在该 isolate 中,JSON 将被解析和解码,然后再发送回主 isolate。
步骤 1:定义工作类
#首先,为你的后台工作 isolate 创建一个类。这个类包含你需要的所有功能,以
- 生成一个 isolate。
- 向该 isolate 发送消息。
- 让 isolate 解码一些 JSON。
- 将解码后的 JSON 发送回主 isolate。
该类公开了两个公共方法:一个用于生成工作 isolate,另一个用于处理向该工作 isolate 发送消息。
本示例的其余部分将逐步展示如何填充类方法。
class Worker {
Future<void> spawn() async {
// TODO: Add functionality to spawn a worker isolate.
}
void _handleResponsesFromIsolate(dynamic message) {
// TODO: Handle messages sent back from the worker isolate.
}
static void _startRemoteIsolate(SendPort port) {
// TODO: Define code that should be executed on the worker isolate.
}
Future<void> parseJson(String message) async {
// TODO: Define a public method that can
// be used to send messages to the worker isolate.
}
}
步骤 2:生成工作 isolate
#Worker.spawn
方法是你将分组用于创建工作 isolate 并确保它可以接收和发送消息的代码的地方。
- 首先,创建一个
ReceivePort
。这允许主 isolate 接收从新生成的工作 isolate 发送的消息。 - 接下来,向接收端口添加一个监听器,以处理工作 isolate 将发送回来的消息。传递给监听器的回调
_handleResponsesFromIsolate
将在步骤 4 中介绍。 - 最后,使用
Isolate.spawn
生成工作 isolate。它期望两个参数:一个在工作 isolate 上执行的函数(在步骤 3 中介绍),以及接收端口的sendPort
属性。
Future<void> spawn() async {
final receivePort = ReceivePort();
receivePort.listen(_handleResponsesFromIsolate);
await Isolate.spawn(_startRemoteIsolate, receivePort.sendPort);
}
当在工作 isolate 上调用回调(_startRemoteIsolate
)时,receivePort.sendPort
参数将作为参数传递给它。这是确保工作 isolate 有办法将消息发送回主 isolate 的第一步。
步骤 3:在工作 isolate 上执行代码
#在此步骤中,你定义方法 _startRemoteIsolate
,该方法被发送到工作 isolate 以在其生成时执行。此方法类似于工作 isolate 的“main”方法。
- 首先,创建另一个新的
ReceivePort
。此端口接收来自主 isolate 的未来消息。 - 接下来,将该端口的
SendPort
发送回主 isolate。 - 最后,向新的
ReceivePort
添加一个监听器。此监听器处理主 isolate 发送到工作 isolate 的消息。
static void _startRemoteIsolate(SendPort port) {
final receivePort = ReceivePort();
port.send(receivePort.sendPort);
receivePort.listen((dynamic message) async {
if (message is String) {
final transformed = jsonDecode(message);
port.send(transformed);
}
});
}
工作 isolate 的 ReceivePort
上的监听器解码从主 isolate 传递的 JSON,然后将解码后的 JSON 发送回主 isolate。
此监听器是从主 isolate 发送到工作 isolate 的消息的入口点。这是你唯一一次告诉工作 isolate 未来要执行什么代码的机会。
步骤 4:处理主 isolate 上的消息
#最后,你需要告诉主 isolate 如何处理从工作 isolate 发送回主 isolate 的消息。为此,你需要填充 _handleResponsesFromIsolate
方法。回想一下,此方法被传递给 receivePort.listen
方法,如步骤 2 中所述
Future<void> spawn() async {
final receivePort = ReceivePort();
receivePort.listen(_handleResponsesFromIsolate);
await Isolate.spawn(_startRemoteIsolate, receivePort.sendPort);
}
还要回想一下,你在步骤 3 中将一个 SendPort
发送回主 isolate。此方法处理该 SendPort
的接收,以及处理未来的消息(这将是解码后的 JSON)。
- 首先,检查消息是否为
SendPort
。如果是,则将该端口分配给类的_sendPort
属性,以便以后可以用来发送消息。 - 接下来,检查消息是否为
Map<String, dynamic>
类型,即解码后的 JSON 的预期类型。如果是,则使用你的应用程序特定逻辑处理该消息。在本例中,消息被打印出来。
void _handleResponsesFromIsolate(dynamic message) {
if (message is SendPort) {
_sendPort = message;
_isolateReady.complete();
} else if (message is Map<String, dynamic>) {
print(message);
}
}
步骤 5:添加 Completer 以确保你的 isolate 已设置好
#为了完成该类,定义一个名为 parseJson
的公共方法,该方法负责向工作 isolate 发送消息。它还需要确保在 isolate 完全设置好之前可以发送消息。为了处理这种情况,使用 Completer
。
- 首先,添加一个类级别的属性,称为
Completer
并将其命名为_isolateReady
。 - 接下来,如果在
_handleResponsesFromIsolate
方法(在步骤 4 中创建)中消息是SendPort
,则添加对 completer 的complete()
调用。 - 最后,在
parseJson
方法中,在添加_sendPort.send
之前添加await _isolateReady.future
。这确保在工作 isolate 生成并且已将其SendPort
发送回主 isolate 之前,任何消息都不能发送到工作 isolate。
Future<void> parseJson(String message) async {
await _isolateReady.future;
_sendPort.send(message);
}
完整示例
#展开以查看完整示例
import 'dart:async';
import 'dart:convert';
import 'dart:isolate';
void main() async {
final worker = Worker();
await worker.spawn();
await worker.parseJson('{"key":"value"}');
}
class Worker {
late SendPort _sendPort;
final Completer<void> _isolateReady = Completer.sync();
Future<void> spawn() async {
final receivePort = ReceivePort();
receivePort.listen(_handleResponsesFromIsolate);
await Isolate.spawn(_startRemoteIsolate, receivePort.sendPort);
}
void _handleResponsesFromIsolate(dynamic message) {
if (message is SendPort) {
_sendPort = message;
_isolateReady.complete();
} else if (message is Map<String, dynamic>) {
print(message);
}
}
static void _startRemoteIsolate(SendPort port) {
final receivePort = ReceivePort();
port.send(receivePort.sendPort);
receivePort.listen((dynamic message) async {
if (message is String) {
final transformed = jsonDecode(message);
port.send(transformed);
}
});
}
Future<void> parseJson(String message) async {
await _isolateReady.future;
_sendPort.send(message);
}
}
健壮的端口示例
#之前的示例解释了设置具有双向通信的长期 isolate 所需的基本构建块。如前所述,该示例缺少一些重要功能,例如错误处理、在不再使用时关闭端口以及某些情况下消息排序不一致。
本示例通过创建一个具有这些附加功能和更多功能的长期工作 isolate,并遵循更好的设计模式,扩展了第一个示例中的信息。虽然此代码与第一个示例有相似之处,但它不是该示例的扩展。
步骤 1:定义工作类
#首先,为你的后台工作 isolate 创建一个类。这个类包含你需要的所有功能,以
- 生成一个 isolate。
- 向该 isolate 发送消息。
- 让 isolate 解码一些 JSON。
- 将解码后的 JSON 发送回主 isolate。
该类公开了三个公共方法:一个用于创建工作 isolate,一个用于处理向该工作 isolate 发送消息,以及一个可以在不再使用时关闭端口的方法。
class Worker {
final SendPort _commands;
final ReceivePort _responses;
Future<Object?> parseJson(String message) async {
// TODO: Ensure the port is still open.
_commands.send(message);
}
static Future<Worker> spawn() async {
// TODO: Add functionality to create a new Worker object with a
// connection to a spawned isolate.
throw UnimplementedError();
}
Worker._(this._responses, this._commands) {
// TODO: Initialize main isolate receive port listener.
}
void _handleResponsesFromIsolate(dynamic message) {
// TODO: Handle messages sent back from the worker isolate.
}
static void _handleCommandsToIsolate(ReceivePort rp, SendPort sp) async {
// TODO: Handle messages sent back from the worker isolate.
}
static void _startRemoteIsolate(SendPort sp) {
// TODO: Initialize worker isolate's ports.
}
}
步骤 2:在 Worker.spawn
方法中创建一个 RawReceivePort
#在生成 isolate 之前,你需要创建一个 RawReceivePort
,这是一个更低级别的 ReceivePort
。使用 RawReceivePort
是一种首选模式,因为它允许你将 isolate 启动逻辑与处理 isolate 上消息传递的逻辑分开。
在 Worker.spawn
方法中
- 首先,创建
RawReceivePort
。此ReceivePort
仅负责接收来自工作 isolate 的初始消息,这将是一个SendPort
。 - 接下来,创建一个
Completer
,它将指示 isolate 何时准备好接收消息。当它完成时,它将返回一个带有ReceivePort
和SendPort
的记录。 - 接下来,定义
RawReceivePort.handler
属性。此属性是一个Function?
,其行为类似于ReceivePort.listener
。当此端口接收到消息时,将调用该函数。 - 在处理程序函数中,调用
connection.complete()
。此方法期望一个带有ReceivePort
和SendPort
的记录作为参数。SendPort
是从工作 isolate 发送的初始消息,将在下一步中分配给类级别名为_commands
的SendPort
。 - 然后,使用
ReceivePort.fromRawReceivePort
构造函数创建一个新的ReceivePort
,并将initPort
传递给它。
class Worker {
final SendPort _commands;
final ReceivePort _responses;
static Future<Worker> spawn() async {
// Create a receive port and add its initial message handler.
final initPort = RawReceivePort();
final connection = Completer<(ReceivePort, SendPort)>.sync();
initPort.handler = (initialMessage) {
final commandPort = initialMessage as SendPort;
connection.complete((
ReceivePort.fromRawReceivePort(initPort),
commandPort,
));
};
}
}
通过首先创建一个 RawReceivePort
,然后再创建一个 ReceivePort
,你将能够在以后向 ReceivePort.listen
添加一个新的回调。相反,如果你直接创建一个 ReceivePort
,你将只能添加一个 listener
,因为 ReceivePort
实现了 Stream
,而不是 BroadcastStream
。
实际上,这允许你将 isolate 启动逻辑与在设置通信完成后处理接收消息的逻辑分开。随着其他方法中的逻辑增长,这种好处将变得更加明显。
步骤 3:使用 Isolate.spawn
生成工作 isolate
#此步骤继续填充 Worker.spawn
方法。你将添加生成 isolate 所需的代码,并从此类返回 Worker
的实例。在本例中,对 Isolate.spawn
的调用被包装在 try
/catch
块 中,这确保了如果 isolate 启动失败,则将关闭 initPort
,并且不会创建 Worker
对象。
- 首先,尝试在
try
/catch
块中生成一个工作 isolate。如果生成工作 isolate 失败,请关闭在上一步中创建的接收端口。传递给Isolate.spawn
的方法将在后面的步骤中介绍。 - 接下来,等待
connection.future
,并从它返回的记录中解构发送端口和接收端口。 - 最后,通过调用其私有构造函数并传入来自该 completer 的端口来返回
Worker
的实例。
class Worker {
final SendPort _commands;
final ReceivePort _responses;
static Future<Worker> spawn() async {
// Create a receive port and add its initial message handler
final initPort = RawReceivePort();
final connection = Completer<(ReceivePort, SendPort)>.sync();
initPort.handler = (initialMessage) {
final commandPort = initialMessage as SendPort;
connection.complete((
ReceivePort.fromRawReceivePort(initPort),
commandPort,
));
};
// Spawn the isolate.
try {
await Isolate.spawn(_startRemoteIsolate, (initPort.sendPort));
} on Object {
initPort.close();
rethrow;
}
final (ReceivePort receivePort, SendPort sendPort) =
await connection.future;
return Worker._(receivePort, sendPort);
}
}
请注意,在本示例中(与之前的示例相比),Worker.spawn
充当此类的异步静态构造函数,并且是创建 Worker
实例的唯一方法。这简化了 API,使创建 Worker
实例的代码更简洁。
步骤 4:完成 isolate 设置过程
#在此步骤中,你将完成基本的 isolate 设置过程。这几乎完全与之前的示例相关,并且没有新概念。略有变化的是,代码被分解为更多的方法,这是一种设计实践,可以让你在整个示例的其余部分中添加更多功能。有关设置 isolate 的基本过程的深入演练,请参阅基本端口示例。
首先,创建从 Worker.spawn
方法返回的私有构造函数。在构造函数体中,向主 isolate 使用的接收端口添加一个监听器,并将一个尚未定义的方法传递给该监听器,称为 _handleResponsesFromIsolate
。
class Worker {
final SendPort _commands;
final ReceivePort _responses;
Worker._(this._responses, this._commands) {
_responses.listen(_handleResponsesFromIsolate);
}
}
接下来,添加代码到 _startRemoteIsolate
,该代码负责初始化工作 isolate 上的端口。回想一下,此方法在 Worker.spawn
方法中传递给 Isolate.spawn
,并且主 isolate 的 SendPort
将作为参数传递给它。
- 创建一个新的
ReceivePort
。 - 将该端口的
SendPort
发送回主 isolate。 - 调用一个名为
_handleCommandsToIsolate
的新方法,并将新的ReceivePort
和来自主 isolate 的SendPort
作为参数传递。
static void _startRemoteIsolate(SendPort sendPort) {
final receivePort = ReceivePort();
sendPort.send(receivePort.sendPort);
_handleCommandsToIsolate(receivePort, sendPort);
}
接下来,添加 _handleCommandsToIsolate
方法,该方法负责接收来自主 isolate 的消息,在工作 isolate 上解码 json,并将解码后的 json 作为响应发送回去。
- 首先,在工作 isolate 的
ReceivePort
上声明一个监听器。 - 在添加到监听器的回调中,尝试在
try
/catch
块 中解码从主 isolate 传递的 JSON。如果解码成功,则将解码后的 JSON 发送回主 isolate。 - 如果出现错误,则发送回
RemoteError
。
static void _handleCommandsToIsolate(
ReceivePort receivePort,
SendPort sendPort,
) {
receivePort.listen((message) {
try {
final jsonData = jsonDecode(message as String);
sendPort.send(jsonData);
} catch (e) {
sendPort.send(RemoteError(e.toString(), ''));
}
});
}
接下来,为 _handleResponsesFromIsolate
方法添加代码。
- 首先,检查消息是否为
RemoteError
,如果是,则应throw
该错误。 - 否则,打印消息。在未来的步骤中,你将更新此代码以返回消息而不是打印它们。
void _handleResponsesFromIsolate(dynamic message) {
if (message is RemoteError) {
throw message;
} else {
print(message);
}
}
最后,添加 parseJson
方法,这是一个公共方法,允许外部代码将 JSON 发送到工作 isolate 进行解码。
Future<Object?> parseJson(String message) async {
_commands.send(message);
}
你将在下一步中更新此方法。
步骤 5:同时处理多条消息
#目前,如果你快速向工作 isolate 发送消息,则 isolate 将以它们完成的顺序而不是它们发送的顺序发送解码后的 json 响应。你无法确定哪个响应对应于哪个消息。
在此步骤中,你将通过为每条消息提供一个 id,并使用 Completer
对象来解决此问题,以确保当外部代码调用 parseJson
时,返回给该调用者的响应是正确的响应。
首先,向 Worker
添加两个类级别的属性
Map<int, Completer<Object?>> _activeRequests
int _idCounter
class Worker {
final SendPort _commands;
final ReceivePort _responses;
final Map<int, Completer<Object?>> _activeRequests = {};
int _idCounter = 0;
// ···
}
_activeRequests
映射将发送到工作 isolate 的消息与 Completer
关联起来。_activeRequests
中使用的键取自 _idCounter
,随着发送更多消息,_idCounter
将会增加。
接下来,更新 parseJson
方法以在发送消息到工作 isolate 之前创建 completers。
- 首先创建一个
Completer
。 - 接下来,递增
_idCounter
,以便每个Completer
与一个唯一的数字关联。 - 向
_activeRequests
映射添加一个条目,其中键是_idCounter
的当前数字,而值是 completer。 - 将消息连同 id 发送到工作 isolate。因为你只能通过
SendPort
发送一个值,所以将 id 和消息包装在一个 记录 中。 - 最后,返回 completer 的 future,它最终将包含来自工作 isolate 的响应。
Future<Object?> parseJson(String message) async {
final completer = Completer<Object?>.sync();
final id = _idCounter++;
_activeRequests[id] = completer;
_commands.send((id, message));
return await completer.future;
}
你还需要更新 _handleResponsesFromIsolate
和 _handleCommandsToIsolate
以处理此系统。
在 _handleCommandsToIsolate
中,你需要考虑 message
是一个带有两个值的记录,而不仅仅是 json 文本。通过从 message
中解构值来做到这一点。
然后,在解码 json 之后,更新对 sendPort.send
的调用,以将 id 和解码后的 json 都发送回主 isolate,再次使用记录。
static void _handleCommandsToIsolate(
ReceivePort receivePort,
SendPort sendPort,
) {
receivePort.listen((message) {
final (int id, String jsonText) = message as (int, String); // New
try {
final jsonData = jsonDecode(jsonText);
sendPort.send((id, jsonData)); // Updated
} catch (e) {
sendPort.send((id, RemoteError(e.toString(), '')));
}
});
}
最后,更新 _handleResponsesFromIsolate
。
- 首先,再次从 message 参数中解构 id 和响应。
- 然后,从
_activeRequests
映射中删除与此请求对应的 completer。 - 最后,不是抛出错误或打印解码后的 json,而是完成 completer,传入响应。当它完成时,响应将返回到在主 isolate 上调用
parseJson
的代码。
void _handleResponsesFromIsolate(dynamic message) {
final (int id, Object? response) = message as (int, Object?); // New
final completer = _activeRequests.remove(id)!; // New
if (response is RemoteError) {
completer.completeError(response); // Updated
} else {
completer.complete(response); // Updated
}
}
步骤 6:添加关闭端口的功能
#当你的代码不再使用 isolate 时,你应该关闭主 isolate 和工作 isolate 上的端口。
- 首先,添加一个类级别的布尔值,以跟踪端口是否已关闭。
- 然后,添加
Worker.close
方法。在此方法中- 将
_closed
更新为 true。 - 向工作 isolate 发送最后一条消息。此消息是一个读取“shutdown”的
String
,但它可以是你想要的任何对象。你将在下一个代码片段中使用它。
- 将
- 最后,检查
_activeRequests
是否为空。如果是,则关闭主 isolate 的名为_responses
的ReceivePort
。
class Worker {
bool _closed = false;
// ···
void close() {
if (!_closed) {
_closed = true;
_commands.send('shutdown');
if (_activeRequests.isEmpty) _responses.close();
print('--- port closed --- ');
}
}
}
- 接下来,你需要在工作 isolate 中处理“shutdown”消息。将以下代码添加到
_handleCommandsToIsolate
方法。此代码将检查消息是否为读取“shutdown”的String
。如果是,它将关闭工作 isolate 的ReceivePort
,并返回。
static void _handleCommandsToIsolate(
ReceivePort receivePort,
SendPort sendPort,
) {
receivePort.listen((message) {
// New if-block.
if (message == 'shutdown') {
receivePort.close();
return;
}
final (int id, String jsonText) = message as (int, String);
try {
final jsonData = jsonDecode(jsonText);
sendPort.send((id, jsonData));
} catch (e) {
sendPort.send((id, RemoteError(e.toString(), '')));
}
});
}
- 最后,你应该添加代码以检查端口是否已关闭,然后再尝试发送消息。在
Worker.parseJson
方法中添加一行。
Future<Object?> parseJson(String message) async {
if (_closed) throw StateError('Closed'); // New
final completer = Completer<Object?>.sync();
final id = _idCounter++;
_activeRequests[id] = completer;
_commands.send((id, message));
return await completer.future;
}
完整示例
#在此处展开以查看完整示例
import 'dart:async';
import 'dart:convert';
import 'dart:isolate';
void main() async {
final worker = await Worker.spawn();
print(await worker.parseJson('{"key":"value"}'));
print(await worker.parseJson('"banana"'));
print(await worker.parseJson('[true, false, null, 1, "string"]'));
print(
await Future.wait([worker.parseJson('"yes"'), worker.parseJson('"no"')]),
);
worker.close();
}
class Worker {
final SendPort _commands;
final ReceivePort _responses;
final Map<int, Completer<Object?>> _activeRequests = {};
int _idCounter = 0;
bool _closed = false;
Future<Object?> parseJson(String message) async {
if (_closed) throw StateError('Closed');
final completer = Completer<Object?>.sync();
final id = _idCounter++;
_activeRequests[id] = completer;
_commands.send((id, message));
return await completer.future;
}
static Future<Worker> spawn() async {
// Create a receive port and add its initial message handler
final initPort = RawReceivePort();
final connection = Completer<(ReceivePort, SendPort)>.sync();
initPort.handler = (initialMessage) {
final commandPort = initialMessage as SendPort;
connection.complete((
ReceivePort.fromRawReceivePort(initPort),
commandPort,
));
};
// Spawn the isolate.
try {
await Isolate.spawn(_startRemoteIsolate, (initPort.sendPort));
} on Object {
initPort.close();
rethrow;
}
final (ReceivePort receivePort, SendPort sendPort) =
await connection.future;
return Worker._(receivePort, sendPort);
}
Worker._(this._responses, this._commands) {
_responses.listen(_handleResponsesFromIsolate);
}
void _handleResponsesFromIsolate(dynamic message) {
final (int id, Object? response) = message as (int, Object?);
final completer = _activeRequests.remove(id)!;
if (response is RemoteError) {
completer.completeError(response);
} else {
completer.complete(response);
}
if (_closed && _activeRequests.isEmpty) _responses.close();
}
static void _handleCommandsToIsolate(
ReceivePort receivePort,
SendPort sendPort,
) {
receivePort.listen((message) {
if (message == 'shutdown') {
receivePort.close();
return;
}
final (int id, String jsonText) = message as (int, String);
try {
final jsonData = jsonDecode(jsonText);
sendPort.send((id, jsonData));
} catch (e) {
sendPort.send((id, RemoteError(e.toString(), '')));
}
});
}
static void _startRemoteIsolate(SendPort sendPort) {
final receivePort = ReceivePort();
sendPort.send(receivePort.sendPort);
_handleCommandsToIsolate(receivePort, sendPort);
}
void close() {
if (!_closed) {
_closed = true;
_commands.send('shutdown');
if (_activeRequests.isEmpty) _responses.close();
print('--- port closed --- ');
}
}
}
除非另有说明,否则本网站上的文档反映的是 Dart 3.7.1 版本。页面上次更新于 2025-02-12。 查看源代码 或 报告问题。