Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

remove useless executors #110

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import java.util.List;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.commons.lang3.StringUtils;

@Data
@NoArgsConstructor
Expand All @@ -34,14 +35,12 @@ public EntryPointInfo(String serviceName, String entryPoint) {
}

public String getUrl(String uriPath) {
if (StringUtils.isBlank(uriPath)) {
return Common.getUrl(entryPoint);
}
if (uriPath.startsWith(Constant.URI_SPLITER)) {
return Common.getUrl(entryPoint + uriPath);
}

if (uriPath.isEmpty()) {
return Common.getUrl(entryPoint);
}

return Common.getUrl(entryPoint + Constant.URI_SPLITER + uriPath);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,13 @@

import static com.webank.wedpr.components.scheduler.client.common.ClientCommon.*;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.webank.wedpr.common.utils.Constant;
import com.webank.wedpr.common.utils.ObjectMapperFactory;
import com.webank.wedpr.common.utils.WeDPRException;
import com.webank.wedpr.components.http.client.JsonRpcClient;
import com.webank.wedpr.components.http.client.model.JsonRpcResponse;
import com.webank.wedpr.components.scheduler.dag.utils.WorkerUtils;
import com.webank.wedpr.components.scheduler.executor.impl.psi.PSIExecutor;
import com.webank.wedpr.components.scheduler.executor.impl.psi.PSIExecutorConfig;
import com.webank.wedpr.components.scheduler.executor.impl.psi.model.PSIRequest;
import org.slf4j.Logger;
Expand All @@ -18,6 +18,28 @@ public class PsiClient {

private static final Logger logger = LoggerFactory.getLogger(PsiClient.class);

public static class QueryTaskParam {
private String taskID;

public QueryTaskParam() {}

public QueryTaskParam(String taskID) {
this.taskID = taskID;
}

public String getTaskID() {
return taskID;
}

public void setTaskID(String taskID) {
this.taskID = taskID;
}

public String serialize() throws JsonProcessingException {
return ObjectMapperFactory.getObjectMapper().writeValueAsString(this);
}
}

private static final String RUN_FINISHED_STATUS = "COMPLETED";

private final int pollIntervalMilli = DEFAULT_HTTP_POLL_TASK_INTERVAL_MILLI;
Expand All @@ -34,6 +56,10 @@ public PsiClient(String url) {
PSIExecutorConfig.buildConfig());
}

public JsonRpcClient getJsonRpcClient() {
return this.jsonRpcClient;
}

public String submitTask(String params) throws Exception {

logger.info("begin submit job to PSI, jobRequest: {}", params);
Expand Down Expand Up @@ -77,7 +103,7 @@ public void pollTask(String taskId) throws WeDPRException {
taskId,
PSIExecutorConfig.getPsiGetTaskStatusMethod(),
PSIExecutorConfig.getPsiToken(),
new PSIExecutor.QueryTaskParam(taskId));
new QueryTaskParam(taskId));

// response error
if (!response.statusOk()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -132,24 +132,6 @@ protected void registerExecutors(
new ExecutiveContextBuilder(projectMapperWrapper),
threadPoolService);
executorManager.registerExecutor(ExecutorType.DAG.getType(), dagSchedulerExecutor);
/*
// register the executor
PSIExecutor psiExecutor = new PSIExecutor(storage, fileMetaBuilder, jobChecker);
executorManager.registerExecutor(JobType.PSI.getType(), psiExecutor);

logger.info("register PSIExecutor success");
MLPSIExecutor mlpsiExecutor = new MLPSIExecutor(storage, fileMetaBuilder);
executorManager.registerExecutor(JobType.ML_PSI.getType(), mlpsiExecutor);

logger.info("register ML-PSIExecutor success");
MLExecutor mlExecutor = new MLExecutor();
executorManager.registerExecutor(JobType.MLPreprocessing.getType(), mlExecutor);
executorManager.registerExecutor(JobType.FeatureEngineer.getType(), mlExecutor);
executorManager.registerExecutor(JobType.XGB_TRAIN.getType(), mlExecutor);
executorManager.registerExecutor(JobType.XGB_PREDICT.getType(), mlExecutor);
logger.info("register MLExecutor success");
*/

// register the pir executor, TODO: implement the taskFinishHandler
executorManager.registerExecutor(
ExecutorType.PIR.getType(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@
import com.webank.wedpr.components.scheduler.client.ModelClient;
import com.webank.wedpr.components.scheduler.dag.entity.JobWorker;
import com.webank.wedpr.components.scheduler.dag.utils.ServiceName;
import com.webank.wedpr.components.scheduler.executor.impl.ml.MLExecutorConfig;
import com.webank.wedpr.components.scheduler.mapper.JobWorkerMapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -41,9 +40,7 @@ public void engineRun() throws Exception {
}

long startTimeMillis = System.currentTimeMillis();

String modelUrl = MLExecutorConfig.getUrl();
String url = entryPoint.getUrl(modelUrl);
String url = entryPoint.getUrl(null);

if (logger.isDebugEnabled()) {
logger.debug("model url: {}, jobId: {}", url, jobId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@
import com.webank.wedpr.components.scheduler.client.PsiClient;
import com.webank.wedpr.components.scheduler.dag.entity.JobWorker;
import com.webank.wedpr.components.scheduler.dag.utils.ServiceName;
import com.webank.wedpr.components.scheduler.executor.impl.psi.PSIExecutorConfig;
import com.webank.wedpr.components.scheduler.mapper.JobWorkerMapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -46,9 +45,7 @@ public void engineRun() throws Exception {
jobId,
workerId,
workerArgs);

String psiUrl = PSIExecutorConfig.getPsiUrl();
String url = entryPoint.getUrl(psiUrl);
String url = entryPoint.getUrl(null);

if (logger.isDebugEnabled()) {
logger.debug("psi url: {}, jobId: {}", url, jobId);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
/*
* Copyright 2017-2025 [webank-wedpr]
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
* in compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*
*/

package com.webank.wedpr.components.scheduler.executor.hook;

import com.webank.wedpr.components.project.dao.JobDO;

public interface ExecutorHook {
// prepare for the job
Object prepare(JobDO jobDO) throws Exception;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
/*
* Copyright 2017-2025 [webank-wedpr]
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
* in compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*
*/

package com.webank.wedpr.components.scheduler.executor.hook;

import com.webank.wedpr.components.project.dao.JobDO;
import com.webank.wedpr.components.scheduler.executor.impl.ml.request.ModelJobRequest;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class MLExecutorHook implements ExecutorHook {
private static final Logger logger = LoggerFactory.getLogger(MLExecutorHook.class);

public MLExecutorHook() {}

@Override
public Object prepare(JobDO jobDO) throws Exception {
ModelJobRequest modelJobRequest = (ModelJobRequest) jobDO.getJobRequest();
modelJobRequest.setTaskID(jobDO.getTaskID());
return jobDO.getJobRequest();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,16 +13,16 @@
*
*/

package com.webank.wedpr.components.scheduler.executor.impl.psi;
package com.webank.wedpr.components.scheduler.executor.hook;

import com.webank.wedpr.components.project.dao.JobDO;
import com.webank.wedpr.components.scheduler.executor.impl.ml.model.ModelJobParam;
import com.webank.wedpr.components.scheduler.executor.impl.model.FileMetaBuilder;
import com.webank.wedpr.components.scheduler.executor.impl.psi.model.PSIJobParam;
import com.webank.wedpr.components.storage.api.FileStorageInterface;

public class MLPSIExecutor extends PSIExecutor {
public MLPSIExecutor(FileStorageInterface storage, FileMetaBuilder fileMetaBuilder) {
public class MLPSIExecutorHook extends PSIExecutorHook {
public MLPSIExecutorHook(FileStorageInterface storage, FileMetaBuilder fileMetaBuilder) {
// no need to check here since MLJobParam has been checked in JobOrchestrate
super(storage, fileMetaBuilder, null);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
package com.webank.wedpr.components.scheduler.executor.hook;

import com.webank.wedpr.components.project.dao.JobDO;
import com.webank.wedpr.components.scheduler.executor.impl.mpc.MPCJobParam;
import com.webank.wedpr.components.scheduler.executor.impl.mpc.request.MPCJobRequest;

public class MPCExecutorHook implements ExecutorHook {
@Override
public Object prepare(JobDO jobDO) throws Exception {
// get the jobParam
MPCJobParam jobParam = (MPCJobParam) jobDO.getJobParam();

MPCJobRequest mpcJobRequest = (MPCJobRequest) jobParam.toMPCJobRequest();

mpcJobRequest.setTaskID(jobDO.getTaskID());
return mpcJobRequest;
}
}
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
package com.webank.wedpr.components.scheduler.executor.impl.psi;
package com.webank.wedpr.components.scheduler.executor.hook;

import com.webank.wedpr.components.project.dao.JobDO;
import com.webank.wedpr.components.scheduler.executor.impl.model.FileMetaBuilder;
import com.webank.wedpr.components.scheduler.executor.impl.mpc.MPCJobParam;
import com.webank.wedpr.components.scheduler.executor.impl.psi.model.PSIJobParam;
import com.webank.wedpr.components.storage.api.FileStorageInterface;

public class MPCPSIExecutor extends PSIExecutor {
public MPCPSIExecutor(FileStorageInterface storage, FileMetaBuilder fileMetaBuilder) {
public class MPCPSIExecutorHook extends PSIExecutorHook {
public MPCPSIExecutorHook(FileStorageInterface storage, FileMetaBuilder fileMetaBuilder) {
super(storage, fileMetaBuilder, null);
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
/*
* Copyright 2017-2025 [webank-wedpr]
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
* in compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*
*/

package com.webank.wedpr.components.scheduler.executor.hook;

import com.webank.wedpr.components.project.JobChecker;
import com.webank.wedpr.components.project.dao.JobDO;
import com.webank.wedpr.components.scheduler.executor.impl.model.FileMetaBuilder;
import com.webank.wedpr.components.scheduler.executor.impl.psi.model.PSIJobParam;
import com.webank.wedpr.components.storage.api.FileStorageInterface;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class PSIExecutorHook implements ExecutorHook {
private static final Logger logger = LoggerFactory.getLogger(PSIExecutorHook.class);

protected final FileStorageInterface storage;
protected final FileMetaBuilder fileMetaBuilder;
protected final JobChecker jobChecker;

public PSIExecutorHook(
FileStorageInterface storage, FileMetaBuilder fileMetaBuilder, JobChecker jobChecker) {
this.storage = storage;
this.fileMetaBuilder = fileMetaBuilder;
this.jobChecker = jobChecker;
}

@Override
public Object prepare(JobDO jobDO) throws Exception {
// deserialize the jobParam
PSIJobParam psiJobParam = (PSIJobParam) this.jobChecker.checkAndParseParam(jobDO);
psiJobParam.setTaskID(jobDO.getTaskID());
preparePSIJob(jobDO, psiJobParam);
return jobDO.getJobRequest();
}

protected void preparePSIJob(JobDO jobDO, PSIJobParam psiJobParam) throws Exception {
// download and prepare the psi file
psiJobParam.prepare(this.fileMetaBuilder, storage);
// convert to PSIRequest
jobDO.setJobRequest(psiJobParam.convert(jobDO.getOwnerAgency()));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ public static String getPSIPrepareFileName() {

public static String getDefaultPSIResultPath(String user, String jobID) {
return WeDPRCommonConfig.getUserJobCachePath(
user, JobType.PIR.getType(), jobID, PSI_RESULT_FILE);
user, JobType.PSI.getType(), jobID, PSI_RESULT_FILE);
}

public static String getPirJobResultPath(String user, String jobID) {
Expand Down
Loading
Loading