From c0dea86be38a9dc735f18ced2d1af6af5b1b6526 Mon Sep 17 00:00:00 2001 From: octopus <912554887@qq.com> Date: Tue, 27 Aug 2024 21:36:49 +0800 Subject: [PATCH] add the relationship of job and dataset --- wedpr-adm/db/wedpr_ddl.sql | 9 +++ .../adm/controller/ProjectController.java | 19 +++++ .../wedpr/components/project/dao/JobDO.java | 9 +++ .../components/project/dao/ProjectMapper.java | 5 ++ .../project/dao/ProjectMapperWrapper.java | 23 ++++-- .../components/project/model/JobRequest.java | 8 -- .../model/QueryJobsByDatasetIDResponse.java | 14 ++++ .../project/service/ProjectService.java | 3 + .../service/impl/ProjectServiceImpl.java | 74 +++++++++++++++++++ .../main/resources/mapper/ProjectMapper.xml | 21 ++++++ 10 files changed, 172 insertions(+), 13 deletions(-) create mode 100644 wedpr-components/meta/project/src/main/java/com/webank/wedpr/components/project/model/QueryJobsByDatasetIDResponse.java diff --git a/wedpr-adm/db/wedpr_ddl.sql b/wedpr-adm/db/wedpr_ddl.sql index 89f50fad..b5507fe2 100644 --- a/wedpr-adm/db/wedpr_ddl.sql +++ b/wedpr-adm/db/wedpr_ddl.sql @@ -134,6 +134,15 @@ create table if not exists `wedpr_job_table`( index status_index(`status`(128)) )ENGINE=InnoDB default charset=utf8mb4 default collate=utf8mb4_bin ROW_FORMAT=DYNAMIC; +-- job dataset relationship table +create table if not exists `wedpr_job_dataset_relation`( + `job_id` varchar(64) not null comment "任务ID", + `dataset_id` varchar(64) not null comment "数据集ID", + `create_time` DATETIME DEFAULT CURRENT_TIMESTAMP comment "任务创建时间", + index job_id_index(`job_id`), + index dataset_id_index(`dataset_id`) +)ENGINE=InnoDB default charset=utf8mb4 default collate=utf8mb4_bin ROW_FORMAT=DYNAMIC; + -- the algorithm_setting template create table if not exists `wedpr_setting_template`( `id` varchar(64) not null comment "配置模板ID", diff --git a/wedpr-adm/src/main/java/com/webank/wedpr/adm/controller/ProjectController.java b/wedpr-adm/src/main/java/com/webank/wedpr/adm/controller/ProjectController.java index ac5b23c8..0e2df841 100644 --- a/wedpr-adm/src/main/java/com/webank/wedpr/adm/controller/ProjectController.java +++ b/wedpr-adm/src/main/java/com/webank/wedpr/adm/controller/ProjectController.java @@ -25,9 +25,11 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.PostMapping; import org.springframework.web.bind.annotation.RequestBody; import org.springframework.web.bind.annotation.RequestMapping; +import org.springframework.web.bind.annotation.RequestParam; import org.springframework.web.bind.annotation.RestController; @RestController @@ -140,6 +142,23 @@ public WeDPRResponse queryJobByCondition( } } + // query job by condition + @GetMapping("/queryJobsByDatasetID") + public WeDPRResponse queryJobsByDatasetID( + HttpServletRequest request, + @RequestParam(value = "datasetID", required = true) String datasetID, + @RequestParam(value = "pageNum", required = false) Integer pageNum, + @RequestParam(value = "pageSize", required = false) Integer pageSize) { + try { + return projectService.queryJobsByDatasetID( + TokenUtils.getLoginUser(request).getUsername(), datasetID, pageNum, pageSize); + } catch (Exception e) { + logger.warn("queryJobsByDatasetID exception, condition: {}, error: ", null, e); + return new WeDPRResponse( + Constant.WEDPR_FAILED, "queryJobsByDatasetID failed for " + e.getMessage()); + } + } + // query jobOverview @PostMapping("/queryJobOverview") public WeDPRResponse queryJobOverview( diff --git a/wedpr-components/meta/project/src/main/java/com/webank/wedpr/components/project/dao/JobDO.java b/wedpr-components/meta/project/src/main/java/com/webank/wedpr/components/project/dao/JobDO.java index 00d10dd8..3282ea47 100644 --- a/wedpr-components/meta/project/src/main/java/com/webank/wedpr/components/project/dao/JobDO.java +++ b/wedpr-components/meta/project/src/main/java/com/webank/wedpr/components/project/dao/JobDO.java @@ -453,6 +453,15 @@ public Boolean isJobParty(String agency) { return Boolean.FALSE; } + // TODO: verify dataset + public boolean isJobDataset(String datasetId) { + if (datasetList == null) { + return false; + } + + return datasetList.contains(datasetId); + } + public Object getJobRequest() { return jobRequest; } diff --git a/wedpr-components/meta/project/src/main/java/com/webank/wedpr/components/project/dao/ProjectMapper.java b/wedpr-components/meta/project/src/main/java/com/webank/wedpr/components/project/dao/ProjectMapper.java index c268df33..535e6cd1 100644 --- a/wedpr-components/meta/project/src/main/java/com/webank/wedpr/components/project/dao/ProjectMapper.java +++ b/wedpr-components/meta/project/src/main/java/com/webank/wedpr/components/project/dao/ProjectMapper.java @@ -47,6 +47,9 @@ public long queryFollowerJobCount( public int batchUpdateJobInfo(@Param("jobDOList") List jobDOList); + public int batchInsertJobDatasetRelationInfo( + @Param("jobID") String jobID, @Param("datasetIDs") List datasetIDs); + public List queryJobs( @Param("onlyMeta") Boolean onlyMeta, @Param("condition") JobDO condition, @@ -57,4 +60,6 @@ public List queryFollowerJobByCondition( @Param("followerUser") String followerUser, @Param("followerAgency") String followerAgency, @Param("condition") JobDO condition); + + public List queryJobsByDatasetID(@Param("datasetID") String datasetID); } diff --git a/wedpr-components/meta/project/src/main/java/com/webank/wedpr/components/project/dao/ProjectMapperWrapper.java b/wedpr-components/meta/project/src/main/java/com/webank/wedpr/components/project/dao/ProjectMapperWrapper.java index b04e6cb5..534f667f 100644 --- a/wedpr-components/meta/project/src/main/java/com/webank/wedpr/components/project/dao/ProjectMapperWrapper.java +++ b/wedpr-components/meta/project/src/main/java/com/webank/wedpr/components/project/dao/ProjectMapperWrapper.java @@ -151,16 +151,29 @@ public void recordJobStatus(JobDO jobDO) { @Transactional(rollbackFor = Exception.class) public void insertJob(JobDO jobDO) { String id = jobDO.getId(); + + logger.info(" => insert job, jobID: {}", id); + this.projectMapper.insertJobInfo(jobDO); - if (jobDO.getTaskParties() == null || jobDO.getTaskParties().isEmpty()) { - return; - } - this.followerMapper.batchInsert(jobDO.getTaskParties()); + int insertC = -1; List datasetList = jobDO.getDatasetList(); if (datasetList != null && !datasetList.isEmpty()) { - // TODO: + insertC = this.projectMapper.batchInsertJobDatasetRelationInfo(id, datasetList); } + + if (logger.isDebugEnabled()) { + logger.debug( + "batch insert job datasets relation, jobID: {}, datasetIDs: {}, insertCount: {}", + id, + datasetList, + insertC); + } + + if (jobDO.getTaskParties() == null || jobDO.getTaskParties().isEmpty()) { + return; + } + this.followerMapper.batchInsert(jobDO.getTaskParties()); } public void updateFinalJobResult(JobDO job, JobStatus status, String result) { diff --git a/wedpr-components/meta/project/src/main/java/com/webank/wedpr/components/project/model/JobRequest.java b/wedpr-components/meta/project/src/main/java/com/webank/wedpr/components/project/model/JobRequest.java index 085508ae..ed1208c5 100644 --- a/wedpr-components/meta/project/src/main/java/com/webank/wedpr/components/project/model/JobRequest.java +++ b/wedpr-components/meta/project/src/main/java/com/webank/wedpr/components/project/model/JobRequest.java @@ -49,14 +49,6 @@ public void setTaskParties(List taskParties) { checkAndConfigTaskParities(taskParties); } - public boolean isJobDataset(String datasetId) { - if (datasetList == null) { - return false; - } - - return datasetList.contains(datasetId); - } - @SneakyThrows(WeDPRException.class) private void checkAndConfigTaskParities(List taskParties) { if (taskParties == null || taskParties.isEmpty()) { diff --git a/wedpr-components/meta/project/src/main/java/com/webank/wedpr/components/project/model/QueryJobsByDatasetIDResponse.java b/wedpr-components/meta/project/src/main/java/com/webank/wedpr/components/project/model/QueryJobsByDatasetIDResponse.java new file mode 100644 index 00000000..cc92df69 --- /dev/null +++ b/wedpr-components/meta/project/src/main/java/com/webank/wedpr/components/project/model/QueryJobsByDatasetIDResponse.java @@ -0,0 +1,14 @@ +package com.webank.wedpr.components.project.model; + +import com.webank.wedpr.components.project.dao.JobDO; +import java.util.List; +import lombok.Builder; +import lombok.Data; + +@Builder +@Data +public class QueryJobsByDatasetIDResponse { + long totalCount; + boolean isLast; + List content; +} diff --git a/wedpr-components/meta/project/src/main/java/com/webank/wedpr/components/project/service/ProjectService.java b/wedpr-components/meta/project/src/main/java/com/webank/wedpr/components/project/service/ProjectService.java index 373f90de..67e6d629 100644 --- a/wedpr-components/meta/project/src/main/java/com/webank/wedpr/components/project/service/ProjectService.java +++ b/wedpr-components/meta/project/src/main/java/com/webank/wedpr/components/project/service/ProjectService.java @@ -39,6 +39,9 @@ public abstract Object queryJobOverview(String user, JobOverviewRequest jobOverv public abstract WeDPRResponse submitJob(String user, JobRequest request); // query job by condition public abstract WeDPRResponse queryJobByCondition(String user, JobRequest request); + // query job list by dataset id + public abstract WeDPRResponse queryJobsByDatasetID( + String user, String datasetID, Integer pageNum, Integer pageSize); public abstract WeDPRResponse queryFollowerJobByCondition(String user, JobRequest request); diff --git a/wedpr-components/meta/project/src/main/java/com/webank/wedpr/components/project/service/impl/ProjectServiceImpl.java b/wedpr-components/meta/project/src/main/java/com/webank/wedpr/components/project/service/impl/ProjectServiceImpl.java index d4e073f8..f0636601 100644 --- a/wedpr-components/meta/project/src/main/java/com/webank/wedpr/components/project/service/impl/ProjectServiceImpl.java +++ b/wedpr-components/meta/project/src/main/java/com/webank/wedpr/components/project/service/impl/ProjectServiceImpl.java @@ -15,7 +15,9 @@ package com.webank.wedpr.components.project.service.impl; +import com.github.pagehelper.Page; import com.github.pagehelper.PageInfo; +import com.github.pagehelper.page.PageMethod; import com.webank.wedpr.components.dataset.dao.DatasetUserPermissions; import com.webank.wedpr.components.dataset.mapper.DatasetMapper; import com.webank.wedpr.components.dataset.mapper.DatasetPermissionMapper; @@ -356,6 +358,78 @@ public WeDPRResponse queryJobByCondition(String user, JobRequest request) { return response; } + // query job list by dataset id + @Override + public WeDPRResponse queryJobsByDatasetID( + String user, String datasetID, Integer pageNum, Integer pageSize) { + + long startTimeMillis = System.currentTimeMillis(); + + logger.info( + "query jobs by dataset id begin, user: {}, datasetID: {}, pageNum: {}, pageSize: {}", + user, + datasetID, + pageNum, + pageSize); + + if (pageNum == null || pageNum < 1) { + pageNum = 1; + } + + // limit pageSize + if (pageSize == null || pageSize < 0) { + // TODO: 配置项 + pageSize = 15; + } + + try { + try (Page objectPage = PageMethod.startPage(pageNum, pageSize)) { + + List jobDOs = + this.projectMapperWrapper + .getProjectMapper() + .queryJobsByDatasetID(datasetID); + + long totalCount = new PageInfo<>(jobDOs).getTotal(); + long pageEndOffset = (long) pageNum * pageSize; + boolean isLast = (pageEndOffset >= totalCount); + + long endTimeMillis = System.currentTimeMillis(); + + QueryJobsByDatasetIDResponse queryJobsByDatasetIDResponse = + QueryJobsByDatasetIDResponse.builder() + .totalCount(totalCount) + .isLast(isLast) + .content(jobDOs) + .build(); + + WeDPRResponse response = + new WeDPRResponse(Constant.WEDPR_SUCCESS, Constant.WEDPR_SUCCESS_MSG); + response.setData(queryJobsByDatasetIDResponse); + + logger.info( + "query jobs by dataset id end, datasetID: {}, totalCount: {}, isLast: {}, cost(ms): {}", + datasetID, + totalCount, + isLast, + (endTimeMillis - startTimeMillis)); + + return response; + } + } catch (Exception e) { + + long endTimeMillis = System.currentTimeMillis(); + logger.error( + "query jobs by dataset id exception, datasetID:{}, cost(ms): {}, e: ", + datasetID, + (endTimeMillis - startTimeMillis), + e); + + WeDPRResponse response = new WeDPRResponse(Constant.WEDPR_FAILED, e.getMessage()); + return response; + } + } + // query follower job by condition @Override public WeDPRResponse queryFollowerJobByCondition(String user, JobRequest request) { diff --git a/wedpr-components/meta/project/src/main/resources/mapper/ProjectMapper.xml b/wedpr-components/meta/project/src/main/resources/mapper/ProjectMapper.xml index 49099295..7dd90a6c 100644 --- a/wedpr-components/meta/project/src/main/resources/mapper/ProjectMapper.xml +++ b/wedpr-components/meta/project/src/main/resources/mapper/ProjectMapper.xml @@ -152,6 +152,13 @@ (#{jobDO.id}, #{jobDO.name}, #{jobDO.jobType}, #{jobDO.projectName}, #{jobDO.parties}, #{jobDO.owner}, #{jobDO.ownerAgency}, #{jobDO.param},#{jobDO.status}, #{jobDO.result}, NOW()) + + INSERT INTO `wedpr_job_dataset_relation` (`job_id`, `dataset_id`) + VALUES + + (#{jobID}, #{datasetID}) + + + +