diff --git a/README.md b/README.md index 4907a2c49..f9293175d 100644 --- a/README.md +++ b/README.md @@ -28,15 +28,16 @@ Features: - Priority. - Concurrency. - Pause/resume (globally or locally). +- Automatic recovery from process crashes. UIs: ---- There are a few third party UIs that can be used for easier administration of the queues (not in any particular order): -[matador](https://github.com/ShaneK/Matador) -[react-bull](https://github.com/kfatehi/react-bull) -[toureiro](https://github.com/Epharmix/Toureiro) +* [matador](https://github.com/ShaneK/Matador) +* [react-bull](https://github.com/kfatehi/react-bull) +* [toureiro](https://github.com/Epharmix/Toureiro) We also have an official UI which is at the moment bare bones project: [bull-ui](https://github.com/OptimalBits/bull-ui) @@ -168,8 +169,7 @@ A queue emits also some useful events: // You can use jobPromise.cancel() to abort this job. }) .on('stalled', function(job){ - // The job was considered stalled (i.e. its lock was not renewed in LOCK_RENEW_TIME). - // Useful for debugging job workers that crash or pause the event loop. + // Job that was considered stalled. Useful for debugging job workers that crash or pause the event loop. }) .on('progress', function(job, progress){ // Job progress updated! @@ -242,6 +242,7 @@ Important Notes The queue aims for "at most once" working strategy. When a worker is processing a job, it will keep the job locked until the work is done. However, it is important that the worker does not lock the event loop too long, otherwise other workers could pick the job believing that the worker processing it has been stalled. +If the process that is handling the job fails the reacquire the lock (because it hung or crashed), the job will be automatically restarted by any worker. Useful patterns --------------- diff --git a/lib/job.js b/lib/job.js index 1a2f4cfcf..a0d2f0d80 100644 --- a/lib/job.js +++ b/lib/job.js @@ -110,10 +110,10 @@ Job.prototype.toJSON = function(){ stacktrace: this.stacktrace || null, returnvalue: this.returnvalue || null }; -} +}; /** - Return a unique key representin a lock for this Job + Return a unique key representing a lock for this Job */ Job.prototype.lockKey = function(){ return this.queue.toKey(this.jobId) + ':lock'; @@ -124,13 +124,8 @@ Job.prototype.lockKey = function(){ same time. */ Job.prototype.takeLock = function(token, renew){ - var args = [this.lockKey(), token, 'PX', this.queue.LOCK_RENEW_TIME]; - if(!renew){ - args.push('NX'); - } - - return this.queue.client.setAsync.apply(this.queue.client, args).then(function(result){ - return result === 'OK'; + return scripts.takeLock(this.queue, this, token, renew).then(function(res){ + return res === 1; // Indicates successful lock. }); }; @@ -138,7 +133,7 @@ Job.prototype.takeLock = function(token, renew){ Renews a lock so that it gets some more time before expiring. */ Job.prototype.renewLock = function(token){ - return this.takeLock(token, true); + return this.takeLock(token, true /* Renew */); }; /** diff --git a/lib/priority-queue.js b/lib/priority-queue.js index c4790a69d..4fc594bad 100644 --- a/lib/priority-queue.js +++ b/lib/priority-queue.js @@ -120,7 +120,7 @@ PriorityQueue.prototype.run = function() { var i = 0; var fn = function() { - return queue.processStalledJobs().then(queue.getNextJob.bind(queue, { + return queue.moveUnlockedJobsToWait().then(queue.getNextJob.bind(queue, { block: false })) .then(function(job) { diff --git a/lib/queue.js b/lib/queue.js index 5811e9ab2..1ddb466b8 100644 --- a/lib/queue.js +++ b/lib/queue.js @@ -48,6 +48,14 @@ Promise.promisifyAll(redis.Multi.prototype); */ var MINIMUM_REDIS_VERSION = '2.8.11'; var LOCK_RENEW_TIME = 5000; // 5 seconds is the renew time. + +// The interval for which to check for stalled jobs. +var STALLED_JOB_CHECK_INTERVAL = 5000; // 5 seconds is the renew time. + +// The maximum number of times a job can be recovered from the 'stalled' state +// (moved back to 'wait'), before it is failed. +var MAX_STALLED_JOB_COUNT = 1; + var CLIENT_CLOSE_TIMEOUT_MS = 5000; var POLLING_INTERVAL = 5000; @@ -128,6 +136,8 @@ var Queue = function Queue(name, redisPort, redisHost, redisOptions){ this.token = uuid(); this.LOCK_RENEW_TIME = LOCK_RENEW_TIME; + this.STALLED_JOB_CHECK_INTERVAL = STALLED_JOB_CHECK_INTERVAL; + this.MAX_STALLED_JOB_COUNT = MAX_STALLED_JOB_COUNT; // bubble up Redis error events [this.client, this.bclient, this.eclient].forEach(function (client) { @@ -224,8 +234,7 @@ var Queue = function Queue(name, redisPort, redisHost, redisOptions){ // Bind these methods to avoid constant rebinding and/or creating closures // in processJobs etc. - this.processStalledJobs = this.processStalledJobs.bind(this); - this.processStalledJob = this.processStalledJob.bind(this); + this.moveUnlockedJobsToWait = this.moveUnlockedJobsToWait.bind(this); this.getNextJob = this.getNextJob.bind(this); this.processJobs = this.processJobs.bind(this); this.processJob = this.processJob.bind(this); @@ -277,7 +286,7 @@ Queue.prototype.close = function( doNotWaitJobs ){ return this.closing = this._initializing.then(function(){ clearTimeout(_this.delayTimer); clearInterval(_this.guardianTimer); - clearInterval(_this.stalledJobsInterval); + clearInterval(_this.moveUnlockedJobsToWaitInterval); _this.timers.clearAll(); return _this.timers.whenIdle().then(function(){ @@ -476,18 +485,17 @@ Queue.prototype.run = function(concurrency){ var promises = []; var _this = this; - return this.processStalledJobs().then(function(){ + return this.moveUnlockedJobsToWait().then(function(){ while(concurrency--){ promises.push(new Promise(_this.processJobs)); } - // - // Set process Stalled jobs intervall - // - clearInterval(_this.stalledJobsInterval); - _this.stalledJobsInterval = - setInterval(_this.processStalledJobs, _this.LOCK_RENEW_TIME); + if (_this.STALLED_JOB_CHECK_INTERVAL > 0){ + clearInterval(_this.moveUnlockedJobsToWaitInterval); + _this.moveUnlockedJobsToWaitInterval = + setInterval(_this.moveUnlockedJobsToWait, _this.STALLED_JOB_CHECK_INTERVAL); + } return Promise.all(promises); }); @@ -528,38 +536,32 @@ Queue.prototype.updateDelayTimer = function(newDelayedTimestamp){ }; /** - Process jobs that have been added to the active list but are not being - processed properly. + * Process jobs that have been added to the active list but are not being + * processed properly. This can happen due to a process crash in the middle + * of processing a job, leaving it in 'active' but without a job lock. */ -Queue.prototype.processStalledJobs = function(){ +Queue.prototype.moveUnlockedJobsToWait = function(){ var _this = this; + if(this.closing){ return this.closing; } else{ - return this.client.lrangeAsync(this.toKey('active'), 0, -1).then(function(jobs){ - return Promise.each(jobs, function(jobId) { - return Job.fromId(_this, jobId).then(_this.processStalledJob); + return scripts.moveUnlockedJobsToWait(this).then(function(responses){ + var handleFailedJobs = responses[0].map(function(jobId){ + return _this.getJobFromId(jobId).then(function(job){ + _this.distEmit('failed', job.toJSON(), new Error('job stalled more than allowable limit')); + return null; + }); }); + var handleStalledJobs = responses[1].map(function(jobId){ + return _this.getJobFromId(jobId).then(function(job){ + _this.distEmit('stalled', job.toJSON()); + return null; + }); + }); + return Promise.all(handleFailedJobs.concat(handleStalledJobs)); }).catch(function(err){ - console.error(err); - }); - } -}; - -Queue.prototype.processStalledJob = function(job){ - var _this = this; - if(this.closing){ - return this.closing; - } - - if(!job){ - return Promise.resolve(); - }else{ - return scripts.getStalledJob(this, job, _this.token).then(function(isStalled){ - if(isStalled){ - _this.distEmit('stalled', job.toJSON()); - return _this.processJob(job, true); - } + console.error('Failed to handle unlocked job in active:', err); }); } }; diff --git a/lib/scripts.js b/lib/scripts.js index f23de70bc..0fba966aa 100644 --- a/lib/scripts.js +++ b/lib/scripts.js @@ -281,13 +281,49 @@ var scripts = { return execScript.apply(scripts, args); }, + + /** + * Takes a lock + */ + takeLock: function(queue, job, token, renew){ + var lockCall; + if (renew){ + lockCall = 'redis.call("SET", KEYS[1], ARGV[1], "PX", ARGV[2])'; + } else { + lockCall = 'redis.call("SET", KEYS[1], ARGV[1], "PX", ARGV[2], "NX")'; + } + + var script = [ + 'if(' + lockCall + ') then', + // Mark the job as having been locked at least once. Used to determine if the job was stalled. + ' redis.call("HSET", KEYS[2], "lockAcquired", "1")', + ' return 1', + 'else', + ' return 0', + 'end' + ].join('\n'); + + var args = [ + queue.client, + 'takeLock' + (renew ? 'Renew' : ''), + script, + 2, + job.lockKey(), + queue.toKey(job.jobId), + token, + queue.LOCK_RENEW_TIME + ]; + + return execScript.apply(scripts, args); + }, + releaseLock: function(job, token){ var script = [ 'if redis.call("get", KEYS[1]) == ARGV[1]', 'then', - 'return redis.call("del", KEYS[1])', + ' return redis.call("del", KEYS[1])', 'else', - 'return 0', + ' return 0', 'end'].join('\n'); var args = [ @@ -355,26 +391,65 @@ var scripts = { }, /** - * Gets a stalled job by locking it and checking it is not already completed. - * Returns a "OK" if the job was locked and not in completed set. + * Looks for unlocked jobs in the active queue. There are two circumstances in which a job + * would be in 'active' but NOT have a job lock: + * + * Case A) The job was being worked on, but the worker process died and it failed to renew the lock. + * We call these jobs 'stalled'. This is the most common case. We resolve these by moving them + * back to wait to be re-processed. To prevent jobs from cycling endlessly between active and wait, + * (e.g. if the job handler keeps crashing), we limit the number stalled job recoveries to MAX_STALLED_JOB_COUNT. + + * Case B) The job was just moved to 'active' from 'wait' and the worker that moved it hasn't gotten + * a lock yet, or died immediately before getting the lock (note that due to Redis limitations, the + * worker can't move the job and get the lock atomically - https://github.com/OptimalBits/bull/issues/258). + * For this case we also move the job back to 'wait' for reprocessing, but don't consider it 'stalled' + * since the job had never been started. This case is much rarer than Case A due to the very small + * timing window in which it must occur. */ - getStalledJob: function(queue, job, token){ + moveUnlockedJobsToWait: function(queue){ var script = [ - 'if redis.call("sismember", KEYS[1], ARGV[1]) == 0 then', - ' return redis.call("set", KEYS[2], ARGV[2], "PX", ARGV[3], "NX")', + 'local MAX_STALLED_JOB_COUNT = tonumber(ARGV[1])', + 'local activeJobs = redis.call("LRANGE", KEYS[1], 0, -1)', + 'local stalled = {}', + 'local failed = {}', + 'for _, job in ipairs(activeJobs) do', + ' local jobKey = ARGV[2] .. job', + ' if(redis.call("EXISTS", jobKey .. ":lock") == 0) then', + // Remove from the active queue. + ' redis.call("LREM", KEYS[1], 1, job)', + ' local lockAcquired = redis.call("HGET", jobKey, "lockAcquired")', + ' if(lockAcquired) then', + // If it was previously locked then we consider it 'stalled' (Case A above). If this job + // has been stalled too many times, such as if it crashes the worker, then fail it. + ' local stalledCount = redis.call("HINCRBY", jobKey, "stalledCounter", 1)', + ' if(stalledCount > MAX_STALLED_JOB_COUNT) then', + ' redis.call("SADD", KEYS[3], job)', + ' redis.call("HSET", jobKey, "failedReason", "job stalled more than allowable limit")', + ' table.insert(failed, job)', + ' else', + // Move the job back to the wait queue, to immediately be picked up by a waiting worker. + ' redis.call("RPUSH", KEYS[2], job)', + ' table.insert(stalled, job)', + ' end', + ' else', + // Move the job back to the wait queue, to immediately be picked up by a waiting worker. + ' redis.call("RPUSH", KEYS[2], job)', + ' end', + ' end', 'end', - 'return 0'].join('\n'); + 'return {failed, stalled}' + ].join('\n'); var args = [ queue.client, - 'getStalledJob', + 'moveUnlockedJobsToWait', script, - 2, - queue.toKey('completed'), - job.lockKey(), - job.jobId, - token, - queue.LOCK_RENEW_TIME + 3, + queue.toKey('active'), + queue.toKey('wait'), + queue.toKey('failed'), + queue.MAX_STALLED_JOB_COUNT, + queue.toKey('') ]; return execScript.apply(scripts, args);