Skip to content

Commit

Permalink
Merge pull request #516 from JackChen0810/dev-1.0.1
Browse files Browse the repository at this point in the history
Fix the problem of killing workflow.
  • Loading branch information
liuyou2 authored Feb 18, 2022
2 parents 7870327 + 5540a8d commit 6b0616f
Show file tree
Hide file tree
Showing 2 changed files with 56 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

package com.webank.wedatasphere.dss.flow.execution.entrance.restful;

import com.fasterxml.jackson.databind.JsonNode;
import com.webank.wedatasphere.dss.common.entity.DSSWorkspace;
import com.webank.wedatasphere.dss.common.utils.DSSCommonUtils;
import com.webank.wedatasphere.dss.standard.sso.utils.SSOHelper;
Expand All @@ -29,15 +30,19 @@
import org.apache.linkis.protocol.constants.TaskConstant;
import org.apache.linkis.protocol.utils.ZuulEntranceUtils;
import org.apache.linkis.rpc.Sender;
import org.apache.linkis.scheduler.listener.LogListener;
import org.apache.linkis.scheduler.queue.Job;
import org.apache.linkis.scheduler.queue.SchedulerEventState;
import org.apache.linkis.server.Message;
import org.apache.linkis.server.security.SecurityFilter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.web.bind.annotation.*;
import scala.Function0;
import scala.Option;

import javax.servlet.http.HttpServletRequest;
import java.util.ArrayList;
import java.util.Map;


Expand Down Expand Up @@ -126,6 +131,54 @@ public Message status(@PathVariable("id") String id, @RequestParam(required = fa
return message;
}

@Override
@RequestMapping(path = {"/{id}/kill"},method = {RequestMethod.GET})
public Message kill(@PathVariable("id") String id, @RequestParam(value = "taskID",required = false) Long taskID) {
String realId = ZuulEntranceUtils.parseExecID(id)[3];
Option job = Option.apply((Object)null);
try {
job = this.entranceServer.getJob(realId);
} catch (Exception var10) {
logger.warn("can not find a job in entranceServer, will force to kill it", var10);
JobHistoryHelper.forceKill(taskID);
Message message = Message.ok("Forced Kill task (强制杀死任务)");
message.setMethod("/api/entrance/" + id + "/kill");
message.setStatus(0);
return message;
}
Message message = null;
if (job.isEmpty()) {
logger.warn("can not find a job in entranceServer, will force to kill it");
JobHistoryHelper.forceKill(taskID);
message = Message.ok("Forced Kill task (强制杀死任务)");
message.setMethod("/api/entrance/" + id + "/kill");
message.setStatus(0);
return message;
} else {
try {
logger.info("begin to kill job {} ", ((Job)job.get()).getId());
((Job)job.get()).kill();
message = Message.ok("Successfully killed the job(成功kill了job)");
message.setMethod("/api/entrance/" + id + "/kill");
message.setStatus(0);
message.data("execID", id);
if (job.get() instanceof EntranceJob) {
EntranceJob entranceJob = (EntranceJob)job.get();
JobRequest jobReq = entranceJob.getJobRequest();
entranceJob.updateJobRequestStatus(SchedulerEventState.Cancelled().toString());
this.entranceServer.getEntranceContext().getOrCreatePersistenceManager().createPersistenceEngine().updateIfNeeded(jobReq);
}
logger.info("end to kill job {} ", ((Job)job.get()).getId());
} catch (Throwable var9) {
logger.error("kill job {} failed ", ((Job)job.get()).getId(), var9);
message = Message.error("An exception occurred while killing the job, kill failed(kill job的时候出现了异常,kill失败)");
message.setMethod("/api/entrance/" + id + "/kill");
message.setStatus(1);
}
return message;
}
}

private void pushLog(String log, Job job) {
entranceServer.getEntranceContext().getOrCreateLogManager().onLogUpdate(job, log);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -137,15 +137,15 @@ class FlowEntranceJob(persistManager:PersistenceManager) extends EntranceExecuti
if(! SchedulerEventState.isCompleted(this.getState)){
super.kill()
Utils.tryAndWarn(this.killNodes)
transitionCompleted(ErrorExecuteResponse(s"execute job(${getId}) failed!", new FlowExecutionErrorException(90101, s"This Flow killed by user") ))
}
Utils.tryAndWarn(transitionCompleted(ErrorExecuteResponse(s"execute job(${getId}) failed!", new FlowExecutionErrorException(90101, s"This Flow killed by user"))))
}
}

override def cancel(): Unit = if (! SchedulerEventState.isCompleted(this.getState)) this synchronized {
if(! SchedulerEventState.isCompleted(this.getState)){
Utils.tryAndWarn(this.killNodes)
super.cancel()
transitionCompleted(ErrorExecuteResponse(s"cancel job(${getId}) execution!", new FlowExecutionErrorException(90101, s"This Flow killed by user") ))
Utils.tryAndWarn(transitionCompleted(ErrorExecuteResponse(s"cancel job(${getId}) execution!", new FlowExecutionErrorException(90101, s"This Flow killed by user"))))
}
}

Expand Down

0 comments on commit 6b0616f

Please sign in to comment.