Skip to content

Commit

Permalink
Datasync changes
Browse files Browse the repository at this point in the history
  • Loading branch information
indraniBan authored and indraniBan committed Aug 5, 2024
1 parent d042ca5 commit 9247fdc
Showing 1 changed file with 144 additions and 63 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
package com.iemr.mmu.service.dataSyncActivity;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
Expand All @@ -40,6 +41,7 @@
import org.springframework.util.MultiValueMap;
import org.springframework.web.client.RestTemplate;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import com.iemr.mmu.data.syncActivity_syncLayer.DataSyncGroups;
Expand Down Expand Up @@ -121,91 +123,147 @@ private String startDataSync(int vanID, String user, String Authorization) throw
String serverAcknowledgement = null;
List<Map<String, String>> responseStatus = new ArrayList<>();
boolean isProgress = false;
boolean hasSyncFailed = false;
ObjectMapper objectMapper = new ObjectMapper();
// fetch group masters
List<DataSyncGroups> dataSyncGroupList = dataSyncGroupsRepo.findByDeleted(false);
logger.debug("Fetched DataSyncGroups: {}",
objectMapper.writerWithDefaultPrettyPrinter().writeValueAsString(dataSyncGroupList));
for (DataSyncGroups dataSyncGroups : dataSyncGroupList) {
int groupId = dataSyncGroups.getSyncTableGroupID();
List<SyncUtilityClass> syncUtilityClassList = getVanAndServerColumns(groupId);
logger.debug("Fetched SyncUtilityClass for groupId {}: {}", groupId,
objectMapper.writerWithDefaultPrettyPrinter().writeValueAsString(syncUtilityClassList));
List<Map<String, Object>> syncData;
List<Map<String, Object>> syncDataBatch;
Map<String, String> groupIdStatus = new HashMap<>();
for (SyncUtilityClass obj : syncUtilityClassList) {
// if (!isProgress) {
// get data from DB to sync to server
syncData = getDataToSync(obj.getSchemaName(), obj.getTableName(), obj.getVanColumnName());
// System.out.println(new Gson().toJson(syncData));
if (syncData != null && syncData.size() > 0) {
int dataSize = syncData.size();
int startIndex = 0;
int fullBatchCount = dataSize / BATCH_SIZE;
int remainder = dataSize % BATCH_SIZE;

// sync data to server for batches
for (int i = 0; i < fullBatchCount; i++) {
// get data for each batch
syncDataBatch = getBatchOfAskedSizeDataToSync(syncData, startIndex, BATCH_SIZE);
// for each batch sync data to central server
serverAcknowledgement = syncDataToServer(vanID, obj.getSchemaName(), obj.getTableName(),
obj.getVanAutoIncColumnName(), obj.getServerColumnName(), syncDataBatch, user,
Authorization);
// isProgress = setResponseStatus(groupIdStatus, groupId, serverAcknowledgement, responseStatus, isProgress);
startIndex += BATCH_SIZE;
// if(isProgress)
// break;
// if (!isProgress) {
// get data from DB to sync to server
syncData = getDataToSync(obj.getSchemaName(), obj.getTableName(), obj.getVanColumnName());
logger.debug("Fetched syncData for schema {} and table {}: {}", obj.getSchemaName(), obj.getTableName(),
objectMapper.writerWithDefaultPrettyPrinter().writeValueAsString(syncData));
// System.out.println(new Gson().toJson(syncData));
if (syncData != null && syncData.size() > 0) {
int dataSize = syncData.size();
int startIndex = 0;
int fullBatchCount = dataSize / BATCH_SIZE;
int remainder = dataSize % BATCH_SIZE;

logger.info("Starting batch sync for schema: {}, table: {} with {} full batches and {} remainder",
obj.getSchemaName(), obj.getTableName(), fullBatchCount, remainder);


for (int i = 0; i < fullBatchCount; i++) {
syncDataBatch = getBatchOfAskedSizeDataToSync(syncData, startIndex,
BATCH_SIZE);
serverAcknowledgement = syncDataToServer(vanID, obj.getSchemaName(), obj.getTableName(),
obj.getVanAutoIncColumnName(), obj.getServerColumnName(), syncDataBatch, user,
Authorization);
logger.debug("Server acknowledgement for batch {}: {}", i, serverAcknowledgement);

if (serverAcknowledgement == null || !serverAcknowledgement.contains("success")) {
logger.error("Sync failed for batch {} in schema: {}, table: {}", i, obj.getSchemaName(),
obj.getTableName());
hasSyncFailed = true;
setResponseStatus(groupIdStatus, groupId, "failed", responseStatus);
break;
}
// sync data to server for rest data left from batch
if (remainder > 0 && !isProgress) {
// get data for extra data from batch
syncDataBatch = getBatchOfAskedSizeDataToSync(syncData, startIndex, remainder);
// for extra data from batch sync data to central server
serverAcknowledgement = syncDataToServer(vanID, obj.getSchemaName(), obj.getTableName(),
obj.getVanAutoIncColumnName(), obj.getServerColumnName(), syncDataBatch, user,
Authorization);
// isProgress = setResponseStatus(groupIdStatus, groupId, serverAcknowledgement, responseStatus, isProgress);

startIndex += BATCH_SIZE;
}

if (!hasSyncFailed && remainder > 0) {
syncDataBatch = getBatchOfAskedSizeDataToSync(syncData, startIndex,
remainder);
serverAcknowledgement = syncDataToServer(vanID, obj.getSchemaName(), obj.getTableName(),
obj.getVanAutoIncColumnName(), obj.getServerColumnName(), syncDataBatch, user,
Authorization);
logger.debug("Server acknowledgement for remaining data: {}", serverAcknowledgement);

if (serverAcknowledgement == null || !serverAcknowledgement.contains("success")) {
logger.error("Sync failed for remaining data in schema: {}, table: {}", obj.getSchemaName(),
obj.getTableName());
hasSyncFailed = true;
setResponseStatus(groupIdStatus, groupId, "failed", responseStatus);
break;
}
}

} else {
// nothing to sync
serverAcknowledgement = "Data successfully synced";
if (!hasSyncFailed) {
logger.info("Data sync completed for schema: {}, table: {}", obj.getSchemaName(),
obj.getTableName());
setResponseStatus(groupIdStatus, groupId, "completed", responseStatus);
}
} else {
logger.info("No data to sync for schema {} and table {}", obj.getSchemaName(), obj.getTableName());
setResponseStatus(groupIdStatus, groupId, "completed", responseStatus);
}

if (hasSyncFailed) {
// Mark all subsequent groups as "pending"
for (DataSyncGroups remainingGroup : dataSyncGroupList
.subList(dataSyncGroupList.indexOf(dataSyncGroups) + 1, dataSyncGroupList.size())) {
Map<String, String> pendingGroupIdStatus = new HashMap<>();
pendingGroupIdStatus.put("groupId", String.valueOf(remainingGroup.getSyncTableGroupID()));
pendingGroupIdStatus.put("status", "pending");
responseStatus.add(pendingGroupIdStatus);
}
// } else {
// groupIdStatus.put("groupId", String.valueOf(groupId));
// groupIdStatus.put("status", "pending");
// responseStatus.add(groupIdStatus);
// }
} if(isProgress) {
isProgress = setResponseStatus(groupIdStatus, groupId, serverAcknowledgement, responseStatus, isProgress);
break;
}
}
}
if (isProgress) {
return responseStatus.toString();
} else {

return serverAcknowledgement;
}
}

private boolean setResponseStatus(Map<String, String> groupIdStatus, int groupId, String serverAcknowledgement,
List<Map<String, String>> responseStatus, boolean isProgress) {
if (serverAcknowledgement != null) {
groupIdStatus.put("groupId", String.valueOf(groupId));
groupIdStatus.put("status", serverAcknowledgement);
responseStatus.add(groupIdStatus);
} else if (isProgress) {
groupIdStatus.put("groupId", String.valueOf(groupId));
groupIdStatus.put("status", "pending");
responseStatus.add(groupIdStatus);
if (hasSyncFailed) {
Map<String, Object> response = new HashMap<>();
response.put("response", "Data sync failed");
response.put("groupsProgress", responseStatus);
logger.debug("Final response: {}",
objectMapper.writerWithDefaultPrettyPrinter().writeValueAsString(response));
return objectMapper.writerWithDefaultPrettyPrinter()
.writeValueAsString(Collections.singletonMap("data", response));
} else {
isProgress = true;
groupIdStatus.put("groupId", String.valueOf(groupId));
groupIdStatus.put("status", "failed");
responseStatus.add(groupIdStatus);
if ("No data to sync".equals(serverAcknowledgement)) {
logger.info("No data to sync across all groups.");
return serverAcknowledgement;
} else {
logger.info("Data successfully synced across all groups.");
return "Data successfully synced";
}
}
return isProgress;
}

private void setResponseStatus(Map<String, String> groupIdStatus, int groupId, String serverAcknowledgement,
List<Map<String, String>> responseStatus) {
groupIdStatus.put("groupId", String.valueOf(groupId));
groupIdStatus.put("status", serverAcknowledgement);
responseStatus.add(groupIdStatus);
logger.info("Response from data sync: {}", responseStatus);
}

// private boolean setResponseStatus(Map<String, String> groupIdStatus, int groupId, String serverAcknowledgement,
// List<Map<String, String>> responseStatus, boolean isProgress) {
// if (serverAcknowledgement != null) {
// groupIdStatus.put("groupId", String.valueOf(groupId));
// groupIdStatus.put("status", serverAcknowledgement);
// responseStatus.add(groupIdStatus);
// logger.info("Response from data sync", responseStatus);
// } else if (isProgress) {
// groupIdStatus.put("groupId", String.valueOf(groupId));
// groupIdStatus.put("status", "pending");
// responseStatus.add(groupIdStatus);
// logger.info("Response from data sync", responseStatus);
// } else {
// isProgress = true;
// groupIdStatus.put("groupId", String.valueOf(groupId));
// groupIdStatus.put("status", "failed");
// responseStatus.add(groupIdStatus);
// logger.info("Response from data sync", responseStatus);
// }
// return isProgress;
//
// }

/**
*
* @param syncTableDetailsIDs
Expand All @@ -214,13 +272,15 @@ private boolean setResponseStatus(Map<String, String> groupIdStatus, int groupId

private List<SyncUtilityClass> getVanAndServerColumns(Integer groupID) throws Exception {
List<SyncUtilityClass> syncUtilityClassList = getVanAndServerColumnList(groupID);
logger.debug("Fetched SyncUtilityClass list for groupID {}: {}", groupID, syncUtilityClassList);

return syncUtilityClassList;
}

public List<SyncUtilityClass> getVanAndServerColumnList(Integer groupID) throws Exception {
List<SyncUtilityClass> syncUtilityClassList = syncutilityClassRepo
.findBySyncTableGroupIDAndDeletedOrderBySyncTableDetailID(groupID, false);
logger.debug("Fetched SyncUtilityClass list from repository for groupID {}: {}", groupID, syncUtilityClassList);
return syncUtilityClassList;
}

Expand All @@ -236,6 +296,16 @@ private List<Map<String, Object>> getDataToSync(String schemaName, String tableN
throws Exception {
List<Map<String, Object>> resultSetList = dataSyncRepository.getDataForGivenSchemaAndTable(schemaName,
tableName, columnNames);
if (resultSetList != null) {
logger.debug("Fetched {} records for schema '{}', table '{}'", resultSetList.size(), schemaName, tableName);
// Optionally log a sample of the resultSetList for verification (be careful
// with large datasets)
if (!resultSetList.isEmpty()) {
logger.debug("Sample record: {}", resultSetList.get(0));
}
} else {
logger.debug("No records found for schema '{}', table '{}'", schemaName, tableName);
}
return resultSetList;
}

Expand Down Expand Up @@ -267,10 +337,14 @@ private List<Map<String, Object>> getBatchOfAskedSizeDataToSync(List<Map<String,
public String syncDataToServer(int vanID, String schemaName, String tableName, String vanAutoIncColumnName,
String serverColumns, List<Map<String, Object>> dataToBesync, String user, String Authorization)
throws Exception {
logger.debug(
"Entering syncDataToServer with vanID: {}, schemaName: '{}', tableName: '{}', vanAutoIncColumnName: '{}', serverColumns: '{}', user: '{}'",
vanID, schemaName, tableName, vanAutoIncColumnName, serverColumns, user);

RestTemplate restTemplate = new RestTemplate();

Integer facilityID = masterVanRepo.getFacilityID(vanID);
logger.debug("Fetched facilityID for vanID {}: {}", vanID, facilityID);

// serialize null
GsonBuilder gsonBuilder = new GsonBuilder();
Expand All @@ -288,6 +362,7 @@ public String syncDataToServer(int vanID, String schemaName, String tableName, S
dataMap.put("facilityID", facilityID);

String requestOBJ = gson.toJson(dataMap);
logger.debug("Serialized request object: {}", requestOBJ);

MultiValueMap<String, String> headers = new LinkedMultiValueMap<String, String>();
headers.add("Content-Type", "application/json");
Expand All @@ -296,6 +371,8 @@ public String syncDataToServer(int vanID, String schemaName, String tableName, S
logger.info("Before Data sync upload Url" + dataSyncUploadUrl);
ResponseEntity<String> response = restTemplate.exchange(dataSyncUploadUrl, HttpMethod.POST, request,
String.class);
logger.info("Received response from data sync URL: {}", response);
logger.info("Received response from data sync URL: {}", dataSyncUploadUrl);

logger.info("After Data sync upload Url" + dataSyncUploadUrl);
/**
Expand All @@ -307,11 +384,15 @@ public String syncDataToServer(int vanID, String schemaName, String tableName, S
JSONObject obj = new JSONObject(response.getBody());
if (obj != null && obj.has("statusCode") && obj.getInt("statusCode") == 200) {
StringBuilder vanSerialNos = getVanSerialNoListForSyncedData(vanAutoIncColumnName, dataToBesync);
logger.info(
"Updating processed flag for schemaName: {}, tableName: {}, vanSerialNos: {}, vanAutoIncColumnName: {}, user: {}",
schemaName, tableName, vanSerialNos.toString(), vanAutoIncColumnName, user);
// update table for processed flag = "P"
logger.info(schemaName + "|" + tableName + "|" + vanSerialNos.toString() + "|" + vanAutoIncColumnName
+ "|" + user);
i = dataSyncRepository.updateProcessedFlagInVan(schemaName, tableName, vanSerialNos,
vanAutoIncColumnName, user);
logger.debug("Updated processed flag in database. Records affected: {}", i);
}
}
if (i > 0)
Expand Down

0 comments on commit 9247fdc

Please sign in to comment.