Skip to content

Commit

Permalink
Kafka restart (#1217)
Browse files Browse the repository at this point in the history
* Update Listener.ts

* added new branch

* Update Listener.ts

* fixed mapping kafka error

* mapping kafka fixed

* fix kafka

* fix kafka

* Removing foreign key constraint

* Producer update

* Revert "Ashish egov patch 2 (#1178)"

This reverts commit e86a4dc.

* Update Producer.ts

* Update Producer.ts

* Feat : updated producemodified message

* Feat : removed waiting

* adding constraint

* Update V20240731162600__add_uniqiue_constraint_process_track.sql

* Update constants.ts

* Feat : improved kafka

* Update Producer.ts

* Update Producer.ts

* Update publishProjectFactory.yml
  • Loading branch information
ashish-egov authored Aug 2, 2024
1 parent 7d9e007 commit 407ab85
Showing 1 changed file with 16 additions and 15 deletions.
31 changes: 16 additions & 15 deletions utilities/project-factory/src/server/kafka/Producer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,22 +3,22 @@ import { logger } from "../utils/logger";
import { shutdownGracefully, throwError } from '../utils/genericUtils';
import config from '../config';

// Global producer instance
let kafkaClient: KafkaClient;
let producer: Producer;

const kafkaClient = new KafkaClient({
kafkaHost: config?.host?.KAFKA_BROKER_HOST,
connectRetryOptions: { retries: 1 },
});
const createKafkaClientAndProducer = () => {
kafkaClient = new KafkaClient({
kafkaHost: config?.host?.KAFKA_BROKER_HOST,
connectRetryOptions: { retries: 1 },
});

// Event listener for 'error' event, indicating that the client encountered an error
kafkaClient.on('error', (err: any) => {
logger.error('Kafka client is in error state'); // Log message indicating client is in error state
console.error(err.stack || err); // Log the error stack or message
shutdownGracefully();
});
// Event listener for 'error' event, indicating that the client encountered an error
kafkaClient.on('error', (err: any) => {
logger.error('Kafka client is in error state'); // Log message indicating client is in error state
console.error(err.stack || err); // Log the error stack or message
shutdownGracefully();
});

const createProducer = () => {
producer = new Producer(kafkaClient, { partitionerType: 2 });

producer.on('ready', () => {
Expand Down Expand Up @@ -55,17 +55,18 @@ const checkBrokerAvailability = () => {
};


createProducer();
createKafkaClientAndProducer();

const sendWithRetries = (payloads: any[], retries = 3, shutdown: boolean = false): Promise<void> => {
return new Promise((resolve, reject) => {
producer.send(payloads, async (err: any) => {
if (err) {
logger.error('Error sending message:', err);
logger.debug(`Was trying to send: ${JSON.stringify(payloads)}`);
if (retries > 0) {
logger.info(`Retrying to send message. Retries left: ${retries}`);
await new Promise(resolve => setTimeout(resolve, 2000)); // wait before retrying
resolve(sendWithRetries(payloads, retries - 1));
resolve(sendWithRetries(payloads, retries - 1, shutdown));
} else {
// Attempt to reconnect and retry
logger.error('Failed to send message after retries. Reconnecting producer...');
Expand All @@ -74,7 +75,7 @@ const sendWithRetries = (payloads: any[], retries = 3, shutdown: boolean = false
}
else {
producer.close(() => {
createProducer(); // Recreate the producer
createKafkaClientAndProducer();
setTimeout(() => {
sendWithRetries(payloads, 1, true).catch(reject);
}, 2000); // wait before retrying after reconnect
Expand Down

0 comments on commit 407ab85

Please sign in to comment.