diff --git a/shared/utility/async-lock.mjs b/shared/utility/async-lock.mjs index 73d7a71..bf8f775 100644 --- a/shared/utility/async-lock.mjs +++ b/shared/utility/async-lock.mjs @@ -7,76 +7,76 @@ import { Queue } from './queue.mjs'; * @class */ export class AsyncLock { - /** - * Creates an instance of AsyncLock. + /** + * Creates an instance of AsyncLock. * @constructor - */ - constructor() { - /** - * A queue to store pending requests for acquiring the lock. - * @type {Queue} + */ + constructor() { + /** + * A queue to store pending requests for acquiring the lock. + * @type {Queue} * @private - */ - this.pending_queue = new Queue(); + */ + this.pending_queue = new Queue(); - /** - * The current state of the lock. - * @type {boolean} + /** + * The current state of the lock. + * @type {boolean} * @private - */ - this.state = false; + */ + this.state = false; - /** - * The ID associated with the current lock holder. - * @type {number} + /** + * The ID associated with the current lock holder. + * @type {number} * @private - */ - this.lock_id = 0; - } + */ + this.lock_id = 0; + } - /** - * Checks the current state of the lock. - * @returns {boolean} True if the lock is acquired; otherwise, false. - */ - query() { - return this.state; - } + /** + * Checks the current state of the lock. + * @returns {boolean} True if the lock is acquired; otherwise, false. + */ + query() { + return this.state; + } - /** - * Acquires the lock asynchronously. - * @returns {Promise} A promise that resolves with the unique ID associated with the acquired lock. - */ - acquire() { - if (!this.state) { - this.state = true; - this.lock_id += 1; - return Promise.resolve(this.lock_id); - } - return new Promise((resolve, _reject) => this.pending_queue.push(resolve)); - } + /** + * Acquires the lock asynchronously. + * @returns {Promise} A promise that resolves with the unique ID associated with the acquired lock. + */ + acquire() { + if (!this.state) { + this.state = true; + this.lock_id += 1; + return Promise.resolve(this.lock_id); + } + return new Promise((resolve, _reject) => this.pending_queue.push(resolve)); + } - /** - * Releases the lock with the specified ID. - * @param {number} lock_id The ID of the lock to release. - */ - release(lock_id) { - if (!this.state || lock_id !== this.lock_id) { return; } - if (this.pending_queue.empty()) { - this.state = false; - return; - } + /** + * Releases the lock with the specified ID. + * @param {number} lock_id The ID of the lock to release. + */ + release(lock_id) { + if (!this.state || lock_id !== this.lock_id) { return; } + if (this.pending_queue.empty()) { + this.state = false; + return; + } - let resolve = this.pending_queue.front(); - this.pending_queue.popFront(); - this.lock_id += 1; - resolve(this.lock_id); - } + let resolve = this.pending_queue.front(); + this.pending_queue.popFront(); + this.lock_id += 1; + resolve(this.lock_id); + } - /** - * Releases the lock forcefully even you are not the owner. + /** + * Releases the lock forcefully even you are not the owner. * BE CAREFUL WHAT YOU WISH FOR!! - */ - forceRelease() { - this.release(this.lock_id); - } + */ + forceRelease() { + this.release(this.lock_id); + } }; \ No newline at end of file diff --git a/shared/utility/async-queue.mjs b/shared/utility/async-queue.mjs new file mode 100644 index 0000000..f1c5538 --- /dev/null +++ b/shared/utility/async-queue.mjs @@ -0,0 +1,90 @@ +import { Buffer } from 'buffer'; +import { AsyncLock } from './async-lock.mjs'; + +/** + * Represents an asynchronous byte queue for managing byte data. + */ +export class AsyncByteQueue { + /** + * Creates an instance of AsyncByteQueue. + */ + constructor() { + /** + * The buffer to store byte data. + * @type {Buffer} + * @private + */ + this.data = Buffer.alloc(0); + + /** + * The index indicating the head of the byte queue. + * @type {number} + * @private + */ + this.head = 0; + + /** + * The lock used to manage access to the byte queue. + * @type {AsyncLock} + * @private + */ + this.lock = new AsyncLock(); + } + + /** + * Checks if the byte queue is empty. + * @returns {boolean} True if the byte queue is empty; otherwise, false. + */ + empty() { + return this.head === this.data.byteLength; + } + + /** + * Retrieves the byte at the front of the queue asynchronously. + * @returns {Promise} A promise that resolves with the byte at the front of the queue. + */ + async front() { + while (this.empty()) { + await this.lock.acquire(); + } + this.lock.forceRelease(); + return this.data[this.head]; + } + + /** + * Removes and retrieves the byte at the front of the queue asynchronously. + * @returns {Promise} A promise that resolves with the byte removed from the front of the queue. + */ + async popFront() { + while (this.empty()) { + await this.lock.acquire(); + } + + let res = this.data[this.head]; + this.head += 1; + this._refactor(); + if (!this.empty()) { + this.lock.forceRelease(); + } + return res; + } + + /** + * Appends byte data to the end of the queue. + * @param {Buffer} data The byte data to append to the queue. + */ + pushBack(data) { + this.data = Buffer.concat([this.data, data]); + this.lock.forceRelease(); + } + + /** + * Refactors the internal data buffer by discarding processed bytes if necessary. + * @private + */ + _refactor() { + if (this.head >= (this.data.byteLength >> 1) && this.head >= 16) { + this.data = this.data.subarray(this.head); + } + } +} diff --git a/shared/utility/index.mjs b/shared/utility/index.mjs index 628e0b0..95a1e27 100644 --- a/shared/utility/index.mjs +++ b/shared/utility/index.mjs @@ -1,3 +1,4 @@ export { areArrayBuffersEqual } from './buffer.mjs'; export { Queue, QueueEmptyError } from './queue.mjs'; export { AsyncLock } from './async-lock.mjs'; +export { AsyncByteQueue } from './async-queue.mjs'; diff --git a/shared/utility/package.json b/shared/utility/package.json index 76ce69b..86ac9a2 100644 --- a/shared/utility/package.json +++ b/shared/utility/package.json @@ -2,5 +2,7 @@ "name": "@og/utility", "type": "module", "main": "index.mjs", - "dependencies": {} + "dependencies": { + "buffer": "^6.0.3" + } } diff --git a/shared/utility/queue.mjs b/shared/utility/queue.mjs index cbe4d72..96a9d08 100644 --- a/shared/utility/queue.mjs +++ b/shared/utility/queue.mjs @@ -86,7 +86,7 @@ export class Queue { * @throws {QueueEmptyError} Throws an error if the queue is empty. */ popFront() { - if (this.empty()) { throw new QueueEmptyError();} + if (this.empty()) { throw new QueueEmptyError(); } this.head += 1; if (this.head == this.data.length) {