Skip to content

Commit

Permalink
[dvc] Add config to return Da Vinci specific ExecutionStatus for Erro…
Browse files Browse the repository at this point in the history
…rs (linkedin#947)

* Added a config "use.da.vinci.specific.execution.status.for.error" to enable/disable returning the newly added Da Vinci-specific ExecutionStatus values for errors: in Controllers for DVC_INGESTION_ERROR_TOO_MANY_DEAD_INSTANCES, and in Da Vinci clients for the other DVC-specific statuses.
* It is disabled by default and can be re-enabled once the new code is deployed to all components.
  • Loading branch information
m-nagarajan authored Apr 18, 2024
1 parent 796e201 commit 560b804
Show file tree
Hide file tree
Showing 13 changed files with 206 additions and 79 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,7 @@ public class DaVinciBackend implements Closeable {
private final Optional<ObjectCacheBackend> cacheBackend;
private DaVinciIngestionBackend ingestionBackend;
private final AggVersionedStorageEngineStats aggVersionedStorageEngineStats;
private final boolean useDaVinciSpecificExecutionStatusForError;

public DaVinciBackend(
ClientConfig clientConfig,
Expand All @@ -116,6 +117,7 @@ public DaVinciBackend(
LOGGER.info("Creating Da Vinci backend with managed clients: {}", managedClients);
try {
VeniceServerConfig backendConfig = configLoader.getVeniceServerConfig();
useDaVinciSpecificExecutionStatusForError = backendConfig.useDaVinciSpecificExecutionStatusForError();
this.configLoader = configLoader;
metricsRepository = Optional.ofNullable(clientConfig.getMetricsRepository())
.orElse(TehutiUtils.getMetricsRepository("davinci-client"));
Expand Down Expand Up @@ -664,7 +666,7 @@ public void error(String kafkaTopic, int partitionId, String message, Exception
/**
* Report push status needs to be executed before deleting the {@link VersionBackend}.
*/
ExecutionStatus status = getDaVinciErrorStatus(e);
ExecutionStatus status = getDaVinciErrorStatus(e, useDaVinciSpecificExecutionStatusForError);
reportPushStatus(kafkaTopic, partitionId, status);

versionBackend.completePartitionExceptionally(partitionId, e);
Expand Down Expand Up @@ -740,16 +742,21 @@ public void endOfIncrementalPushReceived(
}
};

static ExecutionStatus getDaVinciErrorStatus(Exception e) {
ExecutionStatus status = DVC_INGESTION_ERROR_OTHER;
if (e instanceof VeniceException) {
if (e instanceof MemoryLimitExhaustedException
|| (e.getCause() != null && e.getCause() instanceof MemoryLimitExhaustedException)) {
status = DVC_INGESTION_ERROR_MEMORY_LIMIT_REACHED;
} else if (e instanceof DiskLimitExhaustedException
|| (e.getCause() != null && e.getCause() instanceof DiskLimitExhaustedException)) {
status = DVC_INGESTION_ERROR_DISK_FULL;
/**
 * Maps an ingestion failure to the {@link ExecutionStatus} that should be reported for it.
 *
 * <p>Only the exception itself and its direct cause are inspected; deeper levels of the cause chain are
 * not walked.
 *
 * @param e the exception describing the failure
 * @param useDaVinciSpecificExecutionStatusForError when {@code false}, the generic
 *        {@link ExecutionStatus#ERROR} is always returned; when {@code true}, a Da Vinci specific
 *        status is returned
 * @return {@code ERROR} when the flag is off; otherwise {@code DVC_INGESTION_ERROR_MEMORY_LIMIT_REACHED} or
 *         {@code DVC_INGESTION_ERROR_DISK_FULL} when {@code e} is a {@link VeniceException} that is, or is
 *         directly caused by, the corresponding limit-exhausted exception, and
 *         {@code DVC_INGESTION_ERROR_OTHER} in every remaining case
 */
static ExecutionStatus getDaVinciErrorStatus(Exception e, boolean useDaVinciSpecificExecutionStatusForError) {
  if (!useDaVinciSpecificExecutionStatusForError) {
    return ExecutionStatus.ERROR;
  }
  if (!(e instanceof VeniceException)) {
    return DVC_INGESTION_ERROR_OTHER;
  }

  // instanceof evaluates to false for null, so an absent cause needs no separate check.
  Throwable cause = e.getCause();
  if (e instanceof MemoryLimitExhaustedException || cause instanceof MemoryLimitExhaustedException) {
    return DVC_INGESTION_ERROR_MEMORY_LIMIT_REACHED;
  }
  if (e instanceof DiskLimitExhaustedException || cause instanceof DiskLimitExhaustedException) {
    return DVC_INGESTION_ERROR_DISK_FULL;
  }
  return DVC_INGESTION_ERROR_OTHER;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,7 @@
import static com.linkedin.venice.ConfigKeys.SYSTEM_SCHEMA_INITIALIZATION_AT_START_TIME_ENABLED;
import static com.linkedin.venice.ConfigKeys.UNREGISTER_METRIC_FOR_DELETED_STORE_ENABLED;
import static com.linkedin.venice.ConfigKeys.UNSORTED_INPUT_DRAINER_SIZE;
import static com.linkedin.venice.ConfigKeys.USE_DA_VINCI_SPECIFIC_EXECUTION_STATUS_FOR_ERROR;
import static com.linkedin.venice.pubsub.PubSubConstants.PUBSUB_TOPIC_MANAGER_METADATA_FETCHER_CONSUMER_POOL_SIZE_DEFAULT_VALUE;

import com.linkedin.davinci.helix.LeaderFollowerPartitionStateModelFactory;
Expand Down Expand Up @@ -458,6 +459,7 @@ public class VeniceServerConfig extends VeniceClusterConfig {
private final int nonExistingTopicCheckRetryIntervalSecond;
private final boolean dedicatedConsumerPoolForAAWCLeaderEnabled;
private final int dedicatedConsumerPoolSizeForAAWCLeader;
private final boolean useDaVinciSpecificExecutionStatusForError;

public VeniceServerConfig(VeniceProperties serverProperties) throws ConfigurationException {
this(serverProperties, Collections.emptyMap());
Expand Down Expand Up @@ -754,6 +756,8 @@ public VeniceServerConfig(VeniceProperties serverProperties, Map<String, Map<Str
serverProperties.getBoolean(SERVER_DEDICATED_CONSUMER_POOL_FOR_AA_WC_LEADER_ENABLED, false);
dedicatedConsumerPoolSizeForAAWCLeader =
serverProperties.getInt(SERVER_DEDICATED_CONSUMER_POOL_SIZE_FOR_AA_WC_LEADER, 5);
useDaVinciSpecificExecutionStatusForError =
serverProperties.getBoolean(USE_DA_VINCI_SPECIFIC_EXECUTION_STATUS_FOR_ERROR, false);
}

long extractIngestionMemoryLimit(
Expand Down Expand Up @@ -1339,4 +1343,8 @@ public int getTopicManagerMetadataFetcherConsumerPoolSize() {
/**
 * @return the value held in the {@code topicManagerMetadataFetcherThreadPoolSize} field of this config
 */
public int getTopicManagerMetadataFetcherThreadPoolSize() {
  return this.topicManagerMetadataFetcherThreadPoolSize;
}

/**
 * @return the value loaded from {@code USE_DA_VINCI_SPECIFIC_EXECUTION_STATUS_FOR_ERROR}, which falls back to
 *         {@code false} when the property is not set
 */
public boolean useDaVinciSpecificExecutionStatusForError() {
  return this.useDaVinciSpecificExecutionStatusForError;
}
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package com.linkedin.davinci;

import static com.linkedin.venice.pushmonitor.ExecutionStatus.DVC_INGESTION_ERROR_OTHER;
import static com.linkedin.venice.pushmonitor.ExecutionStatus.ERROR;
import static com.linkedin.venice.utils.DataProviderUtils.BOOLEAN;
import static com.linkedin.venice.utils.DataProviderUtils.allPermutationGenerator;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.fail;
Expand All @@ -14,16 +16,18 @@


public class DaVinciBackendTest {
@DataProvider(name = "DvcErrorExecutionStatus")
public static Object[][] dvcErrorExecutionStatus() {
@DataProvider(name = "DvcErrorExecutionStatusAndBoolean")
public static Object[][] dvcErrorExecutionStatusAndBoolean() {
return allPermutationGenerator((permutation) -> {
ExecutionStatus status = (ExecutionStatus) permutation[0];
return status.isDVCIngestionError();
}, ExecutionStatus.values());
}, ExecutionStatus.values(), BOOLEAN);
}

@Test(dataProvider = "DvcErrorExecutionStatus")
public void testGetDaVinciErrorStatus(ExecutionStatus executionStatus) {
@Test(dataProvider = "DvcErrorExecutionStatusAndBoolean")
public void testGetDaVinciErrorStatus(
ExecutionStatus executionStatus,
boolean useDaVinciSpecificExecutionStatusForError) {
VeniceException veniceException;
switch (executionStatus) {
case DVC_INGESTION_ERROR_DISK_FULL:
Expand All @@ -40,15 +44,23 @@ public void testGetDaVinciErrorStatus(ExecutionStatus executionStatus) {
fail("Unexpected execution status: " + executionStatus);
return;
}
assertEquals(
DaVinciBackend.getDaVinciErrorStatus(veniceException),
executionStatus.equals(ExecutionStatus.DVC_INGESTION_ERROR_TOO_MANY_DEAD_INSTANCES)
? DVC_INGESTION_ERROR_OTHER
: executionStatus);
if (useDaVinciSpecificExecutionStatusForError) {
assertEquals(
DaVinciBackend.getDaVinciErrorStatus(veniceException, useDaVinciSpecificExecutionStatusForError),
executionStatus.equals(ExecutionStatus.DVC_INGESTION_ERROR_TOO_MANY_DEAD_INSTANCES)
? DVC_INGESTION_ERROR_OTHER
: executionStatus);
} else {
assertEquals(
DaVinciBackend.getDaVinciErrorStatus(veniceException, useDaVinciSpecificExecutionStatusForError),
ERROR);
}
}

@Test(dataProvider = "DvcErrorExecutionStatus")
public void testGetDaVinciErrorStatusNested(ExecutionStatus executionStatus) {
@Test(dataProvider = "DvcErrorExecutionStatusAndBoolean")
public void testGetDaVinciErrorStatusNested(
ExecutionStatus executionStatus,
boolean useDaVinciSpecificExecutionStatusForError) {
VeniceException veniceException;
switch (executionStatus) {
case DVC_INGESTION_ERROR_DISK_FULL:
Expand All @@ -65,15 +77,23 @@ public void testGetDaVinciErrorStatusNested(ExecutionStatus executionStatus) {
fail("Unexpected execution status: " + executionStatus);
return;
}
assertEquals(
DaVinciBackend.getDaVinciErrorStatus(veniceException),
executionStatus.equals(ExecutionStatus.DVC_INGESTION_ERROR_TOO_MANY_DEAD_INSTANCES)
? DVC_INGESTION_ERROR_OTHER
: executionStatus);
if (useDaVinciSpecificExecutionStatusForError) {
assertEquals(
DaVinciBackend.getDaVinciErrorStatus(veniceException, useDaVinciSpecificExecutionStatusForError),
executionStatus.equals(ExecutionStatus.DVC_INGESTION_ERROR_TOO_MANY_DEAD_INSTANCES)
? DVC_INGESTION_ERROR_OTHER
: executionStatus);
} else {
assertEquals(
DaVinciBackend.getDaVinciErrorStatus(veniceException, useDaVinciSpecificExecutionStatusForError),
ERROR);
}
}

@Test(dataProvider = "DvcErrorExecutionStatus")
public void testGetDaVinciErrorStatusWithInvalidCases(ExecutionStatus executionStatus) {
@Test(dataProvider = "DvcErrorExecutionStatusAndBoolean")
public void testGetDaVinciErrorStatusWithInvalidCases(
ExecutionStatus executionStatus,
boolean useDaVinciSpecificExecutionStatusForError) {
VeniceException veniceException;
switch (executionStatus) {
case DVC_INGESTION_ERROR_DISK_FULL:
Expand All @@ -87,7 +107,16 @@ public void testGetDaVinciErrorStatusWithInvalidCases(ExecutionStatus executionS
return;
}

assertEquals(DaVinciBackend.getDaVinciErrorStatus(veniceException), DVC_INGESTION_ERROR_OTHER);
if (useDaVinciSpecificExecutionStatusForError) {
assertEquals(
DaVinciBackend.getDaVinciErrorStatus(veniceException, useDaVinciSpecificExecutionStatusForError),
DVC_INGESTION_ERROR_OTHER);

} else {
assertEquals(
DaVinciBackend.getDaVinciErrorStatus(veniceException, useDaVinciSpecificExecutionStatusForError),
ERROR);
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -1756,6 +1756,13 @@ private ConfigKeys() {
public static final String PUSH_STATUS_STORE_HEARTBEAT_EXPIRATION_TIME_IN_SECONDS =
"push.status.store.heartbeat.expiration.seconds";

/**
 * When enabled, Da Vinci clients return specific status codes to indicate the type of ingestion failure
 * rather than a generic {@link com.linkedin.venice.pushmonitor.ExecutionStatus#ERROR}. Disabled by default.
 */
public static final String USE_DA_VINCI_SPECIFIC_EXECUTION_STATUS_FOR_ERROR =
"use.da.vinci.specific.execution.status.for.error";

/**
* Whether to throttle SSL connections between router and client.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,9 @@
import static com.linkedin.venice.ConfigKeys.PUSH_STATUS_STORE_ENABLED;
import static com.linkedin.venice.ConfigKeys.SERVER_DISK_FULL_THRESHOLD;
import static com.linkedin.venice.ConfigKeys.SERVER_PROMOTION_TO_LEADER_REPLICA_DELAY_SECONDS;
import static com.linkedin.venice.ConfigKeys.USE_DA_VINCI_SPECIFIC_EXECUTION_STATUS_FOR_ERROR;
import static com.linkedin.venice.hadoop.VenicePushJob.PushJobCheckpoints.DVC_INGESTION_ERROR_DISK_FULL;
import static com.linkedin.venice.hadoop.VenicePushJob.PushJobCheckpoints.START_JOB_STATUS_POLLING;
import static com.linkedin.venice.hadoop.VenicePushJobConstants.PUSH_JOB_STATUS_UPLOAD_ENABLE;
import static com.linkedin.venice.integration.utils.ServiceFactory.getVeniceCluster;
import static com.linkedin.venice.meta.PersistenceType.ROCKS_DB;
Expand Down Expand Up @@ -43,6 +45,7 @@
import com.linkedin.venice.integration.utils.VeniceRouterWrapper;
import com.linkedin.venice.pushmonitor.ExecutionStatus;
import com.linkedin.venice.status.protocol.PushJobDetails;
import com.linkedin.venice.utils.DataProviderUtils;
import com.linkedin.venice.utils.PropertyBuilder;
import com.linkedin.venice.utils.TestUtils;
import com.linkedin.venice.utils.TestWriteUtils;
Expand Down Expand Up @@ -130,7 +133,8 @@ private double getDiskFullThreshold(int recordCount, int recordSizeMin) throws I
return diskFullThreshold;
}

private VeniceProperties getDaVinciBackendConfig() throws IOException {
private VeniceProperties getDaVinciBackendConfig(boolean useDaVinciSpecificExecutionStatusForError)
throws IOException {
String baseDataPath = Utils.getTempDataDirectory().getAbsolutePath();
PropertyBuilder venicePropertyBuilder = new PropertyBuilder();

Expand All @@ -141,6 +145,7 @@ private VeniceProperties getDaVinciBackendConfig() throws IOException {
.put(PUSH_STATUS_STORE_ENABLED, true)
.put(D2_ZK_HOSTS_ADDRESS, venice.getZk().getAddress())
.put(CLUSTER_DISCOVERY_D2_SERVICE, VeniceRouterWrapper.CLUSTER_DISCOVERY_D2_SERVICE_NAME)
.put(USE_DA_VINCI_SPECIFIC_EXECUTION_STATUS_FOR_ERROR, useDaVinciSpecificExecutionStatusForError)
.put(SERVER_DISK_FULL_THRESHOLD, getDiskFullThreshold(largePushRecordCount, largePushRecordMinSize));
return venicePropertyBuilder.build();
}
Expand Down Expand Up @@ -194,8 +199,8 @@ private PushJobDetails makeCopyOf(PushJobDetails pushJobDetails) {
}
}

@Test(timeOut = TEST_TIMEOUT)
public void testDaVinciDiskFullFailure() throws Exception {
@Test(timeOut = TEST_TIMEOUT, dataProviderClass = DataProviderUtils.class, dataProvider = "True-and-False")
public void testDaVinciDiskFullFailure(boolean useDaVinciSpecificExecutionStatusForError) throws Exception {
String storeName = Utils.getUniqueString("davinci_disk_full_test");
// Test a small push
File inputDir = getTempDataDirectory();
Expand Down Expand Up @@ -231,7 +236,7 @@ public void testDaVinciDiskFullFailure() throws Exception {
});

// Spin up DaVinci client
VeniceProperties backendConfig = getDaVinciBackendConfig();
VeniceProperties backendConfig = getDaVinciBackendConfig(useDaVinciSpecificExecutionStatusForError);
MetricsRepository metricsRepository = new MetricsRepository();
try (CachingDaVinciClientFactory factory = new CachingDaVinciClientFactory(
d2Client,
Expand Down Expand Up @@ -267,18 +272,27 @@ public void testDaVinciDiskFullFailure() throws Exception {
VeniceException.class,
() -> runVPJ(vpjPropertiesForV2, 2, controllerClient, Optional.of(pushJobDetailsTracker)));
assertTrue(
exception.getMessage().contains("status: " + ExecutionStatus.DVC_INGESTION_ERROR_DISK_FULL),
exception.getMessage()
.contains(
"status: " + (useDaVinciSpecificExecutionStatusForError
? ExecutionStatus.DVC_INGESTION_ERROR_DISK_FULL
: ExecutionStatus.ERROR)),
exception.getMessage());
assertTrue(
exception.getMessage()
.contains("Found a failed partition replica in Da Vinci due to disk threshold reached"),
.contains(
"Found a failed partition replica in Da Vinci"
+ (useDaVinciSpecificExecutionStatusForError ? " due to disk threshold reached" : "")),
exception.getMessage());

assertEquals(
pushJobDetailsTracker.getRecordedPushJobDetails()
.get(pushJobDetailsTracker.getRecordedPushJobDetails().size() - 1)
.getPushJobLatestCheckpoint()
.intValue(),
DVC_INGESTION_ERROR_DISK_FULL.getValue());
useDaVinciSpecificExecutionStatusForError
? DVC_INGESTION_ERROR_DISK_FULL.getValue()
: START_JOB_STATUS_POLLING.getValue());
} finally {
controllerClient.disableAndDeleteStore(storeName);
}
Expand Down
Loading

0 comments on commit 560b804

Please sign in to comment.