Skip to content

Commit

Permalink
feat: allow updating job error/attempt info via script
Browse files Browse the repository at this point in the history
Affects the following scripts:
- moveToDelayed
- moveToFinished
- retryJob

Allows for the `attemptsMade`, `stacktrace` and `message` fields to be
updated through Lua scripts, rather than requiring a separate operation
(or multiple operations) to set these fields.

With the possibility of setting these fields within the context of a
single script, we avoid the need to craft a transaction containing both
HMSET and EVAL(SHA) commands to achieve atomicity. By shedding the need
for a transaction, we are able to avoid redis/ioredis#536 and thus solve
a common problem seen when targetting Redis clusters.

This isn’t necessarily an ideal means of solving this problem, given
we’re violating DRY principles by copying the same logic across
scripts. It’d be ideal if the underlying ioredis restriction was
removed (in a manner which doesn’t entail the performance cost of
resubmitting scripts each time).
  • Loading branch information
Rua-Yuki committed Sep 9, 2020
1 parent d479e1f commit 522fe4d
Show file tree
Hide file tree
Showing 4 changed files with 57 additions and 8 deletions.
8 changes: 8 additions & 0 deletions lib/commands/moveToDelayed-3.lua
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,10 @@
ARGV[1] delayedTimestamp
ARGV[2] the id of the job
ARGV[3] queue token
ARGV[4] should update job error data (with args 5-7)
ARGV[5] |- attemptsMade
ARGV[6] |- stacktrace
ARGV[7] |- failedReason
Output:
0 - OK
Expand All @@ -21,6 +25,10 @@
local rcall = redis.call

if rcall("EXISTS", KEYS[3]) == 1 then
-- Update job data if desired.
if ARGV[4] == "1" then
rcall("HMSET", KEYS[3], "attemptsMade", ARGV[5], "stacktrace", ARGV[6], "failedReason", ARGV[7])
end

-- Check for job lock
if ARGV[3] ~= "0" then
Expand Down
9 changes: 9 additions & 0 deletions lib/commands/moveToFinished-7.lua
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,10 @@
ARGV[9] base key
ARGV[10] lock duration
ARGV[11] token
ARGV[12] should update job error data (with args 13-15)
ARGV[13] |- attemptsMade
ARGV[14] |- stacktrace
ARGV[15] |- failedReason
Output:
0 OK
Expand All @@ -38,6 +42,11 @@
local rcall = redis.call

if rcall("EXISTS", KEYS[3]) == 1 then -- // Make sure job exists
-- Update job data if desired.
if ARGV[12] == "1" then
rcall("HMSET", KEYS[3], "attemptsMade", ARGV[13], "stacktrace", ARGV[14], "failedReason", ARGV[15])
end

if ARGV[5] ~= "0" then
local lockKey = KEYS[3] .. ':lock'
if rcall("GET", lockKey) == ARGV[5] then
Expand Down
8 changes: 8 additions & 0 deletions lib/commands/retryJob-3.lua
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,10 @@
ARGV[1] pushCmd
ARGV[2] jobId
ARGV[3] token
ARGV[4] should update job error data (with args 5-7)
ARGV[5] |- attemptsMade
ARGV[6] |- stacktrace
ARGV[7] |- failedReason
Events:
'prefix:added'
Expand All @@ -19,6 +23,10 @@
-2 - Job Not locked
]]
if redis.call("EXISTS", KEYS[3]) == 1 then
-- Update job data if desired.
if ARGV[4] == "1" then
redis.call("HMSET", KEYS[3], "attemptsMade", ARGV[5], "stacktrace", ARGV[6], "failedReason", ARGV[7])
end

-- Check for job lock
if ARGV[3] ~= "0" then
Expand Down
40 changes: 32 additions & 8 deletions lib/scripts.js
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,8 @@ const scripts = {
shouldRemove,
target,
ignoreLock,
notFetch
notFetch,
errorInfo
) {
const queue = job.queue;
const queueKeys = queue.keys;
Expand All @@ -131,6 +132,8 @@ const scripts = {
shouldRemove = `${shouldRemove + 1}`;
}

errorInfo = errorInfo instanceof Object ? errorInfo : {};

const args = [
job.id,
job.finishedOn,
Expand All @@ -142,7 +145,11 @@ const scripts = {
notFetch || queue.paused || queue.closing || queue.limiter ? 0 : 1,
queueKeys[''],
queue.settings.lockDuration,
queue.token
queue.token,
errorInfo.attemptsMade ? '1' : '0',
errorInfo.attemptsMade,
errorInfo.stacktrace,
errorInfo.message
];

return keys.concat(args);
Expand Down Expand Up @@ -205,15 +212,16 @@ const scripts = {
);
},

moveToFailedArgs(job, failedReason, removeOnFailed, ignoreLock) {
moveToFailedArgs(job, failedReason, removeOnFailed, ignoreLock, errorInfo) {
return scripts.moveToFinishedArgs(
job,
failedReason,
'failedReason',
removeOnFailed,
'failed',
ignoreLock,
true
true,
errorInfo
);
},

Expand All @@ -235,7 +243,7 @@ const scripts = {
return job.queue.client.isFinished(keys.concat([job.id]));
},

moveToDelayedArgs(queue, jobId, timestamp, ignoreLock) {
moveToDelayedArgs(queue, jobId, timestamp, ignoreLock, errorInfo) {
//
// Bake in the job id first 12 bits into the timestamp
// to guarantee correct execution order of delayed jobs
Expand All @@ -251,13 +259,19 @@ const scripts = {
timestamp = timestamp * 0x1000 + (jobId & 0xfff);
}

errorInfo = errorInfo instanceof Object ? errorInfo : {};

const keys = _.map(['active', 'delayed', jobId], name => {
return queue.toKey(name);
});
return keys.concat([
JSON.stringify(timestamp),
jobId,
ignoreLock ? '0' : queue.token
ignoreLock ? '0' : queue.token,
errorInfo.attemptsMade ? '1' : '0',
errorInfo.attemptsMade,
errorInfo.stacktrace,
errorInfo.message
]);
},

Expand Down Expand Up @@ -411,7 +425,7 @@ const scripts = {
]);
},

retryJobArgs(job, ignoreLock) {
retryJobArgs(job, ignoreLock, errorInfo) {
const queue = job.queue;
const jobId = job.id;

Expand All @@ -421,7 +435,17 @@ const scripts = {

const pushCmd = (job.opts.lifo ? 'R' : 'L') + 'PUSH';

return keys.concat([pushCmd, jobId, ignoreLock ? '0' : job.queue.token]);
errorInfo = errorInfo instanceof Object ? errorInfo : {};

return keys.concat([
pushCmd,
jobId,
ignoreLock ? '0' : job.queue.token,
errorInfo.attemptsMade ? '1' : '0',
errorInfo.attemptsMade,
errorInfo.stacktrace,
errorInfo.message
]);
},

/**
Expand Down

0 comments on commit 522fe4d

Please sign in to comment.