Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improved kafka #1208

Merged
merged 10 commits into from
Aug 1, 2024
4 changes: 2 additions & 2 deletions micro-ui/web/micro-ui-internals/example/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,10 @@
"start": "react-scripts start"
},
"devDependencies": {
"@egovernments/digit-ui-libraries": "1.8.2-beta.5",
"@egovernments/digit-ui-libraries": "1.8.2-beta.6",
"@egovernments/digit-ui-module-workbench": "1.0.2-beta.3",
"@egovernments/digit-ui-components": "0.0.2-beta.19",
"@egovernments/digit-ui-module-core": "1.8.2-beta.9",
"@egovernments/digit-ui-module-core": "1.8.2-beta.10",
"@egovernments/digit-ui-module-utilities": "1.0.1-beta.30",
"@egovernments/digit-ui-react-components": "1.8.2-beta.11",
"@egovernments/digit-ui-module-hcmworkbench":"0.0.38",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
rel="stylesheet"
href="https://unpkg.com/@egovernments/[email protected]/dist/index.css"
/> -->
<link rel="stylesheet" href="https://unpkg.com/@egovernments/[email protected].7/dist/index.css" />
<link rel="stylesheet" href="https://unpkg.com/@egovernments/[email protected].8/dist/index.css" />
<link rel="stylesheet" href="https://unpkg.com/@egovernments/[email protected]/dist/index.css" />
<link rel="stylesheet" href="https://unpkg.com/@egovernments/[email protected]/dist/index.css" />
<link rel="stylesheet" href="https://unpkg.com/@egovernments/[email protected]/dist/index.css" />
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ const BoundaryWithDate = ({ project, props, onSelect, dateReducerDispatch, canDe
// const { t } = useTranslation();
const ONE_DAY_IN_MS = 24 * 60 * 60 * 1000;
const today = Digit.Utils.date.getDate(Date.now());
const tomorrow = Digit.Utils.date.getDate(new Date(today).getTime() + ONE_DAY_IN_MS);
const [startDate, setStartDate] = useState(project?.startDate ? Digit.Utils.date.getDate(project?.startDate) : ""); // Set default start date to today
const [endDate, setEndDate] = useState(project?.endDate ? Digit.Utils.date.getDate(project?.endDate) : ""); // Default end date
const [cycleDates, setCycleDates] = useState(null);
Expand All @@ -29,7 +28,7 @@ const BoundaryWithDate = ({ project, props, onSelect, dateReducerDispatch, canDe
}, [project]);

const handleDateChange = ({ date, endDate = false, cycleDate = false, cycleIndex }) => {
if (typeof date === "undefined") {
if (typeof date === "undefined" || date <= today) {
return null;
}
if (!endDate) {
Expand All @@ -48,7 +47,7 @@ const BoundaryWithDate = ({ project, props, onSelect, dateReducerDispatch, canDe
};

const handleCycleDateChange = ({ date, endDate = false, cycleIndex }) => {
if (typeof date === "undefined") {
if (typeof date === "undefined" || date <= today) {
return null;
}
if (!endDate) {
Expand Down Expand Up @@ -153,7 +152,7 @@ const BoundaryWithDate = ({ project, props, onSelect, dateReducerDispatch, canDe
?.toISOString()
?.split("T")?.[0]
: today >= startDate
? tomorrow
? today
: startDate,
max: endDate,
},
Expand All @@ -180,16 +179,13 @@ const BoundaryWithDate = ({ project, props, onSelect, dateReducerDispatch, canDe
placeholder={t("HCM_END_DATE")}
populators={{
validation: {
min:
!isNaN(new Date(cycleDates?.find((j) => j.cycleIndex == index + 1)?.startDate)?.getTime()) &&
Digit.Utils.date.getDate(new Date(cycleDates?.find((j) => j.cycleIndex == index + 1)?.startDate)?.getTime() + ONE_DAY_IN_MS) >
today
? new Date(new Date(cycleDates?.find((j) => j.cycleIndex == index + 1)?.startDate)?.getTime() + ONE_DAY_IN_MS)
?.toISOString()
?.split("T")?.[0]
: today >= startDate
? tomorrow
: startDate,
min: !isNaN(new Date(cycleDates?.find((j) => j.cycleIndex == index + 1)?.startDate)?.getTime())
? new Date(new Date(cycleDates?.find((j) => j.cycleIndex == index + 1)?.startDate)?.getTime() + ONE_DAY_IN_MS)
?.toISOString()
?.split("T")?.[0]
: today >= startDate
? today
: startDate,
max: endDate,
},
}}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { Link, useHistory } from "react-router-dom";
import { Link } from "react-router-dom";
import _ from "lodash";
import React from "react";
import { Fragment } from "react";
Expand Down Expand Up @@ -118,8 +118,8 @@ export const UICustomizations = {
"",
`/${window.contextPath}/employee/campaign/update-dates-boundary?id=${row?.id}`
);
window.location.href = `/${window.contextPath}/employee/campaign/update-dates-boundary?id=${row?.id}`;

const navEvent = new PopStateEvent("popstate");
window.dispatchEvent(navEvent);
break;
case "ACTION_LABEL_VIEW_TIMELINE":
setTimeline(true);
Expand Down Expand Up @@ -393,8 +393,8 @@ export const UICustomizations = {
"",
`/${window.contextPath}/employee/campaign/update-dates-boundary?id=${row?.id}`
);
window.location.href = `/${window.contextPath}/employee/campaign/update-dates-boundary?id=${row?.id}`;

const navEvent = new PopStateEvent("popstate");
window.dispatchEvent(navEvent);
break;
case "ACTION_LABEL_VIEW_TIMELINE":
setTimeline(true);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,7 @@ const DateAndCycleUpdate = ({ onSelect, formData, ...props }) => {
};

const handleCycleDateChange = ({ date, endDate = false, cycleIndex }) => {
if (typeof date === "undefined") {
if (typeof date === "undefined" || date <= today) {
return null;
}
if (!endDate) {
Expand Down Expand Up @@ -278,7 +278,14 @@ const DateAndCycleUpdate = ({ onSelect, formData, ...props }) => {
withoutLabel={true}
type="date"
value={item?.endDate}
nonEditable={item?.endDate && item?.endDate?.length > 0 && today >= item?.endDate ? true : false}
nonEditable={
item?.endDate &&
item?.endDate?.length > 0 &&
today >= item?.endDate &&
(cycleDates?.[index + 1] ? today >= cycleDates?.[index + 1]?.startDate : true)
? true
: false
}
Comment on lines +281 to +288
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Simplify conditional expression.

The use of boolean literals in the conditional expression is unnecessary. Simplify the code by directly assigning the result.

-  nonEditable={
-    item?.endDate &&
-    item?.endDate?.length > 0 &&
-    today >= item?.endDate &&
-    (cycleDates?.[index + 1] ? today >= cycleDates?.[index + 1]?.startDate : true)
-      ? true
-      : false
-  }
+  nonEditable={
+    item?.endDate &&
+    item?.endDate?.length > 0 &&
+    today >= item?.endDate &&
+    (!cycleDates?.[index + 1] || today >= cycleDates?.[index + 1]?.startDate)
+  }
Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
nonEditable={
item?.endDate &&
item?.endDate?.length > 0 &&
today >= item?.endDate &&
(cycleDates?.[index + 1] ? today >= cycleDates?.[index + 1]?.startDate : true)
? true
: false
}
nonEditable={
item?.endDate &&
item?.endDate?.length > 0 &&
today >= item?.endDate &&
(!cycleDates?.[index + 1] || today >= cycleDates?.[index + 1]?.startDate)
}
Tools
Biome

[error] 282-287: Unnecessary use of boolean literals in conditional expression.

Simplify your code by directly assigning the result without using a ternary operator.
If your goal is negation, you may use the logical NOT (!) or double NOT (!!) operator for clearer and concise code.
Check for more details about NOT operator.
Unsafe fix: Remove the conditional expression with

(lint/complexity/noUselessTernary)

placeholder={t("HCM_END_DATE")}
populators={{
validation: {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ import React, { useState, useEffect } from "react";
import { useTranslation } from "react-i18next";
import { useHistory, useLocation } from "react-router-dom";
import { dateChangeBoundaryConfig, dateChangeConfig } from "../../configs/dateChangeBoundaryConfig";
import { Button, PopUp, Toast } from "@egovernments/digit-ui-components";
import { Button, InfoCard, PopUp, Toast } from "@egovernments/digit-ui-components";

function UpdateDatesWithBoundaries() {
const { t } = useTranslation();
Expand Down Expand Up @@ -152,6 +152,17 @@ function UpdateDatesWithBoundaries() {
actionClassName={"dateUpdateAction"}
noCardStyle={true}
/>
<InfoCard
className={"infoClass"}
populators={{
name: "infocard",
}}
variant="default"
style={{ marginBottom: "1.5rem", marginTop: "1.5rem", marginLeft: "0rem", maxWidth: "100%" }}
additionalElements={[<span style={{ color: "#505A5F" }}>{t(`UPDATE_DATE_CHANGE_INFO_TEXT`)}</span>]}
label={"Info"}
headerClassName={"headerClassName"}
/>
{showPopUp && (
<PopUp
className={"boundaries-pop-module"}
Expand Down
4 changes: 2 additions & 2 deletions micro-ui/web/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,9 @@
],
"homepage": "/digit-ui",
"dependencies": {
"@egovernments/digit-ui-libraries": "1.8.2-beta.5",
"@egovernments/digit-ui-libraries": "1.8.2-beta.6",
"@egovernments/digit-ui-module-workbench": "1.0.1-beta.16",
"@egovernments/digit-ui-module-core": "1.8.2-beta.9",
"@egovernments/digit-ui-module-core": "1.8.2-beta.10",
"@egovernments/digit-ui-module-hrms": "1.8.0-beta.2",
"@egovernments/digit-ui-react-components": "1.8.2-beta.11",
"@egovernments/digit-ui-components": "0.0.2-beta.19",
Expand Down
2 changes: 1 addition & 1 deletion micro-ui/web/public/index.html
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
href="https://fonts.googleapis.com/css2?family=Roboto+Condensed:wght@400;500;700&family=Roboto:wght@400;500;700&display=swap"
rel="stylesheet" type="text/css" />
<link rel="stylesheet" href="https://unpkg.com/@egovernments/[email protected]/dist/index.css" />
<link rel="stylesheet" href="https://unpkg.com/@egovernments/[email protected].7/dist/index.css" />
<link rel="stylesheet" href="https://unpkg.com/@egovernments/[email protected].8/dist/index.css" />
<link rel="stylesheet" href="https://unpkg.com/@egovernments/[email protected]/dist/index.css" />
<!-- added below css for hcm-workbench module inclusion-->
<link rel="stylesheet" href="https://unpkg.com/@egovernments/[email protected]/dist/index.css" />
Expand Down
4 changes: 2 additions & 2 deletions micro-ui/web/workbench/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,10 @@
],
"homepage": "/workbench-ui",
"dependencies": {
"@egovernments/digit-ui-libraries": "1.8.2-beta.5",
"@egovernments/digit-ui-libraries": "1.8.2-beta.6",
"@egovernments/digit-ui-module-workbench": "1.0.2-beta.3",
"@egovernments/digit-ui-components": "0.0.2-beta.19",
"@egovernments/digit-ui-module-core": "1.8.2-beta.9",
"@egovernments/digit-ui-module-core": "1.8.2-beta.10",
"@egovernments/digit-ui-module-utilities": "1.0.1-beta.30",
"@egovernments/digit-ui-react-components": "1.8.2-beta.11",
"@egovernments/digit-ui-module-hcmworkbench":"0.0.38",
Expand Down
2 changes: 1 addition & 1 deletion utilities/project-factory/src/server/api/campaignApis.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import { immediateValidationForTargetSheet, validateSheetData, validateTargetShe
import { callMdmsTypeSchema, getCampaignNumber } from "./genericApis";
import { boundaryBulkUpload, convertToTypeData, generateHierarchy, generateProcessedFileAndPersist, getBoundaryOnWhichWeSplit, getLocalizedName, reorderBoundariesOfDataAndValidate, checkIfSourceIsMicroplan } from "../utils/campaignUtils";
const _ = require('lodash');
import { produceModifiedMessages } from "../kafka/Listener";
import { produceModifiedMessages } from "../kafka/Producer";
import { createDataService } from "../service/dataManageService";
import { searchProjectTypeCampaignService } from "../service/campaignManageService";
import { getExcelWorkbookFromFileURL } from "../utils/excelUtils";
Expand Down
6 changes: 3 additions & 3 deletions utilities/project-factory/src/server/config/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,10 @@ const getDBSchemaName = (dbSchema = "") => {
const config = {
cacheTime : 300,
enableDynamicTemplateFor: process.env.ENABLE_DYNAMIC_TEMPLATE_FOR || "MR-DN",
isCallGenerateWhenDeliveryConditionsDiffer: process.env.IS_CALL_GENERATE_WHEN_DELIVERY_CONDITIONS_DIFFER || false,
isCallGenerateWhenDeliveryConditionsDiffer: (process.env.IS_CALL_GENERATE_WHEN_DELIVERY_CONDITIONS_DIFFER === "true") || false,
prefixForMicroplanCampaigns: "MP",
excludeHierarchyTypeFromBoundaryCodes: process.env.EXCLUDE_HIERARCHY_TYPE_FROM_BOUNDARY_CODES || false,
excludeBoundaryNameAtLastFromBoundaryCodes: process.env.EXCLUDE_BOUNDARY_NAME_AT_LAST_FROM_BOUNDARY_CODES || false,
excludeHierarchyTypeFromBoundaryCodes: (process.env.EXCLUDE_HIERARCHY_TYPE_FROM_BOUNDARY_CODES === "true") || false,
excludeBoundaryNameAtLastFromBoundaryCodes: (process.env.EXCLUDE_BOUNDARY_NAME_AT_LAST_FROM_BOUNDARY_CODES === "true") || false,
masterNameForSchemaOfColumnHeaders: "adminSchema",
masterNameForSplitBoundariesOn: "hierarchyConfig",
boundary: {
Expand Down
41 changes: 1 addition & 40 deletions utilities/project-factory/src/server/kafka/Listener.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,8 @@
import { ConsumerGroup, ConsumerGroupOptions, Message } from 'kafka-node';
import config from '../config';
import { getFormattedStringForDebug, logger } from '../utils/logger';
import { shutdownGracefully, throwError } from '../utils/genericUtils';
import { shutdownGracefully } from '../utils/genericUtils';
import { handleCampaignMapping } from '../utils/campaignMappingUtils';
import { producer } from './Producer';

// Kafka Configuration
const kafkaConfig: ConsumerGroupOptions = {
Expand Down Expand Up @@ -54,41 +53,3 @@ export function listener() {
logger.error(`Offset out of range error: ${err}`);
});
}


/**
* Produces modified messages to a specified Kafka topic.
* @param modifiedMessages An array of modified messages to be produced.
* @param topic The Kafka topic to which the messages will be produced.
* @returns A promise that resolves when the messages are successfully produced.
*/
async function produceModifiedMessages(modifiedMessages: any[], topic: any) {
try {
logger.info(`KAFKA :: PRODUCER :: a message sent to topic ${topic}`);
logger.debug(`KAFKA :: PRODUCER :: message ${getFormattedStringForDebug(modifiedMessages)}`);
const payloads = [
{
topic: topic,
messages: JSON.stringify(modifiedMessages), // Convert modified messages to JSON string
},
];

// Send payloads to the Kafka producer
producer.send(payloads, (err: any) => {
if (err) {
console.error(err);
console.log('Error coming for message : ', modifiedMessages);
logger.info('KAFKA :: PRODUCER :: Some Error Occurred ');
logger.error(`KAFKA :: PRODUCER :: Error : ${JSON.stringify(err)}`);
} else {
logger.info('KAFKA :: PRODUCER :: message sent successfully ');
}
});
} catch (error) {
console.error(error);
logger.error(`KAFKA :: PRODUCER :: Exception caught: ${JSON.stringify(error)}`);
throwError("COMMON", 400, "KAKFA_ERROR", "Some error occured in kafka"); // Re-throw the error after logging it
}
}

export { produceModifiedMessages } // Export the produceModifiedMessages function for external use
110 changes: 79 additions & 31 deletions utilities/project-factory/src/server/kafka/Producer.ts
Original file line number Diff line number Diff line change
@@ -1,16 +1,37 @@
import config from '../config'; // Importing configuration settings
import { Producer, KafkaClient } from 'kafka-node'; // Importing Producer and KafkaClient from 'kafka-node' library
import { Producer, KafkaClient } from 'kafka-node';
import { logger } from "../utils/logger";
import { shutdownGracefully } from '../utils/genericUtils';
import { shutdownGracefully, throwError } from '../utils/genericUtils';
import config from '../config';

// Global producer instance
let producer: Producer;

// Creating a new Kafka client instance using the configured Kafka broker host
const kafkaClient = new KafkaClient({
kafkaHost: config?.host?.KAFKA_BROKER_HOST, // Configuring Kafka broker host
connectRetryOptions: { retries: 1 }, // Configuring connection retry options
kafkaHost: config?.host?.KAFKA_BROKER_HOST,
connectRetryOptions: { retries: 1 },
});

// Event listener for 'error' event, indicating that the client encountered an error
kafkaClient.on('error', (err: any) => {
logger.error('Kafka client is in error state'); // Log message indicating client is in error state
console.error(err.stack || err); // Log the error stack or message
shutdownGracefully();
});

// Creating a new Kafka producer instance using the Kafka client
const producer = new Producer(kafkaClient, { partitionerType: 2 }); // Using partitioner type 2
const createProducer = () => {
producer = new Producer(kafkaClient, { partitionerType: 2 });

producer.on('ready', () => {
logger.info('Producer is ready');
checkBrokerAvailability();
});

producer.on('error', (err: any) => {
logger.error('Producer is in error state');
console.error(err);
shutdownGracefully();
});
};

// Function to check broker availability by listing all brokers
const checkBrokerAvailability = () => {
Expand All @@ -33,30 +54,57 @@ const checkBrokerAvailability = () => {
});
};

// Event listener for 'ready' event, indicating that the client is ready to check broker availability
kafkaClient.on('ready', () => {
logger.info('Kafka client is ready'); // Log message indicating client is ready
checkBrokerAvailability(); // Check broker availability
});

// Event listener for 'ready' event, indicating that the producer is ready to send messages
producer.on('ready', () => {
logger.info('Producer is ready'); // Log message indicating producer is ready
checkBrokerAvailability(); // Check broker availability
});
createProducer();

// Event listener for 'error' event, indicating that the client encountered an error
kafkaClient.on('error', (err: any) => {
logger.error('Kafka client is in error state'); // Log message indicating client is in error state
console.error(err.stack || err); // Log the error stack or message
shutdownGracefully();
});
const sendWithRetries = (payloads: any[], retries = 3, shutdown: boolean = false): Promise<void> => {
return new Promise((resolve, reject) => {
producer.send(payloads, async (err: any) => {
if (err) {
logger.error('Error sending message:', err);
if (retries > 0) {
logger.info(`Retrying to send message. Retries left: ${retries}`);
await new Promise(resolve => setTimeout(resolve, 2000)); // wait before retrying
resolve(sendWithRetries(payloads, retries - 1));
} else {
// Attempt to reconnect and retry
logger.error('Failed to send message after retries. Reconnecting producer...');
if (shutdown) {
shutdownGracefully();
}
else {
producer.close(() => {
createProducer(); // Recreate the producer
setTimeout(() => {
sendWithRetries(payloads, 1, true).catch(reject);
}, 2000); // wait before retrying after reconnect
});
}
}
} else {
logger.info('Message sent successfully');
resolve();
}
});
});
};

// Event listener for 'error' event, indicating that the producer encountered an error
producer.on('error', (err: any) => {
logger.error('Producer is in error state'); // Log message indicating producer is in error state
console.error(err); // Log the error stack or message
shutdownGracefully();
});
async function produceModifiedMessages(modifiedMessages: any[], topic: any) {
try {
logger.info(`KAFKA :: PRODUCER :: A message sent to topic ${topic}`);
logger.debug(`KAFKA :: PRODUCER :: Message ${JSON.stringify(modifiedMessages)}`);
const payloads = [
{
topic: topic,
messages: JSON.stringify(modifiedMessages),
},
];

await sendWithRetries(payloads, 3);
} catch (error) {
logger.error(`KAFKA :: PRODUCER :: Exception caught: ${JSON.stringify(error)}`);
throwError("COMMON", 400, "KAFKA_ERROR", "Some error occurred in Kafka"); // Re-throw the error after logging it
}
}

export { producer }; // Exporting the producer instance for external use
export { produceModifiedMessages };
Loading
Loading