Skip to content

Commit

Permalink
Change process stalled jobs strategy to instead look for an unlocked …
Browse files Browse the repository at this point in the history
…job in the active queue on every cycle (looking at the oldest active jobs first).
  • Loading branch information
bradvogel committed Oct 17, 2016
1 parent 4124ecf commit abd20ac
Show file tree
Hide file tree
Showing 2 changed files with 27 additions and 62 deletions.
56 changes: 14 additions & 42 deletions lib/queue.js
Original file line number Diff line number Diff line change
Expand Up @@ -224,7 +224,7 @@ var Queue = function Queue(name, redisPort, redisHost, redisOptions){

// Bind these methods to avoid constant rebinding and/or creating closures
// in processJobs etc.
this.processStalledJobs = this.processStalledJobs.bind(this);
this.processStalledJob = this.processStalledJob.bind(this);
this.getNextJob = this.getNextJob.bind(this);
this.processJobs = this.processJobs.bind(this);
this.processJob = this.processJob.bind(this);
Expand Down Expand Up @@ -475,21 +475,11 @@ Queue.prototype.run = function(concurrency){
var promises = [];
var _this = this;

return this.processStalledJobs().then(function(){

while(concurrency--){
promises.push(new Promise(_this.processJobs));
}

//
// Set process Stalled jobs intervall
//
clearInterval(_this.stalledJobsInterval);
_this.stalledJobsInterval =
setInterval(_this.processStalledJobs, _this.LOCK_RENEW_TIME);
while(concurrency--){
promises.push(new Promise(_this.processJobs));
}

return Promise.all(promises);
});
return Promise.all(promises);
};

// ---------------------------------------------------------------------
Expand Down Expand Up @@ -526,35 +516,16 @@ Queue.prototype.updateDelayTimer = function(newDelayedTimestamp){
}
};

/**
* Process jobs that have been added to the active list but are not being
* processed properly. This can happen due to a process crash in the middle
* of processing a job, leaving it in 'active' but without a job lock.
* Note that there is no way to know for _certain_ if a job is "stalled"
* (since the process that moved it to active might just be slow to get the
* lock on it), so this function takes a 'grace period' parameter to ignore
* jobs that were created in the recent X milliseconds.
*
* @param {Number?} grace Duration in milliseconds. Ignore jobs created since this many milliseconds ago.
* Defaults to LOCK_RENEW_TIME.
*/
Queue.prototype.processStalledJobs = function(grace){
Queue.prototype.processStalledJob = function() {
var _this = this;

grace = grace || this.LOCK_RENEW_TIME;

if(this.closing){
return this.closing;
} else{
return scripts.processStalledJobs(this, grace).then(function(jobs){
if(jobs.length > 0){
_this.emit('stalled', jobs);
}
return null;
}).catch(function(err){
console.error(err);
});
}
return scripts.getStalledJob(this).then(function(job){
if (job) {
return _this.getJobFromId(job).then(function(job){
return _this.processJob(job, true /* Renew the lock */);
});
}
});
};

Queue.prototype.processJobs = function(resolve, reject){
Expand All @@ -564,6 +535,7 @@ Queue.prototype.processJobs = function(resolve, reject){
if(!this.closing){
process.nextTick(function(){
(_this.paused || Promise.resolve())
.then(_this.processStalledJob)
.then(_this.getNextJob)
.then(_this.processJob)
.then(processJobs, function(err){
Expand Down
33 changes: 13 additions & 20 deletions lib/scripts.js
Original file line number Diff line number Diff line change
Expand Up @@ -354,37 +354,30 @@ var scripts = {
return execScript.apply(scripts, args);
},

/**
* Iterates to queue and moves all jobs in the active queue that appear to be
* stalled back to the wait state to be reprocessed.
*/
processStalledJobs: function(queue, grace){
getStalledJob: function(queue){
var script = [
'local activeJobs = redis.call("LRANGE", KEYS[1], 0, -1)',
'local stalled = {}',
'for _, job in ipairs(activeJobs) do',
' local jobKey = ARGV[2] .. job',
' if(redis.call("EXISTS", jobKey .. ":lock") == 0) then',
' local jobTS = redis.call("HGET", jobKey, "timestamp")',
' if(jobTS and jobTS < ARGV[1]) then',
' redis.call("LREM", KEYS[1], 0, job)',
' redis.call("RPUSH", KEYS[2], job)',
' table.insert(stalled, job)',
' end',
// Iterate active jobs in reverse, as stuck jobs are more likely to appear
// at the end of the active list. Jobs in the beginning probably just got
// moved there.
'for i = #activeJobs, 1, -1 do',
' local job = activeJobs[i]',
' local jobKey = ARGV[1] .. job',
' if(redis.call("SET", jobKey .. ":lock", ARGV[2], "PX", ARGV[3], "NX")) then',
' return job',
' end',
'end',
'return stalled'
].join('\n');

var args = [
queue.client,
'processStalledJobs',
script,
2,
1,
queue.toKey('active'),
queue.toKey('wait'),
Date.now() - grace,
queue.toKey('')
queue.toKey(''),
queue.token,
queue.LOCK_RENEW_TIME
];

return execScript.apply(scripts, args);
Expand Down

0 comments on commit abd20ac

Please sign in to comment.