Skip to content

Commit

Permalink
[refactor] use redlock algo for reliable distributed locks [Fixes Opt…
Browse files Browse the repository at this point in the history
  • Loading branch information
doublerebel committed Nov 15, 2016
1 parent 3492ea5 commit 2981100
Show file tree
Hide file tree
Showing 5 changed files with 138 additions and 155 deletions.
36 changes: 19 additions & 17 deletions lib/job.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ var Promise = require('bluebird');
var _ = require('lodash');
var scripts = require('./scripts');
var debuglog = require('debuglog')('bull');
var uuid = require('node-uuid');

/**
interface JobOptions
Expand Down Expand Up @@ -123,24 +122,29 @@ Job.prototype.lockKey = function(){
Takes a lock for this job so that no other queue worker can process it at the
same time.
*/
Job.prototype.takeLock = function(token, renew, ensureActive){
return scripts.takeLock(this.queue, this, token, renew, ensureActive).then(function(res){
return res === 1; // Indicates successful lock.
Job.prototype.takeLock = function(renew, ensureActive){
var _this = this;
return scripts.takeLock(this.queue, this, renew, ensureActive)
.then(function(lock) {
if (lock) _this.lock = lock;
return lock || false;
});
};

/**
Renews a lock so that it gets some more time before expiring.
*/
Job.prototype.renewLock = function(token){
return this.takeLock(token, true /* Renew */);
Job.prototype.renewLock = function(){
return this.takeLock(true /* Renew */);
};

/**
Releases the lock. Only locks owned by the queue instance can be released.
*/
Job.prototype.releaseLock = function(token){
return scripts.releaseLock(this, token);
Job.prototype.releaseLock = function(){
var _this = this;
return scripts.releaseLock(this)
.then(function() { _this.lock = null; });
};

Job.prototype.delayIfNeeded = function(){
Expand All @@ -155,19 +159,19 @@ Job.prototype.delayIfNeeded = function(){
return Promise.resolve(false);
};

Job.prototype.moveToCompleted = function(returnValue, token){
Job.prototype.moveToCompleted = function(returnValue){
this.returnvalue = returnValue || 0;
return scripts.moveToCompleted(this, token || 0, this.opts.removeOnComplete);
return scripts.moveToCompleted(this || 0, this.opts.removeOnComplete);
};

Job.prototype.move = function(src, target, token, returnValue){
Job.prototype.move = function(src, target, returnValue){
if(target === 'completed'){
this.returnvalue = returnValue || 0;
if(this.opts.removeOnComplete){
target = void 0;
}
}
return scripts.move(this, token || 0, src, target);
return scripts.move(this || 0, src, target);
}

Job.prototype.moveToFailed = function(err){
Expand Down Expand Up @@ -305,13 +309,11 @@ Job.prototype.getState = function() {
/**
Removes a job from the queue and from all the lists where it may be stored.
*/
Job.prototype.remove = function(token){
Job.prototype.remove = function(){
var queue = this.queue;
var job = this;

var token = token || uuid();

return job.takeLock(token).then(function(lock) {
return job.takeLock().then(function(lock) {
if (!lock) {
throw new Error('Could not get lock for job: ' + job.jobId + '. Cannot remove job.');
}
Expand All @@ -320,7 +322,7 @@ Job.prototype.remove = function(token){
queue.emit('removed', job.toJSON());
})
.finally(function () {
return job.releaseLock(token);
return job.releaseLock();
});
});
};
Expand Down
33 changes: 23 additions & 10 deletions lib/queue.js
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ var scripts = require('./scripts');
var TimerManager = require('./timer-manager');
var _ = require('lodash');
var Promise = require('bluebird');
var uuid = require('node-uuid');
var semver = require('semver');
var debuglog = require('debuglog')('bull');

Expand Down Expand Up @@ -56,6 +55,10 @@ var MAX_STALLED_JOB_COUNT = 1;
var CLIENT_CLOSE_TIMEOUT_MS = 5000;
var POLLING_INTERVAL = 5000;

var REDLOCK_DRIFT_FACTOR = 0.01;
var REDLOCK_RETRY_COUNT = 0;
var REDLOCK_RETRY_DELAY = 200;

var Queue = function Queue(name, redisPort, redisHost, redisOptions){
if(!(this instanceof Queue)){
return new Queue(name, redisPort, redisHost, redisOptions);
Expand Down Expand Up @@ -119,6 +122,20 @@ var Queue = function Queue(name, redisPort, redisHost, redisOptions){
_this.emit('error', err);
});

//
// Keep track of cluster clients for redlock
//
this.clients = [this.client];
if (redisOptions.clients) {
this.clients.push.apply(this.clients, redisOptions.clients);
}
this.redlock = {
driftFactor: REDLOCK_DRIFT_FACTOR,
retryCount: REDLOCK_RETRY_COUNT,
retryDelay: REDLOCK_RETRY_DELAY
};
_.extend(this.redlock, redisOptions.redlock || {});

//
// Create blocking client (used to wait for jobs)
//
Expand All @@ -132,7 +149,6 @@ var Queue = function Queue(name, redisPort, redisHost, redisOptions){
this.delayTimer = null;
this.processing = 0;

this.token = uuid();
this.LOCK_RENEW_TIME = LOCK_RENEW_TIME;
this.STALLED_JOB_CHECK_INTERVAL = STALLED_JOB_CHECK_INTERVAL;
this.MAX_STALLED_JOB_COUNT = MAX_STALLED_JOB_COUNT;
Expand Down Expand Up @@ -621,17 +637,14 @@ Queue.prototype.processJob = function(job){
//
var renew = false;
var lockRenewer = function(){
// The first call to lock the job should ensure that the job is in the 'active' state,
// because it might have gotten picked up already by another processor. We don't need
// to do this on subsequent calls.
return job.takeLock(_this.token, renew, !renew).then(function(locked){
if(locked){
return job.takeLock(renew, true).then(function(lock){
if(lock){
renew = true;
lockRenewId = _this.timers.set('lockRenewer', _this.LOCK_RENEW_TIME / 2, lockRenewer);
}
// TODO: if we failed to re-acquire the lock while trying to renew, should we let the job
// handler know and cancel the timer?
return locked;
return lock;
}, function(err){
console.error('Error renewing lock ' + err);
});
Expand All @@ -653,7 +666,7 @@ Queue.prototype.processJob = function(job){
return;
}

return job.moveToCompleted(data, _this.token)
return job.moveToCompleted(data)
.then(function(){
return _this.distEmit('completed', job.toJSON(), data);
});
Expand All @@ -663,7 +676,7 @@ Queue.prototype.processJob = function(job){
_this.processing--;
var error = err.cause || err; //Handle explicit rejection
return job.moveToFailed(err)
.then(job.releaseLock.bind(job, _this.token))
.then(job.releaseLock.bind(job))
.then(function(){
return _this.distEmit('failed', job.toJSON(), error);
});
Expand Down
Loading

0 comments on commit 2981100

Please sign in to comment.