Skip to content

Commit

Permalink
[controller][common] Use ExecutionStatusWithDetails class to replace …
Browse files Browse the repository at this point in the history
…existing Pair usage in controller (linkedin#515)

This PR uses existing POJO class ExecutionStatusWithDetails to replace Pair<ExecutionStatus, Optional> and Pair<ExecutionStatus, String> usage in controller push monitoring logics to improve readability
  • Loading branch information
sixpluszero authored Jun 28, 2023
1 parent 256627a commit ea885cb
Show file tree
Hide file tree
Showing 16 changed files with 168 additions and 185 deletions.
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
package com.linkedin.venice.meta;

import com.linkedin.venice.exceptions.VeniceException;
import com.linkedin.venice.pushmonitor.PushStatusDecider;
import com.linkedin.venice.pushmonitor.WaitAllPushStatusDecider;
import com.linkedin.venice.pushmonitor.WaitNMinusOnePushStatusDecider;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
Expand All @@ -11,14 +14,16 @@
*/
public enum OfflinePushStrategy {
/*Wait all replica is ready, the version is ready to serve.*/
WAIT_ALL_REPLICAS(0),
WAIT_ALL_REPLICAS(0, new WaitAllPushStatusDecider()),
/*Wait until N-1 replicas are ready, the version is ready to serve*/
WAIT_N_MINUS_ONE_REPLCIA_PER_PARTITION(1);
WAIT_N_MINUS_ONE_REPLCIA_PER_PARTITION(1, new WaitNMinusOnePushStatusDecider());

public final int value;
private final PushStatusDecider pushStatusDecider;

OfflinePushStrategy(int v) {
this.value = v;
OfflinePushStrategy(int value, PushStatusDecider decider) {
this.value = value;
this.pushStatusDecider = decider;
}

private static final Map<Integer, OfflinePushStrategy> idMapping = new HashMap<>();
Expand All @@ -33,4 +38,8 @@ public static OfflinePushStrategy getOfflinePushStrategyFromInt(int v) {
}
return strategy;
}

public PushStatusDecider getPushStatusDecider() {
return pushStatusDecider;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,14 @@ public ExecutionStatusWithDetails(ExecutionStatus status, String details, boolea
this.noDaVinciStatusReport = noDaVinciStatusReport;
}

public ExecutionStatusWithDetails(ExecutionStatus status, String details) {
this(status, details, true);
}

public ExecutionStatusWithDetails(ExecutionStatus status) {
this(status, null, true);
}

public ExecutionStatus getStatus() {
return status;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,7 @@ public void setPartitionStatus(PartitionStatus partitionStatus, boolean updateDe
}

private void updateStatusDetails() {
PushStatusDecider decider = PushStatusDecider.getDecider(strategy);
PushStatusDecider decider = strategy.getPushStatusDecider();
Set<Integer> incompletePartitions = new HashSet<>();
int finishedPartitions = 0;
for (PartitionStatus partitionStatus: getPartitionStatuses()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,18 +7,15 @@
import static com.linkedin.venice.pushmonitor.ExecutionStatus.STARTED;
import static com.linkedin.venice.pushmonitor.ExecutionStatus.isDeterminedStatus;

import com.linkedin.venice.exceptions.VeniceException;
import com.linkedin.venice.helix.HelixState;
import com.linkedin.venice.helix.ResourceAssignment;
import com.linkedin.venice.meta.Instance;
import com.linkedin.venice.meta.OfflinePushStrategy;
import com.linkedin.venice.meta.Partition;
import com.linkedin.venice.meta.PartitionAssignment;
import com.linkedin.venice.systemstore.schemas.StoreReplicaStatus;
import com.linkedin.venice.utils.Pair;
import java.util.Collections;
import java.util.EnumMap;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
Expand All @@ -28,37 +25,29 @@


/**
* Decide the offline push status by checking all of replicas statuses under different offline push strategies.
* Decide the offline push status by checking all replicas statuses under different offline push strategies.
*/
public abstract class PushStatusDecider {
private final Logger logger = LogManager.getLogger(PushStatusDecider.class);
private static final String REASON_NOT_IN_EV = "not yet in EXTERNALVIEW";
private static final String REASON_NOT_ENOUGH_PARTITIONS_IN_EV = "not enough partitions in EXTERNALVIEW";
private static final String REASON_UNDER_REPLICATED = "does not have enough replicas";

private static Map<OfflinePushStrategy, PushStatusDecider> decidersMap = new HashMap<>();

static {
decidersMap.put(OfflinePushStrategy.WAIT_N_MINUS_ONE_REPLCIA_PER_PARTITION, new WaitNMinusOnePushStatusDecider());
decidersMap.put(OfflinePushStrategy.WAIT_ALL_REPLICAS, new WaitAllPushStatusDecider());
}

/**
* Check the current status based on {@link PartitionStatus}
*/
public Pair<ExecutionStatus, Optional<String>> checkPushStatusAndDetailsByPartitionsStatus(
public ExecutionStatusWithDetails checkPushStatusAndDetailsByPartitionsStatus(
OfflinePushStatus pushStatus,
PartitionAssignment partitionAssignment,
DisableReplicaCallback callback) {
// Sanity check
if (partitionAssignment == null || partitionAssignment.isMissingAssignedPartitions()) {
logger.warn("partitionAssignment not ready: {}", partitionAssignment);
return new Pair<>(NOT_CREATED, Optional.empty());
return new ExecutionStatusWithDetails(NOT_CREATED);
}

boolean isAllPartitionCompleted = true;
boolean isAllPartitionEndOfPushReceived = true;

if (pushStatus.getPartitionStatuses().size() != pushStatus.getNumberOfPartition()) {
isAllPartitionCompleted = false;
isAllPartitionEndOfPushReceived = false;
Expand All @@ -77,11 +66,10 @@ public Pair<ExecutionStatus, Optional<String>> checkPushStatusAndDetailsByPartit
callback);

if (executionStatus == ERROR) {
return new Pair<>(
return new ExecutionStatusWithDetails(
executionStatus,
Optional.of(
"too many ERROR replicas in partition: " + partitionStatus.getPartitionId()
+ " for offlinePushStrategy: " + getStrategy().name()));
"too many ERROR replicas in partition: " + partitionStatus.getPartitionId() + " for offlinePushStrategy: "
+ getStrategy().name());
}

if (!executionStatus.equals(COMPLETED)) {
Expand All @@ -95,13 +83,12 @@ public Pair<ExecutionStatus, Optional<String>> checkPushStatusAndDetailsByPartit
}

if (isAllPartitionCompleted) {
return new Pair<>(COMPLETED, Optional.empty());
return new ExecutionStatusWithDetails(COMPLETED);
}
if (isAllPartitionEndOfPushReceived) {
return new Pair<>(END_OF_PUSH_RECEIVED, Optional.empty());
} else {
return new Pair<>(STARTED, Optional.empty());
return new ExecutionStatusWithDetails(END_OF_PUSH_RECEIVED);
}
return new ExecutionStatusWithDetails(STARTED);
}

public static List<Instance> getReadyToServeInstances(
Expand All @@ -119,7 +106,7 @@ public static List<Instance> getReadyToServeInstances(
}

/**
* Replicas from L/F and Online/Offline model will be considered ready to serve if their status is {@link ExecutionStatus.COMPLETED}.
* Replicas will be considered ready to serve if their status is {@link ExecutionStatus#COMPLETED}.
* More information is needed if you'd like to change/support other behaviors such as routing to the leader replica.
* @param replicaStatusMap
* @return List of ready to serve instance ids
Expand Down Expand Up @@ -254,29 +241,13 @@ protected ExecutionStatus getPartitionStatus(
*/
public static ExecutionStatus getReplicaCurrentStatus(List<StatusSnapshot> historicStatusList) {
List<ExecutionStatus> statusList =
historicStatusList.stream().map(statusSnapshot -> statusSnapshot.getStatus()).collect(Collectors.toList());
// prep to traverse the list from latest status.
historicStatusList.stream().map(StatusSnapshot::getStatus).collect(Collectors.toList());
Collections.reverse(statusList);
ExecutionStatus status = STARTED;
for (ExecutionStatus executionStatus: statusList) {
if (isDeterminedStatus(executionStatus)) {
status = executionStatus;
break;
return executionStatus;
}
}

return status;
}

public static PushStatusDecider getDecider(OfflinePushStrategy strategy) {
if (!decidersMap.containsKey(strategy)) {
throw new VeniceException("Unknown offline push strategy:" + strategy);
} else {
return decidersMap.get(strategy);
}
}

protected static void updateDecider(OfflinePushStrategy strategy, PushStatusDecider decider) {
decidersMap.put(strategy, decider);
return STARTED;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@


public class WaitAllPushStatusDeciderTest extends TestPushStatusDecider {
private WaitAllPushStatusDecider statusDecider = new WaitAllPushStatusDecider();
private final WaitAllPushStatusDecider statusDecider = new WaitAllPushStatusDecider();

@BeforeMethod
public void setUp() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@


public class WaitNMinusOnePushStatusDeciderTest extends TestPushStatusDecider {
private WaitNMinusOnePushStatusDecider statusDecider = new WaitNMinusOnePushStatusDecider();
private final WaitNMinusOnePushStatusDecider statusDecider = new WaitNMinusOnePushStatusDecider();

@BeforeMethod
public void setUp() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ public void cleanUp() {
// Controller shutdown needs to complete within 5 minutes
ExecutorService ex = Executors.newSingleThreadExecutor();
Future clusterShutdownFuture = ex.submit(this::cleanupCluster);
TestUtils.waitForNonDeterministicCompletion(5, TimeUnit.MINUTES, () -> clusterShutdownFuture.isDone());
TestUtils.waitForNonDeterministicCompletion(5, TimeUnit.MINUTES, clusterShutdownFuture::isDone);
ex.shutdownNow();
}

Expand Down Expand Up @@ -792,7 +792,7 @@ public void testDisableStoreWrite() {
TOTAL_TIMEOUT_FOR_SHORT_TEST_MS,
TimeUnit.MILLISECONDS,
() -> monitor.getPushStatusAndDetails(Version.composeKafkaTopic(storeName, 2))
.getFirst()
.getStatus()
.equals(ExecutionStatus.COMPLETED));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4641,7 +4641,7 @@ private void waitUntilNodesAreAssignedForResource(
HelixVeniceClusterResources clusterResources = getHelixVeniceClusterResources(clusterName);
PushMonitor pushMonitor = clusterResources.getPushMonitor();
RoutingDataRepository routingDataRepository = clusterResources.getRoutingDataRepository();
PushStatusDecider statusDecider = PushStatusDecider.getDecider(strategy);
PushStatusDecider statusDecider = strategy.getPushStatusDecider();

Optional<String> notReadyReason = Optional.of("unknown");
long startTime = System.currentTimeMillis();
Expand Down Expand Up @@ -5402,7 +5402,7 @@ public OfflinePushStatusInfo getOffLinePushStatus(
return list.get(0);
}

private Pair<ExecutionStatus, String> getIncrementalPushStatus(
private ExecutionStatusWithDetails getIncrementalPushStatus(
String clusterName,
String kafkaTopic,
String incrementalPushVersion,
Expand Down Expand Up @@ -5430,7 +5430,7 @@ private OfflinePushStatusInfo getOfflinePushStatusInfo(
Store store,
int versionNumber,
boolean isTargetPush) {
Pair<ExecutionStatus, String> statusAndDetails;
ExecutionStatusWithDetails statusAndDetails;

HelixVeniceClusterResources resources = getHelixVeniceClusterResources(clusterName);
MaintenanceSignal maintenanceSignal =
Expand All @@ -5450,8 +5450,8 @@ private OfflinePushStatusInfo getOfflinePushStatusInfo(
} else {
statusAndDetails = monitor.getPushStatusAndDetails(kafkaTopic);
}
ExecutionStatus executionStatus = statusAndDetails.getFirst();
String details = statusAndDetails.getSecond();
ExecutionStatus executionStatus = statusAndDetails.getStatus();
String details = statusAndDetails.getDetails();
if (executionStatus.equals(ExecutionStatus.NOT_CREATED)) {
StringBuilder moreDetailsBuilder = new StringBuilder(details == null ? "" : details + " and ");

Expand Down Expand Up @@ -5617,7 +5617,7 @@ private void createClusterIfRequired(String clusterName) {
VeniceControllerClusterConfig config = multiClusterConfigs.getControllerConfig(clusterName);
HelixConfigScope configScope =
new HelixConfigScopeBuilder(HelixConfigScope.ConfigScopeProperty.CLUSTER).forCluster(clusterName).build();
Map<String, String> helixClusterProperties = new HashMap<String, String>();
Map<String, String> helixClusterProperties = new HashMap<>();
helixClusterProperties.put(ZKHelixManager.ALLOW_PARTICIPANT_AUTO_JOIN, String.valueOf(true));
long delayedTime = config.getDelayToRebalanceMS();
if (delayedTime > 0) {
Expand Down
Loading

0 comments on commit ea885cb

Please sign in to comment.