Skip to content

Commit

Permalink
add controller config for the status
Browse files Browse the repository at this point in the history
  • Loading branch information
m-nagarajan committed Apr 18, 2024
1 parent 5d056b6 commit 08ba80e
Show file tree
Hide file tree
Showing 7 changed files with 63 additions and 26 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,7 @@
import static com.linkedin.venice.ConfigKeys.TOPIC_DELETION_STATUS_POLL_INTERVAL_MS;
import static com.linkedin.venice.ConfigKeys.TOPIC_MANAGER_KAFKA_OPERATION_TIMEOUT_MS;
import static com.linkedin.venice.ConfigKeys.UNREGISTER_METRIC_FOR_DELETED_STORE_ENABLED;
import static com.linkedin.venice.ConfigKeys.USE_DA_VINCI_SPECIFIC_EXECUTION_STATUS_FOR_ERROR;
import static com.linkedin.venice.ConfigKeys.USE_PUSH_STATUS_STORE_FOR_INCREMENTAL_PUSH;
import static com.linkedin.venice.ConfigKeys.VENICE_STORAGE_CLUSTER_LEADER_HAAS;
import static com.linkedin.venice.pubsub.PubSubConstants.PUBSUB_TOPIC_DELETION_STATUS_POLL_INTERVAL_MS_DEFAULT_VALUE;
Expand Down Expand Up @@ -318,7 +319,7 @@ public class VeniceControllerConfig extends VeniceControllerClusterConfig {
private final int unusedSchemaCleanupIntervalSeconds;

private final int minSchemaCountToKeep;

private final boolean useDaVinciSpecificExecutionStatusForError;
private final PubSubClientsFactory pubSubClientsFactory;

public VeniceControllerConfig(VeniceProperties props) {
Expand Down Expand Up @@ -556,6 +557,8 @@ public VeniceControllerConfig(VeniceProperties props) {
props.getBoolean(CONTROLLER_UNUSED_VALUE_SCHEMA_CLEANUP_ENABLED, false);
this.unusedSchemaCleanupIntervalSeconds = props.getInt(CONTROLLER_UNUSED_SCHEMA_CLEANUP_INTERVAL_SECONDS, 36000);
this.minSchemaCountToKeep = props.getInt(CONTROLLER_MIN_SCHEMA_COUNT_TO_KEEP, 20);
this.useDaVinciSpecificExecutionStatusForError =
props.getBoolean(USE_DA_VINCI_SPECIFIC_EXECUTION_STATUS_FOR_ERROR, false);
this.pubSubClientsFactory = new PubSubClientsFactory(props);
}

Expand Down Expand Up @@ -678,6 +681,10 @@ public int getMinSchemaCountToKeep() {
return minSchemaCountToKeep;
}

/**
 * Tells callers whether Da Vinci push-status checks should report the Da Vinci specific error
 * status (e.g. {@code DVC_INGESTION_ERROR_TOO_MANY_DEAD_INSTANCES}) instead of the generic
 * {@code ERROR} status.
 *
 * @return the value read from {@code USE_DA_VINCI_SPECIFIC_EXECUTION_STATUS_FOR_ERROR} at
 *         construction time; {@code false} when the property is not set
 */
public boolean useDaVinciSpecificExecutionStatusForError() {
  return this.useDaVinciSpecificExecutionStatusForError;
}

/**
 * Accessor for the child data center controller D2 map held by this config.
 *
 * @return the {@code childDataCenterControllerD2Map} field itself (no defensive copy is made);
 *         presumably keyed by data center name with D2 identifiers as values -- TODO confirm
 *         against the code that populates the field
 */
public Map<String, String> getChildDataCenterControllerD2Map() {
  return this.childDataCenterControllerD2Map;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5831,7 +5831,8 @@ private OfflinePushStatusInfo getOfflinePushStatusInfo(
version.getPartitionCount(),
incrementalPushVersion,
multiClusterConfigs.getControllerConfig(clusterName).getDaVinciPushStatusScanMaxOfflineInstanceCount(),
multiClusterConfigs.getControllerConfig(clusterName).getDaVinciPushStatusScanMaxOfflineInstanceRatio());
multiClusterConfigs.getControllerConfig(clusterName).getDaVinciPushStatusScanMaxOfflineInstanceRatio(),
multiClusterConfigs.getCommonConfig().useDaVinciSpecificExecutionStatusForError());
ExecutionStatus daVinciStatus = daVinciStatusAndDetails.getStatus();
String daVinciDetails = daVinciStatusAndDetails.getDetails();
ExecutionStatus overallExecutionStatus = getOverallPushStatus(executionStatus, daVinciStatus);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,8 @@ public AbstractPushMonitor(
controllerConfig.getDaVinciPushStatusScanThreadNumber(),
controllerConfig.getDaVinciPushStatusScanNoReportRetryMaxAttempt(),
controllerConfig.getDaVinciPushStatusScanMaxOfflineInstanceCount(),
controllerConfig.getDaVinciPushStatusScanMaxOfflineInstanceRatio());
controllerConfig.getDaVinciPushStatusScanMaxOfflineInstanceRatio(),
controllerConfig.useDaVinciSpecificExecutionStatusForError());
this.isOfflinePushMonitorDaVinciPushStatusEnabled = controllerConfig.isDaVinciPushStatusEnabled();
pushStatusCollector.start();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,8 @@ public static ExecutionStatusWithDetails getDaVinciPushStatusAndDetails(
int partitionCount,
Optional<String> incrementalPushVersion,
int maxOfflineInstanceCount,
double maxOfflineInstanceRatio) {
double maxOfflineInstanceRatio,
boolean useDaVinciSpecificExecutionStatusForError) {
if (reader == null) {
throw new VeniceException("PushStatusStoreReader is null");
}
Expand All @@ -70,7 +71,8 @@ public static ExecutionStatusWithDetails getDaVinciPushStatusAndDetails(
partitionCount,
incrementalPushVersion,
maxOfflineInstanceCount,
maxOfflineInstanceRatio);
maxOfflineInstanceRatio,
useDaVinciSpecificExecutionStatusForError);
} else {
// DaVinci starts using new status key format, which contains status for all partitions in one key.
// Only batch pushes will use this key; incremental pushes will still use partition level status key.
Expand Down Expand Up @@ -126,7 +128,9 @@ public static ExecutionStatusWithDetails getDaVinciPushStatusAndDetails(
if (lastUpdateTime + TimeUnit.MINUTES.toMillis(daVinciErrorInstanceWaitTime) < System.currentTimeMillis()) {
storeVersionToDVCDeadInstanceTimeMap.remove(topicName);
return new ExecutionStatusWithDetails(
ExecutionStatus.DVC_INGESTION_ERROR_TOO_MANY_DEAD_INSTANCES,
useDaVinciSpecificExecutionStatusForError
? ExecutionStatus.DVC_INGESTION_ERROR_TOO_MANY_DEAD_INSTANCES
: ExecutionStatus.ERROR,
"Too many dead instances: " + offlineInstanceCount + ", total instances: " + totalInstanceCount
+ ", example offline instances: " + offlineInstanceList,
noDaVinciStatusReported);
Expand Down Expand Up @@ -165,7 +169,8 @@ public static ExecutionStatusWithDetails getDaVinciPushStatusAndDetails(
partitionCount,
incrementalPushVersion,
maxOfflineInstanceCount,
maxOfflineInstanceRatio);
maxOfflineInstanceRatio,
useDaVinciSpecificExecutionStatusForError);
if (partitionLevelStatus.getStatus() != ExecutionStatus.COMPLETED) {
// Do not report COMPLETED, instead, report status from the partition level status key.
statusDetailStringBuilder.append(
Expand Down Expand Up @@ -200,7 +205,8 @@ public static ExecutionStatusWithDetails getDaVinciPartitionLevelPushStatusAndDe
int partitionCount,
Optional<String> incrementalPushVersion,
int maxOfflineInstanceCount,
double maxOfflineInstanceRatio) {
double maxOfflineInstanceRatio,
boolean useDaVinciSpecificExecutionStatusForError) {
if (reader == null) {
throw new VeniceException("PushStatusStoreReader is null");
}
Expand Down Expand Up @@ -277,7 +283,9 @@ public static ExecutionStatusWithDetails getDaVinciPartitionLevelPushStatusAndDe
if (lastUpdateTime + TimeUnit.MINUTES.toMillis(daVinciErrorInstanceWaitTime) < System.currentTimeMillis()) {
storeVersionToDVCDeadInstanceTimeMap.remove(topicName);
return new ExecutionStatusWithDetails(
ExecutionStatus.DVC_INGESTION_ERROR_TOO_MANY_DEAD_INSTANCES,
useDaVinciSpecificExecutionStatusForError
? ExecutionStatus.DVC_INGESTION_ERROR_TOO_MANY_DEAD_INSTANCES
: ExecutionStatus.ERROR,
"Too many dead instances: " + offlineReplicaCount + ", total instances: " + totalReplicaCount
+ ", example offline instances: " + offlineInstanceList,
noDaVinciStatusReported);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ public class PushStatusCollector {
private final AtomicBoolean isStarted = new AtomicBoolean(false);

private final Map<String, Integer> topicToNoDaVinciStatusRetryCountMap = new HashMap<>();
private final boolean useDaVinciSpecificExecutionStatusForError;

public PushStatusCollector(
ReadWriteStoreRepository storeRepository,
Expand All @@ -60,7 +61,8 @@ public PushStatusCollector(
int daVinciPushStatusScanThreadNumber,
int daVinciPushStatusNoReportRetryMaxAttempts,
int daVinciPushStatusScanMaxOfflineInstanceCount,
double daVinciPushStatusScanMaxOfflineInstanceRatio) {
double daVinciPushStatusScanMaxOfflineInstanceRatio,
boolean useDaVinciSpecificExecutionStatusForError) {
this.storeRepository = storeRepository;
this.pushStatusStoreReader = pushStatusStoreReader;
this.pushCompletedHandler = pushCompletedHandler;
Expand All @@ -71,6 +73,7 @@ public PushStatusCollector(
this.daVinciPushStatusNoReportRetryMaxAttempts = daVinciPushStatusNoReportRetryMaxAttempts;
this.daVinciPushStatusScanMaxOfflineInstanceCount = daVinciPushStatusScanMaxOfflineInstanceCount;
this.daVinciPushStatusScanMaxOfflineInstanceRatio = daVinciPushStatusScanMaxOfflineInstanceRatio;
this.useDaVinciSpecificExecutionStatusForError = useDaVinciSpecificExecutionStatusForError;
}

public void start() {
Expand Down Expand Up @@ -135,7 +138,8 @@ private void scanDaVinciPushStatus() {
pushStatus.getPartitionCount(),
Optional.empty(),
daVinciPushStatusScanMaxOfflineInstanceCount,
daVinciPushStatusScanMaxOfflineInstanceRatio);
daVinciPushStatusScanMaxOfflineInstanceRatio,
useDaVinciSpecificExecutionStatusForError);
pushStatus.setDaVinciStatus(statusWithDetails);
return pushStatus;
}, pushStatusStoreScanExecutor));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import static org.mockito.Mockito.mock;

import com.linkedin.venice.pushstatushelper.PushStatusStoreReader;
import com.linkedin.venice.utils.DataProviderUtils;
import com.linkedin.venice.utils.Utils;
import java.util.Collections;
import java.util.HashMap;
Expand Down Expand Up @@ -51,8 +52,8 @@ public void testCompleteStatusCanBeReportedWithOfflineInstancesBelowFailFastThre
validatePushStatus(reader, "store_v1", 2, 0.25, ExecutionStatus.COMPLETED);
}

@Test
public void testDaVinciPushStatusScan() {
@Test(dataProvider = "True-and-False", dataProviderClass = DataProviderUtils.class)
public void testDaVinciPushStatusScan(boolean useDaVinciSpecificExecutionStatusForError) {
PushMonitorUtils.setDaVinciErrorInstanceWaitTime(0);
PushStatusStoreReader reader = mock(PushStatusStoreReader.class);
doReturn(true).when(reader).isInstanceAlive(eq("store"), eq("a"));
Expand Down Expand Up @@ -83,16 +84,20 @@ public void testDaVinciPushStatusScan() {
2,
0.25,
ExecutionStatus.STARTED,
null);
null,
useDaVinciSpecificExecutionStatusForError);

// Expected to fail.
validateOfflineReplicaInPushStatusWhenBreachingFailFastThreshold(
reader,
"store_v1",
1,
0.25,
ExecutionStatus.DVC_INGESTION_ERROR_TOO_MANY_DEAD_INSTANCES,
"Too many dead instances: 2, total instances: 4, example offline instances: " + offlineInstances);
useDaVinciSpecificExecutionStatusForError
? ExecutionStatus.DVC_INGESTION_ERROR_TOO_MANY_DEAD_INSTANCES
: ExecutionStatus.ERROR,
"Too many dead instances: 2, total instances: 4, example offline instances: " + offlineInstances,
useDaVinciSpecificExecutionStatusForError);

/**
* Testing ratio-based threshold.
Expand All @@ -104,15 +109,19 @@ public void testDaVinciPushStatusScan() {
1,
0.5,
ExecutionStatus.STARTED,
null);
null,
useDaVinciSpecificExecutionStatusForError);
// Expected to fail.
validateOfflineReplicaInPushStatusWhenBreachingFailFastThreshold(
reader,
"store_v2",
1,
0.25,
ExecutionStatus.DVC_INGESTION_ERROR_TOO_MANY_DEAD_INSTANCES,
"Too many dead instances: 2, total instances: 4, example offline instances: " + offlineInstances);
useDaVinciSpecificExecutionStatusForError
? ExecutionStatus.DVC_INGESTION_ERROR_TOO_MANY_DEAD_INSTANCES
: ExecutionStatus.ERROR,
"Too many dead instances: 2, total instances: 4, example offline instances: " + offlineInstances,
useDaVinciSpecificExecutionStatusForError);
}

private void validateOfflineReplicaInPushStatusWhenBreachingFailFastThreshold(
Expand All @@ -121,7 +130,8 @@ private void validateOfflineReplicaInPushStatusWhenBreachingFailFastThreshold(
int maxOfflineInstanceCount,
double maxOfflineInstanceRatio,
ExecutionStatus expectedStatus,
String expectedErrorDetails) {
String expectedErrorDetails,
boolean useDaVinciSpecificExecutionStatusForError) {
/**
* Even if offline instances number exceed the max offline threshold count it will remain STARTED for the first check,
* as we need to wait until daVinciErrorInstanceWaitTime has passed since it first occurs.
Expand All @@ -132,7 +142,8 @@ private void validateOfflineReplicaInPushStatusWhenBreachingFailFastThreshold(
1,
Optional.empty(),
maxOfflineInstanceCount,
maxOfflineInstanceRatio);
maxOfflineInstanceRatio,
useDaVinciSpecificExecutionStatusForError);
Assert.assertEquals(executionStatusWithDetails.getStatus(), ExecutionStatus.STARTED);
// Sleep 1ms and try again.
Utils.sleep(1);
Expand All @@ -142,7 +153,8 @@ private void validateOfflineReplicaInPushStatusWhenBreachingFailFastThreshold(
1,
Optional.empty(),
maxOfflineInstanceCount,
maxOfflineInstanceRatio);
maxOfflineInstanceRatio,
useDaVinciSpecificExecutionStatusForError);
Assert.assertEquals(executionStatusWithDetails.getStatus(), expectedStatus);
if (expectedStatus.isError()) {
Assert.assertEquals(executionStatusWithDetails.getDetails(), expectedErrorDetails);
Expand All @@ -161,7 +173,8 @@ private void validatePushStatus(
1,
Optional.empty(),
maxOfflineInstanceCount,
maxOfflineInstanceRatio);
maxOfflineInstanceRatio,
true);
Assert.assertEquals(executionStatusWithDetails.getStatus(), expectedStatus);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,8 @@ public void testPushStatusCollector() {
4,
1,
20,
1);
1,
true);
pushStatusCollector.start();

pushStatusCollector.subscribeTopic(regularStoreTopicV1, 10);
Expand Down Expand Up @@ -223,7 +224,8 @@ public void testPushStatusCollectorDaVinciStatusPollingRetry() {
4,
1,
20,
1);
1,
true);
pushStatusCollector.start();

pushCompletedCount.set(0);
Expand Down Expand Up @@ -314,7 +316,8 @@ public void testPushStatusCollectorDaVinciStatusPollingRetryWhenEmptyResultUntil
4,
0,
20,
1);
1,
true);
pushStatusCollector.start();

pushCompletedCount.set(0);
Expand Down

0 comments on commit 08ba80e

Please sign in to comment.