Skip to content

Commit

Permalink
optimize pir service publish perf (#122)
Browse files Browse the repository at this point in the history
* optimize pir service publish perf

* optimize pir

* use jdbcTemplate to execute sql
  • Loading branch information
cyjseagull authored Nov 6, 2024
1 parent a9e8693 commit 8e32dad
Show file tree
Hide file tree
Showing 8 changed files with 207 additions and 228 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,13 @@

package com.webank.wedpr.common.utils;

import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.opencsv.CSVReaderHeaderAware;
import com.webank.wedpr.common.config.WeDPRCommonConfig;
import java.io.*;
import java.nio.file.Paths;
import java.util.*;
import lombok.Data;
import org.apache.commons.lang3.ArrayUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -45,19 +47,8 @@ private static Object loadCSVFile(String filePath, int chunkSize, ParseHandler h
}
}

/**
 * Returns the header (column) names of the given CSV file.
 *
 * @param filePath path of the CSV file to inspect
 * @return the set of column names taken from the file's header row
 * @throws Exception if the file cannot be opened or parsed
 */
public static Set<String> getFields(String filePath) throws Exception {
    // CSVReaderHeaderAware.readMap() keys the first record by the header row,
    // so the key set of one record is exactly the column-name set. A lambda
    // replaces the original anonymous ParseHandler, matching the lambda style
    // used by the other loadCSVFile callers in this file.
    return (Set<String>)
            loadCSVFile(
                    filePath,
                    WeDPRCommonConfig.getReadChunkSize(),
                    reader -> reader.readMap().keySet());
}

@Data
@JsonIgnoreProperties(ignoreUnknown = true)
public static class ExtractConfig {
private String originalFilePath;
private List<String> extractFields;
Expand All @@ -66,63 +57,13 @@ public static class ExtractConfig {
private Integer writeChunkSize = WeDPRCommonConfig.getWriteChunkSize();
private Integer readChunkSize = WeDPRCommonConfig.getReadChunkSize();

public ExtractConfig() {}

public ExtractConfig(
String originalFilePath, List<String> extractFields, String extractFilePath) {
this.originalFilePath = originalFilePath;
this.extractFields = extractFields;
this.extractFilePath = extractFilePath;
}

public String getOriginalFilePath() {
return originalFilePath;
}

public void setOriginalFilePath(String originalFilePath) {
this.originalFilePath = originalFilePath;
}

public List<String> getExtractFields() {
return extractFields;
}

public void setExtractFields(List<String> extractFields) {
this.extractFields = extractFields;
}

public String getExtractFilePath() {
return extractFilePath;
}

public void setExtractFilePath(String extractFilePath) {
this.extractFilePath = extractFilePath;
}

public String getFieldSplitter() {
return fieldSplitter;
}

public void setFieldSplitter(String fieldSplitter) {
this.fieldSplitter = fieldSplitter;
}

public Integer getWriteChunkSize() {
return writeChunkSize;
}

public void setWriteChunkSize(Integer writeChunkSize) {
this.writeChunkSize = writeChunkSize;
}

public Integer getReadChunkSize() {
return readChunkSize;
}

public void setReadChunkSize(Integer readChunkSize) {
this.readChunkSize = readChunkSize;
}

@Override
public String toString() {
return "ExtractConfig{"
Expand Down Expand Up @@ -157,7 +98,7 @@ public Object call(CSVReaderHeaderAware reader) throws Exception {
Map<String, String> fieldsMapping =
Common.trimAndMapping(headerInfo.keySet());
for (String field : extractConfig.getExtractFields()) {
if (!fieldsMapping.keySet().contains(field.trim())) {
if (!fieldsMapping.containsKey(field.trim())) {
String errorMsg =
"extractFields failed for the field "
+ field
Expand Down Expand Up @@ -204,36 +145,38 @@ public Object call(CSVReaderHeaderAware reader) throws Exception {
});
}

public static List<List<String>> processCsv2SqlMap(String[] tableFields, String csvFilePath)
/**
 * Callback invoked once per parsed CSV row with the row's values ordered to
 * match the requested table fields. Annotated {@code @FunctionalInterface} so
 * the single-abstract-method contract is compiler-enforced for the lambda
 * call sites in this file.
 */
@FunctionalInterface
public interface RowContentHandler {
    /**
     * Consumes one parsed row.
     *
     * @param rowContent the row's values, one entry per requested field
     * @throws Exception if the row cannot be processed; propagated to the caller
     */
    void handle(List<String> rowContent) throws Exception;
}

public static void processCsvContent(
String[] tableFields, String csvFilePath, RowContentHandler rowContentHandler)
throws Exception {
return (List<List<String>>)
loadCSVFile(
csvFilePath,
WeDPRCommonConfig.getReadChunkSize(),
reader -> {
List<List<String>> resultValue = new ArrayList<>();
Map<String, String> row;
while ((row = reader.readMap()) != null) {
List<String> rowValue = new ArrayList<>();
for (String field : tableFields) {
Map<String, String> rowFieldsMapping =
Common.trimAndMapping(row.keySet());
if (!rowFieldsMapping.keySet().contains(field.trim())) {
String errorMsg =
"extractFields failed for the field "
+ field
+ " not existed in the file "
+ ArrayUtils.toString(
rowFieldsMapping.keySet());
logger.warn(errorMsg);
throw new WeDPRException(-1, errorMsg);
}
rowValue.add(row.get(rowFieldsMapping.get(field)));
}
resultValue.add(rowValue);
loadCSVFile(
csvFilePath,
WeDPRCommonConfig.getReadChunkSize(),
reader -> {
Map<String, String> row;
while ((row = reader.readMap()) != null) {
List<String> rowValue = new ArrayList<>();
for (String field : tableFields) {
Map<String, String> rowFieldsMapping =
Common.trimAndMapping(row.keySet());
if (!rowFieldsMapping.containsKey(field.trim())) {
String errorMsg =
"extractFields failed for the field "
+ field
+ " not existed in the file "
+ ArrayUtils.toString(rowFieldsMapping.keySet());
logger.warn(errorMsg);
throw new WeDPRException(-1, errorMsg);
}
return resultValue;
});
rowValue.add(row.get(rowFieldsMapping.get(field)));
}
rowContentHandler.handle(rowValue);
}
return Boolean.TRUE;
});
}

public static boolean writeMapData(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
package com.webank.wedpr.components.db.mapper.service.publish.model;

import com.webank.wedpr.common.utils.Common;
import com.webank.wedpr.common.utils.Constant;
import com.webank.wedpr.common.utils.ObjectMapperFactory;
import com.webank.wedpr.common.utils.WeDPRException;
import java.util.Collections;
Expand All @@ -42,14 +41,10 @@ public List<String> obtainQueriedFields(PirSearchType searchType, List<String> q
if (searchType == PirSearchType.SearchValue) {
// remove duplicated fields
Set<String> queriedFieldSet = new HashSet<>(queriedFields);
if (queriedFieldSet.contains(idField)) {
queriedFieldSet.remove(idField);
queriedFieldSet.add(Constant.PIR_ID_FIELD_NAME);
}
return (List<String>)
CollectionUtils.intersection(queriedFieldSet, accessibleValueQueryFields);
}
return Collections.singletonList(Constant.PIR_ID_FIELD_NAME);
return Collections.singletonList(idField);
}

public void setSearchType(String searchType) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import org.springframework.core.io.DefaultResourceLoader;
import org.springframework.core.io.Resource;
import org.springframework.core.io.support.PathMatchingResourcePatternResolver;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.jdbc.datasource.DataSourceTransactionManager;
import org.springframework.transaction.PlatformTransactionManager;
import org.springframework.transaction.annotation.EnableTransactionManagement;
Expand Down Expand Up @@ -117,6 +118,12 @@ public PageInterceptor pageInterceptor() {
return pageInterceptor;
}

@Bean
@Primary
public JdbcTemplate primaryJdbcTemplate() {
    // Expose a JdbcTemplate bound to this configuration's DataSource so callers
    // can execute SQL directly, bypassing the MyBatis mapper layer. Marked
    // @Primary so it wins when multiple JdbcTemplate beans are present.
    final JdbcTemplate template = new JdbcTemplate(dataSource);
    return template;
}

@Bean
public MybatisPlusInterceptor mybatisPlusInterceptor() {
MybatisPlusInterceptor interceptor = new MybatisPlusInterceptor();
Expand Down
Loading

0 comments on commit 8e32dad

Please sign in to comment.