Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Kafka fix #1212

Merged
merged 36 commits into from
Aug 2, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
36 commits
Select commit Hold shift + click to select a range
084552e
Update Listener.ts
ashish-egov Jul 25, 2024
47e0404
added new branch
ashish-egov Jul 25, 2024
03580a7
Update Listener.ts
ashish-egov Jul 25, 2024
69ef770
Merge pull request #1148 from egovernments/ashish-egov-patch-3
ashish-egov Jul 25, 2024
151a2e4
fixed mapping kafka error
ashish-egov Jul 25, 2024
8d9f93c
Merge branch 'campaign-for-test' of https://github.com/egovernments/D…
ashish-egov Jul 25, 2024
ac319d0
mapping kafka fixed
ashish-egov Jul 25, 2024
3ab35d4
fix kafka
ashish-egov Jul 25, 2024
ae6d57e
fix kafka
ashish-egov Jul 25, 2024
8a2947f
Removing foreign key constraint
ashish-egov Jul 26, 2024
a56af52
Merge remote-tracking branch 'origin/campaign' into campaign-for-test
ashish-egov Jul 29, 2024
dd1fadd
Producer update
ashish-egov Jul 29, 2024
35a3abc
Merge branch 'campaign-for-test' into ashish-patch-1
ashish-egov Jul 29, 2024
787d41d
Merge pull request #1177 from egovernments/ashish-patch-1
ashish-egov Jul 29, 2024
eb77bfd
Revert "Ashish egov patch 2 (#1178)"
ashish-egov Jul 30, 2024
a6422d4
Update Producer.ts
ashish-egov Jul 30, 2024
dfc5aff
Merge pull request #1183 from egovernments/ashish-egov-patch-2
ashish-egov Jul 30, 2024
a362074
Update Producer.ts
ashish-egov Jul 30, 2024
28806eb
Merge pull request #1184 from egovernments/ashish-egov-patch-2
ashish-egov Jul 30, 2024
12d05a1
Feat : updated producemodified message
ashish-egov Jul 31, 2024
8b68a55
Merge remote-tracking branch 'origin/campaign' into campaign-for-test
ashish-egov Jul 31, 2024
a56c5a7
Feat : removed waiting
ashish-egov Jul 31, 2024
1196562
Merge branch 'campaign-for-test' of https://github.com/egovernments/D…
ashish-egov Jul 31, 2024
45a6ad8
adding constraint
ashish-egov Jul 31, 2024
6668394
Merge pull request #1196 from egovernments/processTracKConstraint
ashish-egov Jul 31, 2024
68ace0d
Update V20240731162600__add_uniqiue_constraint_process_track.sql
ashish-egov Jul 31, 2024
2128542
Merge pull request #1198 from egovernments/ashish-egov-patch-1
ashish-egov Jul 31, 2024
9de0f8b
Update constants.ts
ashish-egov Jul 31, 2024
8cef4b2
Merge pull request #1199 from egovernments/ashish-egov-patch-1
ashish-egov Jul 31, 2024
8b021fc
Merge remote-tracking branch 'origin/campaign' into campaign-for-test
ashish-egov Jul 31, 2024
4588480
Feat : improved kafka
ashish-egov Aug 1, 2024
2931601
Merge remote-tracking branch 'origin/campaign' into improvedKafka
ashish-egov Aug 1, 2024
8494786
Merge remote-tracking branch 'origin/campaign-for-test' into improved…
ashish-egov Aug 1, 2024
970af1e
Merge pull request #1208 from egovernments/improvedKafka
ashish-egov Aug 1, 2024
40fc96e
Merge remote-tracking branch 'origin/campaign' into campaign-for-test
ashish-egov Aug 2, 2024
7f84d1a
Fix kafka restart issue
ashish-egov Aug 2, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion utilities/project-factory/src/server/api/campaignApis.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import { immediateValidationForTargetSheet, validateSheetData, validateTargetShe
import { callMdmsTypeSchema, getCampaignNumber } from "./genericApis";
import { boundaryBulkUpload, convertToTypeData, generateHierarchy, generateProcessedFileAndPersist, getBoundaryOnWhichWeSplit, getLocalizedName, reorderBoundariesOfDataAndValidate, checkIfSourceIsMicroplan } from "../utils/campaignUtils";
const _ = require('lodash');
import { produceModifiedMessages } from "../kafka/Listener";
import { produceModifiedMessages } from "../kafka/Producer";
import { createDataService } from "../service/dataManageService";
import { searchProjectTypeCampaignService } from "../service/campaignManageService";
import { getExcelWorkbookFromFileURL } from "../utils/excelUtils";
Expand Down
41 changes: 1 addition & 40 deletions utilities/project-factory/src/server/kafka/Listener.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,8 @@
import { ConsumerGroup, ConsumerGroupOptions, Message } from 'kafka-node';
import config from '../config';
import { getFormattedStringForDebug, logger } from '../utils/logger';
import { shutdownGracefully, throwError } from '../utils/genericUtils';
import { shutdownGracefully } from '../utils/genericUtils';
import { handleCampaignMapping } from '../utils/campaignMappingUtils';
import { producer } from './Producer';

// Kafka Configuration
const kafkaConfig: ConsumerGroupOptions = {
Expand Down Expand Up @@ -54,41 +53,3 @@ export function listener() {
logger.error(`Offset out of range error: ${err}`);
});
}


/**
* Produces modified messages to a specified Kafka topic.
* @param modifiedMessages An array of modified messages to be produced.
* @param topic The Kafka topic to which the messages will be produced.
* @returns A promise that resolves when the messages are successfully produced.
*/
async function produceModifiedMessages(modifiedMessages: any[], topic: any) {
try {
logger.info(`KAFKA :: PRODUCER :: a message sent to topic ${topic}`);
logger.debug(`KAFKA :: PRODUCER :: message ${getFormattedStringForDebug(modifiedMessages)}`);
const payloads = [
{
topic: topic,
messages: JSON.stringify(modifiedMessages), // Convert modified messages to JSON string
},
];

// Send payloads to the Kafka producer
producer.send(payloads, (err: any) => {
if (err) {
console.error(err);
console.log('Error coming for message : ', modifiedMessages);
logger.info('KAFKA :: PRODUCER :: Some Error Occurred ');
logger.error(`KAFKA :: PRODUCER :: Error : ${JSON.stringify(err)}`);
} else {
logger.info('KAFKA :: PRODUCER :: message sent successfully ');
}
});
} catch (error) {
console.error(error);
logger.error(`KAFKA :: PRODUCER :: Exception caught: ${JSON.stringify(error)}`);
throwError("COMMON", 400, "KAKFA_ERROR", "Some error occured in kafka"); // Re-throw the error after logging it
}
}

export { produceModifiedMessages } // Export the produceModifiedMessages function for external use
110 changes: 79 additions & 31 deletions utilities/project-factory/src/server/kafka/Producer.ts
Original file line number Diff line number Diff line change
@@ -1,16 +1,37 @@
import config from '../config'; // Importing configuration settings
import { Producer, KafkaClient } from 'kafka-node'; // Importing Producer and KafkaClient from 'kafka-node' library
import { Producer, KafkaClient } from 'kafka-node';
import { logger } from "../utils/logger";
import { shutdownGracefully } from '../utils/genericUtils';
import { shutdownGracefully, throwError } from '../utils/genericUtils';
import config from '../config';

// Global producer instance
let producer: Producer;

// Creating a new Kafka client instance using the configured Kafka broker host
const kafkaClient = new KafkaClient({
kafkaHost: config?.host?.KAFKA_BROKER_HOST, // Configuring Kafka broker host
connectRetryOptions: { retries: 1 }, // Configuring connection retry options
kafkaHost: config?.host?.KAFKA_BROKER_HOST,
connectRetryOptions: { retries: 1 },
});

// Event listener for 'error' event, indicating that the client encountered an error
kafkaClient.on('error', (err: any) => {
logger.error('Kafka client is in error state'); // Log message indicating client is in error state
console.error(err.stack || err); // Log the error stack or message
shutdownGracefully();
});

// Creating a new Kafka producer instance using the Kafka client
const producer = new Producer(kafkaClient, { partitionerType: 2 }); // Using partitioner type 2
const createProducer = () => {
producer = new Producer(kafkaClient, { partitionerType: 2 });

producer.on('ready', () => {
logger.info('Producer is ready');
checkBrokerAvailability();
});

producer.on('error', (err: any) => {
logger.error('Producer is in error state');
console.error(err);
shutdownGracefully();
});
};

// Function to check broker availability by listing all brokers
const checkBrokerAvailability = () => {
Expand All @@ -33,30 +54,57 @@ const checkBrokerAvailability = () => {
});
};

// Event listener for 'ready' event, indicating that the client is ready to check broker availability
kafkaClient.on('ready', () => {
logger.info('Kafka client is ready'); // Log message indicating client is ready
checkBrokerAvailability(); // Check broker availability
});

// Event listener for 'ready' event, indicating that the producer is ready to send messages
producer.on('ready', () => {
logger.info('Producer is ready'); // Log message indicating producer is ready
checkBrokerAvailability(); // Check broker availability
});
createProducer();

// Event listener for 'error' event, indicating that the client encountered an error
kafkaClient.on('error', (err: any) => {
logger.error('Kafka client is in error state'); // Log message indicating client is in error state
console.error(err.stack || err); // Log the error stack or message
shutdownGracefully();
});
const sendWithRetries = (payloads: any[], retries = 3, shutdown: boolean = false): Promise<void> => {
return new Promise((resolve, reject) => {
producer.send(payloads, async (err: any) => {
if (err) {
logger.error('Error sending message:', err);
if (retries > 0) {
logger.info(`Retrying to send message. Retries left: ${retries}`);
await new Promise(resolve => setTimeout(resolve, 2000)); // wait before retrying
resolve(sendWithRetries(payloads, retries - 1));
} else {
// Attempt to reconnect and retry
logger.error('Failed to send message after retries. Reconnecting producer...');
if (shutdown) {
shutdownGracefully();
}
else {
producer.close(() => {
createProducer(); // Recreate the producer
setTimeout(() => {
sendWithRetries(payloads, 1, true).catch(reject);
}, 2000); // wait before retrying after reconnect
});
}
}
} else {
logger.info('Message sent successfully');
resolve();
}
});
});
};

// Event listener for 'error' event, indicating that the producer encountered an error
producer.on('error', (err: any) => {
logger.error('Producer is in error state'); // Log message indicating producer is in error state
console.error(err); // Log the error stack or message
shutdownGracefully();
});
async function produceModifiedMessages(modifiedMessages: any[], topic: any) {
try {
logger.info(`KAFKA :: PRODUCER :: A message sent to topic ${topic}`);
logger.debug(`KAFKA :: PRODUCER :: Message ${JSON.stringify(modifiedMessages)}`);
const payloads = [
{
topic: topic,
messages: JSON.stringify(modifiedMessages),
},
];

await sendWithRetries(payloads, 3);
} catch (error) {
logger.error(`KAFKA :: PRODUCER :: Exception caught: ${JSON.stringify(error)}`);
throwError("COMMON", 400, "KAFKA_ERROR", "Some error occurred in Kafka"); // Re-throw the error after logging it
}
}

export { producer }; // Exporting the producer instance for external use
export { produceModifiedMessages };
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ import config from "../config";
import { getDataFromSheet, throwError } from "./genericUtils";
import { getFormattedStringForDebug, logger } from "./logger";
import { defaultheader, httpRequest } from "./request";
import { produceModifiedMessages } from "../kafka/Listener";
import { produceModifiedMessages } from "../kafka/Producer";
import { enrichAndPersistCampaignWithError, getLocalizedName } from "./campaignUtils";
import { campaignStatuses, resourceDataStatuses } from "../config/constants";
import { createCampaignService } from "../service/campaignManageService";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
import { defaultheader, httpRequest } from "./request";
import config from "../config/index";
import { v4 as uuidv4 } from 'uuid';
import { produceModifiedMessages } from '../kafka/Listener'
import { produceModifiedMessages } from "../kafka/Producer";
import { confirmProjectParentCreation, createProjectCampaignResourcData, getCampaignSearchResponse, getHierarchy, handleResouceDetailsError, projectCreate } from "../api/campaignApis";
import { getCampaignNumber, createAndUploadFile, getSheetData, createExcelSheet, getAutoGeneratedBoundaryCodesHandler, createBoundaryEntities, createBoundaryRelationship, getMDMSV1Data, getTargetSheetDataAfterCode, callMdmsTypeSchema, getSheetDataFromWorksheet } from "../api/genericApis";
import { getFormattedStringForDebug, logger } from "./logger";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ import { NextFunction, Request, Response } from "express";
import { httpRequest, defaultheader } from "./request";
import config, { getErrorCodes } from "../config/index";
import { v4 as uuidv4 } from 'uuid';
import { produceModifiedMessages } from "../kafka/Listener";
import { produceModifiedMessages } from "../kafka/Producer";
import { generateHierarchyList, getAllFacilities, getCampaignSearchResponse, getHierarchy } from "../api/campaignApis";
import { getBoundarySheetData, getSheetData, createAndUploadFile, createExcelSheet, getTargetSheetData, callMdmsData, callMdmsTypeSchema } from "../api/genericApis";
import { logger } from "./logger";
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import config from './../config';
import { produceModifiedMessages } from '../kafka/Listener';
import { produceModifiedMessages } from "../kafka/Producer";;
import { v4 as uuidv4 } from 'uuid';
import { executeQuery } from './db';
import { processTrackForUi, processTrackStatuses, processTrackTypes } from '../config/constants';
Expand Down
Loading