diff --git a/src/backend/job-file-gateway/service-job-file-gateway/src/main/java/com/tencent/bk/job/file_gateway/dao/filesource/FileSourceBatchTaskDAO.java b/src/backend/job-file-gateway/service-job-file-gateway/src/main/java/com/tencent/bk/job/file_gateway/dao/filesource/FileSourceBatchTaskDAO.java index 6c4503dd49..edca79a6ed 100644 --- a/src/backend/job-file-gateway/service-job-file-gateway/src/main/java/com/tencent/bk/job/file_gateway/dao/filesource/FileSourceBatchTaskDAO.java +++ b/src/backend/job-file-gateway/service-job-file-gateway/src/main/java/com/tencent/bk/job/file_gateway/dao/filesource/FileSourceBatchTaskDAO.java @@ -26,21 +26,13 @@ import com.tencent.bk.job.file_gateway.model.dto.FileSourceBatchTaskDTO; -import java.util.List; - public interface FileSourceBatchTaskDAO { String insertFileSourceBatchTask(FileSourceBatchTaskDTO fileSourceBatchTaskDTO); int updateFileSourceBatchTask(FileSourceBatchTaskDTO fileSourceBatchTaskDTO); - int updateFileClearStatus(List taskIdList, boolean fileCleared); - - int deleteFileSourceBatchTaskById(String id); - - FileSourceBatchTaskDTO getFileSourceBatchTaskById(String id); + FileSourceBatchTaskDTO getBatchTaskById(String id); - Long countFileSourceBatchTasks(Long appId); + FileSourceBatchTaskDTO getBatchTaskByIdForUpdate(String id); - List listFileSourceBatchTasks(Long appId, Integer start, - Integer pageSize); } diff --git a/src/backend/job-file-gateway/service-job-file-gateway/src/main/java/com/tencent/bk/job/file_gateway/dao/filesource/FileSourceTaskDAO.java b/src/backend/job-file-gateway/service-job-file-gateway/src/main/java/com/tencent/bk/job/file_gateway/dao/filesource/FileSourceTaskDAO.java index a5eb9ebdc4..7f2738a892 100644 --- a/src/backend/job-file-gateway/service-job-file-gateway/src/main/java/com/tencent/bk/job/file_gateway/dao/filesource/FileSourceTaskDAO.java +++ b/src/backend/job-file-gateway/service-job-file-gateway/src/main/java/com/tencent/bk/job/file_gateway/dao/filesource/FileSourceTaskDAO.java @@ -26,7 +26,6 @@ import com.tencent.bk.job.file_gateway.model.dto.FileSourceTaskDTO; -import java.util.Collection; import java.util.List; public interface FileSourceTaskDAO { @@ -40,16 +39,9 @@ public interface FileSourceTaskDAO { FileSourceTaskDTO getFileSourceTaskById(String id); - Long countFileSourceTasks(Long appId); + FileSourceTaskDTO getFileSourceTaskByIdForUpdate(String id); Long countFileSourceTasksByBatchTaskId(String batchTaskId, Byte status); - List listFileSourceTasks(Long appId, Integer start, Integer pageSize); - - List listTimeoutTasks(Long expireTimeMills, Collection statusSet, - Integer start, Integer pageSize); - List listByBatchTaskId(String batchTaskId); - - int deleteByBatchTaskId(String batchTaskId); } diff --git a/src/backend/job-file-gateway/service-job-file-gateway/src/main/java/com/tencent/bk/job/file_gateway/dao/filesource/FileTaskDAO.java b/src/backend/job-file-gateway/service-job-file-gateway/src/main/java/com/tencent/bk/job/file_gateway/dao/filesource/FileTaskDAO.java index 176f6b950a..f2db469824 100644 --- a/src/backend/job-file-gateway/service-job-file-gateway/src/main/java/com/tencent/bk/job/file_gateway/dao/filesource/FileTaskDAO.java +++ b/src/backend/job-file-gateway/service-job-file-gateway/src/main/java/com/tencent/bk/job/file_gateway/dao/filesource/FileTaskDAO.java @@ -36,20 +36,14 @@ public interface FileTaskDAO { int resetFileTasks(String fileSourceTaskId); - int deleteFileTaskById(Long id); - int deleteFileTaskByFileSourceTaskId(String fileSourceTaskId); - FileTaskDTO getFileTaskById(Long id); - - FileTaskDTO getOneFileTask(String fileSourceTaskId, String filePath); - - Long countFileTasks(String fileSourceTaskId); + FileTaskDTO getOneFileTaskForUpdate(String fileSourceTaskId, String filePath); List listFileTasks(String fileSourceTaskId, Integer start, Integer pageSize); - List listTimeoutFileSourceTaskIds(Long expireTimeMills, Collection statusSet - , Integer start, Integer pageSize); + List listTimeoutFileSourceTaskIds(Long expireTimeMills, Collection statusSet, Integer start, + Integer pageSize); List listFileTasks(String fileSourceTaskId); diff --git a/src/backend/job-file-gateway/service-job-file-gateway/src/main/java/com/tencent/bk/job/file_gateway/dao/filesource/impl/FileSourceBatchTaskDAOImpl.java b/src/backend/job-file-gateway/service-job-file-gateway/src/main/java/com/tencent/bk/job/file_gateway/dao/filesource/impl/FileSourceBatchTaskDAOImpl.java index 83c9320627..80b5695adf 100644 --- a/src/backend/job-file-gateway/service-job-file-gateway/src/main/java/com/tencent/bk/job/file_gateway/dao/filesource/impl/FileSourceBatchTaskDAOImpl.java +++ b/src/backend/job-file-gateway/service-job-file-gateway/src/main/java/com/tencent/bk/job/file_gateway/dao/filesource/impl/FileSourceBatchTaskDAOImpl.java @@ -36,16 +36,12 @@ import com.tencent.bk.job.file_gateway.model.tables.records.FileSourceBatchTaskRecord; import lombok.extern.slf4j.Slf4j; import lombok.val; -import org.jooq.Condition; import org.jooq.DSLContext; import org.jooq.conf.ParamType; -import org.jooq.impl.DSL; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.stereotype.Repository; -import java.util.ArrayList; -import java.util.Collection; import java.util.List; @Slf4j @@ -53,9 +49,8 @@ public class FileSourceBatchTaskDAOImpl extends BaseDAOImpl implements FileSourceBatchTaskDAO { private static final FileSourceBatchTask defaultTable = FileSourceBatchTask.FILE_SOURCE_BATCH_TASK; - private final FileSourceTaskDAO fileSourceTaskDAO; - private final DSLContext dslContext; + private final FileSourceTaskDAO fileSourceTaskDAO; @Autowired public FileSourceBatchTaskDAOImpl(@Qualifier("job-file-gateway-dsl-context") DSLContext dslContext, @@ -135,29 +130,7 @@ public int updateFileSourceBatchTask(FileSourceBatchTaskDTO fileSourceBatchTaskD } @Override - public int updateFileClearStatus(List taskIdList, boolean fileCleared) { - val query = dslContext.update(defaultTable) - .set(defaultTable.FILE_CLEARED, fileCleared) - .set(defaultTable.LAST_MODIFY_TIME, System.currentTimeMillis()) - .where(defaultTable.ID.in(taskIdList)); - val sql = query.getSQL(ParamType.INLINED); - try { - return query.execute(); - } catch (Exception e) { - log.error(sql); - throw e; - } - } - - @Override - public int deleteFileSourceBatchTaskById(String id) { - return dslContext.deleteFrom(defaultTable).where( - defaultTable.ID.eq(id) - ).execute(); - } - - @Override - public FileSourceBatchTaskDTO getFileSourceBatchTaskById(String id) { + public FileSourceBatchTaskDTO getBatchTaskById(String id) { val record = dslContext.selectFrom(defaultTable).where( defaultTable.ID.eq(id) ).fetchOne(); @@ -169,34 +142,15 @@ val record = dslContext.selectFrom(defaultTable).where( } @Override - public Long countFileSourceBatchTasks(Long appId) { - List conditions = new ArrayList<>(); - if (appId != null) { - conditions.add(defaultTable.APP_ID.eq(appId)); - } - return countFileSourceBatchTasksByConditions(conditions); - } - - public Long countFileSourceBatchTasksByConditions(Collection conditions) { - val query = dslContext.select( - DSL.countDistinct(defaultTable.ID) - ).from(defaultTable) - .where(conditions); - return query.fetchOne(0, Long.class); - } - - @Override - public List listFileSourceBatchTasks(Long appId, - Integer start, - Integer pageSize) { - List conditions = new ArrayList<>(); - if (appId != null) { - conditions.add(defaultTable.APP_ID.eq(appId)); + public FileSourceBatchTaskDTO getBatchTaskByIdForUpdate(String id) { + val record = dslContext.selectFrom(defaultTable).where( + defaultTable.ID.eq(id) + ).forUpdate().fetchOne(); + if (record == null) { + return null; + } else { + return convertRecordToDto(record); } - val query = dslContext.selectFrom(defaultTable) - .where(conditions) - .orderBy(defaultTable.LAST_MODIFY_TIME.desc()); - return listPage(query, start, pageSize, this::convertRecordToDto); } private FileSourceBatchTaskDTO convertRecordToDto(FileSourceBatchTaskRecord record) { diff --git a/src/backend/job-file-gateway/service-job-file-gateway/src/main/java/com/tencent/bk/job/file_gateway/dao/filesource/impl/FileSourceTaskDAOImpl.java b/src/backend/job-file-gateway/service-job-file-gateway/src/main/java/com/tencent/bk/job/file_gateway/dao/filesource/impl/FileSourceTaskDAOImpl.java index 6d1cef7baa..33d82ac79b 100644 --- a/src/backend/job-file-gateway/service-job-file-gateway/src/main/java/com/tencent/bk/job/file_gateway/dao/filesource/impl/FileSourceTaskDAOImpl.java +++ b/src/backend/job-file-gateway/service-job-file-gateway/src/main/java/com/tencent/bk/job/file_gateway/dao/filesource/impl/FileSourceTaskDAOImpl.java @@ -113,7 +113,8 @@ public String insertFileSourceTask(FileSourceTaskDTO fileSourceTaskDTO) { for (FileTaskDTO fileTaskDTO : fileTaskDTOList) { fileTaskDTO.setFileSourceTaskId(id); // 插入FileTask - fileTaskDAO.insertFileTask(fileTaskDTO); + Long fileTaskId = fileTaskDAO.insertFileTask(fileTaskDTO); + log.debug("{} inserted, id={}", fileTaskDTO, fileTaskId); } fileSourceTaskDTO.setFileTaskList(fileTaskDTOList); return id; @@ -177,12 +178,15 @@ val record = dslContext.selectFrom(defaultTable).where( } @Override - public Long countFileSourceTasks(Long appId) { - List conditions = new ArrayList<>(); - if (appId != null) { - conditions.add(defaultTable.APP_ID.eq(appId)); + public FileSourceTaskDTO getFileSourceTaskByIdForUpdate(String id) { + val record = dslContext.selectFrom(defaultTable).where( + defaultTable.ID.eq(id) + ).forUpdate().fetchOne(); + if (record == null) { + return null; + } else { + return convertRecordToDto(record); } - return countFileSourceTasksByConditions(conditions); } @Override @@ -213,29 +217,6 @@ public List listByConditions(Collection conditions return listPage(query, start, pageSize, this::convertRecordToDto); } - @Override - public List listFileSourceTasks(Long appId, Integer start, - Integer pageSize) { - List conditions = new ArrayList<>(); - if (appId != null) { - conditions.add(defaultTable.APP_ID.eq(appId)); - } - return listByConditions(conditions, start, pageSize); - } - - @Override - public List listTimeoutTasks(Long expireTimeMills, - Collection statusSet, Integer start, Integer pageSize) { - List conditions = new ArrayList<>(); - if (expireTimeMills != null && expireTimeMills > 0) { - conditions.add(defaultTable.LAST_MODIFY_TIME.le(System.currentTimeMillis() - expireTimeMills)); - } - if (statusSet != null && !statusSet.isEmpty()) { - conditions.add(defaultTable.STATUS.in(statusSet)); - } - return listByConditions(conditions, start, pageSize); - } - @Override public List listByBatchTaskId(String batchTaskId) { List conditions = new ArrayList<>(); @@ -245,13 +226,6 @@ public List listByBatchTaskId(String batchTaskId) { return listByConditions(conditions, null, null); } - @Override - public int deleteByBatchTaskId(String batchTaskId) { - return dslContext.deleteFrom(defaultTable).where( - defaultTable.BATCH_TASK_ID.eq(batchTaskId) - ).execute(); - } - private FileSourceTaskDTO convertRecordToDto(FileSourceTaskRecord record) { String id = record.getId(); List fileTaskDTOList = fileTaskDAO.listFileTasks(id); diff --git a/src/backend/job-file-gateway/service-job-file-gateway/src/main/java/com/tencent/bk/job/file_gateway/dao/filesource/impl/FileTaskDAOImpl.java b/src/backend/job-file-gateway/service-job-file-gateway/src/main/java/com/tencent/bk/job/file_gateway/dao/filesource/impl/FileTaskDAOImpl.java index cd205bf377..2f141577ba 100644 --- a/src/backend/job-file-gateway/service-job-file-gateway/src/main/java/com/tencent/bk/job/file_gateway/dao/filesource/impl/FileTaskDAOImpl.java +++ b/src/backend/job-file-gateway/service-job-file-gateway/src/main/java/com/tencent/bk/job/file_gateway/dao/filesource/impl/FileTaskDAOImpl.java @@ -127,14 +127,6 @@ public int resetFileTasks(String fileSourceTaskId) { } } - - @Override - public int deleteFileTaskById(Long id) { - return dslContext.deleteFrom(defaultTable).where( - defaultTable.ID.eq(id) - ).execute(); - } - @Override public int deleteFileTaskByFileSourceTaskId(String fileSourceTaskId) { return dslContext.deleteFrom(defaultTable).where( @@ -143,29 +135,13 @@ public int deleteFileTaskByFileSourceTaskId(String fileSourceTaskId) { } @Override - public FileTaskDTO getFileTaskById(Long id) { - val record = dslContext.selectFrom(defaultTable).where( - defaultTable.ID.eq(id) - ).fetchOne(); - if (record == null) { - return null; - } else { - return convertRecordToDto(record); - } - } - - @Override - public FileTaskDTO getOneFileTask(String fileSourceTaskId, String filePath) { + public FileTaskDTO getOneFileTaskForUpdate(String fileSourceTaskId, String filePath) { List conditions = new ArrayList<>(); - if (fileSourceTaskId != null) { - conditions.add(defaultTable.FILE_SOURCE_TASK_ID.eq(fileSourceTaskId)); - } - if (filePath != null) { - conditions.add(defaultTable.FILE_PATH.eq(filePath)); - } + conditions.add(defaultTable.FILE_SOURCE_TASK_ID.eq(fileSourceTaskId)); + conditions.add(defaultTable.FILE_PATH.eq(filePath)); val record = dslContext.selectFrom(defaultTable).where( conditions - ).fetchOne(); + ).forUpdate().fetchOne(); if (record == null) { return null; } else { @@ -173,15 +149,6 @@ val record = dslContext.selectFrom(defaultTable).where( } } - @Override - public Long countFileTasks(String fileSourceTaskId) { - List conditions = new ArrayList<>(); - if (fileSourceTaskId != null) { - conditions.add(defaultTable.FILE_SOURCE_TASK_ID.eq(fileSourceTaskId)); - } - return countFileTasksByConditions(conditions); - } - public Long countFileTasksByConditions(Collection conditions) { val query = dslContext.select( DSL.countDistinct(defaultTable.ID) diff --git a/src/backend/job-file-gateway/service-job-file-gateway/src/main/java/com/tencent/bk/job/file_gateway/service/impl/BatchTaskServiceImpl.java b/src/backend/job-file-gateway/service-job-file-gateway/src/main/java/com/tencent/bk/job/file_gateway/service/impl/BatchTaskServiceImpl.java index f507090e82..eaa4166070 100644 --- a/src/backend/job-file-gateway/service-job-file-gateway/src/main/java/com/tencent/bk/job/file_gateway/service/impl/BatchTaskServiceImpl.java +++ b/src/backend/job-file-gateway/service-job-file-gateway/src/main/java/com/tencent/bk/job/file_gateway/service/impl/BatchTaskServiceImpl.java @@ -108,8 +108,7 @@ public Integer stopBatchTasks(List batchTaskIdList) { public BatchTaskStatusDTO getBatchTaskStatusAndLogs(String batchTaskId, Long logStart, Long logLength) { BatchTaskStatusDTO batchTaskStatusDTO = new BatchTaskStatusDTO(); batchTaskStatusDTO.setBatchTaskId(batchTaskId); - FileSourceBatchTaskDTO fileSourceBatchTaskDTO = fileSourceBatchTaskDAO.getFileSourceBatchTaskById( - batchTaskId); + FileSourceBatchTaskDTO fileSourceBatchTaskDTO = fileSourceBatchTaskDAO.getBatchTaskById(batchTaskId); if (fileSourceBatchTaskDTO == null) { throw new InternalException(ErrorCode.INTERNAL_ERROR); } diff --git a/src/backend/job-file-gateway/service-job-file-gateway/src/main/java/com/tencent/bk/job/file_gateway/service/impl/FileSourceTaskServiceImpl.java b/src/backend/job-file-gateway/service-job-file-gateway/src/main/java/com/tencent/bk/job/file_gateway/service/impl/FileSourceTaskServiceImpl.java index 9ad4644a02..615c0c8aba 100644 --- a/src/backend/job-file-gateway/service-job-file-gateway/src/main/java/com/tencent/bk/job/file_gateway/service/impl/FileSourceTaskServiceImpl.java +++ b/src/backend/job-file-gateway/service-job-file-gateway/src/main/java/com/tencent/bk/job/file_gateway/service/impl/FileSourceTaskServiceImpl.java @@ -63,6 +63,7 @@ import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.data.redis.core.RedisTemplate; import org.springframework.stereotype.Service; +import org.springframework.transaction.annotation.Transactional; import java.util.ArrayList; import java.util.Collections; @@ -77,7 +78,7 @@ public class FileSourceTaskServiceImpl implements FileSourceTaskService { public static final String PREFIX_REDIS_TASK_LOG = "job:file-gateway:taskLog:"; - + private final FileSourceTaskDAO fileSourceTaskDAO; private final FileTaskDAO fileTaskDAO; private final FileWorkerDAO fileworkerDAO; @@ -90,9 +91,9 @@ public class FileSourceTaskServiceImpl implements FileSourceTaskService { @Autowired public FileSourceTaskServiceImpl(FileSourceTaskDAO fileSourceTaskDAO, - FileTaskDAO fileTaskDAO, + FileTaskDAO fileTaskDAO, FileWorkerDAO fileworkerDAO, - FileSourceDAO fileSourceDAO, + FileSourceDAO fileSourceDAO, DispatchService dispatchService, FileSourceTaskReqGenService fileSourceTaskReqGenService, @Qualifier("jsonRedisTemplate") RedisTemplate redisTemplate, @@ -121,6 +122,7 @@ public TaskInfoDTO startFileSourceDownloadTask(String username, Long appId, Long fileSourceId, filePathList, null); } + @Transactional(value = "jobFileGatewayTransactionManager", rollbackFor = {Throwable.class}) public TaskInfoDTO startFileSourceDownloadTaskWithId(String username, Long appId, Long stepInstanceId, Integer executeCount, String batchTaskId, Integer fileSourceId, List filePathList, @@ -240,28 +242,30 @@ private void notifyFileTaskStatusChangeListeners(FileTaskDTO fileTaskDTO, FileSo } @Override + @Transactional(value = "jobFileGatewayTransactionManager", rollbackFor = {Throwable.class}) public String updateFileSourceTask(String taskId, String filePath, String downloadPath, Long fileSize, String speed, Integer progress, String content, TaskStatusEnum status) { - FileTaskDTO fileTaskDTO = fileTaskDAO.getOneFileTask(taskId, filePath); + FileTaskDTO fileTaskDTO = fileTaskDAO.getOneFileTaskForUpdate(taskId, filePath); if (fileTaskDTO == null) { log.error("Cannot find fileTaskDTO by taskId {} filePath {}", taskId, filePath); return null; } TaskStatusEnum previousStatus = TaskStatusEnum.valueOf(fileTaskDTO.getStatus()); fileTaskDTO.setDownloadPath(downloadPath); - FileSourceTaskDTO fileSourceTaskDTO = fileSourceTaskDAO.getFileSourceTaskById(taskId); + FileSourceTaskDTO fileSourceTaskDTO = fileSourceTaskDAO.getFileSourceTaskByIdForUpdate(taskId); if (fileSourceTaskDTO == null) { log.error("Cannot find fileSourceTaskDTO by taskId {} filePath {}", taskId, filePath); return null; } FileWorkerDTO fileWorkerDTO = fileworkerDAO.getFileWorkerById(fileSourceTaskDTO.getFileWorkerId()); + int affectedRowNum = -1; if (status == TaskStatusEnum.RUNNING) { // 已处于结束态的任务不再接受状态更新 if (!fileTaskDTO.isDone()) { fileTaskDTO.setProgress(progress); fileTaskDTO.setFileSize(fileSize); fileTaskDTO.setStatus(TaskStatusEnum.RUNNING.getStatus()); - fileTaskDAO.updateFileTask(fileTaskDTO); + affectedRowNum = fileTaskDAO.updateFileTask(fileTaskDTO); logUpdatedTaskStatus(taskId, filePath, progress, status); } else { log.info("fileTask {} already done, do not update to running", taskId); @@ -269,21 +273,24 @@ public String updateFileSourceTask(String taskId, String filePath, String downlo } else if (status == TaskStatusEnum.SUCCESS) { fileTaskDTO.setProgress(100); fileTaskDTO.setStatus(TaskStatusEnum.SUCCESS.getStatus()); - fileTaskDAO.updateFileTask(fileTaskDTO); + affectedRowNum = fileTaskDAO.updateFileTask(fileTaskDTO); logUpdatedTaskStatus(taskId, filePath, progress, status); } else if (status == TaskStatusEnum.FAILED) { fileTaskDTO.setProgress(progress); fileTaskDTO.setStatus(TaskStatusEnum.FAILED.getStatus()); - fileTaskDAO.updateFileTask(fileTaskDTO); + affectedRowNum = fileTaskDAO.updateFileTask(fileTaskDTO); logUpdatedTaskStatus(taskId, filePath, progress, status); } else if (status == TaskStatusEnum.STOPPED) { fileTaskDTO.setProgress(progress); fileTaskDTO.setStatus(TaskStatusEnum.STOPPED.getStatus()); - fileTaskDAO.updateFileTask(fileTaskDTO); + affectedRowNum = fileTaskDAO.updateFileTask(fileTaskDTO); logUpdatedTaskStatus(taskId, filePath, progress, status); } else { log.warn("fileTask {} unknown status:{}", taskId, status); } + if (affectedRowNum != -1) { + log.info("{} updated, affectedRowNum={}", fileTaskDTO, affectedRowNum); + } // 通知关注者 if (status != previousStatus) { notifyFileTaskStatusChangeListeners(fileTaskDTO, fileSourceTaskDTO, fileWorkerDTO, @@ -442,7 +449,8 @@ public Integer clearTaskFiles(List taskIdList) { addTaskIdToWorkerTaskMap(workerTaskMap, fileWorkerDTO, taskId); targetTaskIdList.add(taskId); } - fileSourceTaskDAO.updateFileClearStatus(targetTaskIdList, true); + int affectedRowNum = fileSourceTaskDAO.updateFileClearStatus(targetTaskIdList, true); + log.info("{}/{} taskFile set clear status true", affectedRowNum, targetTaskIdList.size()); int allCount = 0; // 逐个Worker清理文件 for (Map.Entry> entry : workerTaskMap.entrySet()) { diff --git a/src/backend/job-file-gateway/service-job-file-gateway/src/main/java/com/tencent/bk/job/file_gateway/service/impl/FileTaskServiceImpl.java b/src/backend/job-file-gateway/service-job-file-gateway/src/main/java/com/tencent/bk/job/file_gateway/service/impl/FileTaskServiceImpl.java index f670c48310..3c26426e67 100644 --- a/src/backend/job-file-gateway/service-job-file-gateway/src/main/java/com/tencent/bk/job/file_gateway/service/impl/FileTaskServiceImpl.java +++ b/src/backend/job-file-gateway/service-job-file-gateway/src/main/java/com/tencent/bk/job/file_gateway/service/impl/FileTaskServiceImpl.java @@ -51,7 +51,8 @@ public List listFileTasks(String fileSourceTaskId) { @Override public void resetTasks(String fileSourceTaskId) { - fileTaskDAO.resetFileTasks(fileSourceTaskId); + int affectedRowNum = fileTaskDAO.resetFileTasks(fileSourceTaskId); + log.info("fileSourceTask {} has been reset, {} fileTask updated", fileSourceTaskId, affectedRowNum); } @Override diff --git a/src/backend/job-file-gateway/service-job-file-gateway/src/main/java/com/tencent/bk/job/file_gateway/service/listener/impl/BatchTaskStatusUpdater.java b/src/backend/job-file-gateway/service-job-file-gateway/src/main/java/com/tencent/bk/job/file_gateway/service/listener/impl/BatchTaskStatusUpdater.java index 65573a1b9b..21769fcf94 100644 --- a/src/backend/job-file-gateway/service-job-file-gateway/src/main/java/com/tencent/bk/job/file_gateway/service/listener/impl/BatchTaskStatusUpdater.java +++ b/src/backend/job-file-gateway/service-job-file-gateway/src/main/java/com/tencent/bk/job/file_gateway/service/listener/impl/BatchTaskStatusUpdater.java @@ -54,8 +54,7 @@ public BatchTaskStatusUpdater(FileSourceTaskDAO fileSourceTaskDAO, public boolean onStatusChange(TaskContext context, TaskStatusEnum previousStatus, TaskStatusEnum currentStatus) { FileSourceTaskDTO fileSourceTaskDTO = context.getFileSourceTaskDTO(); String batchTaskId = fileSourceTaskDTO.getBatchTaskId(); - FileSourceBatchTaskDTO fileSourceBatchTaskDTO = - fileSourceBatchTaskDAO.getFileSourceBatchTaskById(batchTaskId); + FileSourceBatchTaskDTO fileSourceBatchTaskDTO = fileSourceBatchTaskDAO.getBatchTaskByIdForUpdate(batchTaskId); if (StringUtils.isBlank(batchTaskId)) { return false; } @@ -66,14 +65,13 @@ public boolean onStatusChange(TaskContext context, TaskStatusEnum previousStatus } if (TaskStatusEnum.SUCCESS.equals(currentStatus)) { //检查批量任务是否成功 - if (fileSourceTaskDAO.countFileSourceTasksByBatchTaskId( - batchTaskId, null) + if (fileSourceTaskDAO.countFileSourceTasksByBatchTaskId(batchTaskId, null) .equals(fileSourceTaskDAO.countFileSourceTasksByBatchTaskId( batchTaskId, TaskStatusEnum.SUCCESS.getStatus()))) { // 批量任务成功 fileSourceBatchTaskDTO.setStatus(TaskStatusEnum.SUCCESS.getStatus()); - fileSourceBatchTaskDAO.updateFileSourceBatchTask(fileSourceBatchTaskDTO); - logUpdatedBatchTaskStatus(batchTaskId, TaskStatusEnum.SUCCESS); + int affectedNum = fileSourceBatchTaskDAO.updateFileSourceBatchTask(fileSourceBatchTaskDTO); + logUpdatedBatchTaskStatus(batchTaskId, TaskStatusEnum.SUCCESS, affectedNum); } else { // 主任务尚未成功 log.info("task {} done, batchTask not finished yet", fileSourceTaskDTO.getId()); @@ -81,18 +79,18 @@ public boolean onStatusChange(TaskContext context, TaskStatusEnum previousStatus } else if (TaskStatusEnum.FAILED.equals(currentStatus)) { // 批量任务失败 fileSourceBatchTaskDTO.setStatus(TaskStatusEnum.FAILED.getStatus()); - fileSourceBatchTaskDAO.updateFileSourceBatchTask(fileSourceBatchTaskDTO); - logUpdatedBatchTaskStatus(batchTaskId, TaskStatusEnum.FAILED); + int affectedNum = fileSourceBatchTaskDAO.updateFileSourceBatchTask(fileSourceBatchTaskDTO); + logUpdatedBatchTaskStatus(batchTaskId, TaskStatusEnum.FAILED, affectedNum); } else if (TaskStatusEnum.STOPPED.equals(currentStatus)) { // 批量任务停止 fileSourceBatchTaskDTO.setStatus(TaskStatusEnum.STOPPED.getStatus()); - fileSourceBatchTaskDAO.updateFileSourceBatchTask(fileSourceBatchTaskDTO); - logUpdatedBatchTaskStatus(batchTaskId, TaskStatusEnum.STOPPED); + int affectedNum = fileSourceBatchTaskDAO.updateFileSourceBatchTask(fileSourceBatchTaskDTO); + logUpdatedBatchTaskStatus(batchTaskId, TaskStatusEnum.STOPPED, affectedNum); } return false; } - private void logUpdatedBatchTaskStatus(String batchTaskId, TaskStatusEnum status) { - log.info("updated batchTask:{},{}", batchTaskId, status.name()); + private void logUpdatedBatchTaskStatus(String batchTaskId, TaskStatusEnum status, int affectedNum) { + log.info("updated batchTask:{},{}, affectedNum={}", batchTaskId, status.name(), affectedNum); } } diff --git a/src/backend/job-file-gateway/service-job-file-gateway/src/main/java/com/tencent/bk/job/file_gateway/task/ScheduledTasks.java b/src/backend/job-file-gateway/service-job-file-gateway/src/main/java/com/tencent/bk/job/file_gateway/task/ScheduledTasks.java index 6feaba4d12..149b19830c 100644 --- a/src/backend/job-file-gateway/service-job-file-gateway/src/main/java/com/tencent/bk/job/file_gateway/task/ScheduledTasks.java +++ b/src/backend/job-file-gateway/service-job-file-gateway/src/main/java/com/tencent/bk/job/file_gateway/task/ScheduledTasks.java @@ -80,9 +80,9 @@ public void refreshWorkerOnlineStatus() { } /** - * 任务超时重调度:1s/次 + * 任务超时重调度:10s/次 */ - @Scheduled(fixedDelay = 1000L, initialDelay = 3 * 1000L) + @Scheduled(fixedDelay = 10000L, initialDelay = 3 * 1000L) public void reDispatchTimeoutFileSourceTask() { logger.info(Thread.currentThread().getId() + ":reDispatchTimeoutFileSourceTask start"); try { diff --git a/src/backend/job-file-worker-sdk/service-job-file-worker-sdk/src/main/java/com/tencent/bk/job/file/worker/config/ExecutorConfiguration.java b/src/backend/job-file-worker-sdk/service-job-file-worker-sdk/src/main/java/com/tencent/bk/job/file/worker/config/ExecutorConfiguration.java index b06db2bbf3..7402fe79f0 100644 --- a/src/backend/job-file-worker-sdk/service-job-file-worker-sdk/src/main/java/com/tencent/bk/job/file/worker/config/ExecutorConfiguration.java +++ b/src/backend/job-file-worker-sdk/service-job-file-worker-sdk/src/main/java/com/tencent/bk/job/file/worker/config/ExecutorConfiguration.java @@ -24,6 +24,8 @@ package com.tencent.bk.job.file.worker.config; +import com.tencent.bk.job.common.WatchableThreadPoolExecutor; +import io.micrometer.core.instrument.MeterRegistry; import lombok.extern.slf4j.Slf4j; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @@ -37,10 +39,12 @@ public class ExecutorConfiguration { @Bean("fileTaskExecutor") - public ThreadPoolExecutor fileTaskExecutor() { - return new ThreadPoolExecutor( - 20, - 50, + public ThreadPoolExecutor fileTaskExecutor(MeterRegistry meterRegistry) { + return new WatchableThreadPoolExecutor( + meterRegistry, + "fileTaskExecutor", + 40, + 40, 1, TimeUnit.MINUTES, new LinkedBlockingQueue<>(1000), (r, executor) -> { @@ -50,10 +54,12 @@ public ThreadPoolExecutor fileTaskExecutor() { } @Bean("watchingTaskExecutor") - public ThreadPoolExecutor watchingTaskExecutor() { - return new ThreadPoolExecutor( - 20, - 50, + public ThreadPoolExecutor watchingTaskExecutor(MeterRegistry meterRegistry) { + return new WatchableThreadPoolExecutor( + meterRegistry, + "watchingTaskExecutor", + 40, + 40, 1, TimeUnit.MINUTES , new LinkedBlockingQueue<>(1000), (r, executor) -> diff --git a/support-files/sql/job-file-gateway/0008_job_file_gateway_20230913-1000_V3.8.1_mysql.sql b/support-files/sql/job-file-gateway/0008_job_file_gateway_20230913-1000_V3.8.1_mysql.sql new file mode 100644 index 0000000000..0d87a36bc0 --- /dev/null +++ b/support-files/sql/job-file-gateway/0008_job_file_gateway_20230913-1000_V3.8.1_mysql.sql @@ -0,0 +1,36 @@ +SET NAMES utf8mb4; +USE job_file_gateway; + +DROP PROCEDURE IF EXISTS job_schema_update; + +DELIMITER + +CREATE PROCEDURE job_schema_update() +BEGIN + + DECLARE db VARCHAR(100); + SET AUTOCOMMIT = 0; + SELECT DATABASE() INTO db; + + IF NOT EXISTS(SELECT 1 + FROM information_schema.statistics + WHERE TABLE_SCHEMA = db + AND TABLE_NAME = 'file_task' + AND INDEX_NAME = 'idx_file_source_task_id_file_path') THEN + ALTER TABLE file_task ADD INDEX idx_file_source_task_id_file_path(`file_source_task_id`,`file_path`(128)); + END IF; + + IF EXISTS(SELECT 1 + FROM information_schema.statistics + WHERE TABLE_SCHEMA = db + AND TABLE_NAME = 'file_task' + AND INDEX_NAME = 'idx_file_source_task_id') THEN + ALTER TABLE file_task DROP INDEX idx_file_source_task_id; + END IF; + +COMMIT; +END +DELIMITER ; +CALL job_schema_update(); + +DROP PROCEDURE IF EXISTS job_schema_update;