Skip to content

Commit

Permalink
add the relationship of job and dataset
Browse files Browse the repository at this point in the history
  • Loading branch information
ywy2090 committed Aug 28, 2024
1 parent b6c611f commit c0dea86
Show file tree
Hide file tree
Showing 10 changed files with 172 additions and 13 deletions.
9 changes: 9 additions & 0 deletions wedpr-adm/db/wedpr_ddl.sql
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,15 @@ create table if not exists `wedpr_job_table`(
index status_index(`status`(128))
)ENGINE=InnoDB default charset=utf8mb4 default collate=utf8mb4_bin ROW_FORMAT=DYNAMIC;

-- job dataset relationship table
create table if not exists `wedpr_job_dataset_relation`(
`job_id` varchar(64) not null comment "任务ID",
`dataset_id` varchar(64) not null comment "数据集ID",
`create_time` DATETIME DEFAULT CURRENT_TIMESTAMP comment "任务创建时间",
index job_id_index(`job_id`),
index dataset_id_index(`dataset_id`)
)ENGINE=InnoDB default charset=utf8mb4 default collate=utf8mb4_bin ROW_FORMAT=DYNAMIC;

-- the algorithm_setting template
create table if not exists `wedpr_setting_template`(
`id` varchar(64) not null comment "配置模板ID",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,11 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;

@RestController
Expand Down Expand Up @@ -140,6 +142,23 @@ public WeDPRResponse queryJobByCondition(
}
}

// query job by condition
@GetMapping("/queryJobsByDatasetID")
public WeDPRResponse queryJobsByDatasetID(
HttpServletRequest request,
@RequestParam(value = "datasetID", required = true) String datasetID,
@RequestParam(value = "pageNum", required = false) Integer pageNum,
@RequestParam(value = "pageSize", required = false) Integer pageSize) {
try {
return projectService.queryJobsByDatasetID(
TokenUtils.getLoginUser(request).getUsername(), datasetID, pageNum, pageSize);
} catch (Exception e) {
logger.warn("queryJobsByDatasetID exception, condition: {}, error: ", null, e);
return new WeDPRResponse(
Constant.WEDPR_FAILED, "queryJobsByDatasetID failed for " + e.getMessage());
}
}

// query jobOverview
@PostMapping("/queryJobOverview")
public WeDPRResponse queryJobOverview(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -453,6 +453,15 @@ public Boolean isJobParty(String agency) {
return Boolean.FALSE;
}

// TODO: verify dataset
public boolean isJobDataset(String datasetId) {
if (datasetList == null) {
return false;
}

return datasetList.contains(datasetId);
}

public Object getJobRequest() {
return jobRequest;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,9 @@ public long queryFollowerJobCount(

public int batchUpdateJobInfo(@Param("jobDOList") List<JobDO> jobDOList);

public int batchInsertJobDatasetRelationInfo(
@Param("jobID") String jobID, @Param("datasetIDs") List<String> datasetIDs);

public List<JobDO> queryJobs(
@Param("onlyMeta") Boolean onlyMeta,
@Param("condition") JobDO condition,
Expand All @@ -57,4 +60,6 @@ public List<JobDO> queryFollowerJobByCondition(
@Param("followerUser") String followerUser,
@Param("followerAgency") String followerAgency,
@Param("condition") JobDO condition);

public List<JobDO> queryJobsByDatasetID(@Param("datasetID") String datasetID);
}
Original file line number Diff line number Diff line change
Expand Up @@ -151,16 +151,29 @@ public void recordJobStatus(JobDO jobDO) {
@Transactional(rollbackFor = Exception.class)
public void insertJob(JobDO jobDO) {
String id = jobDO.getId();

logger.info(" => insert job, jobID: {}", id);

this.projectMapper.insertJobInfo(jobDO);
if (jobDO.getTaskParties() == null || jobDO.getTaskParties().isEmpty()) {
return;
}
this.followerMapper.batchInsert(jobDO.getTaskParties());

int insertC = -1;
List<String> datasetList = jobDO.getDatasetList();
if (datasetList != null && !datasetList.isEmpty()) {
// TODO:
insertC = this.projectMapper.batchInsertJobDatasetRelationInfo(id, datasetList);
}

if (logger.isDebugEnabled()) {
logger.debug(
"batch insert job datasets relation, jobID: {}, datasetIDs: {}, insertCount: {}",
id,
datasetList,
insertC);
}

if (jobDO.getTaskParties() == null || jobDO.getTaskParties().isEmpty()) {
return;
}
this.followerMapper.batchInsert(jobDO.getTaskParties());
}

public void updateFinalJobResult(JobDO job, JobStatus status, String result) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,14 +49,6 @@ public void setTaskParties(List<FollowerDO> taskParties) {
checkAndConfigTaskParities(taskParties);
}

public boolean isJobDataset(String datasetId) {
if (datasetList == null) {
return false;
}

return datasetList.contains(datasetId);
}

@SneakyThrows(WeDPRException.class)
private void checkAndConfigTaskParities(List<FollowerDO> taskParties) {
if (taskParties == null || taskParties.isEmpty()) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
package com.webank.wedpr.components.project.model;

import com.webank.wedpr.components.project.dao.JobDO;
import java.util.List;
import lombok.Builder;
import lombok.Data;

@Builder
@Data
public class QueryJobsByDatasetIDResponse {
long totalCount;
boolean isLast;
List<JobDO> content;
}
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,9 @@ public abstract Object queryJobOverview(String user, JobOverviewRequest jobOverv
public abstract WeDPRResponse submitJob(String user, JobRequest request);
// query job by condition
public abstract WeDPRResponse queryJobByCondition(String user, JobRequest request);
// query job list by dataset id
public abstract WeDPRResponse queryJobsByDatasetID(
String user, String datasetID, Integer pageNum, Integer pageSize);

public abstract WeDPRResponse queryFollowerJobByCondition(String user, JobRequest request);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,9 @@

package com.webank.wedpr.components.project.service.impl;

import com.github.pagehelper.Page;
import com.github.pagehelper.PageInfo;
import com.github.pagehelper.page.PageMethod;
import com.webank.wedpr.components.dataset.dao.DatasetUserPermissions;
import com.webank.wedpr.components.dataset.mapper.DatasetMapper;
import com.webank.wedpr.components.dataset.mapper.DatasetPermissionMapper;
Expand Down Expand Up @@ -356,6 +358,78 @@ public WeDPRResponse queryJobByCondition(String user, JobRequest request) {
return response;
}

// query job list by dataset id
@Override
public WeDPRResponse queryJobsByDatasetID(
String user, String datasetID, Integer pageNum, Integer pageSize) {

long startTimeMillis = System.currentTimeMillis();

logger.info(
"query jobs by dataset id begin, user: {}, datasetID: {}, pageNum: {}, pageSize: {}",
user,
datasetID,
pageNum,
pageSize);

if (pageNum == null || pageNum < 1) {
pageNum = 1;
}

// limit pageSize
if (pageSize == null || pageSize < 0) {
// TODO: 配置项
pageSize = 15;
}

try {
try (Page<Object> objectPage = PageMethod.startPage(pageNum, pageSize)) {

List<JobDO> jobDOs =
this.projectMapperWrapper
.getProjectMapper()
.queryJobsByDatasetID(datasetID);

long totalCount = new PageInfo<>(jobDOs).getTotal();
long pageEndOffset = (long) pageNum * pageSize;
boolean isLast = (pageEndOffset >= totalCount);

long endTimeMillis = System.currentTimeMillis();

QueryJobsByDatasetIDResponse queryJobsByDatasetIDResponse =
QueryJobsByDatasetIDResponse.builder()
.totalCount(totalCount)
.isLast(isLast)
.content(jobDOs)
.build();

WeDPRResponse response =
new WeDPRResponse(Constant.WEDPR_SUCCESS, Constant.WEDPR_SUCCESS_MSG);
response.setData(queryJobsByDatasetIDResponse);

logger.info(
"query jobs by dataset id end, datasetID: {}, totalCount: {}, isLast: {}, cost(ms): {}",
datasetID,
totalCount,
isLast,
(endTimeMillis - startTimeMillis));

return response;
}
} catch (Exception e) {

long endTimeMillis = System.currentTimeMillis();
logger.error(
"query jobs by dataset id exception, datasetID:{}, cost(ms): {}, e: ",
datasetID,
(endTimeMillis - startTimeMillis),
e);

WeDPRResponse response = new WeDPRResponse(Constant.WEDPR_FAILED, e.getMessage());
return response;
}
}

// query follower job by condition
@Override
public WeDPRResponse queryFollowerJobByCondition(String user, JobRequest request) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,13 @@
(#{jobDO.id}, #{jobDO.name}, #{jobDO.jobType}, #{jobDO.projectName}, #{jobDO.parties}, #{jobDO.owner},
#{jobDO.ownerAgency}, #{jobDO.param},#{jobDO.status}, #{jobDO.result}, NOW())
</insert>
<insert id="batchInsertJobDatasetRelationInfo">
INSERT INTO `wedpr_job_dataset_relation` (`job_id`, `dataset_id`)
VALUES
<foreach collection="datasetIDs" item="datasetID" separator=",">
(#{jobID}, #{datasetID})
</foreach>
</insert>
<select id="queryJobs" resultMap="JobDOMap">
select
<choose>
Expand Down Expand Up @@ -206,6 +213,20 @@
</if>
</select>

<select id="queryJobsByDatasetID" resultMap="JobDOMap">
SELECT t.*
FROM `wedpr_job_table` t
INNER JOIN (
SELECT
DISTINCT job_id
FROM
`wedpr_job_dataset_relation`
WHERE
dataset_id = #{datasetID}
) temp ON t.id = temp.job_id
ORDER BY create_time DESC
</select>

<select id="queryJobCount" resultType="java.lang.Long">
select count(1) from `wedpr_job_table` where 1 = 1
<choose>
Expand Down

0 comments on commit c0dea86

Please sign in to comment.