diff --git a/utils/async-lock.mjs b/utils/async-lock.mjs new file mode 100644 index 0000000..9e55e2b --- /dev/null +++ b/utils/async-lock.mjs @@ -0,0 +1,35 @@ +import { Queue } from './queue.mjs'; + +export class AsyncLock { + constructor() { + this.pending_queue = new Queue(); + this.state = false; + this.lock_id = 0; + } + + query() { + return this.state; + } + + acquire() { + if (!this.state) { + this.state = true; + this.lock_id += 1; + return Promise.resolve(this.lock_id); + } + return new Promise((resolve, _reject) => this.pending_queue.push(resolve)); + } + + release(lock_id) { + if (lock_id != this.lock_id) { return; } + if (this.pending_queue.empty()) { + this.state = false; + return; + } + + let resolve = this.pending_queue.front(); + this.pending_queue.popFront(); + this.lock_id += 1; + resolve(lock_id); + } +}; diff --git a/utils/index.mjs b/utils/index.mjs index db4c85e..eeaa85c 100644 --- a/utils/index.mjs +++ b/utils/index.mjs @@ -1,5 +1,6 @@ import fs from 'node:fs/promises'; +// Reads JSON data from file, returns null if file not found or has other errors. export async function readJsonFile(path) { try { const data = await fs.readFile(path, 'utf8'); @@ -9,21 +10,54 @@ export async function readJsonFile(path) { } } +// Write JSON data into a file. export function writeJsonFile(path, data) { const json_string = JSON.stringify(data); return fs.writeFile(path, json_string, 'utf8'); } +// Parse format "profile@host:port", port can be undefined. export function parseLogin(url) { const [profile_host, port] = url.split(':'); const [profile, host] = profile_host.split('@'); return [profile, host, port ? parseInt(port) : undefined]; } +// Returns a promise, wait unitl the EventEmitter emits certian event next time. export function waitEvent(em, event) { return new Promise((resolve, _reject) => { em.once(event, resolve); }); } -export default { readJsonFile, writeJsonFile, parseLogin, waitEvent }; +export function asyncSleep(t) { + return new Promise((resolve, _reject) => { + setTimeout(resolve, t); + }); +} + +export function asyncTimeout(t) { + return new Promise((_resolve, reject) => { + setTimeout(reject, t); + }); +} + +export function promiseTimeout(p, t) { + return Promise.race([p, asyncTimeout(t)]); +} + +export function yieldTask() { + return new Promise((resolve, _reject) => { + queueMicrotask(resolve); + }); +} + +// Checks wheather an object is iterable. +export function isIterable(obj) { + if (obj == null) { return false; } + return typeof obj[Symbol.iterator] == 'function'; +} + +export { Queue, QueueEmptyError } from './queue.mjs'; +export { AsyncLock } from './async-lock.mjs'; +export { Task, TaskInteruptedError } from './task.mjs'; diff --git a/utils/package.json b/utils/package.json new file mode 100644 index 0000000..6b9a837 --- /dev/null +++ b/utils/package.json @@ -0,0 +1,6 @@ +{ + "name": "@fang_erj/compass-utils", + "description": "Utility functions use in compass-bot", + "type": "module", + "main": "index.mjs" +} diff --git a/utils/queue.mjs b/utils/queue.mjs new file mode 100644 index 0000000..14f1c32 --- /dev/null +++ b/utils/queue.mjs @@ -0,0 +1,62 @@ +export class QueueEmptyError extends Error { + constructor() { super('Queue is empty'); } +} + +export class Queue { + constructor() { + this.data = []; + this.head = 0; + } + + front() { + return this.data[this.head]; + } + + back() { + return this.data[this.data.length - 1]; + } + + popBack() { + if (this.data.length == this.head) { + throw new QueueEmptyError(); + } + this.data.length -= 1; + } + + empty() { + return this.data.length == this.head; + } + + size() { + return this.data.length - this.head; + } + + get length() { + return this.size(); + } + + push() { + this.data.push.apply(this.data, arguments); + } + + popFront() { + if (this.empty()) { throw new QueueEmptyError();} + + this.head += 1; + if (this.head == this.data.length) { + this.data = []; + this.head = 0; + return; + } + if (this.head >= this.data.length >> 1 && this.head >= 16) { + this.data = this.data.slice(this.head); + this.head = 0; + } + } + + *[Symbol.iterator]() { + for (let i = this.head; i < this.data.length; i += 1) { + yield this.data[i]; + } + } +} diff --git a/utils/task.mjs b/utils/task.mjs new file mode 100644 index 0000000..34f0b12 --- /dev/null +++ b/utils/task.mjs @@ -0,0 +1,81 @@ +export class TaskInteruptedError { + constructor() { super('Task has been interupted.'); } +}; + +export class Task { + constructor() { + this.status = Task.STATUS.pending; + this.error = null; + this.result = null; + this.result_listeners = []; + } + + _ready(result) { + this.result = result; + this.status = Task.STATUS.ready; + this.#notifySuccess(); + } + + _fail(error) { + if (this.status == Task.STATUS.interupted) { return; } + this.error = error; + this.status = Task.STATUS.failed; + this.#notifyFailure(); + } + + _start() { + if (this.status != Task.STATUS.pending) { + throw new Error('Task has already left pending stage'); + } + this.status = Task.STATUS.running; + } + + interupt() { + if (this.status == Task.STATUS.pending) { this._confirmInterupt(); } + else { this.status = Task.STATUS.interupting; } + } + + _shouldInterupt() { + return this.status == Task.STATUS.interupting; + } + + _confirmInterupt() { + this.status = Task.STATUS.interupted; + this.error = new TaskInteruptedError(); + this.#notifyFailure(); + } + + get() { + if (this.status == Task.STATUS.ready) { return Promise.resolve(this.result); } + if (this.status == Task.STATUS.failed || this.status == Task.STATUS.interupted) { + return Promise.reject(this.error); + } + + return new Promise((resolve, reject) => { + this.result_listeners.push([resolve, reject]); + }); + } + + #notifyFailure() { + for (let [_resolve, reject] of this.result_listeners) { + reject(this.error); + } + this.result_listeners = []; + } + + #notifySuccess() { + for (let [resolve, _reject] of this.result_listeners) { + resolve(this.result); + } + this.result_listeners = []; + } +}; + +Task.STATUS = { + pending: 0, + running: 1, + interupting: 2, + ready: 3, + interupted: 4, + failed: 5, +};