Skip to content

Commit

Permalink
Kafka fix (#1212)
Browse files Browse the repository at this point in the history
* Update Listener.ts

* added new branch

* Update Listener.ts

* fixed mapping kafka error

* mapping kafka fixed

* fix kafka

* fix kafka

* Removing foreign key constraint

* Producer update

* Revert "Ashish egov patch 2 (#1178)"

This reverts commit e86a4dc.

* Update Producer.ts

* Update Producer.ts

* Feat : updated producemodified message

* Feat : removed waiting

* adding constraint

* Update V20240731162600__add_uniqiue_constraint_process_track.sql

* Update constants.ts

* Feat : improved kafka

* Fix kafka restart issue
  • Loading branch information
ashish-egov authored Aug 2, 2024
1 parent 84cda56 commit ae1e89e
Show file tree
Hide file tree
Showing 7 changed files with 85 additions and 76 deletions.
2 changes: 1 addition & 1 deletion utilities/project-factory/src/server/api/campaignApis.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import { immediateValidationForTargetSheet, validateSheetData, validateTargetShe
import { callMdmsTypeSchema, getCampaignNumber } from "./genericApis";
import { boundaryBulkUpload, convertToTypeData, generateHierarchy, generateProcessedFileAndPersist, getBoundaryOnWhichWeSplit, getLocalizedName, reorderBoundariesOfDataAndValidate, checkIfSourceIsMicroplan } from "../utils/campaignUtils";
const _ = require('lodash');
import { produceModifiedMessages } from "../kafka/Listener";
import { produceModifiedMessages } from "../kafka/Producer";
import { createDataService } from "../service/dataManageService";
import { searchProjectTypeCampaignService } from "../service/campaignManageService";
import { getExcelWorkbookFromFileURL } from "../utils/excelUtils";
Expand Down
41 changes: 1 addition & 40 deletions utilities/project-factory/src/server/kafka/Listener.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,8 @@
import { ConsumerGroup, ConsumerGroupOptions, Message } from 'kafka-node';
import config from '../config';
import { getFormattedStringForDebug, logger } from '../utils/logger';
import { shutdownGracefully, throwError } from '../utils/genericUtils';
import { shutdownGracefully } from '../utils/genericUtils';
import { handleCampaignMapping } from '../utils/campaignMappingUtils';
import { producer } from './Producer';

// Kafka Configuration
const kafkaConfig: ConsumerGroupOptions = {
Expand Down Expand Up @@ -54,41 +53,3 @@ export function listener() {
logger.error(`Offset out of range error: ${err}`);
});
}


/**
* Produces modified messages to a specified Kafka topic.
* @param modifiedMessages An array of modified messages to be produced.
* @param topic The Kafka topic to which the messages will be produced.
* @returns A promise that resolves when the messages are successfully produced.
*/
async function produceModifiedMessages(modifiedMessages: any[], topic: any) {
try {
logger.info(`KAFKA :: PRODUCER :: a message sent to topic ${topic}`);
logger.debug(`KAFKA :: PRODUCER :: message ${getFormattedStringForDebug(modifiedMessages)}`);
const payloads = [
{
topic: topic,
messages: JSON.stringify(modifiedMessages), // Convert modified messages to JSON string
},
];

// Send payloads to the Kafka producer
producer.send(payloads, (err: any) => {
if (err) {
console.error(err);
console.log('Error coming for message : ', modifiedMessages);
logger.info('KAFKA :: PRODUCER :: Some Error Occurred ');
logger.error(`KAFKA :: PRODUCER :: Error : ${JSON.stringify(err)}`);
} else {
logger.info('KAFKA :: PRODUCER :: message sent successfully ');
}
});
} catch (error) {
console.error(error);
logger.error(`KAFKA :: PRODUCER :: Exception caught: ${JSON.stringify(error)}`);
throwError("COMMON", 400, "KAKFA_ERROR", "Some error occured in kafka"); // Re-throw the error after logging it
}
}

export { produceModifiedMessages } // Export the produceModifiedMessages function for external use
110 changes: 79 additions & 31 deletions utilities/project-factory/src/server/kafka/Producer.ts
Original file line number Diff line number Diff line change
@@ -1,16 +1,37 @@
import config from '../config'; // Importing configuration settings
import { Producer, KafkaClient } from 'kafka-node'; // Importing Producer and KafkaClient from 'kafka-node' library
import { Producer, KafkaClient } from 'kafka-node';
import { logger } from "../utils/logger";
import { shutdownGracefully } from '../utils/genericUtils';
import { shutdownGracefully, throwError } from '../utils/genericUtils';
import config from '../config';

// Global producer instance
let producer: Producer;

// Creating a new Kafka client instance using the configured Kafka broker host
const kafkaClient = new KafkaClient({
kafkaHost: config?.host?.KAFKA_BROKER_HOST, // Configuring Kafka broker host
connectRetryOptions: { retries: 1 }, // Configuring connection retry options
kafkaHost: config?.host?.KAFKA_BROKER_HOST,
connectRetryOptions: { retries: 1 },
});

// Event listener for 'error' event, indicating that the client encountered an error
kafkaClient.on('error', (err: any) => {
logger.error('Kafka client is in error state'); // Log message indicating client is in error state
console.error(err.stack || err); // Log the error stack or message
shutdownGracefully();
});

// Creating a new Kafka producer instance using the Kafka client
const producer = new Producer(kafkaClient, { partitionerType: 2 }); // Using partitioner type 2
const createProducer = () => {
producer = new Producer(kafkaClient, { partitionerType: 2 });

producer.on('ready', () => {
logger.info('Producer is ready');
checkBrokerAvailability();
});

producer.on('error', (err: any) => {
logger.error('Producer is in error state');
console.error(err);
shutdownGracefully();
});
};

// Function to check broker availability by listing all brokers
const checkBrokerAvailability = () => {
Expand All @@ -33,30 +54,57 @@ const checkBrokerAvailability = () => {
});
};

// Event listener for 'ready' event, indicating that the client is ready to check broker availability
kafkaClient.on('ready', () => {
logger.info('Kafka client is ready'); // Log message indicating client is ready
checkBrokerAvailability(); // Check broker availability
});

// Event listener for 'ready' event, indicating that the producer is ready to send messages
producer.on('ready', () => {
logger.info('Producer is ready'); // Log message indicating producer is ready
checkBrokerAvailability(); // Check broker availability
});
createProducer();

// Event listener for 'error' event, indicating that the client encountered an error
kafkaClient.on('error', (err: any) => {
logger.error('Kafka client is in error state'); // Log message indicating client is in error state
console.error(err.stack || err); // Log the error stack or message
shutdownGracefully();
});
const sendWithRetries = (payloads: any[], retries = 3, shutdown: boolean = false): Promise<void> => {
return new Promise((resolve, reject) => {
producer.send(payloads, async (err: any) => {
if (err) {
logger.error('Error sending message:', err);
if (retries > 0) {
logger.info(`Retrying to send message. Retries left: ${retries}`);
await new Promise(resolve => setTimeout(resolve, 2000)); // wait before retrying
resolve(sendWithRetries(payloads, retries - 1));
} else {
// Attempt to reconnect and retry
logger.error('Failed to send message after retries. Reconnecting producer...');
if (shutdown) {
shutdownGracefully();
}
else {
producer.close(() => {
createProducer(); // Recreate the producer
setTimeout(() => {
sendWithRetries(payloads, 1, true).catch(reject);
}, 2000); // wait before retrying after reconnect
});
}
}
} else {
logger.info('Message sent successfully');
resolve();
}
});
});
};

// Event listener for 'error' event, indicating that the producer encountered an error
producer.on('error', (err: any) => {
logger.error('Producer is in error state'); // Log message indicating producer is in error state
console.error(err); // Log the error stack or message
shutdownGracefully();
});
async function produceModifiedMessages(modifiedMessages: any[], topic: any) {
try {
logger.info(`KAFKA :: PRODUCER :: A message sent to topic ${topic}`);
logger.debug(`KAFKA :: PRODUCER :: Message ${JSON.stringify(modifiedMessages)}`);
const payloads = [
{
topic: topic,
messages: JSON.stringify(modifiedMessages),
},
];

await sendWithRetries(payloads, 3);
} catch (error) {
logger.error(`KAFKA :: PRODUCER :: Exception caught: ${JSON.stringify(error)}`);
throwError("COMMON", 400, "KAFKA_ERROR", "Some error occurred in Kafka"); // Re-throw the error after logging it
}
}

export { producer }; // Exporting the producer instance for external use
export { produceModifiedMessages };
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ import config from "../config";
import { getDataFromSheet, throwError } from "./genericUtils";
import { getFormattedStringForDebug, logger } from "./logger";
import { defaultheader, httpRequest } from "./request";
import { produceModifiedMessages } from "../kafka/Listener";
import { produceModifiedMessages } from "../kafka/Producer";
import { enrichAndPersistCampaignWithError, getLocalizedName } from "./campaignUtils";
import { campaignStatuses, resourceDataStatuses } from "../config/constants";
import { createCampaignService } from "../service/campaignManageService";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
import { defaultheader, httpRequest } from "./request";
import config from "../config/index";
import { v4 as uuidv4 } from 'uuid';
import { produceModifiedMessages } from '../kafka/Listener'
import { produceModifiedMessages } from "../kafka/Producer";
import { confirmProjectParentCreation, createProjectCampaignResourcData, getCampaignSearchResponse, getHierarchy, handleResouceDetailsError, projectCreate } from "../api/campaignApis";
import { getCampaignNumber, createAndUploadFile, getSheetData, createExcelSheet, getAutoGeneratedBoundaryCodesHandler, createBoundaryEntities, createBoundaryRelationship, getMDMSV1Data, getTargetSheetDataAfterCode, callMdmsTypeSchema, getSheetDataFromWorksheet } from "../api/genericApis";
import { getFormattedStringForDebug, logger } from "./logger";
Expand Down
2 changes: 1 addition & 1 deletion utilities/project-factory/src/server/utils/genericUtils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ import { NextFunction, Request, Response } from "express";
import { httpRequest, defaultheader } from "./request";
import config, { getErrorCodes } from "../config/index";
import { v4 as uuidv4 } from 'uuid';
import { produceModifiedMessages } from "../kafka/Listener";
import { produceModifiedMessages } from "../kafka/Producer";
import { generateHierarchyList, getAllFacilities, getCampaignSearchResponse, getHierarchy } from "../api/campaignApis";
import { getBoundarySheetData, getSheetData, createAndUploadFile, createExcelSheet, getTargetSheetData, callMdmsData, callMdmsTypeSchema } from "../api/genericApis";
import { logger } from "./logger";
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import config from './../config';
import { produceModifiedMessages } from '../kafka/Listener';
import { produceModifiedMessages } from "../kafka/Producer";;
import { v4 as uuidv4 } from 'uuid';
import { executeQuery } from './db';
import { processTrackForUi, processTrackStatuses, processTrackTypes } from '../config/constants';
Expand Down

0 comments on commit ae1e89e

Please sign in to comment.