From cadf63e22d2789d114fe942d09adc68a2ba11af7 Mon Sep 17 00:00:00 2001 From: Timur Sevimli Date: Sun, 4 Feb 2024 14:05:37 +0300 Subject: [PATCH] Add queue on classes --- lib/queue.class.js | 300 +++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 300 insertions(+) create mode 100644 lib/queue.class.js diff --git a/lib/queue.class.js b/lib/queue.class.js new file mode 100644 index 0000000..a8c3a27 --- /dev/null +++ b/lib/queue.class.js @@ -0,0 +1,300 @@ +'use strict'; + +const QUEUE_TIMEOUT = 'Metasync: Queue timed out'; + +class Queue { +// Queue constructor +// concurrency - , asynchronous concurrency + constructor(concurrency) { + this.paused = false; + this.concurrency = concurrency; + this.waitTimeout = 0; + this.processTimeout = 0; + this.throttleCount = 0; + this.throttleInterval = 1000; + this.count = 0; + this.tasks = []; + this.waiting = []; + this.factors = {}; + this.fifoMode = true; + this.roundRobinMode = false; + this.priorityMode = false; + this.onProcess = null; + this.onDone = null; + this.onSuccess = null; + this.onTimeout = null; + this.onFailure = null; + this.onDrain = null; + } + +// Set wait before processing timeout +// msec - , wait timeout for single item +// +// Returns: + wait(msec) { + this.waitTimeout = msec; + return this; + } + +// Throttle to limit throughput +// Signature: count[, interval] +// count - , item count +// interval - , per interval, optional +// default: 1000 msec +// +// Returns: + throttle(count, interval = 1000) { + this.throttleCount = count; + this.throttleInterval = interval; + return this; + } + +// Add item to queue +// Signature: item[, factor[, priority]] +// item - , to be added +// factor - | , type, source, +// destination or path, optional +// priority - , optional +// +// Returns: + add(item, factor = 0, priority = 0) { + if (this.priorityMode && !this.roundRobinMode) { + priority = factor; + factor = 0; + } + const task = [item, factor, priority]; + const slot = this.count < this.concurrency; + if (!this.paused && slot && this.onProcess) { + this.next(task); + return this; + } + let tasks; + if (this.roundRobinMode) { + tasks = this.factors[factor]; + if (!tasks) { + tasks = []; + this.factors[factor] = tasks; + this.waiting.push(tasks); + } + } else { + tasks = this.tasks; + } + + if (this.fifoMode) tasks.push(task); + else tasks.unshift(task); + + if (this.priorityMode) { + if (this.fifoMode) { + tasks.sort((a, b) => b[2] - a[2]); + } else { + tasks.sort((a, b) => a[2] - b[2]); + } + } + return this; + } + +// Process next item +// task - , next task [item, factor, priority] +// +// Returns: + next(task) { + const item = task[0]; + let timer; + this.count++; + if (this.processTimeout) { + timer = setTimeout(() => { + const err = new Error(QUEUE_TIMEOUT); + if (this.onTimeout) this.onTimeout(err); + }, this.processTimeout); + } + this.onProcess(item, (err, result) => { + if (this.onDone) this.onDone(err, result); + if (err) { + if (this.onFailure) this.onFailure(err); + } else if (this.onSuccess) { + this.onSuccess(result); + } + if (timer) { + clearTimeout(timer); + timer = null; + } + this.count--; + if (this.tasks.length > 0 || this.waiting.length > 0) { + this.takeNext(); + } else if (this.count === 0 && this.onDrain) { + this.onDrain(); + } + }); + return this; + } + +// Prepare next item for processing +// +// Returns: + takeNext() { + if (this.paused || !this.onProcess) { + return this; + } + let tasks; + if (this.roundRobinMode) { + tasks = this.waiting.shift(); + if (tasks.length > 1) { + this.waiting.push(tasks); + } + } else { + tasks = this.tasks; + } + const task = tasks.shift(); + if (task) this.next(task); + return this; + } + +// This function is not completely implemented yet +// +// Returns: + pause() { + this.paused = true; + return this; + } + +// Resume queue +// This function is not completely implemented yet +// +// Returns: + resume() { + this.paused = false; + return this; + } + +// Clear queue +// +// Returns: + clear() { + this.count = 0; + this.tasks = []; + this.waiting = []; + this.factors = {}; + return this; + } + +// Set timeout interval and listener +// msec - , process timeout for single item +// onTimeout - +// +// Returns: + timeout(msec, onTimeout = null) { + this.processTimeout = msec; + if (onTimeout) this.onTimeout = onTimeout; + return this; + } + +// Set processing function +// fn - +// item - +// callback - +// err - | +// result - +// +// Returns: + process(fn) { + this.onProcess = fn; + return this; + } + +// Set listener on processing done +// fn - , done listener +// err - | +// result - +// +// Returns: + done(fn) { + this.onDone = fn; + return this; + } + +// Set listener on processing success +// listener - , on success +// item - +// +// Returns: + success(listener) { + this.onSuccess = listener; + return this; + } + +// Set listener on processing error +// listener - , on failure +// err - | +// +// Returns: + failure(listener) { + this.onFailure = listener; + return this; + } + +// Set listener on drain Queue +// listener - , on drain +// +// Returns: + drain(listener) { + this.onDrain = listener; + return this; + } + +// Switch to FIFO mode (default for Queue) +// +// Returns: + fifo() { + this.fifoMode = true; + return this; + } + +// Switch to LIFO mode +// +// Returns: + lifo() { + this.fifoMode = false; + return this; + } + +// Activate or deactivate priority mode +// flag - , default: true, false will +// disable priority mode +// +// Returns: + priority(flag = true) { + this.priorityMode = flag; + return this; + } + +// Activate or deactivate round robin mode +// flag - , default: true, false will +// disable roundRobin mode +// +// Returns: + roundRobin(flag = true) { + this.roundRobinMode = flag; + return this; + } + + +// Pipe processed items to different queue +// dest - , destination queue +// +// Returns: + pipe(dest) { + if (dest instanceof Queue) { + this.success((item) => void dest.add(item)); + } + return this; + } +} + +// Create Queue instance +// concurrency - , simultaneous and +// asynchronously executing tasks +// +// Returns: +const queue = (concurrency) => new Queue(concurrency); + +module.exports = { queue, Queue };