From 6ab2ac71a1034eca6f750fddb5f5b5888c60421d Mon Sep 17 00:00:00 2001 From: "Xin(Adam) Chen" Date: Wed, 21 Aug 2024 14:04:06 -0700 Subject: [PATCH 1/8] [FC] throws exception during store migration to enforce d2 refresh --- .../fastclient/meta/RequestBasedMetadata.java | 1 + .../meta/RequestBasedMetadataTest.java | 32 +++ .../venice/endToEnd/TestStoreMigration.java | 195 +++++++----------- .../utils/AbstractClientEndToEndSetup.java | 3 +- .../utils/VeniceMultiClusterWrapper.java | 2 +- ...woLayerMultiRegionMultiClusterWrapper.java | 2 +- .../venice/utils/StoreMigrationTestUtil.java | 131 ++++++++++++ .../ServerReadMetadataRepository.java | 5 + .../ServerReadMetadataRepositoryTest.java | 21 +- 9 files changed, 273 insertions(+), 119 deletions(-) create mode 100644 internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/utils/StoreMigrationTestUtil.java diff --git a/clients/venice-client/src/main/java/com/linkedin/venice/fastclient/meta/RequestBasedMetadata.java b/clients/venice-client/src/main/java/com/linkedin/venice/fastclient/meta/RequestBasedMetadata.java index 8b74725718..9b1a6f64b3 100644 --- a/clients/venice-client/src/main/java/com/linkedin/venice/fastclient/meta/RequestBasedMetadata.java +++ b/clients/venice-client/src/main/java/com/linkedin/venice/fastclient/meta/RequestBasedMetadata.java @@ -470,6 +470,7 @@ synchronized void updateCache(boolean onDemandRefresh) throws InterruptedExcepti // TODO: need a better way to handle store migration if (!onDemandRefresh) { LOGGER.warn("Metadata fetch operation for store: {} failed with exception {}", storeName, e.getMessage()); + LOGGER.info("[DEBUG] refreshing the d2 service"); isServiceDiscovered = false; discoverD2Service(); updateCache(true); diff --git a/clients/venice-client/src/test/java/com/linkedin/venice/fastclient/meta/RequestBasedMetadataTest.java b/clients/venice-client/src/test/java/com/linkedin/venice/fastclient/meta/RequestBasedMetadataTest.java index a7354081aa..86e7fc1961 100644 --- a/clients/venice-client/src/test/java/com/linkedin/venice/fastclient/meta/RequestBasedMetadataTest.java +++ b/clients/venice-client/src/test/java/com/linkedin/venice/fastclient/meta/RequestBasedMetadataTest.java @@ -10,10 +10,13 @@ import static com.linkedin.venice.fastclient.meta.RequestBasedMetadataTestUtils.getMockMetaData; import static com.linkedin.venice.utils.TestUtils.waitForNonDeterministicAssertion; import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyBoolean; import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.spy; import static org.mockito.Mockito.timeout; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; @@ -26,7 +29,9 @@ import com.linkedin.venice.client.schema.RouterBackedSchemaReader; import com.linkedin.venice.client.store.D2ServiceDiscovery; import com.linkedin.venice.client.store.transport.D2TransportClient; +import com.linkedin.venice.client.store.transport.TransportClientResponse; import com.linkedin.venice.compression.CompressionStrategy; +import com.linkedin.venice.controllerapi.D2ServiceDiscoveryResponse; import com.linkedin.venice.exceptions.ConfigurationException; import com.linkedin.venice.fastclient.ClientConfig; import com.linkedin.venice.serialization.avro.AvroProtocolDefinition; @@ -370,4 +375,31 @@ public void 
testRequestBasedMetadataStartFailFast() throws IOException { Assert.assertThrows(ConfigurationException.class, requestBasedMetadata::start); } } + + @Test(timeOut = TEST_TIMEOUT) + public void testRequestBasedMetadataStartRefreshDuringStoreMigration() throws IOException, InterruptedException { + String storeName = "testStore"; + ClientConfig clientConfig = RequestBasedMetadataTestUtils.getMockClientConfig(storeName, false, false); + D2TransportClient d2TransportClient = mock(D2TransportClient.class); + CompletableFuture exceptionFuture = new CompletableFuture<>(); + exceptionFuture.completeExceptionally(new RuntimeException("Failed to execute")); + doReturn(exceptionFuture).when(d2TransportClient).get(anyString()); + D2ServiceDiscovery d2ServiceDiscovery = getMockD2ServiceDiscovery(d2TransportClient, storeName); + D2ServiceDiscoveryResponse d2Response = new D2ServiceDiscoveryResponse(); + d2Response.setServerD2Service("test-service"); + doReturn(d2Response).when(d2ServiceDiscovery).find(any(), any(), anyBoolean()); + try (RequestBasedMetadata requestBasedMetadata = new RequestBasedMetadata(clientConfig, d2TransportClient)) { + RequestBasedMetadata spy = spy(requestBasedMetadata); + spy.setD2ServiceDiscovery(d2ServiceDiscovery); + // let child thread handling the start logic otherwise the main thread cannot verify the invocation times. + CompletableFuture.runAsync(spy::start); + + // refresh would happen multiple times + // the first one w/o onDemond refresh but triggered would fail + verify(spy, timeout(3000).atLeast(1)).updateCache(false); + + // the failed refresh triggers a a new refresh with onDemand refresh + verify(spy, timeout(3000).atLeast(1)).updateCache(true); + } + } } diff --git a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/TestStoreMigration.java b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/TestStoreMigration.java index 47a9ef69f1..4946f4b7ab 100644 --- a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/TestStoreMigration.java +++ b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/TestStoreMigration.java @@ -23,6 +23,7 @@ import com.linkedin.d2.balancer.D2Client; import com.linkedin.davinci.client.DaVinciClient; import com.linkedin.davinci.client.DaVinciConfig; +import com.linkedin.r2.transport.common.Client; import com.linkedin.venice.AdminTool; import com.linkedin.venice.AdminTool.PrintFunction; import com.linkedin.venice.D2.D2ClientUtils; @@ -41,6 +42,8 @@ import com.linkedin.venice.controllerapi.StoreResponse; import com.linkedin.venice.controllerapi.UpdateStoreQueryParams; import com.linkedin.venice.exceptions.VeniceException; +import com.linkedin.venice.fastclient.meta.StoreMetadataFetchMode; +import com.linkedin.venice.fastclient.utils.ClientTestUtils; import com.linkedin.venice.hadoop.VenicePushJob; import com.linkedin.venice.integration.utils.D2TestUtils; import com.linkedin.venice.integration.utils.DaVinciTestContext; @@ -60,6 +63,7 @@ import com.linkedin.venice.systemstore.schemas.StoreMetaKey; import com.linkedin.venice.systemstore.schemas.StoreMetaValue; import com.linkedin.venice.utils.IntegrationTestPushUtils; +import com.linkedin.venice.utils.StoreMigrationTestUtil; import com.linkedin.venice.utils.TestUtils; import com.linkedin.venice.utils.TestWriteUtils; import com.linkedin.venice.utils.Time; @@ -95,7 +99,6 @@ public class TestStoreMigration { private static final int RECORD_COUNT = 20; private static final String 
NEW_OWNER = "newtest@linkedin.com"; private static final String FABRIC0 = "dc-0"; - private static final boolean[] ABORT_MIGRATION_PROMPTS_OVERRIDE = { false, true, true }; private VeniceTwoLayerMultiRegionMultiClusterWrapper twoLayerMultiRegionMultiClusterWrapper; private VeniceMultiClusterWrapper multiClusterWrapper; @@ -103,9 +106,10 @@ public class TestStoreMigration { private String destClusterName; private String parentControllerUrl; private String childControllerUrl0; + protected Client r2Client; @BeforeClass - public void setUp() { + public void setUp() throws Exception { Utils.thisIsLocalhost(); Properties parentControllerProperties = new Properties(); // Disable topic cleanup since parent and child are sharing the same kafka cluster. @@ -139,6 +143,8 @@ public void setUp() { parentControllerUrl = twoLayerMultiRegionMultiClusterWrapper.getControllerConnectString(); childControllerUrl0 = multiClusterWrapper.getControllerConnectString(); + r2Client = ClientTestUtils.getR2Client(ClientTestUtils.FastClientHTTPVariant.HTTP_2_BASED_HTTPCLIENT5); + for (String cluster: clusterNames) { try (ControllerClient controllerClient = new ControllerClient(cluster, childControllerUrl0)) { // Verify the participant store is up and running in child region @@ -171,8 +177,9 @@ public void testStoreMigration() throws Exception { try (AvroGenericStoreClient client = ClientFactory.getAndStartGenericAvroClient(clientConfig)) { readFromStore(client); - startMigration(parentControllerUrl, storeName); - completeMigration(parentControllerUrl, storeName); + StoreMigrationTestUtil.startMigration(parentControllerUrl, storeName, srcClusterName, destClusterName); + StoreMigrationTestUtil + .completeMigration(parentControllerUrl, storeName, srcClusterName, destClusterName, FABRIC0); TestUtils.waitForNonDeterministicAssertion(30, TimeUnit.SECONDS, () -> { // StoreConfig in router might not be up-to-date. Keep reading from the store. Finally, router will find that // cluster discovery changes and redirect the request to dest store. Client's d2ServiceName will be updated. 
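The assertion loop above keeps reading until the router's cluster discovery flips to the destination cluster. For illustration, the same condition can be polled directly against the controller; a minimal sketch using only the ControllerClient and TestUtils calls already present in this patch (the local variable names are illustrative, not part of the change):

    try (ControllerClient destControllerClient = new ControllerClient(destClusterName, parentControllerUrl)) {
      TestUtils.waitForNonDeterministicAssertion(30, TimeUnit.SECONDS, () -> {
        // Once migration completes, cluster discovery should report the destination cluster for this store.
        ControllerResponse discoveryResponse = destControllerClient.discoverCluster(storeName);
        Assert.assertEquals(discoveryResponse.getCluster(), destClusterName);
      });
    }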
@@ -187,11 +194,15 @@ public void testStoreMigration() throws Exception { // Test abort migration on parent controller try (ControllerClient srcParentControllerClient = new ControllerClient(srcClusterName, parentControllerUrl); ControllerClient destParentControllerClient = new ControllerClient(destClusterName, parentControllerUrl)) { - abortMigration(parentControllerUrl, storeName, true); + StoreMigrationTestUtil.abortMigration(parentControllerUrl, storeName, true, srcClusterName, destClusterName); TestUtils.waitForNonDeterministicAssertion( 30, TimeUnit.SECONDS, - () -> checkStatusAfterAbortMigration(srcParentControllerClient, destParentControllerClient, storeName)); + () -> StoreMigrationTestUtil.checkStatusAfterAbortMigration( + srcParentControllerClient, + destParentControllerClient, + storeName, + srcClusterName)); } } @@ -202,7 +213,7 @@ public void testStoreMigrationWithNewPushesAndUpdates() throws Exception { try (ControllerClient srcParentControllerClient = new ControllerClient(srcClusterName, parentControllerUrl); ControllerClient destParentControllerClient = new ControllerClient(destClusterName, parentControllerUrl)) { - startMigration(parentControllerUrl, storeName); + StoreMigrationTestUtil.startMigration(parentControllerUrl, storeName, srcClusterName, destClusterName); // Ensure migration status is updated in source parent controller TestUtils.waitForNonDeterministicAssertion( 30, @@ -275,8 +286,9 @@ public void testStoreMigrationWithMetaSystemStore() throws Exception { Assert.assertTrue(storeProperties != null && storeProperties.storeProperties != null); }); - startMigration(parentControllerUrl, storeName); - completeMigration(parentControllerUrl, storeName); + StoreMigrationTestUtil.startMigration(parentControllerUrl, storeName, srcClusterName, destClusterName); + StoreMigrationTestUtil + .completeMigration(parentControllerUrl, storeName, srcClusterName, destClusterName, FABRIC0); // Verify the meta system store is materialized in the destination cluster and contains correct values. 
StoreMetaKey storePropertiesKeyInDestCluster = @@ -296,7 +308,8 @@ public void testStoreMigrationWithMetaSystemStore() throws Exception { } }); // Test end migration - endMigration(parentControllerUrl, childControllerUrl0, storeName); + StoreMigrationTestUtil + .endMigration(parentControllerUrl, childControllerUrl0, storeName, srcClusterName, destClusterName); } } @@ -338,7 +351,7 @@ public void testStoreMigrationWithDaVinciPushStatusSystemStore() throws Exceptio () -> Assert .assertEquals(pushStatusStoreReader.getPartitionStatus(storeName, 1, 0, Optional.empty()).size(), 1)); - startMigration(parentControllerUrl, storeName); + StoreMigrationTestUtil.startMigration(parentControllerUrl, storeName, srcClusterName, destClusterName); // Store migration status output via closure PrintFunction Set statusOutput = new HashSet(); @@ -347,7 +360,8 @@ public void testStoreMigrationWithDaVinciPushStatusSystemStore() throws Exceptio System.err.println(message); }; - checkMigrationStatus(parentControllerUrl, storeName, printFunction); + StoreMigrationTestUtil + .checkMigrationStatus(parentControllerUrl, storeName, srcClusterName, destClusterName, printFunction); // Check that store and system store exists in both source and destination cluster Assert.assertTrue( @@ -363,7 +377,8 @@ public void testStoreMigrationWithDaVinciPushStatusSystemStore() throws Exceptio Assert.assertTrue( statusOutput.contains(String.format("%s exists in this cluster %s", systemStoreName, destClusterName))); - completeMigration(parentControllerUrl, storeName); + StoreMigrationTestUtil + .completeMigration(parentControllerUrl, storeName, srcClusterName, destClusterName, FABRIC0); // Verify the da vinci push status system store is materialized in dest cluster and contains the same value TestUtils.waitForNonDeterministicAssertion( @@ -374,8 +389,10 @@ public void testStoreMigrationWithDaVinciPushStatusSystemStore() throws Exceptio // Verify that store and system store only exist in destination cluster after ending migration statusOutput.clear(); - endMigration(parentControllerUrl, childControllerUrl0, storeName); - checkMigrationStatus(parentControllerUrl, storeName, printFunction); + StoreMigrationTestUtil + .endMigration(parentControllerUrl, childControllerUrl0, storeName, srcClusterName, destClusterName); + StoreMigrationTestUtil + .checkMigrationStatus(parentControllerUrl, storeName, srcClusterName, destClusterName, printFunction); Assert.assertFalse( statusOutput.contains(String.format("%s exists in this cluster %s", storeName, srcClusterName)), @@ -443,8 +460,9 @@ public void testVersionConfigsRemainSameAfterStoreMigration() throws Exception { try (AvroGenericStoreClient client = ClientFactory.getAndStartGenericAvroClient(clientConfig)) { readFromStore(client); - startMigration(parentControllerUrl, storeName); - completeMigration(parentControllerUrl, storeName); + StoreMigrationTestUtil.startMigration(parentControllerUrl, storeName, srcClusterName, destClusterName); + StoreMigrationTestUtil + .completeMigration(parentControllerUrl, storeName, srcClusterName, destClusterName, FABRIC0); TestUtils.waitForNonDeterministicAssertion(30, TimeUnit.SECONDS, () -> { // StoreConfig in router might not be up-to-date. Keep reading from the store. Finally, router will find that // cluster discovery changes and redirect the request to dest store. Client's d2ServiceName will be updated. 
@@ -485,7 +503,8 @@ private Properties createAndPushStore(String clusterName, String storeName) thro new UpdateStoreQueryParams().setStorageQuotaInByte(Store.UNLIMITED_STORAGE_QUOTA) .setHybridRewindSeconds(TEST_TIMEOUT) .setHybridOffsetLagThreshold(2L) - .setCompressionStrategy(CompressionStrategy.ZSTD_WITH_DICT); + .setCompressionStrategy(CompressionStrategy.ZSTD_WITH_DICT) + .setStorageNodeReadQuotaEnabled(true); // enable this for using fast client IntegrationTestPushUtils.createStoreForJob(clusterName, keySchemaStr, valueSchemaStr, props, updateStoreQueryParams) .close(); @@ -519,103 +538,11 @@ private Properties createAndPushStore(String clusterName, String storeName) thro return props; } - private void startMigration(String controllerUrl, String storeName) throws Exception { - String[] startMigrationArgs = { "--migrate-store", "--url", controllerUrl, "--store", storeName, "--cluster-src", - srcClusterName, "--cluster-dest", destClusterName }; - AdminTool.main(startMigrationArgs); - } - - private void checkMigrationStatus(String controllerUrl, String storeName, PrintFunction printFunction) - throws Exception { - String[] checkMigrationStatusArgs = { "--migration-status", "--url", controllerUrl, "--store", storeName, - "--cluster-src", srcClusterName, "--cluster-dest", destClusterName }; - AdminTool.checkMigrationStatus(AdminTool.getCommandLine(checkMigrationStatusArgs), printFunction); - } - - private void completeMigration(String controllerUrl, String storeName) { - String[] completeMigration0 = { "--complete-migration", "--url", controllerUrl, "--store", storeName, - "--cluster-src", srcClusterName, "--cluster-dest", destClusterName, "--fabric", FABRIC0 }; - - try (ControllerClient destParentControllerClient = new ControllerClient(destClusterName, controllerUrl)) { - TestUtils.waitForNonDeterministicAssertion(60, TimeUnit.SECONDS, () -> { - AdminTool.main(completeMigration0); - // Store discovery should point to the new cluster after completing migration - ControllerResponse discoveryResponse = destParentControllerClient.discoverCluster(storeName); - Assert.assertEquals(discoveryResponse.getCluster(), destClusterName); - }); - } - } - - private void endMigration(String parentControllerUrl, String childControllerUrl, String storeName) throws Exception { - String[] endMigration = { "--end-migration", "--url", parentControllerUrl, "--store", storeName, "--cluster-src", - srcClusterName, "--cluster-dest", destClusterName }; - AdminTool.main(endMigration); - - try (ControllerClient srcControllerClient = new ControllerClient(srcClusterName, parentControllerUrl); - ControllerClient destControllerClient = new ControllerClient(destClusterName, parentControllerUrl)) { - TestUtils.waitForNonDeterministicAssertion(30, TimeUnit.SECONDS, () -> { - // Store should be deleted in source cluster. Store in destination cluster should not be migrating. 
- StoreResponse storeResponse = srcControllerClient.getStore(storeName); - Assert.assertNull(storeResponse.getStore()); - - storeResponse = destControllerClient.getStore(storeName); - Assert.assertNotNull(storeResponse.getStore()); - Assert.assertFalse(storeResponse.getStore().isMigrating()); - Assert.assertFalse(storeResponse.getStore().isMigrationDuplicateStore()); - }); - } - if (childControllerUrl == null) { - return; - } - - // Perform the same check on child controller too - try (ControllerClient srcControllerClient = new ControllerClient(srcClusterName, childControllerUrl); - ControllerClient destControllerClient = new ControllerClient(destClusterName, childControllerUrl)) { - TestUtils.waitForNonDeterministicAssertion(30, TimeUnit.SECONDS, () -> { - // Store should be deleted in source cluster. Store in destination cluster should not be migrating. - StoreResponse storeResponse = srcControllerClient.getStore(storeName); - Assert.assertNull(storeResponse.getStore()); - - storeResponse = destControllerClient.getStore(storeName); - Assert.assertNotNull(storeResponse.getStore()); - Assert.assertFalse(storeResponse.getStore().isMigrating()); - Assert.assertFalse(storeResponse.getStore().isMigrationDuplicateStore()); - }); - } - - } - private void readFromStore(AvroGenericStoreClient client) { int key = ThreadLocalRandom.current().nextInt(RECORD_COUNT) + 1; client.get(Integer.toString(key)); } - private void abortMigration(String controllerUrl, String storeName, boolean force) { - AdminTool.abortMigration( - controllerUrl, - storeName, - srcClusterName, - destClusterName, - force, - ABORT_MIGRATION_PROMPTS_OVERRIDE); - } - - private void checkStatusAfterAbortMigration( - ControllerClient srcControllerClient, - ControllerClient destControllerClient, - String storeName) { - // Migration flag should be false - // Store should be deleted in dest cluster - // Cluster discovery should point to src cluster - StoreResponse storeResponse = srcControllerClient.getStore(storeName); - Assert.assertNotNull(storeResponse.getStore()); - Assert.assertFalse(storeResponse.getStore().isMigrating()); - storeResponse = destControllerClient.getStore(storeName); - Assert.assertNull(storeResponse.getStore()); - ControllerResponse discoveryResponse = destControllerClient.discoverCluster(storeName); - Assert.assertEquals(discoveryResponse.getCluster(), srcClusterName); - } - private StoreInfo getStoreConfig(String controllerUrl, String clusterName, String storeName) { try (ControllerClient controllerClient = new ControllerClient(clusterName, controllerUrl)) { StoreResponse storeResponse = controllerClient.getStore(storeName); @@ -628,6 +555,40 @@ private StoreInfo getStoreConfig(String controllerUrl, String clusterName, Strin } } + @Test(timeOut = 2 * TEST_TIMEOUT) + public void testStoreMigrationForFastClient() throws Exception { + String storeName = Utils.getUniqueString("testMigrationWithFastClient"); + createAndPushStore(srcClusterName, storeName); + String destD2ServiceName = multiClusterWrapper.getClusterToD2().get(destClusterName); + D2Client d2Client = + D2TestUtils.getAndStartD2Client(multiClusterWrapper.getClusters().get(srcClusterName).getZk().getAddress()); + + // this is for SERVER_BASED_METADATA only + com.linkedin.venice.fastclient.ClientConfig.ClientConfigBuilder fastClientConfigBuilder = + new com.linkedin.venice.fastclient.ClientConfig.ClientConfigBuilder<>().setStoreName(storeName) + .setR2Client(r2Client) + .setD2Client(d2Client) + .setMetadataRefreshIntervalInSeconds(1) + 
.setClusterDiscoveryD2Service(VeniceRouterWrapper.CLUSTER_DISCOVERY_D2_SERVICE_NAME) + .setStoreMetadataFetchMode(StoreMetadataFetchMode.SERVER_BASED_METADATA); + + try (AvroGenericStoreClient client = com.linkedin.venice.fastclient.factory.ClientFactory + .getAndStartGenericStoreClient(fastClientConfigBuilder.build())) { + readFromStore(client); + StoreMigrationTestUtil.startMigration(parentControllerUrl, storeName, srcClusterName, destClusterName); + StoreMigrationTestUtil + .completeMigration(parentControllerUrl, storeName, srcClusterName, destD2ServiceName, FABRIC0); + TestUtils.waitForNonDeterministicAssertion(45, TimeUnit.SECONDS, () -> { + // Keep reading from the store. Fast client is supposed to refresh d2 service + readFromStore(client); + AbstractAvroStoreClient castClient = + (AbstractAvroStoreClient) ((StatTrackingStoreClient) client) + .getInnerStoreClient(); + Assert.assertTrue(castClient.toString().contains(destD2ServiceName)); + }); + } + } + @Test(timeOut = TEST_TIMEOUT) public void testStoreMigrationStaleKillIngestionMessageDeletion() { String storeName = Utils.getUniqueString("testWithFailedAttempt"); @@ -673,7 +634,7 @@ public void testStoreMigrationAfterFailedAttempt() throws Exception { // Verify the kill push message is in the participant message store. verifyKillMessageInParticipantStore(destClusterWrapper, currentVersionTopicName, true); - startMigration(parentControllerUrl, storeName); + StoreMigrationTestUtil.startMigration(parentControllerUrl, storeName, srcClusterName, destClusterName); // Ensure migration status is updated in source parent controller TestUtils.waitForNonDeterministicAssertion( 30, @@ -689,7 +650,8 @@ public void testStoreMigrationAfterFailedAttempt() throws Exception { TestUtils.waitForNonDeterministicAssertion(60, TimeUnit.SECONDS, () -> { statusOutput.clear(); - checkMigrationStatus(parentControllerUrl, storeName, printFunction); + StoreMigrationTestUtil + .checkMigrationStatus(parentControllerUrl, storeName, srcClusterName, destClusterName, printFunction); assertTrue( statusOutput .contains(storeName + " belongs to cluster " + srcClusterName + " according to cluster discovery")); @@ -697,12 +659,15 @@ public void testStoreMigrationAfterFailedAttempt() throws Exception { }); verifyKillMessageInParticipantStore(destClusterWrapper, currentVersionTopicName, false); - completeMigration(parentControllerUrl, storeName); - endMigration(parentControllerUrl, childControllerUrl0, storeName); + StoreMigrationTestUtil + .completeMigration(parentControllerUrl, storeName, srcClusterName, destClusterName, FABRIC0); + StoreMigrationTestUtil + .endMigration(parentControllerUrl, childControllerUrl0, storeName, srcClusterName, destClusterName); TestUtils.waitForNonDeterministicAssertion(60, TimeUnit.SECONDS, () -> { // Store migration status output via closure PrintFunction statusOutput.clear(); - checkMigrationStatus(parentControllerUrl, storeName, printFunction); + StoreMigrationTestUtil + .checkMigrationStatus(parentControllerUrl, storeName, srcClusterName, destClusterName, printFunction); assertTrue( statusOutput .contains(storeName + " belongs to cluster " + destClusterName + " according to cluster discovery")); diff --git a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/fastclient/utils/AbstractClientEndToEndSetup.java b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/fastclient/utils/AbstractClientEndToEndSetup.java index 26d8d6cf37..5d758ebe9e 100644 --- 
a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/fastclient/utils/AbstractClientEndToEndSetup.java +++ b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/fastclient/utils/AbstractClientEndToEndSetup.java @@ -99,6 +99,7 @@ public abstract class AbstractClientEndToEndSetup { private VeniceWriter veniceWriter; protected Client r2Client; protected D2Client d2Client; + protected String controllerUrl; // da-vinci client for the da-vinci client based metadata private VeniceProperties daVinciBackendConfig; @@ -225,6 +226,7 @@ public void setUp() throws Exception { .toAbsolutePath() .toString(); + controllerUrl = veniceCluster.getAllControllersURLs(); prepareData(); prepareMetaSystemStore(); waitForRouterD2(); @@ -274,7 +276,6 @@ protected void prepareData() throws Exception { veniceWriter.broadcastEndOfPush(new HashMap<>()); // Wait for storage node to finish consuming, and new version to be activated - String controllerUrl = veniceCluster.getAllControllersURLs(); TestUtils.waitForNonDeterministicCompletion(30, TimeUnit.SECONDS, () -> { int currentVersion = ControllerClient.getStore(controllerUrl, veniceCluster.getClusterName(), storeName) .getStore() diff --git a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/integration/utils/VeniceMultiClusterWrapper.java b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/integration/utils/VeniceMultiClusterWrapper.java index d03c9518ce..6bf6beb197 100644 --- a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/integration/utils/VeniceMultiClusterWrapper.java +++ b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/integration/utils/VeniceMultiClusterWrapper.java @@ -81,7 +81,7 @@ static ServiceProvider generateService(VeniceMultiClu clusterNames[i] = clusterName; String d2ServiceName = "venice-" + i; clusterToD2.put(clusterName, d2ServiceName); - String serverD2ServiceName = Utils.getUniqueString(clusterName + "_d2"); + String serverD2ServiceName = Utils.getUniqueString(clusterName + "_server_d2"); clusterToServerD2.put(clusterName, serverD2ServiceName); } diff --git a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/integration/utils/VeniceTwoLayerMultiRegionMultiClusterWrapper.java b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/integration/utils/VeniceTwoLayerMultiRegionMultiClusterWrapper.java index 5065c3d52e..3c2fcc6059 100644 --- a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/integration/utils/VeniceTwoLayerMultiRegionMultiClusterWrapper.java +++ b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/integration/utils/VeniceTwoLayerMultiRegionMultiClusterWrapper.java @@ -99,7 +99,7 @@ static ServiceProvider generateSer clusterNames[i] = clusterName; String routerD2ServiceName = "venice-" + i; clusterToD2.put(clusterName, routerD2ServiceName); - String serverD2ServiceName = Utils.getUniqueString(clusterName + "_d2"); + String serverD2ServiceName = Utils.getUniqueString(clusterName + "_server_d2"); clusterToServerD2.put(clusterName, serverD2ServiceName); } List childRegionName = new ArrayList<>(options.getNumberOfRegions()); diff --git a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/utils/StoreMigrationTestUtil.java b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/utils/StoreMigrationTestUtil.java new file mode 100644 index 0000000000..9385a0c03d --- /dev/null +++ 
b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/utils/StoreMigrationTestUtil.java @@ -0,0 +1,131 @@ +package com.linkedin.venice.utils; + +import com.linkedin.venice.AdminTool; +import com.linkedin.venice.controllerapi.ControllerClient; +import com.linkedin.venice.controllerapi.ControllerResponse; +import com.linkedin.venice.controllerapi.StoreResponse; +import java.util.concurrent.TimeUnit; +import org.testng.Assert; + + +/** + * Utility class for store migration tests. + */ +public class StoreMigrationTestUtil { + private static final boolean[] ABORT_MIGRATION_PROMPTS_OVERRIDE = { false, true, true }; + + public static void startMigration( + String controllerUrl, + String storeName, + String srcClusterName, + String destClusterName) throws Exception { + String[] startMigrationArgs = { "--migrate-store", "--url", controllerUrl, "--store", storeName, "--cluster-src", + srcClusterName, "--cluster-dest", destClusterName }; + AdminTool.main(startMigrationArgs); + } + + public static void checkMigrationStatus( + String controllerUrl, + String storeName, + String srcClusterName, + String destClusterName, + AdminTool.PrintFunction printFunction) throws Exception { + String[] checkMigrationStatusArgs = { "--migration-status", "--url", controllerUrl, "--store", storeName, + "--cluster-src", srcClusterName, "--cluster-dest", destClusterName }; + AdminTool.checkMigrationStatus(AdminTool.getCommandLine(checkMigrationStatusArgs), printFunction); + } + + public static void completeMigration( + String controllerUrl, + String storeName, + String srcClusterName, + String destClusterName, + String fabric) { + String[] completeMigration0 = { "--complete-migration", "--url", controllerUrl, "--store", storeName, + "--cluster-src", srcClusterName, "--cluster-dest", destClusterName, "--fabric", fabric }; + + try (ControllerClient destParentControllerClient = new ControllerClient(destClusterName, controllerUrl)) { + TestUtils.waitForNonDeterministicAssertion(60, TimeUnit.SECONDS, () -> { + AdminTool.main(completeMigration0); + // Store discovery should point to the new cluster after completing migration + ControllerResponse discoveryResponse = destParentControllerClient.discoverCluster(storeName); + Assert.assertEquals(discoveryResponse.getCluster(), destClusterName); + }); + } + } + + public static void endMigration( + String parentControllerUrl, + String childControllerUrl, + String storeName, + String srcClusterName, + String destClusterName) throws Exception { + String[] endMigration = { "--end-migration", "--url", parentControllerUrl, "--store", storeName, "--cluster-src", + srcClusterName, "--cluster-dest", destClusterName }; + AdminTool.main(endMigration); + + try (ControllerClient srcControllerClient = new ControllerClient(srcClusterName, parentControllerUrl); + ControllerClient destControllerClient = new ControllerClient(destClusterName, parentControllerUrl)) { + TestUtils.waitForNonDeterministicAssertion(30, TimeUnit.SECONDS, () -> { + // Store should be deleted in source cluster. Store in destination cluster should not be migrating. 
+ StoreResponse storeResponse = srcControllerClient.getStore(storeName); + Assert.assertNull(storeResponse.getStore()); + + storeResponse = destControllerClient.getStore(storeName); + Assert.assertNotNull(storeResponse.getStore()); + Assert.assertFalse(storeResponse.getStore().isMigrating()); + Assert.assertFalse(storeResponse.getStore().isMigrationDuplicateStore()); + }); + } + if (childControllerUrl == null) { + return; + } + + // Perform the same check on child controller too + try (ControllerClient srcControllerClient = new ControllerClient(srcClusterName, childControllerUrl); + ControllerClient destControllerClient = new ControllerClient(destClusterName, childControllerUrl)) { + TestUtils.waitForNonDeterministicAssertion(30, TimeUnit.SECONDS, () -> { + // Store should be deleted in source cluster. Store in destination cluster should not be migrating. + StoreResponse storeResponse = srcControllerClient.getStore(storeName); + Assert.assertNull(storeResponse.getStore()); + + storeResponse = destControllerClient.getStore(storeName); + Assert.assertNotNull(storeResponse.getStore()); + Assert.assertFalse(storeResponse.getStore().isMigrating()); + Assert.assertFalse(storeResponse.getStore().isMigrationDuplicateStore()); + }); + } + } + + public static void abortMigration( + String controllerUrl, + String storeName, + boolean force, + String srcClusterName, + String destClusterName) { + AdminTool.abortMigration( + controllerUrl, + storeName, + srcClusterName, + destClusterName, + force, + ABORT_MIGRATION_PROMPTS_OVERRIDE); + } + + public static void checkStatusAfterAbortMigration( + ControllerClient srcControllerClient, + ControllerClient destControllerClient, + String storeName, + String srcClusterName) { + // Migration flag should be false + // Store should be deleted in dest cluster + // Cluster discovery should point to src cluster + StoreResponse storeResponse = srcControllerClient.getStore(storeName); + Assert.assertNotNull(storeResponse.getStore()); + Assert.assertFalse(storeResponse.getStore().isMigrating()); + storeResponse = destControllerClient.getStore(storeName); + Assert.assertNull(storeResponse.getStore()); + ControllerResponse discoveryResponse = destControllerClient.discoverCluster(storeName); + Assert.assertEquals(discoveryResponse.getCluster(), srcClusterName); + } +} diff --git a/services/venice-server/src/main/java/com/linkedin/venice/listener/ServerReadMetadataRepository.java b/services/venice-server/src/main/java/com/linkedin/venice/listener/ServerReadMetadataRepository.java index f3ca90d779..9553ffe63b 100644 --- a/services/venice-server/src/main/java/com/linkedin/venice/listener/ServerReadMetadataRepository.java +++ b/services/venice-server/src/main/java/com/linkedin/venice/listener/ServerReadMetadataRepository.java @@ -72,6 +72,11 @@ public MetadataResponse getMetadata(String storeName) { "Fast client is not enabled for store: %s, please ensure storage node read quota is enabled for the given store", storeName)); } + if (store.isMigrating()) { + throw new VeniceException( + "Store: " + storeName + " is migrating. 
Failing the request to allow fast " + + "client refresh service discovery."); + } // Version metadata int currentVersionNumber = store.getCurrentVersion(); if (currentVersionNumber == Store.NON_EXISTING_VERSION) { diff --git a/services/venice-server/src/test/java/com/linkedin/venice/listener/ServerReadMetadataRepositoryTest.java b/services/venice-server/src/test/java/com/linkedin/venice/listener/ServerReadMetadataRepositoryTest.java index f768362150..6420949dfa 100644 --- a/services/venice-server/src/test/java/com/linkedin/venice/listener/ServerReadMetadataRepositoryTest.java +++ b/services/venice-server/src/test/java/com/linkedin/venice/listener/ServerReadMetadataRepositoryTest.java @@ -38,6 +38,8 @@ public class ServerReadMetadataRepositoryTest { private ReadOnlySchemaRepository mockSchemaRepo; private HelixCustomizedViewOfflinePushRepository mockCustomizedViewRepository; private HelixInstanceConfigRepository mockHelixInstanceConfigRepository; + private final MetricsRepository metricsRepository = new MetricsRepository(); + private final static String TEST_STORE = "test_store"; @BeforeMethod public void setUp() { @@ -49,7 +51,6 @@ public void setUp() { @Test public void testGetMetadata() { - MetricsRepository metricsRepository = new MetricsRepository(); ServerReadMetadataRepository serverReadMetadataRepository = new ServerReadMetadataRepository( metricsRepository, mockMetadataRepo, @@ -114,4 +115,22 @@ public void testGetMetadata() { metadataResponse = serverReadMetadataRepository.getMetadata(storeName); Assert.assertEquals(metadataResponse.getResponseRecord().getBatchGetLimit(), 300); } + + @Test + public void storeMigrationShouldThrownException() { + ServerReadMetadataRepository serverReadMetadataRepository = new ServerReadMetadataRepository( + metricsRepository, + mockMetadataRepo, + mockSchemaRepo, + Optional.of(CompletableFuture.completedFuture(mockCustomizedViewRepository)), + Optional.of(CompletableFuture.completedFuture(mockHelixInstanceConfigRepository))); + + Store store = mock(Store.class); + doReturn(Boolean.TRUE).when(store).isMigrating(); + doReturn(Boolean.TRUE).when(store).isStorageNodeReadQuotaEnabled(); + doReturn(store).when(mockMetadataRepo).getStoreOrThrow(TEST_STORE); + MetadataResponse response = serverReadMetadataRepository.getMetadata(TEST_STORE); + Assert.assertTrue(response.isError()); + Assert.assertTrue(response.getMessage().contains(TEST_STORE + " is migrating")); + } } From 624dbff9fe8148910da4dbf94e49949b2e9d2f29 Mon Sep 17 00:00:00 2001 From: "Xin(Adam) Chen" Date: Wed, 21 Aug 2024 14:05:19 -0700 Subject: [PATCH 2/8] remove debug msg --- .../linkedin/venice/fastclient/meta/RequestBasedMetadata.java | 1 - 1 file changed, 1 deletion(-) diff --git a/clients/venice-client/src/main/java/com/linkedin/venice/fastclient/meta/RequestBasedMetadata.java b/clients/venice-client/src/main/java/com/linkedin/venice/fastclient/meta/RequestBasedMetadata.java index 9b1a6f64b3..8b74725718 100644 --- a/clients/venice-client/src/main/java/com/linkedin/venice/fastclient/meta/RequestBasedMetadata.java +++ b/clients/venice-client/src/main/java/com/linkedin/venice/fastclient/meta/RequestBasedMetadata.java @@ -470,7 +470,6 @@ synchronized void updateCache(boolean onDemandRefresh) throws InterruptedExcepti // TODO: need a better way to handle store migration if (!onDemandRefresh) { LOGGER.warn("Metadata fetch operation for store: {} failed with exception {}", storeName, e.getMessage()); - LOGGER.info("[DEBUG] refreshing the d2 service"); isServiceDiscovered = false; 
discoverD2Service(); updateCache(true); From 1b0a6222410cf916363ace92c63595a15a171f99 Mon Sep 17 00:00:00 2001 From: "Xin(Adam) Chen" Date: Thu, 22 Aug 2024 10:46:39 -0700 Subject: [PATCH 3/8] fix my dumb typo --- .../venice/fastclient/meta/RequestBasedMetadataTest.java | 6 +++--- .../com/linkedin/venice/endToEnd/TestStoreMigration.java | 8 ++------ .../venice/fastclient/AvroStoreClientEndToEndTest.java | 3 +++ 3 files changed, 8 insertions(+), 9 deletions(-) diff --git a/clients/venice-client/src/test/java/com/linkedin/venice/fastclient/meta/RequestBasedMetadataTest.java b/clients/venice-client/src/test/java/com/linkedin/venice/fastclient/meta/RequestBasedMetadataTest.java index 86e7fc1961..1abd064034 100644 --- a/clients/venice-client/src/test/java/com/linkedin/venice/fastclient/meta/RequestBasedMetadataTest.java +++ b/clients/venice-client/src/test/java/com/linkedin/venice/fastclient/meta/RequestBasedMetadataTest.java @@ -377,7 +377,7 @@ public void testRequestBasedMetadataStartFailFast() throws IOException { } @Test(timeOut = TEST_TIMEOUT) - public void testRequestBasedMetadataStartRefreshDuringStoreMigration() throws IOException, InterruptedException { + public void testRequestBasedMetadataOnDemandRefresh() throws IOException, InterruptedException { String storeName = "testStore"; ClientConfig clientConfig = RequestBasedMetadataTestUtils.getMockClientConfig(storeName, false, false); D2TransportClient d2TransportClient = mock(D2TransportClient.class); @@ -395,10 +395,10 @@ public void testRequestBasedMetadataStartRefreshDuringStoreMigration() throws IO CompletableFuture.runAsync(spy::start); // refresh would happen multiple times - // the first one w/o onDemond refresh but triggered would fail + // the first one, w/o onDemand refresh, would fail due to the d2 client exception verify(spy, timeout(3000).atLeast(1)).updateCache(false); - // the failed refresh triggers a a new refresh with onDemand refresh + // the first failed refresh triggers an onDemand refresh verify(spy, timeout(3000).atLeast(1)).updateCache(true); } } diff --git a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/TestStoreMigration.java b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/TestStoreMigration.java index 4946f4b7ab..b2c512f587 100644 --- a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/TestStoreMigration.java +++ b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/TestStoreMigration.java @@ -555,7 +555,7 @@ private StoreInfo getStoreConfig(String controllerUrl, String clusterName, Strin } } - @Test(timeOut = 2 * TEST_TIMEOUT) + @Test(timeOut = TEST_TIMEOUT) public void testStoreMigrationForFastClient() throws Exception { String storeName = Utils.getUniqueString("testMigrationWithFastClient"); createAndPushStore(srcClusterName, storeName); String destD2ServiceName = multiClusterWrapper.getClusterToD2().get(destClusterName); D2Client d2Client = D2TestUtils.getAndStartD2Client(multiClusterWrapper.getClusters().get(srcClusterName).getZk().getAddress()); @@ -577,14 +577,10 @@ public void testStoreMigrationForFastClient() throws Exception { readFromStore(client); StoreMigrationTestUtil.startMigration(parentControllerUrl, storeName, srcClusterName, destClusterName); StoreMigrationTestUtil - .completeMigration(parentControllerUrl, storeName, srcClusterName, destD2ServiceName, FABRIC0); + .completeMigration(parentControllerUrl, storeName, srcClusterName, destClusterName, FABRIC0); TestUtils.waitForNonDeterministicAssertion(45, TimeUnit.SECONDS, () -> { // Keep reading from the store. 
Fast client is supposed to refresh d2 service readFromStore(client); - AbstractAvroStoreClient castClient = - (AbstractAvroStoreClient) ((StatTrackingStoreClient) client) - .getInnerStoreClient(); - Assert.assertTrue(castClient.toString().contains(destD2ServiceName)); }); } } diff --git a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/fastclient/AvroStoreClientEndToEndTest.java b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/fastclient/AvroStoreClientEndToEndTest.java index ebccaf75df..e4be1ad9a0 100644 --- a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/fastclient/AvroStoreClientEndToEndTest.java +++ b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/fastclient/AvroStoreClientEndToEndTest.java @@ -35,6 +35,9 @@ * TODO * 1. There might be some duplicate tests in this file and {@link BatchGetAvroStoreClientTest}, need to clean it up. * 2. add test for get with speculative query but only with 1 replica + * + * The test suite requires JDK version 1.8.252 or higher in order to leverage httpclient5 H2 support. User needs to + * set up the correct JDK versions on IDE and JAVA_HOME. */ public class AvroStoreClientEndToEndTest extends AbstractClientEndToEndSetup { From 6dcf15429336a4f707ee3896a7704748a40deb89 Mon Sep 17 00:00:00 2001 From: "Xin(Adam) Chen" Date: Thu, 22 Aug 2024 11:33:24 -0700 Subject: [PATCH 4/8] fix spotbug --- .../java/com/linkedin/venice/endToEnd/TestStoreMigration.java | 1 - 1 file changed, 1 deletion(-) diff --git a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/TestStoreMigration.java b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/TestStoreMigration.java index b2c512f587..c0742515ad 100644 --- a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/TestStoreMigration.java +++ b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/TestStoreMigration.java @@ -559,7 +559,6 @@ private StoreInfo getStoreConfig(String controllerUrl, String clusterName, Strin public void testStoreMigrationForFastClient() throws Exception { String storeName = Utils.getUniqueString("testMigrationWithFastClient"); createAndPushStore(srcClusterName, storeName); - String destD2ServiceName = multiClusterWrapper.getClusterToD2().get(destClusterName); D2Client d2Client = D2TestUtils.getAndStartD2Client(multiClusterWrapper.getClusters().get(srcClusterName).getZk().getAddress()); From 9216a2357dcac8dd175e121f8c1edb702b56ac32 Mon Sep 17 00:00:00 2001 From: "Xin(Adam) Chen" Date: Thu, 22 Aug 2024 17:30:55 -0700 Subject: [PATCH 5/8] address comment --- .../ServerReadMetadataRepository.java | 21 ++++- .../linkedin/venice/server/VeniceServer.java | 6 ++ .../ServerReadMetadataRepositoryTest.java | 81 +++++++++++++++---- 3 files changed, 91 insertions(+), 17 deletions(-) diff --git a/services/venice-server/src/main/java/com/linkedin/venice/listener/ServerReadMetadataRepository.java b/services/venice-server/src/main/java/com/linkedin/venice/listener/ServerReadMetadataRepository.java index 9553ffe63b..6598acb2ff 100644 --- a/services/venice-server/src/main/java/com/linkedin/venice/listener/ServerReadMetadataRepository.java +++ b/services/venice-server/src/main/java/com/linkedin/venice/listener/ServerReadMetadataRepository.java @@ -7,11 +7,13 @@ import com.linkedin.venice.exceptions.VeniceException; import com.linkedin.venice.helix.HelixCustomizedViewOfflinePushRepository; import 
com.linkedin.venice.helix.HelixInstanceConfigRepository; +import com.linkedin.venice.helix.ZkStoreConfigAccessor; import com.linkedin.venice.meta.Instance; import com.linkedin.venice.meta.Partition; import com.linkedin.venice.meta.ReadOnlySchemaRepository; import com.linkedin.venice.meta.ReadOnlyStoreRepository; import com.linkedin.venice.meta.Store; +import com.linkedin.venice.meta.StoreConfig; import com.linkedin.venice.meta.Version; import com.linkedin.venice.metadata.response.VersionProperties; import com.linkedin.venice.schema.SchemaEntry; @@ -38,16 +40,19 @@ public class ServerReadMetadataRepository implements ReadMetadataRetriever { private final ReadOnlySchemaRepository schemaRepository; private HelixCustomizedViewOfflinePushRepository customizedViewRepository; private HelixInstanceConfigRepository helixInstanceConfigRepository; + private ZkStoreConfigAccessor zkStoreConfigAccessor; public ServerReadMetadataRepository( MetricsRepository metricsRepository, ReadOnlyStoreRepository storeRepository, ReadOnlySchemaRepository schemaRepository, + ZkStoreConfigAccessor zkStoreConfigAccessor, Optional> customizedViewFuture, Optional> helixInstanceFuture) { this.serverMetadataServiceStats = new ServerMetadataServiceStats(metricsRepository); this.storeRepository = storeRepository; this.schemaRepository = schemaRepository; + this.zkStoreConfigAccessor = zkStoreConfigAccessor; customizedViewFuture.ifPresent(future -> future.thenApply(cv -> this.customizedViewRepository = cv)); helixInstanceFuture.ifPresent(future -> future.thenApply(helix -> this.helixInstanceConfigRepository = helix)); @@ -72,10 +77,20 @@ public MetadataResponse getMetadata(String storeName) { "Fast client is not enabled for store: %s, please ensure storage node read quota is enabled for the given store", storeName)); } + if (store.isMigrating()) { - throw new VeniceException( - "Store: " + storeName + " is migrating. Failing the request to allow fast " - + "client refresh service discovery."); + // only obtain store Config when store is migrating and only throw exceptions when dest cluster is ready + StoreConfig storeConfig = zkStoreConfigAccessor.getStoreConfig(storeName); + String destCluster = storeConfig.getMigrationDestCluster(); + if (destCluster == null) { + // defensive check + throw new VeniceException("Store: " + storeName + " is migrating but dest cluster is not set."); + } + if (storeConfig.getCluster().equals(storeConfig.getMigrationDestCluster())) { + throw new VeniceException( + "Store: " + storeName + " is migrating. 
Failing the request to allow fast " + + "client refresh service discovery."); + } } // Version metadata int currentVersionNumber = store.getCurrentVersion(); diff --git a/services/venice-server/src/main/java/com/linkedin/venice/server/VeniceServer.java b/services/venice-server/src/main/java/com/linkedin/venice/server/VeniceServer.java index d7f51a510f..4d3b9ac819 100644 --- a/services/venice-server/src/main/java/com/linkedin/venice/server/VeniceServer.java +++ b/services/venice-server/src/main/java/com/linkedin/venice/server/VeniceServer.java @@ -29,11 +29,13 @@ import com.linkedin.venice.common.VeniceSystemStoreUtils; import com.linkedin.venice.exceptions.VeniceException; import com.linkedin.venice.helix.AllowlistAccessor; +import com.linkedin.venice.helix.HelixAdapterSerializer; import com.linkedin.venice.helix.HelixCustomizedViewOfflinePushRepository; import com.linkedin.venice.helix.HelixInstanceConfigRepository; import com.linkedin.venice.helix.HelixReadOnlyZKSharedSchemaRepository; import com.linkedin.venice.helix.SafeHelixManager; import com.linkedin.venice.helix.ZkAllowlistAccessor; +import com.linkedin.venice.helix.ZkStoreConfigAccessor; import com.linkedin.venice.kafka.protocol.state.PartitionState; import com.linkedin.venice.kafka.protocol.state.StoreVersionState; import com.linkedin.venice.listener.ListenerService; @@ -414,10 +416,14 @@ private List createServices() { new StoreValueSchemasCacheService(metadataRepo, schemaRepo); services.add(storeValueSchemasCacheService); + ZkStoreConfigAccessor storeConfigAccessor = + new ZkStoreConfigAccessor(zkClient, new HelixAdapterSerializer(), Optional.empty()); + serverReadMetadataRepository = new ServerReadMetadataRepository( metricsRepository, metadataRepo, schemaRepo, + storeConfigAccessor, Optional.of(customizedViewFuture), Optional.of(helixInstanceFuture)); diff --git a/services/venice-server/src/test/java/com/linkedin/venice/listener/ServerReadMetadataRepositoryTest.java b/services/venice-server/src/test/java/com/linkedin/venice/listener/ServerReadMetadataRepositoryTest.java index 6420949dfa..59936d420c 100644 --- a/services/venice-server/src/test/java/com/linkedin/venice/listener/ServerReadMetadataRepositoryTest.java +++ b/services/venice-server/src/test/java/com/linkedin/venice/listener/ServerReadMetadataRepositoryTest.java @@ -1,28 +1,35 @@ package com.linkedin.venice.listener; +import static org.mockito.ArgumentMatchers.anyInt; import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; import com.linkedin.davinci.listener.response.MetadataResponse; import com.linkedin.davinci.listener.response.ServerCurrentVersionResponse; +import com.linkedin.venice.compression.CompressionStrategy; import com.linkedin.venice.helix.HelixCustomizedViewOfflinePushRepository; import com.linkedin.venice.helix.HelixInstanceConfigRepository; +import com.linkedin.venice.helix.ZkStoreConfigAccessor; import com.linkedin.venice.meta.Instance; import com.linkedin.venice.meta.OfflinePushStrategy; import com.linkedin.venice.meta.Partition; import com.linkedin.venice.meta.PartitionAssignment; +import com.linkedin.venice.meta.PartitionerConfig; import com.linkedin.venice.meta.PersistenceType; import com.linkedin.venice.meta.ReadOnlySchemaRepository; import com.linkedin.venice.meta.ReadOnlyStoreRepository; import com.linkedin.venice.meta.ReadStrategy; import com.linkedin.venice.meta.RoutingStrategy; import com.linkedin.venice.meta.Store; +import com.linkedin.venice.meta.StoreConfig; import 
com.linkedin.venice.meta.Version; import com.linkedin.venice.meta.VersionImpl; import com.linkedin.venice.meta.ZKStore; import com.linkedin.venice.metadata.response.VersionProperties; import com.linkedin.venice.schema.SchemaEntry; import io.tehuti.metrics.MetricsRepository; +import java.util.ArrayList; import java.util.Collections; import java.util.List; import java.util.Optional; @@ -38,8 +45,12 @@ public class ServerReadMetadataRepositoryTest { private ReadOnlySchemaRepository mockSchemaRepo; private HelixCustomizedViewOfflinePushRepository mockCustomizedViewRepository; private HelixInstanceConfigRepository mockHelixInstanceConfigRepository; - private final MetricsRepository metricsRepository = new MetricsRepository(); + private ZkStoreConfigAccessor zkStoreConfigAccessor; + private ServerReadMetadataRepository serverReadMetadataRepository; + private MetricsRepository metricsRepository; private final static String TEST_STORE = "test_store"; + private final static String DEST_CLUSTER = "test-cluster-dst"; + private final static String SRC_CLUSTER = "test-cluster-src"; @BeforeMethod public void setUp() { @@ -47,16 +58,19 @@ public void setUp() { mockSchemaRepo = mock(ReadOnlySchemaRepository.class); mockCustomizedViewRepository = mock(HelixCustomizedViewOfflinePushRepository.class); mockHelixInstanceConfigRepository = mock(HelixInstanceConfigRepository.class); - } - - @Test - public void testGetMetadata() { - ServerReadMetadataRepository serverReadMetadataRepository = new ServerReadMetadataRepository( + zkStoreConfigAccessor = mock(ZkStoreConfigAccessor.class); + metricsRepository = new MetricsRepository(); + serverReadMetadataRepository = new ServerReadMetadataRepository( metricsRepository, mockMetadataRepo, mockSchemaRepo, + zkStoreConfigAccessor, Optional.of(CompletableFuture.completedFuture(mockCustomizedViewRepository)), Optional.of(CompletableFuture.completedFuture(mockHelixInstanceConfigRepository))); + } + + @Test + public void testGetMetadata() { String storeName = "test-store"; Store mockStore = new ZKStore( storeName, @@ -74,7 +88,7 @@ public void testGetMetadata() { String topicName = Version.composeKafkaTopic(storeName, 2); PartitionAssignment partitionAssignment = new PartitionAssignment(topicName, 1); Partition partition = mock(Partition.class); - doReturn(0).when(partition).getId(); + when(partition.getId()).thenReturn(0); List readyToServeInstances = Collections.singletonList(new Instance("host1", "host1", 1234)); doReturn(readyToServeInstances).when(partition).getReadyToServeInstances(); partitionAssignment.addPartition(partition); @@ -117,18 +131,57 @@ public void testGetMetadata() { } @Test - public void storeMigrationShouldThrownException() { - ServerReadMetadataRepository serverReadMetadataRepository = new ServerReadMetadataRepository( - metricsRepository, - mockMetadataRepo, - mockSchemaRepo, - Optional.of(CompletableFuture.completedFuture(mockCustomizedViewRepository)), - Optional.of(CompletableFuture.completedFuture(mockHelixInstanceConfigRepository))); + public void storeMigrationShouldNotThrownExceptionWhenStartMigration() { + Store store = mock(Store.class); + String topicName = Version.composeKafkaTopic(TEST_STORE, 1); + doReturn(Boolean.TRUE).when(store).isMigrating(); + doReturn(Boolean.TRUE).when(store).isStorageNodeReadQuotaEnabled(); + doReturn(store).when(mockMetadataRepo).getStoreOrThrow(TEST_STORE); + StoreConfig storeConfig = new StoreConfig(TEST_STORE); + storeConfig.setMigrationDestCluster(DEST_CLUSTER); + 
storeConfig.setMigrationSrcCluster(SRC_CLUSTER); + storeConfig.setCluster(SRC_CLUSTER); + doReturn(storeConfig).when(zkStoreConfigAccessor).getStoreConfig(TEST_STORE); + doReturn(1).when(store).getCurrentVersion(); + Version version = mock(Version.class); + PartitionerConfig partitionerConfig = mock(PartitionerConfig.class); + CompressionStrategy strategy = CompressionStrategy.NO_OP; + doReturn(strategy).when(version).getCompressionStrategy(); + doReturn(partitionerConfig).when(version).getPartitionerConfig(); + doReturn(version).when(store).getVersionOrThrow(anyInt()); + String schema = "\"string\""; + SchemaEntry entry = new SchemaEntry(0, schema); + List schemas = new ArrayList<>(); + schemas.add(entry); + doReturn(entry).when(mockSchemaRepo).getKeySchema(TEST_STORE); + doReturn(schemas).when(mockSchemaRepo).getValueSchemas(TEST_STORE); + PartitionAssignment partitionAssignment = new PartitionAssignment(topicName, 1); + Partition partition = mock(Partition.class); + when(partition.getId()).thenReturn(0); + List readyToServeInstances = Collections.singletonList(new Instance("host1", "host1", 1234)); + doReturn(readyToServeInstances).when(partition).getReadyToServeInstances(); + partitionAssignment.addPartition(partition); + when(mockCustomizedViewRepository.getPartitionAssignments(topicName)).thenReturn(partitionAssignment); + when(mockHelixInstanceConfigRepository.getInstanceGroupIdMapping()).thenReturn(Collections.emptyMap()); + MetadataResponse response = serverReadMetadataRepository.getMetadata(TEST_STORE); + Assert.assertFalse(response.isError()); + } + + @Test + public void storeMigrationShouldThrownExceptionWhenMigrationCompletes() { Store store = mock(Store.class); doReturn(Boolean.TRUE).when(store).isMigrating(); doReturn(Boolean.TRUE).when(store).isStorageNodeReadQuotaEnabled(); doReturn(store).when(mockMetadataRepo).getStoreOrThrow(TEST_STORE); + StoreConfig storeConfig = new StoreConfig(TEST_STORE); + storeConfig.setMigrationDestCluster(DEST_CLUSTER); + storeConfig.setMigrationSrcCluster(SRC_CLUSTER); + // when current cluster is the same as destination cluster + // exception should be thrown so clients can act on it and refresh d2 + storeConfig.setCluster(DEST_CLUSTER); + doReturn(storeConfig).when(zkStoreConfigAccessor).getStoreConfig(TEST_STORE); + MetadataResponse response = serverReadMetadataRepository.getMetadata(TEST_STORE); Assert.assertTrue(response.isError()); Assert.assertTrue(response.getMessage().contains(TEST_STORE + " is migrating")); From c34f326686f7bc42730c561246758c9b7b3b264b Mon Sep 17 00:00:00 2001 From: "Xin(Adam) Chen" Date: Fri, 23 Aug 2024 14:49:55 -0700 Subject: [PATCH 6/8] what's goin on --- .../utils/VeniceMultiClusterWrapper.java | 2 +- ...woLayerMultiRegionMultiClusterWrapper.java | 2 +- .../ServerReadMetadataRepository.java | 26 +++++++++------ .../linkedin/venice/server/VeniceServer.java | 13 +++++--- .../ServerReadMetadataRepositoryTest.java | 33 +++++++++++++------ 5 files changed, 50 insertions(+), 26 deletions(-) diff --git a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/integration/utils/VeniceMultiClusterWrapper.java b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/integration/utils/VeniceMultiClusterWrapper.java index 6bf6beb197..d03c9518ce 100644 --- a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/integration/utils/VeniceMultiClusterWrapper.java +++ 
b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/integration/utils/VeniceMultiClusterWrapper.java @@ -81,7 +81,7 @@ static ServiceProvider generateService(VeniceMultiClu clusterNames[i] = clusterName; String d2ServiceName = "venice-" + i; clusterToD2.put(clusterName, d2ServiceName); - String serverD2ServiceName = Utils.getUniqueString(clusterName + "_server_d2"); + String serverD2ServiceName = Utils.getUniqueString(clusterName + "_d2"); clusterToServerD2.put(clusterName, serverD2ServiceName); } diff --git a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/integration/utils/VeniceTwoLayerMultiRegionMultiClusterWrapper.java b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/integration/utils/VeniceTwoLayerMultiRegionMultiClusterWrapper.java index 3c2fcc6059..5065c3d52e 100644 --- a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/integration/utils/VeniceTwoLayerMultiRegionMultiClusterWrapper.java +++ b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/integration/utils/VeniceTwoLayerMultiRegionMultiClusterWrapper.java @@ -99,7 +99,7 @@ static ServiceProvider generateSer clusterNames[i] = clusterName; String routerD2ServiceName = "venice-" + i; clusterToD2.put(clusterName, routerD2ServiceName); - String serverD2ServiceName = Utils.getUniqueString(clusterName + "_server_d2"); + String serverD2ServiceName = Utils.getUniqueString(clusterName + "_d2"); clusterToServerD2.put(clusterName, serverD2ServiceName); } List childRegionName = new ArrayList<>(options.getNumberOfRegions()); diff --git a/services/venice-server/src/main/java/com/linkedin/venice/listener/ServerReadMetadataRepository.java b/services/venice-server/src/main/java/com/linkedin/venice/listener/ServerReadMetadataRepository.java index 6598acb2ff..f1568fadaf 100644 --- a/services/venice-server/src/main/java/com/linkedin/venice/listener/ServerReadMetadataRepository.java +++ b/services/venice-server/src/main/java/com/linkedin/venice/listener/ServerReadMetadataRepository.java @@ -7,7 +7,7 @@ import com.linkedin.venice.exceptions.VeniceException; import com.linkedin.venice.helix.HelixCustomizedViewOfflinePushRepository; import com.linkedin.venice.helix.HelixInstanceConfigRepository; -import com.linkedin.venice.helix.ZkStoreConfigAccessor; +import com.linkedin.venice.helix.HelixReadOnlyStoreConfigRepository; import com.linkedin.venice.meta.Instance; import com.linkedin.venice.meta.Partition; import com.linkedin.venice.meta.ReadOnlySchemaRepository; @@ -35,24 +35,27 @@ */ public class ServerReadMetadataRepository implements ReadMetadataRetriever { private static final Logger LOGGER = LogManager.getLogger(ServerReadMetadataRepository.class); + private final String serverCluster; private final ServerMetadataServiceStats serverMetadataServiceStats; private final ReadOnlyStoreRepository storeRepository; private final ReadOnlySchemaRepository schemaRepository; private HelixCustomizedViewOfflinePushRepository customizedViewRepository; private HelixInstanceConfigRepository helixInstanceConfigRepository; - private ZkStoreConfigAccessor zkStoreConfigAccessor; + private HelixReadOnlyStoreConfigRepository storeConfigRepository; public ServerReadMetadataRepository( + String serverCluster, MetricsRepository metricsRepository, ReadOnlyStoreRepository storeRepository, ReadOnlySchemaRepository schemaRepository, - ZkStoreConfigAccessor zkStoreConfigAccessor, + HelixReadOnlyStoreConfigRepository storeConfigRepository, Optional> 
customizedViewFuture, Optional> helixInstanceFuture) { + this.serverCluster = serverCluster; this.serverMetadataServiceStats = new ServerMetadataServiceStats(metricsRepository); this.storeRepository = storeRepository; this.schemaRepository = schemaRepository; - this.zkStoreConfigAccessor = zkStoreConfigAccessor; + this.storeConfigRepository = storeConfigRepository; customizedViewFuture.ifPresent(future -> future.thenApply(cv -> this.customizedViewRepository = cv)); helixInstanceFuture.ifPresent(future -> future.thenApply(helix -> this.helixInstanceConfigRepository = helix)); @@ -79,14 +82,17 @@ public MetadataResponse getMetadata(String storeName) { } if (store.isMigrating()) { - // only obtain store Config when store is migrating and only throw exceptions when dest cluster is ready - StoreConfig storeConfig = zkStoreConfigAccessor.getStoreConfig(storeName); - String destCluster = storeConfig.getMigrationDestCluster(); - if (destCluster == null) { + // only obtain store Config when store is migrating and only throw exceptions when dest cluster is ready or + // store + // config is not available + StoreConfig storeConfig = storeConfigRepository.getStoreConfigOrThrow(storeName); + String storeCluster = storeConfig.getCluster(); + if (storeCluster == null) { // defensive check - throw new VeniceException("Store: " + storeName + " is migrating but dest cluster is not set."); + throw new VeniceException("Store: " + storeName + " is migrating but store cluster is not set."); } - if (storeConfig.getCluster().equals(storeConfig.getMigrationDestCluster())) { + // store cluster has changed so throw exception to enforce client to do a new service discovery + if (!storeCluster.equals(serverCluster)) { throw new VeniceException( "Store: " + storeName + " is migrating. 
Failing the request to allow fast " + "client refresh service discovery."); diff --git a/services/venice-server/src/main/java/com/linkedin/venice/server/VeniceServer.java b/services/venice-server/src/main/java/com/linkedin/venice/server/VeniceServer.java index 4d3b9ac819..0de908ffc3 100644 --- a/services/venice-server/src/main/java/com/linkedin/venice/server/VeniceServer.java +++ b/services/venice-server/src/main/java/com/linkedin/venice/server/VeniceServer.java @@ -32,10 +32,10 @@ import com.linkedin.venice.helix.HelixAdapterSerializer; import com.linkedin.venice.helix.HelixCustomizedViewOfflinePushRepository; import com.linkedin.venice.helix.HelixInstanceConfigRepository; +import com.linkedin.venice.helix.HelixReadOnlyStoreConfigRepository; import com.linkedin.venice.helix.HelixReadOnlyZKSharedSchemaRepository; import com.linkedin.venice.helix.SafeHelixManager; import com.linkedin.venice.helix.ZkAllowlistAccessor; -import com.linkedin.venice.helix.ZkStoreConfigAccessor; import com.linkedin.venice.kafka.protocol.state.PartitionState; import com.linkedin.venice.kafka.protocol.state.StoreVersionState; import com.linkedin.venice.listener.ListenerService; @@ -416,14 +416,19 @@ private List createServices() { new StoreValueSchemasCacheService(metadataRepo, schemaRepo); services.add(storeValueSchemasCacheService); - ZkStoreConfigAccessor storeConfigAccessor = - new ZkStoreConfigAccessor(zkClient, new HelixAdapterSerializer(), Optional.empty()); + HelixReadOnlyStoreConfigRepository storeConfigRepository = new HelixReadOnlyStoreConfigRepository( + zkClient, + new HelixAdapterSerializer(), + serverConfig.getRefreshAttemptsForZkReconnect(), + serverConfig.getRefreshIntervalForZkReconnectInMs()); + storeConfigRepository.refresh(); serverReadMetadataRepository = new ServerReadMetadataRepository( + clusterConfig.getClusterName(), metricsRepository, metadataRepo, schemaRepo, - storeConfigAccessor, + storeConfigRepository, Optional.of(customizedViewFuture), Optional.of(helixInstanceFuture)); diff --git a/services/venice-server/src/test/java/com/linkedin/venice/listener/ServerReadMetadataRepositoryTest.java b/services/venice-server/src/test/java/com/linkedin/venice/listener/ServerReadMetadataRepositoryTest.java index 59936d420c..51d1e518a8 100644 --- a/services/venice-server/src/test/java/com/linkedin/venice/listener/ServerReadMetadataRepositoryTest.java +++ b/services/venice-server/src/test/java/com/linkedin/venice/listener/ServerReadMetadataRepositoryTest.java @@ -2,15 +2,17 @@ import static org.mockito.ArgumentMatchers.anyInt; import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; import com.linkedin.davinci.listener.response.MetadataResponse; import com.linkedin.davinci.listener.response.ServerCurrentVersionResponse; import com.linkedin.venice.compression.CompressionStrategy; +import com.linkedin.venice.exceptions.VeniceNoStoreException; import com.linkedin.venice.helix.HelixCustomizedViewOfflinePushRepository; import com.linkedin.venice.helix.HelixInstanceConfigRepository; -import com.linkedin.venice.helix.ZkStoreConfigAccessor; +import com.linkedin.venice.helix.HelixReadOnlyStoreConfigRepository; import com.linkedin.venice.meta.Instance; import com.linkedin.venice.meta.OfflinePushStrategy; import com.linkedin.venice.meta.Partition; @@ -45,7 +47,7 @@ public class ServerReadMetadataRepositoryTest { private ReadOnlySchemaRepository mockSchemaRepo; private 
HelixCustomizedViewOfflinePushRepository mockCustomizedViewRepository; private HelixInstanceConfigRepository mockHelixInstanceConfigRepository; - private ZkStoreConfigAccessor zkStoreConfigAccessor; + private HelixReadOnlyStoreConfigRepository storeConfigRepository; private ServerReadMetadataRepository serverReadMetadataRepository; private MetricsRepository metricsRepository; private final static String TEST_STORE = "test_store"; @@ -58,13 +60,14 @@ public void setUp() { mockSchemaRepo = mock(ReadOnlySchemaRepository.class); mockCustomizedViewRepository = mock(HelixCustomizedViewOfflinePushRepository.class); mockHelixInstanceConfigRepository = mock(HelixInstanceConfigRepository.class); - zkStoreConfigAccessor = mock(ZkStoreConfigAccessor.class); + storeConfigRepository = mock(HelixReadOnlyStoreConfigRepository.class); metricsRepository = new MetricsRepository(); serverReadMetadataRepository = new ServerReadMetadataRepository( + SRC_CLUSTER, metricsRepository, mockMetadataRepo, mockSchemaRepo, - zkStoreConfigAccessor, + storeConfigRepository, Optional.of(CompletableFuture.completedFuture(mockCustomizedViewRepository)), Optional.of(CompletableFuture.completedFuture(mockHelixInstanceConfigRepository))); } @@ -141,7 +144,7 @@ public void storeMigrationShouldNotThrownExceptionWhenStartMigration() { storeConfig.setMigrationDestCluster(DEST_CLUSTER); storeConfig.setMigrationSrcCluster(SRC_CLUSTER); storeConfig.setCluster(SRC_CLUSTER); - doReturn(storeConfig).when(zkStoreConfigAccessor).getStoreConfig(TEST_STORE); + doReturn(storeConfig).when(storeConfigRepository).getStoreConfigOrThrow(TEST_STORE); doReturn(1).when(store).getCurrentVersion(); Version version = mock(Version.class); PartitionerConfig partitionerConfig = mock(PartitionerConfig.class); @@ -175,15 +178,25 @@ public void storeMigrationShouldThrownExceptionWhenMigrationCompletes() { doReturn(Boolean.TRUE).when(store).isStorageNodeReadQuotaEnabled(); doReturn(store).when(mockMetadataRepo).getStoreOrThrow(TEST_STORE); StoreConfig storeConfig = new StoreConfig(TEST_STORE); - storeConfig.setMigrationDestCluster(DEST_CLUSTER); - storeConfig.setMigrationSrcCluster(SRC_CLUSTER); - // when current cluster is the same as destination cluster - // exception should be thrown so clients can act on it and refresh d2 storeConfig.setCluster(DEST_CLUSTER); - doReturn(storeConfig).when(zkStoreConfigAccessor).getStoreConfig(TEST_STORE); + doReturn(storeConfig).when(storeConfigRepository).getStoreConfigOrThrow(TEST_STORE); MetadataResponse response = serverReadMetadataRepository.getMetadata(TEST_STORE); Assert.assertTrue(response.isError()); Assert.assertTrue(response.getMessage().contains(TEST_STORE + " is migrating")); } + + @Test + public void storeMigrationShouldThrowExceptionWhenStoreConfigIsMissing() { + Store store = mock(Store.class); + doReturn(Boolean.TRUE).when(store).isMigrating(); + doReturn(Boolean.TRUE).when(store).isStorageNodeReadQuotaEnabled(); + doReturn(store).when(mockMetadataRepo).getStoreOrThrow(TEST_STORE); + doThrow(new VeniceNoStoreException(TEST_STORE)).when(storeConfigRepository).getStoreConfigOrThrow(TEST_STORE); + + // store config is not available + MetadataResponse response = serverReadMetadataRepository.getMetadata(TEST_STORE); + Assert.assertTrue(response.isError()); + Assert.assertTrue(response.getMessage().contains(TEST_STORE + " does not exist")); + } } From 07a4aa7655c926d0a164760d1040963337d6f890 Mon Sep 17 00:00:00 2001 From: "Xin(Adam) Chen" Date: Wed, 4 Sep 2024 15:06:51 -0700 Subject: [PATCH 7/8] move 
storeConfigRepo creation to VeniceMetadataRepositoryBuilder --- .../repository/VeniceMetadataRepositoryBuilder.java | 13 +++++++++++++ .../com/linkedin/venice/server/VeniceServer.java | 11 +---------- 2 files changed, 14 insertions(+), 10 deletions(-) diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/repository/VeniceMetadataRepositoryBuilder.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/repository/VeniceMetadataRepositoryBuilder.java index ee8d660072..49a27875d8 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/repository/VeniceMetadataRepositoryBuilder.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/repository/VeniceMetadataRepositoryBuilder.java @@ -7,6 +7,7 @@ import com.linkedin.venice.helix.HelixReadOnlyLiveClusterConfigRepository; import com.linkedin.venice.helix.HelixReadOnlySchemaRepository; import com.linkedin.venice.helix.HelixReadOnlySchemaRepositoryAdapter; +import com.linkedin.venice.helix.HelixReadOnlyStoreConfigRepository; import com.linkedin.venice.helix.HelixReadOnlyStoreRepository; import com.linkedin.venice.helix.HelixReadOnlyStoreRepositoryAdapter; import com.linkedin.venice.helix.HelixReadOnlyZKSharedSchemaRepository; @@ -43,6 +44,7 @@ public class VeniceMetadataRepositoryBuilder { private final ICProvider icProvider; private ReadOnlyStoreRepository storeRepo; + private HelixReadOnlyStoreConfigRepository storeConfigRepo; private ReadOnlySchemaRepository schemaRepo; private HelixReadOnlyZKSharedSchemaRepository readOnlyZKSharedSchemaRepository; private ReadOnlyLiveClusterConfigRepository liveClusterConfigRepo; @@ -79,6 +81,10 @@ public ReadOnlyStoreRepository getStoreRepo() { return storeRepo; } + public HelixReadOnlyStoreConfigRepository getStoreConfigRepo() { + return storeConfigRepo; + } + public ReadOnlySchemaRepository getSchemaRepo() { return schemaRepo; } @@ -136,6 +142,13 @@ private void initServerStoreAndSchemaRepository() { // Load existing store config and setup watches storeRepo.refresh(); + storeConfigRepo = new HelixReadOnlyStoreConfigRepository( + zkClient, + adapter, + clusterConfig.getRefreshAttemptsForZkReconnect(), + clusterConfig.getRefreshIntervalForZkReconnectInMs()); + storeConfigRepo.refresh(); + readOnlyZKSharedSchemaRepository = new HelixReadOnlyZKSharedSchemaRepository( readOnlyZKSharedSystemStoreRepository, zkClient, diff --git a/services/venice-server/src/main/java/com/linkedin/venice/server/VeniceServer.java b/services/venice-server/src/main/java/com/linkedin/venice/server/VeniceServer.java index 0de908ffc3..8fc074d02e 100644 --- a/services/venice-server/src/main/java/com/linkedin/venice/server/VeniceServer.java +++ b/services/venice-server/src/main/java/com/linkedin/venice/server/VeniceServer.java @@ -29,10 +29,8 @@ import com.linkedin.venice.common.VeniceSystemStoreUtils; import com.linkedin.venice.exceptions.VeniceException; import com.linkedin.venice.helix.AllowlistAccessor; -import com.linkedin.venice.helix.HelixAdapterSerializer; import com.linkedin.venice.helix.HelixCustomizedViewOfflinePushRepository; import com.linkedin.venice.helix.HelixInstanceConfigRepository; -import com.linkedin.venice.helix.HelixReadOnlyStoreConfigRepository; import com.linkedin.venice.helix.HelixReadOnlyZKSharedSchemaRepository; import com.linkedin.venice.helix.SafeHelixManager; import com.linkedin.venice.helix.ZkAllowlistAccessor; @@ -416,19 +414,12 @@ private List createServices() { new StoreValueSchemasCacheService(metadataRepo, schemaRepo); services.add(storeValueSchemasCacheService); - 
HelixReadOnlyStoreConfigRepository storeConfigRepository = new HelixReadOnlyStoreConfigRepository( - zkClient, - new HelixAdapterSerializer(), - serverConfig.getRefreshAttemptsForZkReconnect(), - serverConfig.getRefreshIntervalForZkReconnectInMs()); - - storeConfigRepository.refresh(); serverReadMetadataRepository = new ServerReadMetadataRepository( clusterConfig.getClusterName(), metricsRepository, metadataRepo, schemaRepo, - storeConfigRepository, + veniceMetadataRepositoryBuilder.getStoreConfigRepo(), Optional.of(customizedViewFuture), Optional.of(helixInstanceFuture)); From 1fcdd0dc4cdcab6567fc0c8361acc0bb13b2bc8b Mon Sep 17 00:00:00 2001 From: "Xin(Adam) Chen" Date: Fri, 6 Sep 2024 17:21:16 -0700 Subject: [PATCH 8/8] enable SSL --- .../venice/endToEnd/TestStoreMigration.java | 21 ++++++++++--- .../integration/utils/ServiceFactory.java | 31 ++++++++++++++++++- .../ServerReadMetadataRepository.java | 3 +- 3 files changed, 47 insertions(+), 8 deletions(-) diff --git a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/TestStoreMigration.java b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/TestStoreMigration.java index 4d8ef4a029..d4d3701c49 100644 --- a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/TestStoreMigration.java +++ b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/TestStoreMigration.java @@ -3,7 +3,9 @@ import static com.linkedin.venice.ConfigKeys.DAVINCI_PUSH_STATUS_SCAN_INTERVAL_IN_SECONDS; import static com.linkedin.venice.ConfigKeys.OFFLINE_JOB_START_TIMEOUT_MS; import static com.linkedin.venice.ConfigKeys.PUSH_STATUS_STORE_ENABLED; +import static com.linkedin.venice.ConfigKeys.SERVER_HTTP2_INBOUND_ENABLED; import static com.linkedin.venice.ConfigKeys.SERVER_PROMOTION_TO_LEADER_REPLICA_DELAY_SECONDS; +import static com.linkedin.venice.ConfigKeys.SERVER_QUOTA_ENFORCEMENT_ENABLED; import static com.linkedin.venice.ConfigKeys.TOPIC_CLEANUP_SLEEP_INTERVAL_BETWEEN_TOPIC_LIST_FETCH_MS; import static com.linkedin.venice.hadoop.VenicePushJobConstants.DEFAULT_KEY_FIELD_PROP; import static com.linkedin.venice.hadoop.VenicePushJobConstants.DEFAULT_VALUE_FIELD_PROP; @@ -76,6 +78,7 @@ import java.util.Optional; import java.util.Properties; import java.util.Set; +import java.util.concurrent.ExecutionException; import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.TimeUnit; import org.apache.avro.Schema; @@ -119,6 +122,8 @@ public void setUp() throws Exception { Properties serverProperties = new Properties(); serverProperties.put(SERVER_PROMOTION_TO_LEADER_REPLICA_DELAY_SECONDS, 1L); + serverProperties.put(SERVER_HTTP2_INBOUND_ENABLED, "true"); + serverProperties.put(SERVER_QUOTA_ENFORCEMENT_ENABLED, "true"); // 1 parent controller, 1 child region, 2 clusters per child region, 2 servers per cluster // RF=2 to test both leader and follower SNs @@ -133,7 +138,8 @@ public void setUp() throws Exception { Optional.of(parentControllerProperties), Optional.empty(), Optional.of(serverProperties), - false); + false, + true); multiClusterWrapper = twoLayerMultiRegionMultiClusterWrapper.getChildRegions().get(0); String[] clusterNames = multiClusterWrapper.getClusterNames(); @@ -551,9 +557,10 @@ private Properties createAndPushStore(String clusterName, String storeName) thro return props; } - private void readFromStore(AvroGenericStoreClient client) { + private void readFromStore(AvroGenericStoreClient client) + throws ExecutionException, 
InterruptedException { int key = ThreadLocalRandom.current().nextInt(RECORD_COUNT) + 1; - client.get(Integer.toString(key)); + client.get(Integer.toString(key)).get(); } private StoreInfo getStoreConfig(String controllerUrl, String clusterName, String storeName) { @@ -572,8 +579,9 @@ private StoreInfo getStoreConfig(String controllerUrl, String clusterName, Strin public void testStoreMigrationForFastClient() throws Exception { String storeName = Utils.getUniqueString("testMigrationWithFastClient"); createAndPushStore(srcClusterName, storeName); - D2Client d2Client = - D2TestUtils.getAndStartD2Client(multiClusterWrapper.getClusters().get(srcClusterName).getZk().getAddress()); + // D2 must talk to HTTPS endpoint since SSL is enabled for server + D2Client d2Client = D2TestUtils + .getAndStartD2Client(multiClusterWrapper.getClusters().get(srcClusterName).getZk().getAddress(), true); // this is for SERVER_BASED_METADATA only com.linkedin.venice.fastclient.ClientConfig.ClientConfigBuilder fastClientConfigBuilder = @@ -581,6 +589,8 @@ public void testStoreMigrationForFastClient() throws Exception { .setR2Client(r2Client) .setD2Client(d2Client) .setMetadataRefreshIntervalInSeconds(1) + .setDualReadEnabled(false) + .setSpeculativeQueryEnabled(false) .setClusterDiscoveryD2Service(VeniceRouterWrapper.CLUSTER_DISCOVERY_D2_SERVICE_NAME) .setStoreMetadataFetchMode(StoreMetadataFetchMode.SERVER_BASED_METADATA); @@ -588,6 +598,7 @@ public void testStoreMigrationForFastClient() throws Exception { .getAndStartGenericStoreClient(fastClientConfigBuilder.build())) { readFromStore(client); StoreMigrationTestUtil.startMigration(parentControllerUrl, storeName, srcClusterName, destClusterName); + readFromStore(client); StoreMigrationTestUtil .completeMigration(parentControllerUrl, storeName, srcClusterName, destClusterName, FABRIC0); TestUtils.waitForNonDeterministicAssertion(45, TimeUnit.SECONDS, () -> { diff --git a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/integration/utils/ServiceFactory.java b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/integration/utils/ServiceFactory.java index a69b92f7fa..e412b1af2b 100644 --- a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/integration/utils/ServiceFactory.java +++ b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/integration/utils/ServiceFactory.java @@ -431,6 +431,33 @@ public static VeniceTwoLayerMultiRegionMultiClusterWrapper getVeniceTwoLayerMult Optional parentControllerProps, Optional childControllerProperties, Optional serverProps) { + return getVeniceTwoLayerMultiRegionMultiClusterWrapper( + numberOfRegions, + numberOfClustersInEachRegion, + numberOfParentControllers, + numberOfControllers, + numberOfServers, + numberOfRouters, + replicationFactor, + parentControllerProps, + childControllerProperties, + serverProps, + false); + } + + public static VeniceTwoLayerMultiRegionMultiClusterWrapper getVeniceTwoLayerMultiRegionMultiClusterWrapper( + int numberOfRegions, + int numberOfClustersInEachRegion, + int numberOfParentControllers, + int numberOfControllers, + int numberOfServers, + int numberOfRouters, + int replicationFactor, + Optional parentControllerProps, + Optional childControllerProperties, + Optional serverProps, + boolean forkServer, + boolean sslToStorageNodes) { VeniceMultiRegionClusterCreateOptions.Builder optionsBuilder = new VeniceMultiRegionClusterCreateOptions.Builder().numberOfRegions(numberOfRegions) 
.numberOfClusters(numberOfClustersInEachRegion) @@ -438,7 +465,9 @@ public static VeniceTwoLayerMultiRegionMultiClusterWrapper getVeniceTwoLayerMult .numberOfChildControllers(numberOfControllers) .numberOfServers(numberOfServers) .numberOfRouters(numberOfRouters) - .replicationFactor(replicationFactor); + .replicationFactor(replicationFactor) + .sslToStorageNodes(sslToStorageNodes) + .forkServer(forkServer); parentControllerProps.ifPresent(optionsBuilder::parentControllerProperties); childControllerProperties.ifPresent(optionsBuilder::childControllerProperties); diff --git a/services/venice-server/src/main/java/com/linkedin/venice/listener/ServerReadMetadataRepository.java b/services/venice-server/src/main/java/com/linkedin/venice/listener/ServerReadMetadataRepository.java index f1568fadaf..a6126fdd43 100644 --- a/services/venice-server/src/main/java/com/linkedin/venice/listener/ServerReadMetadataRepository.java +++ b/services/venice-server/src/main/java/com/linkedin/venice/listener/ServerReadMetadataRepository.java @@ -83,8 +83,7 @@ public MetadataResponse getMetadata(String storeName) { if (store.isMigrating()) { // only obtain store Config when store is migrating and only throw exceptions when dest cluster is ready or - // store - // config is not available + // store config is not available StoreConfig storeConfig = storeConfigRepository.getStoreConfigOrThrow(storeName); String storeCluster = storeConfig.getCluster(); if (storeCluster == null) {