Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Rewrite the handling of process stalled jobs to be atomic so it doesn… #359

Merged
merged 16 commits into from
Oct 26, 2016
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -167,8 +167,8 @@ A queue emits also some useful events:
// Job started
// You can use jobPromise.cancel() to abort this job.
})
.on('stalled', function(job){
// The job was considered stalled (i.e. its lock was not renewed in LOCK_RENEW_TIME).
.on('stalled', function(jobs){
// Array of jobs that were considered 'stalled' and re-enqueued (from 'active' to 'wait').
// Useful for debugging job workers that crash or pause the event loop.
})
.on('progress', function(job, progress){
Expand Down
43 changes: 17 additions & 26 deletions lib/queue.js
Original file line number Diff line number Diff line change
Expand Up @@ -225,7 +225,6 @@ var Queue = function Queue(name, redisPort, redisHost, redisOptions){
// Bind these methods to avoid constant rebinding and/or creating closures
// in processJobs etc.
this.processStalledJobs = this.processStalledJobs.bind(this);
this.processStalledJob = this.processStalledJob.bind(this);
this.getNextJob = this.getNextJob.bind(this);
this.processJobs = this.processJobs.bind(this);
this.processJob = this.processJob.bind(this);
Expand Down Expand Up @@ -528,42 +527,34 @@ Queue.prototype.updateDelayTimer = function(newDelayedTimestamp){
};

/**
Process jobs that have been added to the active list but are not being
processed properly.
* Process jobs that have been added to the active list but are not being
* processed properly. This can happen due to a process crash in the middle
* of processing a job, leaving it in 'active' but without a job lock.
* Note that there is no way to know for _certain_ if a job is "stalled"
* (since the process that moved it to active might just be slow to get the
* lock on it), so this function takes a 'grace period' parameter to ignore
* jobs that were created in the recent X milliseconds.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess some things in this comment regarding the grace period are not relevant anymore.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh yes, fixed!

*
* @param {Number?} grace Duration in milliseconds. Ignore jobs created since this many milliseconds ago.
* Defaults to LOCK_RENEW_TIME.
*/
Queue.prototype.processStalledJobs = function(){
Queue.prototype.processStalledJobs = function(grace){
var _this = this;

grace = grace || this.LOCK_RENEW_TIME;

if(this.closing){
return this.closing;
} else{
return this.client.lrangeAsync(this.toKey('active'), 0, -1).then(function(jobs){
return Promise.each(jobs, function(jobId) {
return Job.fromId(_this, jobId).then(_this.processStalledJob);
});
return scripts.processStalledJobs(this, grace).then(function(jobs){
_this.emit('stalled', jobs);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe we should only emit this event if jobs.length > 0

return null;
}).catch(function(err){
console.error(err);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it could be useful to add a text to this log to explain it was related to the moveUlockedJobsToWait.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed.

});
}
};

Queue.prototype.processStalledJob = function(job){
var _this = this;
if(this.closing){
return this.closing;
}

if(!job){
return Promise.resolve();
}else{
return scripts.getStalledJob(this, job, _this.token).then(function(isStalled){
if(isStalled){
_this.distEmit('stalled', job.toJSON());
return _this.processJob(job, true);
}
});
}
};

Queue.prototype.processJobs = function(resolve, reject){
var _this = this;
var processJobs = this.processJobs.bind(this, resolve, reject);
Expand Down
34 changes: 22 additions & 12 deletions lib/scripts.js
Original file line number Diff line number Diff line change
Expand Up @@ -355,26 +355,36 @@ var scripts = {
},

/**
* Gets a stalled job by locking it and checking it is not already completed.
* Returns a "OK" if the job was locked and not in completed set.
* Iterates to queue and moves all jobs in the active queue that appear to be
* stalled back to the wait state to be reprocessed.
*/
getStalledJob: function(queue, job, token){
processStalledJobs: function(queue, grace){
var script = [
'if redis.call("sismember", KEYS[1], ARGV[1]) == 0 then',
' return redis.call("set", KEYS[2], ARGV[2], "PX", ARGV[3], "NX")',
'local activeJobs = redis.call("LRANGE", KEYS[1], 0, -1)',
'local stalled = {}',
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just a thought, maybe it would be interesting to have a configurable LRANGE here. For example, with very large queues, the active list could be too big for being traversed too often. I have to double check, but if the oldest jobs are at the end of the queue, limiting to a max number of elements per call may work well. I am thinking to expose also the other constants that we have, for better finetuning. We do not need to change more in this PR.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, good idea.

'for _, job in ipairs(activeJobs) do',
' local jobKey = ARGV[2] .. job',
' if(redis.call("EXISTS", jobKey .. ":lock") == 0) then',
' local jobTS = redis.call("HGET", jobKey, "timestamp")',
' if(jobTS and jobTS < ARGV[1]) then',
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am not sure about this. Isn't timestamp the time when the job was inserted inte the queue? Wouldn't the timestamp that we are interested in be the one that represents the moment in time where the job was moved to active? (I see a chicken and egg problem here already unless I am missing something...)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, you're right. This was only a hack to cover the majority case: where jobs were just created and about to be processed by the worker that moved.

' redis.call("LREM", KEYS[1], 0, job)',
' redis.call("RPUSH", KEYS[2], job)',
' table.insert(stalled, job)',
' end',
' end',
'end',
'return 0'].join('\n');
'return stalled'
].join('\n');

var args = [
queue.client,
'getStalledJob',
'processStalledJobs',
script,
2,
queue.toKey('completed'),
job.lockKey(),
job.jobId,
token,
queue.LOCK_RENEW_TIME
queue.toKey('active'),
queue.toKey('wait'),
Date.now() - grace,
queue.toKey('')
];

return execScript.apply(scripts, args);
Expand Down