Skip to content

Commit

Permalink
feat: Support for retrying to deliver SQS message (#304)
Browse files Browse the repository at this point in the history
* LoggerWrapper: Add support for `warn()`

* feat: Support for retrying to deliver SQS message

* Add some unit tests for QueueSubjectListener

* Include `visibilityTimeout` in debug log
  • Loading branch information
fredrjoh authored Aug 29, 2024
1 parent eca5d2c commit 08bd12a
Show file tree
Hide file tree
Showing 6 changed files with 368 additions and 17 deletions.
2 changes: 2 additions & 0 deletions src/ILogger.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,5 +5,7 @@ export interface ILogger {

info: (message: string) => void;

warn: (message: string) => void;

error: (message: string) => void;
}
4 changes: 4 additions & 0 deletions src/LoggerWrapper.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,10 @@ export class LoggerWrapper implements ILogger {
this._logger?.info && this._logger.info(message);
}

warn(message: string) {
this._logger?.warn && this._logger.warn(message);
}

error(message: string) {
this._logger?.error ? this._logger.error(message) : console.log(message);
}
Expand Down
13 changes: 13 additions & 0 deletions src/queue/Queue.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import {SNS} from '@aws-sdk/client-sns';
import {
ChangeMessageVisibilityCommandInput,
DeleteMessageCommandInput,
ReceiveMessageCommandInput,
SendMessageCommandInput,
Expand Down Expand Up @@ -159,4 +160,16 @@ export class Queue {
};
await this.sqs.deleteMessage(request);
}

async changeMessageVisibility(
receiptHandle: string,
visibilityTimeout: number
) {
const request: ChangeMessageVisibilityCommandInput = {
QueueUrl: this.queueUrl,
ReceiptHandle: receiptHandle,
VisibilityTimeout: visibilityTimeout,
};
await this.sqs.changeMessageVisibility(request);
}
}
133 changes: 116 additions & 17 deletions src/queue/QueueSubjectListener.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
import {ReceiveMessageCommandInput} from '@aws-sdk/client-sqs';
import {
ReceiveMessageCommandInput,
QueueAttributeName,
} from '@aws-sdk/client-sqs';
import {ILogger} from '../ILogger';
import {LoggerWrapper} from '../LoggerWrapper';
import {Queue} from './Queue';
Expand All @@ -11,12 +14,37 @@ export type QueueSubjectListenerOptions = {
};

export type QueueSubjectListenerMessageHandler = {
(message: unknown, subject: string): void;
(message: unknown, subject: string): Promise<void>;
};

export type QueueSubjectListenerRetryPolicyOptions = {
maxAttempts: number;
backoffDelaySeconds: number;
retryPolicy?: (
attempt: number,
backoffDelaySeconds: number,
error: unknown
) => number;
};

export const LinearRetryPolicy = (
attempt: number,
backoffDelaySeconds: number
): number => backoffDelaySeconds * attempt;

export const ExponentialRetryPolicy = (
attempt: number,
backoffDelaySeconds: number
) => Math.pow(attempt, backoffDelaySeconds);

export class QueueSubjectListener {
public handlers: Record<string, Array<QueueSubjectListenerMessageHandler>> =
{};
public handlers: Record<
string,
Array<{
handler: QueueSubjectListenerMessageHandler;
retryPolicyOptions?: QueueSubjectListenerRetryPolicyOptions;
}>
> = {};
public isStopped = false;

public logger: ILogger;
Expand All @@ -37,9 +65,22 @@ export class QueueSubjectListener {
this.isStopped = true;
}

onSubject(subjectName: string, handler: QueueSubjectListenerMessageHandler) {
onSubject(
subjectName: string,
handler: QueueSubjectListenerMessageHandler,
retryPolicyOptions?: QueueSubjectListenerRetryPolicyOptions
) {
this.handlers[subjectName] = this.handlers[subjectName] || [];
this.handlers[subjectName].push(handler);
this.handlers[subjectName].push({
handler,
retryPolicyOptions: retryPolicyOptions
? {
maxAttempts: retryPolicyOptions.maxAttempts || 3,
backoffDelaySeconds: retryPolicyOptions.backoffDelaySeconds || 10,
retryPolicy: retryPolicyOptions.retryPolicy || LinearRetryPolicy,
}
: undefined,
});
}

listen(params?: ReceiveMessageCommandInput) {
Expand All @@ -66,6 +107,7 @@ export class QueueSubjectListener {
MaxNumberOfMessages: maxNumberOfMessagesOrUndefined,
VisibilityTimeout,
WaitTimeSeconds,
AttributeNames: [QueueAttributeName.All],
};

const response = await this.queue.receiveMessage(currentParams);
Expand All @@ -89,6 +131,7 @@ export class QueueSubjectListener {
message: {
message: JSON.parse(json.Message),
subject: json.Subject,
attributes: m.Attributes,
},
};
} catch (error) {
Expand All @@ -100,28 +143,84 @@ export class QueueSubjectListener {
cntInFlight += messages.length;

const promises = messages.map(async m => {
const {message, subject} = m.message;
const {message, subject, attributes} = m.message;
let shouldRetry = false;
let visibilityTimeout: number | undefined;
try {
if (this.handlers[subject] || this.handlers['*']) {
const subjectHandlers = (this.handlers[subject] || []).concat(
this.handlers['*'] || []
);
await Promise.all(
(this.handlers[subject] || [])
.concat(this.handlers['*'] || [])
.map(async h => {
try {
await h(message, subject);
} catch (error) {
typeof error === 'string' && this.logger.error(error);
subjectHandlers.map(async h => {
try {
shouldRetry = false;
await h.handler(message, subject);
} catch (error) {
typeof error === 'string' && this.logger.error(error);

if (!h.retryPolicyOptions) return;

if (Object.keys(subjectHandlers).length > 1) {
this.logger.info(
`Multiple handlers for message with subject "${m.message.subject}"`
);
return;
}
})

const {maxAttempts, backoffDelaySeconds, retryPolicy} =
h.retryPolicyOptions;
const attempt = parseInt(
attributes?.ApproximateReceiveCount || '1'
);

if (attempt < maxAttempts) {
shouldRetry = true;
visibilityTimeout = retryPolicy?.(
attempt,
backoffDelaySeconds,
error
);

this.logger.debug(
`Message with subject "${m.message.subject}" will be retried`
);
}
}
})
);
}
if (!m.handle)
throw Error("'handle' property on message was undefined.");

await this.queue.deleteMessage(m.handle);
if (!shouldRetry) {
await this.queue.deleteMessage(m.handle);

this.logger.debug(
`Message with subject "${m.message.subject}" deleted`
);
return;
}

if (
typeof visibilityTimeout === 'number' &&
visibilityTimeout >= 0 &&
visibilityTimeout !== currentParams.VisibilityTimeout
) {
if (visibilityTimeout >= 0 && visibilityTimeout <= 43200) {
await this.queue.changeMessageVisibility(
m.handle,
visibilityTimeout
);
} else {
this.logger.warn(
`Invalid visibilityTimeout value: ${visibilityTimeout}`
);
}
}

this.logger.debug(
`Message with subject "${m.message.subject}" deleted`
`Message with subject "${m.message.subject}" kept, visibilityTimeout: ${visibilityTimeout}`
);
} catch (error) {
typeof error === 'string' && this.logger.error(error);
Expand Down
Loading

0 comments on commit 08bd12a

Please sign in to comment.