Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Ashish egov patch 2 #1178

Merged
merged 15 commits into from
Jul 29, 2024
Merged
32 changes: 19 additions & 13 deletions utilities/project-factory/src/server/kafka/Producer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,33 +12,39 @@ const kafkaClient = new KafkaClient({
// Creating a new Kafka producer instance using the Kafka client
const producer = new Producer(kafkaClient, { partitionerType: 2 }); // Using partitioner type 2

// Function to send a test message to check broker availability
// Function to check broker availability
const checkBrokerAvailability = () => {
const payloads = [
{
topic: config.kafka.KAFKA_TEST_TOPIC,
messages: JSON.stringify({ message: 'Test message to check broker availability' }),
},
];

producer.send(payloads, (err, data) => {
kafkaClient.loadMetadataForTopics([], (err, data) => {
if (err) {
if (err.message && err.message.toLowerCase().includes('broker not available')) {
logger.error('Broker not available. Shutting down the service.');
shutdownGracefully();
} else {
logger.error('Error sending test message:', err);
logger.error('Error checking broker availability:', err);
}
} else {
logger.info('Test message sent successfully:', data);
logger.info('Broker is available:', data);
}
});
};
ashish-egov marked this conversation as resolved.
Show resolved Hide resolved

// Event listener for 'ready' event, indicating that the client is ready to check broker availability
kafkaClient.on('ready', () => {
logger.info('Kafka client is ready'); // Log message indicating client is ready
checkBrokerAvailability(); // Check broker availability
});

// Event listener for 'ready' event, indicating that the producer is ready to send messages
producer.on('ready', () => {
logger.info('Producer is ready'); // Log message indicating producer is ready
checkBrokerAvailability(); // Check broker availability by sending a test message
checkBrokerAvailability();
});

// Event listener for 'error' event, indicating that the client encountered an error
kafkaClient.on('error', (err) => {
logger.error('Kafka client is in error state'); // Log message indicating client is in error state
console.error(err.stack || err); // Log the error stack or message
shutdownGracefully();
});

// Event listener for 'error' event, indicating that the producer encountered an error
Expand All @@ -48,4 +54,4 @@ producer.on('error', (err) => {
shutdownGracefully();
});

export { producer }; // Exporting the producer instance for external use
export { producer }; // Exporting the producer instance for external use
Loading