From 5540a8db1e7b99d994792bb05c05c6af28c5502d Mon Sep 17 00:00:00 2001 From: JackChen0810 Date: Fri, 18 Feb 2022 14:59:00 +0800 Subject: [PATCH] Fix the problem of killing workflow. --- .../restful/FlowEntranceRestfulApi.java | 53 +++++++++++++++++++ .../entrance/job/FlowEntranceJob.scala | 6 +-- 2 files changed, 56 insertions(+), 3 deletions(-) diff --git a/dss-orchestrator/orchestrators/dss-workflow/dss-flow-execution-server/src/main/java/com/webank/wedatasphere/dss/flow/execution/entrance/restful/FlowEntranceRestfulApi.java b/dss-orchestrator/orchestrators/dss-workflow/dss-flow-execution-server/src/main/java/com/webank/wedatasphere/dss/flow/execution/entrance/restful/FlowEntranceRestfulApi.java index ec6fa4ec27..82a7414ae6 100644 --- a/dss-orchestrator/orchestrators/dss-workflow/dss-flow-execution-server/src/main/java/com/webank/wedatasphere/dss/flow/execution/entrance/restful/FlowEntranceRestfulApi.java +++ b/dss-orchestrator/orchestrators/dss-workflow/dss-flow-execution-server/src/main/java/com/webank/wedatasphere/dss/flow/execution/entrance/restful/FlowEntranceRestfulApi.java @@ -16,6 +16,7 @@ package com.webank.wedatasphere.dss.flow.execution.entrance.restful; +import com.fasterxml.jackson.databind.JsonNode; import com.webank.wedatasphere.dss.common.entity.DSSWorkspace; import com.webank.wedatasphere.dss.common.utils.DSSCommonUtils; import com.webank.wedatasphere.dss.standard.sso.utils.SSOHelper; @@ -29,15 +30,19 @@ import org.apache.linkis.protocol.constants.TaskConstant; import org.apache.linkis.protocol.utils.ZuulEntranceUtils; import org.apache.linkis.rpc.Sender; +import org.apache.linkis.scheduler.listener.LogListener; import org.apache.linkis.scheduler.queue.Job; +import org.apache.linkis.scheduler.queue.SchedulerEventState; import org.apache.linkis.server.Message; import org.apache.linkis.server.security.SecurityFilter; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.web.bind.annotation.*; +import scala.Function0; import scala.Option; import javax.servlet.http.HttpServletRequest; +import java.util.ArrayList; import java.util.Map; @@ -126,6 +131,54 @@ public Message status(@PathVariable("id") String id, @RequestParam(required = fa return message; } + @Override + @RequestMapping(path = {"/{id}/kill"},method = {RequestMethod.GET}) + public Message kill(@PathVariable("id") String id, @RequestParam(value = "taskID",required = false) Long taskID) { + String realId = ZuulEntranceUtils.parseExecID(id)[3]; + Option job = Option.apply((Object)null); + try { + job = this.entranceServer.getJob(realId); + } catch (Exception var10) { + logger.warn("can not find a job in entranceServer, will force to kill it", var10); + JobHistoryHelper.forceKill(taskID); + Message message = Message.ok("Forced Kill task (强制杀死任务)"); + message.setMethod("/api/entrance/" + id + "/kill"); + message.setStatus(0); + return message; + } + Message message = null; + if (job.isEmpty()) { + logger.warn("can not find a job in entranceServer, will force to kill it"); + JobHistoryHelper.forceKill(taskID); + message = Message.ok("Forced Kill task (强制杀死任务)"); + message.setMethod("/api/entrance/" + id + "/kill"); + message.setStatus(0); + return message; + } else { + try { + logger.info("begin to kill job {} ", ((Job)job.get()).getId()); + ((Job)job.get()).kill(); + message = Message.ok("Successfully killed the job(成功kill了job)"); + message.setMethod("/api/entrance/" + id + "/kill"); + message.setStatus(0); + message.data("execID", id); + if (job.get() instanceof EntranceJob) { + EntranceJob entranceJob = (EntranceJob)job.get(); + JobRequest jobReq = entranceJob.getJobRequest(); + entranceJob.updateJobRequestStatus(SchedulerEventState.Cancelled().toString()); + this.entranceServer.getEntranceContext().getOrCreatePersistenceManager().createPersistenceEngine().updateIfNeeded(jobReq); + } + logger.info("end to kill job {} ", ((Job)job.get()).getId()); + } catch (Throwable var9) { + logger.error("kill job {} failed ", ((Job)job.get()).getId(), var9); + message = Message.error("An exception occurred while killing the job, kill failed(kill job的时候出现了异常,kill失败)"); + message.setMethod("/api/entrance/" + id + "/kill"); + message.setStatus(1); + } + return message; + } + } + private void pushLog(String log, Job job) { entranceServer.getEntranceContext().getOrCreateLogManager().onLogUpdate(job, log); } diff --git a/dss-orchestrator/orchestrators/dss-workflow/dss-flow-execution-server/src/main/scala/com/webank/wedatasphere/dss/flow/execution/entrance/job/FlowEntranceJob.scala b/dss-orchestrator/orchestrators/dss-workflow/dss-flow-execution-server/src/main/scala/com/webank/wedatasphere/dss/flow/execution/entrance/job/FlowEntranceJob.scala index 759c9f7509..92f011a1c2 100644 --- a/dss-orchestrator/orchestrators/dss-workflow/dss-flow-execution-server/src/main/scala/com/webank/wedatasphere/dss/flow/execution/entrance/job/FlowEntranceJob.scala +++ b/dss-orchestrator/orchestrators/dss-workflow/dss-flow-execution-server/src/main/scala/com/webank/wedatasphere/dss/flow/execution/entrance/job/FlowEntranceJob.scala @@ -137,15 +137,15 @@ class FlowEntranceJob(persistManager:PersistenceManager) extends EntranceExecuti if(! SchedulerEventState.isCompleted(this.getState)){ super.kill() Utils.tryAndWarn(this.killNodes) - transitionCompleted(ErrorExecuteResponse(s"execute job(${getId}) failed!", new FlowExecutionErrorException(90101, s"This Flow killed by user") )) - } + Utils.tryAndWarn(transitionCompleted(ErrorExecuteResponse(s"execute job(${getId}) failed!", new FlowExecutionErrorException(90101, s"This Flow killed by user")))) + } } override def cancel(): Unit = if (! SchedulerEventState.isCompleted(this.getState)) this synchronized { if(! SchedulerEventState.isCompleted(this.getState)){ Utils.tryAndWarn(this.killNodes) super.cancel() - transitionCompleted(ErrorExecuteResponse(s"cancel job(${getId}) execution!", new FlowExecutionErrorException(90101, s"This Flow killed by user") )) + Utils.tryAndWarn(transitionCompleted(ErrorExecuteResponse(s"cancel job(${getId}) execution!", new FlowExecutionErrorException(90101, s"This Flow killed by user")))) } }