diff --git a/README.md b/README.md index 4907a2c49..1ef119814 100644 --- a/README.md +++ b/README.md @@ -167,8 +167,8 @@ A queue emits also some useful events: // Job started // You can use jobPromise.cancel() to abort this job. }) -.on('stalled', function(job){ - // The job was considered stalled (i.e. its lock was not renewed in LOCK_RENEW_TIME). +.on('stalled', function(jobs){ + // Array of jobs that were considered 'stalled' and re-enqueued (from 'active' to 'wait'). // Useful for debugging job workers that crash or pause the event loop. }) .on('progress', function(job, progress){ diff --git a/lib/queue.js b/lib/queue.js index 5811e9ab2..e04d6a280 100644 --- a/lib/queue.js +++ b/lib/queue.js @@ -225,7 +225,6 @@ var Queue = function Queue(name, redisPort, redisHost, redisOptions){ // Bind these methods to avoid constant rebinding and/or creating closures // in processJobs etc. this.processStalledJobs = this.processStalledJobs.bind(this); - this.processStalledJob = this.processStalledJob.bind(this); this.getNextJob = this.getNextJob.bind(this); this.processJobs = this.processJobs.bind(this); this.processJob = this.processJob.bind(this); @@ -528,42 +527,34 @@ Queue.prototype.updateDelayTimer = function(newDelayedTimestamp){ }; /** - Process jobs that have been added to the active list but are not being - processed properly. + * Process jobs that have been added to the active list but are not being + * processed properly. This can happen due to a process crash in the middle + * of processing a job, leaving it in 'active' but without a job lock. + * Note that there is no way to know for _certain_ if a job is "stalled" + * (since the process that moved it to active might just be slow to get the + * lock on it), so this function takes a 'grace period' parameter to ignore + * jobs that were created in the recent X milliseconds. + * + * @param {Number?} grace Duration in milliseconds. Ignore jobs created since this many milliseconds ago. + * Defaults to LOCK_RENEW_TIME. */ -Queue.prototype.processStalledJobs = function(){ +Queue.prototype.processStalledJobs = function(grace){ var _this = this; + + grace = grace || this.LOCK_RENEW_TIME; + if(this.closing){ return this.closing; } else{ - return this.client.lrangeAsync(this.toKey('active'), 0, -1).then(function(jobs){ - return Promise.each(jobs, function(jobId) { - return Job.fromId(_this, jobId).then(_this.processStalledJob); - }); + return scripts.processStalledJobs(this, grace).then(function(jobs){ + _this.emit('stalled', jobs); + return null; }).catch(function(err){ console.error(err); }); } }; -Queue.prototype.processStalledJob = function(job){ - var _this = this; - if(this.closing){ - return this.closing; - } - - if(!job){ - return Promise.resolve(); - }else{ - return scripts.getStalledJob(this, job, _this.token).then(function(isStalled){ - if(isStalled){ - _this.distEmit('stalled', job.toJSON()); - return _this.processJob(job, true); - } - }); - } -}; - Queue.prototype.processJobs = function(resolve, reject){ var _this = this; var processJobs = this.processJobs.bind(this, resolve, reject); diff --git a/lib/scripts.js b/lib/scripts.js index f23de70bc..8e6dee7d9 100644 --- a/lib/scripts.js +++ b/lib/scripts.js @@ -355,26 +355,36 @@ var scripts = { }, /** - * Gets a stalled job by locking it and checking it is not already completed. - * Returns a "OK" if the job was locked and not in completed set. + * Iterates to queue and moves all jobs in the active queue that appear to be + * stalled back to the wait state to be reprocessed. */ - getStalledJob: function(queue, job, token){ + processStalledJobs: function(queue, grace){ var script = [ - 'if redis.call("sismember", KEYS[1], ARGV[1]) == 0 then', - ' return redis.call("set", KEYS[2], ARGV[2], "PX", ARGV[3], "NX")', + 'local activeJobs = redis.call("LRANGE", KEYS[1], 0, -1)', + 'local stalled = {}', + 'for _, job in ipairs(activeJobs) do', + ' local jobKey = ARGV[2] .. job', + ' if(redis.call("EXISTS", jobKey .. ":lock") == 0) then', + ' local jobTS = redis.call("HGET", jobKey, "timestamp")', + ' if(jobTS and jobTS < ARGV[1]) then', + ' redis.call("LREM", KEYS[1], 0, job)', + ' redis.call("RPUSH", KEYS[2], job)', + ' table.insert(stalled, job)', + ' end', + ' end', 'end', - 'return 0'].join('\n'); + 'return stalled' + ].join('\n'); var args = [ queue.client, - 'getStalledJob', + 'processStalledJobs', script, 2, - queue.toKey('completed'), - job.lockKey(), - job.jobId, - token, - queue.LOCK_RENEW_TIME + queue.toKey('active'), + queue.toKey('wait'), + Date.now() - grace, + queue.toKey('') ]; return execScript.apply(scripts, args);