Skip to content

Commit

Permalink
fix job retry function
Browse files Browse the repository at this point in the history
  • Loading branch information
kevinw66 committed Aug 4, 2024
1 parent 68b1f6f commit 76afa7b
Show file tree
Hide file tree
Showing 13 changed files with 149 additions and 45 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ public static <T extends MessageOrBuilder> T fromJson(String json, Class<T> claz

public static <T extends Message> String toJson(T message) {
try {
return JsonFormat.printer().print(message);
return JsonFormat.printer().omittingInsignificantWhitespace().print(message);
} catch (Exception e) {
throw new RuntimeException(e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,11 @@ public List<Stage> getStages() {
return stages;
}

/**
 * Sets the persistent object ({@link JobPO}) held by this job to the given instance.
 *
 * @param jobPO the persistent job record to associate with this job
 */
@Override
public void loadJobPO(JobPO jobPO) {
this.jobPO = jobPO;
}

@Override
public JobPO getJobPO() {
if (jobPO == null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,5 +40,7 @@ public interface Job {

List<Stage> getStages();

/**
 * Associates an existing persistent job record with this job.
 *
 * @param jobPO the persistent job record to attach
 */
void loadJobPO(JobPO jobPO);

JobPO getJobPO();
}
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ protected void createStages() {
// Install components
super.createInstallStages();

// Distribute caches after installed
// Update cache files after installed
super.createCacheStage();

// Start all master components
Expand Down Expand Up @@ -78,14 +78,17 @@ protected List<String> findHostnamesByComponentName(String componentName) {
for (ComponentHostDTO componentHost : componentHosts) {
if (componentHost.getComponentName().equals(componentName)) {
List<String> hostnames = new ArrayList<>(componentHost.getHostnames());
List<String> existHostnames = hostComponentRepository
.findAllByComponentPOClusterPOIdAndComponentPOComponentNameAndHostPOHostnameIn(
clusterPO.getId(), componentName, hostnames)
.stream()
.map(hostComponent -> hostComponent.getHostPO().getHostname())
.toList();

hostnames.removeAll(existHostnames);
if (serviceCommand.getInstalled()) {
List<String> existHostnames = hostComponentRepository
.findAllByComponentPOClusterPOIdAndComponentPOComponentNameAndHostPOHostnameIn(
clusterPO.getId(), componentName, hostnames)
.stream()
.map(hostComponent -> hostComponent.getHostPO().getHostname())
.toList();

hostnames.removeAll(existHostnames);
}

return hostnames;
}
}
Expand Down Expand Up @@ -119,7 +122,7 @@ private void upsertService(ServicePO servicePO, ServiceCommandDTO serviceCommand
String stackName = clusterPO.getStackPO().getStackName();
String stackVersion = clusterPO.getStackPO().getStackVersion();

// 1. Persist service
// 1. Persist service and components
if (servicePO == null) {
ServiceDTO serviceDTO = StackUtils.getServiceDTO(stackName, stackVersion, serviceName);
servicePO = ServiceConverter.INSTANCE.fromDTO2PO(serviceDTO, clusterPO);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,11 @@ public List<Task> getTasks() {
return tasks;
}

/**
 * Sets the persistent object ({@link StagePO}) held by this stage to the given instance.
 *
 * @param stagePO the persistent stage record to associate with this stage
 */
@Override
public void loadStagePO(StagePO stagePO) {
this.stagePO = stagePO;
}

@Override
public StagePO getStagePO() {
if (stagePO == null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,5 +22,7 @@ public interface Stage {

List<Task> getTasks();

/**
 * Associates an existing persistent stage record with this stage.
 *
 * @param stagePO the persistent stage record to attach
 */
void loadStagePO(StagePO stagePO);

StagePO getStagePO();
}
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,11 @@ public TaskContext getTaskContext() {
return taskContext;
}

/**
 * Sets the persistent object ({@link TaskPO}) held by this task to the given instance.
 *
 * @param taskPO the persistent task record to associate with this task
 */
@Override
public void loadTaskPO(TaskPO taskPO) {
this.taskPO = taskPO;
}

@Override
public TaskPO getTaskPO() {
if (taskPO == null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,5 +16,7 @@ public interface Task {

TaskContext getTaskContext();

/**
 * Associates an existing persistent task record with this task.
 *
 * @param taskPO the persistent task record to attach
 */
void loadTaskPO(TaskPO taskPO);

TaskPO getTaskPO();
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ public class ServiceCommandDTO implements Serializable {

private String serviceName;

private Boolean installed;

private String configDesc;

private Integer version;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,9 @@ public class ServiceCommandReq {
@NotNull @Schema(description = "Service name", example = "zookeeper")
private String serviceName;

@Schema(description = "If service installed", example = "true")
private Boolean installed;

@Schema(description = "Config Description", example = "Initial config for zookeeper")
private String configDesc;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,24 +18,41 @@
*/
package org.apache.bigtop.manager.server.service.impl;

import jakarta.annotation.Resource;
import org.apache.bigtop.manager.common.enums.JobState;
import org.apache.bigtop.manager.common.utils.JsonUtils;
import org.apache.bigtop.manager.dao.po.JobPO;
import org.apache.bigtop.manager.dao.po.StagePO;
import org.apache.bigtop.manager.dao.po.TaskPO;
import org.apache.bigtop.manager.dao.repository.JobRepository;
import org.apache.bigtop.manager.dao.repository.StageRepository;
import org.apache.bigtop.manager.dao.repository.TaskRepository;
import org.apache.bigtop.manager.server.command.CommandIdentifier;
import org.apache.bigtop.manager.server.command.job.Job;
import org.apache.bigtop.manager.server.command.job.factory.JobContext;
import org.apache.bigtop.manager.server.command.job.factory.JobFactories;
import org.apache.bigtop.manager.server.command.job.factory.JobFactory;
import org.apache.bigtop.manager.server.command.scheduler.JobScheduler;
import org.apache.bigtop.manager.server.command.stage.Stage;
import org.apache.bigtop.manager.server.command.task.Task;
import org.apache.bigtop.manager.server.enums.ApiExceptionEnum;
import org.apache.bigtop.manager.server.exception.ApiException;
import org.apache.bigtop.manager.server.model.converter.JobConverter;
import org.apache.bigtop.manager.server.model.query.PageQuery;
import org.apache.bigtop.manager.server.model.vo.JobVO;
import org.apache.bigtop.manager.server.model.vo.PageVO;
import org.apache.bigtop.manager.server.service.JobService;
import org.apache.bigtop.manager.server.utils.ClusterUtils;
import org.apache.bigtop.manager.server.utils.PageUtils;

import org.springframework.data.domain.Page;
import org.springframework.data.domain.PageRequest;
import org.springframework.data.domain.Pageable;
import org.springframework.stereotype.Service;

import jakarta.annotation.Resource;

import java.util.List;

@Service
public class JobServiceImpl implements JobService {

Expand Down Expand Up @@ -73,26 +90,82 @@ public JobVO get(Long id) {

/**
 * Retries a failed job.
 *
 * <p>Only a job whose state is {@link JobState#FAILED} can be retried. The persisted job,
 * together with all of its stages and tasks, is reset to {@link JobState#PENDING}; an
 * executable {@link Job} is then rebuilt from the stored job context, bound to the existing
 * persistent records, and submitted to the scheduler.
 *
 * @param id id of the job to retry
 * @return the view object converted from the job's persistent record
 * @throws ApiException with {@link ApiExceptionEnum#JOB_NOT_RETRYABLE} if the job is not in
 *     the FAILED state, or if a rebuilt stage/task has no matching persistent record
 */
@Override
public JobVO retry(Long id) {
    // NOTE(review): getReferenceById presumably returns a lazily-loaded reference, so an
    // unknown id would only fail on the getState() call below — confirm the resulting error
    // is mapped to a sensible API response.
    JobPO jobPO = jobRepository.getReferenceById(id);
    if (jobPO.getState() != JobState.FAILED) {
        throw new ApiException(ApiExceptionEnum.JOB_NOT_RETRYABLE);
    }

    // Reset persisted state first so the rebuilt job starts from PENDING everywhere.
    resetJobStatusInDB(jobPO);
    Job job = recreateJob(jobPO);
    jobScheduler.submit(job);

    return JobConverter.INSTANCE.fromPO2VO(jobPO);
}

/**
 * Puts a job and everything beneath it back into the {@link JobState#PENDING} state and
 * persists the change: for each stage, its tasks are updated and saved first, then the stage
 * itself; the job record is saved last.
 *
 * @param jobPO persistent job whose state tree is reset
 */
private void resetJobStatusInDB(JobPO jobPO) {
    jobPO.getStagePOList().forEach(stageRecord -> {
        stageRecord.getTaskPOList().forEach(taskRecord -> {
            taskRecord.setState(JobState.PENDING);
            taskRepository.save(taskRecord);
        });

        stageRecord.setState(JobState.PENDING);
        stageRepository.save(stageRecord);
    });

    jobPO.setState(JobState.PENDING);
    jobRepository.save(jobPO);
}

/**
 * Rebuilds an executable {@link Job} from the context stored on a persistent job and binds
 * the existing persistent records to it.
 *
 * <p>The job context is deserialized from {@code jobPO.getContext()}, the matching
 * {@link JobFactory} is looked up by command level and command, and a fresh job is created.
 * Each rebuilt stage is paired with the persisted stage whose order equals its 1-based
 * position; each rebuilt task is paired with the persisted task of that stage that has the
 * same hostname.
 *
 * @param jobPO persistent job to rebuild from
 * @return the rebuilt job with job, stage and task persistent objects loaded
 * @throws ApiException with {@link ApiExceptionEnum#JOB_NOT_RETRYABLE} if any rebuilt stage
 *     or task has no matching persistent record
 */
private Job recreateJob(JobPO jobPO) {
    JobContext context = JsonUtils.readFromString(jobPO.getContext(), JobContext.class);
    CommandIdentifier identifier = new CommandIdentifier(
            context.getCommandDTO().getCommandLevel(),
            context.getCommandDTO().getCommand());
    Job job = JobFactories.getJobFactory(identifier).createJob(context);

    job.loadJobPO(jobPO);

    // Persisted stage order is 1-based, so bump the counter before each lookup.
    int order = 0;
    for (Stage stage : job.getStages()) {
        order++;
        StagePO stagePO = findCorrectStagePO(jobPO.getStagePOList(), order);
        if (stagePO == null) {
            throw new ApiException(ApiExceptionEnum.JOB_NOT_RETRYABLE);
        }
        stage.loadStagePO(stagePO);

        for (Task task : stage.getTasks()) {
            String hostname = task.getTaskContext().getHostname();
            TaskPO taskPO = findCorrectTaskPO(stagePO.getTaskPOList(), hostname);
            if (taskPO == null) {
                throw new ApiException(ApiExceptionEnum.JOB_NOT_RETRYABLE);
            }
            task.loadTaskPO(taskPO);
        }
    }

    return job;
}

/**
 * Finds the first persisted stage whose order equals the given value.
 *
 * @param stagePOList persisted stages to search
 * @param order stage order to look for
 * @return the first matching stage, or {@code null} if none matches
 */
private StagePO findCorrectStagePO(List<StagePO> stagePOList, Integer order) {
    return stagePOList.stream()
            .filter(candidate -> candidate.getOrder().equals(order))
            .findFirst()
            .orElse(null);
}

/**
 * Finds the first persisted task that belongs to the given host.
 *
 * @param taskPOList persisted tasks to search
 * @param hostname hostname to look for
 * @return the first task whose hostname equals {@code hostname}, or {@code null} if none does
 */
private TaskPO findCorrectTaskPO(List<TaskPO> taskPOList, String hostname) {
    return taskPOList.stream()
            .filter(candidate -> candidate.getHostname().equals(hostname))
            .findFirst()
            .orElse(null);
}
}
25 changes: 13 additions & 12 deletions bigtop-manager-ui/src/components/service-add/choose-services.vue
Original file line number Diff line number Diff line change
Expand Up @@ -18,18 +18,18 @@
-->

<script setup lang="ts">
import { storeToRefs } from 'pinia'
import { TableProps } from 'ant-design-vue'
import { useServiceStore } from '@/store/service'
import { MergedServiceVO } from '@/store/service/types.ts'
import { onMounted } from 'vue'
import { ServiceVO } from '@/api/service/types.ts'
import { useStackStore } from '@/store/stack'
import { ComponentVO, ServiceComponentVO } from '@/api/component/types.ts'
import { TypeConfigVO, ServiceConfigVO } from '@/api/config/types.ts'
import _ from 'lodash'
const serviceInfo = defineModel<any>('serviceInfo')
import {storeToRefs} from 'pinia'
import {TableProps} from 'ant-design-vue'
import {useServiceStore} from '@/store/service'
import {MergedServiceVO} from '@/store/service/types.ts'
import {onMounted} from 'vue'
import {ServiceVO} from '@/api/service/types.ts'
import {useStackStore} from '@/store/stack'
import {ComponentVO, ServiceComponentVO} from '@/api/component/types.ts'
import {ServiceConfigVO, TypeConfigVO} from '@/api/config/types.ts'
import _ from 'lodash'
const serviceInfo = defineModel<any>('serviceInfo')
const disableButton = defineModel<boolean>('disableButton')
const stackStore = useStackStore()
Expand Down Expand Up @@ -84,6 +84,7 @@
return {
serviceName: serviceName,
installed: false,
componentHosts: componentHosts,
configs: configs
}
Expand Down
1 change: 1 addition & 0 deletions bigtop-manager-ui/src/components/service-add/index.vue
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,7 @@
return {
serviceName: serviceName,
installed: true,
componentHosts: componentHosts,
configs: configs
}
Expand Down

0 comments on commit 76afa7b

Please sign in to comment.