[js][utility] add AsyncByteQueue

This commit is contained in:
方而静 2024-02-10 21:15:02 +08:00
parent 27a4be731d
commit 5ab2725947
Signed by: szTom
GPG Key ID: 072D999D60C6473C
5 changed files with 155 additions and 62 deletions

View File

@ -7,76 +7,76 @@ import { Queue } from './queue.mjs';
* @class * @class
*/ */
export class AsyncLock { export class AsyncLock {
/** /**
* Creates an instance of AsyncLock. * Creates an instance of AsyncLock.
* @constructor * @constructor
*/ */
constructor() { constructor() {
/** /**
* A queue to store pending requests for acquiring the lock. * A queue to store pending requests for acquiring the lock.
* @type {Queue} * @type {Queue}
* @private * @private
*/ */
this.pending_queue = new Queue(); this.pending_queue = new Queue();
/** /**
* The current state of the lock. * The current state of the lock.
* @type {boolean} * @type {boolean}
* @private * @private
*/ */
this.state = false; this.state = false;
/** /**
* The ID associated with the current lock holder. * The ID associated with the current lock holder.
* @type {number} * @type {number}
* @private * @private
*/ */
this.lock_id = 0; this.lock_id = 0;
} }
/** /**
* Checks the current state of the lock. * Checks the current state of the lock.
* @returns {boolean} True if the lock is acquired; otherwise, false. * @returns {boolean} True if the lock is acquired; otherwise, false.
*/ */
query() { query() {
return this.state; return this.state;
} }
/** /**
* Acquires the lock asynchronously. * Acquires the lock asynchronously.
* @returns {Promise<number>} A promise that resolves with the unique ID associated with the acquired lock. * @returns {Promise<number>} A promise that resolves with the unique ID associated with the acquired lock.
*/ */
acquire() { acquire() {
if (!this.state) { if (!this.state) {
this.state = true; this.state = true;
this.lock_id += 1; this.lock_id += 1;
return Promise.resolve(this.lock_id); return Promise.resolve(this.lock_id);
} }
return new Promise((resolve, _reject) => this.pending_queue.push(resolve)); return new Promise((resolve, _reject) => this.pending_queue.push(resolve));
} }
/** /**
* Releases the lock with the specified ID. * Releases the lock with the specified ID.
* @param {number} lock_id The ID of the lock to release. * @param {number} lock_id The ID of the lock to release.
*/ */
release(lock_id) { release(lock_id) {
if (!this.state || lock_id !== this.lock_id) { return; } if (!this.state || lock_id !== this.lock_id) { return; }
if (this.pending_queue.empty()) { if (this.pending_queue.empty()) {
this.state = false; this.state = false;
return; return;
} }
let resolve = this.pending_queue.front(); let resolve = this.pending_queue.front();
this.pending_queue.popFront(); this.pending_queue.popFront();
this.lock_id += 1; this.lock_id += 1;
resolve(this.lock_id); resolve(this.lock_id);
} }
/** /**
* Releases the lock forcefully even you are not the owner. * Releases the lock forcefully even you are not the owner.
* BE CAREFUL WHAT YOU WISH FOR!! * BE CAREFUL WHAT YOU WISH FOR!!
*/ */
forceRelease() { forceRelease() {
this.release(this.lock_id); this.release(this.lock_id);
} }
}; };

View File

@ -0,0 +1,90 @@
import { Buffer } from 'buffer';
import { AsyncLock } from './async-lock.mjs';
/**
* Represents an asynchronous byte queue for managing byte data.
*/
export class AsyncByteQueue {
/**
* Creates an instance of AsyncByteQueue.
*/
constructor() {
/**
* The buffer to store byte data.
* @type {Buffer}
* @private
*/
this.data = Buffer.alloc(0);
/**
* The index indicating the head of the byte queue.
* @type {number}
* @private
*/
this.head = 0;
/**
* The lock used to manage access to the byte queue.
* @type {AsyncLock}
* @private
*/
this.lock = new AsyncLock();
}
/**
* Checks if the byte queue is empty.
* @returns {boolean} True if the byte queue is empty; otherwise, false.
*/
empty() {
return this.head === this.data.byteLength;
}
/**
* Retrieves the byte at the front of the queue asynchronously.
* @returns {Promise<number>} A promise that resolves with the byte at the front of the queue.
*/
async front() {
while (this.empty()) {
await this.lock.acquire();
}
this.lock.forceRelease();
return this.data[this.head];
}
/**
* Removes and retrieves the byte at the front of the queue asynchronously.
* @returns {Promise<number>} A promise that resolves with the byte removed from the front of the queue.
*/
async popFront() {
while (this.empty()) {
await this.lock.acquire();
}
let res = this.data[this.head];
this.head += 1;
this._refactor();
if (!this.empty()) {
this.lock.forceRelease();
}
return res;
}
/**
* Appends byte data to the end of the queue.
* @param {Buffer} data The byte data to append to the queue.
*/
pushBack(data) {
this.data = Buffer.concat([this.data, data]);
this.lock.forceRelease();
}
/**
* Refactors the internal data buffer by discarding processed bytes if necessary.
* @private
*/
_refactor() {
if (this.head >= (this.data.byteLength >> 1) && this.head >= 16) {
this.data = this.data.subarray(this.head);
}
}
}

View File

@ -1,3 +1,4 @@
export { areArrayBuffersEqual } from './buffer.mjs'; export { areArrayBuffersEqual } from './buffer.mjs';
export { Queue, QueueEmptyError } from './queue.mjs'; export { Queue, QueueEmptyError } from './queue.mjs';
export { AsyncLock } from './async-lock.mjs'; export { AsyncLock } from './async-lock.mjs';
export { AsyncByteQueue } from './async-queue.mjs';

View File

@ -2,5 +2,7 @@
"name": "@og/utility", "name": "@og/utility",
"type": "module", "type": "module",
"main": "index.mjs", "main": "index.mjs",
"dependencies": {} "dependencies": {
"buffer": "^6.0.3"
}
} }

View File

@ -86,7 +86,7 @@ export class Queue {
* @throws {QueueEmptyError} Throws an error if the queue is empty. * @throws {QueueEmptyError} Throws an error if the queue is empty.
*/ */
popFront() { popFront() {
if (this.empty()) { throw new QueueEmptyError();} if (this.empty()) { throw new QueueEmptyError(); }
this.head += 1; this.head += 1;
if (this.head == this.data.length) { if (this.head == this.data.length) {