Skip to content

Commit

Permalink
Merge pull request #2482 from jsonwan/github_fix/third_file
Browse files Browse the repository at this point in the history
fix: file-worker任务状态更新请求无序到达导致第三方源文件偶现分发失败 #2434
  • Loading branch information
wangyu096 authored Sep 26, 2023
2 parents 904e5fe + 310dc27 commit 9b51d2c
Show file tree
Hide file tree
Showing 45 changed files with 671 additions and 257 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ private void logRespStr(String respStr) {
}

private void logAndThrow(ResponseEntity<String> respEntity) {
log.error("Fail to request fileWorker, status={}, msg={}", respEntity.getStatusCode(), respEntity.getBody());
log.error("Fail to request, status={}, msg={}", respEntity.getStatusCode(), respEntity.getBody());
throw new ServiceException(
ErrorType.INTERNAL,
ErrorCode.FAIL_TO_REQUEST_FILE_WORKER_WITH_REASON,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
/*
* Tencent is pleased to support the open source community by making BK-JOB蓝鲸智云作业平台 available.
*
* Copyright (C) 2021 THL A29 Limited, a Tencent company. All rights reserved.
*
* BK-JOB蓝鲸智云作业平台 is licensed under the MIT License.
*
* License for BK-JOB蓝鲸智云作业平台:
* --------------------------------------------------------------------
* Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated
* documentation files (the "Software"), to deal in the Software without restriction, including without limitation
* the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and
* to permit persons to whom the Software is furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all copies or substantial portions of
* the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO
* THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF
* CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
* IN THE SOFTWARE.
*/

package com.tencent.bk.job.file_gateway.model.dto;

import com.tencent.bk.job.file_gateway.consts.TaskStatusEnum;
import com.tencent.bk.job.file_gateway.model.req.inner.UpdateFileSourceTaskReq;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.Setter;

/**
* 单个文件任务进度信息
*/
@NoArgsConstructor
@Getter
@Setter
public class FileTaskProgressDTO {
/**
* ID
*/
private String fileSourceTaskId;
/**
* 文件路径(含bucketName)
*/
private String filePath;
/**
* 文件下载到机器上的真实路径
*/
private String downloadPath;
/**
* 任务文件状态
*/
private TaskStatusEnum status;
/**
* 文件大小(字节)
*/
private Long fileSize;
/**
* 速度
*/
private String speed;
/**
* 进度
*/
private Integer progress;
/**
* 日志内容
*/
private String content;

public static FileTaskProgressDTO fromUpdateFileSourceTaskReq(UpdateFileSourceTaskReq req) {
if (req == null) {
return null;
}
FileTaskProgressDTO fileTaskProgressDTO = new FileTaskProgressDTO();
fileTaskProgressDTO.setFileSourceTaskId(req.getFileSourceTaskId());
fileTaskProgressDTO.setFilePath(req.getFilePath());
fileTaskProgressDTO.setDownloadPath(req.getDownloadPath());
fileTaskProgressDTO.setStatus(req.getStatus());
fileTaskProgressDTO.setFileSize(req.getFileSize());
fileTaskProgressDTO.setSpeed(req.getSpeed());
fileTaskProgressDTO.setProgress(req.getProgress());
fileTaskProgressDTO.setContent(req.getContent());
return fileTaskProgressDTO;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
package com.tencent.bk.job.file_gateway.api.remote;

import com.tencent.bk.job.common.model.Response;
import com.tencent.bk.job.file_gateway.consts.TaskStatusEnum;
import com.tencent.bk.job.file_gateway.model.dto.FileTaskProgressDTO;
import com.tencent.bk.job.file_gateway.model.dto.FileWorkerDTO;
import com.tencent.bk.job.file_gateway.model.req.inner.HeartBeatReq;
import com.tencent.bk.job.file_gateway.model.req.inner.OffLineAndReDispatchReq;
Expand Down Expand Up @@ -66,16 +66,9 @@ public Response<Long> heartBeat(HeartBeatReq heartBeatReq) {
@Override
public Response<String> updateFileSourceTask(UpdateFileSourceTaskReq updateFileSourceTaskReq) {
log.info("updateFileSourceTaskReq=({})", updateFileSourceTaskReq);
String taskId = updateFileSourceTaskReq.getFileSourceTaskId();
String filePath = updateFileSourceTaskReq.getFilePath();
String downloadPath = updateFileSourceTaskReq.getDownloadPath();
Long fileSize = updateFileSourceTaskReq.getFileSize();
String speed = updateFileSourceTaskReq.getSpeed();
Integer progress = updateFileSourceTaskReq.getProgress();
String content = updateFileSourceTaskReq.getContent();
TaskStatusEnum status = updateFileSourceTaskReq.getStatus();
return Response.buildSuccessResp(fileSourceTaskService.updateFileSourceTask(taskId, filePath,
downloadPath, fileSize, speed, progress, content, status));
FileTaskProgressDTO fileTaskProgressDTO =
FileTaskProgressDTO.fromUpdateFileSourceTaskReq(updateFileSourceTaskReq);
return Response.buildSuccessResp(fileSourceTaskService.updateFileSourceTask(fileTaskProgressDTO));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ public interface FileSourceTaskDAO {

FileSourceTaskDTO getFileSourceTaskByIdForUpdate(String id);

Long countFileSourceTasksByBatchTaskIdForUpdate(String batchTaskId, Byte status);
Long countFileSourceTasksByBatchTaskId(String batchTaskId, Byte status);

List<FileSourceTaskDTO> listByBatchTaskId(String batchTaskId);
}
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,9 @@ public interface FileTaskDAO {

int deleteFileTaskByFileSourceTaskId(String fileSourceTaskId);

FileTaskDTO getOneFileTaskForUpdate(String fileSourceTaskId, String filePath);
FileTaskDTO getFileTaskByIdForUpdate(Long id);

FileTaskDTO getOneFileTask(String fileSourceTaskId, String filePath);

List<FileTaskDTO> listFileTasks(String fileSourceTaskId, Integer start, Integer pageSize);

Expand All @@ -47,5 +49,5 @@ List<String> listTimeoutFileSourceTaskIds(Long expireTimeMills, Collection<Byte>

List<FileTaskDTO> listFileTasks(String fileSourceTaskId);

Long countFileTaskForUpdate(String fileSourceTaskId, Byte status);
Long countFileTask(String fileSourceTaskId, Byte status);
}
Original file line number Diff line number Diff line change
Expand Up @@ -190,23 +190,23 @@ val record = dslContext.selectFrom(defaultTable).where(
}

@Override
public Long countFileSourceTasksByBatchTaskIdForUpdate(String batchTaskId, Byte status) {
public Long countFileSourceTasksByBatchTaskId(String batchTaskId, Byte status) {
List<Condition> conditions = new ArrayList<>();
if (batchTaskId != null) {
conditions.add(defaultTable.BATCH_TASK_ID.eq(batchTaskId));
}
if (status != null) {
conditions.add(defaultTable.STATUS.eq(status));
}
return countFileSourceTasksByConditionsForUpdate(conditions);
return countFileSourceTasksByConditions(conditions);
}

public Long countFileSourceTasksByConditionsForUpdate(Collection<Condition> conditions) {
public Long countFileSourceTasksByConditions(Collection<Condition> conditions) {
val query = dslContext.select(
DSL.countDistinct(defaultTable.ID)
).from(defaultTable)
.where(conditions);
return query.forUpdate().fetchOne(0, Long.class);
return query.fetchOne(0, Long.class);
}

public List<FileSourceTaskDTO> listByConditions(Collection<Condition> conditions,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -135,26 +135,40 @@ public int deleteFileTaskByFileSourceTaskId(String fileSourceTaskId) {
}

@Override
public FileTaskDTO getOneFileTaskForUpdate(String fileSourceTaskId, String filePath) {
public FileTaskDTO getFileTaskByIdForUpdate(Long id) {
List<Condition> conditions = new ArrayList<>();
conditions.add(defaultTable.ID.eq(id));
val record = dslContext.selectFrom(defaultTable).where(
conditions
).forUpdate().fetchOne();
if (record == null) {
return null;
} else {
return convertRecordToDto(record);
}
}

@Override
public FileTaskDTO getOneFileTask(String fileSourceTaskId, String filePath) {
List<Condition> conditions = new ArrayList<>();
conditions.add(defaultTable.FILE_SOURCE_TASK_ID.eq(fileSourceTaskId));
conditions.add(defaultTable.FILE_PATH.eq(filePath));
val record = dslContext.selectFrom(defaultTable).where(
conditions
).forUpdate().fetchOne();
).fetchOne();
if (record == null) {
return null;
} else {
return convertRecordToDto(record);
}
}

public Long countFileTasksByConditionsForUpdate(Collection<Condition> conditions) {
public Long countFileTasksByConditions(Collection<Condition> conditions) {
val query = dslContext.select(
DSL.countDistinct(defaultTable.ID)
).from(defaultTable)
.where(conditions);
return query.forUpdate().fetchOne(0, Long.class);
return query.fetchOne(0, Long.class);
}

@Override
Expand Down Expand Up @@ -196,15 +210,15 @@ public List<FileTaskDTO> listFileTasks(String fileSourceTaskId) {
}

@Override
public Long countFileTaskForUpdate(String fileSourceTaskId, Byte status) {
public Long countFileTask(String fileSourceTaskId, Byte status) {
List<Condition> conditions = new ArrayList<>();
if (fileSourceTaskId != null) {
conditions.add(defaultTable.FILE_SOURCE_TASK_ID.eq(fileSourceTaskId));
}
if (status != null) {
conditions.add(defaultTable.STATUS.eq(status));
}
return countFileTasksByConditionsForUpdate(conditions);
return countFileTasksByConditions(conditions);
}

private FileTaskDTO convertRecordToDto(FileTaskRecord record) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -268,4 +268,8 @@ public int hashCode() {
public String getBasicDesc() {
return "(id=" + id + ", appId=" + appId + ", name=" + name + ")";
}

public String getCloudIp() {
return cloudAreaId + ":" + innerIp;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,8 @@

package com.tencent.bk.job.file_gateway.service;

import com.tencent.bk.job.file_gateway.consts.TaskStatusEnum;
import com.tencent.bk.job.file_gateway.model.dto.FileSourceTaskDTO;
import com.tencent.bk.job.file_gateway.model.dto.FileTaskProgressDTO;
import com.tencent.bk.job.file_gateway.model.resp.inner.FileSourceTaskStatusDTO;
import com.tencent.bk.job.file_gateway.model.resp.inner.TaskInfoDTO;

Expand All @@ -39,8 +39,7 @@ TaskInfoDTO startFileSourceDownloadTaskWithId(String username, Long appId, Long
Integer executeCount, String batchTaskId, Integer fileSourceId,
List<String> filePathList, String fileSourceTaskId);

String updateFileSourceTask(String taskId, String filePath, String downloadPath, Long fileSize, String speed,
Integer progress, String content, TaskStatusEnum status);
String updateFileSourceTask(FileTaskProgressDTO fileTaskProgressDTO);

FileSourceTaskStatusDTO getFileSourceTaskStatusAndLogs(String taskId, Long logStart, Long logLength);

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
/*
* Tencent is pleased to support the open source community by making BK-JOB蓝鲸智云作业平台 available.
*
* Copyright (C) 2021 THL A29 Limited, a Tencent company. All rights reserved.
*
* BK-JOB蓝鲸智云作业平台 is licensed under the MIT License.
*
* License for BK-JOB蓝鲸智云作业平台:
* --------------------------------------------------------------------
* Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated
* documentation files (the "Software"), to deal in the Software without restriction, including without limitation
* the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and
* to permit persons to whom the Software is furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all copies or substantial portions of
* the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO
* THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF
* CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
* IN THE SOFTWARE.
*/

package com.tencent.bk.job.file_gateway.service;

import com.tencent.bk.job.file_gateway.model.dto.FileTaskProgressDTO;

public interface FileSourceTaskUpdateService {

String updateFileSourceTask(String batchTaskId,
String fileSourceTaskId,
Long fileTaskId,
FileTaskProgressDTO fileTaskProgressDTO);

}
Loading

0 comments on commit 9b51d2c

Please sign in to comment.