Skip to content

Commit

Permalink
fix: avoid transaction usage in Job#moveToFailed
Browse files Browse the repository at this point in the history
By using just a single ioredis custom command invocation now, rather
than a transaction including a custom command, we should have OptimalBits#758
resolved.

Slightly simplified logic (hopefully).
  • Loading branch information
Rua-Yuki committed Sep 9, 2020
1 parent 2d7c1fe commit 3dd84bc
Showing 1 changed file with 52 additions and 52 deletions.
104 changes: 52 additions & 52 deletions lib/job.js
Original file line number Diff line number Diff line change
Expand Up @@ -248,59 +248,53 @@ Job.prototype.moveToFailed = async function(err, ignoreLock) {
this.failedReason = err.message;
await this.queue.isReady();

let command;
const multi = this.queue.client.multi();
this._saveAttempt(multi, err);

// Check if an automatic retry should be performed
let moveToFailed = false;
if (this.attemptsMade < this.opts.attempts && !this._discarded) {
// Check if backoff is needed
const delay = await backoffs.calculate(
// Helper which wraps a command response promise, handling errors as needed.
const wrapCommandProm = async (commandName, prom) => {
const code = await prom;
if (code < 0) {
throw scripts.finishedErrors(code, this.id, commandName);
}
};

// Adjust job state to reflect the error we've encountered. The returned object will be
// used to form patches for the job hash object in Redis.
const errorInfo = this._recordError(err);

// Check if backoff is needed.
const delay = !this._discarded && this.attemptsMade < this.opts.attempts
? await backoffs.calculate(
this.opts.backoff,
this.attemptsMade,
this.queue.settings.backoffStrategies,
err
);

if (delay === -1) {
// If delay is -1, we should no continue retrying
moveToFailed = true;
} else if (delay) {
// If so, move to delayed (need to unlock job in this case!)
const args = scripts.moveToDelayedArgs(
this.queue,
this.id,
Date.now() + delay,
ignoreLock
);
multi.moveToDelayed(args);
command = 'delayed';
} else {
// If not, retry immediately
multi.retryJob(scripts.retryJobArgs(this, ignoreLock));
command = 'retry';
}
} else {
// If not, move to failed
moveToFailed = true;
}
)
: -1;

if (moveToFailed) {
if (delay === -1) {
// A delay of -1 indicates we should cease retry efforts and fail immediately.
this.finishedOn = Date.now();
const args = scripts.moveToFailedArgs(
this,
err.message,
this.opts.removeOnFail,
ignoreLock
ignoreLock,
errorInfo
);
multi.moveToFinished(args);
command = 'failed';
}
const results = await multi.exec();
const code = _.last(results)[1];
if (code < 0) {
throw scripts.finishedErrors(code, this.id, command);
return wrapCommandProm('failed', this.queue.client.moveToFinished(args));
} else if (delay) {
// Move to delayed set so we can retry eventually.
const args = scripts.moveToDelayedArgs(
this.queue,
this.id,
Date.now() + delay,
ignoreLock,
errorInfo
);
return wrapCommandProm('delayed', this.queue.client.moveToDelayed(args));
} else {
// Retry immediately.
const args = scripts.retryJobArgs(this, ignoreLock, errorInfo);
return wrapCommandProm('retry', this.queue.client.retryJob(args));
}
};

Expand Down Expand Up @@ -545,22 +539,28 @@ Job.prototype._isInList = function(list) {
);
};

Job.prototype._saveAttempt = function(multi, err) {
this.attemptsMade++;

const params = {
attemptsMade: this.attemptsMade
};

/**
* Updates local job data to reflect the given error's occurrence. Increments the count
* of consumed attempts and appends the error trace to the job's stacktrace list.
*
* Note that the job's Redis hash is left unaffected by this function; the caller ought
* to attempt state synchronisation using the returned data if necessary.
*
* @returns {object} Returns an object containing fields which should be updated on the
* Redis hash entry for the job in order to synchronise state.
*/
Job.prototype._recordError = function(err) {
if (this.opts.stackTraceLimit) {
this.stacktrace = this.stacktrace.slice(0, this.opts.stackTraceLimit - 1);
}

this.stacktrace.push(err.stack);
params.stacktrace = JSON.stringify(this.stacktrace);
params.failedReason = err.message;

multi.hmset(this.queue.toKey(this.id), params);
return {
attemptsMade: ++this.attemptsMade,
stacktrace: JSON.stringify(this.stacktrace),
message: err.message
};
};

Job.fromJSON = function(queue, json, jobId) {
Expand Down

0 comments on commit 3dd84bc

Please sign in to comment.