Skip to content

Commit

Permalink
Merge pull request #2985 from jsonwan/github_perf/third_file
Browse files Browse the repository at this point in the history
perf: file-gateway调度逻辑优化 #2977
  • Loading branch information
jsonwan authored May 15, 2024
2 parents 4c72e45 + ce777ed commit c3c4751
Show file tree
Hide file tree
Showing 7 changed files with 63 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,13 @@ public class MetricsConstants {

// tag
public static final String TAG_KEY_MODULE = "module";
public static final String TAG_KEY_REQUEST_SOURCE = "requestSource";
public static final String TAG_KEY_DISPATCH_RESULT = "dispatchResult";

// value
public static final String TAG_VALUE_MODULE_FILE_WORKER = "fileWorker";
public static final String TAG_VALUE_MODULE_FILE_GATEWAY = "fileGateway";
public static final String TAG_VALUE_DISPATCH_RESULT_TRUE = "true";
public static final String TAG_VALUE_DISPATCH_RESULT_FALSE = "false";

}
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,8 @@ public interface DispatchService {
* 根据文件源找到一个最适合的FileWorker
*
* @param fileSourceDTO 文件源对象
* @param requestSource 请求来源
* @return 选中的对接文件源的FileWorker对象
*/
FileWorkerDTO findBestFileWorker(FileSourceDTO fileSourceDTO);
FileWorkerDTO findBestFileWorker(FileSourceDTO fileSourceDTO, String requestSource);
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,15 +35,16 @@
import com.tencent.bk.job.file_gateway.service.DispatchService;
import com.tencent.bk.job.file_gateway.service.FileWorkerService;
import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.Tag;
import io.micrometer.core.instrument.Timer;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;

@Slf4j
Expand All @@ -55,6 +56,8 @@ public class DispatchServiceImpl implements DispatchService {
private final FileWorkerDAO fileWorkerDAO;
private final FileWorkerService fileWorkerService;

private final AtomicLong roundRobinCount = new AtomicLong(0);

@Autowired
public DispatchServiceImpl(FileWorkerDAO fileWorkerDAO,
AbilityTagService abilityTagService,
Expand Down Expand Up @@ -142,24 +145,50 @@ private List<FileWorkerDTO> getFileWorkerByScopeAndAbilityTag(Long appId,
}

@Override
public FileWorkerDTO findBestFileWorker(FileSourceDTO fileSourceDTO) {
public FileWorkerDTO findBestFileWorker(FileSourceDTO fileSourceDTO, String requestSource) {
Timer.Sample sample = Timer.start(meterRegistry);
FileWorkerDTO fileWorkerDTO = findBestFileWorkerIndeed(fileSourceDTO);
long nanoSeconds = sample.stop(meterRegistry.timer(MetricsConstants.NAME_FILE_GATEWAY_DISPATCH_TIME,
MetricsConstants.TAG_KEY_MODULE, MetricsConstants.TAG_VALUE_MODULE_FILE_GATEWAY));
long millis = TimeUnit.NANOSECONDS.toMillis(nanoSeconds);
if (millis > 2000) {
log.warn("Dispatch time over 2000ms, fileSourceDTO={}", fileSourceDTO.getBasicDesc());
FileWorkerDTO fileWorkerDTO = null;
try {
fileWorkerDTO = findBestFileWorkerIndeed(fileSourceDTO);
} catch (Exception e) {
log.warn("Fail to findBestFileWorker", e);
} finally {
long nanoSeconds = sample.stop(
meterRegistry.timer(
MetricsConstants.NAME_FILE_GATEWAY_DISPATCH_TIME,
buildDispatchResult(fileWorkerDTO, requestSource)
)
);
long millis = TimeUnit.NANOSECONDS.toMillis(nanoSeconds);
if (millis > 2000) {
log.warn("SLOW: Dispatch time over 2000ms, fileSourceDTO={}", fileSourceDTO.getBasicDesc());
}
}
return fileWorkerDTO;
}

/**
 * Builds the tags attached to the dispatch-time timer: the module, the request source
 * and whether a FileWorker was found.
 *
 * @param fileWorkerDTO the selected FileWorker, or {@code null} when dispatch found none or failed
 * @param requestSource caller-supplied label for where the dispatch request came from; may be
 *                      {@code null}, in which case the tag value falls back to {@code "unknown"}
 * @return the module, requestSource and dispatchResult tags, in that order
 */
private Iterable<Tag> buildDispatchResult(FileWorkerDTO fileWorkerDTO, String requestSource) {
    // Tag.of does not accept a null value. This method runs inside the finally block of
    // findBestFileWorker, so an exception here would replace the dispatch result; use a
    // placeholder rather than letting metrics recording fail the call.
    String requestSourceValue = requestSource == null ? "unknown" : requestSource;
    // A null worker means the dispatch did not succeed.
    String dispatchResultValue = fileWorkerDTO == null
        ? MetricsConstants.TAG_VALUE_DISPATCH_RESULT_FALSE
        : MetricsConstants.TAG_VALUE_DISPATCH_RESULT_TRUE;

    List<Tag> tagList = new ArrayList<>(3);
    tagList.add(Tag.of(MetricsConstants.TAG_KEY_MODULE, MetricsConstants.TAG_VALUE_MODULE_FILE_GATEWAY));
    tagList.add(Tag.of(MetricsConstants.TAG_KEY_REQUEST_SOURCE, requestSourceValue));
    tagList.add(Tag.of(MetricsConstants.TAG_KEY_DISPATCH_RESULT, dispatchResultValue));
    return tagList;
}

private FileWorkerDTO findWorkerByAuto(FileSourceDTO fileSourceDTO) {
FileWorkerDTO fileWorkerDTO = null;
String workerSelectScope = fileSourceDTO.getWorkerSelectScope();
List<String> abilityTagList = abilityTagService.getAbilityTagList(fileSourceDTO);
List<FileWorkerDTO> fileWorkerDTOList;
if (abilityTagList == null || abilityTagList.size() == 0) {
if (abilityTagList == null || abilityTagList.isEmpty()) {
// 无能力标签要求,任选一个FileWorker
fileWorkerDTOList = getFileWorkerByScope(fileSourceDTO.getAppId(), workerSelectScope);
if (fileWorkerDTOList.isEmpty()) {
Expand Down Expand Up @@ -198,10 +227,9 @@ private FileWorkerDTO findWorkerByAuto(FileSourceDTO fileSourceDTO) {
return onlineStatus != null && onlineStatus.intValue() == 1;
}).collect(Collectors.toList());
if (!fileWorkerDTOList.isEmpty()) {
// 按策略调度:内存占用最小
fileWorkerDTOList.sort(Comparator.comparing(FileWorkerDTO::getMemRate));
log.debug("ordered fileWorkerDTOList:{}", fileWorkerDTOList);
fileWorkerDTO = fileWorkerDTOList.get(0);
// 按策略调度:RoundRobin
int index = (int) (roundRobinCount.getAndIncrement() % fileWorkerDTOList.size());
fileWorkerDTO = fileWorkerDTOList.get(index);
} else {
log.error("Cannot find available file worker, abilityTagList={}", abilityTagList);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,17 +67,17 @@ public FileServiceImpl(FileSourceService fileSourceService,
this.jobHttpClient = jobHttpClient;
}

private FileWorkerDTO getFileWorker(FileSourceDTO fileSourceDTO) {
private FileWorkerDTO getFileWorker(FileSourceDTO fileSourceDTO, String requestSource) {
if (fileSourceDTO == null) {
throw new InternalException(ErrorCode.FILE_SOURCE_NOT_EXIST);
}
return dispatchService.findBestFileWorker(fileSourceDTO);
return dispatchService.findBestFileWorker(fileSourceDTO, requestSource);
}

@Override
public boolean isFileAvailable(String username, Long appId, Integer fileSourceId) {
FileSourceDTO fileSourceDTO = fileSourceService.getFileSourceById(appId, fileSourceId);
FileWorkerDTO fileWorkerDTO = getFileWorker(fileSourceDTO);
FileWorkerDTO fileWorkerDTO = getFileWorker(fileSourceDTO, "isFileAvailable");
if (fileWorkerDTO == null) {
throw new InternalException(ErrorCode.CAN_NOT_FIND_AVAILABLE_FILE_WORKER);
}
Expand All @@ -103,7 +103,7 @@ public FileNodesVO listFileNode(String username, Long appId, Integer fileSourceI
if (name == null) name = "";
final String finalName = name;
FileSourceDTO fileSourceDTO = fileSourceService.getFileSourceById(appId, fileSourceId);
FileWorkerDTO fileWorkerDTO = getFileWorker(fileSourceDTO);
FileWorkerDTO fileWorkerDTO = getFileWorker(fileSourceDTO, "listFileNode");
if (fileWorkerDTO == null) {
throw new InternalException(ErrorCode.CAN_NOT_FIND_AVAILABLE_FILE_WORKER);
}
Expand All @@ -129,7 +129,10 @@ public FileNodesVO listFileNode(String username, Long appId, Integer fileSourceI
@Override
public Boolean executeAction(String username, Long appId, Integer fileSourceId, ExecuteActionReq executeActionReq) {
FileSourceDTO fileSourceDTO = fileSourceService.getFileSourceById(appId, fileSourceId);
FileWorkerDTO fileWorkerDTO = getFileWorker(fileSourceDTO);
FileWorkerDTO fileWorkerDTO = getFileWorker(
fileSourceDTO,
"executeAction" + executeActionReq.getActionCode()
);
if (fileWorkerDTO == null) {
throw new InternalException(ErrorCode.CAN_NOT_FIND_AVAILABLE_FILE_WORKER);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,10 @@ public TaskInfoDTO startFileSourceDownloadTaskWithId(String username, Long appId
if (fileSourceDTO == null) {
throw new RuntimeException("FileSource not exist, fileSourceId=" + fileSourceId.toString());
}
FileWorkerDTO fileWorkerDTO = dispatchService.findBestFileWorker(fileSourceDTO);
FileWorkerDTO fileWorkerDTO = dispatchService.findBestFileWorker(
fileSourceDTO,
"DownloadTask(appId=" + appId + ")"
);
if (fileWorkerDTO == null) {
throw new RuntimeException(String.format("Cannot match fileWorker for FileSourceTask,appId=%d," +
"stepInstanceId=%d,fileSourceId=%d,filePathList=%s", appId, stepInstanceId, fileSourceId,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -192,7 +192,7 @@ public void run() {
// 2.删除现有FileSourceTask任务
fileSourceTaskService.deleteFileSourceTaskById(fileSourceTaskId);
log.debug("delete fileSourceTask {}", fileSourceTaskId);
FileWorkerDTO fileWorkerDTO = dispatchService.findBestFileWorker(fileSourceDTO);
FileWorkerDTO fileWorkerDTO = dispatchService.findBestFileWorker(fileSourceDTO, "ReDispatch");
log.debug("found bestWorker:{}", fileSourceDTO);
if (fileWorkerDTO != null) {
// 3.重新派发任务
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,9 @@ public void run() {
pageSize
);
for (FileSourceDTO fileSourceDTO : fileSourceDTOList) {
FileWorkerDTO fileWorkerDTO = dispatchService.findBestFileWorker(fileSourceDTO);
FileWorkerDTO fileWorkerDTO = dispatchService.findBestFileWorker(
fileSourceDTO, "FileSourceStatusUpdateTask"
);
int status;
if (fileWorkerDTO == null) {
log.info(
Expand Down

0 comments on commit c3c4751

Please sign in to comment.