Skip to content

Commit

Permalink
Fixes double-processing issue described in OptimalBits/bull#371 (comm…
Browse files Browse the repository at this point in the history
…ent).

Double-processing happens when two workers find out about the same job at the same time via `getNextJob`. One worker is taking the lock, processing the job, and moving it to completed before the second worker can even try to get the lock. When the second worker finally gets around to trying to get the lock, the job is already in the completed state. But it processes it anyways since it got the lock.

So the fix here is for the takeLock script to ensure the job is in the active queue prior to taking the lock. That will make sure jobs that are in wait, completed, or even removed from the queue altogether don't get double processed. Per the discussion in #370 though, takeLock is parameterized to only require the job be in active when taking the lock while processing the job. There are other cases such as job.remove() that the job might be in a different state, but we still want to be able to lock it.

This fixes existing existing broken unit test "should process each job once".

This also prevents hazard OptimalBits/bull#370.
  • Loading branch information
duyenddd authored and bradvogel committed Nov 13, 2016
1 parent baf6653 commit e779aff
Show file tree
Hide file tree
Showing 2 changed files with 36 additions and 6 deletions.
2 changes: 1 addition & 1 deletion lib/queue.js
Original file line number Diff line number Diff line change
Expand Up @@ -614,7 +614,7 @@ Queue.prototype.processJob = function(job, renew){
// by another worker. See #308
//
var lockRenewer = function(){
return job.takeLock(_this.token, renew).then(function(locked){
return job.takeLock(_this.token, renew, true /* Ensure in active */).then(function(locked){
if(locked){
renew = true;
lockRenewId = _this.timers.set('lockRenewer', _this.LOCK_RENEW_TIME / 2, lockRenewer);
Expand Down
40 changes: 35 additions & 5 deletions lib/scripts.js
Original file line number Diff line number Diff line change
Expand Up @@ -301,9 +301,36 @@ var scripts = {
},

/**
* Takes a lock
* Gets a lock for a job.
*
* @param {Queue} queue The queue for the job
* @param {Job} job The job
* @param {Boolean=false} renew Whether to renew to lock, meaning it will assume the job
* is already locked and just reset the lock expiration.
* @param {Boolean=false} ensureActive Ensures that the job is in the 'active' state.
*/
takeLock: function(queue, job, token, renew){
takeLock: function(queue, job, token, renew, ensureActive){
var ensureActiveCall = '';
if (ensureActive) {
ensureActiveCall = [
// Note that while this is inefficient to run a O(n) traversal of the 'active' queue,
// it's highly likely that the job is within the first few elements of the active
// list. The only time this isn't the case is if two workers learned of a job in the
// 'active' queue at the same time.
'local activeJobs = redis.call("LRANGE", KEYS[3], 0, -1)',
'local found = false',
'for _, job in ipairs(activeJobs) do',
' if(job == ARGV[3]) then',
' found = true',
' break',
' end',
'end',
'if (found == false) then',
' return -1',
'end'
].join('\n');
}

var lockCall;
if (renew){
lockCall = 'redis.call("SET", KEYS[1], ARGV[1], "PX", ARGV[2])';
Expand All @@ -312,6 +339,7 @@ var scripts = {
}

var script = [
ensureActiveCall,
'if(' + lockCall + ') then',
// Mark the job as having been locked at least once. Used to determine if the job was stalled.
' redis.call("HSET", KEYS[2], "lockAcquired", "1")',
Expand All @@ -323,13 +351,15 @@ var scripts = {

var args = [
queue.client,
'takeLock' + (renew ? 'Renew' : ''),
'takeLock' + (renew ? 'Renew' : '') + (ensureActive ? 'EnsureActive' : ''),
script,
2,
3,
job.lockKey(),
queue.toKey(job.jobId),
queue.toKey('active'),
token,
queue.LOCK_RENEW_TIME
queue.LOCK_RENEW_TIME,
job.jobId
];

return execScript.apply(scripts, args);
Expand Down

0 comments on commit e779aff

Please sign in to comment.