Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Develop #919

Merged
merged 2 commits into from
Aug 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import org.egov.wscalculation.web.models.users.UserDetailResponse;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.messaging.Message;
import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;
Expand Down Expand Up @@ -71,6 +72,9 @@ public class DemandGenerationConsumer {
@Autowired
private NotificationUtil util;

@Autowired
private KafkaTemplate kafkaTemplate;

@Autowired
private CalculatorUtil calculatorUtils;

Expand Down Expand Up @@ -192,7 +196,7 @@ private void generateDemandInBatch(CalculationReq request, Map<String, Object> m
wsCalulationWorkflowValidator.applicationValidation(request.getRequestInfo(), criteria.getTenantId(),
criteria.getConnectionNo(), genratedemand);
}*/
System.out.println("Calling Bulk Demand generation connection Number" + request.getCalculationCriteria().get(0).getConnectionNo());
//System.out.println("Calling Bulk Demand generation connection Number" + request.getCalculationCriteria().get(0).getConnectionNo());
wSCalculationServiceImpl.bulkDemandGeneration(request, masterMap);
/*String connectionNoStrings = request.getCalculationCriteria().stream()
.map(criteria -> criteria.getConnectionNo()).collect(Collectors.toSet()).toString();
Expand Down Expand Up @@ -268,13 +272,13 @@ private void generateDemandAndSendnotification(RequestInfo requestInfo, String t
Long dayEndTime = LocalDateTime
.of(toDate.getYear(), toDate.getMonth(), toDate.getDayOfMonth(), 23, 59, 59, 999000000)
.atZone(ZoneId.systemDefault()).toInstant().toEpochMilli();

Long StartTimeForGetConnetion = System.currentTimeMillis();

List<String> connectionNos = waterCalculatorDao.getNonMeterConnectionsList(tenantId, dayStartTime, dayEndTime);



Calendar previousFromDate = Calendar.getInstance();
/*Calendar previousFromDate = Calendar.getInstance();
Calendar previousToDate = Calendar.getInstance();

previousFromDate.setTimeInMillis(dayStartTime);
Expand All @@ -283,26 +287,27 @@ private void generateDemandAndSendnotification(RequestInfo requestInfo, String t
previousFromDate.add(Calendar.MONTH, -1); //assuming billing cycle will be first day of month
previousToDate.add(Calendar.MONTH, -1);
int max = previousToDate.getActualMaximum(Calendar.DAY_OF_MONTH);
previousToDate.set(Calendar.DAY_OF_MONTH, max);
previousToDate.set(Calendar.DAY_OF_MONTH, max);*/
String assessmentYear = estimationService.getAssessmentYear();
ArrayList<String> failedConnectionNos = new ArrayList<String>();

Long startTimeForMdms= System.
currentTimeMillis();
Map<String, Object> masterMap = mDataService.loadMasterData(requestInfo,
tenantId);

log.info("connectionNos" + connectionNos.size());
log.info("connectionNos" + connectionNos);
log.info("dayStartTime:"+dayStartTime);
log.info("dayEndTime"+dayEndTime);

long startTimeForLoop= System.currentTimeMillis();
for (String connectionNo : connectionNos) {
long timeBeforePushToKafka = System.currentTimeMillis();
CalculationCriteria calculationCriteria = CalculationCriteria.builder().tenantId(tenantId)
.assessmentYear(assessmentYear).connectionNo(connectionNo).from(dayStartTime).to(dayEndTime).build();
List<CalculationCriteria> calculationCriteriaList = new ArrayList<>();
calculationCriteriaList.add(calculationCriteria);
CalculationReq calculationReq = CalculationReq.builder().calculationCriteria(calculationCriteriaList)
.requestInfo(requestInfo).isconnectionCalculation(true).isAdvanceCalculation(false).build();

Set<String> consumerCodes = new LinkedHashSet<String>();
/*Set<String> consumerCodes = new LinkedHashSet<String>();
consumerCodes.add(connectionNo);

if (!waterCalculatorDao.isDemandExists(tenantId, previousFromDate.getTimeInMillis(),
Expand All @@ -312,7 +317,7 @@ private void generateDemandAndSendnotification(RequestInfo requestInfo, String t
log.warn("this connection doen't have the demand in previous billing cycle :" + connectionNo);
failedConnectionNos.add(connectionNo);
continue;
}
}*/
HashMap<Object, Object> genarateDemandData = new HashMap<Object, Object>();
genarateDemandData.put("calculationReq", calculationReq);
genarateDemandData.put("billingCycle",billingCycle);
Expand All @@ -327,10 +332,14 @@ private void generateDemandAndSendnotification(RequestInfo requestInfo, String t
* log.warn("this connection doen't have the demand in previous billing cycle :"
* + connectionNo ); continue; }
*/
log.info("sending generate demand for connection no :"+connectionNo);
producer.push(config.getWsGenerateDemandBulktopic(),genarateDemandData);

long timetakenToPush= System.currentTimeMillis();
kafkaTemplate.send(config.getWsGenerateDemandBulktopic(),genarateDemandData);

}
log.info("Time taken for the for loop : "+(System.currentTimeMillis()-startTimeForLoop)/1000+ " Secondss");

Long starttimeforNotification= System.currentTimeMillis();
HashMap<String, String> demandMessage = util.getLocalizationMessage(requestInfo,
WSCalculationConstant.mGram_Consumer_NewDemand, tenantId);
HashMap<String, String> gpwscMap = util.getLocalizationMessage(requestInfo, tenantId, tenantId);
Expand Down Expand Up @@ -362,7 +371,7 @@ private void generateDemandAndSendnotification(RequestInfo requestInfo, String t
producer.push(config.getSmsNotifTopic(), smsRequest);
}
}

log.info("Time taken for notification : "+(System.currentTimeMillis()-starttimeforNotification)/1000+ " Secondss");
});
/* if (isSendMessage && failedConnectionNos.size() > 0) {
List<ActionItem> actionItems = new ArrayList<>();
Expand Down Expand Up @@ -577,10 +586,15 @@ public void generateBulkDemandForULB(HashMap<Object, Object> messageData) {
String billingPeriod = bulkDemand.getBillingPeriod();
if (StringUtils.isEmpty(billingPeriod))
throw new CustomException("BILLING_PERIOD_PARSING_ISSUE", "Billing Period can not be empty!!");
log.info("CALL FROM TOPIC egov.generate.bulk.demand.manually.topic for tenantid:"
+bulkDemand.getTenantId()+" BillPeriod:"+billingPeriod+" Start Time:"+System.currentTimeMillis() );
Long starTime = System.currentTimeMillis();
log.info("CALL FROM TOPIC egov.generate.bulk.demand.manually.topic" );
generateDemandAndSendnotification(bulkDemand.getRequestInfo(), bulkDemand.getTenantId(), billingPeriod, billingMasterData,
isSendMessage, isManual);

long endTime=System.currentTimeMillis();
long diff = endTime-starTime;
log.info("time takenn to generate demand for Tenantid:"+bulkDemand.getTenantId()+" BillPeriod:"+billingPeriod+" is : "+diff/1000 +" seconds");
}
@KafkaListener(topics = {
"${egov.update.demand.add.penalty}" })
Expand All @@ -603,9 +617,44 @@ public void generateDemandInBulkListner(HashMap<Object, Object> messageData) {
billingCycle= (String) genarateDemandData.get("billingCycle");
isSendMessage= (boolean) genarateDemandData.get("isSendMessage");
tenantId=(String) genarateDemandData.get("tenantId");
DateTimeFormatter formatter = DateTimeFormatter.ofPattern("d/MM/yyyy");


log.info("got generate demand call for :"+calculationReq.getCalculationCriteria().get(0).getConnectionNo());
generateDemandInBulk(calculationReq,billingCycle,masterMap,isSendMessage,tenantId);
LocalDate fromDate = LocalDate.parse(billingCycle.split("-")[0].trim(), formatter);
LocalDate toDate = LocalDate.parse(billingCycle.split("-")[1].trim(), formatter);

Long dayStartTime = LocalDateTime
.of(fromDate.getYear(), fromDate.getMonth(), fromDate.getDayOfMonth(), 0, 0, 0)
.atZone(ZoneId.systemDefault()).toInstant().toEpochMilli();
Long dayEndTime = LocalDateTime
.of(toDate.getYear(), toDate.getMonth(), toDate.getDayOfMonth(), 23, 59, 59, 999000000)
.atZone(ZoneId.systemDefault()).toInstant().toEpochMilli();
Calendar previousFromDate = Calendar.getInstance();
Calendar previousToDate = Calendar.getInstance();

previousFromDate.setTimeInMillis(dayStartTime);
previousToDate.setTimeInMillis(dayEndTime);

previousFromDate.add(Calendar.MONTH, -1); //assuming billing cycle will be first day of month
previousToDate.add(Calendar.MONTH, -1);
int max = previousToDate.getActualMaximum(Calendar.DAY_OF_MONTH);
previousToDate.set(Calendar.DAY_OF_MONTH, max);
//log.info("got generate demand call for :"+calculationReq.getCalculationCriteria().get(0).getConnectionNo());
Set<String> consumerCodes = new LinkedHashSet<String>();
consumerCodes.add(calculationReq.getCalculationCriteria().get(0).getConnectionNo());
if (!waterCalculatorDao.isDemandExists(tenantId, previousFromDate.getTimeInMillis(),
previousToDate.getTimeInMillis(), consumerCodes)
&& !waterCalculatorDao.isConnectionExists(tenantId, previousFromDate.getTimeInMillis(),
previousToDate.getTimeInMillis(), consumerCodes)) {
log.warn("this connection doen't have the demand in previous billing cycle :" + calculationReq.getCalculationCriteria().get(0).getConnectionNo());
} else {
Long starttime = System.currentTimeMillis();
generateDemandInBulk(calculationReq, billingCycle, masterMap, isSendMessage, tenantId);
log.info("GOt call inn ws-gennerate-demand-bulk topic end time:" + System.currentTimeMillis());
Long endtime = System.currentTimeMillis();
long diff = endtime - starttime;
log.info("Time taken to process request for :" + calculationReq.getCalculationCriteria().get(0).getConnectionNo() + " is :" + diff / 1000 + " secs");
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,12 @@ spring.kafka.consumer.group-id=egov-ws-calc-services
spring.kafka.consumer.properties.spring.json.use.type.headers=false
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer
spring.kafka.producer.linger.ms=10
spring.kafka.producer.batch.size=32768
spring.kafka.producer.buffer.memory=33554432
spring.kafka.consumer.session.timeout.ms=30000
spring.kafka.consumer.heartbeat.interval.ms=10000
spring.kafka.consumer.max.poll.interval.ms=600000

$KAFKA TOPIC DETAILS
egov.watercalculatorservice.createdemand.topic=ws-generate-demand
Expand Down
Loading