diff --git a/Gruntfile.js b/Gruntfile.js index 82e9cb2..1dfa489 100644 --- a/Gruntfile.js +++ b/Gruntfile.js @@ -15,7 +15,7 @@ module.exports = function (grunt) { timeout: 20000 }, // src: ['test/**/*.js'] - src: ['test/schedule/every.spec.js'] + src: ['test/schedule/schedule.spec.js'] } }, jshint: { diff --git a/index.js b/index.js index 2e10e51..3b2b79d 100644 --- a/index.js +++ b/index.js @@ -858,6 +858,7 @@ Queue.prototype.every = function (interval, job, done) { * * @param {Date|String} when when should this job run * @param {Job} jobDefinition valid kue job instance which has not been saved + * @param {Fuction} [done] a callback to invoke on success or error * @example * 1. create non-unique job * var job = Queue @@ -865,7 +866,7 @@ Queue.prototype.every = function (interval, job, done) { * .attempts(3) * .priority('normal'); * - * Queue.schedule('2 seconds from now', job); + * Queue.schedule('2 seconds from now', job, done); * * 2. create unique job * var job = Queue @@ -874,112 +875,102 @@ Queue.prototype.every = function (interval, job, done) { * .priority('normal') * .unique(); * - * Queue.schedule('2 seconds from now', job); + * Queue.schedule('2 seconds from now', job, done); * @public */ -Queue.prototype.schedule = function (when, job) { +Queue.prototype.schedule = function (when, job, done) { //this refer to kue Queue instance context - //back-off if no interval and job - if (!when || !job) { - this.emit( - 'schedule error', - new Error('Invalid number of parameters') - ); - } + async.waterfall([ + function ensureInterval(next) { + if (!when && !(_.isString(when) || _.isDate(when))) { + next(new Error('Missing Schedule Interval')); + } else { + next(null, when, job); + } + }, - //check for job instance - else if (!(job instanceof Job)) { - this.emit( - 'schedule error', - new Error('Invalid job type') - ); - } + function ensureJobInstance(when, job, next) { + if (!job && !(job instanceof Job)) { + next(new Error('Invalid Job Instance')); + } else { + next(null, when, job); + } + }, - //continue with processing job - else { + function prepareJobDefinition(when, job, next) { + var jobDefinition = _.extend(job.toJSON(), { + backoff: job._backoff + }); - var jobDefinition = _.extend(job.toJSON(), { - backoff: job._backoff - }); + next(null, when, job, jobDefinition); + }, - async.waterfall( - [ - function computeDelay(next) { - //when is date instance - if (when instanceof Date) { - next(null, when); - } + function computeDelay(when, job, jobDefinition, next) { + //when is date instance + if (when instanceof Date) { + next(null, jobDefinition, when); + } - //otherwise parse as date.js string - else { - this._parse(when, next); + //otherwise parse as date.js string + else { + this._parse(when, function (error, scheduledDate) { + next(error, jobDefinition, scheduledDate); + }); + } + }.bind(this), + + //set job delay + function setDelay(jobDefinition, scheduledDate, next) { + next( + null, + _.merge({}, jobDefinition, { + delay: scheduledDate, + data: { + schedule: 'ONCE' } - }.bind(this), + }) + ); + }, - //set job delay - function setDelay(scheduledDate, next) { - next( - null, - _.merge({}, jobDefinition, { - delay: scheduledDate, - data: { - schedule: 'ONCE' - } - }) - ); - }, + function buildJob(delayedJobDefinition, next) { + this._buildJob(delayedJobDefinition, next); + }.bind(this), - function buildJob(delayedJobDefinition, next) { - this._buildJob(delayedJobDefinition, next); - }.bind(this), - function saveJob(job, validations, next) { - job.save(function (error, existJob) { - if (error) { - next(error); - } else { - //ensure unique job - if (existJob && existJob.alreadyExist) { - //inactivate to signal next run - if (existJob.state() === 'complete' || existJob.state() === - 'failed') { - //unset the unique mapping for this. - //existJob.inactive(); - kue.Job.removeUniqueJobData(existJob.id, function ( - err /*, deletedJobData*/ ) { - //resave initial job, which should set new unique constraint. - if (err) { - return next(err, null); - } - job.save(function (error, newJob) { - if (error) { - return next(error, null); - } - return next(null, newJob); - }); - }); - } else { - return next(null, existJob || job); - } - } else { - return next(null, existJob || job); - } + function saveJob(job, validations, next) { + job.save(function (error, existJob) { + next(error, existJob || job); + }); + }, + + function ensureSingleUniqueJob(job, next) { + ensureUniqueJob(job, next); + } + + ], function (error, job) { + //fire schedule error event + if (error) { + this.emit('schedule error', error); + } + + //fire already schedule event + else if (job.alreadyExist) { + this.emit('already scheduled', job); + } + + //fire schedule success event + else { + this.emit('schedule success', job); + } + + //invoke callback if provided + if (done && _.isFuction(done)) { + done(error, job); + } + + }.bind(this)); - } - }); - } - ], - function finish(error, job) { - if (error) { - this.emit('schedule error', error); - } else if (job.alreadyExist) { - this.emit('already scheduled', job); - } else { - this.emit('schedule success', job); - } - }.bind(this)); - } }; diff --git a/test/schedule/schedule.spec.js b/test/schedule/schedule.spec.js index b44ba9b..de3e975 100644 --- a/test/schedule/schedule.spec.js +++ b/test/schedule/schedule.spec.js @@ -13,11 +13,13 @@ describe('Queue#schedule', function () { beforeEach(function (done) { Queue = kue.createQueue(); - done(); + Queue.clear(done); }); afterEach(function (done) { - Queue.shutdown(done); + Queue.clear(function ( /*error,results*/ ) { + Queue.shutdown(done); + }); }); it('should be a function', function (done) { @@ -202,29 +204,28 @@ describe('Queue#schedule', function () { Queue.once('schedule error', function (error) { expect(error.message).to.be.equal( - 'Invalid number of parameters'); + 'Missing Schedule Interval'); done(); }); - Queue.schedule('2 seconds from now', undefined); + Queue.schedule(undefined, undefined); }); it( - 'should be able to emit `schedule error` if job is not an instance of Job', + 'should be able to emit `schedule error` if job is not given', function (done) { Queue.once('schedule error', function (error) { - expect(error.message).to.be.equal('Invalid job type'); + expect(error.message).to.be.equal( + 'Invalid Job Instance'); done(); }); - Queue.schedule('2 seconds from now', { - name: faker.name.firstName() - }); + Queue.schedule('2 seconds from now', undefined); });