diff --git a/wedpr-components/dataset/src/main/java/com/webank/wedpr/components/dataset/config/DatasetConfig.java b/wedpr-components/dataset/src/main/java/com/webank/wedpr/components/dataset/config/DatasetConfig.java index 0f962ab0..1007e0ab 100644 --- a/wedpr-components/dataset/src/main/java/com/webank/wedpr/components/dataset/config/DatasetConfig.java +++ b/wedpr-components/dataset/src/main/java/com/webank/wedpr/components/dataset/config/DatasetConfig.java @@ -18,6 +18,8 @@ import static com.webank.wedpr.components.dataset.service.ChunkUploadImpl.UPLOAD_CHUNK_FILE_NAME_PREFIX; import com.webank.wedpr.components.dataset.common.DatasetConstant; +import com.webank.wedpr.core.utils.Common; +import java.io.File; import lombok.Data; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Configuration; @@ -68,4 +70,21 @@ public String getDatasetChunksDir(String datasetId) { String datasetChunksBaseDir = getDatasetChunksBaseDir(); return String.format("%s/%s", datasetChunksBaseDir, datasetId); } + + public String getDatasetStoragePath(String user, String datasetId, boolean dynamic) { + if (dynamic) { + // ${user}/dy/${currentTimeMillis}/${datasetId} + long currentTimeMillis = System.currentTimeMillis(); + return user + + File.separator + + "dy" + + File.separator + + currentTimeMillis + + File.separator + + datasetId; + } else { + // ${user}/${datasetId} + return Common.joinPath(user, datasetId); + } + } } diff --git a/wedpr-components/dataset/src/main/java/com/webank/wedpr/components/dataset/controller/DatasetController.java b/wedpr-components/dataset/src/main/java/com/webank/wedpr/components/dataset/controller/DatasetController.java index 54c3f587..cf43f401 100644 --- a/wedpr-components/dataset/src/main/java/com/webank/wedpr/components/dataset/controller/DatasetController.java +++ b/wedpr-components/dataset/src/main/java/com/webank/wedpr/components/dataset/controller/DatasetController.java @@ -25,6 +25,7 @@ import com.webank.wedpr.components.dataset.utils.TimeUtils; import com.webank.wedpr.components.dataset.utils.UserTokenUtils; import com.webank.wedpr.components.storage.api.FileStorageInterface; +import com.webank.wedpr.components.storage.api.StoragePath; import com.webank.wedpr.core.utils.Common; import com.webank.wedpr.core.utils.Constant; import com.webank.wedpr.core.utils.WeDPRResponse; @@ -456,4 +457,40 @@ public WeDPRResponse updateDatasetList( return weDPRResponse; } + + @GetMapping(value = "getDatasetStoragePath") + public WeDPRResponse getDatasetStoragePath( + HttpServletRequest httpServletRequest, + @RequestParam(value = "datasetID", required = true) String datasetID) { + + long startTimeMillis = System.currentTimeMillis(); + logger.info("get dataset storage path begin, datasetID: {}", datasetID); + + WeDPRResponse weDPRResponse = + new WeDPRResponse(Constant.WEDPR_SUCCESS, Constant.WEDPR_SUCCESS_MSG); + + try { + StoragePath storagePath = datasetService.getDatasetStoragePath(datasetID); + weDPRResponse.setData(storagePath); + + long endTimeMillis = System.currentTimeMillis(); + logger.info( + "get dataset storage path success, datasetID: {}, cost(ms): {}", + datasetID, + (endTimeMillis - startTimeMillis)); + } catch (Exception e) { + + weDPRResponse.setCode(Constant.WEDPR_FAILED); + weDPRResponse.setMsg(e.getMessage()); + + long endTimeMillis = System.currentTimeMillis(); + logger.error( + "get dataset storage path failed, datasetID: {}, cost(ms): {}, e: ", + datasetID, + (endTimeMillis - startTimeMillis), + e); + } + + return weDPRResponse; + } } diff --git a/wedpr-components/dataset/src/main/java/com/webank/wedpr/components/dataset/datasource/category/DBDataSource.java b/wedpr-components/dataset/src/main/java/com/webank/wedpr/components/dataset/datasource/category/DBDataSource.java index 77cce61c..e7ab6d19 100644 --- a/wedpr-components/dataset/src/main/java/com/webank/wedpr/components/dataset/datasource/category/DBDataSource.java +++ b/wedpr-components/dataset/src/main/java/com/webank/wedpr/components/dataset/datasource/category/DBDataSource.java @@ -14,6 +14,8 @@ public class DBDataSource implements DataSourceMeta { private String sql; // Data is loaded once when a data source is created, or on each access Boolean dynamicDataSource = false; + // verify sql syntax and test connectivity + boolean verifySqlSyntaxAndTestCon = true; @Override public boolean dynamicDataSource() { diff --git a/wedpr-components/dataset/src/main/java/com/webank/wedpr/components/dataset/datasource/processor/CsvDataSourceProcessor.java b/wedpr-components/dataset/src/main/java/com/webank/wedpr/components/dataset/datasource/processor/CsvDataSourceProcessor.java index 86fca1a2..6037d375 100644 --- a/wedpr-components/dataset/src/main/java/com/webank/wedpr/components/dataset/datasource/processor/CsvDataSourceProcessor.java +++ b/wedpr-components/dataset/src/main/java/com/webank/wedpr/components/dataset/datasource/processor/CsvDataSourceProcessor.java @@ -14,7 +14,6 @@ import com.webank.wedpr.components.dataset.utils.JsonUtils; import com.webank.wedpr.components.storage.api.FileStorageInterface; import com.webank.wedpr.components.storage.api.StoragePath; -import com.webank.wedpr.core.config.WeDPRCommonConfig; import com.webank.wedpr.core.utils.Common; import com.webank.wedpr.core.utils.ObjectMapperFactory; import java.util.Arrays; @@ -164,12 +163,13 @@ public void uploadData() throws DatasetException { String csvFilePath = dataSourceProcessorContext.getCvsFilePath(); UserInfo userInfo = dataSourceProcessorContext.getUserInfo(); + DatasetConfig datasetConfig = dataSourceProcessorContext.getDatasetConfig(); FileStorageInterface fileStorage = dataSourceProcessorContext.getFileStorage(); try { String userDatasetPath = - WeDPRCommonConfig.getUserDatasetPath(userInfo.getUser(), datasetId); + datasetConfig.getDatasetStoragePath(userInfo.getUser(), datasetId, false); StoragePath storagePath = fileStorage.upload(true, csvFilePath, userDatasetPath, false); @@ -179,6 +179,7 @@ public void uploadData() throws DatasetException { .getDataset() .setDatasetStorageType(fileStorage.type().toString()); this.dataSourceProcessorContext.getDataset().setDatasetStoragePath(storagePathStr); + this.dataSourceProcessorContext.setStoragePath(storagePath); long endTimeMillis = System.currentTimeMillis(); logger.info( diff --git a/wedpr-components/dataset/src/main/java/com/webank/wedpr/components/dataset/datasource/processor/DBDataSourceProcessor.java b/wedpr-components/dataset/src/main/java/com/webank/wedpr/components/dataset/datasource/processor/DBDataSourceProcessor.java index 672a827b..1fb3acfb 100644 --- a/wedpr-components/dataset/src/main/java/com/webank/wedpr/components/dataset/datasource/processor/DBDataSourceProcessor.java +++ b/wedpr-components/dataset/src/main/java/com/webank/wedpr/components/dataset/datasource/processor/DBDataSourceProcessor.java @@ -13,7 +13,6 @@ import com.webank.wedpr.components.dataset.utils.JsonUtils; import com.webank.wedpr.components.storage.api.FileStorageInterface; import com.webank.wedpr.components.storage.api.StoragePath; -import com.webank.wedpr.core.config.WeDPRCommonConfig; import com.webank.wedpr.core.utils.Common; import com.webank.wedpr.core.utils.ObjectMapperFactory; import java.io.File; @@ -57,8 +56,12 @@ public DataSourceMeta parseDataSourceMeta(String strDataSourceMeta) throws Datas // check if single select SQLUtils.isSingleSelectStatement(sql); - // validate parameters, test db connectivity, validate SQL syntax - SQLUtils.validateDataSourceParameters(dbType, dbDataSource); + + boolean verifySqlSyntaxAndTestCon = dbDataSource.isVerifySqlSyntaxAndTestCon(); + if (verifySqlSyntaxAndTestCon) { + // validate parameters, test db connectivity, validate SQL syntax + SQLUtils.validateDataSourceParameters(dbType, dbDataSource); + } long endTimeMillis = System.currentTimeMillis(); logger.info( @@ -91,7 +94,7 @@ public void prepareData() throws DatasetException { long endTimeMillis = System.currentTimeMillis(); logger.info( - " ==> data source processor stage prepare data end merge chunk data, datasetId: {}, cvsFilePath: {}, cost(ms): {}", + " ==> data source processor stage prepare data end, datasetId: {}, cvsFilePath: {}, cost(ms): {}", datasetId, cvsFilePath, endTimeMillis - startTimeMillis); @@ -149,12 +152,15 @@ public void uploadData() throws DatasetException { String csvFilePath = dataSourceProcessorContext.getCvsFilePath(); UserInfo userInfo = dataSourceProcessorContext.getUserInfo(); + DataSourceMeta dataSourceMeta = dataSourceProcessorContext.getDataSourceMeta(); + DatasetConfig datasetConfig = dataSourceProcessorContext.getDatasetConfig(); FileStorageInterface fileStorage = dataSourceProcessorContext.getFileStorage(); try { String userDatasetPath = - WeDPRCommonConfig.getUserDatasetPath(userInfo.getUser(), datasetId); + datasetConfig.getDatasetStoragePath( + userInfo.getUser(), datasetId, dataSourceMeta.dynamicDataSource()); StoragePath storagePath = fileStorage.upload(true, csvFilePath, userDatasetPath, false); @@ -164,6 +170,7 @@ public void uploadData() throws DatasetException { .getDataset() .setDatasetStorageType(fileStorage.type().toString()); this.dataSourceProcessorContext.getDataset().setDatasetStoragePath(storagePathStr); + this.dataSourceProcessorContext.setStoragePath(storagePath); long endTimeMillis = System.currentTimeMillis(); logger.info( diff --git a/wedpr-components/dataset/src/main/java/com/webank/wedpr/components/dataset/datasource/processor/DataSourceProcessor.java b/wedpr-components/dataset/src/main/java/com/webank/wedpr/components/dataset/datasource/processor/DataSourceProcessor.java index 892134c7..8bf1e985 100644 --- a/wedpr-components/dataset/src/main/java/com/webank/wedpr/components/dataset/datasource/processor/DataSourceProcessor.java +++ b/wedpr-components/dataset/src/main/java/com/webank/wedpr/components/dataset/datasource/processor/DataSourceProcessor.java @@ -15,9 +15,7 @@ public interface DataSourceProcessor { default void setContext(DataSourceProcessorContext context) {} // prepare data - // ie: - // merge chunk data - // convert excel to csv + // ie: merge chunk data 、convert excel to csv void prepareData() throws DatasetException; // analyze data @@ -32,9 +30,9 @@ default void setContext(DataSourceProcessorContext context) {} // process default void processData(DataSourceProcessorContext context) throws DatasetException { try { + // init context setContext(context); // preprocess data - // ie: convert data to .cvs format, and other operations prepareData(); // data analysis, reading data fields and data volume analyzeData(); diff --git a/wedpr-components/dataset/src/main/java/com/webank/wedpr/components/dataset/datasource/processor/DataSourceProcessorContext.java b/wedpr-components/dataset/src/main/java/com/webank/wedpr/components/dataset/datasource/processor/DataSourceProcessorContext.java index 6a62529d..c4635725 100644 --- a/wedpr-components/dataset/src/main/java/com/webank/wedpr/components/dataset/datasource/processor/DataSourceProcessorContext.java +++ b/wedpr-components/dataset/src/main/java/com/webank/wedpr/components/dataset/datasource/processor/DataSourceProcessorContext.java @@ -7,6 +7,7 @@ import com.webank.wedpr.components.dataset.mapper.wapper.DatasetTransactionalWrapper; import com.webank.wedpr.components.dataset.service.ChunkUploadApi; import com.webank.wedpr.components.storage.api.FileStorageInterface; +import com.webank.wedpr.components.storage.api.StoragePath; import lombok.Builder; import lombok.Data; @@ -25,4 +26,5 @@ public class DataSourceProcessorContext { // intermediate state private String cvsFilePath; private String mergedFilePath; + private StoragePath storagePath; } diff --git a/wedpr-components/dataset/src/main/java/com/webank/wedpr/components/dataset/datasource/processor/HdfsDataSourceProcessor.java b/wedpr-components/dataset/src/main/java/com/webank/wedpr/components/dataset/datasource/processor/HdfsDataSourceProcessor.java index adc8050f..4e2b09ec 100644 --- a/wedpr-components/dataset/src/main/java/com/webank/wedpr/components/dataset/datasource/processor/HdfsDataSourceProcessor.java +++ b/wedpr-components/dataset/src/main/java/com/webank/wedpr/components/dataset/datasource/processor/HdfsDataSourceProcessor.java @@ -1,12 +1,20 @@ package com.webank.wedpr.components.dataset.datasource.processor; +import com.webank.wedpr.components.dataset.config.DatasetConfig; +import com.webank.wedpr.components.dataset.dao.Dataset; import com.webank.wedpr.components.dataset.datasource.DataSourceMeta; import com.webank.wedpr.components.dataset.datasource.category.HdfsDataSource; import com.webank.wedpr.components.dataset.exception.DatasetException; +import com.webank.wedpr.components.dataset.utils.CsvUtils; +import com.webank.wedpr.components.dataset.utils.FileUtils; import com.webank.wedpr.components.dataset.utils.JsonUtils; import com.webank.wedpr.components.storage.api.FileStorageInterface; import com.webank.wedpr.components.storage.impl.hdfs.HDFSStoragePath; +import com.webank.wedpr.core.protocol.StorageType; import com.webank.wedpr.core.utils.Common; +import java.io.File; +import java.util.Arrays; +import java.util.List; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -33,6 +41,15 @@ public DataSourceMeta parseDataSourceMeta(String strDataSourceMeta) throws Datas Common.requireNonEmpty("filePath", filePath); FileStorageInterface fileStorage = dataSourceProcessorContext.getFileStorage(); + + StorageType storageType = fileStorage.type(); + if (!storageType.getName().equalsIgnoreCase(StorageType.HDFS.getName())) { + // NOT HDFS Storage + logger.error("Not supported for HDFS data source, type: {}", storageType); + throw new DatasetException( + "Not supported for HDFS data source, type: " + storageType.getName()); + } + checkFileExists(fileStorage, filePath); long endTimeMillis = System.currentTimeMillis(); @@ -56,10 +73,76 @@ public void checkFileExists(FileStorageInterface storageInterface, String filePa } @Override - public void prepareData() throws DatasetException {} + public void prepareData() throws DatasetException { + long startTimeMillis = System.currentTimeMillis(); + + DatasetConfig datasetConfig = dataSourceProcessorContext.getDatasetConfig(); + FileStorageInterface fileStorage = dataSourceProcessorContext.getFileStorage(); + HdfsDataSource hdfsDataSource = + (HdfsDataSource) dataSourceProcessorContext.getDataSourceMeta(); + + Dataset dataset = dataSourceProcessorContext.getDataset(); + String datasetId = dataset.getDatasetId(); + + String datasetBaseDir = datasetConfig.getDatasetBaseDir(); + String cvsFilePath = datasetBaseDir + File.separator + datasetId; + + String filePath = hdfsDataSource.getFilePath(); + HDFSStoragePath hdfsStoragePath = new HDFSStoragePath(filePath); + fileStorage.download(hdfsStoragePath, cvsFilePath); + + dataSourceProcessorContext.setCvsFilePath(cvsFilePath); + + long endTimeMillis = System.currentTimeMillis(); + logger.info( + " ==> data source processor stage prepare data end, datasetId: {}, cvsFilePath: {}, cost(ms): {}", + datasetId, + cvsFilePath, + endTimeMillis - startTimeMillis); + } @Override - public void analyzeData() throws DatasetException {} + public void analyzeData() throws DatasetException { + String cvsFilePath = dataSourceProcessorContext.getCvsFilePath(); + Dataset dataset = dataSourceProcessorContext.getDataset(); + + long startTimeMillis = System.currentTimeMillis(); + + // read csv header field + List fieldList = CsvUtils.readCsvHeader(cvsFilePath); + + // [ x, y ,z] => x,y,z + String fieldListString = Arrays.toString(fieldList.toArray()); + String fieldString = + fieldListString + .replace("'", "") + .replace("\\r", "") + .replace("[", "") + .replace("]", "") + .trim(); + + int columnNum = fieldList.size(); + int rowNum = FileUtils.getFileLinesNumber(cvsFilePath); + String md5Hash = FileUtils.calculateFileHash(cvsFilePath, "MD5"); + long fileSize = FileUtils.getFileSize(cvsFilePath); + + this.dataSourceProcessorContext.getDataset().setDatasetFields(fieldString); + this.dataSourceProcessorContext.getDataset().setDatasetColumnCount(columnNum); + this.dataSourceProcessorContext.getDataset().setDatasetRecordCount(rowNum); + this.dataSourceProcessorContext.getDataset().setDatasetVersionHash(md5Hash); + this.dataSourceProcessorContext.getDataset().setDatasetSize(fileSize); + + String datasetId = dataset.getDatasetId(); + + long endTimeMillis = System.currentTimeMillis(); + logger.info( + " => data source processor stage analyze data end, datasetId: {}, fieldString: {}, columnNum: {}, rowNum: {}, cost(ms): {}", + datasetId, + fieldString, + columnNum, + rowNum, + endTimeMillis - startTimeMillis); + } @Override public void uploadData() throws DatasetException { @@ -67,5 +150,31 @@ public void uploadData() throws DatasetException { } @Override - public void cleanupData() throws DatasetException {} + public void cleanupData() throws DatasetException { + long startTimeMillis = System.currentTimeMillis(); + DatasetConfig datasetConfig = dataSourceProcessorContext.getDatasetConfig(); + Dataset dataset = dataSourceProcessorContext.getDataset(); + + String datasetId = dataset.getDatasetId(); + String datasetBaseDir = datasetConfig.getDatasetBaseDir(); + String cvsFilePath = datasetBaseDir + File.separator + datasetId; + try { + FileUtils.deleteDirectory(new File(cvsFilePath)); + logger.info( + "remove temp csv success, datasetId: {}, cvsFilePath: {}", + datasetId, + cvsFilePath); + } catch (Exception e) { + logger.warn( + "remove temp csv failed, datasetId: {}, cvsFilePath: {}", + datasetId, + cvsFilePath); + } + + long endTimeMillis = System.currentTimeMillis(); + logger.info( + " => data source processor stage cleanup data end, datasetId: {}, cost(ms): {}", + datasetId, + endTimeMillis - startTimeMillis); + } } diff --git a/wedpr-components/dataset/src/main/java/com/webank/wedpr/components/dataset/datasource/storage/DatasetStoragePathRetriever.java b/wedpr-components/dataset/src/main/java/com/webank/wedpr/components/dataset/datasource/storage/DatasetStoragePathRetriever.java new file mode 100644 index 00000000..bb51e00e --- /dev/null +++ b/wedpr-components/dataset/src/main/java/com/webank/wedpr/components/dataset/datasource/storage/DatasetStoragePathRetriever.java @@ -0,0 +1,153 @@ +package com.webank.wedpr.components.dataset.datasource.storage; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.webank.wedpr.components.dataset.common.DatasetStatus; +import com.webank.wedpr.components.dataset.config.DatasetConfig; +import com.webank.wedpr.components.dataset.dao.Dataset; +import com.webank.wedpr.components.dataset.dao.UserInfo; +import com.webank.wedpr.components.dataset.datasource.DataSourceMeta; +import com.webank.wedpr.components.dataset.datasource.dispatch.DataSourceProcessorDispatcher; +import com.webank.wedpr.components.dataset.datasource.processor.DataSourceProcessor; +import com.webank.wedpr.components.dataset.datasource.processor.DataSourceProcessorContext; +import com.webank.wedpr.components.dataset.exception.DatasetException; +import com.webank.wedpr.components.dataset.mapper.DatasetMapper; +import com.webank.wedpr.components.dataset.mapper.wapper.DatasetTransactionalWrapper; +import com.webank.wedpr.components.storage.api.FileStorageInterface; +import com.webank.wedpr.components.storage.api.StoragePath; +import com.webank.wedpr.components.storage.builder.StoragePathBuilder; +import com.webank.wedpr.core.utils.WeDPRException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Qualifier; +import org.springframework.stereotype.Component; + +@Component +public class DatasetStoragePathRetriever { + + private static final Logger logger = LoggerFactory.getLogger(DatasetStoragePathRetriever.class); + + @Autowired private DatasetConfig datasetConfig; + @Autowired private DatasetMapper datasetMapper; + @Autowired private DatasetTransactionalWrapper datasetTransactionalWrapper; + + @Qualifier("fileStorage") + @Autowired + private FileStorageInterface fileStorage; + + /** + * get the storage path of the dataset NOTICE: This interface may block for a long time + * + * @param datasetID + * @return + * @throws DatasetException + */ + public StoragePath getDatasetStoragePath(String datasetID) throws DatasetException { + + DatasetMapper datasetMapper = datasetTransactionalWrapper.getDatasetMapper(); + Dataset dataset = datasetMapper.getDatasetByDatasetId(datasetID, false); + if (dataset == null) { + logger.error("dataset not found, dataset id: {}", datasetID); + throw new DatasetException("dataset not found, dataset id: " + datasetID); + } + + return getDatasetStoragePath(dataset); + } + + /** + * get the storage path of the dataset NOTICE: This interface may block for a long time + * + * @param dataset + * @return + * @throws DatasetException + */ + public StoragePath getDatasetStoragePath(Dataset dataset) throws DatasetException { + String datasetID = dataset.getDatasetId(); + int status = dataset.getStatus(); + if (status != DatasetStatus.Success.getCode().intValue()) { + logger.error( + "dataset is not available status, dataset id: {}, status: {}", + datasetID, + status); + throw new DatasetException( + "dataset is not available status, dataset id: " + + datasetID + + " status: " + + status); + } + + String strDataSourceType = dataset.getDataSourceType(); + String strDataSourceMeta = dataset.getDataSourceMeta(); + String strDatasetStorageType = dataset.getDatasetStorageType(); + String strDatasetStoragePath = dataset.getDatasetStoragePath(); + + DataSourceProcessorDispatcher dataSourceProcessorDispatcher = + new DataSourceProcessorDispatcher(); + + DataSourceProcessor dataSourceProcessor = + dataSourceProcessorDispatcher.getDataSourceProcessor(strDataSourceType); + DataSourceMeta dataSourceMeta = dataSourceProcessor.parseDataSourceMeta(strDataSourceMeta); + + if ((dataSourceMeta != null) && dataSourceMeta.dynamicDataSource()) { + // dynamic data source + return processDynamicDatasourceForStoragePath( + dataset, dataSourceMeta, dataSourceProcessor); + } else { + return createStoragePath(strDatasetStorageType, strDatasetStoragePath); + } + } + + public StoragePath createStoragePath(String strStorageType, String strStoragePath) + throws DatasetException { + try { + return StoragePathBuilder.getInstance(strStorageType, strStoragePath); + } catch (WeDPRException e) { + throw new DatasetException(e); + } catch (JsonProcessingException e) { + throw new DatasetException(e); + } + } + + public StoragePath processDynamicDatasourceForStoragePath( + Dataset dataset, DataSourceMeta dataSourceMeta, DataSourceProcessor dataSourceProcessor) + throws DatasetException { + + String ownerUserName = dataset.getOwnerUserName(); + String ownerAgencyName = dataset.getOwnerAgencyName(); + + UserInfo userInfo = UserInfo.builder().user(ownerUserName).agency(ownerAgencyName).build(); + + DataSourceProcessorContext context = + DataSourceProcessorContext.builder() + .dataset(dataset) + .dataSourceMeta(dataSourceMeta) + .datasetConfig(datasetConfig) + .userInfo(userInfo) + .datasetTransactionalWrapper(datasetTransactionalWrapper) + .fileStorage(fileStorage) + .build(); + + try { + dataSourceProcessor.setContext(context); + dataSourceProcessor.processData(context); + StoragePath storagePath = context.getStoragePath(); + + logger.info( + "process dynamic data source success, dataset id: {}, datasource type: {}, datasource meta: {}, storage path: {}", + dataset.getDatasetId(), + dataset.getDataSourceType(), + dataset.getDataSourceMeta(), + storagePath); + + return storagePath; + } catch (Exception e) { + logger.error( + "process dynamic data source exception, dataset id: {}, datasource type: {}, datasource meta: {}, e: ", + dataset.getDatasetId(), + dataset.getDataSourceType(), + dataset.getDataSourceMeta(), + e); + throw e; + } + } +} diff --git a/wedpr-components/dataset/src/main/java/com/webank/wedpr/components/dataset/service/DatasetServiceApi.java b/wedpr-components/dataset/src/main/java/com/webank/wedpr/components/dataset/service/DatasetServiceApi.java index 9463d7d6..ff248ff2 100644 --- a/wedpr-components/dataset/src/main/java/com/webank/wedpr/components/dataset/service/DatasetServiceApi.java +++ b/wedpr-components/dataset/src/main/java/com/webank/wedpr/components/dataset/service/DatasetServiceApi.java @@ -7,6 +7,7 @@ import com.webank.wedpr.components.dataset.message.CreateDatasetResponse; import com.webank.wedpr.components.dataset.message.ListDatasetResponse; import com.webank.wedpr.components.dataset.message.UpdateDatasetRequest; +import com.webank.wedpr.components.storage.api.StoragePath; import java.util.List; public interface DatasetServiceApi { @@ -88,4 +89,13 @@ ListDatasetResponse listDataset( Integer pageOffset, Integer pageSize) throws DatasetException; + + /** + * get dataset storage path + * + * @param datasetID + * @return + * @throws DatasetException + */ + StoragePath getDatasetStoragePath(String datasetID) throws DatasetException; } diff --git a/wedpr-components/dataset/src/main/java/com/webank/wedpr/components/dataset/service/DatasetServiceImpl.java b/wedpr-components/dataset/src/main/java/com/webank/wedpr/components/dataset/service/DatasetServiceImpl.java index 29a36b70..2025d8dc 100644 --- a/wedpr-components/dataset/src/main/java/com/webank/wedpr/components/dataset/service/DatasetServiceImpl.java +++ b/wedpr-components/dataset/src/main/java/com/webank/wedpr/components/dataset/service/DatasetServiceImpl.java @@ -15,6 +15,7 @@ import com.webank.wedpr.components.dataset.datasource.dispatch.DataSourceProcessorDispatcher; import com.webank.wedpr.components.dataset.datasource.processor.DataSourceProcessor; import com.webank.wedpr.components.dataset.datasource.processor.DataSourceProcessorContext; +import com.webank.wedpr.components.dataset.datasource.storage.DatasetStoragePathRetriever; import com.webank.wedpr.components.dataset.exception.DatasetException; import com.webank.wedpr.components.dataset.mapper.DatasetMapper; import com.webank.wedpr.components.dataset.mapper.DatasetPermissionMapper; @@ -57,6 +58,7 @@ public class DatasetServiceImpl implements DatasetServiceApi { @Autowired private DatasetTransactionalWrapper datasetTransactionalWrapper; @Autowired private DataSourceProcessorDispatcher dataSourceProcessorDispatcher; @Autowired private DatasetWrapper datasetWrapper; + @Autowired private DatasetStoragePathRetriever datasetStoragePathRetriever; @Qualifier("fileStorage") @Autowired @@ -169,6 +171,8 @@ public CreateDatasetResponse createDataset( "Unsupported data source type, dataSourceType: " + strDataSourceType); } + dataSourceProcessor.setContext( + DataSourceProcessorContext.builder().fileStorage(fileStorage).build()); boolean dynamicDataSource = false; // parse datasource meta @@ -552,4 +556,16 @@ public ListDatasetResponse listDataset( "query visible datasets for user db operation exception, " + e.getMessage()); } } + + /** + * get dataset storage path + * + * @param datasetID + * @return + * @throws DatasetException + */ + @Override + public StoragePath getDatasetStoragePath(String datasetID) throws DatasetException { + return datasetStoragePathRetriever.getDatasetStoragePath(datasetID); + } } diff --git a/wedpr-components/dataset/src/main/java/com/webank/wedpr/components/dataset/sqlutils/SQLExecutor.java b/wedpr-components/dataset/src/main/java/com/webank/wedpr/components/dataset/sqlutils/SQLExecutor.java index 68d97baa..4e59477d 100644 --- a/wedpr-components/dataset/src/main/java/com/webank/wedpr/components/dataset/sqlutils/SQLExecutor.java +++ b/wedpr-components/dataset/src/main/java/com/webank/wedpr/components/dataset/sqlutils/SQLExecutor.java @@ -250,25 +250,4 @@ public void executeSQL(DBType dbType, DBDataSource dbDataSource, ExecutorCallbac throw new DatasetException("execute sql Exception, e: " + e.getMessage()); } } - - // TODO: - public static void main(String[] args) throws DatasetException { - - DBDataSource dbDataSource = new DBDataSource(); - dbDataSource.setSql("select * from t_ucl_c_user_missed_record"); - dbDataSource.setDatabase("ppcs_integ"); - dbDataSource.setDbIp("127.0.0.1"); - dbDataSource.setDbPort(3306); - dbDataSource.setUserName("root"); - dbDataSource.setPassword("123456"); - - SQLExecutor sqlExecutor = new SQLExecutor(); - sqlExecutor.executeSQL( - DBType.MYSQL, - dbDataSource, - (fields, rowValues) -> { - System.out.println(fields); - System.out.println(rowValues); - }); - } } diff --git a/wedpr-components/dataset/src/main/java/com/webank/wedpr/components/dataset/sqlutils/SQLUtils.java b/wedpr-components/dataset/src/main/java/com/webank/wedpr/components/dataset/sqlutils/SQLUtils.java index b8c22298..7fd7857c 100644 --- a/wedpr-components/dataset/src/main/java/com/webank/wedpr/components/dataset/sqlutils/SQLUtils.java +++ b/wedpr-components/dataset/src/main/java/com/webank/wedpr/components/dataset/sqlutils/SQLUtils.java @@ -33,6 +33,7 @@ public static void isSingleSelectStatement(String sql) throws DatasetException { } // regular expression for matching a single SELECT statement. + // TODO: make it configurable Pattern pattern = Pattern.compile( "^(SELECT.*?)(? { + System.out.println(fields); + System.out.println(rowValues); + }); + } +} diff --git a/wedpr-components/storage/src/main/java/com/webank/wedpr/components/storage/impl/hdfs/HDFSStorage.java b/wedpr-components/storage/src/main/java/com/webank/wedpr/components/storage/impl/hdfs/HDFSStorage.java index d39faccc..f3117098 100644 --- a/wedpr-components/storage/src/main/java/com/webank/wedpr/components/storage/impl/hdfs/HDFSStorage.java +++ b/wedpr-components/storage/src/main/java/com/webank/wedpr/components/storage/impl/hdfs/HDFSStorage.java @@ -55,6 +55,9 @@ public FsHandlerArgs(HdfsStorageConfig hdfsConfig) { Configuration hadoopConf = new Configuration(); hadoopConf.set(StorageConstant.FS_URI_CONFIG_KEY, hdfsConfig.getUrl()); this.fileSystem = FileSystem.get(hadoopConf); + // TODO: add hdfs account config + // this.fileSystem = FileSystem.get(new URI(hdfsConfig.getUrl()), hadoopConf, + // "root"); this.hdfsConfig = hdfsConfig; logger.info("connect to hdfs success, hdfsConfig: {}", hdfsConfig); } catch (Exception e) { diff --git a/wedpr-components/storage/src/main/java/com/webank/wedpr/components/storage/impl/local/LocalFileStorage.java b/wedpr-components/storage/src/main/java/com/webank/wedpr/components/storage/impl/local/LocalFileStorage.java index 7b98333a..0d850ff1 100644 --- a/wedpr-components/storage/src/main/java/com/webank/wedpr/components/storage/impl/local/LocalFileStorage.java +++ b/wedpr-components/storage/src/main/java/com/webank/wedpr/components/storage/impl/local/LocalFileStorage.java @@ -40,7 +40,12 @@ public class LocalFileStorage implements FileStorageInterface { public void createDirIfNotExist(String dir) throws IOException { File baseDirFile = new File(dir); if (!baseDirFile.exists()) { - Files.createDirectory(baseDirFile.toPath()); + // Files.createDirectory(baseDirFile.toPath()); + boolean mkdirs = baseDirFile.mkdirs(); + if (!mkdirs) { + logger.error("failed to create directory, dir: {}, mkdir: {}", dir, mkdirs); + throw new IOException("failed to create directory, dir: " + dir); + } logger.info("create directory {}", dir); } }