Rebase and handle merge conflicts
nisargthakkar committed Sep 19, 2024
1 parent 50d3b2c commit 33d1620
Showing 4 changed files with 32 additions and 4 deletions.

[File 1 of 4]

@@ -109,7 +109,7 @@ public void testClusterLevelNativeReplicationConfigForNewStores() {
   }

   @Test(timeOut = TEST_TIMEOUT)
-  public void testConvertHybridDuringPushjob() {
+  public void testConvertHybridDuringPushJob() {
     String storeName = Utils.getUniqueString("test-store");
     parentControllerClient.createNewStore(storeName, "test-owner", "\"string\"", "\"string\"");
     parentControllerClient.requestTopicForWrites(
@@ -130,7 +130,7 @@ public void testConvertHybridDuringPushjob() {
         storeName,
         new UpdateStoreQueryParams().setHybridRewindSeconds(1L).setHybridOffsetLagThreshold(1L));
     Assert.assertTrue(response.isError());
-    Assert.assertTrue(response.getError().contains("Cannot convert to hybrid as there is already a pushjob running"));
+    Assert.assertTrue(response.getError().contains("Cannot convert to hybrid as there is already a push job running"));
     parentControllerClient.killOfflinePushJob(Version.composeKafkaTopic(storeName, 1));
   }
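
Note on the cleanup call above: Version.composeKafkaTopic(storeName, 1) resolves the version topic that identifies the push to kill. A minimal sketch of the naming convention (Venice composes version topic names as "<store>_v<version>"; the sample store name is illustrative):

    // Venice version topics are named "<store>_v<version>".
    String versionTopic = Version.composeKafkaTopic(storeName, 1);
    // For a store named "test-store_abc", this yields "test-store_abc_v1".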

[File 2 of 4]

@@ -406,7 +406,8 @@ public void testStoreMigrationWithDaVinciPushStatusSystemStore() throws Exception {
         .assertEquals(pushStatusStoreReader.getPartitionStatus(storeName, 1, 0, Optional.empty()).size(), 1));

     // Verify that store and system store only exist in destination cluster after ending migration
-    StoreMigrationTestUtil.endMigration(parentControllerUrl, childControllerUrl0, storeName, srcClusterName, destClusterName);
+    StoreMigrationTestUtil
+        .endMigration(parentControllerUrl, childControllerUrl0, storeName, srcClusterName, destClusterName);

     TestUtils.waitForNonDeterministicAssertion(30, TimeUnit.SECONDS, () -> {
       statusOutput.clear();

[File 3 of 4]

@@ -153,6 +153,7 @@
 import com.linkedin.venice.exceptions.ErrorType;
 import com.linkedin.venice.exceptions.ResourceStillExistsException;
 import com.linkedin.venice.exceptions.VeniceException;
+import com.linkedin.venice.exceptions.VeniceHttpException;
 import com.linkedin.venice.exceptions.VeniceNoStoreException;
 import com.linkedin.venice.exceptions.VeniceUnsupportedOperationException;
 import com.linkedin.venice.helix.HelixReadOnlyStoreConfigRepository;

@@ -253,6 +254,7 @@
 import org.apache.avro.Schema;
 import org.apache.commons.lang.StringUtils;
 import org.apache.commons.lang.Validate;
+import org.apache.hc.core5.http.HttpStatus;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;

@@ -2171,7 +2173,18 @@ public void updateStore(String clusterName, String storeName, UpdateStoreQueryParams params) {
       String errMsg = "UpdateStore command failed for store " + storeName + ". The command didn't change any specific"
           + " store config and didn't specify \"--replicate-all-configs\" flag.";
       LOGGER.error(errMsg);
-      throw new VeniceException(errMsg);
+      throw new VeniceHttpException(HttpStatus.SC_BAD_REQUEST, errMsg, ErrorType.BAD_REQUEST);
     }

+    if (!originalStore.isHybrid() && updatedStore.isHybrid()) {
+      // Today, the target-colo push job cannot handle hybrid stores, so fail the request if a batch push is running
+      Optional<String> currentPushTopic = getTopicForCurrentPushJob(clusterName, storeName, false, false);
+      if (currentPushTopic.isPresent()) {
+        String errorMessage =
+            "Cannot convert to hybrid as there is already a push job running with topic " + currentPushTopic.get();
+        LOGGER.error(errorMessage);
+        throw new VeniceHttpException(HttpStatus.SC_BAD_REQUEST, errorMessage, ErrorType.BAD_REQUEST);
+      }
+    }
+
     UpdateStore setStore = (UpdateStore) AdminMessageType.UPDATE_STORE.getNewInstance();
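
Reviewer note on the hunk above: both throws switch from the generic VeniceException to VeniceHttpException, so the controller can report an HTTP 400 with ErrorType.BAD_REQUEST rather than a generic server error. A minimal caller-side sketch of why that matters, assuming the exception exposes its status via a getHttpStatusCode() accessor (an assumption; only the constructor appears in this diff, and "admin" and "scheduleRetry" are hypothetical):

    try {
      admin.updateStore(clusterName, storeName, params);
    } catch (VeniceHttpException e) {
      // A 400 means the request itself is invalid (e.g. converting to hybrid while
      // a push job is running), so retrying cannot succeed.
      if (e.getHttpStatusCode() == HttpStatus.SC_BAD_REQUEST) { // assumed accessor
        throw e;
      }
      scheduleRetry(); // hypothetical retry hook for non-4xx failures
    }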

@@ -2507,6 +2520,13 @@ public void updateStore(String clusterName, String storeName, UpdateStoreQueryParams params) {
       setStore.maxRecordSizeBytes = originalStore.getMaxRecordSizeBytes();
     }

+    if (updatedConfigs.contains(MAX_NEARLINE_RECORD_SIZE_BYTES)) {
+      setStore.maxNearlineRecordSizeBytes = updatedStore.getMaxNearlineRecordSizeBytes();
+      updatedConfigsList.add(MAX_NEARLINE_RECORD_SIZE_BYTES);
+    } else {
+      setStore.maxNearlineRecordSizeBytes = originalStore.getMaxNearlineRecordSizeBytes();
+    }
+
     if (updatedConfigs.contains(LATEST_SUPERSET_SCHEMA_ID)) {
       setStore.latestSuperSetValueSchemaId = updatedStore.getLatestSuperSetValueSchemaId();
       updatedConfigsList.add(LATEST_SUPERSET_SCHEMA_ID);
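
For context, a minimal usage sketch for the new MAX_NEARLINE_RECORD_SIZE_BYTES store config wired up above. The setter is assumed to mirror the params.getMaxNearlineRecordSizeBytes() accessor this commit reads in the next file, and the 2 MiB value is illustrative:

    // Cap individual nearline (real-time) records at 2 MiB for this store.
    UpdateStoreQueryParams params =
        new UpdateStoreQueryParams().setMaxNearlineRecordSizeBytes(2 * 1024 * 1024); // assumed setter
    ControllerResponse response = parentControllerClient.updateStore(storeName, params);
    Assert.assertFalse(response.isError());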

[File 4 of 4]

@@ -25,6 +25,7 @@
 import static com.linkedin.venice.controllerapi.ControllerApiConstants.LARGEST_USED_VERSION_NUMBER;
 import static com.linkedin.venice.controllerapi.ControllerApiConstants.LATEST_SUPERSET_SCHEMA_ID;
 import static com.linkedin.venice.controllerapi.ControllerApiConstants.MAX_COMPACTION_LAG_SECONDS;
+import static com.linkedin.venice.controllerapi.ControllerApiConstants.MAX_NEARLINE_RECORD_SIZE_BYTES;
 import static com.linkedin.venice.controllerapi.ControllerApiConstants.MAX_RECORD_SIZE_BYTES;
 import static com.linkedin.venice.controllerapi.ControllerApiConstants.MIGRATION_DUPLICATE_STORE;
 import static com.linkedin.venice.controllerapi.ControllerApiConstants.MIN_COMPACTION_LAG_SECONDS;

@@ -210,6 +211,7 @@ public static UpdateStoreWrapper getStoreUpdate(
     Optional<Long> minCompactionLagSeconds = params.getMinCompactionLagSeconds();
     Optional<Long> maxCompactionLagSeconds = params.getMaxCompactionLagSeconds();
     Optional<Integer> maxRecordSizeBytes = params.getMaxRecordSizeBytes();
+    Optional<Integer> maxNearlineRecordSizeBytes = params.getMaxNearlineRecordSizeBytes();
     Optional<Boolean> unusedSchemaDeletionEnabled = params.getUnusedSchemaDeletionEnabled();
     Optional<Boolean> blobTransferEnabled = params.getBlobTransferEnabled();

@@ -322,6 +324,11 @@ public static UpdateStoreWrapper getStoreUpdate(
         maxCompactionLagSeconds,
         updatedStore::setMaxCompactionLagSeconds);
     addToUpdatedConfigs(updatedConfigs, MAX_RECORD_SIZE_BYTES, maxRecordSizeBytes, updatedStore::setMaxRecordSizeBytes);
+    addToUpdatedConfigs(
+        updatedConfigs,
+        MAX_NEARLINE_RECORD_SIZE_BYTES,
+        maxNearlineRecordSizeBytes,
+        updatedStore::setMaxNearlineRecordSizeBytes);
     addToUpdatedConfigs(
         updatedConfigs,
         UNUSED_SCHEMA_DELETION_ENABLED,
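
The addToUpdatedConfigs calls above all follow one helper pattern whose definition sits outside this diff. A sketch inferred from the call sites (signature and body are assumptions, not the committed code):

    // Inferred shape: if the optional param was supplied by the client, apply it
    // to the updated store and record the config name as explicitly set.
    private static <T> void addToUpdatedConfigs(
        Set<String> updatedConfigs,
        String configName,
        Optional<T> param,
        Consumer<T> setter) {
      param.ifPresent(value -> {
        setter.accept(value);
        updatedConfigs.add(configName);
      });
    }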
