Skip to content

Commit

Permalink
Merge pull request #3834 from airqo-platform/hf-ut-kafka
Browse files Browse the repository at this point in the history
just updating the unit testing file for Kafka Consumer
  • Loading branch information
Baalmart authored Nov 11, 2024
2 parents 4587bef + 769c27f commit 96bf472
Showing 1 changed file with 86 additions and 160 deletions.
246 changes: 86 additions & 160 deletions src/auth-service/bin/jobs/test/ut_kafka-consumer.js
Original file line number Diff line number Diff line change
Expand Up @@ -11,19 +11,26 @@ const Joi = require("joi");
const { jsonrepair } = require("jsonrepair");

describe("Kafka Consumer", () => {
let connectStub;
let subscribeStub;
let runStub;
let kafkaStub;
let consumerStub;
let mailerStub;
let loggerErrorStub;
let loggerInfoStub;
let jsonrepairStub;

beforeEach(() => {
// Create stubs for required modules/functions
connectStub = sinon.stub();
subscribeStub = sinon.stub();
runStub = sinon.stub();
// Create stubs for Kafka consumer
consumerStub = {
connect: sinon.stub().resolves(),
subscribe: sinon.stub().resolves(),
run: sinon.stub().resolves(),
};

kafkaStub = {
consumer: sinon.stub().returns(consumerStub),
};

// Other stubs
mailerStub = sinon.stub(mailer, "newMobileAppUser");
loggerErrorStub = sinon.stub(console, "error");
loggerInfoStub = sinon.stub(console, "info");
Expand All @@ -34,188 +41,107 @@ describe("Kafka Consumer", () => {
sinon.restore();
});

it("should consume messages from the Kafka broker", async () => {
// Set up the topic-to-operation function mapping
const operationFunction1 = sinon.stub();
const operationFunction2 = sinon.stub();
const topicOperations = {
[constants.NEW_MOBILE_APP_USER_TOPIC]: operationFunction1,
topic2: operationFunction2,
};
it("should properly initialize and subscribe to all topics", async () => {
const expectedTopics = ["ip-address", "deploy-topic", "recall-topic"];

// Stub the required Kafka functions
connectStub.resolves();
subscribeStub.resolves();
runStub.callsFake(async (options) => {
const { eachMessage } = options;
const fakeMessage = {
topic: constants.NEW_MOBILE_APP_USER_TOPIC,
partition: 0,
message: {
value: "testMessageData",
},
};
await eachMessage(fakeMessage);
});
await kafkaConsumer();

try {
// Call the kafkaConsumer function
await kafkaConsumer();
// Verify connection and subscription sequence
expect(consumerStub.connect.calledOnce).to.be.true;

// Check if Kafka functions are called with the correct arguments
expect(connectStub.calledOnce).to.be.true;
// Verify subscriptions to all topics
expectedTopics.forEach((topic) => {
expect(
subscribeStub.calledOnceWithExactly({
topic: constants.NEW_MOBILE_APP_USER_TOPIC,
consumerStub.subscribe.calledWith({
topic,
fromBeginning: true,
})
).to.be.true;
expect(runStub.calledOnce).to.be.true;
expect(operationFunction1.calledOnceWithExactly("testMessageData")).to.be
.true;
expect(operationFunction2.notCalled).to.be.true; // topic2 not subscribed, so operationFunction2 should not be called
} catch (error) {
throw error;
}
});

// Verify consumer.run called once after all subscriptions
expect(consumerStub.run.calledOnce).to.be.true;
expect(consumerStub.run.calledAfter(consumerStub.subscribe)).to.be.true;
});

it("should handle Kafka errors", async () => {
// Stub the required Kafka functions to throw an error
connectStub.rejects(new Error("Kafka connection error"));
it("should handle messages for each topic correctly", async () => {
// Simulate message processing by triggering the eachMessage callback
consumerStub.run.callsFake(async ({ eachMessage }) => {
await eachMessage({
topic: "ip-address",
partition: 0,
message: { value: JSON.stringify({ ip: "192.168.1.1" }) },
});
});

try {
// Call the kafkaConsumer function
await kafkaConsumer();
await kafkaConsumer();

// Check if the error is caught and logged
expect(
loggerErrorStub.calledOnceWithExactly(
"Error connecting to Kafka: Kafka connection error"
)
).to.be.true;
} catch (error) {
throw error;
}
// Verify message processing
expect(consumerStub.run.calledOnce).to.be.true;
// Add specific verification for your message handlers
});

it("should handle topic without operation function", async () => {
// Set up the topic-to-operation function mapping without an operation for topic2
const operationFunction1 = sinon.stub();
const topicOperations = {
[constants.NEW_MOBILE_APP_USER_TOPIC]: operationFunction1,
// No operation for topic2
};
it("should handle Kafka connection errors", async () => {
const connectionError = new Error("Kafka connection error");
consumerStub.connect.rejects(connectionError);

await kafkaConsumer();

// Stub the required Kafka functions
connectStub.resolves();
subscribeStub.resolves();
runStub.callsFake(async (options) => {
const { eachMessage } = options;
const fakeMessage = {
topic: "topic2", // topic2 without a defined operation
expect(
loggerErrorStub.calledWith(
`📶📶 Error connecting to Kafka: ${connectionError}`
)
).to.be.true;
});

it("should handle message processing errors", async () => {
consumerStub.run.callsFake(async ({ eachMessage }) => {
await eachMessage({
topic: "ip-address",
partition: 0,
message: {
value: "testMessageData",
},
};
await eachMessage(fakeMessage);
message: { value: "invalid-json" },
});
});

try {
// Call the kafkaConsumer function
await kafkaConsumer();
await kafkaConsumer();

// Check if the error is caught and logged
expect(
loggerErrorStub.calledOnceWithExactly(
"No operation defined for topic: topic2"
)
).to.be.true;
} catch (error) {
throw error;
}
expect(loggerErrorStub.called).to.be.true;
});

it("should handle error in operation function", async () => {
// Set up the topic-to-operation function mapping with an operation function that throws an error
const operationFunction1 = sinon
.stub()
.throws(new Error("Test operation function error"));
const topicOperations = {
[constants.NEW_MOBILE_APP_USER_TOPIC]: operationFunction1,
};

// Stub the required Kafka functions
connectStub.resolves();
subscribeStub.resolves();
runStub.callsFake(async (options) => {
const { eachMessage } = options;
const fakeMessage = {
topic: constants.NEW_MOBILE_APP_USER_TOPIC,
it("should handle undefined topic operations", async () => {
consumerStub.run.callsFake(async ({ eachMessage }) => {
await eachMessage({
topic: "unknown-topic",
partition: 0,
message: {
value: "testMessageData",
},
};
await eachMessage(fakeMessage);
message: { value: "test" },
});
});

try {
// Call the kafkaConsumer function
await kafkaConsumer();
await kafkaConsumer();

// Check if the error is caught and logged
expect(
loggerErrorStub.calledOnceWithExactly(
"Error processing Kafka message for topic NEW_MOBILE_APP_USER_TOPIC: Error: Test operation function error"
)
).to.be.true;
} catch (error) {
throw error;
}
expect(
loggerErrorStub.calledWith(
"🐛🐛 No operation defined for topic: unknown-topic"
)
).to.be.true;
});

it("should handle successful operation function", async () => {
// Set up the topic-to-operation function mapping with a successful operation function
const operationFunction1 = sinon.stub();
const topicOperations = {
[constants.NEW_MOBILE_APP_USER_TOPIC]: operationFunction1,
};

// Stub the required Kafka functions
connectStub.resolves();
subscribeStub.resolves();
runStub.callsFake(async (options) => {
const { eachMessage } = options;
const fakeMessage = {
topic: constants.NEW_MOBILE_APP_USER_TOPIC,
partition: 0,
message: {
value: "testMessageData",
},
};
await eachMessage(fakeMessage);
it("should subscribe to all topics before starting consumer", async () => {
const subscribePromises = [];
consumerStub.subscribe.callsFake(() => {
return new Promise((resolve) => {
subscribePromises.push(resolve);
});
});

// Stub the mailer function to return a success response
mailerStub.resolves({ success: true });
const consumerPromise = kafkaConsumer();

try {
// Call the kafkaConsumer function
await kafkaConsumer();
// Resolve all subscriptions
subscribePromises.forEach((resolve) => resolve());

// Check if the operation function is called and the success response is logged
expect(operationFunction1.calledOnceWithExactly("testMessageData")).to.be
.true;
expect(
loggerInfoStub.calledOnceWithExactly(
'KAFKA: successfully received the new Mobile App User --- {"success":true}'
)
).to.be.true;
} catch (error) {
throw error;
}
});
await consumerPromise;

// Add more test cases as needed
// Verify that run was called after all subscriptions were complete
expect(consumerStub.run.calledAfter(consumerStub.subscribe)).to.be.true;
});
});

0 comments on commit 96bf472

Please sign in to comment.