diff --git a/utilities/project-factory/src/server/api/campaignApis.ts b/utilities/project-factory/src/server/api/campaignApis.ts index 648498225f1..35311aa1254 100644 --- a/utilities/project-factory/src/server/api/campaignApis.ts +++ b/utilities/project-factory/src/server/api/campaignApis.ts @@ -8,7 +8,7 @@ import { immediateValidationForTargetSheet, validateSheetData, validateTargetShe import { callMdmsTypeSchema, getCampaignNumber } from "./genericApis"; import { boundaryBulkUpload, convertToTypeData, generateHierarchy, generateProcessedFileAndPersist, getBoundaryOnWhichWeSplit, getLocalizedName, reorderBoundariesOfDataAndValidate, checkIfSourceIsMicroplan } from "../utils/campaignUtils"; const _ = require('lodash'); -import { produceModifiedMessages } from "../kafka/Listener"; +import { produceModifiedMessages } from "../kafka/Producer"; import { createDataService } from "../service/dataManageService"; import { searchProjectTypeCampaignService } from "../service/campaignManageService"; import { getExcelWorkbookFromFileURL } from "../utils/excelUtils"; diff --git a/utilities/project-factory/src/server/kafka/Listener.ts b/utilities/project-factory/src/server/kafka/Listener.ts index 12a16269288..128f99c08cb 100644 --- a/utilities/project-factory/src/server/kafka/Listener.ts +++ b/utilities/project-factory/src/server/kafka/Listener.ts @@ -1,9 +1,8 @@ import { ConsumerGroup, ConsumerGroupOptions, Message } from 'kafka-node'; import config from '../config'; import { getFormattedStringForDebug, logger } from '../utils/logger'; -import { shutdownGracefully, throwError } from '../utils/genericUtils'; +import { shutdownGracefully } from '../utils/genericUtils'; import { handleCampaignMapping } from '../utils/campaignMappingUtils'; -import { producer } from './Producer'; // Kafka Configuration const kafkaConfig: ConsumerGroupOptions = { @@ -54,41 +53,3 @@ export function listener() { logger.error(`Offset out of range error: ${err}`); }); } - - -/** - * Produces modified messages to a specified Kafka topic. - * @param modifiedMessages An array of modified messages to be produced. - * @param topic The Kafka topic to which the messages will be produced. - * @returns A promise that resolves when the messages are successfully produced. - */ -async function produceModifiedMessages(modifiedMessages: any[], topic: any) { - try { - logger.info(`KAFKA :: PRODUCER :: a message sent to topic ${topic}`); - logger.debug(`KAFKA :: PRODUCER :: message ${getFormattedStringForDebug(modifiedMessages)}`); - const payloads = [ - { - topic: topic, - messages: JSON.stringify(modifiedMessages), // Convert modified messages to JSON string - }, - ]; - - // Send payloads to the Kafka producer - producer.send(payloads, (err: any) => { - if (err) { - console.error(err); - console.log('Error coming for message : ', modifiedMessages); - logger.info('KAFKA :: PRODUCER :: Some Error Occurred '); - logger.error(`KAFKA :: PRODUCER :: Error : ${JSON.stringify(err)}`); - } else { - logger.info('KAFKA :: PRODUCER :: message sent successfully '); - } - }); - } catch (error) { - console.error(error); - logger.error(`KAFKA :: PRODUCER :: Exception caught: ${JSON.stringify(error)}`); - throwError("COMMON", 400, "KAKFA_ERROR", "Some error occured in kafka"); // Re-throw the error after logging it - } -} - -export { produceModifiedMessages } // Export the produceModifiedMessages function for external use diff --git a/utilities/project-factory/src/server/kafka/Producer.ts b/utilities/project-factory/src/server/kafka/Producer.ts index 541b4dd45c7..8163e11b0cd 100644 --- a/utilities/project-factory/src/server/kafka/Producer.ts +++ b/utilities/project-factory/src/server/kafka/Producer.ts @@ -1,16 +1,37 @@ -import config from '../config'; // Importing configuration settings -import { Producer, KafkaClient } from 'kafka-node'; // Importing Producer and KafkaClient from 'kafka-node' library +import { Producer, KafkaClient } from 'kafka-node'; import { logger } from "../utils/logger"; -import { shutdownGracefully } from '../utils/genericUtils'; +import { shutdownGracefully, throwError } from '../utils/genericUtils'; +import config from '../config'; + +// Global producer instance +let producer: Producer; -// Creating a new Kafka client instance using the configured Kafka broker host const kafkaClient = new KafkaClient({ - kafkaHost: config?.host?.KAFKA_BROKER_HOST, // Configuring Kafka broker host - connectRetryOptions: { retries: 1 }, // Configuring connection retry options + kafkaHost: config?.host?.KAFKA_BROKER_HOST, + connectRetryOptions: { retries: 1 }, +}); + +// Event listener for 'error' event, indicating that the client encountered an error +kafkaClient.on('error', (err: any) => { + logger.error('Kafka client is in error state'); // Log message indicating client is in error state + console.error(err.stack || err); // Log the error stack or message + shutdownGracefully(); }); -// Creating a new Kafka producer instance using the Kafka client -const producer = new Producer(kafkaClient, { partitionerType: 2 }); // Using partitioner type 2 +const createProducer = () => { + producer = new Producer(kafkaClient, { partitionerType: 2 }); + + producer.on('ready', () => { + logger.info('Producer is ready'); + checkBrokerAvailability(); + }); + + producer.on('error', (err: any) => { + logger.error('Producer is in error state'); + console.error(err); + shutdownGracefully(); + }); +}; // Function to check broker availability by listing all brokers const checkBrokerAvailability = () => { @@ -33,30 +54,57 @@ const checkBrokerAvailability = () => { }); }; -// Event listener for 'ready' event, indicating that the client is ready to check broker availability -kafkaClient.on('ready', () => { - logger.info('Kafka client is ready'); // Log message indicating client is ready - checkBrokerAvailability(); // Check broker availability -}); -// Event listener for 'ready' event, indicating that the producer is ready to send messages -producer.on('ready', () => { - logger.info('Producer is ready'); // Log message indicating producer is ready - checkBrokerAvailability(); // Check broker availability -}); +createProducer(); -// Event listener for 'error' event, indicating that the client encountered an error -kafkaClient.on('error', (err: any) => { - logger.error('Kafka client is in error state'); // Log message indicating client is in error state - console.error(err.stack || err); // Log the error stack or message - shutdownGracefully(); -}); +const sendWithRetries = (payloads: any[], retries = 3, shutdown: boolean = false): Promise => { + return new Promise((resolve, reject) => { + producer.send(payloads, async (err: any) => { + if (err) { + logger.error('Error sending message:', err); + if (retries > 0) { + logger.info(`Retrying to send message. Retries left: ${retries}`); + await new Promise(resolve => setTimeout(resolve, 2000)); // wait before retrying + resolve(sendWithRetries(payloads, retries - 1)); + } else { + // Attempt to reconnect and retry + logger.error('Failed to send message after retries. Reconnecting producer...'); + if (shutdown) { + shutdownGracefully(); + } + else { + producer.close(() => { + createProducer(); // Recreate the producer + setTimeout(() => { + sendWithRetries(payloads, 1, true).catch(reject); + }, 2000); // wait before retrying after reconnect + }); + } + } + } else { + logger.info('Message sent successfully'); + resolve(); + } + }); + }); +}; -// Event listener for 'error' event, indicating that the producer encountered an error -producer.on('error', (err: any) => { - logger.error('Producer is in error state'); // Log message indicating producer is in error state - console.error(err); // Log the error stack or message - shutdownGracefully(); -}); +async function produceModifiedMessages(modifiedMessages: any[], topic: any) { + try { + logger.info(`KAFKA :: PRODUCER :: A message sent to topic ${topic}`); + logger.debug(`KAFKA :: PRODUCER :: Message ${JSON.stringify(modifiedMessages)}`); + const payloads = [ + { + topic: topic, + messages: JSON.stringify(modifiedMessages), + }, + ]; + + await sendWithRetries(payloads, 3); + } catch (error) { + logger.error(`KAFKA :: PRODUCER :: Exception caught: ${JSON.stringify(error)}`); + throwError("COMMON", 400, "KAFKA_ERROR", "Some error occurred in Kafka"); // Re-throw the error after logging it + } +} -export { producer }; // Exporting the producer instance for external use +export { produceModifiedMessages }; diff --git a/utilities/project-factory/src/server/utils/campaignMappingUtils.ts b/utilities/project-factory/src/server/utils/campaignMappingUtils.ts index 36a5584291a..036b22e17eb 100644 --- a/utilities/project-factory/src/server/utils/campaignMappingUtils.ts +++ b/utilities/project-factory/src/server/utils/campaignMappingUtils.ts @@ -3,7 +3,7 @@ import config from "../config"; import { getDataFromSheet, throwError } from "./genericUtils"; import { getFormattedStringForDebug, logger } from "./logger"; import { defaultheader, httpRequest } from "./request"; -import { produceModifiedMessages } from "../kafka/Listener"; +import { produceModifiedMessages } from "../kafka/Producer"; import { enrichAndPersistCampaignWithError, getLocalizedName } from "./campaignUtils"; import { campaignStatuses, resourceDataStatuses } from "../config/constants"; import { createCampaignService } from "../service/campaignManageService"; diff --git a/utilities/project-factory/src/server/utils/campaignUtils.ts b/utilities/project-factory/src/server/utils/campaignUtils.ts index 7c81f7fc501..424a96f80e6 100644 --- a/utilities/project-factory/src/server/utils/campaignUtils.ts +++ b/utilities/project-factory/src/server/utils/campaignUtils.ts @@ -2,7 +2,7 @@ import { defaultheader, httpRequest } from "./request"; import config from "../config/index"; import { v4 as uuidv4 } from 'uuid'; -import { produceModifiedMessages } from '../kafka/Listener' +import { produceModifiedMessages } from "../kafka/Producer"; import { confirmProjectParentCreation, createProjectCampaignResourcData, getCampaignSearchResponse, getHierarchy, handleResouceDetailsError, projectCreate } from "../api/campaignApis"; import { getCampaignNumber, createAndUploadFile, getSheetData, createExcelSheet, getAutoGeneratedBoundaryCodesHandler, createBoundaryEntities, createBoundaryRelationship, getMDMSV1Data, getTargetSheetDataAfterCode, callMdmsTypeSchema, getSheetDataFromWorksheet } from "../api/genericApis"; import { getFormattedStringForDebug, logger } from "./logger"; diff --git a/utilities/project-factory/src/server/utils/genericUtils.ts b/utilities/project-factory/src/server/utils/genericUtils.ts index 390adb4fe83..704aa4ea390 100644 --- a/utilities/project-factory/src/server/utils/genericUtils.ts +++ b/utilities/project-factory/src/server/utils/genericUtils.ts @@ -2,7 +2,7 @@ import { NextFunction, Request, Response } from "express"; import { httpRequest, defaultheader } from "./request"; import config, { getErrorCodes } from "../config/index"; import { v4 as uuidv4 } from 'uuid'; -import { produceModifiedMessages } from "../kafka/Listener"; +import { produceModifiedMessages } from "../kafka/Producer"; import { generateHierarchyList, getAllFacilities, getCampaignSearchResponse, getHierarchy } from "../api/campaignApis"; import { getBoundarySheetData, getSheetData, createAndUploadFile, createExcelSheet, getTargetSheetData, callMdmsData, callMdmsTypeSchema } from "../api/genericApis"; import { logger } from "./logger"; diff --git a/utilities/project-factory/src/server/utils/processTrackUtils.ts b/utilities/project-factory/src/server/utils/processTrackUtils.ts index 7cf57958b4d..6d80346825f 100644 --- a/utilities/project-factory/src/server/utils/processTrackUtils.ts +++ b/utilities/project-factory/src/server/utils/processTrackUtils.ts @@ -1,5 +1,5 @@ import config from './../config'; -import { produceModifiedMessages } from '../kafka/Listener'; +import { produceModifiedMessages } from "../kafka/Producer";; import { v4 as uuidv4 } from 'uuid'; import { executeQuery } from './db'; import { processTrackForUi, processTrackStatuses, processTrackTypes } from '../config/constants';