diff --git a/wedpr-components/meta/loadbalancer/src/main/java/com/webank/wedpr/components/loadbalancer/EntryPointInfo.java b/wedpr-components/meta/loadbalancer/src/main/java/com/webank/wedpr/components/loadbalancer/EntryPointInfo.java index 57ed9c63..797cf5a9 100644 --- a/wedpr-components/meta/loadbalancer/src/main/java/com/webank/wedpr/components/loadbalancer/EntryPointInfo.java +++ b/wedpr-components/meta/loadbalancer/src/main/java/com/webank/wedpr/components/loadbalancer/EntryPointInfo.java @@ -21,6 +21,7 @@ import java.util.List; import lombok.Data; import lombok.NoArgsConstructor; +import org.apache.commons.lang3.StringUtils; @Data @NoArgsConstructor @@ -34,14 +35,12 @@ public EntryPointInfo(String serviceName, String entryPoint) { } public String getUrl(String uriPath) { + if (StringUtils.isBlank(uriPath)) { + return Common.getUrl(entryPoint); + } if (uriPath.startsWith(Constant.URI_SPLITER)) { return Common.getUrl(entryPoint + uriPath); } - - if (uriPath.isEmpty()) { - return Common.getUrl(entryPoint); - } - return Common.getUrl(entryPoint + Constant.URI_SPLITER + uriPath); } diff --git a/wedpr-components/scheduler/src/main/java/com/webank/wedpr/components/scheduler/client/PsiClient.java b/wedpr-components/scheduler/src/main/java/com/webank/wedpr/components/scheduler/client/PsiClient.java index de463391..0484b031 100644 --- a/wedpr-components/scheduler/src/main/java/com/webank/wedpr/components/scheduler/client/PsiClient.java +++ b/wedpr-components/scheduler/src/main/java/com/webank/wedpr/components/scheduler/client/PsiClient.java @@ -2,13 +2,13 @@ import static com.webank.wedpr.components.scheduler.client.common.ClientCommon.*; +import com.fasterxml.jackson.core.JsonProcessingException; import com.webank.wedpr.common.utils.Constant; import com.webank.wedpr.common.utils.ObjectMapperFactory; import com.webank.wedpr.common.utils.WeDPRException; import com.webank.wedpr.components.http.client.JsonRpcClient; import com.webank.wedpr.components.http.client.model.JsonRpcResponse; import com.webank.wedpr.components.scheduler.dag.utils.WorkerUtils; -import com.webank.wedpr.components.scheduler.executor.impl.psi.PSIExecutor; import com.webank.wedpr.components.scheduler.executor.impl.psi.PSIExecutorConfig; import com.webank.wedpr.components.scheduler.executor.impl.psi.model.PSIRequest; import org.slf4j.Logger; @@ -18,6 +18,28 @@ public class PsiClient { private static final Logger logger = LoggerFactory.getLogger(PsiClient.class); + public static class QueryTaskParam { + private String taskID; + + public QueryTaskParam() {} + + public QueryTaskParam(String taskID) { + this.taskID = taskID; + } + + public String getTaskID() { + return taskID; + } + + public void setTaskID(String taskID) { + this.taskID = taskID; + } + + public String serialize() throws JsonProcessingException { + return ObjectMapperFactory.getObjectMapper().writeValueAsString(this); + } + } + private static final String RUN_FINISHED_STATUS = "COMPLETED"; private final int pollIntervalMilli = DEFAULT_HTTP_POLL_TASK_INTERVAL_MILLI; @@ -34,6 +56,10 @@ public PsiClient(String url) { PSIExecutorConfig.buildConfig()); } + public JsonRpcClient getJsonRpcClient() { + return this.jsonRpcClient; + } + public String submitTask(String params) throws Exception { logger.info("begin submit job to PSI, jobRequest: {}", params); @@ -77,7 +103,7 @@ public void pollTask(String taskId) throws WeDPRException { taskId, PSIExecutorConfig.getPsiGetTaskStatusMethod(), PSIExecutorConfig.getPsiToken(), - new PSIExecutor.QueryTaskParam(taskId)); + new QueryTaskParam(taskId)); // response error if (!response.statusOk()) { diff --git a/wedpr-components/scheduler/src/main/java/com/webank/wedpr/components/scheduler/config/SchedulerLoader.java b/wedpr-components/scheduler/src/main/java/com/webank/wedpr/components/scheduler/config/SchedulerLoader.java index 986636af..5aaee532 100644 --- a/wedpr-components/scheduler/src/main/java/com/webank/wedpr/components/scheduler/config/SchedulerLoader.java +++ b/wedpr-components/scheduler/src/main/java/com/webank/wedpr/components/scheduler/config/SchedulerLoader.java @@ -132,24 +132,6 @@ protected void registerExecutors( new ExecutiveContextBuilder(projectMapperWrapper), threadPoolService); executorManager.registerExecutor(ExecutorType.DAG.getType(), dagSchedulerExecutor); - /* - // register the executor - PSIExecutor psiExecutor = new PSIExecutor(storage, fileMetaBuilder, jobChecker); - executorManager.registerExecutor(JobType.PSI.getType(), psiExecutor); - - logger.info("register PSIExecutor success"); - MLPSIExecutor mlpsiExecutor = new MLPSIExecutor(storage, fileMetaBuilder); - executorManager.registerExecutor(JobType.ML_PSI.getType(), mlpsiExecutor); - - logger.info("register ML-PSIExecutor success"); - MLExecutor mlExecutor = new MLExecutor(); - executorManager.registerExecutor(JobType.MLPreprocessing.getType(), mlExecutor); - executorManager.registerExecutor(JobType.FeatureEngineer.getType(), mlExecutor); - executorManager.registerExecutor(JobType.XGB_TRAIN.getType(), mlExecutor); - executorManager.registerExecutor(JobType.XGB_PREDICT.getType(), mlExecutor); - logger.info("register MLExecutor success"); - */ - // register the pir executor, TODO: implement the taskFinishHandler executorManager.registerExecutor( ExecutorType.PIR.getType(), diff --git a/wedpr-components/scheduler/src/main/java/com/webank/wedpr/components/scheduler/dag/worker/ModelWorker.java b/wedpr-components/scheduler/src/main/java/com/webank/wedpr/components/scheduler/dag/worker/ModelWorker.java index 28a9fe01..2ad59a94 100644 --- a/wedpr-components/scheduler/src/main/java/com/webank/wedpr/components/scheduler/dag/worker/ModelWorker.java +++ b/wedpr-components/scheduler/src/main/java/com/webank/wedpr/components/scheduler/dag/worker/ModelWorker.java @@ -6,7 +6,6 @@ import com.webank.wedpr.components.scheduler.client.ModelClient; import com.webank.wedpr.components.scheduler.dag.entity.JobWorker; import com.webank.wedpr.components.scheduler.dag.utils.ServiceName; -import com.webank.wedpr.components.scheduler.executor.impl.ml.MLExecutorConfig; import com.webank.wedpr.components.scheduler.mapper.JobWorkerMapper; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -41,9 +40,7 @@ public void engineRun() throws Exception { } long startTimeMillis = System.currentTimeMillis(); - - String modelUrl = MLExecutorConfig.getUrl(); - String url = entryPoint.getUrl(modelUrl); + String url = entryPoint.getUrl(null); if (logger.isDebugEnabled()) { logger.debug("model url: {}, jobId: {}", url, jobId); diff --git a/wedpr-components/scheduler/src/main/java/com/webank/wedpr/components/scheduler/dag/worker/PsiWorker.java b/wedpr-components/scheduler/src/main/java/com/webank/wedpr/components/scheduler/dag/worker/PsiWorker.java index a536d2a0..279db358 100644 --- a/wedpr-components/scheduler/src/main/java/com/webank/wedpr/components/scheduler/dag/worker/PsiWorker.java +++ b/wedpr-components/scheduler/src/main/java/com/webank/wedpr/components/scheduler/dag/worker/PsiWorker.java @@ -6,7 +6,6 @@ import com.webank.wedpr.components.scheduler.client.PsiClient; import com.webank.wedpr.components.scheduler.dag.entity.JobWorker; import com.webank.wedpr.components.scheduler.dag.utils.ServiceName; -import com.webank.wedpr.components.scheduler.executor.impl.psi.PSIExecutorConfig; import com.webank.wedpr.components.scheduler.mapper.JobWorkerMapper; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -46,9 +45,7 @@ public void engineRun() throws Exception { jobId, workerId, workerArgs); - - String psiUrl = PSIExecutorConfig.getPsiUrl(); - String url = entryPoint.getUrl(psiUrl); + String url = entryPoint.getUrl(null); if (logger.isDebugEnabled()) { logger.debug("psi url: {}, jobId: {}", url, jobId); diff --git a/wedpr-components/scheduler/src/main/java/com/webank/wedpr/components/scheduler/executor/hook/ExecutorHook.java b/wedpr-components/scheduler/src/main/java/com/webank/wedpr/components/scheduler/executor/hook/ExecutorHook.java new file mode 100644 index 00000000..f3cb2021 --- /dev/null +++ b/wedpr-components/scheduler/src/main/java/com/webank/wedpr/components/scheduler/executor/hook/ExecutorHook.java @@ -0,0 +1,23 @@ +/* + * Copyright 2017-2025 [webank-wedpr] + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + * + */ + +package com.webank.wedpr.components.scheduler.executor.hook; + +import com.webank.wedpr.components.project.dao.JobDO; + +public interface ExecutorHook { + // prepare for the job + Object prepare(JobDO jobDO) throws Exception; +} diff --git a/wedpr-components/scheduler/src/main/java/com/webank/wedpr/components/scheduler/executor/hook/MLExecutorHook.java b/wedpr-components/scheduler/src/main/java/com/webank/wedpr/components/scheduler/executor/hook/MLExecutorHook.java new file mode 100644 index 00000000..7b5460cb --- /dev/null +++ b/wedpr-components/scheduler/src/main/java/com/webank/wedpr/components/scheduler/executor/hook/MLExecutorHook.java @@ -0,0 +1,34 @@ +/* + * Copyright 2017-2025 [webank-wedpr] + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + * + */ + +package com.webank.wedpr.components.scheduler.executor.hook; + +import com.webank.wedpr.components.project.dao.JobDO; +import com.webank.wedpr.components.scheduler.executor.impl.ml.request.ModelJobRequest; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class MLExecutorHook implements ExecutorHook { + private static final Logger logger = LoggerFactory.getLogger(MLExecutorHook.class); + + public MLExecutorHook() {} + + @Override + public Object prepare(JobDO jobDO) throws Exception { + ModelJobRequest modelJobRequest = (ModelJobRequest) jobDO.getJobRequest(); + modelJobRequest.setTaskID(jobDO.getTaskID()); + return jobDO.getJobRequest(); + } +} diff --git a/wedpr-components/scheduler/src/main/java/com/webank/wedpr/components/scheduler/executor/impl/psi/MLPSIExecutor.java b/wedpr-components/scheduler/src/main/java/com/webank/wedpr/components/scheduler/executor/hook/MLPSIExecutorHook.java similarity index 87% rename from wedpr-components/scheduler/src/main/java/com/webank/wedpr/components/scheduler/executor/impl/psi/MLPSIExecutor.java rename to wedpr-components/scheduler/src/main/java/com/webank/wedpr/components/scheduler/executor/hook/MLPSIExecutorHook.java index ee850d37..b1938b8c 100644 --- a/wedpr-components/scheduler/src/main/java/com/webank/wedpr/components/scheduler/executor/impl/psi/MLPSIExecutor.java +++ b/wedpr-components/scheduler/src/main/java/com/webank/wedpr/components/scheduler/executor/hook/MLPSIExecutorHook.java @@ -13,7 +13,7 @@ * */ -package com.webank.wedpr.components.scheduler.executor.impl.psi; +package com.webank.wedpr.components.scheduler.executor.hook; import com.webank.wedpr.components.project.dao.JobDO; import com.webank.wedpr.components.scheduler.executor.impl.ml.model.ModelJobParam; @@ -21,8 +21,8 @@ import com.webank.wedpr.components.scheduler.executor.impl.psi.model.PSIJobParam; import com.webank.wedpr.components.storage.api.FileStorageInterface; -public class MLPSIExecutor extends PSIExecutor { - public MLPSIExecutor(FileStorageInterface storage, FileMetaBuilder fileMetaBuilder) { +public class MLPSIExecutorHook extends PSIExecutorHook { + public MLPSIExecutorHook(FileStorageInterface storage, FileMetaBuilder fileMetaBuilder) { // no need to check here since MLJobParam has been checked in JobOrchestrate super(storage, fileMetaBuilder, null); } diff --git a/wedpr-components/scheduler/src/main/java/com/webank/wedpr/components/scheduler/executor/hook/MPCExecutorHook.java b/wedpr-components/scheduler/src/main/java/com/webank/wedpr/components/scheduler/executor/hook/MPCExecutorHook.java new file mode 100644 index 00000000..fdf2971e --- /dev/null +++ b/wedpr-components/scheduler/src/main/java/com/webank/wedpr/components/scheduler/executor/hook/MPCExecutorHook.java @@ -0,0 +1,18 @@ +package com.webank.wedpr.components.scheduler.executor.hook; + +import com.webank.wedpr.components.project.dao.JobDO; +import com.webank.wedpr.components.scheduler.executor.impl.mpc.MPCJobParam; +import com.webank.wedpr.components.scheduler.executor.impl.mpc.request.MPCJobRequest; + +public class MPCExecutorHook implements ExecutorHook { + @Override + public Object prepare(JobDO jobDO) throws Exception { + // get the jobParam + MPCJobParam jobParam = (MPCJobParam) jobDO.getJobParam(); + + MPCJobRequest mpcJobRequest = (MPCJobRequest) jobParam.toMPCJobRequest(); + + mpcJobRequest.setTaskID(jobDO.getTaskID()); + return mpcJobRequest; + } +} diff --git a/wedpr-components/scheduler/src/main/java/com/webank/wedpr/components/scheduler/executor/impl/psi/MPCPSIExecutor.java b/wedpr-components/scheduler/src/main/java/com/webank/wedpr/components/scheduler/executor/hook/MPCPSIExecutorHook.java similarity index 79% rename from wedpr-components/scheduler/src/main/java/com/webank/wedpr/components/scheduler/executor/impl/psi/MPCPSIExecutor.java rename to wedpr-components/scheduler/src/main/java/com/webank/wedpr/components/scheduler/executor/hook/MPCPSIExecutorHook.java index db830c96..e2acb0da 100644 --- a/wedpr-components/scheduler/src/main/java/com/webank/wedpr/components/scheduler/executor/impl/psi/MPCPSIExecutor.java +++ b/wedpr-components/scheduler/src/main/java/com/webank/wedpr/components/scheduler/executor/hook/MPCPSIExecutorHook.java @@ -1,4 +1,4 @@ -package com.webank.wedpr.components.scheduler.executor.impl.psi; +package com.webank.wedpr.components.scheduler.executor.hook; import com.webank.wedpr.components.project.dao.JobDO; import com.webank.wedpr.components.scheduler.executor.impl.model.FileMetaBuilder; @@ -6,8 +6,8 @@ import com.webank.wedpr.components.scheduler.executor.impl.psi.model.PSIJobParam; import com.webank.wedpr.components.storage.api.FileStorageInterface; -public class MPCPSIExecutor extends PSIExecutor { - public MPCPSIExecutor(FileStorageInterface storage, FileMetaBuilder fileMetaBuilder) { +public class MPCPSIExecutorHook extends PSIExecutorHook { + public MPCPSIExecutorHook(FileStorageInterface storage, FileMetaBuilder fileMetaBuilder) { super(storage, fileMetaBuilder, null); } diff --git a/wedpr-components/scheduler/src/main/java/com/webank/wedpr/components/scheduler/executor/hook/PSIExecutorHook.java b/wedpr-components/scheduler/src/main/java/com/webank/wedpr/components/scheduler/executor/hook/PSIExecutorHook.java new file mode 100644 index 00000000..b77d4e0a --- /dev/null +++ b/wedpr-components/scheduler/src/main/java/com/webank/wedpr/components/scheduler/executor/hook/PSIExecutorHook.java @@ -0,0 +1,55 @@ +/* + * Copyright 2017-2025 [webank-wedpr] + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + * + */ + +package com.webank.wedpr.components.scheduler.executor.hook; + +import com.webank.wedpr.components.project.JobChecker; +import com.webank.wedpr.components.project.dao.JobDO; +import com.webank.wedpr.components.scheduler.executor.impl.model.FileMetaBuilder; +import com.webank.wedpr.components.scheduler.executor.impl.psi.model.PSIJobParam; +import com.webank.wedpr.components.storage.api.FileStorageInterface; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class PSIExecutorHook implements ExecutorHook { + private static final Logger logger = LoggerFactory.getLogger(PSIExecutorHook.class); + + protected final FileStorageInterface storage; + protected final FileMetaBuilder fileMetaBuilder; + protected final JobChecker jobChecker; + + public PSIExecutorHook( + FileStorageInterface storage, FileMetaBuilder fileMetaBuilder, JobChecker jobChecker) { + this.storage = storage; + this.fileMetaBuilder = fileMetaBuilder; + this.jobChecker = jobChecker; + } + + @Override + public Object prepare(JobDO jobDO) throws Exception { + // deserialize the jobParam + PSIJobParam psiJobParam = (PSIJobParam) this.jobChecker.checkAndParseParam(jobDO); + psiJobParam.setTaskID(jobDO.getTaskID()); + preparePSIJob(jobDO, psiJobParam); + return jobDO.getJobRequest(); + } + + protected void preparePSIJob(JobDO jobDO, PSIJobParam psiJobParam) throws Exception { + // download and prepare the psi file + psiJobParam.prepare(this.fileMetaBuilder, storage); + // convert to PSIRequest + jobDO.setJobRequest(psiJobParam.convert(jobDO.getOwnerAgency())); + } +} diff --git a/wedpr-components/scheduler/src/main/java/com/webank/wedpr/components/scheduler/executor/impl/ExecutorConfig.java b/wedpr-components/scheduler/src/main/java/com/webank/wedpr/components/scheduler/executor/impl/ExecutorConfig.java index 190ce685..61fc6b8d 100644 --- a/wedpr-components/scheduler/src/main/java/com/webank/wedpr/components/scheduler/executor/impl/ExecutorConfig.java +++ b/wedpr-components/scheduler/src/main/java/com/webank/wedpr/components/scheduler/executor/impl/ExecutorConfig.java @@ -71,7 +71,7 @@ public static String getPSIPrepareFileName() { public static String getDefaultPSIResultPath(String user, String jobID) { return WeDPRCommonConfig.getUserJobCachePath( - user, JobType.PIR.getType(), jobID, PSI_RESULT_FILE); + user, JobType.PSI.getType(), jobID, PSI_RESULT_FILE); } public static String getPirJobResultPath(String user, String jobID) { diff --git a/wedpr-components/scheduler/src/main/java/com/webank/wedpr/components/scheduler/executor/impl/ml/MLExecutor.java b/wedpr-components/scheduler/src/main/java/com/webank/wedpr/components/scheduler/executor/impl/ml/MLExecutor.java deleted file mode 100644 index 9c18da37..00000000 --- a/wedpr-components/scheduler/src/main/java/com/webank/wedpr/components/scheduler/executor/impl/ml/MLExecutor.java +++ /dev/null @@ -1,99 +0,0 @@ -/* - * Copyright 2017-2025 [webank-wedpr] - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except - * in compliance with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the License - * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express - * or implied. See the License for the specific language governing permissions and limitations under - * the License. - * - */ - -package com.webank.wedpr.components.scheduler.executor.impl.ml; - -import com.webank.wedpr.common.utils.BaseResponse; -import com.webank.wedpr.common.utils.WeDPRException; -import com.webank.wedpr.components.http.client.HttpClientImpl; -import com.webank.wedpr.components.project.dao.JobDO; -import com.webank.wedpr.components.scheduler.executor.ExecuteResult; -import com.webank.wedpr.components.scheduler.executor.Executor; -import com.webank.wedpr.components.scheduler.executor.impl.ml.request.ModelJobRequest; -import com.webank.wedpr.components.scheduler.executor.impl.ml.response.MLResponse; -import com.webank.wedpr.components.scheduler.executor.impl.ml.response.MLResponseFactory; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public class MLExecutor implements Executor { - private static final Logger logger = LoggerFactory.getLogger(MLExecutor.class); - - public MLExecutor() {} - - @Override - public void execute(JobDO jobDO) throws Exception { - ModelJobRequest modelJobRequest = (ModelJobRequest) jobDO.getJobRequest(); - modelJobRequest.setTaskID(jobDO.getTaskID()); - HttpClientImpl httpClient = - new HttpClientImpl( - MLExecutorConfig.getRunTaskApiUrl(jobDO.getTaskID()), - MLExecutorConfig.getMaxTotalConnection(), - MLExecutorConfig.buildConfig(), - new MLResponseFactory()); - logger.info("execute job: {}", jobDO.toString()); - httpClient.executePost(modelJobRequest); - } - - @Override - public Object prepare(JobDO jobDO) throws Exception { - ModelJobRequest modelJobRequest = (ModelJobRequest) jobDO.getJobRequest(); - modelJobRequest.setTaskID(jobDO.getTaskID()); - return jobDO.getJobRequest(); - } - - @Override - public void kill(JobDO jobDO) throws Exception { - logger.info("kill job: {}", jobDO.toString()); - // TODO: model node support kill by jobID - HttpClientImpl httpClient = - new HttpClientImpl( - MLExecutorConfig.getRunTaskApiUrl(jobDO.getId()), - MLExecutorConfig.getMaxTotalConnection(), - MLExecutorConfig.buildConfig(), - new MLResponseFactory()); - BaseResponse response = httpClient.execute(httpClient.getUrl(), true); - if (response.statusOk()) { - logger.info("kill job success: {}", jobDO.getJobRequest()); - return; - } - logger.error("kill job {} failed, response: {}", jobDO.getId(), response.serialize()); - throw new WeDPRException(response.serialize()); - } - - @Override - public ExecuteResult queryStatus(String taskID) throws Exception { - logger.info("query job status for {}", taskID); - HttpClientImpl httpClient = - new HttpClientImpl( - MLExecutorConfig.getRunTaskApiUrl(taskID), - MLExecutorConfig.getMaxTotalConnection(), - MLExecutorConfig.buildConfig(), - new MLResponseFactory()); - BaseResponse response = httpClient.execute(httpClient.getUrl(), false); - if (!response.statusOk()) { - logger.error( - "query job status for {} failed, response: {}", taskID, response.toString()); - return new ExecuteResult(response.serialize(), ExecuteResult.ResultStatus.FAILED); - } - MLResponse mlResponse = (MLResponse) response; - if (mlResponse.success()) { - return new ExecuteResult(mlResponse.serialize(), ExecuteResult.ResultStatus.SUCCESS); - } - if (mlResponse.failed()) { - return new ExecuteResult(mlResponse.serialize(), ExecuteResult.ResultStatus.FAILED); - } - return new ExecuteResult(mlResponse.serialize(), ExecuteResult.ResultStatus.RUNNING); - } -} diff --git a/wedpr-components/scheduler/src/main/java/com/webank/wedpr/components/scheduler/executor/impl/ml/MLExecutorClient.java b/wedpr-components/scheduler/src/main/java/com/webank/wedpr/components/scheduler/executor/impl/ml/MLExecutorClient.java index e75df5b4..24877679 100644 --- a/wedpr-components/scheduler/src/main/java/com/webank/wedpr/components/scheduler/executor/impl/ml/MLExecutorClient.java +++ b/wedpr-components/scheduler/src/main/java/com/webank/wedpr/components/scheduler/executor/impl/ml/MLExecutorClient.java @@ -34,10 +34,7 @@ public static Object getJobResult(LoadBalancer loadBalancer, GetTaskResultReques if (entryPoint == null) { throw new WeDPRException("Cannot find ml client endpoint"); } - - String modelUrl = MLExecutorConfig.getUrl(); - String url = entryPoint.getUrl(modelUrl); - + String url = entryPoint.getUrl(null); HttpClientImpl httpClient = new HttpClientImpl( MLExecutorConfig.getObtainJobResultApiUrl(url, request.getJobID()), diff --git a/wedpr-components/scheduler/src/main/java/com/webank/wedpr/components/scheduler/executor/impl/ml/MLExecutorConfig.java b/wedpr-components/scheduler/src/main/java/com/webank/wedpr/components/scheduler/executor/impl/ml/MLExecutorConfig.java index cc46dd25..0f7bf10b 100644 --- a/wedpr-components/scheduler/src/main/java/com/webank/wedpr/components/scheduler/executor/impl/ml/MLExecutorConfig.java +++ b/wedpr-components/scheduler/src/main/java/com/webank/wedpr/components/scheduler/executor/impl/ml/MLExecutorConfig.java @@ -22,7 +22,6 @@ public class MLExecutorConfig { private static final String DEFAULT_RUNTASK_API_PATH = "/api/ppc-model/pml/run-model-task/"; private static final String DEFAULT_GET_JOB_RESULT_API_PATH = "api/ppc-model/pml/get-job-result/"; - private static final String URL = WeDPRConfig.apply("wedpr.executor.ml.url", null, true); private static final String RUN_TASK_API_PATH = WeDPRConfig.apply("wedpr.executor.ml.method.runTask", DEFAULT_RUNTASK_API_PATH); @@ -50,22 +49,10 @@ public static RequestConfig buildConfig() { .build(); } - public static String getUrl() { - return URL; - } - - public static String getRunTaskApiUrl(String jobID) { - return URL + RUN_TASK_API_PATH + jobID; - } - public static String getRunTaskApiUrl(String url, String jobID) { return url + RUN_TASK_API_PATH + jobID; } - public static String getObtainJobResultApiUrl(String jobID) { - return URL + "/" + OBTAIN_JOB_RESULT_API_PATH + "/" + jobID; - } - public static String getObtainJobResultApiUrl(String url, String jobID) { return url + "/" + OBTAIN_JOB_RESULT_API_PATH + "/" + jobID; } diff --git a/wedpr-components/scheduler/src/main/java/com/webank/wedpr/components/scheduler/executor/impl/mpc/MPCExecutor.java b/wedpr-components/scheduler/src/main/java/com/webank/wedpr/components/scheduler/executor/impl/mpc/MPCExecutor.java deleted file mode 100644 index 0f1c2d99..00000000 --- a/wedpr-components/scheduler/src/main/java/com/webank/wedpr/components/scheduler/executor/impl/mpc/MPCExecutor.java +++ /dev/null @@ -1,30 +0,0 @@ -package com.webank.wedpr.components.scheduler.executor.impl.mpc; - -import com.webank.wedpr.components.project.dao.JobDO; -import com.webank.wedpr.components.scheduler.executor.ExecuteResult; -import com.webank.wedpr.components.scheduler.executor.Executor; -import com.webank.wedpr.components.scheduler.executor.impl.mpc.request.MPCJobRequest; - -public class MPCExecutor implements Executor { - @Override - public Object prepare(JobDO jobDO) throws Exception { - // get the jobParam - MPCJobParam jobParam = (MPCJobParam) jobDO.getJobParam(); - - MPCJobRequest mpcJobRequest = (MPCJobRequest) jobParam.toMPCJobRequest(); - - mpcJobRequest.setTaskID(jobDO.getTaskID()); - return mpcJobRequest; - } - - @Override - public void execute(JobDO jobDO) throws Exception {} - - @Override - public void kill(JobDO jobDO) throws Exception {} - - @Override - public ExecuteResult queryStatus(String jobID) throws Exception { - return null; - } -} diff --git a/wedpr-components/scheduler/src/main/java/com/webank/wedpr/components/scheduler/executor/impl/psi/PSIExecutor.java b/wedpr-components/scheduler/src/main/java/com/webank/wedpr/components/scheduler/executor/impl/psi/PSIExecutor.java deleted file mode 100644 index a1cf4e81..00000000 --- a/wedpr-components/scheduler/src/main/java/com/webank/wedpr/components/scheduler/executor/impl/psi/PSIExecutor.java +++ /dev/null @@ -1,170 +0,0 @@ -/* - * Copyright 2017-2025 [webank-wedpr] - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except - * in compliance with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the License - * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express - * or implied. See the License for the specific language governing permissions and limitations under - * the License. - * - */ - -package com.webank.wedpr.components.scheduler.executor.impl.psi; - -import com.fasterxml.jackson.core.JsonProcessingException; -import com.webank.wedpr.common.utils.Constant; -import com.webank.wedpr.common.utils.ObjectMapperFactory; -import com.webank.wedpr.common.utils.WeDPRException; -import com.webank.wedpr.components.http.client.JsonRpcClient; -import com.webank.wedpr.components.http.client.model.JsonRpcResponse; -import com.webank.wedpr.components.project.JobChecker; -import com.webank.wedpr.components.project.dao.JobDO; -import com.webank.wedpr.components.scheduler.executor.ExecuteResult; -import com.webank.wedpr.components.scheduler.executor.Executor; -import com.webank.wedpr.components.scheduler.executor.impl.model.FileMetaBuilder; -import com.webank.wedpr.components.scheduler.executor.impl.psi.model.PSIJobParam; -import com.webank.wedpr.components.storage.api.FileStorageInterface; -import lombok.SneakyThrows; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public class PSIExecutor implements Executor { - public static class QueryTaskParam { - private String taskID; - - public QueryTaskParam() {} - - public QueryTaskParam(String taskID) { - this.taskID = taskID; - } - - public String getTaskID() { - return taskID; - } - - public void setTaskID(String taskID) { - this.taskID = taskID; - } - - public String serialize() throws JsonProcessingException { - return ObjectMapperFactory.getObjectMapper().writeValueAsString(this); - } - } - - private static final Logger logger = LoggerFactory.getLogger(PSIExecutor.class); - - private static final String RUN_FINISHED_STATUS = "COMPLETED"; - protected final JsonRpcClient jsonRpcClient; - protected final FileStorageInterface storage; - protected final FileMetaBuilder fileMetaBuilder; - protected final JobChecker jobChecker; - - public PSIExecutor( - FileStorageInterface storage, FileMetaBuilder fileMetaBuilder, JobChecker jobChecker) { - this.jsonRpcClient = - new JsonRpcClient( - PSIExecutorConfig.getPsiUrl(), - PSIExecutorConfig.getMaxTotalConnection(), - PSIExecutorConfig.buildConfig()); - this.storage = storage; - this.fileMetaBuilder = fileMetaBuilder; - this.jobChecker = jobChecker; - } - - @Override - public Object prepare(JobDO jobDO) throws Exception { - // deserialize the jobParam - PSIJobParam psiJobParam = (PSIJobParam) this.jobChecker.checkAndParseParam(jobDO); - psiJobParam.setTaskID(jobDO.getTaskID()); - preparePSIJob(jobDO, psiJobParam); - return jobDO.getJobRequest(); - } - - protected void preparePSIJob(JobDO jobDO, PSIJobParam psiJobParam) throws Exception { - // download and prepare the psi file - psiJobParam.prepare(this.fileMetaBuilder, storage); - // convert to PSIRequest - jobDO.setJobRequest(psiJobParam.convert(jobDO.getOwnerAgency())); - } - - @Override - public void execute(JobDO jobDO) throws Exception { - prepare(jobDO); - - Object jobRequest = jobDO.getJobRequest(); - - logger.info( - "begin to submit job to PSI node, jobID: {}, taskID: {}, jobRequest: {}", - jobDO.getId(), - jobDO.getTaskID(), - jobRequest); - - JsonRpcResponse response = - this.jsonRpcClient.post( - PSIExecutorConfig.getPsiToken(), - PSIExecutorConfig.getPsiRunTaskMethod(), - jobDO.getJobRequest()); - if (response.statusOk()) { - logger.info( - "submit PSI job to node success, jobID: {}, taskID: {}, jobRequest: {}", - jobDO.getId(), - jobDO.getTaskID(), - jobRequest); - return; - } - if (response.getResult().getCode().equals(Constant.DuplicatedTaskCode)) { - logger.info( - "PSI job has already been submitted, jobID: {}, taskID: {}, jobRequest: {}", - jobDO.getId(), - jobDO.getTaskID(), - jobRequest); - return; - } - logger.warn( - "Run PSI job failed, jobDetail: {}, jobRequest: {}, result: {}", - jobDO, - jobRequest, - response.getResult().toString()); - throw new WeDPRException( - "Run PSI job " - + jobDO.getTaskID() - + " failed for " - + response.getResult().getMessage()); - } - - @Override - public void kill(JobDO jobDO) throws Exception { - logger.warn("PSI not support kill! jobDetail: {}", jobDO.toString()); - } - - @SneakyThrows(Exception.class) - private ExecuteResult parseStatusResponse(String jobID, JsonRpcResponse response) { - if (!response.statusOk()) { - logger.warn("queryStatus error, job: {}, response: {}", jobID, response); - return new ExecuteResult(response.serialize(), ExecuteResult.ResultStatus.FAILED); - } - if (response.getResult().getStatus().compareToIgnoreCase(RUN_FINISHED_STATUS) == 0) { - logger.info( - "queryStatus, job execute success, job: {}, response: {}", - jobID, - response.toString()); - return new ExecuteResult( - response.getResult().serialize(), ExecuteResult.ResultStatus.SUCCESS); - } - return new ExecuteResult(ExecuteResult.ResultStatus.RUNNING); - } - - @Override - public ExecuteResult queryStatus(String jobID) throws Exception { - JsonRpcResponse response = - this.jsonRpcClient.post( - PSIExecutorConfig.getPsiToken(), - PSIExecutorConfig.getPsiGetTaskStatusMethod(), - new QueryTaskParam(jobID)); - return parseStatusResponse(jobID, response); - } -} diff --git a/wedpr-components/scheduler/src/main/java/com/webank/wedpr/components/scheduler/executor/impl/psi/PSIExecutorConfig.java b/wedpr-components/scheduler/src/main/java/com/webank/wedpr/components/scheduler/executor/impl/psi/PSIExecutorConfig.java index ffa89a8e..91f7dcef 100644 --- a/wedpr-components/scheduler/src/main/java/com/webank/wedpr/components/scheduler/executor/impl/psi/PSIExecutorConfig.java +++ b/wedpr-components/scheduler/src/main/java/com/webank/wedpr/components/scheduler/executor/impl/psi/PSIExecutorConfig.java @@ -19,7 +19,6 @@ import org.apache.http.client.config.RequestConfig; public class PSIExecutorConfig { - private static final String PSI_URL = WeDPRConfig.apply("wedpr.executor.psi.url", null, true); private static final String PSI_RUN_TASK_METHOD = WeDPRConfig.apply("wedpr.executor.psi.method.runTask", "asyncRunTask"); private static final String PSI_GET_TASK_STATUS_METHOD = @@ -44,10 +43,6 @@ public static RequestConfig buildConfig() { .build(); } - public static String getPsiUrl() { - return PSI_URL; - } - public static String getPsiRunTaskMethod() { return PSI_RUN_TASK_METHOD; } diff --git a/wedpr-components/scheduler/src/main/java/com/webank/wedpr/components/scheduler/mapper/JobWorkerMapper.java b/wedpr-components/scheduler/src/main/java/com/webank/wedpr/components/scheduler/mapper/JobWorkerMapper.java index 0e2af48f..a010c007 100644 --- a/wedpr-components/scheduler/src/main/java/com/webank/wedpr/components/scheduler/mapper/JobWorkerMapper.java +++ b/wedpr-components/scheduler/src/main/java/com/webank/wedpr/components/scheduler/mapper/JobWorkerMapper.java @@ -14,7 +14,7 @@ public interface JobWorkerMapper { * @param workerId * @return */ - JobWorker selectJobWorkerById(@Param("workId") String workerId); + JobWorker selectJobWorkerById(@Param("workerId") String workerId); /** * insert jobWorker diff --git a/wedpr-components/scheduler/src/main/java/com/webank/wedpr/components/scheduler/workflow/builder/JobWorkFlowBuilderApi.java b/wedpr-components/scheduler/src/main/java/com/webank/wedpr/components/scheduler/workflow/builder/JobWorkFlowBuilderApi.java index 7d691b22..27b222c1 100644 --- a/wedpr-components/scheduler/src/main/java/com/webank/wedpr/components/scheduler/workflow/builder/JobWorkFlowBuilderApi.java +++ b/wedpr-components/scheduler/src/main/java/com/webank/wedpr/components/scheduler/workflow/builder/JobWorkFlowBuilderApi.java @@ -1,12 +1,14 @@ package com.webank.wedpr.components.scheduler.workflow.builder; import com.webank.wedpr.components.project.dao.JobDO; -import com.webank.wedpr.components.scheduler.executor.Executor; +import com.webank.wedpr.components.scheduler.executor.hook.ExecutorHook; import com.webank.wedpr.components.scheduler.workflow.WorkFlow; import com.webank.wedpr.components.scheduler.workflow.WorkFlowNode; -public interface JobWorkFlowBuilderApi extends Executor { +public interface JobWorkFlowBuilderApi { WorkFlow createWorkFlow(JobDO jobDO) throws Exception; void appendWorkFlowNode(JobDO jobDO, WorkFlow workflow, WorkFlowNode upstream) throws Exception; + + public abstract ExecutorHook getExecutorHook(); } diff --git a/wedpr-components/scheduler/src/main/java/com/webank/wedpr/components/scheduler/workflow/builder/JobWorkFlowBuilderImpl.java b/wedpr-components/scheduler/src/main/java/com/webank/wedpr/components/scheduler/workflow/builder/JobWorkFlowBuilderImpl.java index 20ff6149..6e8f3e45 100644 --- a/wedpr-components/scheduler/src/main/java/com/webank/wedpr/components/scheduler/workflow/builder/JobWorkFlowBuilderImpl.java +++ b/wedpr-components/scheduler/src/main/java/com/webank/wedpr/components/scheduler/workflow/builder/JobWorkFlowBuilderImpl.java @@ -3,8 +3,7 @@ import com.webank.wedpr.common.protocol.WorkerNodeType; import com.webank.wedpr.common.utils.WeDPRException; import com.webank.wedpr.components.project.dao.JobDO; -import com.webank.wedpr.components.scheduler.executor.ExecuteResult; -import com.webank.wedpr.components.scheduler.executor.Executor; +import com.webank.wedpr.components.scheduler.executor.hook.ExecutorHook; import com.webank.wedpr.components.scheduler.workflow.WorkFlow; import com.webank.wedpr.components.scheduler.workflow.WorkFlowNode; import java.util.Collections; @@ -16,18 +15,23 @@ public class JobWorkFlowBuilderImpl implements JobWorkFlowBuilderApi { private static final Logger logger = LoggerFactory.getLogger(JobWorkFlowBuilderImpl.class); - private final Executor executor; + private final ExecutorHook executorHook; private final JobWorkFlowBuilderManager jobWorkflowBuilderManager; public JobWorkFlowBuilderImpl( - Executor executor, JobWorkFlowBuilderManager jobWorkflowBuilderManager) { - this.executor = executor; + ExecutorHook executorHook, JobWorkFlowBuilderManager jobWorkflowBuilderManager) { + this.executorHook = executorHook; this.jobWorkflowBuilderManager = jobWorkflowBuilderManager; } + @Override + public ExecutorHook getExecutorHook() { + return this.executorHook; + } + @Override public WorkFlow createWorkFlow(JobDO jobDO) throws Exception { - Object args = this.prepare(jobDO); + Object args = this.executorHook.prepare(jobDO); if (args == null) { logger.error("executor prepare ret null, job: {}", jobDO); throw new WeDPRException("executor prepare ret null, jobId: " + jobDO.getId()); @@ -51,7 +55,7 @@ public WorkFlow createWorkFlow(JobDO jobDO) throws Exception { @Override public void appendWorkFlowNode(JobDO jobDO, WorkFlow workflow, WorkFlowNode upstream) throws Exception { - Object args = this.prepare(jobDO); + Object args = this.executorHook.prepare(jobDO); int index = upstream.getIndex(); WorkFlowNode workflowNode = addWorkFlowNode( @@ -79,24 +83,4 @@ private WorkFlowNode addWorkFlowNode( // workflow build return workflow.addWorkFlowNode(upstreams, workerNodeType.getType(), args); } - - @Override - public Object prepare(JobDO jobDO) throws Exception { - return executor.prepare(jobDO); - } - - @Override - public void execute(JobDO jobDO) throws Exception { - throw new UnsupportedOperationException("Not supported"); - } - - @Override - public void kill(JobDO jobDO) throws Exception { - throw new UnsupportedOperationException("Not supported"); - } - - @Override - public ExecuteResult queryStatus(String jobID) throws Exception { - throw new UnsupportedOperationException("Not supported"); - } } diff --git a/wedpr-components/scheduler/src/main/java/com/webank/wedpr/components/scheduler/workflow/builder/JobWorkFlowBuilderManager.java b/wedpr-components/scheduler/src/main/java/com/webank/wedpr/components/scheduler/workflow/builder/JobWorkFlowBuilderManager.java index 83894a3d..df339efa 100644 --- a/wedpr-components/scheduler/src/main/java/com/webank/wedpr/components/scheduler/workflow/builder/JobWorkFlowBuilderManager.java +++ b/wedpr-components/scheduler/src/main/java/com/webank/wedpr/components/scheduler/workflow/builder/JobWorkFlowBuilderManager.java @@ -3,17 +3,13 @@ import com.webank.wedpr.common.protocol.JobType; import com.webank.wedpr.components.project.JobChecker; import com.webank.wedpr.components.project.dao.JobDO; -import com.webank.wedpr.components.scheduler.executor.impl.ml.MLExecutor; +import com.webank.wedpr.components.scheduler.executor.hook.*; import com.webank.wedpr.components.scheduler.executor.impl.ml.model.ModelJobParam; import com.webank.wedpr.components.scheduler.executor.impl.ml.request.FeatureEngineeringRequest; import com.webank.wedpr.components.scheduler.executor.impl.ml.request.ModelJobRequest; import com.webank.wedpr.components.scheduler.executor.impl.ml.request.PreprocessingRequest; import com.webank.wedpr.components.scheduler.executor.impl.model.FileMetaBuilder; -import com.webank.wedpr.components.scheduler.executor.impl.mpc.MPCExecutor; import com.webank.wedpr.components.scheduler.executor.impl.mpc.MPCJobParam; -import com.webank.wedpr.components.scheduler.executor.impl.psi.MLPSIExecutor; -import com.webank.wedpr.components.scheduler.executor.impl.psi.MPCPSIExecutor; -import com.webank.wedpr.components.scheduler.executor.impl.psi.PSIExecutor; import com.webank.wedpr.components.scheduler.workflow.WorkFlow; import com.webank.wedpr.components.scheduler.workflow.WorkFlowNode; import com.webank.wedpr.components.storage.api.FileStorageInterface; @@ -58,37 +54,40 @@ public void initializeJobWorkFlowBuilderManager() { registerJobWorkFlowBuilder( JobType.PSI.getType(), new JobWorkFlowBuilderImpl( - new PSIExecutor(storage, fileMetaBuilder, jobChecker), this)); + new PSIExecutorHook(storage, fileMetaBuilder, jobChecker), this)); logger.info("register ML PSI workflow builder success"); registerJobWorkFlowBuilder( JobType.ML_PSI.getType(), - new JobWorkFlowBuilderImpl(new MLPSIExecutor(storage, fileMetaBuilder), this)); + new JobWorkFlowBuilderImpl(new MLPSIExecutorHook(storage, fileMetaBuilder), this)); logger.info("register MPC PSI workflow builder success"); registerJobWorkFlowBuilder( JobType.MPC_PSI.getType(), - new JobWorkFlowBuilderImpl(new MPCPSIExecutor(storage, fileMetaBuilder), this)); + new JobWorkFlowBuilderImpl(new MPCPSIExecutorHook(storage, fileMetaBuilder), this)); logger.info("register MPC workflow builder success"); registerJobWorkFlowBuilder( - JobType.MPC.getType(), new JobWorkFlowBuilderImpl(new MPCExecutor(), this)); + JobType.MPC.getType(), new JobWorkFlowBuilderImpl(new MPCExecutorHook(), this)); logger.info("register ML workflow builder success"); registerJobWorkFlowBuilder( JobType.MLPreprocessing.getType(), - new JobWorkFlowBuilderImpl(new MLExecutor(), this)); + new JobWorkFlowBuilderImpl(new MLExecutorHook(), this)); registerJobWorkFlowBuilder( JobType.FeatureEngineer.getType(), - new JobWorkFlowBuilderImpl(new MLExecutor(), this)); + new JobWorkFlowBuilderImpl(new MLExecutorHook(), this)); registerJobWorkFlowBuilder( - JobType.XGB_TRAIN.getType(), new JobWorkFlowBuilderImpl(new MLExecutor(), this)); + JobType.XGB_TRAIN.getType(), + new JobWorkFlowBuilderImpl(new MLExecutorHook(), this)); registerJobWorkFlowBuilder( - JobType.XGB_PREDICT.getType(), new JobWorkFlowBuilderImpl(new MLExecutor(), this)); + JobType.XGB_PREDICT.getType(), + new JobWorkFlowBuilderImpl(new MLExecutorHook(), this)); registerJobWorkFlowBuilder( - JobType.LR_TRAIN.getType(), new JobWorkFlowBuilderImpl(new MLExecutor(), this)); + JobType.LR_TRAIN.getType(), new JobWorkFlowBuilderImpl(new MLExecutorHook(), this)); registerJobWorkFlowBuilder( - JobType.LR_PREDICT.getType(), new JobWorkFlowBuilderImpl(new MLExecutor(), this)); + JobType.LR_PREDICT.getType(), + new JobWorkFlowBuilderImpl(new MLExecutorHook(), this)); logger.info("register job workflow builder end"); } @@ -105,8 +104,7 @@ public void initializeJobWorkFlowDependencyHandler() { jobDO.setJobType(JobType.MLPreprocessing.getType()); JobWorkFlowBuilderImpl jobWorkFlowBuilder = - new JobWorkFlowBuilderImpl( - getJobWorkFlowBuilder(jobDO.getJobType()), this); + new JobWorkFlowBuilderImpl(getExecutorHook(jobDO.getJobType()), this); jobWorkFlowBuilder.appendWorkFlowNode(jobDO, workflow, upstream); }); @@ -122,8 +120,7 @@ public void initializeJobWorkFlowDependencyHandler() { jobDO.setJobType(JobType.MPC.getType()) */ JobWorkFlowBuilderImpl jobWorkFlowBuilder = - new JobWorkFlowBuilderImpl( - getJobWorkFlowBuilder(jobDO.getJobType()), this); + new JobWorkFlowBuilderImpl(getExecutorHook(jobDO.getJobType()), this); jobWorkFlowBuilder.appendWorkFlowNode(jobDO, workflow, upstream); }); @@ -171,7 +168,7 @@ private boolean executeFeatureEngineerJob( jobDO.setJobRequest(featureEngineeringRequest); JobWorkFlowBuilderImpl jobWorkFlowBuilder = - new JobWorkFlowBuilderImpl(getJobWorkFlowBuilder(jobDO.getJobType()), this); + new JobWorkFlowBuilderImpl(getExecutorHook(jobDO.getJobType()), this); jobWorkFlowBuilder.appendWorkFlowNode(jobDO, workflow, upstream); return true; } @@ -188,7 +185,7 @@ private boolean executeMultiPartyMlJob( jobDO.setJobRequest(xgbJobRequest); JobWorkFlowBuilderImpl jobWorkFlowBuilder = - new JobWorkFlowBuilderImpl(getJobWorkFlowBuilder(jobDO.getJobType()), this); + new JobWorkFlowBuilderImpl(getExecutorHook(jobDO.getJobType()), this); jobWorkFlowBuilder.appendWorkFlowNode(jobDO, workflow, upstream); return true; } @@ -206,10 +203,17 @@ public JobWorkFlowBuilderApi getJobWorkFlowBuilder(String jobType) { throw new UnsupportedOperationException( "Unsupported job workflow type, jobType: " + jobType); } - return jobWorkFlowBuilder; } + public ExecutorHook getExecutorHook(String jobType) { + JobWorkFlowBuilderApi result = getJobWorkFlowBuilder(jobType); + if (result == null || result.getExecutorHook() == null) { + return null; + } + return result.getExecutorHook(); + } + public void registerJobWorkFlowDependencyHandler( String jobType, WorkFlowBuilderDependencyHandler workFlowBuilderDependencyHandler) { List workFlowBuilderDependencyHandlers = diff --git a/wedpr-site/conf/wedpr.properties b/wedpr-site/conf/wedpr.properties index 14981c85..8859f75f 100644 --- a/wedpr-site/conf/wedpr.properties +++ b/wedpr-site/conf/wedpr.properties @@ -55,13 +55,11 @@ wedpr.scheduler.interval.ms=30000 wedpr.executor.job.cache.dir=./.cache/jobs wedpr.executor.psi.tmp.file.name=psi_prepare.csv wedpr.executor.psi.prepare.file.name=psi_prepare.csv -wedpr.executor.ml.url= wedpr.executor.ml.connect.request.timeout.ms=10000 wedpr.executor.ml.connect.timeout.ms=5000 wedpr.executor.ml.request.timeout.ms=60000 wedpr.executor.ml.max.total.connection=5 -wedpr.executor.psi.url= wedpr.executor.psi.connect.request.timeout.ms=10000 wedpr.executor.psi.connect.timeout.ms=5000 wedpr.executor.psi.request.timeout.ms=60000