From 00e213a633c8fa357e1052cc0294c29990e60770 Mon Sep 17 00:00:00 2001 From: Brad Vogel Date: Fri, 14 Oct 2016 21:05:16 -0700 Subject: [PATCH 01/16] Rewrite the handling of process stalled jobs to be atomic so it doesn't double-process jobs. See https://github.com/OptimalBits/bull/issues/356 for more on how this can happen. Additionally, this addresses long-standing issue where a job can be considered "stalled" even though it was just moved to active and before a lock could be obtained by the worker that moved it (https://github.com/OptimalBits/bull/issues/258), by waiting a grace period (with a default of LOCK_RENEW_TIME) before considering a job as possibly stalled. This gives the (real) worker time to acquire its lock after moving it to active. Note that this includes a small API change: the 'stalled' event is now passed an array of events. --- README.md | 4 ++-- lib/queue.js | 43 +++++++++++++++++-------------------------- lib/scripts.js | 34 ++++++++++++++++++++++------------ 3 files changed, 41 insertions(+), 40 deletions(-) diff --git a/README.md b/README.md index 4907a2c49..1ef119814 100644 --- a/README.md +++ b/README.md @@ -167,8 +167,8 @@ A queue emits also some useful events: // Job started // You can use jobPromise.cancel() to abort this job. }) -.on('stalled', function(job){ - // The job was considered stalled (i.e. its lock was not renewed in LOCK_RENEW_TIME). +.on('stalled', function(jobs){ + // Array of jobs that were considered 'stalled' and re-enqueued (from 'active' to 'wait'). // Useful for debugging job workers that crash or pause the event loop. }) .on('progress', function(job, progress){ diff --git a/lib/queue.js b/lib/queue.js index 5811e9ab2..e04d6a280 100644 --- a/lib/queue.js +++ b/lib/queue.js @@ -225,7 +225,6 @@ var Queue = function Queue(name, redisPort, redisHost, redisOptions){ // Bind these methods to avoid constant rebinding and/or creating closures // in processJobs etc. this.processStalledJobs = this.processStalledJobs.bind(this); - this.processStalledJob = this.processStalledJob.bind(this); this.getNextJob = this.getNextJob.bind(this); this.processJobs = this.processJobs.bind(this); this.processJob = this.processJob.bind(this); @@ -528,42 +527,34 @@ Queue.prototype.updateDelayTimer = function(newDelayedTimestamp){ }; /** - Process jobs that have been added to the active list but are not being - processed properly. + * Process jobs that have been added to the active list but are not being + * processed properly. This can happen due to a process crash in the middle + * of processing a job, leaving it in 'active' but without a job lock. + * Note that there is no way to know for _certain_ if a job is "stalled" + * (since the process that moved it to active might just be slow to get the + * lock on it), so this function takes a 'grace period' parameter to ignore + * jobs that were created in the recent X milliseconds. + * + * @param {Number?} grace Duration in milliseconds. Ignore jobs created since this many milliseconds ago. + * Defaults to LOCK_RENEW_TIME. */ -Queue.prototype.processStalledJobs = function(){ +Queue.prototype.processStalledJobs = function(grace){ var _this = this; + + grace = grace || this.LOCK_RENEW_TIME; + if(this.closing){ return this.closing; } else{ - return this.client.lrangeAsync(this.toKey('active'), 0, -1).then(function(jobs){ - return Promise.each(jobs, function(jobId) { - return Job.fromId(_this, jobId).then(_this.processStalledJob); - }); + return scripts.processStalledJobs(this, grace).then(function(jobs){ + _this.emit('stalled', jobs); + return null; }).catch(function(err){ console.error(err); }); } }; -Queue.prototype.processStalledJob = function(job){ - var _this = this; - if(this.closing){ - return this.closing; - } - - if(!job){ - return Promise.resolve(); - }else{ - return scripts.getStalledJob(this, job, _this.token).then(function(isStalled){ - if(isStalled){ - _this.distEmit('stalled', job.toJSON()); - return _this.processJob(job, true); - } - }); - } -}; - Queue.prototype.processJobs = function(resolve, reject){ var _this = this; var processJobs = this.processJobs.bind(this, resolve, reject); diff --git a/lib/scripts.js b/lib/scripts.js index f23de70bc..8e6dee7d9 100644 --- a/lib/scripts.js +++ b/lib/scripts.js @@ -355,26 +355,36 @@ var scripts = { }, /** - * Gets a stalled job by locking it and checking it is not already completed. - * Returns a "OK" if the job was locked and not in completed set. + * Iterates to queue and moves all jobs in the active queue that appear to be + * stalled back to the wait state to be reprocessed. */ - getStalledJob: function(queue, job, token){ + processStalledJobs: function(queue, grace){ var script = [ - 'if redis.call("sismember", KEYS[1], ARGV[1]) == 0 then', - ' return redis.call("set", KEYS[2], ARGV[2], "PX", ARGV[3], "NX")', + 'local activeJobs = redis.call("LRANGE", KEYS[1], 0, -1)', + 'local stalled = {}', + 'for _, job in ipairs(activeJobs) do', + ' local jobKey = ARGV[2] .. job', + ' if(redis.call("EXISTS", jobKey .. ":lock") == 0) then', + ' local jobTS = redis.call("HGET", jobKey, "timestamp")', + ' if(jobTS and jobTS < ARGV[1]) then', + ' redis.call("LREM", KEYS[1], 0, job)', + ' redis.call("RPUSH", KEYS[2], job)', + ' table.insert(stalled, job)', + ' end', + ' end', 'end', - 'return 0'].join('\n'); + 'return stalled' + ].join('\n'); var args = [ queue.client, - 'getStalledJob', + 'processStalledJobs', script, 2, - queue.toKey('completed'), - job.lockKey(), - job.jobId, - token, - queue.LOCK_RENEW_TIME + queue.toKey('active'), + queue.toKey('wait'), + Date.now() - grace, + queue.toKey('') ]; return execScript.apply(scripts, args); From 4124ecf8df56f52b02f995c41cbcfea7c87192ed Mon Sep 17 00:00:00 2001 From: Brad Vogel Date: Fri, 14 Oct 2016 21:21:54 -0700 Subject: [PATCH 02/16] Only emit 'stalled' event if there were stalled jobs. --- lib/queue.js | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/lib/queue.js b/lib/queue.js index e04d6a280..93f496cde 100644 --- a/lib/queue.js +++ b/lib/queue.js @@ -547,7 +547,9 @@ Queue.prototype.processStalledJobs = function(grace){ return this.closing; } else{ return scripts.processStalledJobs(this, grace).then(function(jobs){ - _this.emit('stalled', jobs); + if(jobs.length > 0){ + _this.emit('stalled', jobs); + } return null; }).catch(function(err){ console.error(err); From abd20ac1d4d099ed8cb3e72c7821509b26efca2d Mon Sep 17 00:00:00 2001 From: Brad Vogel Date: Mon, 17 Oct 2016 00:43:14 -0700 Subject: [PATCH 03/16] Change process stalled jobs strategy to instead look for an unlocked job in the active queue on every cycle (looking at the oldest active jobs first). --- lib/queue.js | 56 +++++++++++++------------------------------------- lib/scripts.js | 33 ++++++++++++----------------- 2 files changed, 27 insertions(+), 62 deletions(-) diff --git a/lib/queue.js b/lib/queue.js index 93f496cde..7df84ef4e 100644 --- a/lib/queue.js +++ b/lib/queue.js @@ -224,7 +224,7 @@ var Queue = function Queue(name, redisPort, redisHost, redisOptions){ // Bind these methods to avoid constant rebinding and/or creating closures // in processJobs etc. - this.processStalledJobs = this.processStalledJobs.bind(this); + this.processStalledJob = this.processStalledJob.bind(this); this.getNextJob = this.getNextJob.bind(this); this.processJobs = this.processJobs.bind(this); this.processJob = this.processJob.bind(this); @@ -475,21 +475,11 @@ Queue.prototype.run = function(concurrency){ var promises = []; var _this = this; - return this.processStalledJobs().then(function(){ - - while(concurrency--){ - promises.push(new Promise(_this.processJobs)); - } - - // - // Set process Stalled jobs intervall - // - clearInterval(_this.stalledJobsInterval); - _this.stalledJobsInterval = - setInterval(_this.processStalledJobs, _this.LOCK_RENEW_TIME); + while(concurrency--){ + promises.push(new Promise(_this.processJobs)); + } - return Promise.all(promises); - }); + return Promise.all(promises); }; // --------------------------------------------------------------------- @@ -526,35 +516,16 @@ Queue.prototype.updateDelayTimer = function(newDelayedTimestamp){ } }; -/** - * Process jobs that have been added to the active list but are not being - * processed properly. This can happen due to a process crash in the middle - * of processing a job, leaving it in 'active' but without a job lock. - * Note that there is no way to know for _certain_ if a job is "stalled" - * (since the process that moved it to active might just be slow to get the - * lock on it), so this function takes a 'grace period' parameter to ignore - * jobs that were created in the recent X milliseconds. - * - * @param {Number?} grace Duration in milliseconds. Ignore jobs created since this many milliseconds ago. - * Defaults to LOCK_RENEW_TIME. -*/ -Queue.prototype.processStalledJobs = function(grace){ +Queue.prototype.processStalledJob = function() { var _this = this; - grace = grace || this.LOCK_RENEW_TIME; - - if(this.closing){ - return this.closing; - } else{ - return scripts.processStalledJobs(this, grace).then(function(jobs){ - if(jobs.length > 0){ - _this.emit('stalled', jobs); - } - return null; - }).catch(function(err){ - console.error(err); - }); - } + return scripts.getStalledJob(this).then(function(job){ + if (job) { + return _this.getJobFromId(job).then(function(job){ + return _this.processJob(job, true /* Renew the lock */); + }); + } + }); }; Queue.prototype.processJobs = function(resolve, reject){ @@ -564,6 +535,7 @@ Queue.prototype.processJobs = function(resolve, reject){ if(!this.closing){ process.nextTick(function(){ (_this.paused || Promise.resolve()) + .then(_this.processStalledJob) .then(_this.getNextJob) .then(_this.processJob) .then(processJobs, function(err){ diff --git a/lib/scripts.js b/lib/scripts.js index 8e6dee7d9..3a00f9c41 100644 --- a/lib/scripts.js +++ b/lib/scripts.js @@ -354,37 +354,30 @@ var scripts = { return execScript.apply(scripts, args); }, - /** - * Iterates to queue and moves all jobs in the active queue that appear to be - * stalled back to the wait state to be reprocessed. - */ - processStalledJobs: function(queue, grace){ + getStalledJob: function(queue){ var script = [ 'local activeJobs = redis.call("LRANGE", KEYS[1], 0, -1)', - 'local stalled = {}', - 'for _, job in ipairs(activeJobs) do', - ' local jobKey = ARGV[2] .. job', - ' if(redis.call("EXISTS", jobKey .. ":lock") == 0) then', - ' local jobTS = redis.call("HGET", jobKey, "timestamp")', - ' if(jobTS and jobTS < ARGV[1]) then', - ' redis.call("LREM", KEYS[1], 0, job)', - ' redis.call("RPUSH", KEYS[2], job)', - ' table.insert(stalled, job)', - ' end', + // Iterate active jobs in reverse, as stuck jobs are more likely to appear + // at the end of the active list. Jobs in the beginning probably just got + // moved there. + 'for i = #activeJobs, 1, -1 do', + ' local job = activeJobs[i]', + ' local jobKey = ARGV[1] .. job', + ' if(redis.call("SET", jobKey .. ":lock", ARGV[2], "PX", ARGV[3], "NX")) then', + ' return job', ' end', 'end', - 'return stalled' ].join('\n'); var args = [ queue.client, 'processStalledJobs', script, - 2, + 1, queue.toKey('active'), - queue.toKey('wait'), - Date.now() - grace, - queue.toKey('') + queue.toKey(''), + queue.token, + queue.LOCK_RENEW_TIME ]; return execScript.apply(scripts, args); From 417ba9d3de0dad34a32edabf36d3b26a9e62b179 Mon Sep 17 00:00:00 2001 From: Brad Vogel Date: Mon, 17 Oct 2016 16:00:50 -0700 Subject: [PATCH 04/16] Emit event if the job has been started (for demonstration - the 'started' property needs to be written). --- lib/queue.js | 3 +++ 1 file changed, 3 insertions(+) diff --git a/lib/queue.js b/lib/queue.js index 7df84ef4e..1918c60a0 100644 --- a/lib/queue.js +++ b/lib/queue.js @@ -522,6 +522,9 @@ Queue.prototype.processStalledJob = function() { return scripts.getStalledJob(this).then(function(job){ if (job) { return _this.getJobFromId(job).then(function(job){ + if (job.started) { + _this.emit('stalled', job); + } return _this.processJob(job, true /* Renew the lock */); }); } From afc980c92523341ef7d6c74d34ad8863d7bc8a49 Mon Sep 17 00:00:00 2001 From: Brad Vogel Date: Thu, 20 Oct 2016 15:49:24 -0700 Subject: [PATCH 05/16] Comment fixes. --- lib/job.js | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/lib/job.js b/lib/job.js index 1a2f4cfcf..11c8c65dd 100644 --- a/lib/job.js +++ b/lib/job.js @@ -110,10 +110,10 @@ Job.prototype.toJSON = function(){ stacktrace: this.stacktrace || null, returnvalue: this.returnvalue || null }; -} +}; /** - Return a unique key representin a lock for this Job + Return a unique key representing a lock for this Job */ Job.prototype.lockKey = function(){ return this.queue.toKey(this.jobId) + ':lock'; @@ -138,7 +138,7 @@ Job.prototype.takeLock = function(token, renew){ Renews a lock so that it gets some more time before expiring. */ Job.prototype.renewLock = function(token){ - return this.takeLock(token, true); + return this.takeLock(token, true /* Renew */); }; /** From 0f60b9371d0192b65505f04d36652b6359c0be75 Mon Sep 17 00:00:00 2001 From: Brad Vogel Date: Thu, 20 Oct 2016 15:57:40 -0700 Subject: [PATCH 06/16] Change the stalled job processing strategy per https://github.com/OptimalBits/bull/pull/359#issuecomment-254427413. --- lib/job.js | 9 +---- lib/queue.js | 68 ++++++++++++++++++++++++--------- lib/scripts.js | 102 +++++++++++++++++++++++++++++++++++++++++-------- 3 files changed, 138 insertions(+), 41 deletions(-) diff --git a/lib/job.js b/lib/job.js index 11c8c65dd..3fee61242 100644 --- a/lib/job.js +++ b/lib/job.js @@ -124,13 +124,8 @@ Job.prototype.lockKey = function(){ same time. */ Job.prototype.takeLock = function(token, renew){ - var args = [this.lockKey(), token, 'PX', this.queue.LOCK_RENEW_TIME]; - if(!renew){ - args.push('NX'); - } - - return this.queue.client.setAsync.apply(this.queue.client, args).then(function(result){ - return result === 'OK'; + return scripts.takeLock(this.queue, this, renew).then(function(res){ + return res === 1; // Indicates successful lock. }); }; diff --git a/lib/queue.js b/lib/queue.js index 1918c60a0..21ac60ed5 100644 --- a/lib/queue.js +++ b/lib/queue.js @@ -48,6 +48,14 @@ Promise.promisifyAll(redis.Multi.prototype); */ var MINIMUM_REDIS_VERSION = '2.8.11'; var LOCK_RENEW_TIME = 5000; // 5 seconds is the renew time. + +// The interval for which to check for stalled jobs. +var STALLED_JOB_CHECK_INTERVAL = 5000; // 5 seconds is the renew time. + +// The maximum number of times a job can be recovered from the 'stalled' state +// (moved back to 'wait'), before it is failed. +var MAX_STALLED_JOB_COUNT = 1; + var CLIENT_CLOSE_TIMEOUT_MS = 5000; var POLLING_INTERVAL = 5000; @@ -128,6 +136,8 @@ var Queue = function Queue(name, redisPort, redisHost, redisOptions){ this.token = uuid(); this.LOCK_RENEW_TIME = LOCK_RENEW_TIME; + this.STALLED_JOB_CHECK_INTERVAL = STALLED_JOB_CHECK_INTERVAL; + this.MAX_STALLED_JOB_COUNT = MAX_STALLED_JOB_COUNT; // bubble up Redis error events [this.client, this.bclient, this.eclient].forEach(function (client) { @@ -224,7 +234,7 @@ var Queue = function Queue(name, redisPort, redisHost, redisOptions){ // Bind these methods to avoid constant rebinding and/or creating closures // in processJobs etc. - this.processStalledJob = this.processStalledJob.bind(this); + this.moveUnlockedJobsToWait = this.moveUnlockedJobsToWait.bind(this); this.getNextJob = this.getNextJob.bind(this); this.processJobs = this.processJobs.bind(this); this.processJob = this.processJob.bind(this); @@ -276,7 +286,7 @@ Queue.prototype.close = function( doNotWaitJobs ){ return this.closing = this._initializing.then(function(){ clearTimeout(_this.delayTimer); clearInterval(_this.guardianTimer); - clearInterval(_this.stalledJobsInterval); + clearInterval(_this.moveUnlockedJobsToWaitInterval); _this.timers.clearAll(); return _this.timers.whenIdle().then(function(){ @@ -475,11 +485,20 @@ Queue.prototype.run = function(concurrency){ var promises = []; var _this = this; - while(concurrency--){ - promises.push(new Promise(_this.processJobs)); - } + return this.moveUnlockedJobsToWait().then(function(){ + + while(concurrency--){ + promises.push(new Promise(_this.processJobs)); + } - return Promise.all(promises); + if (_this.STALLED_JOB_CHECK_INTERVAL > 0){ + clearInterval(_this.moveUnlockedJobsToWaitInterval); + _this.moveUnlockedJobsToWaitInterval = + setInterval(_this.moveUnlockedJobsToWait, _this.STALLED_JOB_CHECK_INTERVAL); + } + + return Promise.all(promises); + }); }; // --------------------------------------------------------------------- @@ -516,19 +535,33 @@ Queue.prototype.updateDelayTimer = function(newDelayedTimestamp){ } }; -Queue.prototype.processStalledJob = function() { +/** + * Process jobs that have been added to the active list but are not being + * processed properly. This can happen due to a process crash in the middle + * of processing a job, leaving it in 'active' but without a job lock. + * Note that there is no way to know for _certain_ if a job is "stalled" + * (since the process that moved it to active might just be slow to get the + * lock on it), so this function takes a 'grace period' parameter to ignore + * jobs that were created in the recent X milliseconds. + * + * @param {Number?} grace Duration in milliseconds. Ignore jobs created since this many milliseconds ago. + * Defaults to LOCK_RENEW_TIME. +*/ +Queue.prototype.moveUnlockedJobsToWait = function(){ var _this = this; - return scripts.getStalledJob(this).then(function(job){ - if (job) { - return _this.getJobFromId(job).then(function(job){ - if (job.started) { - _this.emit('stalled', job); - } - return _this.processJob(job, true /* Renew the lock */); - }); - } - }); + if(this.closing){ + return this.closing; + } else{ + return scripts.moveUnlockedJobsToWait(this).then(function(stalledJobIds){ + if (stalledJobIds.length > 0) { + _this.emit('stalled', stalledJobIds); + } + return null; + }).catch(function(err){ + console.error(err); + }); + } }; Queue.prototype.processJobs = function(resolve, reject){ @@ -538,7 +571,6 @@ Queue.prototype.processJobs = function(resolve, reject){ if(!this.closing){ process.nextTick(function(){ (_this.paused || Promise.resolve()) - .then(_this.processStalledJob) .then(_this.getNextJob) .then(_this.processJob) .then(processJobs, function(err){ diff --git a/lib/scripts.js b/lib/scripts.js index 3a00f9c41..651ea19c0 100644 --- a/lib/scripts.js +++ b/lib/scripts.js @@ -281,13 +281,49 @@ var scripts = { return execScript.apply(scripts, args); }, + + /** + * Takes a lock + */ + takeLock: function(queue, job, renew){ + var lockCall; + if (renew){ + lockCall = 'redis.call("SET", KEYS[1], ARGV[1], "PX", ARGV[2])'; + } else { + lockCall = 'redis.call("SET", KEYS[1], ARGV[1], "PX", ARGV[2], "NX")'; + } + + var script = [ + 'if(' + lockCall + ') then', + // Mark the job as having been locked at least once. Used to determine if the job was stalled. + ' redis.call("HSET", KEYS[2], "lockAcquired", "1")', + ' return 1', + 'else', + ' return 0', + 'end' + ].join('\n'); + + var args = [ + queue.client, + 'takeLock' + (renew ? 'Renew' : ''), + script, + 2, + job.lockKey(), + queue.toKey(job.jobId), + queue.token, + queue.LOCK_RENEW_TIME + ]; + + return execScript.apply(scripts, args); + }, + releaseLock: function(job, token){ var script = [ 'if redis.call("get", KEYS[1]) == ARGV[1]', 'then', - 'return redis.call("del", KEYS[1])', + ' return redis.call("del", KEYS[1])', 'else', - 'return 0', + ' return 0', 'end'].join('\n'); var args = [ @@ -354,30 +390,64 @@ var scripts = { return execScript.apply(scripts, args); }, - getStalledJob: function(queue){ + /** + * Looks for unlocked jobs in the active queue. There are two circumstances in which a job + * would be in 'active' but NOT have a job lock: + * + * Case A) The job was being worked on, but the worker process died and it failed to renew the lock. + * We call these jobs 'stalled'. This is the most common case. We resolve these by moving them + * back to wait to be re-processed. To prevent jobs from cycling endlessly between active and wait, + * (e.g. if the job handler keeps crashing), we limit the number stalled job recoveries to 100. + + * Case B) The job was just moved to 'active' from 'wait' and the worker that moved it hasn't gotten + * a lock yet, or died immediately before getting the lock (note that due to Redis limitations, the + * worker can't move the job and get the lock atomically - https://github.com/OptimalBits/bull/issues/258). + * For this case we also move the job back to 'wait' for reprocessing, but don't consider it 'stalled' + * since the job had never been started. This case is much rarer than Case A due to the very small + * timing window in which it must occur. + */ + moveUnlockedJobsToWait: function(queue){ var script = [ + 'local MAX_STALLED_JOB_COUNT = tonumber(ARGV[1])', 'local activeJobs = redis.call("LRANGE", KEYS[1], 0, -1)', - // Iterate active jobs in reverse, as stuck jobs are more likely to appear - // at the end of the active list. Jobs in the beginning probably just got - // moved there. - 'for i = #activeJobs, 1, -1 do', - ' local job = activeJobs[i]', - ' local jobKey = ARGV[1] .. job', - ' if(redis.call("SET", jobKey .. ":lock", ARGV[2], "PX", ARGV[3], "NX")) then', - ' return job', - ' end', + 'local stalled = {}', + 'for _, job in ipairs(activeJobs) do', + ' local jobKey = ARGV[2] .. job', + ' if(redis.call("EXISTS", jobKey .. ":lock") == 0) then', + // Remove from the active queue. + ' redis.call("LREM", KEYS[1], 1, job)', + ' local lockAcquired = redis.call("HGET", jobKey, "lockAcquired")', + ' if(lockAcquired) then', + // If it was previously locked then we consider it 'stalled' (Case A above). + ' table.insert(stalled, job)', + // If this job has been stalled too many times, such as if it crashes the worker, then fail it. + ' local stalledCount = redis.call("HINCRBY", jobKey, "stalledCounter", 1)', + ' if(stalledCount > MAX_STALLED_JOB_COUNT) then', + ' redis.call("SADD", KEYS[3], job)', + ' redis.call("HSET", jobKey, "failedReason", "job stalled more than allowable limit")', + ' else', + // Move the job back to the wait queue, to immediately be picked up by a waiting worker. + ' redis.call("RPUSH", KEYS[2], job)', + ' end', + ' else', + // Move the job back to the wait queue, to immediately be picked up by a waiting worker. + ' redis.call("RPUSH", KEYS[2], job)', + ' end', + ' end', 'end', + 'return stalled' ].join('\n'); var args = [ queue.client, 'processStalledJobs', script, - 1, + 3, queue.toKey('active'), - queue.toKey(''), - queue.token, - queue.LOCK_RENEW_TIME + queue.toKey('wait'), + queue.toKey('failed'), + queue.MAX_STALLED_JOB_COUNT, + queue.toKey('') ]; return execScript.apply(scripts, args); From 372fa3fe80b6354712dd1da340c8eca75f262579 Mon Sep 17 00:00:00 2001 From: Brad Vogel Date: Fri, 21 Oct 2016 17:22:47 -0700 Subject: [PATCH 07/16] Remove part of comment that no longer applies. --- lib/queue.js | 7 ------- 1 file changed, 7 deletions(-) diff --git a/lib/queue.js b/lib/queue.js index 21ac60ed5..834b1feee 100644 --- a/lib/queue.js +++ b/lib/queue.js @@ -539,13 +539,6 @@ Queue.prototype.updateDelayTimer = function(newDelayedTimestamp){ * Process jobs that have been added to the active list but are not being * processed properly. This can happen due to a process crash in the middle * of processing a job, leaving it in 'active' but without a job lock. - * Note that there is no way to know for _certain_ if a job is "stalled" - * (since the process that moved it to active might just be slow to get the - * lock on it), so this function takes a 'grace period' parameter to ignore - * jobs that were created in the recent X milliseconds. - * - * @param {Number?} grace Duration in milliseconds. Ignore jobs created since this many milliseconds ago. - * Defaults to LOCK_RENEW_TIME. */ Queue.prototype.moveUnlockedJobsToWait = function(){ var _this = this; From 2abd183e3d8336c646905bf624792a10133c894e Mon Sep 17 00:00:00 2001 From: Brad Vogel Date: Fri, 21 Oct 2016 18:20:01 -0700 Subject: [PATCH 08/16] Emit 'stalled' and 'failed' events when processing unlocked jobs in 'active'. --- README.md | 5 ++--- lib/queue.js | 19 ++++++++++++++----- lib/scripts.js | 10 ++++++---- 3 files changed, 22 insertions(+), 12 deletions(-) diff --git a/README.md b/README.md index 1ef119814..1a02f7433 100644 --- a/README.md +++ b/README.md @@ -167,9 +167,8 @@ A queue emits also some useful events: // Job started // You can use jobPromise.cancel() to abort this job. }) -.on('stalled', function(jobs){ - // Array of jobs that were considered 'stalled' and re-enqueued (from 'active' to 'wait'). - // Useful for debugging job workers that crash or pause the event loop. +.on('stalled', function(job){ + // Job that was considered stalled. Useful for debugging job workers that crash or pause the event loop. }) .on('progress', function(job, progress){ // Job progress updated! diff --git a/lib/queue.js b/lib/queue.js index 834b1feee..ca9f656a3 100644 --- a/lib/queue.js +++ b/lib/queue.js @@ -546,11 +546,20 @@ Queue.prototype.moveUnlockedJobsToWait = function(){ if(this.closing){ return this.closing; } else{ - return scripts.moveUnlockedJobsToWait(this).then(function(stalledJobIds){ - if (stalledJobIds.length > 0) { - _this.emit('stalled', stalledJobIds); - } - return null; + return scripts.moveUnlockedJobsToWait(this).then(function(responses){ + var handleFailedJobs = responses[0].map(function(jobId){ + return _this.getJobFromId(jobId).then(function(job){ + _this.distEmit('failed', job.toJSON(), new Error('job stalled more than allowable limit')); + return null; + }); + }); + var handleStalledJobs = responses[1].map(function(jobId){ + return _this.getJobFromId(jobId).then(function(job){ + _this.distEmit('stalled', job.toJSON()); + return null; + }); + }); + return Promise.all(handleFailedJobs.concat(handleStalledJobs)); }).catch(function(err){ console.error(err); }); diff --git a/lib/scripts.js b/lib/scripts.js index 651ea19c0..d32827f3f 100644 --- a/lib/scripts.js +++ b/lib/scripts.js @@ -411,6 +411,7 @@ var scripts = { 'local MAX_STALLED_JOB_COUNT = tonumber(ARGV[1])', 'local activeJobs = redis.call("LRANGE", KEYS[1], 0, -1)', 'local stalled = {}', + 'local failed = {}', 'for _, job in ipairs(activeJobs) do', ' local jobKey = ARGV[2] .. job', ' if(redis.call("EXISTS", jobKey .. ":lock") == 0) then', @@ -418,16 +419,17 @@ var scripts = { ' redis.call("LREM", KEYS[1], 1, job)', ' local lockAcquired = redis.call("HGET", jobKey, "lockAcquired")', ' if(lockAcquired) then', - // If it was previously locked then we consider it 'stalled' (Case A above). - ' table.insert(stalled, job)', - // If this job has been stalled too many times, such as if it crashes the worker, then fail it. + // If it was previously locked then we consider it 'stalled' (Case A above). If this job + // has been stalled too many times, such as if it crashes the worker, then fail it. ' local stalledCount = redis.call("HINCRBY", jobKey, "stalledCounter", 1)', ' if(stalledCount > MAX_STALLED_JOB_COUNT) then', ' redis.call("SADD", KEYS[3], job)', ' redis.call("HSET", jobKey, "failedReason", "job stalled more than allowable limit")', + ' table.insert(failed, job)', ' else', // Move the job back to the wait queue, to immediately be picked up by a waiting worker. ' redis.call("RPUSH", KEYS[2], job)', + ' table.insert(stalled, job)', ' end', ' else', // Move the job back to the wait queue, to immediately be picked up by a waiting worker. @@ -435,7 +437,7 @@ var scripts = { ' end', ' end', 'end', - 'return stalled' + 'return {failed, stalled}' ].join('\n'); var args = [ From 6c67253d3dcc076f3ebf7c81a7498ee9f4b2033a Mon Sep 17 00:00:00 2001 From: Brad Vogel Date: Fri, 21 Oct 2016 18:20:30 -0700 Subject: [PATCH 09/16] Add notes to readme about automatic job recovery. --- README.md | 2 ++ 1 file changed, 2 insertions(+) diff --git a/README.md b/README.md index 1a02f7433..cbbe0b75e 100644 --- a/README.md +++ b/README.md @@ -28,6 +28,7 @@ Features: - Priority. - Concurrency. - Pause/resume (globally or locally). +- Automatic recovery from process crashes. UIs: ---- @@ -241,6 +242,7 @@ Important Notes The queue aims for "at most once" working strategy. When a worker is processing a job, it will keep the job locked until the work is done. However, it is important that the worker does not lock the event loop too long, otherwise other workers could pick the job believing that the worker processing it has been stalled. +If the process that is handling the job fails the reacquire the lock (because it hung or crashed), the job will be automatically restarted by any worker. Useful patterns --------------- From 3a6931923d0eb6269d2d961969ca2fccd3bedf55 Mon Sep 17 00:00:00 2001 From: Brad Vogel Date: Fri, 21 Oct 2016 18:20:43 -0700 Subject: [PATCH 10/16] Clearer formatting. --- README.md | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/README.md b/README.md index cbbe0b75e..f9293175d 100644 --- a/README.md +++ b/README.md @@ -35,9 +35,9 @@ UIs: There are a few third party UIs that can be used for easier administration of the queues (not in any particular order): -[matador](https://github.com/ShaneK/Matador) -[react-bull](https://github.com/kfatehi/react-bull) -[toureiro](https://github.com/Epharmix/Toureiro) +* [matador](https://github.com/ShaneK/Matador) +* [react-bull](https://github.com/kfatehi/react-bull) +* [toureiro](https://github.com/Epharmix/Toureiro) We also have an official UI which is at the moment bare bones project: [bull-ui](https://github.com/OptimalBits/bull-ui) From 6c9250de79e3e097c734ea2d7e0a30368779c431 Mon Sep 17 00:00:00 2001 From: Brad Vogel Date: Fri, 21 Oct 2016 18:21:12 -0700 Subject: [PATCH 11/16] Fix comment. --- lib/scripts.js | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/scripts.js b/lib/scripts.js index d32827f3f..60021bd20 100644 --- a/lib/scripts.js +++ b/lib/scripts.js @@ -397,7 +397,7 @@ var scripts = { * Case A) The job was being worked on, but the worker process died and it failed to renew the lock. * We call these jobs 'stalled'. This is the most common case. We resolve these by moving them * back to wait to be re-processed. To prevent jobs from cycling endlessly between active and wait, - * (e.g. if the job handler keeps crashing), we limit the number stalled job recoveries to 100. + * (e.g. if the job handler keeps crashing), we limit the number stalled job recoveries to MAX_STALLED_JOB_COUNT. * Case B) The job was just moved to 'active' from 'wait' and the worker that moved it hasn't gotten * a lock yet, or died immediately before getting the lock (note that due to Redis limitations, the From 88db693b154e49788a3b4eb71b9689820fc8ea24 Mon Sep 17 00:00:00 2001 From: Brad Vogel Date: Fri, 21 Oct 2016 18:28:18 -0700 Subject: [PATCH 12/16] Add error text per PR review. --- lib/queue.js | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/queue.js b/lib/queue.js index ca9f656a3..366a86de9 100644 --- a/lib/queue.js +++ b/lib/queue.js @@ -561,7 +561,7 @@ Queue.prototype.moveUnlockedJobsToWait = function(){ }); return Promise.all(handleFailedJobs.concat(handleStalledJobs)); }).catch(function(err){ - console.error(err); + console.error('Failed to handle unlocked job in active:', err); }); } }; From 8ad21bd3dffbd774cca85d453a2b0c22cc664826 Mon Sep 17 00:00:00 2001 From: Brad Vogel Date: Mon, 24 Oct 2016 11:53:50 -0700 Subject: [PATCH 13/16] Fix indentation. --- lib/queue.js | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/lib/queue.js b/lib/queue.js index 366a86de9..1ddb466b8 100644 --- a/lib/queue.js +++ b/lib/queue.js @@ -549,14 +549,14 @@ Queue.prototype.moveUnlockedJobsToWait = function(){ return scripts.moveUnlockedJobsToWait(this).then(function(responses){ var handleFailedJobs = responses[0].map(function(jobId){ return _this.getJobFromId(jobId).then(function(job){ - _this.distEmit('failed', job.toJSON(), new Error('job stalled more than allowable limit')); - return null; + _this.distEmit('failed', job.toJSON(), new Error('job stalled more than allowable limit')); + return null; }); }); var handleStalledJobs = responses[1].map(function(jobId){ return _this.getJobFromId(jobId).then(function(job){ - _this.distEmit('stalled', job.toJSON()); - return null; + _this.distEmit('stalled', job.toJSON()); + return null; }); }); return Promise.all(handleFailedJobs.concat(handleStalledJobs)); From b0253835ca783a270eff3e898dc2b93ff574640a Mon Sep 17 00:00:00 2001 From: Brad Vogel Date: Mon, 24 Oct 2016 11:55:03 -0700 Subject: [PATCH 14/16] Use new processStalledJobs replacement. --- lib/priority-queue.js | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/priority-queue.js b/lib/priority-queue.js index c4790a69d..4fc594bad 100644 --- a/lib/priority-queue.js +++ b/lib/priority-queue.js @@ -120,7 +120,7 @@ PriorityQueue.prototype.run = function() { var i = 0; var fn = function() { - return queue.processStalledJobs().then(queue.getNextJob.bind(queue, { + return queue.moveUnlockedJobsToWait().then(queue.getNextJob.bind(queue, { block: false })) .then(function(job) { From 70996b6d1f645d6a5dd2b64935361e461968b3e4 Mon Sep 17 00:00:00 2001 From: Brad Vogel Date: Mon, 24 Oct 2016 11:55:21 -0700 Subject: [PATCH 15/16] Rename script to match function name. --- lib/scripts.js | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/scripts.js b/lib/scripts.js index 60021bd20..36938467f 100644 --- a/lib/scripts.js +++ b/lib/scripts.js @@ -442,7 +442,7 @@ var scripts = { var args = [ queue.client, - 'processStalledJobs', + 'moveUnlockedJobsToWait', script, 3, queue.toKey('active'), From 9fdea1674a30b52025ac39130ab0369532691227 Mon Sep 17 00:00:00 2001 From: Brad Vogel Date: Mon, 24 Oct 2016 12:13:15 -0700 Subject: [PATCH 16/16] Use token passed in to the job. Fixes broken unit test. --- lib/job.js | 2 +- lib/scripts.js | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/lib/job.js b/lib/job.js index 3fee61242..a0d2f0d80 100644 --- a/lib/job.js +++ b/lib/job.js @@ -124,7 +124,7 @@ Job.prototype.lockKey = function(){ same time. */ Job.prototype.takeLock = function(token, renew){ - return scripts.takeLock(this.queue, this, renew).then(function(res){ + return scripts.takeLock(this.queue, this, token, renew).then(function(res){ return res === 1; // Indicates successful lock. }); }; diff --git a/lib/scripts.js b/lib/scripts.js index 36938467f..0fba966aa 100644 --- a/lib/scripts.js +++ b/lib/scripts.js @@ -285,7 +285,7 @@ var scripts = { /** * Takes a lock */ - takeLock: function(queue, job, renew){ + takeLock: function(queue, job, token, renew){ var lockCall; if (renew){ lockCall = 'redis.call("SET", KEYS[1], ARGV[1], "PX", ARGV[2])'; @@ -310,7 +310,7 @@ var scripts = { 2, job.lockKey(), queue.toKey(job.jobId), - queue.token, + token, queue.LOCK_RENEW_TIME ];