Skip to content

Commit

Permalink
bugfix: [ipv6]第三方文件源分发文件,会产生重复的执行结果 TencentBlueKing#1563
Browse files Browse the repository at this point in the history
1. 第三方源文件准备过程日志保存前解析主机信息补充hostId;
2. file-gateway解除对api-logsvr的依赖。
  • Loading branch information
jsonwan committed Dec 10, 2022
1 parent 5d0998b commit b4b7046
Show file tree
Hide file tree
Showing 19 changed files with 286 additions and 53 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
* IN THE SOFTWARE.
*/

package com.tencent.bk.job.manage.model.credential;
package com.tencent.bk.job.common.model.dto;

import lombok.AllArgsConstructor;
import lombok.Data;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import com.tencent.bk.job.common.model.dto.HostDTO;
import com.tencent.bk.job.common.util.json.JsonUtils;
import com.tencent.bk.job.execute.client.FileSourceTaskResourceClient;
import com.tencent.bk.job.execute.common.constants.FileDistModeEnum;
import com.tencent.bk.job.execute.common.constants.RunStatusEnum;
import com.tencent.bk.job.execute.dao.FileSourceTaskLogDAO;
import com.tencent.bk.job.execute.engine.listener.event.EventSource;
Expand All @@ -50,10 +51,14 @@
import com.tencent.bk.job.file_gateway.consts.TaskStatusEnum;
import com.tencent.bk.job.file_gateway.model.req.inner.StopBatchTaskReq;
import com.tencent.bk.job.file_gateway.model.resp.inner.BatchTaskStatusDTO;
import com.tencent.bk.job.file_gateway.model.resp.inner.FileLogPieceDTO;
import com.tencent.bk.job.file_gateway.model.resp.inner.FileSourceTaskStatusDTO;
import com.tencent.bk.job.file_gateway.model.resp.inner.ThirdFileSourceTaskLogDTO;
import com.tencent.bk.job.logsvr.model.service.ServiceFileTaskLogDTO;
import com.tencent.bk.job.logsvr.model.service.ServiceHostLogDTO;
import com.tencent.bk.job.manage.model.inner.ServiceHostDTO;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.tuple.Pair;
import org.slf4j.helpers.FormattingTuple;
Expand All @@ -66,6 +71,7 @@
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;

/**
* 第三方文件源文件下载进度拉取任务调度
Expand All @@ -90,10 +96,10 @@ public class ThirdFilePrepareTask implements ContinuousScheduledTask, JobTaskCon
private LogService logService;
private TaskExecuteMQEventDispatcher taskExecuteMQEventDispatcher;
private FileSourceTaskLogDAO fileSourceTaskLogDAO;
private ThirdFilePrepareTaskResultHandler resultHandler;
private final ThirdFilePrepareTaskResultHandler resultHandler;
private int pullTimes = 0;
private long logStart = 0L;
private long logLength = 100L;

/**
* 任务是否已停止
*/
Expand Down Expand Up @@ -182,6 +188,7 @@ public BatchTaskStatusDTO getFileSourceBatchTaskResults(StepInstanceDTO stepInst
BatchTaskStatusDTO batchTaskStatusDTO = null;
boolean allLogDone = true;
try {
long logLength = 100L;
InternalResponse<BatchTaskStatusDTO> resp = fileSourceTaskResource.getBatchTaskStatusAndLogs(
batchTaskId,
logStart,
Expand All @@ -201,7 +208,7 @@ public BatchTaskStatusDTO getFileSourceBatchTaskResults(StepInstanceDTO stepInst
int maxLogSize = 0;
// 写日志
for (FileSourceTaskStatusDTO fileSourceTaskStatusDTO : fileSourceTaskStatusInfoList) {
List<ServiceHostLogDTO> logList = fileSourceTaskStatusDTO.getLogList();
List<ThirdFileSourceTaskLogDTO> logList = fileSourceTaskStatusDTO.getLogList();
if (logList != null && !logList.isEmpty()) {
writeLogs(stepInstance, logList);
if (logList.size() > maxLogSize) {
Expand Down Expand Up @@ -385,12 +392,84 @@ private void onSuccess(List<FileSourceTaskStatusDTO> fileSourceTaskStatusList) {
resultHandler.onSuccess(this);
}

private void writeLogs(StepInstanceDTO stepInstance, List<ServiceHostLogDTO> logDTOList) {
for (ServiceHostLogDTO serviceLogDTO : logDTOList) {
logService.writeFileLogWithTimestamp(stepInstance.getCreateTime(), stepInstance.getId(),
stepInstance.getExecuteCount(), stepInstance.getBatch(),
HostDTO.fromHostId(serviceLogDTO.getHostId()), serviceLogDTO, System.currentTimeMillis());
private void writeLogs(StepInstanceDTO stepInstance, List<ThirdFileSourceTaskLogDTO> logDTOList) {
for (ThirdFileSourceTaskLogDTO logDTO : logDTOList) {
HostDTO host = buildHost(logDTO);
logService.writeFileLogWithTimestamp(
stepInstance.getCreateTime(),
stepInstance.getId(),
stepInstance.getExecuteCount(),
stepInstance.getBatch(),
host,
buildServiceHostLogDTO(host, logDTO),
System.currentTimeMillis()
);
}
}

/**
* 根据第三方文件源任务日志中的file-worker所在IP获取对应的主机信息
*
* @param thirdFileSourceTaskLog 第三方文件源任务日志
* @return 主机对象
*/
private HostDTO buildHost(ThirdFileSourceTaskLogDTO thirdFileSourceTaskLog) {
String cloudIp = thirdFileSourceTaskLog.getIp();
HostDTO host = HostDTO.fromCloudIp(cloudIp);
ServiceHostDTO serviceHost = hostService.getHost(host);
fillHostInfo(host, serviceHost);
return host;
}

/**
* 将第三方文件源文件的下载任务日志转为统一的日志格式进行存储
*
* @param host 机器信息
* @param thirdFileSourceTaskLog 第三方源文件下载任务日志
* @return 统一格式日志实体
*/
private ServiceHostLogDTO buildServiceHostLogDTO(HostDTO host, ThirdFileSourceTaskLogDTO thirdFileSourceTaskLog) {
if (thirdFileSourceTaskLog == null) {
return null;
}
ServiceHostLogDTO serviceHostLog = new ServiceHostLogDTO();
serviceHostLog.setStepInstanceId(stepInstance.getId());
serviceHostLog.setExecuteCount(stepInstance.getExecuteCount());
serviceHostLog.setBatch(stepInstance.getBatch());
serviceHostLog.setHostId(host.getHostId());
serviceHostLog.setIp(host.getIp());
serviceHostLog.setFileTaskLogs(buildFileTaskLogs(host, thirdFileSourceTaskLog.getFileTaskLogs()));
return serviceHostLog;
}

/**
* 将第三方文件源任务日志转为Job系统统一格式日志用于存储与展示
*
* @param host 日志关联的主机信息
* @param fileLogPieces 文件下载日志列表
* @return Job日志列表
*/
private List<ServiceFileTaskLogDTO> buildFileTaskLogs(HostDTO host, List<FileLogPieceDTO> fileLogPieces) {
if (CollectionUtils.isEmpty(fileLogPieces)) {
return Collections.emptyList();
}
return fileLogPieces.stream().map(fileLogPiece -> {
ServiceFileTaskLogDTO serviceFileTaskLog = new ServiceFileTaskLogDTO();
// 第三方源文件下载后用于分发上传,仅有源文件相关属性
serviceFileTaskLog.setMode(FileDistModeEnum.UPLOAD.getValue());
serviceFileTaskLog.setSrcIp(fileLogPiece.getSrcIp());
serviceFileTaskLog.setSrcHostId(host.getHostId());
serviceFileTaskLog.setDisplaySrcIp(fileLogPiece.getDisplaySrcIp());
serviceFileTaskLog.setSrcFile(fileLogPiece.getSrcFile());
serviceFileTaskLog.setDisplaySrcFile(fileLogPiece.getDisplaySrcFile());
serviceFileTaskLog.setSize(fileLogPiece.getSize());
serviceFileTaskLog.setStatus(fileLogPiece.getStatus());
serviceFileTaskLog.setStatusDesc(fileLogPiece.getStatusDesc());
serviceFileTaskLog.setSpeed(fileLogPiece.getSpeed());
serviceFileTaskLog.setProcess(fileLogPiece.getProcess());
serviceFileTaskLog.setContent(fileLogPiece.getContent());
return serviceFileTaskLog;
}).collect(Collectors.toList());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ dependencies {
api project(':commons:common')
api project(':commons:common-i18n')
api project(':commons:common-iam')
api project(":job-logsvr:api-job-logsvr")
api project(":job-file-gateway:api-job-file-gateway-worker")
api(project(":commons:common-api"))
api project(':commons:esb-sdk')
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,134 @@
/*
* Tencent is pleased to support the open source community by making BK-JOB蓝鲸智云作业平台 available.
*
* Copyright (C) 2021 THL A29 Limited, a Tencent company. All rights reserved.
*
* BK-JOB蓝鲸智云作业平台 is licensed under the MIT License.
*
* License for BK-JOB蓝鲸智云作业平台:
* --------------------------------------------------------------------
* Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated
* documentation files (the "Software"), to deal in the Software without restriction, including without limitation
* the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and
* to permit persons to whom the Software is furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all copies or substantial portions of
* the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO
* THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF
* CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
* IN THE SOFTWARE.
*/

package com.tencent.bk.job.file_gateway.model.resp.inner;

import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.annotation.JsonProperty;
import io.swagger.annotations.ApiModel;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.commons.lang3.StringUtils;

/**
* 文件下载日志
*/
@ApiModel("单个文件的一条下载日志")
@Data
@NoArgsConstructor
@JsonInclude(JsonInclude.Include.NON_EMPTY)
public class FileLogPieceDTO {

/**
* 下载文件的file-worker的云区域ID:IP
*/
@JsonProperty("srcIp")
private String srcIp;

/**
* 下载文件的file-worker的云区域ID:IP - 显示
*/
@JsonProperty("displaySrcIp")
private String displaySrcIp;

/**
* 源文件路径 - 真实路径
*/
@JsonProperty("srcFile")
private String srcFile;

/**
* 源文件路径 - 用于显示
*/
@JsonProperty("displaySrcFile")
private String displaySrcFile;

/**
* 文件大小
*/
@JsonProperty("size")
private String size;

/**
* 文件任务状态
*/
@JsonProperty("status")
private Integer status;

/**
* 文件任务状态描述
*/
@JsonProperty("statusDesc")
private String statusDesc;

/**
* 速度
*/
@JsonProperty("speed")
private String speed;

/**
* 进度
*/
@JsonProperty("process")
private String process;

/**
* 日志内容
*/
@JsonProperty("content")
private String content;

public FileLogPieceDTO(String srcIp,
String displaySrcIp,
String srcFile,
String displaySrcFile,
String size,
Integer status,
String statusDesc,
String speed,
String process,
String content) {
this.srcIp = srcIp;
this.displaySrcIp = displaySrcIp;
this.srcFile = srcFile;
this.displaySrcFile = displaySrcFile;
this.size = size;
this.status = status;
this.statusDesc = statusDesc;
this.speed = speed;
this.process = process;
this.content = content;
}

public String getDisplaySrcIp() {
if (StringUtils.isNotEmpty(this.displaySrcIp)) {
return this.displaySrcIp;
} else {
return this.srcIp;
}
}
}


Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.tencent.bk.job.common.util.json.JsonUtils;
import com.tencent.bk.job.file_gateway.consts.TaskStatusEnum;
import com.tencent.bk.job.logsvr.model.service.ServiceHostLogDTO;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
Expand All @@ -50,7 +49,7 @@ public class FileSourceTaskStatusDTO {
* 文件源文件路径->文件在机器上的真实路径
*/
Map<String, String> filePathMap;
List<ServiceHostLogDTO> logList;
List<ThirdFileSourceTaskLogDTO> logList;
Boolean logEnd;

public boolean isDone() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,14 +22,41 @@
* IN THE SOFTWARE.
*/

package com.tencent.bk.job.file_gateway.client;
package com.tencent.bk.job.file_gateway.model.resp.inner;

import com.tencent.bk.job.logsvr.api.ServiceLogResource;
import org.springframework.cloud.openfeign.FeignClient;
import com.fasterxml.jackson.annotation.JsonInclude;
import io.swagger.annotations.ApiModel;
import io.swagger.annotations.ApiModelProperty;
import lombok.Data;

/**
* 日志服务远程调用客户端
*/
@FeignClient(value = "job-logsvr", contextId = "log")
public interface LogServiceResourceClient extends ServiceLogResource {
import java.util.ArrayList;
import java.util.List;

@ApiModel("第三方文件源文件下载任务日志")
@Data
@JsonInclude(JsonInclude.Include.NON_EMPTY)
public class ThirdFileSourceTaskLogDTO {
/**
* 文件源下载任务ID
*/
@ApiModelProperty("文件源下载任务ID")
private String taskId;

/**
* 任务执行所在的file-worker的cloudIp
*/
@ApiModelProperty(value = "任务执行所在的file-worker的云区域ID:IP")
private String ip;

/**
* 文件下载日志
*/
private List<FileLogPieceDTO> fileTaskLogs;

public void addFileTaskLog(FileLogPieceDTO fileTaskDetailLog) {
if (fileTaskLogs == null) {
fileTaskLogs = new ArrayList<>();
}
fileTaskLogs.add(fileTaskDetailLog);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ dependencies {
api project(":commons:common-web")
api project(":commons:common-redis")
api project(":job-manage:api-job-manage")
api project(":job-execute:api-job-execute")
api project(":job-file-worker-sdk:api-job-file-worker-sdk")
api project(":job-file-gateway:api-job-file-gateway")
api project(":job-file-gateway:model-job-file-gateway")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@

package com.tencent.bk.job.file_gateway.service;

import com.tencent.bk.job.manage.model.credential.CommonCredential;
import com.tencent.bk.job.common.model.dto.CommonCredential;


public interface CredentialService {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
import com.tencent.bk.job.common.util.json.JsonUtils;
import com.tencent.bk.job.file_gateway.client.ServiceCredentialResourceClient;
import com.tencent.bk.job.file_gateway.service.CredentialService;
import com.tencent.bk.job.manage.model.credential.CommonCredential;
import com.tencent.bk.job.common.model.dto.CommonCredential;
import com.tencent.bk.job.manage.model.inner.resp.ServiceCredentialDTO;
import lombok.extern.slf4j.Slf4j;
import org.slf4j.helpers.FormattingTuple;
Expand Down
Loading

0 comments on commit b4b7046

Please sign in to comment.