Skip to content

Commit

Permalink
refactor Queue.schedule api
Browse files Browse the repository at this point in the history
  • Loading branch information
lykmapipo committed Sep 6, 2016
1 parent 4c6fec0 commit 55151e7
Show file tree
Hide file tree
Showing 3 changed files with 95 additions and 103 deletions.
2 changes: 1 addition & 1 deletion Gruntfile.js
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ module.exports = function (grunt) {
timeout: 20000
},
// src: ['test/**/*.js']
src: ['test/schedule/every.spec.js']
src: ['test/schedule/schedule.spec.js']
}
},
jshint: {
Expand Down
177 changes: 84 additions & 93 deletions index.js
Original file line number Diff line number Diff line change
Expand Up @@ -858,14 +858,15 @@ Queue.prototype.every = function (interval, job, done) {
*
* @param {Date|String} when when should this job run
* @param {Job} jobDefinition valid kue job instance which has not been saved
* @param {Fuction} [done] a callback to invoke on success or error
* @example
* 1. create non-unique job
* var job = Queue
* .createJob('schedule', data)
* .attempts(3)
* .priority('normal');
*
* Queue.schedule('2 seconds from now', job);
* Queue.schedule('2 seconds from now', job, done);
*
* 2. create unique job
* var job = Queue
Expand All @@ -874,112 +875,102 @@ Queue.prototype.every = function (interval, job, done) {
* .priority('normal')
* .unique(<unique_key>);
*
* Queue.schedule('2 seconds from now', job);
* Queue.schedule('2 seconds from now', job, done);
* @public
*/
Queue.prototype.schedule = function (when, job) {
Queue.prototype.schedule = function (when, job, done) {
//this refer to kue Queue instance context

//back-off if no interval and job
if (!when || !job) {
this.emit(
'schedule error',
new Error('Invalid number of parameters')
);
}
async.waterfall([
function ensureInterval(next) {
if (!when && !(_.isString(when) || _.isDate(when))) {
next(new Error('Missing Schedule Interval'));
} else {
next(null, when, job);
}
},

//check for job instance
else if (!(job instanceof Job)) {
this.emit(
'schedule error',
new Error('Invalid job type')
);
}
function ensureJobInstance(when, job, next) {
if (!job && !(job instanceof Job)) {
next(new Error('Invalid Job Instance'));
} else {
next(null, when, job);
}
},

//continue with processing job
else {
function prepareJobDefinition(when, job, next) {
var jobDefinition = _.extend(job.toJSON(), {
backoff: job._backoff
});

var jobDefinition = _.extend(job.toJSON(), {
backoff: job._backoff
});
next(null, when, job, jobDefinition);
},

async.waterfall(
[
function computeDelay(next) {
//when is date instance
if (when instanceof Date) {
next(null, when);
}
function computeDelay(when, job, jobDefinition, next) {
//when is date instance
if (when instanceof Date) {
next(null, jobDefinition, when);
}

//otherwise parse as date.js string
else {
this._parse(when, next);
//otherwise parse as date.js string
else {
this._parse(when, function (error, scheduledDate) {
next(error, jobDefinition, scheduledDate);
});
}
}.bind(this),

//set job delay
function setDelay(jobDefinition, scheduledDate, next) {
next(
null,
_.merge({}, jobDefinition, {
delay: scheduledDate,
data: {
schedule: 'ONCE'
}
}.bind(this),
})
);
},

//set job delay
function setDelay(scheduledDate, next) {
next(
null,
_.merge({}, jobDefinition, {
delay: scheduledDate,
data: {
schedule: 'ONCE'
}
})
);
},
function buildJob(delayedJobDefinition, next) {
this._buildJob(delayedJobDefinition, next);
}.bind(this),

function buildJob(delayedJobDefinition, next) {
this._buildJob(delayedJobDefinition, next);
}.bind(this),

function saveJob(job, validations, next) {
job.save(function (error, existJob) {
if (error) {
next(error);
} else {
//ensure unique job
if (existJob && existJob.alreadyExist) {
//inactivate to signal next run
if (existJob.state() === 'complete' || existJob.state() ===
'failed') {
//unset the unique mapping for this.
//existJob.inactive();
kue.Job.removeUniqueJobData(existJob.id, function (
err /*, deletedJobData*/ ) {
//resave initial job, which should set new unique constraint.
if (err) {
return next(err, null);
}
job.save(function (error, newJob) {
if (error) {
return next(error, null);
}
return next(null, newJob);
});
});
} else {
return next(null, existJob || job);
}
} else {
return next(null, existJob || job);
}
function saveJob(job, validations, next) {
job.save(function (error, existJob) {
next(error, existJob || job);
});
},

function ensureSingleUniqueJob(job, next) {
ensureUniqueJob(job, next);
}

], function (error, job) {
//fire schedule error event
if (error) {
this.emit('schedule error', error);
}

//fire already schedule event
else if (job.alreadyExist) {
this.emit('already scheduled', job);
}

//fire schedule success event
else {
this.emit('schedule success', job);
}

//invoke callback if provided
if (done && _.isFuction(done)) {
done(error, job);
}

}.bind(this));

}
});
}
],
function finish(error, job) {
if (error) {
this.emit('schedule error', error);
} else if (job.alreadyExist) {
this.emit('already scheduled', job);
} else {
this.emit('schedule success', job);
}
}.bind(this));
}
};


Expand Down
19 changes: 10 additions & 9 deletions test/schedule/schedule.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,13 @@ describe('Queue#schedule', function () {

beforeEach(function (done) {
Queue = kue.createQueue();
done();
Queue.clear(done);
});

afterEach(function (done) {
Queue.shutdown(done);
Queue.clear(function ( /*error,results*/ ) {
Queue.shutdown(done);
});
});

it('should be a function', function (done) {
Expand Down Expand Up @@ -202,29 +204,28 @@ describe('Queue#schedule', function () {
Queue.once('schedule error', function (error) {

expect(error.message).to.be.equal(
'Invalid number of parameters');
'Missing Schedule Interval');

done();
});

Queue.schedule('2 seconds from now', undefined);
Queue.schedule(undefined, undefined);

});

it(
'should be able to emit `schedule error` if job is not an instance of Job',
'should be able to emit `schedule error` if job is not given',
function (done) {

Queue.once('schedule error', function (error) {

expect(error.message).to.be.equal('Invalid job type');
expect(error.message).to.be.equal(
'Invalid Job Instance');

done();
});

Queue.schedule('2 seconds from now', {
name: faker.name.firstName()
});
Queue.schedule('2 seconds from now', undefined);

});

Expand Down

0 comments on commit 55151e7

Please sign in to comment.