diff --git a/src/auth-service/bin/jobs/test/ut_kafka-consumer.js b/src/auth-service/bin/jobs/test/ut_kafka-consumer.js index 0bdc9c424b..1aeb8ed4ef 100644 --- a/src/auth-service/bin/jobs/test/ut_kafka-consumer.js +++ b/src/auth-service/bin/jobs/test/ut_kafka-consumer.js @@ -11,19 +11,26 @@ const Joi = require("joi"); const { jsonrepair } = require("jsonrepair"); describe("Kafka Consumer", () => { - let connectStub; - let subscribeStub; - let runStub; + let kafkaStub; + let consumerStub; let mailerStub; let loggerErrorStub; let loggerInfoStub; let jsonrepairStub; beforeEach(() => { - // Create stubs for required modules/functions - connectStub = sinon.stub(); - subscribeStub = sinon.stub(); - runStub = sinon.stub(); + // Create stubs for Kafka consumer + consumerStub = { + connect: sinon.stub().resolves(), + subscribe: sinon.stub().resolves(), + run: sinon.stub().resolves(), + }; + + kafkaStub = { + consumer: sinon.stub().returns(consumerStub), + }; + + // Other stubs mailerStub = sinon.stub(mailer, "newMobileAppUser"); loggerErrorStub = sinon.stub(console, "error"); loggerInfoStub = sinon.stub(console, "info"); @@ -34,188 +41,107 @@ describe("Kafka Consumer", () => { sinon.restore(); }); - it("should consume messages from the Kafka broker", async () => { - // Set up the topic-to-operation function mapping - const operationFunction1 = sinon.stub(); - const operationFunction2 = sinon.stub(); - const topicOperations = { - [constants.NEW_MOBILE_APP_USER_TOPIC]: operationFunction1, - topic2: operationFunction2, - }; + it("should properly initialize and subscribe to all topics", async () => { + const expectedTopics = ["ip-address", "deploy-topic", "recall-topic"]; - // Stub the required Kafka functions - connectStub.resolves(); - subscribeStub.resolves(); - runStub.callsFake(async (options) => { - const { eachMessage } = options; - const fakeMessage = { - topic: constants.NEW_MOBILE_APP_USER_TOPIC, - partition: 0, - message: { - value: "testMessageData", - }, - }; - await eachMessage(fakeMessage); - }); + await kafkaConsumer(); - try { - // Call the kafkaConsumer function - await kafkaConsumer(); + // Verify connection and subscription sequence + expect(consumerStub.connect.calledOnce).to.be.true; - // Check if Kafka functions are called with the correct arguments - expect(connectStub.calledOnce).to.be.true; + // Verify subscriptions to all topics + expectedTopics.forEach((topic) => { expect( - subscribeStub.calledOnceWithExactly({ - topic: constants.NEW_MOBILE_APP_USER_TOPIC, + consumerStub.subscribe.calledWith({ + topic, fromBeginning: true, }) ).to.be.true; - expect(runStub.calledOnce).to.be.true; - expect(operationFunction1.calledOnceWithExactly("testMessageData")).to.be - .true; - expect(operationFunction2.notCalled).to.be.true; // topic2 not subscribed, so operationFunction2 should not be called - } catch (error) { - throw error; - } + }); + + // Verify consumer.run called once after all subscriptions + expect(consumerStub.run.calledOnce).to.be.true; + expect(consumerStub.run.calledAfter(consumerStub.subscribe)).to.be.true; }); - it("should handle Kafka errors", async () => { - // Stub the required Kafka functions to throw an error - connectStub.rejects(new Error("Kafka connection error")); + it("should handle messages for each topic correctly", async () => { + // Simulate message processing by triggering the eachMessage callback + consumerStub.run.callsFake(async ({ eachMessage }) => { + await eachMessage({ + topic: "ip-address", + partition: 0, + message: { value: JSON.stringify({ ip: "192.168.1.1" }) }, + }); + }); - try { - // Call the kafkaConsumer function - await kafkaConsumer(); + await kafkaConsumer(); - // Check if the error is caught and logged - expect( - loggerErrorStub.calledOnceWithExactly( - "Error connecting to Kafka: Kafka connection error" - ) - ).to.be.true; - } catch (error) { - throw error; - } + // Verify message processing + expect(consumerStub.run.calledOnce).to.be.true; + // Add specific verification for your message handlers }); - it("should handle topic without operation function", async () => { - // Set up the topic-to-operation function mapping without an operation for topic2 - const operationFunction1 = sinon.stub(); - const topicOperations = { - [constants.NEW_MOBILE_APP_USER_TOPIC]: operationFunction1, - // No operation for topic2 - }; + it("should handle Kafka connection errors", async () => { + const connectionError = new Error("Kafka connection error"); + consumerStub.connect.rejects(connectionError); + + await kafkaConsumer(); - // Stub the required Kafka functions - connectStub.resolves(); - subscribeStub.resolves(); - runStub.callsFake(async (options) => { - const { eachMessage } = options; - const fakeMessage = { - topic: "topic2", // topic2 without a defined operation + expect( + loggerErrorStub.calledWith( + `πŸ“ΆπŸ“Ά Error connecting to Kafka: ${connectionError}` + ) + ).to.be.true; + }); + + it("should handle message processing errors", async () => { + consumerStub.run.callsFake(async ({ eachMessage }) => { + await eachMessage({ + topic: "ip-address", partition: 0, - message: { - value: "testMessageData", - }, - }; - await eachMessage(fakeMessage); + message: { value: "invalid-json" }, + }); }); - try { - // Call the kafkaConsumer function - await kafkaConsumer(); + await kafkaConsumer(); - // Check if the error is caught and logged - expect( - loggerErrorStub.calledOnceWithExactly( - "No operation defined for topic: topic2" - ) - ).to.be.true; - } catch (error) { - throw error; - } + expect(loggerErrorStub.called).to.be.true; }); - it("should handle error in operation function", async () => { - // Set up the topic-to-operation function mapping with an operation function that throws an error - const operationFunction1 = sinon - .stub() - .throws(new Error("Test operation function error")); - const topicOperations = { - [constants.NEW_MOBILE_APP_USER_TOPIC]: operationFunction1, - }; - - // Stub the required Kafka functions - connectStub.resolves(); - subscribeStub.resolves(); - runStub.callsFake(async (options) => { - const { eachMessage } = options; - const fakeMessage = { - topic: constants.NEW_MOBILE_APP_USER_TOPIC, + it("should handle undefined topic operations", async () => { + consumerStub.run.callsFake(async ({ eachMessage }) => { + await eachMessage({ + topic: "unknown-topic", partition: 0, - message: { - value: "testMessageData", - }, - }; - await eachMessage(fakeMessage); + message: { value: "test" }, + }); }); - try { - // Call the kafkaConsumer function - await kafkaConsumer(); + await kafkaConsumer(); - // Check if the error is caught and logged - expect( - loggerErrorStub.calledOnceWithExactly( - "Error processing Kafka message for topic NEW_MOBILE_APP_USER_TOPIC: Error: Test operation function error" - ) - ).to.be.true; - } catch (error) { - throw error; - } + expect( + loggerErrorStub.calledWith( + "πŸ›πŸ› No operation defined for topic: unknown-topic" + ) + ).to.be.true; }); - it("should handle successful operation function", async () => { - // Set up the topic-to-operation function mapping with a successful operation function - const operationFunction1 = sinon.stub(); - const topicOperations = { - [constants.NEW_MOBILE_APP_USER_TOPIC]: operationFunction1, - }; - - // Stub the required Kafka functions - connectStub.resolves(); - subscribeStub.resolves(); - runStub.callsFake(async (options) => { - const { eachMessage } = options; - const fakeMessage = { - topic: constants.NEW_MOBILE_APP_USER_TOPIC, - partition: 0, - message: { - value: "testMessageData", - }, - }; - await eachMessage(fakeMessage); + it("should subscribe to all topics before starting consumer", async () => { + const subscribePromises = []; + consumerStub.subscribe.callsFake(() => { + return new Promise((resolve) => { + subscribePromises.push(resolve); + }); }); - // Stub the mailer function to return a success response - mailerStub.resolves({ success: true }); + const consumerPromise = kafkaConsumer(); - try { - // Call the kafkaConsumer function - await kafkaConsumer(); + // Resolve all subscriptions + subscribePromises.forEach((resolve) => resolve()); - // Check if the operation function is called and the success response is logged - expect(operationFunction1.calledOnceWithExactly("testMessageData")).to.be - .true; - expect( - loggerInfoStub.calledOnceWithExactly( - 'KAFKA: successfully received the new Mobile App User --- {"success":true}' - ) - ).to.be.true; - } catch (error) { - throw error; - } - }); + await consumerPromise; - // Add more test cases as needed + // Verify that run was called after all subscriptions were complete + expect(consumerStub.run.calledAfter(consumerStub.subscribe)).to.be.true; + }); });