add utilty functions:

- Classes: AsycLock, Task, Queue
- Functions: asyncSleep, asyncTimeout, promiseTimeout, yieldTask
- Errors: QueueEmptyError, TaskInteruptedError
This commit is contained in:
方而静 2023-10-30 15:19:16 +08:00
parent 327d6e9328
commit 73a667eeb4
Signed by: szTom
GPG Key ID: 072D999D60C6473C
5 changed files with 219 additions and 1 deletions

35
utils/async-lock.mjs Normal file
View File

@ -0,0 +1,35 @@
import { Queue } from './queue.mjs';
export class AsyncLock {
constructor() {
this.pending_queue = new Queue();
this.state = false;
this.lock_id = 0;
}
query() {
return this.state;
}
acquire() {
if (!this.state) {
this.state = true;
this.lock_id += 1;
return Promise.resolve(this.lock_id);
}
return new Promise((resolve, _reject) => this.pending_queue.push(resolve));
}
release(lock_id) {
if (lock_id != this.lock_id) { return; }
if (this.pending_queue.empty()) {
this.state = false;
return;
}
let resolve = this.pending_queue.front();
this.pending_queue.popFront();
this.lock_id += 1;
resolve(lock_id);
}
};

View File

@ -1,5 +1,6 @@
import fs from 'node:fs/promises';
// Reads JSON data from file, returns null if file not found or has other errors.
export async function readJsonFile(path) {
try {
const data = await fs.readFile(path, 'utf8');
@ -9,21 +10,54 @@ export async function readJsonFile(path) {
}
}
// Write JSON data into a file.
export function writeJsonFile(path, data) {
const json_string = JSON.stringify(data);
return fs.writeFile(path, json_string, 'utf8');
}
// Parse format "profile@host:port", port can be undefined.
export function parseLogin(url) {
const [profile_host, port] = url.split(':');
const [profile, host] = profile_host.split('@');
return [profile, host, port ? parseInt(port) : undefined];
}
// Returns a promise, wait unitl the EventEmitter emits certian event next time.
export function waitEvent(em, event) {
return new Promise((resolve, _reject) => {
em.once(event, resolve);
});
}
export default { readJsonFile, writeJsonFile, parseLogin, waitEvent };
export function asyncSleep(t) {
return new Promise((resolve, _reject) => {
setTimeout(resolve, t);
});
}
export function asyncTimeout(t) {
return new Promise((_resolve, reject) => {
setTimeout(reject, t);
});
}
export function promiseTimeout(p, t) {
return Promise.race([p, asyncTimeout(t)]);
}
export function yieldTask() {
return new Promise((resolve, _reject) => {
queueMicrotask(resolve);
});
}
// Checks wheather an object is iterable.
export function isIterable(obj) {
if (obj == null) { return false; }
return typeof obj[Symbol.iterator] == 'function';
}
export { Queue, QueueEmptyError } from './queue.mjs';
export { AsyncLock } from './async-lock.mjs';
export { Task, TaskInteruptedError } from './task.mjs';

6
utils/package.json Normal file
View File

@ -0,0 +1,6 @@
{
"name": "@fang_erj/compass-utils",
"description": "Utility functions use in compass-bot",
"type": "module",
"main": "index.mjs"
}

62
utils/queue.mjs Normal file
View File

@ -0,0 +1,62 @@
export class QueueEmptyError extends Error {
constructor() { super('Queue is empty'); }
}
export class Queue {
constructor() {
this.data = [];
this.head = 0;
}
front() {
return this.data[this.head];
}
back() {
return this.data[this.data.length - 1];
}
popBack() {
if (this.data.length == this.head) {
throw new QueueEmptyError();
}
this.data.length -= 1;
}
empty() {
return this.data.length == this.head;
}
size() {
return this.data.length - this.head;
}
get length() {
return this.size();
}
push() {
this.data.push.apply(this.data, arguments);
}
popFront() {
if (this.empty()) { throw new QueueEmptyError();}
this.head += 1;
if (this.head == this.data.length) {
this.data = [];
this.head = 0;
return;
}
if (this.head >= this.data.length >> 1 && this.head >= 16) {
this.data = this.data.slice(this.head);
this.head = 0;
}
}
*[Symbol.iterator]() {
for (let i = this.head; i < this.data.length; i += 1) {
yield this.data[i];
}
}
}

81
utils/task.mjs Normal file
View File

@ -0,0 +1,81 @@
export class TaskInteruptedError {
constructor() { super('Task has been interupted.'); }
};
export class Task {
constructor() {
this.status = Task.STATUS.pending;
this.error = null;
this.result = null;
this.result_listeners = [];
}
_ready(result) {
this.result = result;
this.status = Task.STATUS.ready;
this.#notifySuccess();
}
_fail(error) {
if (this.status == Task.STATUS.interupted) { return; }
this.error = error;
this.status = Task.STATUS.failed;
this.#notifyFailure();
}
_start() {
if (this.status != Task.STATUS.pending) {
throw new Error('Task has already left pending stage');
}
this.status = Task.STATUS.running;
}
interupt() {
if (this.status == Task.STATUS.pending) { this._confirmInterupt(); }
else { this.status = Task.STATUS.interupting; }
}
_shouldInterupt() {
return this.status == Task.STATUS.interupting;
}
_confirmInterupt() {
this.status = Task.STATUS.interupted;
this.error = new TaskInteruptedError();
this.#notifyFailure();
}
get() {
if (this.status == Task.STATUS.ready) { return Promise.resolve(this.result); }
if (this.status == Task.STATUS.failed || this.status == Task.STATUS.interupted) {
return Promise.reject(this.error);
}
return new Promise((resolve, reject) => {
this.result_listeners.push([resolve, reject]);
});
}
#notifyFailure() {
for (let [_resolve, reject] of this.result_listeners) {
reject(this.error);
}
this.result_listeners = [];
}
#notifySuccess() {
for (let [resolve, _reject] of this.result_listeners) {
resolve(this.result);
}
this.result_listeners = [];
}
};
Task.STATUS = {
pending: 0,
running: 1,
interupting: 2,
ready: 3,
interupted: 4,
failed: 5,
};