add get dataset storage path by dataset id #30

Merged
----------------------------------------
@@ -18,6 +18,8 @@
import static com.webank.wedpr.components.dataset.service.ChunkUploadImpl.UPLOAD_CHUNK_FILE_NAME_PREFIX;

import com.webank.wedpr.components.dataset.common.DatasetConstant;
import com.webank.wedpr.core.utils.Common;
import java.io.File;
import lombok.Data;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Configuration;
@@ -68,4 +70,21 @@ public String getDatasetChunksDir(String datasetId) {
String datasetChunksBaseDir = getDatasetChunksBaseDir();
return String.format("%s/%s", datasetChunksBaseDir, datasetId);
}

public String getDatasetStoragePath(String user, String datasetId, boolean dynamic) {
if (dynamic) {
// ${user}/dy/${currentTimeMillis}/${datasetId}
long currentTimeMillis = System.currentTimeMillis();
return user
+ File.separator
+ "dy"
+ File.separator
+ currentTimeMillis
+ File.separator
+ datasetId;
} else {
// ${user}/${datasetId}
return Common.joinPath(user, datasetId);
}
}
}
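
For reference, a minimal sketch of the two path shapes this method produces. The user name and dataset id below are hypothetical, the timestamp in the dynamic branch differs per call, and DatasetConfig is normally injected by Spring; it is constructed directly here only for illustration.

public class DatasetStoragePathExample {
    public static void main(String[] args) {
        // direct construction for illustration only; Spring injects this bean in practice
        DatasetConfig datasetConfig = new DatasetConfig();

        // static dataset: ${user}/${datasetId}
        String staticPath = datasetConfig.getDatasetStoragePath("alice", "d-123456", false);
        System.out.println(staticPath); // alice/d-123456

        // dynamic dataset: ${user}/dy/${currentTimeMillis}/${datasetId}, joined with File.separator
        String dynamicPath = datasetConfig.getDatasetStoragePath("alice", "d-123456", true);
        System.out.println(dynamicPath); // e.g. alice/dy/1718000000000/d-123456
    }
}

Note the asymmetry: the dynamic branch joins with File.separator while the static branch uses Common.joinPath; if the storage layer expects "/" on every OS, using Common.joinPath in both branches may be the safer choice.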
----------------------------------------
@@ -25,6 +25,7 @@
import com.webank.wedpr.components.dataset.utils.TimeUtils;
import com.webank.wedpr.components.dataset.utils.UserTokenUtils;
import com.webank.wedpr.components.storage.api.FileStorageInterface;
import com.webank.wedpr.components.storage.api.StoragePath;
import com.webank.wedpr.core.utils.Common;
import com.webank.wedpr.core.utils.Constant;
import com.webank.wedpr.core.utils.WeDPRResponse;
@@ -456,4 +457,40 @@ public WeDPRResponse updateDatasetList(

return weDPRResponse;
}

@GetMapping(value = "getDatasetStoragePath")
public WeDPRResponse getDatasetStoragePath(
HttpServletRequest httpServletRequest,
@RequestParam(value = "datasetID", required = true) String datasetID) {

long startTimeMillis = System.currentTimeMillis();
logger.info("get dataset storage path begin, datasetID: {}", datasetID);

WeDPRResponse weDPRResponse =
new WeDPRResponse(Constant.WEDPR_SUCCESS, Constant.WEDPR_SUCCESS_MSG);

try {
StoragePath storagePath = datasetService.getDatasetStoragePath(datasetID);
weDPRResponse.setData(storagePath);

long endTimeMillis = System.currentTimeMillis();
logger.info(
"get dataset storage path success, datasetID: {}, cost(ms): {}",
datasetID,
(endTimeMillis - startTimeMillis));
} catch (Exception e) {

weDPRResponse.setCode(Constant.WEDPR_FAILED);
weDPRResponse.setMsg(e.getMessage());

long endTimeMillis = System.currentTimeMillis();
logger.error(
"get dataset storage path failed, datasetID: {}, cost(ms): {}, e: ",
datasetID,
(endTimeMillis - startTimeMillis),
e);
}

return weDPRResponse;
}
}
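
As a usage illustration, a hedged sketch of calling the new endpoint with java.net.http. Only the getDatasetStoragePath route and the datasetID query parameter come from this diff; the host, port, and any servlet context or route prefix are assumptions to adjust per deployment.

import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class GetDatasetStoragePathClient {
    public static void main(String[] args) throws Exception {
        // host/port and prefix are hypothetical; route and parameter name are from the diff
        String url = "http://localhost:8080/getDatasetStoragePath?datasetID=d-123456";
        HttpRequest request = HttpRequest.newBuilder().uri(URI.create(url)).GET().build();
        HttpResponse<String> response =
                HttpClient.newHttpClient().send(request, HttpResponse.BodyHandlers.ofString());
        // expected: a WeDPRResponse JSON envelope (code, msg) carrying a StoragePath as data
        System.out.println(response.body());
    }
}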
----------------------------------------
@@ -14,6 +14,8 @@ public class DBDataSource implements DataSourceMeta {
private String sql;
// Data is loaded once when a data source is created, or on each access
Boolean dynamicDataSource = false;
// verify sql syntax and test connectivity
boolean verifySqlSyntaxAndTestCon = true;

@Override
public boolean dynamicDataSource() {
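
To make the new flag concrete, a hedged sketch of parsing a datasource meta where verifySqlSyntaxAndTestCon is switched off, so SQLUtils.validateDataSourceParameters is skipped (see the DB processor change below). Plain Jackson is used here instead of the project's ObjectMapperFactory, the JSON carries only fields visible in this diff, and the usual getter/setter bean shape on DBDataSource is assumed.

import com.fasterxml.jackson.databind.ObjectMapper;

public class DBDataSourceMetaExample {
    public static void main(String[] args) throws Exception {
        String json =
                "{\"sql\": \"SELECT id, name FROM t_user\","
                        + " \"dynamicDataSource\": true,"
                        + " \"verifySqlSyntaxAndTestCon\": false}";
        // DBDataSource exposes isVerifySqlSyntaxAndTestCon(), as the processor change below uses
        DBDataSource dbDataSource = new ObjectMapper().readValue(json, DBDataSource.class);
        System.out.println(dbDataSource.isVerifySqlSyntaxAndTestCon()); // false
    }
}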
----------------------------------------
@@ -14,7 +14,6 @@
import com.webank.wedpr.components.dataset.utils.JsonUtils;
import com.webank.wedpr.components.storage.api.FileStorageInterface;
import com.webank.wedpr.components.storage.api.StoragePath;
-import com.webank.wedpr.core.config.WeDPRCommonConfig;
import com.webank.wedpr.core.utils.Common;
import com.webank.wedpr.core.utils.ObjectMapperFactory;
import java.util.Arrays;
@@ -164,12 +163,13 @@ public void uploadData() throws DatasetException {

String csvFilePath = dataSourceProcessorContext.getCvsFilePath();
UserInfo userInfo = dataSourceProcessorContext.getUserInfo();
DatasetConfig datasetConfig = dataSourceProcessorContext.getDatasetConfig();

FileStorageInterface fileStorage = dataSourceProcessorContext.getFileStorage();

try {
            String userDatasetPath =
-                    WeDPRCommonConfig.getUserDatasetPath(userInfo.getUser(), datasetId);
+                    datasetConfig.getDatasetStoragePath(userInfo.getUser(), datasetId, false);

StoragePath storagePath = fileStorage.upload(true, csvFilePath, userDatasetPath, false);

@@ -179,6 +179,7 @@
.getDataset()
.setDatasetStorageType(fileStorage.type().toString());
this.dataSourceProcessorContext.getDataset().setDatasetStoragePath(storagePathStr);
this.dataSourceProcessorContext.setStoragePath(storagePath);

long endTimeMillis = System.currentTimeMillis();
logger.info(
----------------------------------------
@@ -13,7 +13,6 @@
import com.webank.wedpr.components.dataset.utils.JsonUtils;
import com.webank.wedpr.components.storage.api.FileStorageInterface;
import com.webank.wedpr.components.storage.api.StoragePath;
import com.webank.wedpr.core.config.WeDPRCommonConfig;
import com.webank.wedpr.core.utils.Common;
import com.webank.wedpr.core.utils.ObjectMapperFactory;
import java.io.File;
@@ -57,8 +56,12 @@ public DataSourceMeta parseDataSourceMeta(String strDataSourceMeta) throws Datas

// check if single select
SQLUtils.isSingleSelectStatement(sql);
-            // validate parameters, test db connectivity, validate SQL syntax
-            SQLUtils.validateDataSourceParameters(dbType, dbDataSource);

+            boolean verifySqlSyntaxAndTestCon = dbDataSource.isVerifySqlSyntaxAndTestCon();
+            if (verifySqlSyntaxAndTestCon) {
+                // validate parameters, test db connectivity, validate SQL syntax
+                SQLUtils.validateDataSourceParameters(dbType, dbDataSource);
+            }

long endTimeMillis = System.currentTimeMillis();
logger.info(
@@ -91,7 +94,7 @@ public void prepareData() throws DatasetException {

long endTimeMillis = System.currentTimeMillis();
logger.info(
" ==> data source processor stage prepare data end merge chunk data, datasetId: {}, cvsFilePath: {}, cost(ms): {}",
" ==> data source processor stage prepare data end, datasetId: {}, cvsFilePath: {}, cost(ms): {}",
datasetId,
cvsFilePath,
endTimeMillis - startTimeMillis);
@@ -149,12 +152,15 @@ public void uploadData() throws DatasetException {

String csvFilePath = dataSourceProcessorContext.getCvsFilePath();
UserInfo userInfo = dataSourceProcessorContext.getUserInfo();
DataSourceMeta dataSourceMeta = dataSourceProcessorContext.getDataSourceMeta();
DatasetConfig datasetConfig = dataSourceProcessorContext.getDatasetConfig();

FileStorageInterface fileStorage = dataSourceProcessorContext.getFileStorage();

try {
String userDatasetPath =
-                    WeDPRCommonConfig.getUserDatasetPath(userInfo.getUser(), datasetId);
+                    datasetConfig.getDatasetStoragePath(
+                            userInfo.getUser(), datasetId, dataSourceMeta.dynamicDataSource());

StoragePath storagePath = fileStorage.upload(true, csvFilePath, userDatasetPath, false);

@@ -164,6 +170,7 @@
.getDataset()
.setDatasetStorageType(fileStorage.type().toString());
this.dataSourceProcessorContext.getDataset().setDatasetStoragePath(storagePathStr);
this.dataSourceProcessorContext.setStoragePath(storagePath);

long endTimeMillis = System.currentTimeMillis();
logger.info(
----------------------------------------
@@ -15,9 +15,7 @@ public interface DataSourceProcessor {
default void setContext(DataSourceProcessorContext context) {}

// prepare data
-    // ie:
-    // merge chunk data
-    // convert excel to csv
+    // e.g. merge chunk data, convert excel to csv
void prepareData() throws DatasetException;

// analyze data
@@ -32,9 +30,9 @@ default void setContext(DataSourceProcessorContext context) {}
// process
default void processData(DataSourceProcessorContext context) throws DatasetException {
try {
-            // init context
            setContext(context);
            // preprocess data
+            // e.g. convert data to .csv format, and other operations
prepareData();
// data analysis, reading data fields and data volume
analyzeData();
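
For orientation, a minimal sketch of a processor flowing through the default processData template above. The visible portion of processData drives setContext, prepareData, and analyzeData; the remaining stage methods (uploadData, cleanupData) are assumed to be declared on the interface, as the @Override implementations elsewhere in this diff suggest. The class below is hypothetical and would live in the same package as the interface.

// hypothetical no-op processor illustrating the stage order driven by processData
public class NoOpDataSourceProcessor implements DataSourceProcessor {

    private DataSourceProcessorContext context;

    @Override
    public void setContext(DataSourceProcessorContext context) {
        this.context = context;
    }

    @Override
    public void prepareData() throws DatasetException {} // e.g. merge chunks, excel -> csv

    @Override
    public void analyzeData() throws DatasetException {} // read fields, row count, hash

    @Override
    public void uploadData() throws DatasetException {} // push the csv to file storage

    @Override
    public void cleanupData() throws DatasetException {} // drop local temp files
}

// usage: new NoOpDataSourceProcessor().processData(context);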
----------------------------------------
@@ -7,6 +7,7 @@
import com.webank.wedpr.components.dataset.mapper.wapper.DatasetTransactionalWrapper;
import com.webank.wedpr.components.dataset.service.ChunkUploadApi;
import com.webank.wedpr.components.storage.api.FileStorageInterface;
import com.webank.wedpr.components.storage.api.StoragePath;
import lombok.Builder;
import lombok.Data;

@@ -25,4 +26,5 @@ public class DataSourceProcessorContext {
// intermediate state
private String cvsFilePath;
private String mergedFilePath;
private StoragePath storagePath;
}
----------------------------------------
@@ -1,12 +1,20 @@
package com.webank.wedpr.components.dataset.datasource.processor;

import com.webank.wedpr.components.dataset.config.DatasetConfig;
import com.webank.wedpr.components.dataset.dao.Dataset;
import com.webank.wedpr.components.dataset.datasource.DataSourceMeta;
import com.webank.wedpr.components.dataset.datasource.category.HdfsDataSource;
import com.webank.wedpr.components.dataset.exception.DatasetException;
import com.webank.wedpr.components.dataset.utils.CsvUtils;
import com.webank.wedpr.components.dataset.utils.FileUtils;
import com.webank.wedpr.components.dataset.utils.JsonUtils;
import com.webank.wedpr.components.storage.api.FileStorageInterface;
import com.webank.wedpr.components.storage.impl.hdfs.HDFSStoragePath;
import com.webank.wedpr.core.protocol.StorageType;
import com.webank.wedpr.core.utils.Common;
import java.io.File;
import java.util.Arrays;
import java.util.List;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@@ -33,6 +41,15 @@ public DataSourceMeta parseDataSourceMeta(String strDataSourceMeta) throws Datas
Common.requireNonEmpty("filePath", filePath);

FileStorageInterface fileStorage = dataSourceProcessorContext.getFileStorage();

StorageType storageType = fileStorage.type();
        if (!storageType.getName().equalsIgnoreCase(StorageType.HDFS.getName())) {
            // an HDFS data source requires HDFS storage
            logger.error(
                    "HDFS data source is not supported for this storage type, type: {}",
                    storageType);
            throw new DatasetException(
                    "HDFS data source is not supported for this storage type, type: "
                            + storageType.getName());
        }

checkFileExists(fileStorage, filePath);

long endTimeMillis = System.currentTimeMillis();
@@ -56,16 +73,108 @@ public void checkFileExists(FileStorageInterface storageInterface, String filePa
}

@Override
-    public void prepareData() throws DatasetException {}
+    public void prepareData() throws DatasetException {
long startTimeMillis = System.currentTimeMillis();

DatasetConfig datasetConfig = dataSourceProcessorContext.getDatasetConfig();
FileStorageInterface fileStorage = dataSourceProcessorContext.getFileStorage();
HdfsDataSource hdfsDataSource =
(HdfsDataSource) dataSourceProcessorContext.getDataSourceMeta();

Dataset dataset = dataSourceProcessorContext.getDataset();
String datasetId = dataset.getDatasetId();

String datasetBaseDir = datasetConfig.getDatasetBaseDir();
String cvsFilePath = datasetBaseDir + File.separator + datasetId;

String filePath = hdfsDataSource.getFilePath();
HDFSStoragePath hdfsStoragePath = new HDFSStoragePath(filePath);
fileStorage.download(hdfsStoragePath, cvsFilePath);

dataSourceProcessorContext.setCvsFilePath(cvsFilePath);

long endTimeMillis = System.currentTimeMillis();
logger.info(
" ==> data source processor stage prepare data end, datasetId: {}, cvsFilePath: {}, cost(ms): {}",
datasetId,
cvsFilePath,
endTimeMillis - startTimeMillis);
}

@Override
-    public void analyzeData() throws DatasetException {}
+    public void analyzeData() throws DatasetException {
String cvsFilePath = dataSourceProcessorContext.getCvsFilePath();
Dataset dataset = dataSourceProcessorContext.getDataset();

long startTimeMillis = System.currentTimeMillis();

// read csv header field
List<String> fieldList = CsvUtils.readCsvHeader(cvsFilePath);

        // e.g. [x, y, z] => x, y, z
        String fieldListString = Arrays.toString(fieldList.toArray());
        String fieldString =
                fieldListString
                        .replace("'", "")
                        .replace("\r", "")
                        .replace("[", "")
                        .replace("]", "")
                        .trim();

int columnNum = fieldList.size();
int rowNum = FileUtils.getFileLinesNumber(cvsFilePath);
String md5Hash = FileUtils.calculateFileHash(cvsFilePath, "MD5");
long fileSize = FileUtils.getFileSize(cvsFilePath);

this.dataSourceProcessorContext.getDataset().setDatasetFields(fieldString);
this.dataSourceProcessorContext.getDataset().setDatasetColumnCount(columnNum);
this.dataSourceProcessorContext.getDataset().setDatasetRecordCount(rowNum);
this.dataSourceProcessorContext.getDataset().setDatasetVersionHash(md5Hash);
this.dataSourceProcessorContext.getDataset().setDatasetSize(fileSize);

String datasetId = dataset.getDatasetId();

long endTimeMillis = System.currentTimeMillis();
logger.info(
" => data source processor stage analyze data end, datasetId: {}, fieldString: {}, columnNum: {}, rowNum: {}, cost(ms): {}",
datasetId,
fieldString,
columnNum,
rowNum,
endTimeMillis - startTimeMillis);
}
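
The Arrays.toString/replace chain above flattens the header list into a comma-separated string. As a hedged aside, String.join is a more direct equivalent, assuming the field names themselves contain no quotes or brackets:

import java.util.Arrays;
import java.util.List;

public class FieldJoinExample {
    public static void main(String[] args) {
        List<String> fieldList = Arrays.asList("x", "y", "z");
        // same output as the replace chain: "x, y, z"
        String fieldString = String.join(", ", fieldList).replace("\r", "").trim();
        System.out.println(fieldString);
    }
}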

@Override
public void uploadData() throws DatasetException {
// do nothing
}

@Override
-    public void cleanupData() throws DatasetException {}
+    public void cleanupData() throws DatasetException {
long startTimeMillis = System.currentTimeMillis();
DatasetConfig datasetConfig = dataSourceProcessorContext.getDatasetConfig();
Dataset dataset = dataSourceProcessorContext.getDataset();

String datasetId = dataset.getDatasetId();
String datasetBaseDir = datasetConfig.getDatasetBaseDir();
String cvsFilePath = datasetBaseDir + File.separator + datasetId;
try {
FileUtils.deleteDirectory(new File(cvsFilePath));
logger.info(
"remove temp csv success, datasetId: {}, cvsFilePath: {}",
datasetId,
cvsFilePath);
} catch (Exception e) {
            logger.warn(
                    "remove temp csv failed, datasetId: {}, cvsFilePath: {}, e: ",
                    datasetId,
                    cvsFilePath,
                    e);
}

long endTimeMillis = System.currentTimeMillis();
logger.info(
" => data source processor stage cleanup data end, datasetId: {}, cost(ms): {}",
datasetId,
endTimeMillis - startTimeMillis);
}
}