-
Notifications
You must be signed in to change notification settings - Fork 1.4k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Improvements for Queue#processStalledJobs #311
Changes from all commits
e9b69c7
2fdd71c
bbe3587
42eaeb1
433d5fb
9ecda58
e3051f8
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -47,7 +47,7 @@ var LOCK_RENEW_TIME = 5000; // 5 seconds is the renew time. | |
var CLIENT_CLOSE_TIMEOUT_MS = 5000; | ||
var POLLING_INTERVAL = 5000; | ||
|
||
var Queue = function Queue(name, redisPort, redisHost, redisOptions){ | ||
var Queue = function Queue(name, redisPort, redisHost, redisOptions, queueOptions){ | ||
if(!(this instanceof Queue)){ | ||
return new Queue(name, redisPort, redisHost, redisOptions); | ||
} | ||
|
@@ -57,7 +57,7 @@ var Queue = function Queue(name, redisPort, redisHost, redisOptions){ | |
var redisOpts = opts.redis || {}; | ||
redisPort = redisOpts.port; | ||
redisHost = redisOpts.host; | ||
redisOptions = redisOpts.opts || {}; | ||
redisOptions = redisOpts.opts || {}; | ||
redisOptions.db = redisOpts.DB; | ||
} | ||
|
||
|
@@ -77,6 +77,13 @@ var Queue = function Queue(name, redisPort, redisHost, redisOptions){ | |
redisPort = redisPort || 6379; | ||
redisHost = redisHost || '127.0.0.1'; | ||
|
||
queueOptions = _.pick(queueOptions, ['processStalledJobs']); | ||
queueOptions = _.defaults(queueOptions, { | ||
processStalledJobs: true | ||
}); | ||
|
||
this.opts = queueOptions; | ||
|
||
var _this = this; | ||
|
||
this.name = name; | ||
|
@@ -423,7 +430,10 @@ Queue.prototype.run = function(concurrency){ | |
var promises = []; | ||
var _this = this; | ||
|
||
return this.processStalledJobs().then(function(){ | ||
// In case of connection loss, running `processStalledJobs` will repick jobs here. | ||
var start = this.opts.processStalledJobs ? this.processStalledJobs() : Promise.resolve(); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. is this still needed to be run when the queue starts, instead of just letting it start after There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The expectations of Bull is to process them immediately on reconnection. Not calling it to begin with result in many failures across tests. |
||
|
||
return start.then(function(){ | ||
|
||
while(concurrency--){ | ||
promises.push(new Promise(_this.processJobs)); | ||
|
@@ -433,8 +443,11 @@ Queue.prototype.run = function(concurrency){ | |
// Set process Stalled jobs intervall | ||
// | ||
clearInterval(_this.stalledJobsInterval); | ||
_this.stalledJobsInterval = | ||
setInterval(_this.processStalledJobs, _this.LOCK_RENEW_TIME); | ||
if(_this.opts.processStalledJobs) { | ||
_this.stalledJobsInterval = | ||
setInterval(_this.processStalledJobs, _this.LOCK_RENEW_TIME); | ||
|
||
} | ||
|
||
return Promise.all(promises); | ||
}); | ||
|
@@ -475,15 +488,19 @@ Queue.prototype.updateDelayTimer = function(newDelayedTimestamp){ | |
}; | ||
|
||
/** | ||
Process jobs that have been added to the active list but are not being | ||
processed properly. | ||
* Process jobs that have been added to the active list but are not being | ||
* processed properly. | ||
* | ||
* @param {Number?} limit Only process this many number of jobs. Greater than 1, otherwise -1 | ||
*/ | ||
Queue.prototype.processStalledJobs = function(){ | ||
Queue.prototype.processStalledJobs = function(limit){ | ||
var _this = this; | ||
limit = limit > 0 ? limit - 1 : -1; | ||
|
||
if(this.closing){ | ||
return this.closing; | ||
} else{ | ||
return this.client.lrangeAsync(this.toKey('active'), 0, -1).then(function(jobs){ | ||
return this.client.lrangeAsync(this.toKey('active'), 0, limit).then(function(jobs){ | ||
return Promise.each(jobs, function(jobId) { | ||
return Job.fromId(_this, jobId).then(_this.processStalledJob); | ||
}); | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
needs readme update?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Bull's documentation is currently outdated in many regards, I'm addressing this: #309 by documenting everything in jsdoc format, if documentation lives closer to the code, it's much more likely it will be up to date. There are many helpers that generate nice looking documentation, I particularly liked this one: https://camo.githubusercontent.com/724b9224844b6b4f2cd19b3bce8d25015fa54cfa/687474703a2f2f7075752e73682f674f794e652f363663336164636239372e706e67
In any case, yes, this needs a readme update