Skip to content

Commit

Permalink
Merge pull request #6447 from irwinsun/issue_5109
Browse files — browse the repository at this point in the history
feat: 完善流水线流转Redis异常处理 #5109
  • Loading branch information
mingshewhe authored Mar 29, 2022
2 parents 987e427 + 651d9f7 commit 10dc04c
Showing 1 changed file with 23 additions and 31 deletions.
Original file line number | Diff line number | Diff line change
Expand Up @@ -156,7 +156,7 @@ class EngineVMBuildService @Autowired(required = false) constructor(
val buildInfo = pipelineRuntimeService.getBuildInfo(projectId, buildId)
?: throw NotFoundException("Fail to find build: buildId($buildId)")
Preconditions.checkNotNull(buildInfo, NotFoundException("Pipeline build ($buildId) is not exist"))
LOG.info("ENGINE|$buildId|Agent|BUILD_VM_START|j($vmSeqId)|vmName($vmName)")
LOG.info("ENGINE|$buildId|BUILD_VM_START|j($vmSeqId)|vmName($vmName)")
// var表中获取环境变量,并对老版本变量进行兼容
val variables = buildVariableService.getAllVariable(projectId, buildId)

Expand Down Expand Up @@ -253,7 +253,7 @@ class EngineVMBuildService @Autowired(required = false) constructor(
vmId++
}
}
LOG.info("ENGINE|$buildId|Agent|BUILD_VM_START|j($vmSeqId)|$vmName|Not Found VMContainer")
LOG.info("ENGINE|$buildId|BUILD_VM_START|j($vmSeqId)|$vmName|Not Found VMContainer")
throw NotFoundException("Fail to find the vm build container: j($vmSeqId) vmName($vmName)")
}

Expand Down Expand Up @@ -297,7 +297,7 @@ class EngineVMBuildService @Autowired(required = false) constructor(
}
}

LOG.info("ENGINE|$buildId|Agent|SETUP_VM_STATUS|j($vmSeqId)|${startUpVMTask?.taskId}|status=$buildStatus")
LOG.info("ENGINE|$buildId|SETUP_VM_STATUS|j($vmSeqId)|${startUpVMTask?.taskId}|status=$buildStatus")
if (startUpVMTask == null) {
return false
}
Expand Down Expand Up @@ -382,7 +382,7 @@ class EngineVMBuildService @Autowired(required = false) constructor(
containerIdLock.lock()
val buildInfo = pipelineRuntimeService.getBuildInfo(projectId, buildId)
if (buildInfo == null || buildInfo.status.isFinish()) {
LOG.info("ENGINE|$buildId|Agent|CLAIM_TASK_END|j($vmSeqId|$vmName|buildInfo ${buildInfo?.status}")
LOG.info("ENGINE|$buildId|BC_END|$projectId|j($vmSeqId|$vmName|buildInfo ${buildInfo?.status}")
return BuildTask(buildId, vmSeqId, BuildTaskStatus.END)
}
val container = pipelineContainerService.getContainer(
Expand All @@ -392,7 +392,7 @@ class EngineVMBuildService @Autowired(required = false) constructor(
containerId = vmSeqId
)
if (container == null || container.status.isFinish()) {
LOG.info("ENGINE|$buildId|Agent|CLAIM_TASK_END|j($vmSeqId)|container ${container?.status}")
LOG.info("ENGINE|$buildId|BC_END|$projectId|j($vmSeqId)|container ${container?.status}")
return BuildTask(buildId, vmSeqId, BuildTaskStatus.END)
}

Expand All @@ -413,14 +413,14 @@ class EngineVMBuildService @Autowired(required = false) constructor(
}

private fun claim(task: PipelineBuildTask, buildId: String, userId: String, vmSeqId: String): BuildTask {
LOG.info("ENGINE|$buildId|Agent|CLAIM_TASK_ING|j($vmSeqId)|[${task.taskId}-${task.taskName}]")
LOG.info("ENGINE|$buildId|BC_ING|${task.projectId}|j($vmSeqId)|[${task.taskId}-${task.taskName}]")
return when {
task.status == BuildStatus.QUEUE -> { // 初始化状态,表明任务还未准备好
LOG.info("ENGINE|$buildId|taskId=${task.taskId}|name=${task.taskName}|NOT_READY, WAIT!")
LOG.info("ENGINE|$buildId|BC_QUEUE|${task.projectId}|j($vmSeqId)|${task.taskId}|${task.taskName}")
BuildTask(buildId, vmSeqId, BuildTaskStatus.WAIT)
}
task.taskAtom.isNotBlank() -> { // 排除非构建机的插件任务 继续等待直到它完成
LOG.info("ENGINE|$buildId|taskId=${task.taskId}|taskAtom=${task.taskAtom}|NOT VM TASK, SKIP!")
LOG.info("ENGINE|$buildId|BC_NOT_VM|${task.projectId}|j($vmSeqId)|${task.taskId}|${task.taskName}")
BuildTask(buildId, vmSeqId, BuildTaskStatus.WAIT)
}
task.taskId == VMUtils.genEndPointTaskId(task.taskSeq) -> { // 全部完成了
Expand All @@ -430,7 +430,7 @@ class EngineVMBuildService @Autowired(required = false) constructor(
pipelineTaskService.isNeedPause(taskId = task.taskId, buildId = task.buildId, taskRecord = task) -> {
// 如果插件配置了前置暂停, 暂停期间关闭当前构建机,节约资源。
pipelineTaskService.executePause(taskId = task.taskId, buildId = task.buildId, taskRecord = task)
LOG.info("ENGINE|$buildId|taskId=${task.taskId}|taskAtom=${task.taskAtom} cfg pause, shutdown agent")
LOG.info("ENGINE|$buildId|BC_PAUSE|${task.projectId}|j($vmSeqId)|${task.taskId}|${task.taskAtom}")
pipelineEventDispatcher.dispatch(
PipelineBuildStatusBroadCastEvent(
source = "TaskPause-${task.containerId}-${task.buildId}",
Expand All @@ -446,7 +446,7 @@ class EngineVMBuildService @Autowired(required = false) constructor(
BuildTask(buildId, vmSeqId, BuildTaskStatus.END)
} // #5109 如果因平台异常原因导致上一个任务还处于运行中,结束处理不及时 ,需要等待
task.status.isRunning() && task.taskId == redisOperation.get(completeTaskKey(buildId, vmSeqId)) -> {
LOG.info("ENGINE|$buildId|taskId=${task.taskId}|waiting complete task, status=${task.status}")
LOG.info("ENGINE|$buildId|BC_RUNNING|${task.projectId}|j($vmSeqId)|${task.taskId}|${task.status}")
buildLogPrinter.addYellowLine(
buildId = buildId,
message = "正在处理当前上报的任务, 请稍等。。。(Waiting please)",
Expand Down Expand Up @@ -527,28 +527,25 @@ class EngineVMBuildService @Autowired(required = false) constructor(
val tCompleteTaskKey = completeTaskKey(buildId, vmSeqId)
// #5109 提前做重复请求检测, 当弱网络或平台故障降级处理之前的请求较慢时,key仍然存在,需要拒绝客户端的重试产生的请求
if (redisOperation.get(key = tCompleteTaskKey) == result.taskId) {
LOG.warn("BKSystemErrorMonitor|ENGINE|DUPLICATE_T_COMPLETE|$buildId|job#$vmSeqId|${result.taskId}")
LOG.warn("ENGINE|$buildId|BCT_DUPLICATE|$projectId|job#$vmSeqId|${result.taskId}")
return
}
// #5109 不需要Job级别的Redis锁保护的数据, 仅查询用
val buildTask = pipelineTaskService.getBuildTask(projectId, buildId, result.taskId)
val taskStatus = buildTask?.status
if (taskStatus == null) {
// 当上报的任务不存在,则直接返回
LOG.warn("BKSystemErrorMonitor|ENGINE|$buildId|$vmName|${result.taskId}|invalid")
LOG.warn("ENGINE|$buildId|BCT_INVALID_TASK|$projectId|$vmName|${result.taskId}|")
return
}

val buildInfo = pipelineRuntimeService.getBuildInfo(projectId, buildId) ?: run {
LOG.warn("BKSystemErrorMonitor|ENGINE|$buildId|$vmName|buildInfo is null")
LOG.warn("ENGINE|BCT_NULL_BUILD|$buildId|$vmName|buildInfo is null")
return // 数据为空是平台异常,正常情况不应该出现
}
// #5109 提前判断,防止异常数据流入,后续各类Redis锁定出现无必要的额外开启。
if (taskStatus.isFinish() || buildInfo.isFinish()) {
LOG.warn(
"BKSystemErrorMonitor|ENGINE|finish|$buildId|job#$vmSeqId|${result.taskId}" +
"task=$taskStatus|build=${buildInfo.status}"
)
LOG.warn("ENGINE|BCT_END|$buildId|$projectId|j($vmSeqId)|${result.taskId}|$taskStatus|${buildInfo.status}")
return
}

Expand Down Expand Up @@ -579,23 +576,18 @@ class EngineVMBuildService @Autowired(required = false) constructor(
buildInfo: BuildInfo,
vmSeqId: String
) {
val buildTask = pipelineTaskService.getBuildTask(projectId, buildId, result.taskId)
val taskStatus = buildTask?.status
if (taskStatus == null || buildTask.status.isFinish()) {
LOG.warn("BKSystemErrorMonitor|ENGINE|$buildId|${result.taskId}|invalid")
return
}
// 只要buildResult不为空,都写入到环境变量里面
if (result.buildResult.isNotEmpty()) {
LOG.info("ENGINE|$buildId| Add the build result size(${result.buildResult.size}) to var")
LOG.info("ENGINE|$buildId|BCT_ADD_VAR|$projectId| vars=${result.buildResult.size}")
try {
buildVariableService.batchUpdateVariable(
projectId = projectId,
pipelineId = buildInfo.pipelineId, buildId = buildId, variables = result.buildResult
)
LOG.info("ENGINE|$buildId|BCT_ADD_VAR_DONE|$projectId")
writeRemark(result.buildResult, projectId, buildInfo.pipelineId, buildId, buildInfo.startUser)
} catch (ignored: Exception) { // 防止因为变量字符过长而失败。
LOG.warn("ENGINE|$buildId| save var fail: ${ignored.message}", ignored)
LOG.warn("ENGINE|$buildId|BCT_ADD_VAR_FAIL|$projectId| save var fail: ${ignored.message}", ignored)
}
}
val errorType = ErrorType.getErrorType(result.errorType)
Expand Down Expand Up @@ -662,7 +654,7 @@ class EngineVMBuildService @Autowired(required = false) constructor(
sendElementData(projectId = projectId, buildId = buildId, result = result)

LOG.info(
"ENGINE|$buildId|Agent|END_TASK|j($vmSeqId)|${result.taskId}|$buildStatus|" +
"ENGINE|$buildId|BCT_DONE|$projectId|j($vmSeqId)|${result.taskId}|$buildStatus|" +
"type=$errorType|code=${result.errorCode}|msg=${result.message}]"
)
buildLogPrinter.stopLog(buildId = buildId, tag = result.elementId, jobId = result.containerId ?: "")
Expand Down Expand Up @@ -712,7 +704,7 @@ class EngineVMBuildService @Autowired(required = false) constructor(
.firstOrNull { it.taskId == VMUtils.genEndPointTaskId(it.taskSeq) }

return if (task == null || task.status.isFinish()) {
LOG.warn("ENGINE|$buildId|Agent|$vmName|END_JOB|j($vmSeqId)|Task[${task?.taskName}] ${task?.status}")
LOG.warn("ENGINE|$buildId|BE_END|$projectId|$vmName|j($vmSeqId)|[${task?.taskName}] ${task?.status}")
false
} else {
pipelineRuntimeService.completeClaimBuildTask(
Expand All @@ -725,7 +717,7 @@ class EngineVMBuildService @Autowired(required = false) constructor(
),
endBuild = true
)
LOG.info("ENGINE|$buildId|Agent|END_JOB|${task.stageId}|j($vmSeqId)|${task.taskId}|${task.taskName}")
LOG.info("ENGINE|$buildId|BE_DONE|${task.stageId}|j($vmSeqId)|${task.taskId}|${task.taskName}")
buildExtService.endBuild(task)
true
}
Expand All @@ -736,7 +728,7 @@ class EngineVMBuildService @Autowired(required = false) constructor(
}

/**
 * Records a heartbeat for a build job and reports success.
 *
 * Logs the call, then registers the current system time for the
 * (`buildId`, `vmSeqId`) pair through `buildingHeartBeatUtils.addHeartBeat`.
 *
 * @param buildId identifier of the build the heartbeat belongs to
 * @param vmSeqId identifier logged as `j(...)` and passed on to the heartbeat registry
 * @param vmName name of the reporting VM; within this function it is only written to the log
 * @return always `true`
 */
fun heartbeat(buildId: String, vmSeqId: String, vmName: String): Boolean {
// NOTE(review): this text is a rendered diff, so the next two log statements are
// presumably the removed and the added version of the same line — confirm against the real file.
LOG.info("ENGINE|$buildId|Agent|HEART_BEAT|j($vmSeqId)|$vmName")
LOG.info("ENGINE|$buildId|HEART_BEAT|j($vmSeqId)|$vmName")
// The timestamp is taken at call time from the system clock.
buildingHeartBeatUtils.addHeartBeat(buildId = buildId, vmSeqId = vmSeqId, time = System.currentTimeMillis())
return true
}
Expand All @@ -748,7 +740,7 @@ class EngineVMBuildService @Autowired(required = false) constructor(
vmName: String,
executeCount: Int? = null
): HeartBeatInfo {
LOG.info("ENGINE|$projectId|$buildId|Agent|HEART_BEAT|j($vmSeqId)|$vmName|$executeCount")
LOG.info("ENGINE|$projectId|$buildId|HEART_BEAT|j($vmSeqId)|$vmName|$executeCount")
buildingHeartBeatUtils.addHeartBeat(buildId = buildId, vmSeqId = vmSeqId, time = System.currentTimeMillis())
var cancelTaskIds: MutableSet<String>? = null
val key = TaskUtils.getCancelTaskIdRedisKey(buildId, vmSeqId)
Expand Down Expand Up @@ -792,7 +784,7 @@ class EngineVMBuildService @Autowired(required = false) constructor(
monitorDataMap = result.monitorData ?: emptyMap()
)
} catch (ignored: Throwable) {
LOG.warn("ENGINE|$buildId|Agent|MEASURE|j(${result.containerId})|${result.taskId}|error=$ignored")
LOG.warn("ENGINE|$buildId|MEASURE|j(${result.containerId})|${result.taskId}|error=$ignored")
}
}

Expand Down

0 comments on commit 10dc04c

Please sign in to comment.