Skip to content

Commit

Permalink
fix: file-worker任务状态更新请求无序到达导致第三方源文件偶现分发失败 #2434
Browse files Browse the repository at this point in the history
  • Loading branch information
jsonwan committed Sep 13, 2023
1 parent e7b9934 commit 544e686
Show file tree
Hide file tree
Showing 15 changed files with 115 additions and 198 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,21 +26,13 @@

import com.tencent.bk.job.file_gateway.model.dto.FileSourceBatchTaskDTO;

import java.util.List;

public interface FileSourceBatchTaskDAO {
String insertFileSourceBatchTask(FileSourceBatchTaskDTO fileSourceBatchTaskDTO);

int updateFileSourceBatchTask(FileSourceBatchTaskDTO fileSourceBatchTaskDTO);

int updateFileClearStatus(List<String> taskIdList, boolean fileCleared);

int deleteFileSourceBatchTaskById(String id);

FileSourceBatchTaskDTO getFileSourceBatchTaskById(String id);
FileSourceBatchTaskDTO getBatchTaskById(String id);

Long countFileSourceBatchTasks(Long appId);
FileSourceBatchTaskDTO getBatchTaskByIdForUpdate(String id);

List<FileSourceBatchTaskDTO> listFileSourceBatchTasks(Long appId, Integer start,
Integer pageSize);
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@

import com.tencent.bk.job.file_gateway.model.dto.FileSourceTaskDTO;

import java.util.Collection;
import java.util.List;

public interface FileSourceTaskDAO {
Expand All @@ -40,16 +39,9 @@ public interface FileSourceTaskDAO {

FileSourceTaskDTO getFileSourceTaskById(String id);

Long countFileSourceTasks(Long appId);
FileSourceTaskDTO getFileSourceTaskByIdForUpdate(String id);

Long countFileSourceTasksByBatchTaskId(String batchTaskId, Byte status);

List<FileSourceTaskDTO> listFileSourceTasks(Long appId, Integer start, Integer pageSize);

List<FileSourceTaskDTO> listTimeoutTasks(Long expireTimeMills, Collection<Byte> statusSet,
Integer start, Integer pageSize);

List<FileSourceTaskDTO> listByBatchTaskId(String batchTaskId);

int deleteByBatchTaskId(String batchTaskId);
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,20 +36,14 @@ public interface FileTaskDAO {

int resetFileTasks(String fileSourceTaskId);

int deleteFileTaskById(Long id);

int deleteFileTaskByFileSourceTaskId(String fileSourceTaskId);

FileTaskDTO getFileTaskById(Long id);

FileTaskDTO getOneFileTask(String fileSourceTaskId, String filePath);

Long countFileTasks(String fileSourceTaskId);
FileTaskDTO getOneFileTaskForUpdate(String fileSourceTaskId, String filePath);

List<FileTaskDTO> listFileTasks(String fileSourceTaskId, Integer start, Integer pageSize);

List<String> listTimeoutFileSourceTaskIds(Long expireTimeMills, Collection<Byte> statusSet
, Integer start, Integer pageSize);
List<String> listTimeoutFileSourceTaskIds(Long expireTimeMills, Collection<Byte> statusSet, Integer start,
Integer pageSize);

List<FileTaskDTO> listFileTasks(String fileSourceTaskId);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,26 +36,21 @@
import com.tencent.bk.job.file_gateway.model.tables.records.FileSourceBatchTaskRecord;
import lombok.extern.slf4j.Slf4j;
import lombok.val;
import org.jooq.Condition;
import org.jooq.DSLContext;
import org.jooq.conf.ParamType;
import org.jooq.impl.DSL;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.stereotype.Repository;

import java.util.ArrayList;
import java.util.Collection;
import java.util.List;

@Slf4j
@Repository
public class FileSourceBatchTaskDAOImpl extends BaseDAOImpl implements FileSourceBatchTaskDAO {

private static final FileSourceBatchTask defaultTable = FileSourceBatchTask.FILE_SOURCE_BATCH_TASK;
private final FileSourceTaskDAO fileSourceTaskDAO;

private final DSLContext dslContext;
private final FileSourceTaskDAO fileSourceTaskDAO;

@Autowired
public FileSourceBatchTaskDAOImpl(@Qualifier("job-file-gateway-dsl-context") DSLContext dslContext,
Expand Down Expand Up @@ -135,29 +130,7 @@ public int updateFileSourceBatchTask(FileSourceBatchTaskDTO fileSourceBatchTaskD
}

@Override
public int updateFileClearStatus(List<String> taskIdList, boolean fileCleared) {
val query = dslContext.update(defaultTable)
.set(defaultTable.FILE_CLEARED, fileCleared)
.set(defaultTable.LAST_MODIFY_TIME, System.currentTimeMillis())
.where(defaultTable.ID.in(taskIdList));
val sql = query.getSQL(ParamType.INLINED);
try {
return query.execute();
} catch (Exception e) {
log.error(sql);
throw e;
}
}

@Override
public int deleteFileSourceBatchTaskById(String id) {
return dslContext.deleteFrom(defaultTable).where(
defaultTable.ID.eq(id)
).execute();
}

@Override
public FileSourceBatchTaskDTO getFileSourceBatchTaskById(String id) {
public FileSourceBatchTaskDTO getBatchTaskById(String id) {
val record = dslContext.selectFrom(defaultTable).where(
defaultTable.ID.eq(id)
).fetchOne();
Expand All @@ -169,34 +142,15 @@ val record = dslContext.selectFrom(defaultTable).where(
}

@Override
public Long countFileSourceBatchTasks(Long appId) {
List<Condition> conditions = new ArrayList<>();
if (appId != null) {
conditions.add(defaultTable.APP_ID.eq(appId));
}
return countFileSourceBatchTasksByConditions(conditions);
}

public Long countFileSourceBatchTasksByConditions(Collection<Condition> conditions) {
val query = dslContext.select(
DSL.countDistinct(defaultTable.ID)
).from(defaultTable)
.where(conditions);
return query.fetchOne(0, Long.class);
}

@Override
public List<FileSourceBatchTaskDTO> listFileSourceBatchTasks(Long appId,
Integer start,
Integer pageSize) {
List<Condition> conditions = new ArrayList<>();
if (appId != null) {
conditions.add(defaultTable.APP_ID.eq(appId));
public FileSourceBatchTaskDTO getBatchTaskByIdForUpdate(String id) {
val record = dslContext.selectFrom(defaultTable).where(
defaultTable.ID.eq(id)
).forUpdate().fetchOne();
if (record == null) {
return null;
} else {
return convertRecordToDto(record);
}
val query = dslContext.selectFrom(defaultTable)
.where(conditions)
.orderBy(defaultTable.LAST_MODIFY_TIME.desc());
return listPage(query, start, pageSize, this::convertRecordToDto);
}

private FileSourceBatchTaskDTO convertRecordToDto(FileSourceBatchTaskRecord record) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,8 @@ public String insertFileSourceTask(FileSourceTaskDTO fileSourceTaskDTO) {
for (FileTaskDTO fileTaskDTO : fileTaskDTOList) {
fileTaskDTO.setFileSourceTaskId(id);
// 插入FileTask
fileTaskDAO.insertFileTask(fileTaskDTO);
Long fileTaskId = fileTaskDAO.insertFileTask(fileTaskDTO);
log.debug("{} inserted, id={}", fileTaskDTO, fileTaskId);
}
fileSourceTaskDTO.setFileTaskList(fileTaskDTOList);
return id;
Expand Down Expand Up @@ -177,12 +178,15 @@ val record = dslContext.selectFrom(defaultTable).where(
}

@Override
public Long countFileSourceTasks(Long appId) {
List<Condition> conditions = new ArrayList<>();
if (appId != null) {
conditions.add(defaultTable.APP_ID.eq(appId));
public FileSourceTaskDTO getFileSourceTaskByIdForUpdate(String id) {
val record = dslContext.selectFrom(defaultTable).where(
defaultTable.ID.eq(id)
).forUpdate().fetchOne();
if (record == null) {
return null;
} else {
return convertRecordToDto(record);
}
return countFileSourceTasksByConditions(conditions);
}

@Override
Expand Down Expand Up @@ -213,29 +217,6 @@ public List<FileSourceTaskDTO> listByConditions(Collection<Condition> conditions
return listPage(query, start, pageSize, this::convertRecordToDto);
}

@Override
public List<FileSourceTaskDTO> listFileSourceTasks(Long appId, Integer start,
Integer pageSize) {
List<Condition> conditions = new ArrayList<>();
if (appId != null) {
conditions.add(defaultTable.APP_ID.eq(appId));
}
return listByConditions(conditions, start, pageSize);
}

@Override
public List<FileSourceTaskDTO> listTimeoutTasks(Long expireTimeMills,
Collection<Byte> statusSet, Integer start, Integer pageSize) {
List<Condition> conditions = new ArrayList<>();
if (expireTimeMills != null && expireTimeMills > 0) {
conditions.add(defaultTable.LAST_MODIFY_TIME.le(System.currentTimeMillis() - expireTimeMills));
}
if (statusSet != null && !statusSet.isEmpty()) {
conditions.add(defaultTable.STATUS.in(statusSet));
}
return listByConditions(conditions, start, pageSize);
}

@Override
public List<FileSourceTaskDTO> listByBatchTaskId(String batchTaskId) {
List<Condition> conditions = new ArrayList<>();
Expand All @@ -245,13 +226,6 @@ public List<FileSourceTaskDTO> listByBatchTaskId(String batchTaskId) {
return listByConditions(conditions, null, null);
}

@Override
public int deleteByBatchTaskId(String batchTaskId) {
return dslContext.deleteFrom(defaultTable).where(
defaultTable.BATCH_TASK_ID.eq(batchTaskId)
).execute();
}

private FileSourceTaskDTO convertRecordToDto(FileSourceTaskRecord record) {
String id = record.getId();
List<FileTaskDTO> fileTaskDTOList = fileTaskDAO.listFileTasks(id);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -127,14 +127,6 @@ public int resetFileTasks(String fileSourceTaskId) {
}
}


@Override
public int deleteFileTaskById(Long id) {
return dslContext.deleteFrom(defaultTable).where(
defaultTable.ID.eq(id)
).execute();
}

@Override
public int deleteFileTaskByFileSourceTaskId(String fileSourceTaskId) {
return dslContext.deleteFrom(defaultTable).where(
Expand All @@ -143,45 +135,20 @@ public int deleteFileTaskByFileSourceTaskId(String fileSourceTaskId) {
}

@Override
public FileTaskDTO getFileTaskById(Long id) {
val record = dslContext.selectFrom(defaultTable).where(
defaultTable.ID.eq(id)
).fetchOne();
if (record == null) {
return null;
} else {
return convertRecordToDto(record);
}
}

@Override
public FileTaskDTO getOneFileTask(String fileSourceTaskId, String filePath) {
public FileTaskDTO getOneFileTaskForUpdate(String fileSourceTaskId, String filePath) {
List<Condition> conditions = new ArrayList<>();
if (fileSourceTaskId != null) {
conditions.add(defaultTable.FILE_SOURCE_TASK_ID.eq(fileSourceTaskId));
}
if (filePath != null) {
conditions.add(defaultTable.FILE_PATH.eq(filePath));
}
conditions.add(defaultTable.FILE_SOURCE_TASK_ID.eq(fileSourceTaskId));
conditions.add(defaultTable.FILE_PATH.eq(filePath));
val record = dslContext.selectFrom(defaultTable).where(
conditions
).fetchOne();
).forUpdate().fetchOne();
if (record == null) {
return null;
} else {
return convertRecordToDto(record);
}
}

@Override
public Long countFileTasks(String fileSourceTaskId) {
List<Condition> conditions = new ArrayList<>();
if (fileSourceTaskId != null) {
conditions.add(defaultTable.FILE_SOURCE_TASK_ID.eq(fileSourceTaskId));
}
return countFileTasksByConditions(conditions);
}

public Long countFileTasksByConditions(Collection<Condition> conditions) {
val query = dslContext.select(
DSL.countDistinct(defaultTable.ID)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -108,8 +108,7 @@ public Integer stopBatchTasks(List<String> batchTaskIdList) {
public BatchTaskStatusDTO getBatchTaskStatusAndLogs(String batchTaskId, Long logStart, Long logLength) {
BatchTaskStatusDTO batchTaskStatusDTO = new BatchTaskStatusDTO();
batchTaskStatusDTO.setBatchTaskId(batchTaskId);
FileSourceBatchTaskDTO fileSourceBatchTaskDTO = fileSourceBatchTaskDAO.getFileSourceBatchTaskById(
batchTaskId);
FileSourceBatchTaskDTO fileSourceBatchTaskDTO = fileSourceBatchTaskDAO.getBatchTaskById(batchTaskId);
if (fileSourceBatchTaskDTO == null) {
throw new InternalException(ErrorCode.INTERNAL_ERROR);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,8 +58,7 @@ public class FileSourceServiceImpl implements FileSourceService {

@Autowired
public FileSourceServiceImpl(FileSourceTypeDAO fileSourceTypeDAO,
FileSourceDAO fileSourceDAO,
FileWorkerDAO fileWorkerDAO) {
FileSourceDAO fileSourceDAO, FileWorkerDAO fileWorkerDAO) {
this.fileSourceTypeDAO = fileSourceTypeDAO;
this.fileSourceDAO = fileSourceDAO;
this.fileWorkerDAO = fileWorkerDAO;
Expand Down
Loading

0 comments on commit 544e686

Please sign in to comment.