Rewrite the handling of process stalled jobs to be atomic so it doesn… #359
@@ -48,6 +48,14 @@ Promise.promisifyAll(redis.Multi.prototype);
  */
 var MINIMUM_REDIS_VERSION = '2.8.11';
 var LOCK_RENEW_TIME = 5000; // 5 seconds is the renew time.

+// The interval at which to check for stalled jobs.
+var STALLED_JOB_CHECK_INTERVAL = 5000; // 5 seconds is the check interval.
+
+// The maximum number of times a job can be recovered from the 'stalled' state
+// (moved back to 'wait'), before it is failed.
+var MAX_STALLED_JOB_COUNT = 1;
+
 var CLIENT_CLOSE_TIMEOUT_MS = 5000;
 var POLLING_INTERVAL = 5000;
@@ -128,6 +136,8 @@ var Queue = function Queue(name, redisPort, redisHost, redisOptions){

   this.token = uuid();
   this.LOCK_RENEW_TIME = LOCK_RENEW_TIME;
+  this.STALLED_JOB_CHECK_INTERVAL = STALLED_JOB_CHECK_INTERVAL;
+  this.MAX_STALLED_JOB_COUNT = MAX_STALLED_JOB_COUNT;

   // bubble up Redis error events
   [this.client, this.bclient, this.eclient].forEach(function (client) {
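Since these values are now hung off the Queue instance, they can be tuned per queue before processing starts. A minimal sketch, assuming the constructor signature shown above; the queue name, host, and chosen values are illustrative, not part of this PR:

```js
// Illustrative only: tuning stalled-job handling on a single queue instance.
var Queue = require('bull');

var videoQueue = new Queue('video transcoding', 6379, '127.0.0.1');

// Sweep for unlocked active jobs every 30 seconds instead of the default 5,
// and allow a job to be recovered from 'stalled' up to 3 times before it is failed.
videoQueue.STALLED_JOB_CHECK_INTERVAL = 30 * 1000;
videoQueue.MAX_STALLED_JOB_COUNT = 3;
```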
@@ -224,7 +234,7 @@ var Queue = function Queue(name, redisPort, redisHost, redisOptions){

   // Bind these methods to avoid constant rebinding and/or creating closures
   // in processJobs etc.
-  this.processStalledJob = this.processStalledJob.bind(this);
+  this.moveUnlockedJobsToWait = this.moveUnlockedJobsToWait.bind(this);
   this.getNextJob = this.getNextJob.bind(this);
   this.processJobs = this.processJobs.bind(this);
   this.processJob = this.processJob.bind(this);
@@ -276,7 +286,7 @@ Queue.prototype.close = function( doNotWaitJobs ){ | |
return this.closing = this._initializing.then(function(){ | ||
clearTimeout(_this.delayTimer); | ||
clearInterval(_this.guardianTimer); | ||
clearInterval(_this.stalledJobsInterval); | ||
clearInterval(_this.moveUnlockedJobsToWaitInterval); | ||
_this.timers.clearAll(); | ||
|
||
return _this.timers.whenIdle().then(function(){ | ||
|
@@ -475,11 +485,20 @@ Queue.prototype.run = function(concurrency){
   var promises = [];
   var _this = this;

-  while(concurrency--){
-    promises.push(new Promise(_this.processJobs));
-  }
-
-  return Promise.all(promises);
+  return this.moveUnlockedJobsToWait().then(function(){
+
+    while(concurrency--){
+      promises.push(new Promise(_this.processJobs));
+    }
+
+    if (_this.STALLED_JOB_CHECK_INTERVAL > 0){
+      clearInterval(_this.moveUnlockedJobsToWaitInterval);
+      _this.moveUnlockedJobsToWaitInterval =
+        setInterval(_this.moveUnlockedJobsToWait, _this.STALLED_JOB_CHECK_INTERVAL);
+    }
+
+    return Promise.all(promises);
+  });
 };

 // ---------------------------------------------------------------------
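Because of the `STALLED_JOB_CHECK_INTERVAL > 0` guard above, the periodic sweep can be switched off entirely while the one-off `moveUnlockedJobsToWait()` at startup still runs. A hedged sketch of what that could look like on a worker; the queue name and handler body are illustrative, and `run()` is the internal entry point reached once a processor is registered with `process()`:

```js
// Sketch only: a worker that opts out of the periodic stalled-job sweep.
var Queue = require('bull');

var emailQueue = new Queue('emails', 6379, '127.0.0.1');
emailQueue.STALLED_JOB_CHECK_INTERVAL = 0; // skips the setInterval branch in run()

emailQueue.process(function(job, done){
  // ... do the work for job.data ...
  done();
});
```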
@@ -516,19 +535,33 @@ Queue.prototype.updateDelayTimer = function(newDelayedTimestamp){
   }
 };

-Queue.prototype.processStalledJob = function() {
+/**
+ * Process jobs that have been added to the active list but are not being
+ * processed properly. This can happen due to a process crash in the middle
+ * of processing a job, leaving it in 'active' but without a job lock.
+ * Note that there is no way to know for _certain_ if a job is "stalled"
+ * (since the process that moved it to active might just be slow to get the
+ * lock on it), so this function takes a 'grace period' parameter to ignore
+ * jobs that were created in the recent X milliseconds.
+ *
+ * @param {Number?} grace Duration in milliseconds. Ignore jobs created since this many milliseconds ago.
+ *                        Defaults to LOCK_RENEW_TIME.
+ */
+Queue.prototype.moveUnlockedJobsToWait = function(){
   var _this = this;

-  return scripts.getStalledJob(this).then(function(job){
-    if (job) {
-      return _this.getJobFromId(job).then(function(job){
-        if (job.started) {
-          _this.emit('stalled', job);
-        }
-        return _this.processJob(job, true /* Renew the lock */);
-      });
-    }
-  });
+  if(this.closing){
+    return this.closing;
+  } else{
+    return scripts.moveUnlockedJobsToWait(this).then(function(stalledJobIds){
+      if (stalledJobIds.length > 0) {
+        _this.emit('stalled', stalledJobIds);
+      }
+      return null;
+    }).catch(function(err){
+      console.error(err);
+    });
+  }
 };

Review comment (on the `console.error(err);` line): I think it could be useful to add some text to this log to explain that it is related to `moveUnlockedJobsToWait`.

Reply: Fixed.
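Note that with this rewrite the `'stalled'` event payload changes from a single job object to an array of job ids, emitted once per sweep. A small hedged sketch of a listener as the diff stands in this commit; `queue` is assumed to be a Queue instance and the logging is illustrative:

```js
// Sketch: the 'stalled' event now reports every job id that was found in 'active'
// without a lock after having been locked before (Case A), in a single emission.
queue.on('stalled', function(stalledJobIds){
  console.warn('Jobs recovered from the stalled state:', stalledJobIds.join(', '));
});
```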
@@ -538,7 +571,6 @@ Queue.prototype.processJobs = function(resolve, reject){

   if(!this.closing){
     process.nextTick(function(){
       (_this.paused || Promise.resolve())
-        .then(_this.processStalledJob)
         .then(_this.getNextJob)
         .then(_this.processJob)
         .then(processJobs, function(err){
@@ -281,13 +281,49 @@ var scripts = {

     return execScript.apply(scripts, args);
   },

+  /**
+   * Takes a lock
+   */
+  takeLock: function(queue, job, renew){
+    var lockCall;
+    if (renew){
+      lockCall = 'redis.call("SET", KEYS[1], ARGV[1], "PX", ARGV[2])';
+    } else {
+      lockCall = 'redis.call("SET", KEYS[1], ARGV[1], "PX", ARGV[2], "NX")';
+    }
+
+    var script = [
+      'if(' + lockCall + ') then',
+      // Mark the job as having been locked at least once. Used to determine if the job was stalled.
+      '  redis.call("HSET", KEYS[2], "lockAcquired", "1")',
+      '  return 1',
+      'else',
+      '  return 0',
+      'end'
+    ].join('\n');
+
+    var args = [
+      queue.client,
+      'takeLock' + (renew ? 'Renew' : ''),
+      script,
+      2,
+      job.lockKey(),
+      queue.toKey(job.jobId),
+      queue.token,
+      queue.LOCK_RENEW_TIME
+    ];
+
+    return execScript.apply(scripts, args);
+  },
+
   releaseLock: function(job, token){
     var script = [
       'if redis.call("get", KEYS[1]) == ARGV[1]',
       'then',
-      'return redis.call("del", KEYS[1])',
+      '  return redis.call("del", KEYS[1])',
       'else',
-      'return 0',
+      '  return 0',
       'end'].join('\n');

     var args = [
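The call sites for `takeLock` are not part of this hunk, but the intent of its two branches can be sketched using only the signature shown above, with `scripts.takeLock(queue, job, renew)` resolving to the script's 1/0 result. The flow below is illustrative, not the PR's actual caller:

```js
// Illustrative sketch of the takeLock semantics shown above.
scripts.takeLock(queue, job, false /* initial acquisition uses SET ... NX */)
  .then(function(acquired){
    if (acquired === 1) {
      // The lock is ours for LOCK_RENEW_TIME ms and the job hash now has
      // lockAcquired=1, which moveUnlockedJobsToWait later uses to tell a
      // genuinely stalled job (Case A) from one that never started (Case B).
      // While working, the lock can be refreshed with a plain SET (no NX):
      return scripts.takeLock(queue, job, true /* renew */);
    }
    // acquired === 0: another worker holds the lock; leave the job alone.
  });
```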
@@ -354,30 +390,64 @@ var scripts = {
     return execScript.apply(scripts, args);
   },

-  getStalledJob: function(queue){
+  /**
+   * Looks for unlocked jobs in the active queue. There are two circumstances in which a job
+   * would be in 'active' but NOT have a job lock:
+   *
+   * Case A) The job was being worked on, but the worker process died and it failed to renew the lock.
+   *   We call these jobs 'stalled'. This is the most common case. We resolve these by moving them
+   *   back to wait to be re-processed. To prevent jobs from cycling endlessly between active and wait
+   *   (e.g. if the job handler keeps crashing), we limit the number of stalled job recoveries to
+   *   MAX_STALLED_JOB_COUNT.
+   *
+   * Case B) The job was just moved to 'active' from 'wait' and the worker that moved it hasn't gotten
+   *   a lock yet, or died immediately before getting the lock (note that due to Redis limitations, the
+   *   worker can't move the job and get the lock atomically - https://github.com/OptimalBits/bull/issues/258).
+   *   For this case we also move the job back to 'wait' for reprocessing, but don't consider it 'stalled'
+   *   since the job had never been started. This case is much rarer than Case A due to the very small
+   *   timing window in which it must occur.
+   */
+  moveUnlockedJobsToWait: function(queue){
     var script = [
+      'local MAX_STALLED_JOB_COUNT = tonumber(ARGV[1])',
       'local activeJobs = redis.call("LRANGE", KEYS[1], 0, -1)',
-      // Iterate active jobs in reverse, as stuck jobs are more likely to appear
-      // at the end of the active list. Jobs in the beginning probably just got
-      // moved there.
-      'for i = #activeJobs, 1, -1 do',
-      '  local job = activeJobs[i]',
-      '  local jobKey = ARGV[1] .. job',
-      '  if(redis.call("SET", jobKey .. ":lock", ARGV[2], "PX", ARGV[3], "NX")) then',
-      '    return job',
-      '  end',
+      'local stalled = {}',
+      'for _, job in ipairs(activeJobs) do',
+      '  local jobKey = ARGV[2] .. job',
+      '  if(redis.call("EXISTS", jobKey .. ":lock") == 0) then',
+      // Remove from the active queue.
+      '    redis.call("LREM", KEYS[1], 1, job)',
+      '    local lockAcquired = redis.call("HGET", jobKey, "lockAcquired")',
+      '    if(lockAcquired) then',
+      // If it was previously locked then we consider it 'stalled' (Case A above).
+      '      table.insert(stalled, job)',
+      // If this job has been stalled too many times, such as if it crashes the worker, then fail it.
+      '      local stalledCount = redis.call("HINCRBY", jobKey, "stalledCounter", 1)',
+      '      if(stalledCount > MAX_STALLED_JOB_COUNT) then',
+      '        redis.call("SADD", KEYS[3], job)',
+      '        redis.call("HSET", jobKey, "failedReason", "job stalled more than allowable limit")',
+      '      else',
+      // Move the job back to the wait queue, to immediately be picked up by a waiting worker.
+      '        redis.call("RPUSH", KEYS[2], job)',
+      '      end',
+      '    else',
+      // Move the job back to the wait queue, to immediately be picked up by a waiting worker.
+      '      redis.call("RPUSH", KEYS[2], job)',
+      '    end',
+      '  end',
       'end',
+      'return stalled'
     ].join('\n');

     var args = [
       queue.client,
       'processStalledJobs',
       script,
-      1,
+      3,
       queue.toKey('active'),
-      queue.toKey(''),
-      queue.token,
-      queue.LOCK_RENEW_TIME
+      queue.toKey('wait'),
+      queue.toKey('failed'),
+      queue.MAX_STALLED_JOB_COUNT,
+      queue.toKey('')
     ];

     return execScript.apply(scripts, args);

Review comment (on the `LRANGE` over the active list): Just a thought, maybe it would be interesting to have a configurable LRANGE here. For example, with very large queues the active list could be too big to traverse this often. I have to double check, but if the oldest jobs are at the end of the queue, limiting to a maximum number of elements per call may work well. I am also thinking of exposing the other constants that we have, for better fine-tuning. We do not need to change more in this PR.

Reply: Yeah, good idea.

Review comment (on `table.insert(stalled, job)`): If this job fails due to stalling too many times, should we still emit it as an event?

Reply: You're right. I fixed this and changed the …

Review comment (on the `RPUSH` back to the wait list): If the queue is a LIFO (check options), we need to do a LPUSH here instead. LIFO should also imply a new name for the script hash (since we could have different queues (LIFO/FIFO) in the same redis instance).

Reply: Isn't LIFO an option for the job, not the queue? Also, if we're reprocessing jobs that already made it to 'active', don't we always want to make them LIFO? Otherwise they'd be unfairly penalized by waiting the entire …
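To make Case A concrete, here is a hedged, test-style sketch that fabricates a stalled job by hand and then runs the sweep. It assumes `queue.client` is a promisified node_redis client (as the `Promise.promisifyAll` calls at the top of the queue module suggest); the job id '42' and the key layout via `queue.toKey()` are illustrative:

```js
// Sketch: simulate a job that was locked once (Case A) but whose worker died.
function simulateStalledJob(queue){
  var jobId = '42';
  // The job hash says it was locked at least once...
  return queue.client.hsetAsync(queue.toKey(jobId), 'lockAcquired', '1')
    .then(function(){
      // ...it sits in 'active', but no <jobKey>:lock key exists any more.
      return queue.client.rpushAsync(queue.toKey('active'), jobId);
    })
    .then(function(){
      // The sweep should LREM it from 'active', RPUSH it back to 'wait'
      // (stalledCounter becomes 1, which does not yet exceed MAX_STALLED_JOB_COUNT)
      // and emit 'stalled' with ['42'].
      return queue.moveUnlockedJobsToWait();
    });
}
```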
Review comment (on the doc comment above `Queue.prototype.moveUnlockedJobsToWait`): I guess some things in this comment regarding the grace period are not relevant anymore.

Reply: Oh yes, fixed!