diff --git a/README.md b/README.md index c0f2922..5a444aa 100644 --- a/README.md +++ b/README.md @@ -27,7 +27,8 @@ $ npm install --save async lodash kue kue-scheduler ## Usage -### Schedule a job to run every after specified time interval +### Schedule a non unique job to run every after specified time interval +Use this if you want to maintain different(multiple) job instances on every run Example `schedule a job to run every two seconds from now` ```js @@ -52,7 +53,35 @@ Queue.process('every', function(job, done) { }); ``` -### Schedule a job to run only once after specified interval elapsed +### Schedule a unique job to run every after specified time interval +Use this if you want to maintain a single job instance on every run. + +Example `schedule a job to run every two seconds from now` +```js +var kue = require('kue-scheduler'); +var Queue = kue.createQueue(); + +//create a job instance +var job = Queue + .createJob('unique_every', data) + .attempts(3) + .backoff(backoff) + .priority('normal') + .unique('unique_every'); + +//schedule it to run every 2 seconds +Queue.every('2 seconds', job); + + +//somewhere process your scheduled jobs +Queue.process('unique_every', function(job, done) { + ... + done(); +}); +``` + +### Schedule a non unique job to run only once after specified interval elapsed +Use this if you want to maintain different(multiple) job instances on every run Example `schedule a job to run only once two seconds from now` ```js @@ -77,6 +106,33 @@ Queue.process('shedule', function(job, done) { }); ``` +### Schedule a unique job to run only once after specified interval elapsed +Use this if you want to maintain a single job instance on every run. + +Example `schedule a job to run only once two seconds from now` +```js +var kue = require('kue-scheduler'); +var Queue = kue.createQueue(); + +//create a job instance +var job = Queue + .createJob('unique_schedule', data) + .attempts(3) + .backoff(backoff) + .priority('normal') + .unique('unique_schedule'); + +//schedule it to run once 2 seconds from now +Queue.schedule('2 seconds from now', job); + + +//somewhere process your scheduled jobs +Queue.process('unique_shedule', function(job, done) { + ... + done(); +}); +``` + ### Schedule a job to run now ```js @@ -116,7 +172,7 @@ Queue.enableExpiryNotifications(); ``` ### `every(interval, job)` -Runs a given `job instance` every after a given `interval`. +Runs a given `job instance` every after a given `interval`. If `unique key` is provided only single instance job will exists otherwise on every run new job istance will be used. `interval` can either be a [human-interval](https://github.com/rschmukler/human-interval) `String` format or a [cron](https://github.com/ncb000gt/node-cron) `String` format. @@ -144,7 +200,7 @@ Queue.process('every', function(job, done) { ### `schedule(when, job)` -Schedules a given `job instance` to run once at a given time. `when` can either be a `Date instance` or a [date.js](https://github.com/matthewmueller/date) `String` such as `tomorrow at 5pm`. +Schedules a given `job instance` to run once at a given time. `when` can either be a `Date instance` or a [date.js](https://github.com/matthewmueller/date) `String` such as `tomorrow at 5pm`. If `unique key` is provided only single instance job will exists otherwise on every run new job istance will be used. ```js var kue = require('kue-scheduler'); @@ -193,7 +249,7 @@ Queue.process('now', function(job, done) { ``` ## Events -Currently the only way to interact with `kue-scheduler` is through its events. `kue-scheduler` fires `schedule error`, `schedule success` and `scheduler unknown job expiry key` events. +Currently the only way to interact with `kue-scheduler` is through its events. `kue-scheduler` fires `schedule error`, `schedule success`, `already scheduled` and `scheduler unknown job expiry key` events. ### `schedule error` Use it to interact with `kue-scheduler` to get notified when an error occur. @@ -233,6 +289,26 @@ var job = Queue Queue.now(job); ``` +### `already scheduled` +Use it to interact with `kue-scheduler` to be notified if the current instance of job is unique and already schedule to run. + +*Note: Use this event to attach instance level job events* + +```js +//listen alrady scheduled jobs +Queue.on('already scheduled', function(job) { + ... +}); + +var job = Queue + .createJob('now', data) + .attempts(3) + .backoff(backoff) + .priority('normal'); + +Queue.now(job); +``` + ### `scheduler unknown job expiry key` Fired when `kue-scheduler` receive unknown key event from redis. Use it to be notified on unknown key(s) events. @@ -264,10 +340,10 @@ It will be nice, if you open an issue first so that we can know what is going on ## TODO -- [ ] Scheduler restart after shutdown -- [ ] Reschedule/scan jobs on restart +- [x] Scheduler restart after shutdown +- [x] Reschedule/scan jobs on restart - [ ] Test multi process scheduler -- [ ] Support unique reccur jobs +- [x] Support unique reccur jobs ## License diff --git a/examples/every_unique.js b/examples/every_unique.js new file mode 100644 index 0000000..ea6719d --- /dev/null +++ b/examples/every_unique.js @@ -0,0 +1,68 @@ +'use strict'; + +/** + * @description example of scheduling a unique jobs to run every after specified interval + */ + +//dependencies +var path = require('path'); + +//require('kue-scheduler') here +var kue = require(path.join(__dirname, '..', 'index')); +var Queue = kue.createQueue(); + + +//processing jobs +Queue.process('unique_every', function(job, done) { + console.log('\nProcessing job with id %s at %s', job.id, new Date()); + done(null, { + deliveredAt: new Date() + }); +}); + + +//listen on scheduler errors +Queue.on('schedule error', function(error) { + //handle all scheduling errors here + console.log(error); +}); + + +//listen on success scheduling +Queue.on('schedule success', function(job) { + //a highly recommended place to attach + //job instance level events listeners + + job.on('complete', function(result) { + console.log('Job completed with data ', result); + + }).on('failed attempt', function(errorMessage, doneAttempts) { + console.log('Job failed'); + + }).on('failed', function(errorMessage) { + console.log('Job failed'); + + }).on('progress', function(progress, data) { + console.log('\r job #' + job.id + ' ' + progress + '% complete with data ', data); + + }); + +}); + +//prepare a job to perform +//dont save it +var job = Queue + .createJob('unique_every', { + to: 'any' + }) + .attempts(3) + .backoff({ + delay: 60000, + type: 'fixed' + }) + .priority('normal') + .unique('unique_every'); + + +//schedule a job then +Queue.every('2 seconds', job); \ No newline at end of file diff --git a/examples/schedule.js b/examples/schedule.js index 77f74dd..9a26872 100644 --- a/examples/schedule.js +++ b/examples/schedule.js @@ -20,9 +20,6 @@ Queue.process('schedule', function(job, done) { }); }); -//promote unless you use kue version with auto promote -Queue.promote(3000); - //listen on scheduler errors Queue.on('schedule error', function(error) { //handle all scheduling errors here diff --git a/examples/schedule_unique.js b/examples/schedule_unique.js new file mode 100644 index 0000000..94ea0f2 --- /dev/null +++ b/examples/schedule_unique.js @@ -0,0 +1,68 @@ +'use strict'; + +/** + * @description example of scheduling unique jobs to run once after specified interval + */ + +//dependencies +var path = require('path'); + +//require('kue-scheduler') here +var kue = require(path.join(__dirname, '..', 'index')); +var Queue = kue.createQueue(); + + +//processing jobs +Queue.process('unique_schedule', function(job, done) { + console.log('\nProcessing job with id %s at %s', job.id, new Date()); + done(null, { + deliveredAt: new Date() + }); +}); + + +//listen on scheduler errors +Queue.on('schedule error', function(error) { + //handle all scheduling errors here + console.log(error); +}); + + +//listen on success scheduling +Queue.on('schedule success', function(job) { + //a highly recommended place to attach + //job instance level events listeners + + job.on('complete', function(result) { + console.log('Job completed with data ', result); + + }).on('failed attempt', function(errorMessage, doneAttempts) { + console.log('Job failed'); + + }).on('failed', function(errorMessage) { + console.log('Job failed'); + + }).on('progress', function(progress, data) { + console.log('\r job #' + job.id + ' ' + progress + '% complete with data ', data); + + }); + +}); + +//prepare a job to perform +//dont save it +var job = Queue + .createJob('unique_schedule', { + to: 'any' + }) + .attempts(3) + .backoff({ + delay: 60000, + type: 'fixed' + }) + .priority('normal') + .unique('unique_schedule'); + + +//schedule a job then +Queue.schedule('2 seconds from now', job); \ No newline at end of file diff --git a/index.js b/index.js index 62aa7a8..84bec64 100644 --- a/index.js +++ b/index.js @@ -10,7 +10,7 @@ //dependencies -var kue = require('kue'); +var kue = require('kue-unique'); var Job = kue.Job; var Queue = kue; // @@ -55,6 +55,70 @@ Queue.prototype._isJobExpiryKey = function(jobExpiryKey) { }; +/** + * @description check if job exists and its ttl has not timeout + * @param {String} jobExpiryKey valid job expiry key + * @param {Function} done a function to invoke on success or error + * @return {Boolean} whether job already scheduled or not + * @private + */ +Queue.prototype._isJobAlreadyScheduled = function(jobExpiryKey, done) { + //this refer to kue Queue instance context + + async.parallel({ + + exists: function isKeyExists(next) { + this._scheduler.exists(jobExpiryKey, next); + }.bind(this), + + ttl: function isKeyExpired(next) { + this._scheduler.pttl(jobExpiryKey, next); + }.bind(this) + + }, function(error, results) { + if (error) { + done(error); + } else { + var exists = (results.exists && results.exists === 1) ? true : false; + var active = (results.ttl && results.ttl > 0) ? true : false; + + var alreadyScheduled = exists && active; + done(null, alreadyScheduled); + } + }); +}; + + +/** + * @description generate job uuid from job definition + * @param {Object} jobDefinition valid job definition + * @return {String} job uuid + * @private + */ +Queue.prototype._generateJobUUID = function(jobDefinition) { + //this refer to kue Queue instance context + + var unique = jobDefinition.data ? jobDefinition.data.unique : undefined; + var type = jobDefinition.type ? jobDefinition.type : undefined; + + //deduce job uuid from unique key + if (unique) { + return _.snakeCase(unique); + } + + //deduce uuid from job type + else if (type) { + return _.snakeCase(type); + } + + //otherwise generate uuid + else { + return uuid.v1(); + } + +}; + + /** * @function * @description generate job uuid from job expiry key @@ -91,12 +155,9 @@ Queue.prototype._saveJobData = function(jobDataKey, jobData, done) { this ._scheduler - .set( - jobDataKey, - JSON.stringify(jobData), - function(error /*, response*/ ) { - done(error, jobData); - }); + .set(jobDataKey, JSON.stringify(jobData), function(error /*, response*/ ) { + done(error, jobData); + }); }; @@ -157,11 +218,8 @@ Queue.prototype._parse = function(str, done) { */ Queue.prototype._buildJob = function(jobDefinition, done) { //this refer to kue Queue instance context - var self = this; - - async - .parallel({ + async.parallel({ isDefined: function(next) { //is job definition provided var isObject = _.isPlainObject(jobDefinition); @@ -210,10 +268,7 @@ Queue.prototype._buildJob = function(jobDefinition, done) { //instantiate kue job var job = - self.createJob( - jobDefinition.type, - jobDefinition.data - ); + this.createJob(jobDefinition.type, jobDefinition.data); //apply all job attributes into kue job instance _.keys(jobDefinition).forEach(function(attribute) { @@ -246,7 +301,7 @@ Queue.prototype._buildJob = function(jobDefinition, done) { //we are done done(null, job, validations); } - }); + }.bind(this)); }; @@ -267,8 +322,7 @@ Queue.prototype._computeNextRunTime = function(jobData, done) { var interval = jobData.reccurInterval; - async - .parallel({ + async.parallel({ //compute next run from cron interval cron: function(after) { try { @@ -340,23 +394,21 @@ Queue.prototype._computeNextRunTime = function(jobData, done) { */ Queue.prototype._onJobKeyExpiry = function(jobExpiryKey) { //this refer to kue Queue instance context - var self = this; - async - .waterfall( + async.waterfall( [ //get job data function getJobData(next) { //get job uuid - var jobUUID = self._getJobUUID(jobExpiryKey); + var jobUUID = this._getJobUUID(jobExpiryKey); //get saved job data - self._readJobData(self._getJobDataKey(jobUUID), next); - }, + this._readJobData(this._getJobDataKey(jobUUID), next); + }.bind(this), //compute next run time function computeNextRun(jobData, next) { - self + this ._computeNextRunTime(jobData, function(error, nextRunTime) { if (error) { next(error); @@ -364,7 +416,7 @@ Queue.prototype._onJobKeyExpiry = function(jobExpiryKey) { next(null, jobData, nextRunTime); } }); - }, + }.bind(this), //resave the key to rerun this job again function resaveJobKey(jobData, nextRunTime, next) { @@ -373,7 +425,7 @@ Queue.prototype._onJobKeyExpiry = function(jobExpiryKey) { var now = new Date(); var delay = nextRunTime.getTime() - now.getTime(); - self + this ._scheduler .set(jobExpiryKey, '', 'PX', delay, function(error) { if (error) { @@ -382,20 +434,37 @@ Queue.prototype._onJobKeyExpiry = function(jobExpiryKey) { next(null, jobData); } }); - }, + }.bind(this), function buildJob(jobDefinition, next) { - self._buildJob(jobDefinition, next); + this._buildJob(jobDefinition, next); + }.bind(this), + + function runJob(job, validations, next) { + job.save(function(error, existJob) { + if (error) { + next(error); + } else { + //ensure unique job + if (existJob && existJob.alreadyExist) { + //inactivate to signal next run + existJob.inactive(); + } + + next(null, existJob || job); + } + }); } ], function(error, job) { if (error) { - self.emit('schedule error', error); + this.emit('schedule error', error); + } else if (job.alreadyExist) { + this.emit('already scheduled', job); } else { - //run job immediately - self.now(job); + this.emit('schedule success', job); } - }); + }.bind(this)); }; @@ -406,7 +475,6 @@ Queue.prototype._onJobKeyExpiry = function(jobExpiryKey) { */ Queue.prototype._subscribe = function() { //this refer to kue Queue instance context - var self = this; //listen for job key expiry this @@ -415,17 +483,17 @@ Queue.prototype._subscribe = function() { //test if the event key is job expiry key //and emit `scheduler unknown job expiry key` if not - if (!self._isJobExpiryKey(jobExpiryKey)) { - self.emit('scheduler unknown job expiry key', jobExpiryKey); + if (!this._isJobExpiryKey(jobExpiryKey)) { + this.emit('scheduler unknown job expiry key', jobExpiryKey); return; } - self._onJobKeyExpiry(jobExpiryKey); + this._onJobKeyExpiry(jobExpiryKey); - }); + }.bind(this)); //subscribe to key expiration events - self._listener.subscribe('__keyevent@0__:expired'); + this._listener.subscribe('__keyevent@0__:expired'); }; @@ -444,15 +512,31 @@ Queue.prototype._subscribe = function() { * @param {String} interval scheduled interval in either human interval or * cron format * @param {Job} job valid kue job instance which has not been saved + * @example + * 1. create non-unique job + * var job = Queue + * .createJob('every', data) + * .attempts(3) + * .priority('normal'); + * + * Queue.every('2 seconds', job); + * + * 2. create unique job + * var job = Queue + * .create('every', data) + * .attempts(3) + * .priority('normal') + * .unique(); + * + * Queue.every('2 seconds', job); * @private */ Queue.prototype.every = function(interval, job) { //this refer to kue Queue instance context - var self = this; //back-off if no interval and job if (!interval || !job) { - self.emit( + this.emit( 'schedule error', new Error('Invalid number of parameters') ); @@ -460,7 +544,7 @@ Queue.prototype.every = function(interval, job) { //check for job instance else if (!(job instanceof Job)) { - self.emit( + this.emit( 'schedule error', new Error('Invalid job type') ); @@ -480,45 +564,60 @@ Queue.prototype.every = function(interval, job) { }); //generate job uuid - var jobUUID = uuid.v1(); + var jobUUID = this._generateJobUUID(jobDefinition); - async - .parallel({ - jobExpiryKey: function(next) { - next(null, self._getJobExpiryKey(jobUUID)); - }, + //check if job already scheduled + this._isJobAlreadyScheduled(this._getJobExpiryKey(jobUUID), function(error, isAlreadyScheduled) { + if (error) { + this.emit('schedule error', error); + } - jobDataKey: function(next) { - next(null, self._getJobDataKey(jobUUID)); - }, + if (!isAlreadyScheduled) { + async.parallel({ - nextRunTime: function(next) { - self._computeNextRunTime(jobDefinition, next); - } - }, function finish(error, results) { + jobExpiryKey: function(next) { + next(null, this._getJobExpiryKey(jobUUID)); + }.bind(this), - var now = new Date(); - var delay = results.nextRunTime.getTime() - now.getTime(); + jobDataKey: function(next) { + next(null, this._getJobDataKey(jobUUID)); + }.bind(this), - async - .waterfall([ + nextRunTime: function(next) { + this._computeNextRunTime(jobDefinition, next); + }.bind(this) - function saveJobData(next) { - //save job data - self._saveJobData(results.jobDataKey, jobDefinition, next); - }, + }, function finish(error, results) { + if (error) { + this.emit('schedule error', error); + } else { - function setJobKeyExpiry(jobData, next) { - //save key an wait for it to expiry - self._scheduler.set(results.jobExpiryKey, '', 'PX', delay, next); - } - ], function(error) { - if (error) { - self.emit('schedule error', error); - } - }); + var now = new Date(); + var delay = results.nextRunTime.getTime() - now.getTime(); - }); + async + .waterfall([ + + function saveJobData(next) { + //save job data + this._saveJobData(results.jobDataKey, jobDefinition, next); + }.bind(this), + + function setJobKeyExpiry(jobData, next) { + //save key an wait for it to expiry + this._scheduler.set(results.jobExpiryKey, '', 'PX', delay, next); + }.bind(this) + + ], function(error) { + if (error) { + this.emit('schedule error', error); + } + }.bind(this)); + } + + }.bind(this)); + } + }.bind(this)); } @@ -540,15 +639,31 @@ Queue.prototype.every = function(interval, job) { * * @param {Date|String} when when should this job run * @param {Job} jobDefinition valid kue job instance which has not been saved + * @example + * 1. create non-unique job + * var job = Queue + * .createJob('schedule', data) + * .attempts(3) + * .priority('normal'); + * + * Queue.schedule('2 seconds from now', job); + * + * 2. create unique job + * var job = Queue + * .create('schedule', data) + * .attempts(3) + * .priority('normal') + * .unique(); + * + * Queue.schedule('2 seconds from now', job); * @private */ Queue.prototype.schedule = function(when, job) { //this refer to kue Queue instance context - var self = this; //back-off if no interval and job if (!when || !job) { - self.emit( + this.emit( 'schedule error', new Error('Invalid number of parameters') ); @@ -556,7 +671,7 @@ Queue.prototype.schedule = function(when, job) { //check for job instance else if (!(job instanceof Job)) { - self.emit( + this.emit( 'schedule error', new Error('Invalid job type') ); @@ -569,8 +684,7 @@ Queue.prototype.schedule = function(when, job) { backoff: job._backoff }); - async - .waterfall( + async.waterfall( [ function computeDelay(next) { //when is date instance @@ -580,9 +694,9 @@ Queue.prototype.schedule = function(when, job) { //otherwise parse as date.js string else { - self._parse(when, next); + this._parse(when, next); } - }, + }.bind(this), //set job delay function setDelay(scheduledDate, next) { @@ -598,26 +712,28 @@ Queue.prototype.schedule = function(when, job) { }, function buildJob(delayedJobDefinition, next) { - self._buildJob(delayedJobDefinition, next); - }, + this._buildJob(delayedJobDefinition, next); + }.bind(this), function saveJob(job, validations, next) { - job.save(function(error) { + job.save(function(error, existJob) { if (error) { next(error); } else { - next(null, job); + next(null, existJob || job); } }); } ], function finish(error, job) { if (error) { - self.emit('schedule error', error); + this.emit('schedule error', error); + } else if (job.alreadyExist) { + this.emit('already scheduled', job); } else { - self.emit('schedule success', job); + this.emit('schedule success', job); } - }); + }.bind(this)); } }; @@ -634,44 +750,62 @@ Queue.prototype.schedule = function(when, job) { * on event. * * @param {Job} job a valid kue job instance which has not been saved + * @example + * 1. create non-unique job + * var job = Queue + * .createJob('now', data) + * .attempts(3) + * .priority('normal'); + * + * Queue.now(job); + * + * 2. create unique job + * var job = Queue + * .create('now', data) + * .attempts(3) + * .priority('normal') + * .unique(); + * + * Queue.now(job); * @private */ Queue.prototype.now = function(job) { //this refer to kue Queue instance context - var self = this; if (!job || !(job instanceof Job)) { - self.emit('schedule error', new Error('Invalid job type')); + this.emit('schedule error', new Error('Invalid job type')); } else { var jobDefinition = _.extend(job.toJSON(), { backoff: job._backoff }); - async - .waterfall( + async.waterfall( [ function buildJob(next) { - self._buildJob(jobDefinition, next); - }, + this._buildJob(jobDefinition, next); + }.bind(this), function saveJob(job, validations, next) { - job.save(function(error) { + job.save(function(error, existJob) { if (error) { next(error); } else { - next(null, job); + next(null, existJob || job); } }); } ], function finish(error, job) { + //TODO fire already scheduled events if (error) { - self.emit('schedule error', error); + this.emit('schedule error', error); + } else if (job.alreadyExist) { + this.emit('already scheduled', job); } else { - self.emit('schedule success', job); + this.emit('schedule success', job); } - }); + }.bind(this)); } }; diff --git a/package.json b/package.json index 6bb128b..8497534 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "kue-scheduler", - "version": "0.2.2", + "version": "0.3.0", "description": "A job scheduler utility for kue, backed by redis and built for node.js", "main": "index.js", "scripts": { @@ -45,7 +45,7 @@ "cron": "^1.0.9", "date.js": "^0.2.0", "human-interval": "^0.1.5", - "kue-unique": "^0.1.2", + "kue-unique": "^0.1.3", "node-uuid": "^1.4.3" }, "devDependencies": { diff --git a/test/bootstrap.spec.js b/test/bootstrap.spec.js index 653db32..e82ef4f 100644 --- a/test/bootstrap.spec.js +++ b/test/bootstrap.spec.js @@ -18,13 +18,9 @@ function cleanup(callback) { if (error) { callback(error); } else { - async - .each( - rows, - function(row, next) { - redis.del(row, next); - }, - callback); + async.each(rows, function(row, next) { + redis.del(row, next); + }, callback); } }); } diff --git a/test/schedule/every.spec.js b/test/schedule/every.spec.js index 85aa949..f2af3ca 100644 --- a/test/schedule/every.spec.js +++ b/test/schedule/every.spec.js @@ -2,6 +2,7 @@ //dependencies var expect = require('chai').expect; +var _ = require('lodash'); var path = require('path'); var kue = require(path.join(__dirname, '..', '..', 'index')); var faker = require('faker'); @@ -9,12 +10,12 @@ var Queue; describe('Queue#every', function() { - before(function(done) { + beforeEach(function(done) { Queue = kue.createQueue(); done(); }); - after(function(done) { + afterEach(function(done) { Queue.shutdown(done); }); @@ -23,7 +24,7 @@ describe('Queue#every', function() { done(); }); - it('should be able to schedule a job to run every 2 seconds from now', function(done) { + it('should be able to schedule a non unique job to run every 2 seconds from now', function(done) { var data = { to: faker.internet.email() @@ -34,11 +35,14 @@ describe('Queue#every', function() { type: 'fixed' }; var runCount = 0; + var jobs = []; Queue.process('every', function(job, finalize) { //increament run counts runCount++; + jobs.push(job); + /*jshint camelcase:false */ expect(job.id).to.exist; expect(job.type).to.equal('every'); @@ -80,6 +84,76 @@ describe('Queue#every', function() { //wait for two jobs to be runned setTimeout(function() { expect(runCount).to.equal(2); + var ids = _.map(jobs, 'id'); + expect(ids[0]).to.not.equal(ids[1]); + + done(); + }, 6000); + }); + + it('should be able to schedule a unique job to run every 2 seconds from now', function(done) { + + var data = { + to: faker.internet.email() + }; + + var backoff = { + delay: 60000, + type: 'fixed' + }; + var runCount = 0; + var jobs = []; + + Queue.process('unique_every', function(job, finalize) { + //increament run counts + runCount++; + + jobs.push(job); + + /*jshint camelcase:false */ + expect(job.id).to.exist; + expect(job.type).to.equal('unique_every'); + expect(parseInt(job._max_attempts)).to.equal(3); + expect(job.data.to).to.equal(data.to); + expect(job.data.schedule).to.equal('RECCUR'); + + expect(job._backoff).to.eql(backoff); + expect(parseInt(job._priority)).to.equal(0); + /*jshint camelcase:true */ + + finalize(); + }); + + //listen on success scheduling + Queue.on('schedule success', function(job) { + if (job.type === 'unique_every') { + /*jshint camelcase:false */ + expect(job.id).to.exist; + expect(job.type).to.equal('unique_every'); + expect(parseInt(job._max_attempts)).to.equal(3); + expect(job.data.to).to.equal(data.to); + expect(job.data.schedule).to.equal('RECCUR'); + + expect(job._backoff).to.eql(backoff); + expect(parseInt(job._priority)).to.equal(0); + /*jshint camelcase:true */ + } + }); + + var job = Queue + .createJob('unique_every', data) + .attempts(3) + .backoff(backoff) + .priority('normal') + .unique('every_mail'); + + Queue.every('2 seconds', job); + + //wait for two jobs to be runned + setTimeout(function() { + expect(runCount).to.equal(2); + var ids = _.map(jobs, 'id'); + expect(ids[0]).to.equal(ids[1]); done(); }, 6000); }); diff --git a/test/schedule/now.spec.js b/test/schedule/now.spec.js index ba652e1..754507c 100644 --- a/test/schedule/now.spec.js +++ b/test/schedule/now.spec.js @@ -9,12 +9,12 @@ var Queue; describe('Queue#now', function() { - before(function(done) { + beforeEach(function(done) { Queue = kue.createQueue(); done(); }); - after(function(done) { + afterEach(function(done) { Queue.shutdown(done); }); @@ -23,7 +23,7 @@ describe('Queue#now', function() { done(); }); - it('should be able to schedule a job to run now', function(done) { + it('should be able to schedule a non unique job to run now', function(done) { var data = { to: faker.internet.email() }; @@ -77,6 +77,105 @@ describe('Queue#now', function() { Queue.now(job); }); + it('should be able to schedule a unique job to run now', function(done) { + var data = { + to: faker.internet.email() + }; + + var backoff = { + delay: 60000, + type: 'fixed' + }; + var runCount = 0; + var processedJob; + var existJob; + + + Queue.process('unique_now', function(job, finalize) { + //increament run counts + runCount++; + + /*jshint camelcase:false */ + expect(job.id).to.exist; + expect(job.type).to.equal('unique_now'); + expect(parseInt(job._max_attempts)).to.equal(3); + expect(job.data.to).to.equal(data.to); + expect(job.data.schedule).to.equal('NOW'); + + expect(job._backoff).to.eql(backoff); + expect(parseInt(job._priority)).to.equal(0); + /*jshint camelcase:true */ + + finalize(); + }); + + //listen on success scheduling + Queue.on('schedule success', function(job) { + + if (job.type === 'unique_now') { + //collect jobs + processedJob = job; + + /*jshint camelcase:false */ + expect(job.id).to.exist; + expect(job.type).to.equal('unique_now'); + expect(parseInt(job._max_attempts)).to.equal(3); + expect(job.data.to).to.equal(data.to); + expect(job.data.schedule).to.equal('NOW'); + + expect(job._backoff).to.eql(backoff); + expect(parseInt(job._priority)).to.equal(0); + /*jshint camelcase:true */ + } + }); + + //listen for already scheduled jobs + Queue.on('already scheduled', function(job) { + + if (job.type === 'unique_now') { + + existJob = job; + + /*jshint camelcase:false */ + expect(job.id).to.exist; + expect(job.type).to.equal('unique_now'); + expect(parseInt(job._max_attempts)).to.equal(3); + expect(job.data.to).to.equal(data.to); + expect(job.data.schedule).to.equal('NOW'); + + expect(job._backoff).to.eql(backoff); + expect(parseInt(job._priority)).to.equal(0); + /*jshint camelcase:true */ + } + }); + + + var job = Queue + .createJob('unique_now', data) + .attempts(3) + .backoff(backoff) + .priority('normal') + .unique('mail_now'); + + //fire job at first + Queue.now(job); + + //try fire it again + setTimeout(function() { + Queue.now(job); + }, 1000); + + + //wait for some seconds jobs to be runned + setTimeout(function() { + + expect(runCount).to.equal(1); + expect(existJob.id).to.equal(processedJob.id); + + done(); + }, 3000); + }); + it('should be able to emit `schedule error` if job is not an instance of Job', function(done) { diff --git a/test/schedule/schedule.spec.js b/test/schedule/schedule.spec.js index 6de33b5..0105797 100644 --- a/test/schedule/schedule.spec.js +++ b/test/schedule/schedule.spec.js @@ -11,12 +11,12 @@ var Queue; describe('Queue#schedule', function() { - before(function(done) { + beforeEach(function(done) { Queue = kue.createQueue(); done(); }); - after(function(done) { + afterEach(function(done) { Queue.shutdown(done); }); @@ -34,7 +34,7 @@ describe('Queue#schedule', function() { }); }); - it('should be able to schedule a job to run after 2 seconds from now', function(done) { + it('should be able to schedule a non unique job to run after 2 seconds from now', function(done) { var data = { to: faker.internet.email() }; @@ -95,6 +95,102 @@ describe('Queue#schedule', function() { }, 4000); }); + it('should be able to schedule a unique job to run after 2 seconds from now', function(done) { + var data = { + to: faker.internet.email() + }; + + var backoff = { + delay: 60000, + type: 'fixed' + }; + var runCount = 0; + var processedJob; + var existJob; + + Queue.process('unique_schedule', function(job, finalize) { + //increament run counts + runCount++; + + /*jshint camelcase:false */ + expect(job.id).to.exist; + expect(job.type).to.equal('unique_schedule'); + expect(parseInt(job._max_attempts)).to.equal(3); + expect(job.data.to).to.equal(data.to); + expect(job.data.schedule).to.equal('ONCE'); + + expect(job._backoff).to.eql(backoff); + expect(parseInt(job._priority)).to.equal(0); + /*jshint camelcase:true */ + + finalize(); + }); + + //listen on success scheduling + Queue.on('schedule success', function(job) { + if (job.type === 'unique_schedule') { + //collect jobs + processedJob = job; + + /*jshint camelcase:false */ + expect(job.id).to.exist; + expect(job.type).to.equal('unique_schedule'); + expect(parseInt(job._max_attempts)).to.equal(3); + expect(job.data.to).to.equal(data.to); + expect(job.data.schedule).to.equal('ONCE'); + + expect(job._backoff).to.eql(backoff); + expect(parseInt(job._priority)).to.equal(0); + /*jshint camelcase:true */ + } + }); + + //listen for already scheduled jobs + Queue.on('already scheduled', function(job) { + + if (job.type === 'unique_schedule') { + + existJob = job; + + /*jshint camelcase:false */ + expect(job.id).to.exist; + expect(job.type).to.equal('unique_schedule'); + expect(parseInt(job._max_attempts)).to.equal(3); + expect(job.data.to).to.equal(data.to); + expect(job.data.schedule).to.equal('ONCE'); + + expect(job._backoff).to.eql(backoff); + expect(parseInt(job._priority)).to.equal(0); + /*jshint camelcase:true */ + } + }); + + var job = Queue + .createJob('unique_schedule', data) + .attempts(3) + .backoff(backoff) + .priority('normal') + .unique('mail_schedule'); + + //fire job at first + Queue.schedule('2 seconds from now', job); + + //try fire it again + setTimeout(function() { + Queue.schedule('2 seconds from now', job); + }, 1000); + + + //wait for some seconds jobs to be runned + setTimeout(function() { + + expect(runCount).to.equal(1); + expect(existJob.id).to.equal(processedJob.id); + + done(); + }, 5000); + }); + it('should be able to emit `schedule error` if schedule or job is not given', function(done) { Queue.once('schedule error', function(error) {