Skip to content

Commit

Permalink
Rewrite the handling of process stalled jobs to be atomic so it doesn…
Browse files Browse the repository at this point in the history
…'t double-process jobs. See #356 for more on how this can happen.

Additionally, this addresses long-standing issue where a job can be considered "stalled" even though it was just moved to active and before a lock could be obtained by the worker that moved it (#258), by waiting a grace period (with a default of LOCK_RENEW_TIME) before considering a job as possibly stalled. This gives the (real) worker time to acquire its lock after moving it to active.

Note that this includes a small API change: the 'stalled' event is now passed an array of events.
  • Loading branch information
bradvogel committed Oct 15, 2016
1 parent 7b7d04c commit 00e213a
Show file tree
Hide file tree
Showing 3 changed files with 41 additions and 40 deletions.
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -167,8 +167,8 @@ A queue emits also some useful events:
// Job started
// You can use jobPromise.cancel() to abort this job.
})
.on('stalled', function(job){
// The job was considered stalled (i.e. its lock was not renewed in LOCK_RENEW_TIME).
.on('stalled', function(jobs){
// Array of jobs that were considered 'stalled' and re-enqueued (from 'active' to 'wait').
// Useful for debugging job workers that crash or pause the event loop.
})
.on('progress', function(job, progress){
Expand Down
43 changes: 17 additions & 26 deletions lib/queue.js
Original file line number Diff line number Diff line change
Expand Up @@ -225,7 +225,6 @@ var Queue = function Queue(name, redisPort, redisHost, redisOptions){
// Bind these methods to avoid constant rebinding and/or creating closures
// in processJobs etc.
this.processStalledJobs = this.processStalledJobs.bind(this);
this.processStalledJob = this.processStalledJob.bind(this);
this.getNextJob = this.getNextJob.bind(this);
this.processJobs = this.processJobs.bind(this);
this.processJob = this.processJob.bind(this);
Expand Down Expand Up @@ -528,42 +527,34 @@ Queue.prototype.updateDelayTimer = function(newDelayedTimestamp){
};

/**
Process jobs that have been added to the active list but are not being
processed properly.
* Process jobs that have been added to the active list but are not being
* processed properly. This can happen due to a process crash in the middle
* of processing a job, leaving it in 'active' but without a job lock.
* Note that there is no way to know for _certain_ if a job is "stalled"
* (since the process that moved it to active might just be slow to get the
* lock on it), so this function takes a 'grace period' parameter to ignore
* jobs that were created in the recent X milliseconds.
*
* @param {Number?} grace Duration in milliseconds. Ignore jobs created since this many milliseconds ago.
* Defaults to LOCK_RENEW_TIME.
*/
Queue.prototype.processStalledJobs = function(){
Queue.prototype.processStalledJobs = function(grace){
var _this = this;

grace = grace || this.LOCK_RENEW_TIME;

if(this.closing){
return this.closing;
} else{
return this.client.lrangeAsync(this.toKey('active'), 0, -1).then(function(jobs){
return Promise.each(jobs, function(jobId) {
return Job.fromId(_this, jobId).then(_this.processStalledJob);
});
return scripts.processStalledJobs(this, grace).then(function(jobs){
_this.emit('stalled', jobs);
return null;
}).catch(function(err){
console.error(err);
});
}
};

Queue.prototype.processStalledJob = function(job){
var _this = this;
if(this.closing){
return this.closing;
}

if(!job){
return Promise.resolve();
}else{
return scripts.getStalledJob(this, job, _this.token).then(function(isStalled){
if(isStalled){
_this.distEmit('stalled', job.toJSON());
return _this.processJob(job, true);
}
});
}
};

Queue.prototype.processJobs = function(resolve, reject){
var _this = this;
var processJobs = this.processJobs.bind(this, resolve, reject);
Expand Down
34 changes: 22 additions & 12 deletions lib/scripts.js
Original file line number Diff line number Diff line change
Expand Up @@ -355,26 +355,36 @@ var scripts = {
},

/**
* Gets a stalled job by locking it and checking it is not already completed.
* Returns a "OK" if the job was locked and not in completed set.
* Iterates to queue and moves all jobs in the active queue that appear to be
* stalled back to the wait state to be reprocessed.
*/
getStalledJob: function(queue, job, token){
processStalledJobs: function(queue, grace){
var script = [
'if redis.call("sismember", KEYS[1], ARGV[1]) == 0 then',
' return redis.call("set", KEYS[2], ARGV[2], "PX", ARGV[3], "NX")',
'local activeJobs = redis.call("LRANGE", KEYS[1], 0, -1)',
'local stalled = {}',
'for _, job in ipairs(activeJobs) do',
' local jobKey = ARGV[2] .. job',
' if(redis.call("EXISTS", jobKey .. ":lock") == 0) then',
' local jobTS = redis.call("HGET", jobKey, "timestamp")',
' if(jobTS and jobTS < ARGV[1]) then',
' redis.call("LREM", KEYS[1], 0, job)',
' redis.call("RPUSH", KEYS[2], job)',
' table.insert(stalled, job)',
' end',
' end',
'end',
'return 0'].join('\n');
'return stalled'
].join('\n');

var args = [
queue.client,
'getStalledJob',
'processStalledJobs',
script,
2,
queue.toKey('completed'),
job.lockKey(),
job.jobId,
token,
queue.LOCK_RENEW_TIME
queue.toKey('active'),
queue.toKey('wait'),
Date.now() - grace,
queue.toKey('')
];

return execScript.apply(scripts, args);
Expand Down

0 comments on commit 00e213a

Please sign in to comment.