From d805e4dfaeeea4307ee181ecdfd90a0a377e4ad3 Mon Sep 17 00:00:00 2001 From: szdytom Date: Thu, 2 Nov 2023 08:28:56 +0800 Subject: [PATCH] [utils] add `Task._waitDependent` --- utils/task.mjs | 37 ++++++++++++++++++++++++++++++++++--- 1 file changed, 34 insertions(+), 3 deletions(-) diff --git a/utils/task.mjs b/utils/task.mjs index 972c42d..bc6ce90 100644 --- a/utils/task.mjs +++ b/utils/task.mjs @@ -1,3 +1,5 @@ +import assert from 'node:assert/strict'; + export class TaskInteruptedError extends Error { constructor() { super('Task has been interupted.'); } }; @@ -8,6 +10,7 @@ export class Task { #promise; #resolve; #reject; + #dependent_tasks constructor() { this.id = ++task_id; @@ -18,10 +21,15 @@ export class Task { this.#resolve = resolve; this.#reject = reject; }); + this.#dependent_tasks = null; + } + + isDone() { + return this.#promise == null; } _ready(result) { - if (this.#promise == null) { return; } + if (this.isDone()) { return; } this.result = result; this.status = Task.STATUS.ready; this.#resolve(this.result); @@ -29,7 +37,7 @@ export class Task { } _fail(error) { - if (this.#promise == null) { return; } + if (this.isDone()) { return; } this.error = error; this.status = Task.STATUS.failed; this.#reject(this.error); @@ -38,11 +46,16 @@ export class Task { _start() { if (this.status != Task.STATUS.pending) { - throw new Error('Task has already left pending stage'); + throw new Error('_start() called twice'); } this.status = Task.STATUS.running; } + finally() { + if (this.isDone()) { return Promise.resolve(); } + return this.#promise.finally(); + } + interupt() { if (this.status == Task.STATUS.pending) { this._confirmInterupt(); @@ -51,14 +64,32 @@ export class Task { if (this.#promise == null) { return Promise.resolve(); } this.status = Task.STATUS.interupting; + if (this.#dependent_tasks) { + for (let dependent of this.#dependent_tasks) { + dependent.interupt(); + } + } return this.#promise.finally(); } + async _waitDependent(dependent) { + assert.equal(this.#dependent_tasks, null); + if (dependent instanceof Task) { this.#dependent_tasks = [dependent]; } + else { this.#dependent_tasks = dependent; } + await Promise.allSettled(this.#dependent_tasks.map(t => t.finally())); + this.#dependent_tasks = null; + if (dependent instanceof Task) { + return await dependent.get(); + } + return dependent.map(async t => await t.get()); + } + _shouldInterupt() { return this.status == Task.STATUS.interupting; } _confirmInterupt() { + assert.equal(this.#dependent_tasks, null); this.status = Task.STATUS.interupted; this.error = new TaskInteruptedError(); this.#reject(this.error);