Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(cb2-14226) refactor and remove SQS snowball pattern #84

Merged
merged 13 commits into from
Oct 16, 2024
Merged
29,166 changes: 8,030 additions & 21,136 deletions package-lock.json

Large diffs are not rendered by default.

4 changes: 2 additions & 2 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@
"reflect-metadata": "^0.1.13"
},
"devDependencies": {
"@commitlint/cli": "^12.1.4",
"@commitlint/cli": "^19.5.0",
"@commitlint/config-conventional": "^12.1.4",
"@types/aws-lambda": "^8.10.34",
"@types/jest": "^24.0.21",
Expand All @@ -72,7 +72,7 @@
"jest-plugin-context": "^2.9.0",
"jest-sonar-reporter": "^2.0.0",
"prettier": "^2.3.2",
"serverless": "^2.19.0",
"serverless": "^3.0.0",
"serverless-plugin-tracing": "^2.0.0",
"serverless-plugin-typescript": "^2.1.2",
"sonar-scanner": "^3.1.0",
Expand Down
94 changes: 57 additions & 37 deletions src/functions/certGenInit.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,12 @@
import { Callback, Context, Handler } from "aws-lambda";
import { ServiceException } from "@smithy/smithy-client";
import { SendMessageCommandOutput, SQSClient } from "@aws-sdk/client-sqs";
import { SQSClient } from "@aws-sdk/client-sqs";
import {
Callback,
Context,
DynamoDBBatchItemFailure,
DynamoDBBatchResponse,
DynamoDBStreamEvent,
Handler,
} from "aws-lambda";
import { SQService } from "../services/SQService";
import { StreamService } from "../services/StreamService";
import { Utils } from "../utils/Utils";
Expand All @@ -12,47 +18,61 @@ import { Utils } from "../utils/Utils";
* @param callback - callback function
*/
const certGenInit: Handler = async (
event: any,
event: DynamoDBStreamEvent,
context?: Context,
callback?: Callback
): Promise<void | Array<SendMessageCommandOutput | any>> => {
): Promise<DynamoDBBatchResponse> => {
if (!event) {
console.error("ERROR: event is not defined.");
naathanbrown marked this conversation as resolved.
Show resolved Hide resolved
return;
throw new Error("ERROR: event is not defined");
}

// Convert the received event into a readable array of filtered test results
const expandedRecords: any[] = StreamService.getTestResultStream(event);
console.log(`Number of Retrieved records: ${expandedRecords.length}`);
const certGenFilteredRecords: any[] =
Utils.filterCertificateGenerationRecords(expandedRecords);

console.log(`Number of Filtered Retrieved Records: ${certGenFilteredRecords.length}`);

// Instantiate the Simple Queue Service
const sqService: SQService = new SQService(new SQSClient());
const sendMessagePromises: Array<
Promise<SendMessageCommandOutput | ServiceException>
> = [];

certGenFilteredRecords.forEach((record: any) => {
const stringifiedRecord = JSON.stringify(record);
console.log(stringifiedRecord);
sendMessagePromises.push(
sqService.sendCertGenMessage(stringifiedRecord)
);
});

return Promise.all(sendMessagePromises).catch((error) => {
console.error(error);
console.log("expandedRecords");
console.log(JSON.stringify(expandedRecords));
console.log("certGenFilteredRecords");
console.log(JSON.stringify(certGenFilteredRecords));
if (error.code !== "InvalidParameterValue") {
throw error;
const batchItemFailures: DynamoDBBatchItemFailure[] = [];
let expandedRecords: any[] = [];
let certGenFilteredRecords: any[] = [];
let sqService: SQService;

try {
// Instantiate the Simple Queue Service
sqService = new SQService(new SQSClient());
} catch (e) {
console.error(`Error creating SQS instance: ${e}`);
throw new Error("Failed to initialize SQS service");
}

for (const record of event.Records) {
try {
expandedRecords = StreamService.getTestResultStream(record);
console.log(`Number of Retrieved records: ${expandedRecords.length}`);

certGenFilteredRecords =
Utils.filterCertificateGenerationRecords(expandedRecords);
console.log(
`Number of Filtered Retrieved Records: ${certGenFilteredRecords.length}`
);

for (const record of certGenFilteredRecords) {
const stringifiedRecord = JSON.stringify(record);
console.log(stringifiedRecord);
await sqService.sendCertGenMessage(stringifiedRecord);
}

console.log(
`event ${record.dynamodb?.SequenceNumber} successfully processed`
);
} catch (err) {
console.error(err);
console.log("expandedRecords");
console.log(JSON.stringify(expandedRecords));
console.log("certGenFilteredRecords");
console.log(JSON.stringify(certGenFilteredRecords));
batchItemFailures.push({
itemIdentifier: record.dynamodb?.SequenceNumber ?? "",
});
}
naathanbrown marked this conversation as resolved.
Show resolved Hide resolved
});
}

return { batchItemFailures };
};

export { certGenInit };
6 changes: 2 additions & 4 deletions src/services/SQService.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,6 @@ import {
SQSClient,
SendMessageCommand,
SendMessageCommandInput,
SendMessageCommandOutput,
SetQueueAttributesCommand,
} from "@aws-sdk/client-sqs";

import { Service } from "../models/injector/ServiceDecorator";
Expand Down Expand Up @@ -69,7 +67,7 @@ class SQService {
messageBody: string,
queueName: string,
messageAttributes?: Record<string, MessageAttributeValue>
): Promise<SendMessageCommandOutput | ServiceException> {
) {
// Get the queue URL for the provided queue name
const queueUrlResult: GetQueueUrlCommandOutput = await this.sqsClient.send(
new GetQueueUrlCommand({ QueueName: queueName })
Expand All @@ -86,7 +84,7 @@ class SQService {
}

// Send a message to the queue
return this.sqsClient.send(
await this.sqsClient.send(
new SendMessageCommand(params as SendMessageCommandInput)
);
}
Expand Down
60 changes: 31 additions & 29 deletions src/services/StreamService.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,24 +25,25 @@ class StreamService {
* └── test-type-3
* @param event
*/
public static getTestResultStream(event: any) {
console.log(event);
public static getTestResultStream(record: DynamoDBRecord) {
console.log(record);
let records = [];
// Create from a test result with multiple test types, multiple test result with one test type each
const records: any[] = event.Records.filter((record: DynamoDBRecord) => {
// Retrieve "INSERT" events
return (
record.eventName === "INSERT" ||
(record.eventName === "MODIFY" &&
StreamService.isProcessModifyEventsEnabled())
);
}).map((record: any) => {
// Convert to JS object
if (
record.eventName === "INSERT" ||
(record.eventName === "MODIFY" &&
StreamService.isProcessModifyEventsEnabled())
) {
if (record.dynamodb && record.dynamodb.NewImage) {
return unmarshall(record.dynamodb.NewImage);
const unmarshalledRecord = unmarshall(
(record as any).dynamodb.NewImage
);
records = StreamService.expandRecords([unmarshalledRecord]);
}
});

return StreamService.expandRecords(records);
} else {
console.log("event name was not of correct type");
}
return records;
}

/**
Expand Down Expand Up @@ -75,22 +76,23 @@ class StreamService {
Object.assign(templateRecord, {});
naathanbrown marked this conversation as resolved.
Show resolved Hide resolved
console.log("before for each");
naathanbrown marked this conversation as resolved.
Show resolved Hide resolved
if (record.testTypes instanceof Array) {
record.testTypes?.forEach((testType: any, i: number, array: any[]) => {
console.log("in for each");
const clonedRecord: any = Object.assign({}, templateRecord); // Create record from template
Object.assign(clonedRecord, { testTypes: testType }); // Assign it the test type
Object.assign(clonedRecord, {
// Assign certificate order number
order: {
current: i + 1,
total: array.length,
},
});

splittedRecords.push(clonedRecord);
record.testTypes?.forEach(
(testType: any, i: number, array: any[]) => {
console.log("in for each");
naathanbrown marked this conversation as resolved.
Show resolved Hide resolved
const clonedRecord: any = Object.assign({}, templateRecord); // Create record from template
Object.assign(clonedRecord, { testTypes: testType }); // Assign it the test type
Object.assign(clonedRecord, {
// Assign certificate order number
order: {
current: i + 1,
total: array.length,
},
});
}

splittedRecords.push(clonedRecord);
}
);
}

console.log("after for each");
return splittedRecords;
Expand Down
37 changes: 26 additions & 11 deletions tests/unit/certGenInit.unitTest.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,18 +4,19 @@ import {
GetQueueUrlCommand,
ReceiveMessageCommand,
ReceiveMessageCommandOutput,
SQSClient,
SendMessageCommand,
SendMessageCommandOutput,
SQSClient,
} from "@aws-sdk/client-sqs";
import { marshall, unmarshall } from "@aws-sdk/util-dynamodb";
import { DynamoDBRecord } from "aws-lambda";
import { mockClient } from "aws-sdk-client-mock";
import { Injector } from "../../src/models/injector/Injector";
import { SQService } from "../../src/services/SQService";
import { StreamService } from "../../src/services/StreamService";
import { Configuration } from "../../src/utils/Configuration";
import { SQMockClient } from "../models/SQMockClient";
import event from "../resources/stream-event.json";
import { mockClient } from "aws-sdk-client-mock";
import {marshall, unmarshall} from "@aws-sdk/util-dynamodb";

const record = {
testerStaffId: "1",
Expand Down Expand Up @@ -109,7 +110,9 @@ describe("cert-gen-init", () => {
"when fetching test result stream and the eventName is INSERT",
() => {
it("should result in an array of filtered js objects", () => {
processedEvent = StreamService.getTestResultStream(event);
processedEvent = StreamService.getTestResultStream(
event.Records[0] as DynamoDBRecord
);
expect(processedEvent).toEqual(expectedResult);
});
}
Expand All @@ -121,34 +124,46 @@ describe("cert-gen-init", () => {
it("shouldn't result in an array of filtered js objects when PROCESS_MODIFY_EVENTS is false", () => {
process.env.PROCESS_MODIFY_EVENTS = "false";
event.Records[0].eventName = "MODIFY";
processedEvent = StreamService.getTestResultStream(event);
processedEvent = StreamService.getTestResultStream(
event.Records[0] as DynamoDBRecord
);
expect(processedEvent).toHaveLength(0);
});

it("should result in an array of filtered js objects when PROCESS_MODIFY_EVENTS is true", () => {
process.env.PROCESS_MODIFY_EVENTS = "true";
event.Records[0].eventName = "MODIFY";
processedEvent = StreamService.getTestResultStream(event);
processedEvent = StreamService.getTestResultStream(
event.Records[0] as DynamoDBRecord
);
expect(processedEvent).toHaveLength(1);
expect(processedEvent).toEqual(expectedResult);
});

it("should result in an empty array when the test type is an object", () => {
process.env.PROCESS_MODIFY_EVENTS = "true";
const eventWithTestTypeObject = unmarshall({...event}.Records[0].dynamodb.NewImage);
const eventWithTestTypeObject = unmarshall(
{ ...event }.Records[0].dynamodb.NewImage
);
eventWithTestTypeObject.testTypes = {};
event.Records[0].eventName = "MODIFY";
const mainEvent = {...event};
mainEvent.Records[0].dynamodb.NewImage = marshall(eventWithTestTypeObject) as any;
processedEvent = StreamService.getTestResultStream(mainEvent);
const mainEvent = { ...event };
mainEvent.Records[0].dynamodb.NewImage = marshall(
eventWithTestTypeObject
) as any;
processedEvent = StreamService.getTestResultStream(
mainEvent.Records[0] as DynamoDBRecord
);
expect(processedEvent).toEqual([]);
});

it("should throw an error if PROCESS_MODIFY_EVENTS is not true or false", () => {
process.env.PROCESS_MODIFY_EVENTS = "";
event.Records[0].eventName = "MODIFY";
expect(() => {
StreamService.getTestResultStream(event);
StreamService.getTestResultStream(
event.Records[0] as DynamoDBRecord
);
}).toThrowError();
});
}
Expand Down
33 changes: 19 additions & 14 deletions tests/unit/certGenInitFunction.unitTest.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@ describe("certGenInit Function", () => {
const result = await certGenInit(undefined, ctx, () => {
naathanbrown marked this conversation as resolved.
Show resolved Hide resolved
return;
});
expect(result).toBe(undefined);
} catch (e) {
} catch (e: any) {
expect(e.message).toBe("ERROR: event is not defined");
console.log(e);
}
});
Expand All @@ -36,7 +36,7 @@ describe("certGenInit Function", () => {
.mockReturnValue([{ TestRecord: "certGenMessage" }]);

try {
await certGenInit({}, ctx, () => {
await certGenInit({ Records: ["this is an event"] }, ctx, () => {
return;
});
} catch (e) {
Expand Down Expand Up @@ -64,15 +64,16 @@ describe("certGenInit Function", () => {
.fn()
.mockReturnValue([{ test: "thing" }]);

expect.assertions(2);
try {
await certGenInit({}, ctx, () => {
expect.assertions(1);

const returnedInfo = await certGenInit(
{ Records: ["this is an event"] },
ctx,
() => {
return;
});
} catch (e: any) {
expect(e.message).toEqual(myError.message);
expect(e.code).toEqual(myError.code);
}
}
);
expect(returnedInfo.batchItemFailures.length).toBe(1);
});
it("should not throw error if code is InvalidParameterValue", async () => {
StreamService.getTestResultStream = jest.fn().mockReturnValue([{}]);
Expand All @@ -90,9 +91,13 @@ describe("certGenInit Function", () => {

expect.assertions(1);
try {
const result = await certGenInit({}, ctx, () => {
return;
});
const result = await certGenInit(
{ Records: ["this is an event"] },
ctx,
() => {
return;
}
);
expect(result).toBe({});
} catch (e) {
console.log(e);
Expand Down
Loading
Loading