[utils] add Task._waitDependent

This commit is contained in:
方而静 2023-11-02 08:28:56 +08:00
parent f6aa131caa
commit d805e4dfae
Signed by: szTom
GPG Key ID: 072D999D60C6473C

View File

@ -1,3 +1,5 @@
import assert from 'node:assert/strict';
export class TaskInteruptedError extends Error { export class TaskInteruptedError extends Error {
constructor() { super('Task has been interupted.'); } constructor() { super('Task has been interupted.'); }
}; };
@ -8,6 +10,7 @@ export class Task {
#promise; #promise;
#resolve; #resolve;
#reject; #reject;
#dependent_tasks
constructor() { constructor() {
this.id = ++task_id; this.id = ++task_id;
@ -18,10 +21,15 @@ export class Task {
this.#resolve = resolve; this.#resolve = resolve;
this.#reject = reject; this.#reject = reject;
}); });
this.#dependent_tasks = null;
}
isDone() {
return this.#promise == null;
} }
_ready(result) { _ready(result) {
if (this.#promise == null) { return; } if (this.isDone()) { return; }
this.result = result; this.result = result;
this.status = Task.STATUS.ready; this.status = Task.STATUS.ready;
this.#resolve(this.result); this.#resolve(this.result);
@ -29,7 +37,7 @@ export class Task {
} }
_fail(error) { _fail(error) {
if (this.#promise == null) { return; } if (this.isDone()) { return; }
this.error = error; this.error = error;
this.status = Task.STATUS.failed; this.status = Task.STATUS.failed;
this.#reject(this.error); this.#reject(this.error);
@ -38,11 +46,16 @@ export class Task {
_start() { _start() {
if (this.status != Task.STATUS.pending) { if (this.status != Task.STATUS.pending) {
throw new Error('Task has already left pending stage'); throw new Error('_start() called twice');
} }
this.status = Task.STATUS.running; this.status = Task.STATUS.running;
} }
finally() {
if (this.isDone()) { return Promise.resolve(); }
return this.#promise.finally();
}
interupt() { interupt() {
if (this.status == Task.STATUS.pending) { if (this.status == Task.STATUS.pending) {
this._confirmInterupt(); this._confirmInterupt();
@ -51,14 +64,32 @@ export class Task {
if (this.#promise == null) { return Promise.resolve(); } if (this.#promise == null) { return Promise.resolve(); }
this.status = Task.STATUS.interupting; this.status = Task.STATUS.interupting;
if (this.#dependent_tasks) {
for (let dependent of this.#dependent_tasks) {
dependent.interupt();
}
}
return this.#promise.finally(); return this.#promise.finally();
} }
async _waitDependent(dependent) {
assert.equal(this.#dependent_tasks, null);
if (dependent instanceof Task) { this.#dependent_tasks = [dependent]; }
else { this.#dependent_tasks = dependent; }
await Promise.allSettled(this.#dependent_tasks.map(t => t.finally()));
this.#dependent_tasks = null;
if (dependent instanceof Task) {
return await dependent.get();
}
return dependent.map(async t => await t.get());
}
_shouldInterupt() { _shouldInterupt() {
return this.status == Task.STATUS.interupting; return this.status == Task.STATUS.interupting;
} }
_confirmInterupt() { _confirmInterupt() {
assert.equal(this.#dependent_tasks, null);
this.status = Task.STATUS.interupted; this.status = Task.STATUS.interupted;
this.error = new TaskInteruptedError(); this.error = new TaskInteruptedError();
this.#reject(this.error); this.#reject(this.error);