diff --git a/.github/workflows/gh-ci-pulsar-tests.yml b/.github/workflows/gh-ci-pulsar-tests.yml index 3dc4136d26d..bec85bb4669 100644 --- a/.github/workflows/gh-ci-pulsar-tests.yml +++ b/.github/workflows/gh-ci-pulsar-tests.yml @@ -4,22 +4,6 @@ name: PulsarVeniceIntegrationCI on: [push, pull_request, workflow_dispatch] jobs: - ValidateGradleWrapper: - strategy: - fail-fast: false - matrix: - os: [ubuntu-latest] - runs-on: ${{ matrix.os }} - timeout-minutes: 5 - steps: - - uses: actions/checkout@v3 - with: - fetch-depth: 0 - - name: Validate Gradle wrapper - uses: gradle/wrapper-validation-action@v1 - - name: Set up Docker - uses: crazy-max/ghaction-setup-docker@v1 - PulsarVeniceIntegrationTests: strategy: fail-fast: false diff --git a/internal/venice-common/src/main/java/com/linkedin/venice/serialization/avro/AvroProtocolDefinition.java b/internal/venice-common/src/main/java/com/linkedin/venice/serialization/avro/AvroProtocolDefinition.java index 0fc5e1eaa41..787d787bc93 100644 --- a/internal/venice-common/src/main/java/com/linkedin/venice/serialization/avro/AvroProtocolDefinition.java +++ b/internal/venice-common/src/main/java/com/linkedin/venice/serialization/avro/AvroProtocolDefinition.java @@ -15,6 +15,7 @@ import com.linkedin.venice.kafka.protocol.state.StoreVersionState; import com.linkedin.venice.meta.Store; import com.linkedin.venice.metadata.response.MetadataResponseRecord; +import com.linkedin.venice.participant.protocol.ParticipantMessageValue; import com.linkedin.venice.pubsub.api.PubSubPositionWireFormat; import com.linkedin.venice.pushstatus.PushStatusKey; import com.linkedin.venice.pushstatus.PushStatusValue; @@ -147,6 +148,11 @@ public enum AvroProtocolDefinition { */ PUSH_STATUS_SYSTEM_SCHEMA_STORE(1, PushStatusValue.class), + /** + * Value schema for participant system stores. + */ + PARTICIPANT_MESSAGE_SYSTEM_STORE_VALUE(1, ParticipantMessageValue.class), + /** * Response record for admin request v1. */ @@ -230,7 +236,7 @@ private static Set<Byte> validateMagicBytes() { * extra header prepended in front. These are either un-evolvable or have their * schema ID stored out of band from the record. * - * For example, everything that goes inside of the Put of a {@link KafkaMessageEnvelope} + * For example, everything that goes inside the Put message of a {@link KafkaMessageEnvelope} * and uses the {@link com.linkedin.venice.kafka.protocol.Put#schemaId} to store * the protocol version. 
*/ diff --git a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/consumer/ConsumerIntegrationTestWithSchemaReader.java b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/consumer/ConsumerIntegrationTestWithSchemaReader.java index ede161e2481..3008de49cad 100644 --- a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/consumer/ConsumerIntegrationTestWithSchemaReader.java +++ b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/consumer/ConsumerIntegrationTestWithSchemaReader.java @@ -1,6 +1,5 @@ package com.linkedin.venice.consumer; -import com.linkedin.venice.controller.VeniceHelixAdmin; import com.linkedin.venice.controllerapi.ControllerClient; import com.linkedin.venice.controllerapi.MultiSchemaResponse; import com.linkedin.venice.integration.utils.VeniceClusterWrapper; @@ -32,13 +31,15 @@ void extraBeforeClassSetUp(VeniceClusterWrapper cluster, ControllerClient contro AvroProtocolDefinition.KAFKA_MESSAGE_ENVELOPE.getCurrentProtocolVersion()); }); - ((VeniceHelixAdmin) cluster.getRandomVeniceController().getVeniceAdmin()).addValueSchema( - cluster.getClusterName(), - systemStoreName, - NEW_PROTOCOL_SCHEMA.toString(), - NEW_PROTOCOL_VERSION, - DirectionalSchemaCompatibilityType.NONE, - false); + cluster.getRandomVeniceController() + .getVeniceAdmin() + .addValueSchemaInternal( + cluster.getClusterName(), + systemStoreName, + NEW_PROTOCOL_SCHEMA.toString(), + NEW_PROTOCOL_VERSION, + DirectionalSchemaCompatibilityType.NONE, + false); } @Override diff --git a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/controller/AdminToolBackfillTest.java b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/controller/AdminToolBackfillTest.java index ac988989c4a..ee72caed006 100644 --- a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/controller/AdminToolBackfillTest.java +++ b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/controller/AdminToolBackfillTest.java @@ -17,7 +17,6 @@ import com.linkedin.venice.integration.utils.VeniceTwoLayerMultiRegionMultiClusterWrapper; import com.linkedin.venice.utils.TestUtils; import com.linkedin.venice.utils.Utils; -import com.linkedin.venice.utils.VeniceProperties; import java.util.Arrays; import java.util.HashSet; import java.util.List; @@ -56,7 +55,7 @@ public void setUp() { 2, 2, 1, - Optional.of(new VeniceProperties(parentControllerProperties)), + Optional.of(parentControllerProperties), Optional.empty(), Optional.empty(), false); diff --git a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/controller/TestFabricBuildout.java b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/controller/TestFabricBuildout.java index e0a686d1757..f2952394736 100644 --- a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/controller/TestFabricBuildout.java +++ b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/controller/TestFabricBuildout.java @@ -22,7 +22,6 @@ import com.linkedin.venice.utils.TestUtils; import com.linkedin.venice.utils.TestWriteUtils; import com.linkedin.venice.utils.Utils; -import com.linkedin.venice.utils.VeniceProperties; import java.util.List; import java.util.Optional; import java.util.Properties; @@ -63,7 +62,7 @@ public void setUp() { 1, Optional.empty(), Optional.of(childControllerProperties), - Optional.of(new VeniceProperties(serverProperties)), + 
Optional.of(serverProperties), false); childDatacenters = multiRegionMultiClusterWrapper.getChildRegions(); diff --git a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/controller/TestParentControllerWithMultiDataCenter.java b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/controller/TestParentControllerWithMultiDataCenter.java index efdbe9a2470..b5cc46b1f03 100644 --- a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/controller/TestParentControllerWithMultiDataCenter.java +++ b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/controller/TestParentControllerWithMultiDataCenter.java @@ -24,7 +24,6 @@ import com.linkedin.venice.utils.TestUtils; import com.linkedin.venice.utils.TestWriteUtils; import com.linkedin.venice.utils.Utils; -import com.linkedin.venice.utils.VeniceProperties; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; @@ -71,7 +70,7 @@ public void setUp() { 1, 1, 1, - Optional.of(new VeniceProperties(controllerProps)), + Optional.of(controllerProps), Optional.of(controllerProps), Optional.empty()); diff --git a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/controller/VeniceParentHelixAdminTest.java b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/controller/VeniceParentHelixAdminTest.java index 8084f736ad4..cc550181024 100644 --- a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/controller/VeniceParentHelixAdminTest.java +++ b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/controller/VeniceParentHelixAdminTest.java @@ -3,7 +3,6 @@ import static com.linkedin.venice.ConfigKeys.CONTROLLER_AUTO_MATERIALIZE_DAVINCI_PUSH_STATUS_SYSTEM_STORE; import static com.linkedin.venice.ConfigKeys.CONTROLLER_AUTO_MATERIALIZE_META_SYSTEM_STORE; import static com.linkedin.venice.ConfigKeys.CONTROLLER_PARENT_EXTERNAL_SUPERSET_SCHEMA_GENERATION_ENABLED; -import static com.linkedin.venice.ConfigKeys.REPLICATION_METADATA_VERSION; import static com.linkedin.venice.ConfigKeys.TERMINAL_STATE_TOPIC_CHECK_DELAY_MS; import static com.linkedin.venice.ConfigKeys.TOPIC_CLEANUP_SLEEP_INTERVAL_BETWEEN_TOPIC_LIST_FETCH_MS; import static com.linkedin.venice.controller.SchemaConstants.BAD_VALUE_SCHEMA_FOR_WRITE_COMPUTE_V2; @@ -38,6 +37,7 @@ import com.linkedin.venice.integration.utils.VeniceClusterWrapper; import com.linkedin.venice.integration.utils.VeniceControllerCreateOptions; import com.linkedin.venice.integration.utils.VeniceControllerWrapper; +import com.linkedin.venice.integration.utils.VeniceTwoLayerMultiRegionMultiClusterWrapper; import com.linkedin.venice.integration.utils.ZkServerWrapper; import com.linkedin.venice.meta.ETLStoreConfig; import com.linkedin.venice.meta.HybridStoreConfig; @@ -47,7 +47,9 @@ import com.linkedin.venice.schema.writecompute.WriteComputeSchemaConverter; import com.linkedin.venice.security.SSLFactory; import com.linkedin.venice.utils.SslUtils; +import com.linkedin.venice.utils.TestUtils; import com.linkedin.venice.utils.TestWriteUtils; +import com.linkedin.venice.utils.Time; import com.linkedin.venice.utils.Utils; import java.io.IOException; import java.util.ArrayList; @@ -68,67 +70,42 @@ public class VeniceParentHelixAdminTest { private static final long DEFAULT_TEST_TIMEOUT_MS = 60000; - VeniceClusterWrapper venice; - ZkServerWrapper zkServerWrapper; + private VeniceTwoLayerMultiRegionMultiClusterWrapper multiRegionMultiClusterWrapper; + 
private VeniceClusterWrapper venice; + private String clusterName; @BeforeClass public void setUp() { Utils.thisIsLocalhost(); - Properties properties = new Properties(); - // Disable topic deletion - properties.setProperty(TOPIC_CLEANUP_SLEEP_INTERVAL_BETWEEN_TOPIC_LIST_FETCH_MS, String.valueOf(Long.MAX_VALUE)); - venice = ServiceFactory.getVeniceCluster(1, 1, 1, 1, 100000, false, false, properties); - zkServerWrapper = ServiceFactory.getZkServer(); + multiRegionMultiClusterWrapper = ServiceFactory.getVeniceTwoLayerMultiRegionMultiClusterWrapper(1, 1, 1, 1, 1, 1); + clusterName = multiRegionMultiClusterWrapper.getClusterNames()[0]; + venice = multiRegionMultiClusterWrapper.getChildRegions().get(0).getClusters().get(clusterName); } @AfterClass public void cleanUp() { - Utils.closeQuietlyWithErrorLogged(venice); - Utils.closeQuietlyWithErrorLogged(zkServerWrapper); + Utils.closeQuietlyWithErrorLogged(multiRegionMultiClusterWrapper); } @Test(timeOut = DEFAULT_TEST_TIMEOUT_MS) public void testTerminalStateTopicChecker() { - Properties properties = new Properties(); - properties.setProperty(TOPIC_CLEANUP_SLEEP_INTERVAL_BETWEEN_TOPIC_LIST_FETCH_MS, String.valueOf(Long.MAX_VALUE)); - properties.setProperty(TERMINAL_STATE_TOPIC_CHECK_DELAY_MS, String.valueOf(1000L)); - try ( - VeniceControllerWrapper parentController = ServiceFactory.getVeniceController( - new VeniceControllerCreateOptions.Builder(venice.getClusterName(), zkServerWrapper, venice.getKafka()) - .childControllers(new VeniceControllerWrapper[] { venice.getLeaderVeniceController() }) - .extraProperties(properties) - .build()); - ControllerClient parentControllerClient = - new ControllerClient(venice.getClusterName(), parentController.getControllerUrl())) { + try (ControllerClient parentControllerClient = + new ControllerClient(clusterName, multiRegionMultiClusterWrapper.getControllerConnectString())) { String storeName = Utils.getUniqueString("testStore"); assertFalse( parentControllerClient.createNewStore(storeName, "test", "\"string\"", "\"string\"").isError(), "Failed to create test store"); // Empty push without checking its push status - VersionCreationResponse response = parentControllerClient.emptyPush(storeName, "test-push", 1000); + ControllerResponse response = + parentControllerClient.sendEmptyPushAndWait(storeName, "test-push", 1000, 30 * Time.MS_PER_SECOND); assertFalse(response.isError(), "Failed to perform empty push on test store"); - // The empty push should eventually complete and have its version topic truncated by job status polling invoked by - // the TerminalStateTopicCheckerForParentController. 
- waitForNonDeterministicAssertion( - 30, - TimeUnit.SECONDS, - true, - () -> assertTrue(parentController.getVeniceAdmin().isTopicTruncated(response.getKafkaTopic()))); } } @Test(timeOut = 2 * DEFAULT_TEST_TIMEOUT_MS) public void testAddVersion() { - Properties properties = new Properties(); - properties.setProperty(REPLICATION_METADATA_VERSION, String.valueOf(1)); - try ( - VeniceControllerWrapper parentControllerWrapper = ServiceFactory.getVeniceController( - new VeniceControllerCreateOptions.Builder(venice.getClusterName(), zkServerWrapper, venice.getKafka()) - .childControllers(new VeniceControllerWrapper[] { venice.getLeaderVeniceController() }) - .extraProperties(properties) - .build()); - ControllerClient parentControllerClient = - new ControllerClient(venice.getClusterName(), parentControllerWrapper.getControllerUrl())) { + try (ControllerClient parentControllerClient = + new ControllerClient(clusterName, multiRegionMultiClusterWrapper.getControllerConnectString())) { // Adding store String storeName = Utils.getUniqueString("test_store"); String owner = "test_owner"; @@ -234,6 +211,7 @@ public void testAddVersion() { @Test(timeOut = DEFAULT_TEST_TIMEOUT_MS * 2) public void testResourceCleanupCheckForStoreRecreation() { Properties properties = new Properties(); + // Disable topic deletion properties.setProperty(TOPIC_CLEANUP_SLEEP_INTERVAL_BETWEEN_TOPIC_LIST_FETCH_MS, String.valueOf(Long.MAX_VALUE)); properties.setProperty(TERMINAL_STATE_TOPIC_CHECK_DELAY_MS, String.valueOf(1000L)); // Recreation of the same store will fail due to lingering system store resources @@ -241,14 +219,23 @@ public void testResourceCleanupCheckForStoreRecreation() { // cleaned up. properties.setProperty(CONTROLLER_AUTO_MATERIALIZE_META_SYSTEM_STORE, String.valueOf(false)); properties.setProperty(CONTROLLER_AUTO_MATERIALIZE_DAVINCI_PUSH_STATUS_SYSTEM_STORE, String.valueOf(false)); + try ( - VeniceControllerWrapper parentController = ServiceFactory.getVeniceController( - new VeniceControllerCreateOptions.Builder(venice.getClusterName(), zkServerWrapper, venice.getKafka()) - .childControllers(new VeniceControllerWrapper[] { venice.getLeaderVeniceController() }) - .extraProperties(properties) - .build()); - ControllerClient parentControllerClient = - new ControllerClient(venice.getClusterName(), parentController.getControllerUrl())) { + VeniceTwoLayerMultiRegionMultiClusterWrapper twoLayerMultiRegionMultiClusterWrapper = + ServiceFactory.getVeniceTwoLayerMultiRegionMultiClusterWrapper( + 1, + 1, + 1, + 1, + 1, + 1, + 1, + Optional.of(properties), + Optional.of(properties), + Optional.empty()); + ControllerClient parentControllerClient = new ControllerClient( + twoLayerMultiRegionMultiClusterWrapper.getClusterNames()[0], + twoLayerMultiRegionMultiClusterWrapper.getControllerConnectString())) { String storeName = Utils.getUniqueString("testStore"); assertFalse( parentControllerClient.createNewStore(storeName, "test", "\"string\"", "\"string\"").isError(), @@ -258,16 +245,10 @@ public void testResourceCleanupCheckForStoreRecreation() { parentControllerClient.createNewStore(storeName, "test", "\"string\"", "\"string\"").isError(), "Trying to create an existing store should fail"); // Empty push without checking its push status - VersionCreationResponse response = parentControllerClient.emptyPush(storeName, "test-push", 1000); + ControllerResponse response = + parentControllerClient.sendEmptyPushAndWait(storeName, "test-push", 1000, 30 * Time.MS_PER_SECOND); assertFalse(response.isError(), "Failed to perform empty 
push on test store"); - // The empty push should eventually complete and have its version topic truncated by job status polling invoked by - // the TerminalStateTopicCheckerForParentController. - waitForNonDeterministicAssertion( - 30, - TimeUnit.SECONDS, - true, - () -> assertTrue(parentController.getVeniceAdmin().isTopicTruncated(response.getKafkaTopic()))); assertFalse(parentControllerClient.disableAndDeleteStore(storeName).isError(), "Delete store shouldn't fail"); ControllerResponse controllerResponse = @@ -302,95 +283,88 @@ public void testResourceCleanupCheckForStoreRecreation() { @Test(timeOut = DEFAULT_TEST_TIMEOUT_MS) public void testHybridAndETLStoreConfig() { - try (VeniceControllerWrapper parentControllerWrapper = ServiceFactory.getVeniceController( - new VeniceControllerCreateOptions.Builder(venice.getClusterName(), zkServerWrapper, venice.getKafka()) - .childControllers(new VeniceControllerWrapper[] { venice.getLeaderVeniceController() }) - .build())) { - String controllerUrl = parentControllerWrapper.getControllerUrl(); - - // Adding store - String storeName = "test_store"; - String owner = "test_owner"; - String keySchemaStr = "\"long\""; - String proxyUser = "test_user"; - Schema valueSchema = generateSchema(false); - try (ControllerClient controllerClient = new ControllerClient(venice.getClusterName(), controllerUrl)) { - assertCommand(controllerClient.createNewStore(storeName, owner, keySchemaStr, valueSchema.toString())); - - // Configure the store to hybrid - UpdateStoreQueryParams params = - new UpdateStoreQueryParams().setHybridRewindSeconds(600).setHybridOffsetLagThreshold(10000); - assertCommand(controllerClient.updateStore(storeName, params)); - HybridStoreConfig hybridStoreConfig = - assertCommand(controllerClient.getStore(storeName)).getStore().getHybridStoreConfig(); - Assert.assertEquals(hybridStoreConfig.getRewindTimeInSeconds(), 600); - Assert.assertEquals(hybridStoreConfig.getOffsetLagThresholdToGoOnline(), 10000); - - // Try to update the hybrid store with different hybrid configs - params = new UpdateStoreQueryParams().setHybridRewindSeconds(172800); - assertCommand(controllerClient.updateStore(storeName, params)); - hybridStoreConfig = assertCommand(controllerClient.getStore(storeName)).getStore().getHybridStoreConfig(); - Assert.assertEquals(hybridStoreConfig.getRewindTimeInSeconds(), 172800); - Assert.assertEquals(hybridStoreConfig.getOffsetLagThresholdToGoOnline(), 10000); - - // test enabling ETL without etl proxy account, expected failure - params = new UpdateStoreQueryParams(); - params.setRegularVersionETLEnabled(true); - params.setFutureVersionETLEnabled(true); - ControllerResponse controllerResponse = controllerClient.updateStore(storeName, params); - ETLStoreConfig etlStoreConfig = - assertCommand(controllerClient.getStore(storeName)).getStore().getEtlStoreConfig(); - Assert.assertFalse(etlStoreConfig.isRegularVersionETLEnabled()); - Assert.assertFalse(etlStoreConfig.isFutureVersionETLEnabled()); - Assert.assertTrue( - controllerResponse.getError() - .contains("Cannot enable ETL for this store " + "because etled user proxy account is not set")); - - // test enabling ETL with empty proxy account, expected failure - params = new UpdateStoreQueryParams(); - params.setRegularVersionETLEnabled(true).setEtledProxyUserAccount(""); - params.setFutureVersionETLEnabled(true).setEtledProxyUserAccount(""); - controllerResponse = controllerClient.updateStore(storeName, params); - etlStoreConfig = 
assertCommand(controllerClient.getStore(storeName)).getStore().getEtlStoreConfig(); - Assert.assertFalse(etlStoreConfig.isRegularVersionETLEnabled()); - Assert.assertFalse(etlStoreConfig.isFutureVersionETLEnabled()); - Assert.assertTrue( - controllerResponse.getError() - .contains("Cannot enable ETL for this store " + "because etled user proxy account is not set")); - - // test enabling ETL with etl proxy account, expected success - params = new UpdateStoreQueryParams(); - params.setRegularVersionETLEnabled(true).setEtledProxyUserAccount(proxyUser); - params.setFutureVersionETLEnabled(true).setEtledProxyUserAccount(proxyUser); - controllerClient.updateStore(storeName, params); - etlStoreConfig = assertCommand(controllerClient.getStore(storeName)).getStore().getEtlStoreConfig(); - Assert.assertTrue(etlStoreConfig.isRegularVersionETLEnabled()); - Assert.assertTrue(etlStoreConfig.isFutureVersionETLEnabled()); - - // set the ETL back to false - params = new UpdateStoreQueryParams(); - params.setRegularVersionETLEnabled(false); - params.setFutureVersionETLEnabled(false); - controllerClient.updateStore(storeName, params); - etlStoreConfig = assertCommand(controllerClient.getStore(storeName)).getStore().getEtlStoreConfig(); - Assert.assertFalse(etlStoreConfig.isRegularVersionETLEnabled()); - Assert.assertFalse(etlStoreConfig.isFutureVersionETLEnabled()); - - // test enabling ETL again without etl proxy account, expected success - params = new UpdateStoreQueryParams(); - params.setRegularVersionETLEnabled(true); - params.setFutureVersionETLEnabled(true); - controllerClient.updateStore(storeName, params); - etlStoreConfig = assertCommand(controllerClient.getStore(storeName)).getStore().getEtlStoreConfig(); - Assert.assertTrue(etlStoreConfig.isRegularVersionETLEnabled()); - Assert.assertTrue(etlStoreConfig.isFutureVersionETLEnabled()); - } + // Adding store + String storeName = "test_store"; + String owner = "test_owner"; + String keySchemaStr = "\"long\""; + String proxyUser = "test_user"; + Schema valueSchema = generateSchema(false); + try (ControllerClient controllerClient = + new ControllerClient(clusterName, multiRegionMultiClusterWrapper.getControllerConnectString())) { + assertCommand(controllerClient.createNewStore(storeName, owner, keySchemaStr, valueSchema.toString())); + + // Configure the store to hybrid + UpdateStoreQueryParams params = + new UpdateStoreQueryParams().setHybridRewindSeconds(600).setHybridOffsetLagThreshold(10000); + assertCommand(controllerClient.updateStore(storeName, params)); + HybridStoreConfig hybridStoreConfig = + assertCommand(controllerClient.getStore(storeName)).getStore().getHybridStoreConfig(); + Assert.assertEquals(hybridStoreConfig.getRewindTimeInSeconds(), 600); + Assert.assertEquals(hybridStoreConfig.getOffsetLagThresholdToGoOnline(), 10000); + + // Try to update the hybrid store with different hybrid configs + params = new UpdateStoreQueryParams().setHybridRewindSeconds(172800); + assertCommand(controllerClient.updateStore(storeName, params)); + hybridStoreConfig = assertCommand(controllerClient.getStore(storeName)).getStore().getHybridStoreConfig(); + Assert.assertEquals(hybridStoreConfig.getRewindTimeInSeconds(), 172800); + Assert.assertEquals(hybridStoreConfig.getOffsetLagThresholdToGoOnline(), 10000); + + // test enabling ETL without etl proxy account, expected failure + params = new UpdateStoreQueryParams(); + params.setRegularVersionETLEnabled(true); + params.setFutureVersionETLEnabled(true); + ControllerResponse controllerResponse = 
controllerClient.updateStore(storeName, params); + ETLStoreConfig etlStoreConfig = + assertCommand(controllerClient.getStore(storeName)).getStore().getEtlStoreConfig(); + Assert.assertFalse(etlStoreConfig.isRegularVersionETLEnabled()); + Assert.assertFalse(etlStoreConfig.isFutureVersionETLEnabled()); + Assert.assertTrue( + controllerResponse.getError() + .contains("Cannot enable ETL for this store " + "because etled user proxy account is not set")); + + // test enabling ETL with empty proxy account, expected failure + params = new UpdateStoreQueryParams(); + params.setRegularVersionETLEnabled(true).setEtledProxyUserAccount(""); + params.setFutureVersionETLEnabled(true).setEtledProxyUserAccount(""); + controllerResponse = controllerClient.updateStore(storeName, params); + etlStoreConfig = assertCommand(controllerClient.getStore(storeName)).getStore().getEtlStoreConfig(); + Assert.assertFalse(etlStoreConfig.isRegularVersionETLEnabled()); + Assert.assertFalse(etlStoreConfig.isFutureVersionETLEnabled()); + Assert.assertTrue( + controllerResponse.getError() + .contains("Cannot enable ETL for this store " + "because etled user proxy account is not set")); + + // test enabling ETL with etl proxy account, expected success + params = new UpdateStoreQueryParams(); + params.setRegularVersionETLEnabled(true).setEtledProxyUserAccount(proxyUser); + params.setFutureVersionETLEnabled(true).setEtledProxyUserAccount(proxyUser); + controllerClient.updateStore(storeName, params); + etlStoreConfig = assertCommand(controllerClient.getStore(storeName)).getStore().getEtlStoreConfig(); + Assert.assertTrue(etlStoreConfig.isRegularVersionETLEnabled()); + Assert.assertTrue(etlStoreConfig.isFutureVersionETLEnabled()); + + // set the ETL back to false + params = new UpdateStoreQueryParams(); + params.setRegularVersionETLEnabled(false); + params.setFutureVersionETLEnabled(false); + controllerClient.updateStore(storeName, params); + etlStoreConfig = assertCommand(controllerClient.getStore(storeName)).getStore().getEtlStoreConfig(); + Assert.assertFalse(etlStoreConfig.isRegularVersionETLEnabled()); + Assert.assertFalse(etlStoreConfig.isFutureVersionETLEnabled()); + + // test enabling ETL again without etl proxy account, expected success + params = new UpdateStoreQueryParams(); + params.setRegularVersionETLEnabled(true); + params.setFutureVersionETLEnabled(true); + controllerClient.updateStore(storeName, params); + etlStoreConfig = assertCommand(controllerClient.getStore(storeName)).getStore().getEtlStoreConfig(); + Assert.assertTrue(etlStoreConfig.isRegularVersionETLEnabled()); + Assert.assertTrue(etlStoreConfig.isFutureVersionETLEnabled()); } } @Test(timeOut = DEFAULT_TEST_TIMEOUT_MS) public void testSupersetSchemaWithCustomSupersetSchemaGenerator() throws IOException { - String clusterName = Utils.getUniqueString("testSupersetSchemaWithCustomSupersetSchemaGenerator"); final String CUSTOM_PROP = "custom_prop"; // Contains f0, f1 Schema valueSchemaV1 = @@ -409,30 +383,34 @@ public void testSupersetSchemaWithCustomSupersetSchemaGenerator() throws IOExcep properties .put(VeniceControllerWrapper.SUPERSET_SCHEMA_GENERATOR, new SupersetSchemaGeneratorWithCustomProp(CUSTOM_PROP)); - try (ZkServerWrapper zkServer = ServiceFactory.getZkServer(); - PubSubBrokerWrapper pubSubBrokerWrapper = ServiceFactory.getPubSubBroker( - new PubSubBrokerConfigs.Builder().setZkWrapper(zkServer) - .setRegionName(DEFAULT_PARENT_DATA_CENTER_REGION_NAME) - .build()); - VeniceControllerWrapper childControllerWrapper = 
ServiceFactory.getVeniceController( - new VeniceControllerCreateOptions.Builder(clusterName, zkServer, pubSubBrokerWrapper) - .regionName(CHILD_REGION_NAME_PREFIX + "0") - .build()); - ZkServerWrapper parentZk = ServiceFactory.getZkServer(); - VeniceControllerWrapper parentControllerWrapper = ServiceFactory.getVeniceController( - new VeniceControllerCreateOptions.Builder(clusterName, parentZk, pubSubBrokerWrapper) - .childControllers(new VeniceControllerWrapper[] { childControllerWrapper }) - .extraProperties(properties) - .build())) { - String parentControllerUrl = parentControllerWrapper.getControllerUrl(); - try (ControllerClient parentControllerClient = new ControllerClient(clusterName, parentControllerUrl)) { + try (VeniceTwoLayerMultiRegionMultiClusterWrapper twoLayerMultiRegionMultiClusterWrapper = + ServiceFactory.getVeniceTwoLayerMultiRegionMultiClusterWrapper( + 1, + 1, + 1, + 1, + 0, + 0, + 1, + Optional.of(properties), + Optional.empty(), + Optional.empty())) { + String parentControllerUrl = twoLayerMultiRegionMultiClusterWrapper.getControllerConnectString(); + try (ControllerClient parentControllerClient = + new ControllerClient(twoLayerMultiRegionMultiClusterWrapper.getClusterNames()[0], parentControllerUrl)) { + TestUtils.waitForNonDeterministicAssertion( + 30, + TimeUnit.SECONDS, + false, + true, + () -> parentControllerClient.getLeaderControllerUrl()); // Create a new store String storeName = Utils.getUniqueString("test_store_"); String owner = "test_owner"; String keySchemaStr = "\"long\""; String valueSchemaStr = valueSchemaV1.toString(); - NewStoreResponse newStoreResponse = - parentControllerClient.createNewStore(storeName, owner, keySchemaStr, valueSchemaStr); + NewStoreResponse newStoreResponse = parentControllerClient + .retryableRequest(10, cc -> cc.createNewStore(storeName, owner, keySchemaStr, valueSchemaStr)); Assert.assertNotNull(newStoreResponse); Assert.assertFalse(newStoreResponse.isError(), "error in newStoreResponse: " + newStoreResponse.getError()); // Enable write compute @@ -552,7 +530,6 @@ public static Object[][] controllerSSLAndSupersetSchemaGenerator() { public void testStoreMetaDataUpdateFromParentToChildController( boolean isControllerSslEnabled, boolean isSupersetSchemaGeneratorEnabled) throws IOException { - String clusterName = Utils.getUniqueString("testStoreMetadataUpdate"); Properties properties = new Properties(); // This cluster setup don't have server, we cannot perform push here. properties.setProperty(CONTROLLER_AUTO_MATERIALIZE_META_SYSTEM_STORE, String.valueOf(false)); @@ -965,7 +942,7 @@ private void testWriteComputeSchemaAutoGeneration(ControllerClient parentControl // Validate that the controller generates the correct schema. Assert.assertEquals(registeredWriteComputeSchema.get(0).getSchemaStr(), expectedWriteComputeSchema.toString()); - // Step 4. Add more value schemas and expect to get their corresponding write compute schemas. + // Step 4. Add more value schemas and expect to get their corresponding write-compute schemas. 
parentControllerClient.addValueSchema(storeName, BAD_VALUE_SCHEMA_FOR_WRITE_COMPUTE_V2); // This won't generate any // derived schema parentControllerClient.addValueSchema(storeName, VALUE_SCHEMA_FOR_WRITE_COMPUTE_V3); diff --git a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/ActiveActiveReplicationForHybridTest.java b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/ActiveActiveReplicationForHybridTest.java index d152c9acb39..4387fcf9c12 100644 --- a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/ActiveActiveReplicationForHybridTest.java +++ b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/ActiveActiveReplicationForHybridTest.java @@ -154,9 +154,9 @@ public void setUp() { 2, 1, 2, - Optional.of(new VeniceProperties(controllerProps)), Optional.of(controllerProps), - Optional.of(new VeniceProperties(serverProperties)), + Optional.of(controllerProps), + Optional.of(serverProperties), false); childDatacenters = multiRegionMultiClusterWrapper.getChildRegions(); parentControllers = multiRegionMultiClusterWrapper.getParentControllers(); diff --git a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/DaVinciClusterAgnosticTest.java b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/DaVinciClusterAgnosticTest.java index a6475f264ae..cc550f3bed3 100644 --- a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/DaVinciClusterAgnosticTest.java +++ b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/DaVinciClusterAgnosticTest.java @@ -35,9 +35,9 @@ import io.tehuti.metrics.MetricsRepository; import java.util.AbstractMap; import java.util.ArrayList; -import java.util.Collections; import java.util.List; import java.util.Optional; +import java.util.Properties; import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; import java.util.stream.IntStream; @@ -72,6 +72,8 @@ public class DaVinciClusterAgnosticTest { @BeforeClass public void setUp() { Utils.thisIsLocalhost(); + Properties parentControllerProps = new Properties(); + parentControllerProps.put(OFFLINE_JOB_START_TIMEOUT_MS, "180000"); multiRegionMultiClusterWrapper = ServiceFactory.getVeniceTwoLayerMultiRegionMultiClusterWrapper( 1, 2, @@ -80,7 +82,7 @@ public void setUp() { 3, 1, 3, - Optional.of(new VeniceProperties(Collections.singletonMap(OFFLINE_JOB_START_TIMEOUT_MS, "180000"))), + Optional.of(parentControllerProps), Optional.empty(), Optional.empty(), false); diff --git a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/DataRecoveryTest.java b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/DataRecoveryTest.java index 2df1df779ec..2258b83ec1f 100644 --- a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/DataRecoveryTest.java +++ b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/DataRecoveryTest.java @@ -48,7 +48,6 @@ import com.linkedin.venice.samza.VeniceSystemFactory; import com.linkedin.venice.utils.TestUtils; import com.linkedin.venice.utils.Utils; -import com.linkedin.venice.utils.VeniceProperties; import java.util.AbstractMap; import java.util.Arrays; import java.util.HashMap; @@ -107,9 +106,9 @@ public void setUp() { 2, 1, 2, - Optional.of(new VeniceProperties(controllerProps)), Optional.of(controllerProps), - Optional.of(new 
VeniceProperties(serverProperties)), + Optional.of(controllerProps), + Optional.of(serverProperties), false); childDatacenters = multiRegionMultiClusterWrapper.getChildRegions(); parentControllers = multiRegionMultiClusterWrapper.getParentControllers(); diff --git a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/NearlineE2ELatencyTest.java b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/NearlineE2ELatencyTest.java index ed5a09defeb..3d7f73f8877 100644 --- a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/NearlineE2ELatencyTest.java +++ b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/NearlineE2ELatencyTest.java @@ -1,8 +1,9 @@ package com.linkedin.venice.endToEnd; -import static com.linkedin.venice.ConfigKeys.*; -import static com.linkedin.venice.utils.IntegrationTestPushUtils.*; -import static com.linkedin.venice.utils.TestWriteUtils.*; +import static com.linkedin.venice.ConfigKeys.SERVER_PROMOTION_TO_LEADER_REPLICA_DELAY_SECONDS; +import static com.linkedin.venice.utils.IntegrationTestPushUtils.getSamzaProducer; +import static com.linkedin.venice.utils.IntegrationTestPushUtils.sendStreamingRecord; +import static com.linkedin.venice.utils.TestWriteUtils.STRING_SCHEMA; import com.linkedin.davinci.stats.IngestionStats; import com.linkedin.venice.client.store.AvroGenericStoreClient; @@ -23,7 +24,6 @@ import com.linkedin.venice.server.VeniceServer; import com.linkedin.venice.utils.TestUtils; import com.linkedin.venice.utils.Utils; -import com.linkedin.venice.utils.VeniceProperties; import io.tehuti.metrics.MetricsRepository; import java.util.AbstractMap; import java.util.Arrays; @@ -77,7 +77,7 @@ public void setUp() { REPLICATION_FACTOR, Optional.empty(), Optional.empty(), - Optional.of(new VeniceProperties(serverProperties)), + Optional.of(serverProperties), false); childDatacenters = multiRegionMultiClusterWrapper.getChildRegions(); parentControllers = multiRegionMultiClusterWrapper.getParentControllers(); diff --git a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/OneTouchDataRecoveryTest.java b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/OneTouchDataRecoveryTest.java index b6a745695f9..f864097b610 100644 --- a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/OneTouchDataRecoveryTest.java +++ b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/OneTouchDataRecoveryTest.java @@ -17,7 +17,6 @@ import com.linkedin.venice.meta.Version; import com.linkedin.venice.utils.TestUtils; import com.linkedin.venice.utils.Utils; -import com.linkedin.venice.utils.VeniceProperties; import java.util.AbstractMap; import java.util.Arrays; import java.util.List; @@ -67,7 +66,7 @@ public void setUp() { REPLICATION_FACTOR, Optional.empty(), Optional.empty(), - Optional.of(new VeniceProperties(serverProperties)), + Optional.of(serverProperties), false); childDatacenters = multiRegionMultiClusterWrapper.getChildRegions(); diff --git a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/PartialUpdateTest.java b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/PartialUpdateTest.java index c5c40ea9b69..d060c59ef3f 100644 --- a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/PartialUpdateTest.java +++ 
b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/PartialUpdateTest.java @@ -72,7 +72,6 @@ import com.linkedin.venice.utils.TestWriteUtils; import com.linkedin.venice.utils.Time; import com.linkedin.venice.utils.Utils; -import com.linkedin.venice.utils.VeniceProperties; import com.linkedin.venice.writer.update.UpdateBuilder; import com.linkedin.venice.writer.update.UpdateBuilderImpl; import java.io.ByteArrayOutputStream; @@ -130,9 +129,9 @@ public void setUp() { 2, 1, 2, - Optional.of(new VeniceProperties(controllerProps)), + Optional.of(controllerProps), Optional.of(new Properties(controllerProps)), - Optional.of(new VeniceProperties(serverProperties)), + Optional.of(serverProperties), false); this.childDatacenters = multiRegionMultiClusterWrapper.getChildRegions(); List<VeniceControllerWrapper> parentControllers = multiRegionMultiClusterWrapper.getParentControllers(); diff --git a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/PushJobDetailsTest.java b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/PushJobDetailsTest.java index 29281975bc0..5c0bf971c1d 100644 --- a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/PushJobDetailsTest.java +++ b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/PushJobDetailsTest.java @@ -50,7 +50,6 @@ import com.linkedin.venice.utils.TestUtils; import com.linkedin.venice.utils.Time; import com.linkedin.venice.utils.Utils; -import com.linkedin.venice.utils.VeniceProperties; import java.io.File; import java.io.IOException; import java.util.Arrays; @@ -96,9 +95,9 @@ public void setUp() throws IOException { 1, 1, 1, - Optional.of(new VeniceProperties(parentControllerProperties)), + Optional.of(parentControllerProperties), Optional.empty(), - Optional.of(new VeniceProperties(serverProperties)), + Optional.of(serverProperties), false); String clusterName = multiRegionMultiClusterWrapper.getClusterNames()[0]; diff --git a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/PushStatusStoreMultiColoTest.java b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/PushStatusStoreMultiColoTest.java index 9b2054ff28d..e59a197dcf5 100644 --- a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/PushStatusStoreMultiColoTest.java +++ b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/PushStatusStoreMultiColoTest.java @@ -33,7 +33,6 @@ import com.linkedin.venice.serialization.avro.AvroProtocolDefinition; import com.linkedin.venice.utils.TestUtils; import com.linkedin.venice.utils.Utils; -import com.linkedin.venice.utils.VeniceProperties; import java.util.List; import java.util.Optional; import java.util.Properties; @@ -85,7 +84,7 @@ public void setUp() { NUMBER_OF_SERVERS, NUMBER_OF_ROUTERS, REPLICATION_FACTOR, - Optional.of(new VeniceProperties(extraProperties)), + Optional.of(extraProperties), Optional.of(extraProperties), Optional.empty(), false); diff --git a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/TestActiveActiveIngestion.java b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/TestActiveActiveIngestion.java index 7d35504bddf..8f92cae835b 100644 --- a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/TestActiveActiveIngestion.java +++ 
b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/TestActiveActiveIngestion.java @@ -70,7 +70,6 @@ import com.linkedin.venice.utils.TestWriteUtils; import com.linkedin.venice.utils.Time; import com.linkedin.venice.utils.Utils; -import com.linkedin.venice.utils.VeniceProperties; import com.linkedin.venice.view.TestView; import com.linkedin.venice.views.ChangeCaptureView; import com.linkedin.venice.writer.VeniceWriter; @@ -137,7 +136,7 @@ public void setUp() { 1, Optional.empty(), Optional.empty(), - Optional.of(new VeniceProperties(serverProperties)), + Optional.of(serverProperties), false); childDatacenters = multiRegionMultiClusterWrapper.getChildRegions(); diff --git a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/TestActiveActiveReplicationForIncPush.java b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/TestActiveActiveReplicationForIncPush.java index 0473ddba533..8d8eb02ddd7 100644 --- a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/TestActiveActiveReplicationForIncPush.java +++ b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/TestActiveActiveReplicationForIncPush.java @@ -26,7 +26,6 @@ import com.linkedin.venice.utils.TestWriteUtils; import com.linkedin.venice.utils.Time; import com.linkedin.venice.utils.Utils; -import com.linkedin.venice.utils.VeniceProperties; import java.io.File; import java.util.List; import java.util.Optional; @@ -83,9 +82,9 @@ public void setUp() { 2, 1, 2, - Optional.of(new VeniceProperties(controllerProps)), Optional.of(controllerProps), - Optional.of(new VeniceProperties(serverProperties)), + Optional.of(controllerProps), + Optional.of(serverProperties), false); childDatacenters = multiRegionMultiClusterWrapper.getChildRegions(); clusterNames = multiRegionMultiClusterWrapper.getClusterNames(); diff --git a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/TestActiveActiveReplicationWithDownRegion.java b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/TestActiveActiveReplicationWithDownRegion.java index 633188f9492..25a940ffb20 100644 --- a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/TestActiveActiveReplicationWithDownRegion.java +++ b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/TestActiveActiveReplicationWithDownRegion.java @@ -29,7 +29,6 @@ import com.linkedin.venice.samza.VeniceSystemProducer; import com.linkedin.venice.utils.TestUtils; import com.linkedin.venice.utils.Utils; -import com.linkedin.venice.utils.VeniceProperties; import java.util.Collections; import java.util.List; import java.util.Map; @@ -100,9 +99,9 @@ public void setUp() { 2, 1, 2, - Optional.of(new VeniceProperties(controllerProps)), Optional.of(controllerProps), - Optional.of(new VeniceProperties(serverProperties)), + Optional.of(controllerProps), + Optional.of(serverProperties), false); childDatacenters = multiRegionMultiClusterWrapper.getChildRegions(); parentControllers = multiRegionMultiClusterWrapper.getParentControllers(); diff --git a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/TestChangeCaptureIngestion.java b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/TestChangeCaptureIngestion.java index 069041a8c1a..d5c03f8a419 100644 --- 
a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/TestChangeCaptureIngestion.java +++ b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/TestChangeCaptureIngestion.java @@ -1,14 +1,33 @@ package com.linkedin.venice.endToEnd; -import static com.linkedin.davinci.store.rocksdb.RocksDBServerConfig.*; -import static com.linkedin.venice.CommonConfigKeys.*; -import static com.linkedin.venice.ConfigKeys.*; -import static com.linkedin.venice.hadoop.VenicePushJob.*; +import static com.linkedin.davinci.store.rocksdb.RocksDBServerConfig.ROCKSDB_PLAIN_TABLE_FORMAT_ENABLED; +import static com.linkedin.venice.CommonConfigKeys.SSL_ENABLED; +import static com.linkedin.venice.ConfigKeys.CHILD_DATA_CENTER_KAFKA_URL_PREFIX; +import static com.linkedin.venice.hadoop.VenicePushJob.DEFAULT_KEY_FIELD_PROP; +import static com.linkedin.venice.hadoop.VenicePushJob.DEFAULT_VALUE_FIELD_PROP; +import static com.linkedin.venice.hadoop.VenicePushJob.KAFKA_INPUT_BROKER_URL; +import static com.linkedin.venice.hadoop.VenicePushJob.KAFKA_INPUT_MAX_RECORDS_PER_MAPPER; +import static com.linkedin.venice.hadoop.VenicePushJob.REPUSH_TTL_ENABLE; +import static com.linkedin.venice.hadoop.VenicePushJob.REWIND_TIME_IN_SECONDS_OVERRIDE; +import static com.linkedin.venice.hadoop.VenicePushJob.SOURCE_KAFKA; import static com.linkedin.venice.integration.utils.VeniceClusterWrapperConstants.STANDALONE_REGION_NAME; -import static com.linkedin.venice.integration.utils.VeniceControllerWrapper.*; -import static com.linkedin.venice.samza.VeniceSystemFactory.*; -import static com.linkedin.venice.utils.IntegrationTestPushUtils.*; -import static com.linkedin.venice.utils.TestWriteUtils.*; +import static com.linkedin.venice.integration.utils.VeniceControllerWrapper.D2_SERVICE_NAME; +import static com.linkedin.venice.integration.utils.VeniceControllerWrapper.PARENT_D2_SERVICE_NAME; +import static com.linkedin.venice.samza.VeniceSystemFactory.DEPLOYMENT_ID; +import static com.linkedin.venice.samza.VeniceSystemFactory.DOT; +import static com.linkedin.venice.samza.VeniceSystemFactory.SYSTEMS_PREFIX; +import static com.linkedin.venice.samza.VeniceSystemFactory.VENICE_AGGREGATE; +import static com.linkedin.venice.samza.VeniceSystemFactory.VENICE_CHILD_CONTROLLER_D2_SERVICE; +import static com.linkedin.venice.samza.VeniceSystemFactory.VENICE_CHILD_D2_ZK_HOSTS; +import static com.linkedin.venice.samza.VeniceSystemFactory.VENICE_PARENT_CONTROLLER_D2_SERVICE; +import static com.linkedin.venice.samza.VeniceSystemFactory.VENICE_PARENT_D2_ZK_HOSTS; +import static com.linkedin.venice.samza.VeniceSystemFactory.VENICE_PUSH_TYPE; +import static com.linkedin.venice.samza.VeniceSystemFactory.VENICE_STORE; +import static com.linkedin.venice.utils.IntegrationTestPushUtils.createStoreForJob; +import static com.linkedin.venice.utils.IntegrationTestPushUtils.sendStreamingDeleteRecord; +import static com.linkedin.venice.utils.IntegrationTestPushUtils.sendStreamingRecord; +import static com.linkedin.venice.utils.TestWriteUtils.getTempDataDirectory; +import static com.linkedin.venice.utils.TestWriteUtils.writeSimpleAvroFileWithUserSchema; import com.linkedin.davinci.consumer.ChangeEvent; import com.linkedin.davinci.consumer.ChangelogClientConfig; @@ -46,7 +65,6 @@ import com.linkedin.venice.utils.TestWriteUtils; import com.linkedin.venice.utils.Time; import com.linkedin.venice.utils.Utils; -import com.linkedin.venice.utils.VeniceProperties; import com.linkedin.venice.view.TestView; import 
com.linkedin.venice.views.ChangeCaptureView; import com.linkedin.venice.writer.VeniceWriter; @@ -108,7 +126,7 @@ public void setUp() { 1, Optional.empty(), Optional.empty(), - Optional.of(new VeniceProperties(serverProperties)), + Optional.of(serverProperties), false); childDatacenters = multiRegionMultiClusterWrapper.getChildRegions(); diff --git a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/TestMultiDataCenterAdminOperations.java b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/TestMultiDataCenterAdminOperations.java index dfd3a3e9f40..67fd2815614 100644 --- a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/TestMultiDataCenterAdminOperations.java +++ b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/TestMultiDataCenterAdminOperations.java @@ -20,7 +20,6 @@ import com.linkedin.venice.utils.TestUtils; import com.linkedin.venice.utils.Time; import com.linkedin.venice.utils.Utils; -import com.linkedin.venice.utils.VeniceProperties; import com.linkedin.venice.writer.VeniceWriter; import com.linkedin.venice.writer.VeniceWriterFactory; import com.linkedin.venice.writer.VeniceWriterOptions; @@ -71,7 +70,7 @@ public void setUp() { 1, Optional.empty(), Optional.empty(), - Optional.of(new VeniceProperties(serverProperties)), + Optional.of(serverProperties), false); childClusters = multiRegionMultiClusterWrapper.getChildRegions(); diff --git a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/TestPartialUpdateWithActiveActiveReplication.java b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/TestPartialUpdateWithActiveActiveReplication.java index 50bafab747d..f0d12155910 100644 --- a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/TestPartialUpdateWithActiveActiveReplication.java +++ b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/TestPartialUpdateWithActiveActiveReplication.java @@ -46,7 +46,6 @@ import com.linkedin.venice.utils.TestUtils; import com.linkedin.venice.utils.Time; import com.linkedin.venice.utils.Utils; -import com.linkedin.venice.utils.VeniceProperties; import com.linkedin.venice.writer.update.UpdateBuilder; import com.linkedin.venice.writer.update.UpdateBuilderImpl; import java.io.IOException; @@ -135,9 +134,9 @@ public void setUp() throws IOException { 2, 1, 2, - Optional.of(new VeniceProperties(controllerProps)), Optional.of(controllerProps), - Optional.of(new VeniceProperties(serverProperties)), + Optional.of(controllerProps), + Optional.of(serverProperties), false); parentControllers = multiRegionMultiClusterWrapper.getParentControllers(); diff --git a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/TestPushJobVersionCleanup.java b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/TestPushJobVersionCleanup.java index f3cd8e5bbf8..ec282dad8c3 100644 --- a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/TestPushJobVersionCleanup.java +++ b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/TestPushJobVersionCleanup.java @@ -18,7 +18,6 @@ import com.linkedin.venice.utils.TestUtils; import com.linkedin.venice.utils.TestWriteUtils; import com.linkedin.venice.utils.Utils; -import com.linkedin.venice.utils.VeniceProperties; import java.io.File; import java.util.List; import 
java.util.Optional; @@ -61,9 +60,9 @@ public void setUp() { 1, 1, 1, - Optional.of(new VeniceProperties(controllerProps)), Optional.of(controllerProps), - Optional.of(new VeniceProperties(serverProperties)), + Optional.of(controllerProps), + Optional.of(serverProperties), false); childDatacenters = multiRegionMultiClusterWrapper.getChildRegions(); } diff --git a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/TestPushJobWithEmergencySourceRegionSelection.java b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/TestPushJobWithEmergencySourceRegionSelection.java index 500d6a49f31..ce729dabc69 100644 --- a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/TestPushJobWithEmergencySourceRegionSelection.java +++ b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/TestPushJobWithEmergencySourceRegionSelection.java @@ -29,7 +29,6 @@ import com.linkedin.venice.utils.TestUtils; import com.linkedin.venice.utils.TestWriteUtils; import com.linkedin.venice.utils.Utils; -import com.linkedin.venice.utils.VeniceProperties; import java.io.File; import java.util.List; import java.util.Optional; @@ -98,9 +97,9 @@ public void setUp() { 2, 1, 2, - Optional.of(new VeniceProperties(controllerProps)), Optional.of(controllerProps), - Optional.of(new VeniceProperties(serverProperties)), + Optional.of(controllerProps), + Optional.of(serverProperties), false); childDatacenters = multiRegionMultiClusterWrapper.getChildRegions(); parentControllers = multiRegionMultiClusterWrapper.getParentControllers(); diff --git a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/TestPushJobWithNativeReplication.java b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/TestPushJobWithNativeReplication.java index 02589ccf5ca..30921fb26c9 100644 --- a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/TestPushJobWithNativeReplication.java +++ b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/TestPushJobWithNativeReplication.java @@ -89,7 +89,6 @@ import com.linkedin.venice.utils.TestWriteUtils; import com.linkedin.venice.utils.Time; import com.linkedin.venice.utils.Utils; -import com.linkedin.venice.utils.VeniceProperties; import com.linkedin.venice.writer.VeniceWriter; import com.linkedin.venice.writer.VeniceWriterFactory; import com.linkedin.venice.writer.VeniceWriterOptions; @@ -178,9 +177,9 @@ public void setUp() { 2, 1, 2, - Optional.of(new VeniceProperties(controllerProps)), Optional.of(controllerProps), - Optional.of(new VeniceProperties(serverProperties)), + Optional.of(controllerProps), + Optional.of(serverProperties), false); childDatacenters = multiRegionMultiClusterWrapper.getChildRegions(); parentControllers = multiRegionMultiClusterWrapper.getParentControllers(); @@ -455,18 +454,17 @@ public void testNativeReplicationForIncrementalPush() throws Exception { }); } - @Test(timeOut = TEST_TIMEOUT, dataProvider = "storeSize") - public void testActiveActiveForHeartbeatSystemStores(int recordCount, int partitionCount) throws Exception { + @Test(timeOut = TEST_TIMEOUT) + public void testActiveActiveForHeartbeatSystemStores() throws Exception { + int recordCount = 50; + int partitionCount = 2; motherOfAllTests( "testActiveActiveForHeartbeatSystemStores", updateStoreQueryParams -> updateStoreQueryParams.setPartitionCount(partitionCount) .setIncrementalPushEnabled(true), 
recordCount, (parentControllerClient, clusterName, storeName, props, inputDir) -> { - // Enable VPJ to send liveness heartbeat. - props.put(BatchJobHeartbeatConfigs.HEARTBEAT_ENABLED_CONFIG.getConfigName(), true); - // Prevent heartbeat from being deleted when the VPJ run finishes. - props.put(BatchJobHeartbeatConfigs.HEARTBEAT_LAST_HEARTBEAT_IS_DELETE_CONFIG.getConfigName(), false); + LOGGER.info("Store setup complete"); try ( ControllerClient dc0Client = @@ -478,6 +476,13 @@ public void testActiveActiveForHeartbeatSystemStores(int recordCount, int partit .verifyDCConfigNativeRepl(Arrays.asList(dc0Client, dc1Client), VPJ_HEARTBEAT_STORE_NAME, true); } + LOGGER.info("Running test push job"); + + // Enable VPJ to send liveness heartbeat. + props.put(BatchJobHeartbeatConfigs.HEARTBEAT_ENABLED_CONFIG.getConfigName(), true); + // Prevent heartbeat from being deleted when the VPJ run finishes. + props.put(BatchJobHeartbeatConfigs.HEARTBEAT_LAST_HEARTBEAT_IS_DELETE_CONFIG.getConfigName(), false); + try (VenicePushJob job = new VenicePushJob("Test push job", props)) { job.run(); @@ -485,6 +490,8 @@ public void testActiveActiveForHeartbeatSystemStores(int recordCount, int partit Assert.assertEquals(job.getKafkaUrl(), childDatacenters.get(0).getKafkaBrokerWrapper().getAddress()); } + LOGGER.info("Test push job finished"); + TestUtils.waitForNonDeterministicAssertion(10, TimeUnit.SECONDS, () -> { // Current version should become 1 for (int version: parentControllerClient.getStore(storeName) @@ -494,6 +501,8 @@ public void testActiveActiveForHeartbeatSystemStores(int recordCount, int partit Assert.assertEquals(version, 1); } + LOGGER.info("Current version is 1 in all colos"); + // Verify that the data are in all child fabrics including the first child fabric which consumes remotely. for (VeniceMultiClusterWrapper childDataCenter: childDatacenters) { String routerUrl = childDataCenter.getClusters().get(clusterName).getRandomRouterURL(); @@ -1035,8 +1044,9 @@ private void motherOfAllTests( String valueSchemaStr = recordSchema.getField(DEFAULT_VALUE_FIELD_PROP).schema().toString(); try { + LOGGER.info("Creating store"); createStoreForJob(clusterName, keySchemaStr, valueSchemaStr, props, updateStoreParams).close(); - + LOGGER.info("Created store"); childDatacenters.get(0) .getClusters() .get(clusterName) @@ -1046,6 +1056,7 @@ private void motherOfAllTests( .get(clusterName) .useControllerClient( dc1Client -> TestUtils.waitForNonDeterministicAssertion(30, TimeUnit.SECONDS, true, () -> { + LOGGER.info("Validating update store"); // verify the update store command has taken effect before starting the push job. 
StoreInfo store = dc0Client.getStore(storeName).getStore(); Assert.assertNotNull(store); @@ -1053,11 +1064,15 @@ private void motherOfAllTests( store = dc1Client.getStore(storeName).getStore(); Assert.assertNotNull(store); Assert.assertEquals(store.getStorageQuotaInByte(), Store.UNLIMITED_STORAGE_QUOTA); + LOGGER.info("Validated update store"); }))); + LOGGER.info("Ensuring system stores are pushed"); makeSureSystemStoreIsPushed(clusterName, storeName); + LOGGER.info("System stores are pushed"); try (ControllerClient parentControllerClient = ControllerClient.constructClusterControllerClient(clusterName, parentControllerUrls)) { + LOGGER.info("Triggering test"); test.run(parentControllerClient, clusterName, storeName, props, inputDir); } } finally { diff --git a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/TestPushJobWithNativeReplicationSharedProducer.java b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/TestPushJobWithNativeReplicationSharedProducer.java index 63897dcccd4..cda45fd834e 100644 --- a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/TestPushJobWithNativeReplicationSharedProducer.java +++ b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/TestPushJobWithNativeReplicationSharedProducer.java @@ -26,7 +26,6 @@ import com.linkedin.venice.utils.TestUtils; import com.linkedin.venice.utils.TestWriteUtils; import com.linkedin.venice.utils.Utils; -import com.linkedin.venice.utils.VeniceProperties; import java.io.File; import java.util.Arrays; import java.util.List; @@ -97,9 +96,9 @@ public void setUp() { 2, 1, 2, - Optional.of(new VeniceProperties(controllerProps)), Optional.of(controllerProps), - Optional.of(new VeniceProperties(serverProperties)), + Optional.of(controllerProps), + Optional.of(serverProperties), false); childDatacenters = multiRegionMultiClusterWrapper.getChildRegions(); } diff --git a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/TestPushJobWithSourceGridFabricSelection.java b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/TestPushJobWithSourceGridFabricSelection.java index bed2b9214de..f0320d00cf0 100644 --- a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/TestPushJobWithSourceGridFabricSelection.java +++ b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/TestPushJobWithSourceGridFabricSelection.java @@ -29,7 +29,6 @@ import com.linkedin.venice.utils.TestWriteUtils; import com.linkedin.venice.utils.Time; import com.linkedin.venice.utils.Utils; -import com.linkedin.venice.utils.VeniceProperties; import java.io.File; import java.util.List; import java.util.Optional; @@ -80,9 +79,9 @@ public void setUp() { 2, 1, 2, - Optional.of(new VeniceProperties(controllerProps)), Optional.of(controllerProps), - Optional.of(new VeniceProperties(serverProperties)), + Optional.of(controllerProps), + Optional.of(serverProperties), false); childDatacenters = multiRegionMultiClusterWrapper.getChildRegions(); parentControllerRegionName = multiRegionMultiClusterWrapper.getParentRegionName() + ".parent"; diff --git a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/TestStaleDataVisibility.java b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/TestStaleDataVisibility.java index 
f41a4549ca4..90e59c4e54e 100644 --- a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/TestStaleDataVisibility.java +++ b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/TestStaleDataVisibility.java @@ -20,7 +20,6 @@ import com.linkedin.venice.utils.TestUtils; import com.linkedin.venice.utils.Time; import com.linkedin.venice.utils.Utils; -import com.linkedin.venice.utils.VeniceProperties; import java.io.File; import java.util.ArrayList; import java.util.List; @@ -69,7 +68,7 @@ public void setUp() { 1, Optional.empty(), Optional.of(childControllerProperties), - Optional.of(new VeniceProperties(serverProperties)), + Optional.of(serverProperties), false); childClusters = multiRegionMultiClusterWrapper.getChildRegions(); diff --git a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/TestStoreGraveyardCleanupService.java b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/TestStoreGraveyardCleanupService.java index 679fe397ac8..1e65af29e15 100644 --- a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/TestStoreGraveyardCleanupService.java +++ b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/TestStoreGraveyardCleanupService.java @@ -17,7 +17,6 @@ import com.linkedin.venice.meta.StoreGraveyard; import com.linkedin.venice.utils.TestUtils; import com.linkedin.venice.utils.Utils; -import com.linkedin.venice.utils.VeniceProperties; import java.util.List; import java.util.Optional; import java.util.Properties; @@ -58,9 +57,9 @@ public void setUp() { 1, 1, 1, - Optional.of(new VeniceProperties(parentControllerProperties)), + Optional.of(parentControllerProperties), Optional.empty(), - Optional.of(new VeniceProperties(serverProperties)), + Optional.of(serverProperties), false); childDatacenters = multiRegionMultiClusterWrapper.getChildRegions(); diff --git a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/TestStoreMigration.java b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/TestStoreMigration.java index f791a10547c..e71f989ce5b 100644 --- a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/TestStoreMigration.java +++ b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/TestStoreMigration.java @@ -110,9 +110,9 @@ public void setUp() { 2, 1, 2, - Optional.of(new VeniceProperties(parentControllerProperties)), + Optional.of(parentControllerProperties), Optional.empty(), - Optional.of(new VeniceProperties(serverProperties)), + Optional.of(serverProperties), false); multiClusterWrapper = twoLayerMultiRegionMultiClusterWrapper.getChildRegions().get(0); diff --git a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/integration/utils/ServiceFactory.java b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/integration/utils/ServiceFactory.java index dc3fdc20f96..9096f753605 100644 --- a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/integration/utils/ServiceFactory.java +++ b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/integration/utils/ServiceFactory.java @@ -426,9 +426,9 @@ public static VeniceTwoLayerMultiRegionMultiClusterWrapper getVeniceTwoLayerMult int numberOfServers, int numberOfRouters, int replicationFactor, - Optional parentControllerProps, + Optional parentControllerProps, 
Optional childControllerProperties, - Optional serverProps) { + Optional serverProps) { return getService( VeniceTwoLayerMultiRegionMultiClusterWrapper.SERVICE_NAME, VeniceTwoLayerMultiRegionMultiClusterWrapper.generateService( @@ -453,9 +453,9 @@ public static VeniceTwoLayerMultiRegionMultiClusterWrapper getVeniceTwoLayerMult int numberOfServers, int numberOfRouters, int replicationFactor, - Optional parentControllerProps, + Optional parentControllerProps, Optional childControllerProperties, - Optional serverProps, + Optional serverProps, boolean forkServer) { return getService( VeniceTwoLayerMultiRegionMultiClusterWrapper.SERVICE_NAME, diff --git a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/integration/utils/VeniceMultiClusterCreateOptions.java b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/integration/utils/VeniceMultiClusterCreateOptions.java index 74ae82b6feb..47a3e0ad4ed 100644 --- a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/integration/utils/VeniceMultiClusterCreateOptions.java +++ b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/integration/utils/VeniceMultiClusterCreateOptions.java @@ -9,7 +9,6 @@ import static com.linkedin.venice.integration.utils.VeniceClusterWrapperConstants.DEFAULT_SSL_TO_STORAGE_NODES; import static com.linkedin.venice.integration.utils.VeniceClusterWrapperConstants.STANDALONE_REGION_NAME; -import com.linkedin.venice.utils.VeniceProperties; import java.util.Collections; import java.util.Map; import java.util.Properties; @@ -35,7 +34,7 @@ public class VeniceMultiClusterCreateOptions { private final ZkServerWrapper zkServerWrapper; private final PubSubBrokerWrapper pubSubBrokerWrapper; private final Properties childControllerProperties; - private final VeniceProperties veniceProperties; + private final Properties extraProperties; public String getRegionName() { return regionName; @@ -113,8 +112,8 @@ public Properties getChildControllerProperties() { return childControllerProperties; } - public VeniceProperties getVeniceProperties() { - return veniceProperties; + public Properties getExtraProperties() { + return extraProperties; } @Override @@ -169,7 +168,7 @@ public String toString() { .append(childControllerProperties) .append(", ") .append("veniceProperties:") - .append(veniceProperties) + .append(extraProperties) .append(", ") .append(", ") .append("zk:") @@ -201,7 +200,7 @@ private VeniceMultiClusterCreateOptions(Builder builder) { zkServerWrapper = builder.zkServerWrapper; pubSubBrokerWrapper = builder.pubSubBrokerWrapper; childControllerProperties = builder.childControllerProperties; - veniceProperties = builder.veniceProperties; + extraProperties = builder.extraProperties; forkServer = builder.forkServer; kafkaClusterMap = builder.kafkaClusterMap; } @@ -227,7 +226,7 @@ public static class Builder { private ZkServerWrapper zkServerWrapper; private PubSubBrokerWrapper pubSubBrokerWrapper; private Properties childControllerProperties; - private VeniceProperties veniceProperties; + private Properties extraProperties; public Builder(int numberOfClusters) { this.numberOfClusters = numberOfClusters; @@ -324,8 +323,8 @@ public Builder childControllerProperties(Properties childControllerProperties) { return this; } - public Builder veniceProperties(VeniceProperties veniceProperties) { - this.veniceProperties = veniceProperties; + public Builder extraProperties(Properties extraProperties) { + this.extraProperties = extraProperties; return this; } @@ 
-336,8 +335,8 @@ private void addDefaults() { if (childControllerProperties == null) { childControllerProperties = new Properties(); } - if (veniceProperties == null) { - veniceProperties = VeniceProperties.empty(); + if (extraProperties == null) { + extraProperties = new Properties(); } if (kafkaClusterMap == null) { kafkaClusterMap = Collections.emptyMap(); diff --git a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/integration/utils/VeniceMultiClusterWrapper.java b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/integration/utils/VeniceMultiClusterWrapper.java index 91dbddff146..50de4818dae 100644 --- a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/integration/utils/VeniceMultiClusterWrapper.java +++ b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/integration/utils/VeniceMultiClusterWrapper.java @@ -126,7 +126,7 @@ static ServiceProvider generateService(VeniceMultiClu controllerMap.put(controllerWrapper.getPort(), controllerWrapper); } // Specify the system store cluster name - Properties extraProperties = options.getVeniceProperties().toProperties(); + Properties extraProperties = options.getExtraProperties(); extraProperties.put(SYSTEM_SCHEMA_CLUSTER_NAME, clusterNames[0]); extraProperties.putAll(KafkaTestUtils.getLocalCommonKafkaSSLConfig(SslUtils.getTlsConfiguration())); pubBrokerDetails.forEach((key, value) -> extraProperties.putIfAbsent(key, value)); diff --git a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/integration/utils/VeniceTwoLayerMultiRegionMultiClusterWrapper.java b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/integration/utils/VeniceTwoLayerMultiRegionMultiClusterWrapper.java index 09ba9b77696..4eb0cd91e26 100644 --- a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/integration/utils/VeniceTwoLayerMultiRegionMultiClusterWrapper.java +++ b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/integration/utils/VeniceTwoLayerMultiRegionMultiClusterWrapper.java @@ -1,34 +1,11 @@ package com.linkedin.venice.integration.utils; -import static com.linkedin.venice.ConfigKeys.ACTIVE_ACTIVE_REAL_TIME_SOURCE_FABRIC_LIST; -import static com.linkedin.venice.ConfigKeys.ADMIN_TOPIC_REMOTE_CONSUMPTION_ENABLED; -import static com.linkedin.venice.ConfigKeys.ADMIN_TOPIC_SOURCE_REGION; -import static com.linkedin.venice.ConfigKeys.AGGREGATE_REAL_TIME_SOURCE_REGION; -import static com.linkedin.venice.ConfigKeys.CHILD_DATA_CENTER_KAFKA_URL_PREFIX; -import static com.linkedin.venice.ConfigKeys.ENABLE_NATIVE_REPLICATION_AS_DEFAULT_FOR_BATCH_ONLY; -import static com.linkedin.venice.ConfigKeys.ENABLE_NATIVE_REPLICATION_AS_DEFAULT_FOR_HYBRID; -import static com.linkedin.venice.ConfigKeys.ENABLE_NATIVE_REPLICATION_AS_DEFAULT_FOR_INCREMENTAL_PUSH; -import static com.linkedin.venice.ConfigKeys.ENABLE_NATIVE_REPLICATION_FOR_BATCH_ONLY; -import static com.linkedin.venice.ConfigKeys.ENABLE_NATIVE_REPLICATION_FOR_HYBRID; -import static com.linkedin.venice.ConfigKeys.ENABLE_NATIVE_REPLICATION_FOR_INCREMENTAL_PUSH; -import static com.linkedin.venice.ConfigKeys.KAFKA_CLUSTER_MAP_KEY_NAME; -import static com.linkedin.venice.ConfigKeys.KAFKA_CLUSTER_MAP_KEY_OTHER_URLS; -import static com.linkedin.venice.ConfigKeys.KAFKA_CLUSTER_MAP_KEY_URL; -import static com.linkedin.venice.ConfigKeys.KAFKA_CLUSTER_MAP_SECURITY_PROTOCOL; -import static com.linkedin.venice.ConfigKeys.KAFKA_SECURITY_PROTOCOL; -import static 
com.linkedin.venice.ConfigKeys.NATIVE_REPLICATION_FABRIC_ALLOWLIST; -import static com.linkedin.venice.ConfigKeys.NATIVE_REPLICATION_SOURCE_FABRIC_AS_DEFAULT_FOR_BATCH_ONLY_STORES; -import static com.linkedin.venice.ConfigKeys.NATIVE_REPLICATION_SOURCE_FABRIC_AS_DEFAULT_FOR_HYBRID_STORES; -import static com.linkedin.venice.ConfigKeys.NATIVE_REPLICATION_SOURCE_FABRIC_AS_DEFAULT_FOR_INCREMENTAL_PUSH_STORES; -import static com.linkedin.venice.ConfigKeys.PARENT_KAFKA_CLUSTER_FABRIC_LIST; -import static com.linkedin.venice.ConfigKeys.PARTICIPANT_MESSAGE_STORE_ENABLED; -import static com.linkedin.venice.integration.utils.VeniceClusterWrapperConstants.CHILD_REGION_NAME_PREFIX; -import static com.linkedin.venice.integration.utils.VeniceClusterWrapperConstants.DEFAULT_PARENT_DATA_CENTER_REGION_NAME; +import static com.linkedin.venice.ConfigKeys.*; +import static com.linkedin.venice.integration.utils.VeniceClusterWrapperConstants.*; import com.linkedin.venice.exceptions.VeniceException; import com.linkedin.venice.utils.Time; import com.linkedin.venice.utils.Utils; -import com.linkedin.venice.utils.VeniceProperties; import java.io.File; import java.util.ArrayList; import java.util.Arrays; @@ -84,8 +61,8 @@ static ServiceProvider generateSer int numberOfServers, int numberOfRouters, int replicationFactor, - Optional parentControllerProperties, - Optional serverProperties) { + Optional parentControllerProperties, + Optional serverProperties) { return generateService( numberOfRegions, numberOfClustersInEachRegion, @@ -108,9 +85,9 @@ static ServiceProvider generateSer int numberOfServers, int numberOfRouters, int replicationFactor, - Optional parentControllerPropertiesOverride, + Optional parentControllerPropertiesOverride, Optional childControllerPropertiesOverride, - Optional serverProperties, + Optional serverProperties, boolean forkServer) { String parentRegionName = DEFAULT_PARENT_DATA_CENTER_REGION_NAME; final List parentControllers = new ArrayList<>(numberOfParentControllers); @@ -135,12 +112,11 @@ static ServiceProvider generateSer .build()); allPubSubBrokerWrappers.add(parentPubSubBrokerWrapper); - Properties parentControllerProps = parentControllerPropertiesOverride.isPresent() - ? parentControllerPropertiesOverride.get().getPropertiesCopy() - : new Properties(); + Properties parentControllerProps = + parentControllerPropertiesOverride.isPresent() ? 
parentControllerPropertiesOverride.get() : new Properties(); /** Enable participant system store by default in a two-layer multi-region set-up */ parentControllerProps.setProperty(PARTICIPANT_MESSAGE_STORE_ENABLED, "true"); - parentControllerPropertiesOverride = Optional.of(new VeniceProperties(parentControllerProps)); + parentControllerPropertiesOverride = Optional.of(parentControllerProps); Map clusterToD2 = new HashMap<>(); Map clusterToServerD2 = new HashMap<>(); @@ -186,7 +162,7 @@ static ServiceProvider generateSer final Properties finalParentControllerProperties = new Properties(); finalParentControllerProperties.putAll(defaultParentControllerProps); - parentControllerPropertiesOverride.ifPresent(p -> finalParentControllerProperties.putAll(p.getPropertiesCopy())); + parentControllerPropertiesOverride.ifPresent(finalParentControllerProperties::putAll); Properties nativeReplicationRequiredChildControllerProps = new Properties(); nativeReplicationRequiredChildControllerProps.put(ADMIN_TOPIC_SOURCE_REGION, parentRegionName); @@ -225,10 +201,9 @@ static ServiceProvider generateSer finalChildControllerProperties.putAll(pubSubBrokerProps); // child controllers Properties additionalServerProps = new Properties(); - serverProperties - .ifPresent(veniceProperties -> additionalServerProps.putAll(veniceProperties.getPropertiesCopy())); + serverProperties.ifPresent(additionalServerProps::putAll); additionalServerProps.putAll(pubSubBrokerProps); - serverProperties = Optional.of(new VeniceProperties(additionalServerProps)); + serverProperties = Optional.of(additionalServerProps); VeniceMultiClusterCreateOptions.Builder builder = new VeniceMultiClusterCreateOptions.Builder(numberOfClustersInEachRegion) @@ -239,7 +214,7 @@ static ServiceProvider generateSer .randomizeClusterName(false) .multiRegionSetup(true) .childControllerProperties(finalChildControllerProperties) - .veniceProperties(serverProperties.orElse(null)) + .extraProperties(serverProperties.orElse(null)) .forkServer(forkServer) .kafkaClusterMap(kafkaClusterMap); // Create multi-clusters @@ -299,12 +274,12 @@ static ServiceProvider generateSer } private static Map> addKafkaClusterIDMappingToServerConfigs( - Optional serverProperties, + Optional serverProperties, List regionNames, List kafkaBrokers) { if (serverProperties.isPresent()) { SecurityProtocol baseSecurityProtocol = SecurityProtocol - .valueOf(serverProperties.get().getString(KAFKA_SECURITY_PROTOCOL, SecurityProtocol.PLAINTEXT.name)); + .valueOf(serverProperties.get().getProperty(KAFKA_SECURITY_PROTOCOL, SecurityProtocol.PLAINTEXT.name)); Map> kafkaClusterMap = new HashMap<>(); Map mapping; diff --git a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/restart/TestRestartServerAfterDeletingSstFilesWithActiveActiveIngestion.java b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/restart/TestRestartServerAfterDeletingSstFilesWithActiveActiveIngestion.java index 73e6d234763..b251afdfd83 100644 --- a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/restart/TestRestartServerAfterDeletingSstFilesWithActiveActiveIngestion.java +++ b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/restart/TestRestartServerAfterDeletingSstFilesWithActiveActiveIngestion.java @@ -50,7 +50,6 @@ import com.linkedin.venice.utils.TestUtils; import com.linkedin.venice.utils.Time; import com.linkedin.venice.utils.Utils; -import com.linkedin.venice.utils.VeniceProperties; import com.linkedin.venice.view.TestView; import 
com.linkedin.venice.writer.PutMetadata; import com.linkedin.venice.writer.VeniceWriter; @@ -122,7 +121,7 @@ public void setUp() throws Exception { 1, Optional.empty(), Optional.empty(), - Optional.of(new VeniceProperties(serverProperties)), + Optional.of(serverProperties), false); List childDatacenters = multiRegionMultiClusterWrapper.getChildRegions(); diff --git a/services/venice-controller/src/main/java/com/linkedin/venice/controller/Admin.java b/services/venice-controller/src/main/java/com/linkedin/venice/controller/Admin.java index 02b6c183780..d1c9f40f7b2 100644 --- a/services/venice-controller/src/main/java/com/linkedin/venice/controller/Admin.java +++ b/services/venice-controller/src/main/java/com/linkedin/venice/controller/Admin.java @@ -120,10 +120,6 @@ public void setUncompletedPartitions(List uncompletedParti boolean isClusterValid(String clusterName); - default boolean isBatchJobHeartbeatEnabled() { - return false; - } - default void createStore(String clusterName, String storeName, String owner, String keySchema, String valueSchema) { createStore(clusterName, storeName, owner, keySchema, valueSchema, false, Optional.empty()); } @@ -344,16 +340,37 @@ SchemaEntry addValueSchema( * This method skips most of the precondition checks and is intended for internal use only. * Code from outside should call * {@link #addValueSchema(String, String, String, DirectionalSchemaCompatibilityType)} instead. - * - * TODO: make it private and remove from the interface list */ - SchemaEntry addValueSchema( + SchemaEntry addValueSchemaInternal( String clusterName, String storeName, String valueSchemaStr, int schemaId, + DirectionalSchemaCompatibilityType expectedCompatibilityType, boolean doUpdateSupersetSchemaID); + /** + * This method skips most of the precondition checks and is intended for internal use only. + * Code from outside should call + * {@link #addValueSchema(String, String, String, DirectionalSchemaCompatibilityType)} instead.
+ * + * @see #addValueSchemaInternal(String, String, String, int, DirectionalSchemaCompatibilityType, boolean) + */ + default SchemaEntry addValueSchemaInternal( + String clusterName, + String storeName, + String valueSchemaStr, + int schemaId, + boolean doUpdateSupersetSchemaID) { + return addValueSchemaInternal( + clusterName, + storeName, + valueSchemaStr, + schemaId, + SchemaEntry.DEFAULT_SCHEMA_CREATION_COMPATIBILITY_TYPE, + doUpdateSupersetSchemaID); + } + SchemaEntry addSupersetSchema( String clusterName, String storeName, diff --git a/services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceControllerService.java b/services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceControllerService.java index be34555116a..cbd47fd5837 100644 --- a/services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceControllerService.java +++ b/services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceControllerService.java @@ -7,6 +7,7 @@ import com.linkedin.venice.acl.DynamicAccessController; import com.linkedin.venice.authorization.AuthorizerService; import com.linkedin.venice.client.store.ClientConfig; +import com.linkedin.venice.controller.init.DelegatingClusterLeaderInitializationRoutine; import com.linkedin.venice.controller.kafka.consumer.AdminConsumerService; import com.linkedin.venice.controller.lingeringjob.DefaultLingeringStoreVersionChecker; import com.linkedin.venice.controller.lingeringjob.HeartbeatBasedCheckerStats; @@ -26,6 +27,7 @@ import com.linkedin.venice.service.ICProvider; import com.linkedin.venice.utils.pools.LandFillObjectPool; import io.tehuti.metrics.MetricsRepository; +import java.util.Arrays; import java.util.HashMap; import java.util.Map; import java.util.Optional; @@ -57,6 +59,12 @@ public VeniceControllerService( PubSubTopicRepository pubSubTopicRepository, PubSubClientsFactory pubSubClientsFactory) { this.multiClusterConfigs = multiClusterConfigs; + + DelegatingClusterLeaderInitializationRoutine initRoutineForPushJobDetailsSystemStore = + new DelegatingClusterLeaderInitializationRoutine(); + DelegatingClusterLeaderInitializationRoutine initRoutineForHeartbeatSystemStore = + new DelegatingClusterLeaderInitializationRoutine(); + VeniceHelixAdmin internalAdmin = new VeniceHelixAdmin( multiClusterConfigs, metricsRepository, @@ -66,7 +74,8 @@ public VeniceControllerService( accessController, icProvider, pubSubTopicRepository, - pubSubClientsFactory); + pubSubClientsFactory, + Arrays.asList(initRoutineForPushJobDetailsSystemStore, initRoutineForHeartbeatSystemStore)); if (multiClusterConfigs.isParent()) { this.admin = new VeniceParentHelixAdmin( @@ -79,7 +88,9 @@ public VeniceControllerService( createLingeringStoreVersionChecker(multiClusterConfigs, metricsRepository), WriteComputeSchemaConverter.getInstance(), externalSupersetSchemaGenerator, - pubSubTopicRepository); + pubSubTopicRepository, + initRoutineForPushJobDetailsSystemStore, + initRoutineForHeartbeatSystemStore); LOGGER.info("Controller works as a parent controller."); } else { this.admin = internalAdmin; diff --git a/services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceHelixAdmin.java b/services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceHelixAdmin.java index ca42b7db5ba..8736641f016 100644 --- a/services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceHelixAdmin.java +++ 
b/services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceHelixAdmin.java @@ -22,6 +22,7 @@ import static com.linkedin.venice.meta.VersionStatus.PUSHED; import static com.linkedin.venice.meta.VersionStatus.STARTED; import static com.linkedin.venice.pushmonitor.OfflinePushStatus.HELIX_ASSIGNMENT_COMPLETED; +import static com.linkedin.venice.serialization.avro.AvroProtocolDefinition.PARTICIPANT_MESSAGE_SYSTEM_STORE_VALUE; import static com.linkedin.venice.utils.AvroSchemaUtils.isValidAvroSchema; import static com.linkedin.venice.utils.RegionUtils.parseRegionsFilterList; import static com.linkedin.venice.views.ViewUtils.ETERNAL_TOPIC_RETENTION_ENABLED; @@ -50,7 +51,7 @@ import com.linkedin.venice.controller.helix.SharedHelixReadOnlyZKSharedSystemStoreRepository; import com.linkedin.venice.controller.init.ClusterLeaderInitializationManager; import com.linkedin.venice.controller.init.ClusterLeaderInitializationRoutine; -import com.linkedin.venice.controller.init.InternalRTStoreInitializationRoutine; +import com.linkedin.venice.controller.init.PerClusterInternalRTStoreInitializationRoutine; import com.linkedin.venice.controller.init.SystemSchemaInitializationRoutine; import com.linkedin.venice.controller.kafka.StoreStatusDecider; import com.linkedin.venice.controller.kafka.consumer.AdminConsumerService; @@ -229,7 +230,6 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeUnit; import java.util.function.Consumer; -import java.util.function.Function; import java.util.stream.Collectors; import javax.annotation.Nonnull; import org.apache.avro.Schema; @@ -421,7 +421,8 @@ public VeniceHelixAdmin( Optional.empty(), Optional.empty(), pubSubTopicRepository, - pubSubClientsFactory); + pubSubClientsFactory, + Collections.EMPTY_LIST); } // TODO Use different configs for different clusters when creating helix admin. @@ -434,7 +435,8 @@ public VeniceHelixAdmin( Optional accessController, Optional icProvider, PubSubTopicRepository pubSubTopicRepository, - PubSubClientsFactory pubSubClientsFactory) { + PubSubClientsFactory pubSubClientsFactory, + List additionalInitRoutines) { Validate.notNull(d2Client); this.multiClusterConfigs = multiClusterConfigs; VeniceControllerConfig commonConfig = multiClusterConfigs.getCommonConfig(); @@ -601,19 +603,20 @@ public VeniceHelixAdmin( Optional.of(DEFAULT_USER_SYSTEM_STORE_UPDATE_QUERY_PARAMS), true)); } + initRoutines.addAll(additionalInitRoutines); // Participant stores are not read or written in parent colo. Parent controller skips participant store // initialization. 
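Review note: VeniceHelixAdmin now accepts extra init routines at construction, and VeniceControllerService hands it two empty delegating placeholders whose real behavior VeniceParentHelixAdmin installs later. A condensed sketch of that handshake, assembled from the pieces introduced in this diff (constructor arguments elided with /* ... */):

    DelegatingClusterLeaderInitializationRoutine placeholder =
        new DelegatingClusterLeaderInitializationRoutine();
    // The internal admin registers the placeholder alongside its other init routines;
    // until a delegate is installed, execute() appears to be a no-op under the conditional lock.
    VeniceHelixAdmin internalAdmin = new VeniceHelixAdmin(/* ... */ Arrays.asList(placeholder));
    // Later (parent controllers only), the real routine is installed exactly once.
    placeholder.setDelegate(new SharedInternalRTStoreInitializationRoutine(/* ... */));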
if (!multiClusterConfigs.isParent() && multiClusterConfigs.isParticipantMessageStoreEnabled()) { - Function storeNameSupplier = VeniceSystemStoreUtils::getParticipantStoreNameForCluster; initRoutines.add( - new InternalRTStoreInitializationRoutine( - storeNameSupplier, + new PerClusterInternalRTStoreInitializationRoutine( + PARTICIPANT_MESSAGE_SYSTEM_STORE_VALUE, + VeniceSystemStoreUtils::getParticipantStoreNameForCluster, multiClusterConfigs, this, - ParticipantMessageKey.getClassSchema().toString(), - ParticipantMessageValue.getClassSchema().toString())); + ParticipantMessageKey.getClassSchema())); } + ClusterLeaderInitializationRoutine controllerInitialization = new ClusterLeaderInitializationManager(initRoutines, commonConfig.isConcurrentInitRoutinesEnabled()); @@ -4850,7 +4853,7 @@ private void validateValueSchemaUsingRandomGenerator(String schemaStr, String cl } /** - * @see #addValueSchema(String, String, String, int, DirectionalSchemaCompatibilityType, boolean) + * @see #addValueSchemaInternal(String, String, String, int, DirectionalSchemaCompatibilityType, boolean) */ @Override public SchemaEntry addValueSchema( @@ -4864,31 +4867,13 @@ public SchemaEntry addValueSchema( return new SchemaEntry(schemaRepository.getValueSchemaId(storeName, valueSchemaStr), valueSchemaStr); } - /** - * @see #addValueSchema(String, String, String, int, DirectionalSchemaCompatibilityType, boolean) - */ - @Override - public SchemaEntry addValueSchema( - String clusterName, - String storeName, - String valueSchemaStr, - int schemaId, - boolean doUpdateSupersetSchemaID) { - return addValueSchema( - clusterName, - storeName, - valueSchemaStr, - schemaId, - SchemaEntry.DEFAULT_SCHEMA_CREATION_COMPATIBILITY_TYPE, - doUpdateSupersetSchemaID); - } - /** * Add a new value schema for the given store with all specified properties and return a new SchemaEntry object * containing the schema and its id. * @return a SchemaEntry object composed of a schema and its corresponding id.
*/ - public SchemaEntry addValueSchema( + @Override + public SchemaEntry addValueSchemaInternal( String clusterName, String storeName, String valueSchemaStr, diff --git a/services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceParentHelixAdmin.java b/services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceParentHelixAdmin.java index b53e963b954..c9177bb0979 100644 --- a/services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceParentHelixAdmin.java +++ b/services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceParentHelixAdmin.java @@ -52,6 +52,8 @@ import static com.linkedin.venice.meta.HybridStoreConfigImpl.DEFAULT_HYBRID_OFFSET_LAG_THRESHOLD; import static com.linkedin.venice.meta.HybridStoreConfigImpl.DEFAULT_HYBRID_TIME_LAG_THRESHOLD; import static com.linkedin.venice.meta.HybridStoreConfigImpl.DEFAULT_REWIND_TIME_IN_SECONDS; +import static com.linkedin.venice.serialization.avro.AvroProtocolDefinition.BATCH_JOB_HEARTBEAT; +import static com.linkedin.venice.serialization.avro.AvroProtocolDefinition.PUSH_JOB_DETAILS; import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; @@ -74,6 +76,8 @@ import com.linkedin.venice.common.VeniceSystemStoreUtils; import com.linkedin.venice.compression.CompressionStrategy; import com.linkedin.venice.controller.authorization.SystemStoreAclSynchronizationTask; +import com.linkedin.venice.controller.init.DelegatingClusterLeaderInitializationRoutine; +import com.linkedin.venice.controller.init.SharedInternalRTStoreInitializationRoutine; import com.linkedin.venice.controller.kafka.AdminTopicUtils; import com.linkedin.venice.controller.kafka.consumer.AdminConsumerService; import com.linkedin.venice.controller.kafka.consumer.AdminConsumptionTask; @@ -190,8 +194,6 @@ import com.linkedin.venice.schema.writecompute.DerivedSchemaEntry; import com.linkedin.venice.schema.writecompute.WriteComputeSchemaConverter; import com.linkedin.venice.security.SSLFactory; -import com.linkedin.venice.serialization.avro.AvroProtocolDefinition; -import com.linkedin.venice.status.BatchJobHeartbeatConfigs; import com.linkedin.venice.status.protocol.BatchJobHeartbeatKey; import com.linkedin.venice.status.protocol.BatchJobHeartbeatValue; import com.linkedin.venice.status.protocol.PushJobDetails; @@ -264,12 +266,7 @@ */ public class VeniceParentHelixAdmin implements Admin { private static final long SLEEP_INTERVAL_FOR_DATA_CONSUMPTION_IN_MS = 1000; - private static final long SLEEP_INTERVAL_FOR_ASYNC_SETUP_MS = 3000; - private static final int MAX_ASYNC_SETUP_RETRY_COUNT = 10; private static final Logger LOGGER = LogManager.getLogger(VeniceParentHelixAdmin.class); - private static final String VENICE_INTERNAL_STORE_OWNER = "venice-internal"; - private static final String PUSH_JOB_DETAILS_STORE_DESCRIPTOR = "push job details store: "; - private static final String BATCH_JOB_HEARTBEAT_STORE_DESCRIPTOR = "batch job liveness heartbeat store: "; // Store version number to retain in Parent Controller to limit 'Store' ZNode size. 
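Review note: with the explicit-ID path renamed to addValueSchemaInternal, internal callers now opt into registration with a caller-chosen schema ID by name rather than by overload resolution. The call shape, as used elsewhere in this diff (argument values are illustrative):

    admin.addValueSchemaInternal(
        clusterName,
        storeName,
        schemaInLocalResources.toString(),
        valueSchemaVersion, // caller-chosen schema ID, here the protocol version
        DirectionalSchemaCompatibilityType.NONE, // system schemas bypass compatibility checks
        false); // do not update the superset schema ID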
static final int STORE_VERSION_RETENTION_COUNT = 5; private static final StackTraceElement[] EMPTY_STACK_TRACE = new StackTraceElement[0]; @@ -283,8 +280,8 @@ public class VeniceParentHelixAdmin implements Admin { private final byte[] emptyKeyByteArr = new byte[0]; private final AdminOperationSerializer adminOperationSerializer = new AdminOperationSerializer(); private final VeniceControllerMultiClusterConfig multiClusterConfigs; - private final Map> perStoreAdminLocks = new ConcurrentHashMap<>(); - private final Map perClusterAdminLocks = new ConcurrentHashMap<>(); + private final Map> perStoreAdminLocks = new ConcurrentHashMap<>(); + private final Map perClusterAdminLocks = new ConcurrentHashMap<>(); private final Map adminCommandExecutionTrackers; private final Set executionIdValidatedClusters = new HashSet<>(); // Only used for setup work which are intended to be short lived and is bounded by the number of venice clusters. @@ -320,8 +317,6 @@ public class VeniceParentHelixAdmin implements Admin { private final int waitingTimeForConsumptionMs; - private final boolean batchJobHeartbeatEnabled; - private Optional accessController; private final Optional authorizerService; @@ -347,6 +342,7 @@ public VeniceParentHelixAdmin( this(veniceHelixAdmin, multiClusterConfigs, false, Optional.empty(), Optional.empty()); } + // Visible for testing public VeniceParentHelixAdmin( VeniceHelixAdmin veniceHelixAdmin, VeniceControllerMultiClusterConfig multiClusterConfigs, @@ -363,6 +359,7 @@ public VeniceParentHelixAdmin( new DefaultLingeringStoreVersionChecker()); } + // Visible for testing public VeniceParentHelixAdmin( VeniceHelixAdmin veniceHelixAdmin, VeniceControllerMultiClusterConfig multiClusterConfigs, @@ -381,7 +378,9 @@ public VeniceParentHelixAdmin( lingeringStoreVersionChecker, WriteComputeSchemaConverter.getInstance(), // TODO: make it an input param Optional.empty(), - new PubSubTopicRepository()); + new PubSubTopicRepository(), + null, + null); } public VeniceParentHelixAdmin( @@ -394,13 +393,14 @@ public VeniceParentHelixAdmin( LingeringStoreVersionChecker lingeringStoreVersionChecker, WriteComputeSchemaConverter writeComputeSchemaConverter, Optional externalSupersetSchemaGenerator, - PubSubTopicRepository pubSubTopicRepository) { + PubSubTopicRepository pubSubTopicRepository, + DelegatingClusterLeaderInitializationRoutine initRoutineForPushJobDetailsSystemStore, + DelegatingClusterLeaderInitializationRoutine initRoutineForHeartbeatSystemStore) { Validate.notNull(lingeringStoreVersionChecker); Validate.notNull(writeComputeSchemaConverter); this.veniceHelixAdmin = veniceHelixAdmin; this.multiClusterConfigs = multiClusterConfigs; this.waitingTimeForConsumptionMs = this.multiClusterConfigs.getParentControllerWaitingTimeForConsumptionMs(); - this.batchJobHeartbeatEnabled = this.multiClusterConfigs.getBatchJobHeartbeatEnabled(); this.veniceWriterMap = new ConcurrentHashMap<>(); this.adminTopicMetadataAccessor = new ZkAdminTopicMetadataAccessor( this.veniceHelixAdmin.getZkClient(), @@ -463,6 +463,36 @@ public VeniceParentHelixAdmin( Class identityParserClass = ReflectUtils.loadClass(multiClusterConfigs.getCommonConfig().getIdentityParserClassName()); this.identityParser = ReflectUtils.callConstructor(identityParserClass, new Class[0], new Object[0]); + + if (initRoutineForPushJobDetailsSystemStore != null) { + // TODO: When we plan to enable active-active push details store in future, we need to enable it by default. 
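Review note: the two shared system stores are given different replication policies in the construction-time blocks below. Side by side, as configured in this diff:

    // Push job details store keeps the AGGREGATE policy (active-active remains a TODO above).
    new UpdateStoreQueryParams().setHybridDataReplicationPolicy(DataReplicationPolicy.AGGREGATE);
    // Batch job heartbeat store is set up for active-active replication from the start.
    new UpdateStoreQueryParams()
        .setHybridDataReplicationPolicy(DataReplicationPolicy.ACTIVE_ACTIVE)
        .setActiveActiveReplicationEnabled(true);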
+ UpdateStoreQueryParams updateStoreQueryParamsForPushJobDetails = + new UpdateStoreQueryParams().setHybridDataReplicationPolicy(DataReplicationPolicy.AGGREGATE); + initRoutineForPushJobDetailsSystemStore.setDelegate( + new SharedInternalRTStoreInitializationRoutine( + getMultiClusterConfigs().getPushJobStatusStoreClusterName(), + VeniceSystemStoreUtils.getPushJobDetailsStoreName(), + PUSH_JOB_DETAILS, + multiClusterConfigs, + this, + PushJobStatusRecordKey.getClassSchema(), + updateStoreQueryParamsForPushJobDetails)); + } + + if (initRoutineForHeartbeatSystemStore != null) { + UpdateStoreQueryParams updateStoreQueryParamsForHeartbeatSystemStore = + new UpdateStoreQueryParams().setHybridDataReplicationPolicy(DataReplicationPolicy.ACTIVE_ACTIVE) + .setActiveActiveReplicationEnabled(true); + initRoutineForHeartbeatSystemStore.setDelegate( + new SharedInternalRTStoreInitializationRoutine( + getMultiClusterConfigs().getBatchJobHeartbeatStoreCluster(), + BATCH_JOB_HEARTBEAT.getSystemStoreName(), + BATCH_JOB_HEARTBEAT, + multiClusterConfigs, + this, + BatchJobHeartbeatKey.getClassSchema(), + updateStoreQueryParamsForHeartbeatSystemStore)); + } } // For testing purpose. @@ -527,204 +557,6 @@ public synchronized void initStorageCluster(String clusterName) { .setPartitionCount(AdminTopicUtils.PARTITION_NUM_FOR_ADMIN_TOPIC) .build()); }); - - if (!getMultiClusterConfigs().getPushJobStatusStoreClusterName().isEmpty() - && clusterName.equals(getMultiClusterConfigs().getPushJobStatusStoreClusterName())) { - // TODO: When we plan to enable active-active push details store in future, we need to enable it by default. - UpdateStoreQueryParams updateStoreQueryParams = - new UpdateStoreQueryParams().setHybridDataReplicationPolicy(DataReplicationPolicy.AGGREGATE); - asyncSetupForInternalRTStore( - getMultiClusterConfigs().getPushJobStatusStoreClusterName(), - VeniceSystemStoreUtils.getPushJobDetailsStoreName(), - PUSH_JOB_DETAILS_STORE_DESCRIPTOR + VeniceSystemStoreUtils.getPushJobDetailsStoreName(), - PushJobStatusRecordKey.getClassSchema().toString(), - PushJobDetails.getClassSchema().toString(), - getMultiClusterConfigs().getControllerConfig(clusterName).getMinNumberOfPartitions(), - updateStoreQueryParams); - } - - maybeSetupBatchJobLivenessHeartbeatStore(clusterName); - } - - private void maybeSetupBatchJobLivenessHeartbeatStore(String currClusterName) { - final String batchJobHeartbeatStoreCluster = getMultiClusterConfigs().getBatchJobHeartbeatStoreCluster(); - final String batchJobHeartbeatStoreName = AvroProtocolDefinition.BATCH_JOB_HEARTBEAT.getSystemStoreName(); - - if (Objects.equals(currClusterName, batchJobHeartbeatStoreCluster)) { - UpdateStoreQueryParams updateStoreQueryParams = - new UpdateStoreQueryParams().setHybridDataReplicationPolicy(DataReplicationPolicy.ACTIVE_ACTIVE) - .setActiveActiveReplicationEnabled(true); - asyncSetupForInternalRTStore( - currClusterName, - batchJobHeartbeatStoreName, - BATCH_JOB_HEARTBEAT_STORE_DESCRIPTOR + batchJobHeartbeatStoreName, - BatchJobHeartbeatKey.getClassSchema().toString(), - BatchJobHeartbeatValue.getClassSchema().toString(), - getMultiClusterConfigs().getControllerConfig(currClusterName).getMinNumberOfPartitions(), - updateStoreQueryParams); - } else { - LOGGER.info( - "Skip creating the batch job liveness heartbeat store: {} in cluster: {} since the designated cluster is: {}", - batchJobHeartbeatStoreName, - currClusterName, - batchJobHeartbeatStoreCluster); - } - } - - /** - * Setup the venice RT store used internally for hosting push job status 
records or participant messages. - * If the store already exists and is in the correct state then only verification is performed. - * TODO replace this with {@link com.linkedin.venice.controller.init.ClusterLeaderInitializationRoutine} - */ - private void asyncSetupForInternalRTStore( - String clusterName, - String storeName, - String storeDescriptor, - String keySchema, - String valueSchema, - int partitionCount, - UpdateStoreQueryParams updateStoreQueryParams) { - - asyncSetupExecutor.submit(() -> { - int retryCount = 0; - boolean isStoreReady = false; - while (!isStoreReady && asyncSetupEnabledMap.get(clusterName) && retryCount < MAX_ASYNC_SETUP_RETRY_COUNT) { - try { - if (retryCount > 0) { - timer.sleep(SLEEP_INTERVAL_FOR_ASYNC_SETUP_MS); - } - isStoreReady = createOrVerifyInternalStore( - clusterName, - storeName, - storeDescriptor, - keySchema, - valueSchema, - partitionCount, - updateStoreQueryParams); - } catch (VeniceException e) { - // Verification attempts (i.e. a controller running this routine but is not the leader of the cluster) do not - // count towards the retry count. - LOGGER.warn( - "VeniceException occurred during {} setup with store: {} in cluster: {}", - storeDescriptor, - storeName, - clusterName, - e); - LOGGER.info("Async setup for {} attempts: {}/{}", storeDescriptor, retryCount, MAX_ASYNC_SETUP_RETRY_COUNT); - } catch (Exception e) { - LOGGER.error( - "Exception occurred aborting {} setup with store: {} in cluster: {}", - storeDescriptor, - storeName, - clusterName, - e); - break; - } finally { - retryCount++; - } - } - if (isStoreReady) { - LOGGER - .info("{} has been successfully created or it already exists in cluster: {}", storeDescriptor, clusterName); - } else { - LOGGER.error("Unable to create or verify the {} in cluster: {}", storeDescriptor, clusterName); - } - }); - } - - /** - * Verify the state of the system store. The leader controller will also create and configure the store if the - * desired state is not met. - * @param clusterName the name of the cluster that push status store belongs to. - * @param storeName the name of the push status store. - * @return {@code true} if the store is ready, {@code false} otherwise. - */ - private boolean createOrVerifyInternalStore( - String clusterName, - String storeName, - String storeDescriptor, - String keySchema, - String valueSchema, - int partitionCount, - UpdateStoreQueryParams updateStoreQueryParams) { - boolean storeReady = false; - if (isLeaderControllerFor(clusterName)) { - // We should only perform the store validation if the current controller is the leader controller of the requested - // cluster. - Store store = getStore(clusterName, storeName); - if (store == null) { - createStore(clusterName, storeName, VENICE_INTERNAL_STORE_OWNER, keySchema, valueSchema, true); - store = getStore(clusterName, storeName); - if (store == null) { - throw new VeniceException("Unable to create or fetch the " + storeDescriptor); - } - } else { - LOGGER.info("Internal store: {} already exists in cluster: {}", storeName, clusterName); - } - - if (!store.isHybrid()) { - // Make sure we do not override hybrid configs passed in. 
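Review note: the deleted asyncSetupForInternalRTStore polled from a background executor with sleep/retry bookkeeping; its replacement is the leader-initialization pipeline, where registered routines run when a controller becomes cluster leader and no retry loop is needed. The registration shape, from the VeniceHelixAdmin changes earlier in this diff:

    // initRoutines already holds the schema and system-store routines built above.
    initRoutines.addAll(additionalInitRoutines); // delegating placeholders from VeniceControllerService
    ClusterLeaderInitializationRoutine controllerInitialization =
        new ClusterLeaderInitializationManager(initRoutines, commonConfig.isConcurrentInitRoutinesEnabled());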
- if (!updateStoreQueryParams.getHybridOffsetLagThreshold().isPresent()) { - updateStoreQueryParams.setHybridOffsetLagThreshold(100L); - } - if (!updateStoreQueryParams.getHybridRewindSeconds().isPresent()) { - updateStoreQueryParams.setHybridRewindSeconds(TimeUnit.DAYS.toSeconds(7)); - } - if (!updateStoreQueryParams.getPartitionCount().isPresent()) { - updateStoreQueryParams.setPartitionCount(partitionCount); - } - updateStore(clusterName, storeName, updateStoreQueryParams); - store = getStore(clusterName, storeName); - if (!store.isHybrid()) { - throw new VeniceException("Unable to update the " + storeDescriptor + " to a hybrid store"); - } - LOGGER.info("Enabled hybrid for internal store: {} in cluster: {}", storeName, clusterName); - } - - if (store.getVersions().isEmpty()) { - int replicationFactor = getReplicationFactor(clusterName, storeName); - Version version = incrementVersionIdempotent( - clusterName, - storeName, - Version.guidBasedDummyPushId(), - partitionCount, - replicationFactor); - writeEndOfPush(clusterName, storeName, version.getNumber(), true); - store = getStore(clusterName, storeName); - if (store.getVersions().isEmpty()) { - throw new VeniceException("Unable to initialize a version for the " + storeDescriptor); - } - LOGGER.info("Created a version for internal store: {} in cluster: {}", storeName, clusterName); - } - - final String existingRtTopic = getRealTimeTopic(clusterName, storeName); - if (!existingRtTopic.equals(Version.composeRealTimeTopic(storeName))) { - throw new VeniceException("Unexpected real time topic name for the " + storeDescriptor); - } - storeReady = true; - } else { - // Verify that the store is indeed created by another controller. This is to prevent if the initial leader fails - // or when the cluster happens to be leaderless for a bit. - try (ControllerClient controllerClient = ControllerClient - .constructClusterControllerClient(clusterName, getLeaderController(clusterName).getUrl(false), sslFactory)) { - StoreResponse storeResponse = controllerClient.getStore(storeName); - if (storeResponse.isError()) { - LOGGER.warn( - "Failed to verify if {} exists from the controller with URL: {}", - storeDescriptor, - controllerClient.getControllerDiscoveryUrls()); - return false; - } - StoreInfo storeInfo = storeResponse.getStore(); - PubSubTopic realTimeTopic = pubSubTopicRepository.getTopic(Version.composeRealTimeTopic(storeName)); - if (storeInfo.getHybridStoreConfig() != null && !storeInfo.getVersions().isEmpty() - && storeInfo.getVersion(storeInfo.getLargestUsedVersionNumber()).get().getPartitionCount() == partitionCount - && getTopicManager().containsTopicAndAllPartitionsAreOnline(realTimeTopic)) { - storeReady = true; - } - } - } - return storeReady; } /** @@ -738,17 +570,6 @@ public boolean isClusterValid(String clusterName) { return getVeniceHelixAdmin().isClusterValid(clusterName); } - /** - * Test if batch-job heartbeat is enabled. - * @return true if batch-job heartbeat is enabled; - * false otherwise. 
- * @see BatchJobHeartbeatConfigs#HEARTBEAT_ENABLED_CONFIG - */ - @Override - public boolean isBatchJobHeartbeatEnabled() { - return batchJobHeartbeatEnabled; - } - private void sendAdminMessageAndWaitForConsumed(String clusterName, String storeName, AdminOperation message) { if (!veniceWriterMap.containsKey(clusterName)) { throw new VeniceException("Cluster: " + clusterName + " is not started yet!"); @@ -2838,104 +2659,41 @@ public SchemaEntry addValueSchema( String storeName, String newValueSchemaStr, DirectionalSchemaCompatibilityType expectedCompatibilityType) { - acquireAdminMessageLock(clusterName, storeName); - try { - Schema newValueSchema = AvroSchemaParseUtils.parseSchemaFromJSONStrictValidation(newValueSchemaStr); - // TODO: Enable the following check for all new schema registration. - // AvroSchemaUtils.validateTopLevelFieldDefaultsValueRecordSchema(newValueSchema); - final int newValueSchemaId = getVeniceHelixAdmin().checkPreConditionForAddValueSchemaAndGetNewSchemaId( - clusterName, - storeName, - newValueSchemaStr, - expectedCompatibilityType); - - /** - * If we find this is an exactly duplicate schema, return the existing schema id; - * else add the schema with possible doc field change. - */ - if (newValueSchemaId == SchemaData.DUPLICATE_VALUE_SCHEMA_CODE) { - return new SchemaEntry( - getVeniceHelixAdmin().getValueSchemaId(clusterName, storeName, newValueSchemaStr), - newValueSchemaStr); - } - - final Store store = getVeniceHelixAdmin().getStore(clusterName, storeName); - Schema existingValueSchema = getVeniceHelixAdmin().getSupersetOrLatestValueSchema(clusterName, store); - - final boolean doUpdateSupersetSchemaID; - if (existingValueSchema != null && (store.isReadComputationEnabled() || store.isWriteComputationEnabled())) { - SupersetSchemaGenerator supersetSchemaGenerator = getSupersetSchemaGenerator(clusterName); - Schema newSuperSetSchema = supersetSchemaGenerator.generateSupersetSchema(existingValueSchema, newValueSchema); - String newSuperSetSchemaStr = newSuperSetSchema.toString(); - - if (supersetSchemaGenerator.compareSchema(newSuperSetSchema, newValueSchema)) { - doUpdateSupersetSchemaID = true; - - } else if (supersetSchemaGenerator.compareSchema(newSuperSetSchema, existingValueSchema)) { - doUpdateSupersetSchemaID = false; - - } else if (store.isSystemStore()) { - /** - * Do not register superset schema for system store for now. Because some system stores specify the schema ID - * explicitly, which may conflict with the superset schema generated internally, the new value schema registration - * could fail. - * - * TODO: Design a long-term plan. - */ - doUpdateSupersetSchemaID = false; - - } else { - // Register superset schema only if it does not match with existing or new schema. + final int newValueSchemaId = getVeniceHelixAdmin().checkPreConditionForAddValueSchemaAndGetNewSchemaId( + clusterName, + storeName, + newValueSchemaStr, + expectedCompatibilityType); - // validate compatibility of the new superset schema - getVeniceHelixAdmin().checkPreConditionForAddValueSchemaAndGetNewSchemaId( - clusterName, - storeName, - newSuperSetSchemaStr, - expectedCompatibilityType); - // Check if the superset schema already exists or not. If exists use the same ID, else bump the value ID by - // one. - int supersetSchemaId = getVeniceHelixAdmin().getValueSchemaIdIgnoreFieldOrder( - clusterName, - storeName, - newSuperSetSchemaStr, - (s1, s2) -> supersetSchemaGenerator.compareSchema(s1, s2) ? 
0 : 1); - if (supersetSchemaId == SchemaData.INVALID_VALUE_SCHEMA_ID) { - supersetSchemaId = newValueSchemaId + 1; - } - return addValueAndSupersetSchemaEntries( - clusterName, - storeName, - new SchemaEntry(newValueSchemaId, newValueSchema), - new SchemaEntry(supersetSchemaId, newSuperSetSchema), - store.isWriteComputationEnabled()); - } - } else { - doUpdateSupersetSchemaID = false; - } + /** + * If we find this is an exactly duplicate schema, return the existing schema id; + * else add the schema with possible doc field change. + */ + if (newValueSchemaId == SchemaData.DUPLICATE_VALUE_SCHEMA_CODE) { + return new SchemaEntry( + getVeniceHelixAdmin().getValueSchemaId(clusterName, storeName, newValueSchemaStr), + newValueSchemaStr); + } - SchemaEntry addedSchemaEntry = - addValueSchemaEntry(clusterName, storeName, newValueSchemaStr, newValueSchemaId, doUpdateSupersetSchemaID); + final Store store = getVeniceHelixAdmin().getStore(clusterName, storeName); - /** - * if active-active replication is enabled for the store then generate and register the new Replication metadata schema - * for this newly added value schema. - */ - if (store.isActiveActiveReplicationEnabled()) { - Schema latestValueSchema = getVeniceHelixAdmin().getSupersetOrLatestValueSchema(clusterName, store); - final int valueSchemaId = getValueSchemaId(clusterName, storeName, latestValueSchema.toString()); - updateReplicationMetadataSchema(clusterName, storeName, latestValueSchema, valueSchemaId); - } - if (store.isWriteComputationEnabled()) { - Schema newWriteComputeSchema = - writeComputeSchemaConverter.convertFromValueRecordSchema(addedSchemaEntry.getSchema()); - addDerivedSchema(clusterName, storeName, addedSchemaEntry.getId(), newWriteComputeSchema.toString()); - } + /** + * Do not register superset schema for system store for now. Because some system stores specify the schema ID + * explicitly, which may conflict with the superset schema generated internally, the new value schema registration + * could fail. + * + * TODO: Design a long-term plan. + */ + final boolean doUpdateSupersetSchemaID = + !store.isSystemStore() && (store.isReadComputationEnabled() || store.isWriteComputationEnabled()); - return addedSchemaEntry; - } finally { - releaseAdminMessageLock(clusterName, storeName); - } + return addValueSchemaInternal( + clusterName, + storeName, + newValueSchemaStr, + newValueSchemaId, + expectedCompatibilityType, + doUpdateSupersetSchemaID); } private SchemaEntry addValueAndSupersetSchemaEntries( @@ -3070,20 +2828,83 @@ public SchemaEntry addSupersetSchema( int valueSchemaId, String supersetSchemaStr, int supersetSchemaId) { - throw new VeniceUnsupportedOperationException("addValueSchema"); + throw new VeniceUnsupportedOperationException("addSupersetSchema"); } - /** - * Unsupported operation in the parent controller. 
- */ @Override - public SchemaEntry addValueSchema( + public SchemaEntry addValueSchemaInternal( String clusterName, String storeName, - String valueSchemaStr, + String newValueSchemaStr, int schemaId, + DirectionalSchemaCompatibilityType expectedCompatibilityType, boolean doUpdateSupersetSchemaID) { - throw new VeniceUnsupportedOperationException("addValueSchema"); + acquireAdminMessageLock(clusterName, storeName); + try { + Schema newValueSchema = AvroSchemaParseUtils.parseSchemaFromJSONStrictValidation(newValueSchemaStr); + + final Store store = getVeniceHelixAdmin().getStore(clusterName, storeName); + Schema existingValueSchema = getVeniceHelixAdmin().getSupersetOrLatestValueSchema(clusterName, store); + + if (doUpdateSupersetSchemaID) { + SupersetSchemaGenerator supersetSchemaGenerator = getSupersetSchemaGenerator(clusterName); + Schema newSuperSetSchema = supersetSchemaGenerator.generateSupersetSchema(existingValueSchema, newValueSchema); + String newSuperSetSchemaStr = newSuperSetSchema.toString(); + + if (supersetSchemaGenerator.compareSchema(newSuperSetSchema, newValueSchema)) { + doUpdateSupersetSchemaID = true; + } else if (supersetSchemaGenerator.compareSchema(newSuperSetSchema, existingValueSchema)) { + doUpdateSupersetSchemaID = false; + } else { + // Register superset schema only if it does not match with existing or new schema. + + // validate compatibility of the new superset schema + getVeniceHelixAdmin().checkPreConditionForAddValueSchemaAndGetNewSchemaId( + clusterName, + storeName, + newSuperSetSchemaStr, + expectedCompatibilityType); + // Check if the superset schema already exists or not. If exists use the same ID, else bump the value ID by + // one. + int supersetSchemaId = getVeniceHelixAdmin().getValueSchemaIdIgnoreFieldOrder( + clusterName, + storeName, + newSuperSetSchemaStr, + (s1, s2) -> supersetSchemaGenerator.compareSchema(s1, s2) ? 0 : 1); + if (supersetSchemaId == SchemaData.INVALID_VALUE_SCHEMA_ID) { + supersetSchemaId = schemaId + 1; + } + return addValueAndSupersetSchemaEntries( + clusterName, + storeName, + new SchemaEntry(schemaId, newValueSchema), + new SchemaEntry(supersetSchemaId, newSuperSetSchema), + store.isWriteComputationEnabled()); + } + } + + SchemaEntry addedSchemaEntry = + addValueSchemaEntry(clusterName, storeName, newValueSchemaStr, schemaId, doUpdateSupersetSchemaID); + + /** + * if active-active replication is enabled for the store then generate and register the new Replication metadata schema + * for this newly added value schema. 
+ */ + if (store.isActiveActiveReplicationEnabled()) { + Schema latestValueSchema = getVeniceHelixAdmin().getSupersetOrLatestValueSchema(clusterName, store); + final int valueSchemaId = getValueSchemaId(clusterName, storeName, latestValueSchema.toString()); + updateReplicationMetadataSchema(clusterName, storeName, latestValueSchema, valueSchemaId); + } + if (store.isWriteComputationEnabled()) { + Schema newWriteComputeSchema = + writeComputeSchemaConverter.convertFromValueRecordSchema(addedSchemaEntry.getSchema()); + addDerivedSchema(clusterName, storeName, addedSchemaEntry.getId(), newWriteComputeSchema.toString()); + } + + return addedSchemaEntry; + } finally { + releaseAdminMessageLock(clusterName, storeName); + } } /** diff --git a/services/venice-controller/src/main/java/com/linkedin/venice/controller/init/DelegatingClusterLeaderInitializationRoutine.java b/services/venice-controller/src/main/java/com/linkedin/venice/controller/init/DelegatingClusterLeaderInitializationRoutine.java new file mode 100644 index 00000000000..397d936ea7b --- /dev/null +++ b/services/venice-controller/src/main/java/com/linkedin/venice/controller/init/DelegatingClusterLeaderInitializationRoutine.java @@ -0,0 +1,18 @@ +package com.linkedin.venice.controller.init; + +import com.linkedin.venice.utils.concurrent.ConcurrencyUtils; + + +public class DelegatingClusterLeaderInitializationRoutine implements ClusterLeaderInitializationRoutine { + private ClusterLeaderInitializationRoutine delegate = null; + + @Override + public void execute(String clusterToInit) { + ConcurrencyUtils + .executeUnderConditionalLock(() -> delegate.execute(clusterToInit), () -> this.delegate != null, this); + } + + public void setDelegate(ClusterLeaderInitializationRoutine delegate) { + ConcurrencyUtils.executeUnderConditionalLock(() -> this.delegate = delegate, () -> this.delegate == null, this); + } +} diff --git a/services/venice-controller/src/main/java/com/linkedin/venice/controller/init/InternalRTStoreInitializationRoutine.java b/services/venice-controller/src/main/java/com/linkedin/venice/controller/init/InternalRTStoreInitializationRoutine.java deleted file mode 100644 index b612069ee56..00000000000 --- a/services/venice-controller/src/main/java/com/linkedin/venice/controller/init/InternalRTStoreInitializationRoutine.java +++ /dev/null @@ -1,84 +0,0 @@ -package com.linkedin.venice.controller.init; - -import com.linkedin.venice.VeniceConstants; -import com.linkedin.venice.controller.VeniceControllerMultiClusterConfig; -import com.linkedin.venice.controller.VeniceHelixAdmin; -import com.linkedin.venice.controllerapi.UpdateStoreQueryParams; -import com.linkedin.venice.exceptions.VeniceException; -import com.linkedin.venice.meta.Store; -import com.linkedin.venice.meta.Version; -import java.util.concurrent.TimeUnit; -import java.util.function.Function; -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.Logger; - - -public class InternalRTStoreInitializationRoutine implements ClusterLeaderInitializationRoutine { - private static final Logger LOGGER = LogManager.getLogger(InternalRTStoreInitializationRoutine.class); - - private final Function storeNameSupplier; - private final VeniceControllerMultiClusterConfig multiClusterConfigs; - private final VeniceHelixAdmin admin; - private final String keySchema; - private final String valueSchema; - - public InternalRTStoreInitializationRoutine( - Function storeNameSupplier, - VeniceControllerMultiClusterConfig multiClusterConfigs, - VeniceHelixAdmin admin, - String 
keySchema, - String valueSchema) { - this.storeNameSupplier = storeNameSupplier; - this.multiClusterConfigs = multiClusterConfigs; - this.admin = admin; - this.keySchema = keySchema; - this.valueSchema = valueSchema; - } - - /** - * @see ClusterLeaderInitializationRoutine#execute(String) - */ - @Override - public void execute(String clusterName) { - String storeName = storeNameSupplier.apply(clusterName); - Store store = admin.getStore(clusterName, storeName); - if (store == null) { - admin.createStore(clusterName, storeName, VeniceConstants.SYSTEM_STORE_OWNER, keySchema, valueSchema, true); - store = admin.getStore(clusterName, storeName); - if (store == null) { - throw new VeniceException("Unable to create or fetch store " + storeName); - } - } else { - LOGGER.info("Internal store {} already exists in cluster {}", storeName, clusterName); - } - - if (!store.isHybrid()) { - UpdateStoreQueryParams updateStoreQueryParams = new UpdateStoreQueryParams().setHybridOffsetLagThreshold(100L) - .setHybridRewindSeconds(TimeUnit.DAYS.toSeconds(7)); - admin.updateStore(clusterName, storeName, updateStoreQueryParams); - store = admin.getStore(clusterName, storeName); - if (!store.isHybrid()) { - throw new VeniceException("Unable to update store " + storeName + " to a hybrid store"); - } - LOGGER.info("Enabled hybrid for internal store " + storeName + " in cluster " + clusterName); - } - - if (store.getCurrentVersion() <= 0) { - int partitionCount = multiClusterConfigs.getControllerConfig(clusterName).getMinNumberOfPartitions(); - int replicationFactor = admin.getReplicationFactor(clusterName, storeName); - Version version = admin.incrementVersionIdempotent( - clusterName, - storeName, - Version.guidBasedDummyPushId(), - partitionCount, - replicationFactor); - // SOP is already sent by incrementVersionIdempotent. No need to write again. 
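Review note: the two deleted bootstrap paths disagreed on the final writeEndOfPush argument; judging by the removed comment just above, it controls whether a StartOfPush is written again. Worth confirming that the replacement routine keeps the 'false' behavior, since incrementVersionIdempotent already sends the SOP:

    // Removed parent-controller path (createOrVerifyInternalStore):
    writeEndOfPush(clusterName, storeName, version.getNumber(), true);
    // Removed init-routine path (InternalRTStoreInitializationRoutine):
    admin.writeEndOfPush(clusterName, storeName, version.getNumber(), false);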
- admin.writeEndOfPush(clusterName, storeName, version.getNumber(), false); - store = admin.getStore(clusterName, storeName); - if (store.getVersions().isEmpty()) { - throw new VeniceException("Unable to initialize a version for store " + storeName); - } - LOGGER.info("Created a version for internal store {} in cluster {}", storeName, clusterName); - } - } -} diff --git a/services/venice-controller/src/main/java/com/linkedin/venice/controller/init/PerClusterInternalRTStoreInitializationRoutine.java b/services/venice-controller/src/main/java/com/linkedin/venice/controller/init/PerClusterInternalRTStoreInitializationRoutine.java new file mode 100644 index 00000000000..cc89f33665f --- /dev/null +++ b/services/venice-controller/src/main/java/com/linkedin/venice/controller/init/PerClusterInternalRTStoreInitializationRoutine.java @@ -0,0 +1,171 @@ +package com.linkedin.venice.controller.init; + +import com.linkedin.venice.VeniceConstants; +import com.linkedin.venice.controller.VeniceControllerMultiClusterConfig; +import com.linkedin.venice.controller.VeniceHelixAdmin; +import com.linkedin.venice.controllerapi.UpdateStoreQueryParams; +import com.linkedin.venice.exceptions.VeniceException; +import com.linkedin.venice.meta.Store; +import com.linkedin.venice.meta.Version; +import com.linkedin.venice.schema.SchemaEntry; +import com.linkedin.venice.schema.avro.DirectionalSchemaCompatibilityType; +import com.linkedin.venice.serialization.avro.AvroProtocolDefinition; +import com.linkedin.venice.utils.Utils; +import java.util.Collection; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.TimeUnit; +import java.util.function.Function; +import org.apache.avro.Schema; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + + +public class PerClusterInternalRTStoreInitializationRoutine implements ClusterLeaderInitializationRoutine { + private static final Logger LOGGER = LogManager.getLogger(PerClusterInternalRTStoreInitializationRoutine.class); + + private final Function clusterToStoreNameSupplier; + private final VeniceControllerMultiClusterConfig multiClusterConfigs; + private final VeniceHelixAdmin admin; + private final Schema keySchema; + private final AvroProtocolDefinition protocolDefinition; + + public PerClusterInternalRTStoreInitializationRoutine( + AvroProtocolDefinition protocolDefinition, + Function clusterToStoreNameSupplier, + VeniceControllerMultiClusterConfig multiClusterConfigs, + VeniceHelixAdmin admin, + Schema keySchema) { + this.protocolDefinition = protocolDefinition; + this.clusterToStoreNameSupplier = clusterToStoreNameSupplier; + this.multiClusterConfigs = multiClusterConfigs; + this.admin = admin; + this.keySchema = keySchema; + } + + /** + * @see ClusterLeaderInitializationRoutine#execute(String) + */ + @Override + public void execute(String clusterName) { + String storeName = clusterToStoreNameSupplier.apply(clusterName); + Map protocolSchemaMap = Utils.getAllSchemasFromResources(protocolDefinition); + Store store = admin.getStore(clusterName, storeName); + if (store == null) { + String firstValueSchema = protocolSchemaMap.get(1).toString(); + admin.createStore( + clusterName, + storeName, + VeniceConstants.SYSTEM_STORE_OWNER, + keySchema.toString(), + firstValueSchema, + true); + store = admin.getStore(clusterName, storeName); + if (store == null) { + throw new VeniceException("Unable to create or fetch store " + storeName); + } + } else { + LOGGER.info("Internal store {} already exists in cluster {}", storeName, 
+      /**
+       * The key schema is always specified for this store, so verify that the registered
+       * key schema is consistent with the local definition.
+       */
+      SchemaEntry keySchemaEntry = admin.getKeySchema(clusterName, storeName);
+      if (!keySchemaEntry.getSchema().equals(keySchema)) {
+        LOGGER.error(
+            "Key Schema of '{}' in cluster: {} is already registered but it is "
+                + "INCONSISTENT with the local definition.\n" + "Already registered: {}\n" + "Local definition: {}",
+            storeName,
+            clusterName,
+            keySchemaEntry.getSchema().toString(true),
+            keySchema.toString(true));
+      }
+    }
+
+    /**
+     * Whether the store is old or new, there may be new system schemas the cluster doesn't know about yet.
+     * Make sure all currently known schemas are registered, excluding any experimental schemas
+     * (above the current version).
+     */
+    Collection<SchemaEntry> schemaEntries = admin.getValueSchemas(clusterName, storeName);
+    Map<Integer, Schema> knownSchemaMap = new HashMap<>();
+    schemaEntries.forEach(schemaEntry -> knownSchemaMap.put(schemaEntry.getId(), schemaEntry.getSchema()));
+
+    for (int valueSchemaVersion = 1; valueSchemaVersion <= protocolDefinition
+        .getCurrentProtocolVersion(); valueSchemaVersion++) {
+      Schema schemaInLocalResources = protocolSchemaMap.get(valueSchemaVersion);
+      if (schemaInLocalResources == null) {
+        throw new VeniceException(
+            "Invalid protocol definition: '" + protocolDefinition.name() + "' does not have a version "
+                + valueSchemaVersion + " even though that is lower than the current version ("
+                + protocolDefinition.getCurrentProtocolVersion() + ").");
+      }
+
+      Schema knownSchema = knownSchemaMap.get(valueSchemaVersion);
+
+      if (knownSchema == null) {
+        try {
+          admin.addValueSchemaInternal(
+              clusterName,
+              storeName,
+              schemaInLocalResources.toString(),
+              valueSchemaVersion,
+              DirectionalSchemaCompatibilityType.NONE,
+              false);
+        } catch (Exception e) {
+          LOGGER.error(
Will bubble up.", + protocolDefinition.name(), + valueSchemaVersion, + e); + throw e; + } + LOGGER.info("Added new schema v{} to system store '{}'.", valueSchemaVersion, storeName); + } else { + if (knownSchema.equals(schemaInLocalResources)) { + LOGGER.info( + "Schema v{} in system store '{}' is already registered and consistent with the local definition.", + valueSchemaVersion, + storeName); + } else { + LOGGER.warn( + "Schema v{} in system store '{}' is already registered but it is INCONSISTENT with the local definition.\n" + + "Already registered: {}\n" + "Local definition: {}", + valueSchemaVersion, + storeName, + knownSchema.toString(true), + schemaInLocalResources.toString(true)); + } + } + } + + if (!store.isHybrid()) { + UpdateStoreQueryParams updateStoreQueryParams = new UpdateStoreQueryParams().setHybridOffsetLagThreshold(100L) + .setHybridRewindSeconds(TimeUnit.DAYS.toSeconds(7)); + admin.updateStore(clusterName, storeName, updateStoreQueryParams); + store = admin.getStore(clusterName, storeName); + if (!store.isHybrid()) { + throw new VeniceException("Unable to update store " + storeName + " to a hybrid store"); + } + LOGGER.info("Enabled hybrid for internal store " + storeName + " in cluster " + clusterName); + } + + if (store.getCurrentVersion() <= 0) { + int partitionCount = multiClusterConfigs.getControllerConfig(clusterName).getMinNumberOfPartitions(); + int replicationFactor = admin.getReplicationFactor(clusterName, storeName); + Version version = admin.incrementVersionIdempotent( + clusterName, + storeName, + Version.guidBasedDummyPushId(), + partitionCount, + replicationFactor); + // SOP is already sent by incrementVersionIdempotent. No need to write again. + admin.writeEndOfPush(clusterName, storeName, version.getNumber(), false); + store = admin.getStore(clusterName, storeName); + if (store.getVersions().isEmpty()) { + throw new VeniceException("Unable to initialize a version for store " + storeName); + } + LOGGER.info("Created a version for internal store {} in cluster {}", storeName, clusterName); + } + } +} diff --git a/services/venice-controller/src/main/java/com/linkedin/venice/controller/init/SharedInternalRTStoreInitializationRoutine.java b/services/venice-controller/src/main/java/com/linkedin/venice/controller/init/SharedInternalRTStoreInitializationRoutine.java new file mode 100644 index 00000000000..91c8d730592 --- /dev/null +++ b/services/venice-controller/src/main/java/com/linkedin/venice/controller/init/SharedInternalRTStoreInitializationRoutine.java @@ -0,0 +1,188 @@ +package com.linkedin.venice.controller.init; + +import com.linkedin.venice.VeniceConstants; +import com.linkedin.venice.controller.Admin; +import com.linkedin.venice.controller.VeniceControllerMultiClusterConfig; +import com.linkedin.venice.controllerapi.UpdateStoreQueryParams; +import com.linkedin.venice.exceptions.VeniceException; +import com.linkedin.venice.meta.Store; +import com.linkedin.venice.meta.Version; +import com.linkedin.venice.schema.SchemaEntry; +import com.linkedin.venice.schema.avro.DirectionalSchemaCompatibilityType; +import com.linkedin.venice.serialization.avro.AvroProtocolDefinition; +import com.linkedin.venice.utils.Utils; +import java.util.Collection; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.TimeUnit; +import org.apache.avro.Schema; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + + +public class SharedInternalRTStoreInitializationRoutine implements ClusterLeaderInitializationRoutine { + 
+  private static final Logger LOGGER = LogManager.getLogger(SharedInternalRTStoreInitializationRoutine.class);
+
+  private final String storeCluster;
+  private final AvroProtocolDefinition protocolDefinition;
+  private final VeniceControllerMultiClusterConfig multiClusterConfigs;
+  private final Admin admin;
+  private final Schema keySchema;
+  private final UpdateStoreQueryParams updateStoreQueryParams;
+  private final String storeName;
+
+  public SharedInternalRTStoreInitializationRoutine(
+      String storeCluster,
+      String systemStoreName,
+      AvroProtocolDefinition protocolDefinition,
+      VeniceControllerMultiClusterConfig multiClusterConfigs,
+      Admin admin,
+      Schema keySchema,
+      UpdateStoreQueryParams updateStoreQueryParams) {
+    this.storeCluster = storeCluster;
+    this.storeName = systemStoreName;
+    this.protocolDefinition = protocolDefinition;
+    this.multiClusterConfigs = multiClusterConfigs;
+    this.admin = admin;
+    this.keySchema = keySchema;
+
+    if (updateStoreQueryParams == null) {
+      this.updateStoreQueryParams = new UpdateStoreQueryParams();
+    } else {
+      this.updateStoreQueryParams = updateStoreQueryParams;
+    }
+
+    if (!this.updateStoreQueryParams.getHybridOffsetLagThreshold().isPresent()) {
+      this.updateStoreQueryParams.setHybridOffsetLagThreshold(100L);
+    }
+
+    if (!this.updateStoreQueryParams.getHybridRewindSeconds().isPresent()) {
+      this.updateStoreQueryParams.setHybridRewindSeconds(TimeUnit.DAYS.toSeconds(7));
+    }
+  }
+
+  /**
+   * @see ClusterLeaderInitializationRoutine#execute(String)
+   */
+  @Override
+  public void execute(String clusterName) {
+    if (storeCluster.equals(clusterName)) {
+      Map<Integer, Schema> protocolSchemaMap = Utils.getAllSchemasFromResources(protocolDefinition);
+      Store store = admin.getStore(clusterName, storeName);
+      if (store == null) {
+        String firstValueSchema = protocolSchemaMap.get(1).toString();
+        admin.createStore(
+            clusterName,
+            storeName,
+            VeniceConstants.SYSTEM_STORE_OWNER,
+            keySchema.toString(),
+            firstValueSchema,
+            true);
+        store = admin.getStore(clusterName, storeName);
+        if (store == null) {
+          throw new VeniceException("Unable to create or fetch store " + storeName);
+        }
+      } else {
+        LOGGER.info("Internal store {} already exists in cluster {}", storeName, clusterName);
+        /**
+         * The key schema is always specified for this store, so verify that the registered
+         * key schema is consistent with the local definition.
+         */
+        SchemaEntry keySchemaEntry = admin.getKeySchema(clusterName, storeName);
+        if (!keySchemaEntry.getSchema().equals(keySchema)) {
+          LOGGER.error(
+              "Key Schema of '{}' in cluster: {} is already registered but it is "
+                  + "INCONSISTENT with the local definition.\n" + "Already registered: {}\n" + "Local definition: {}",
+              storeName,
+              clusterName,
+              keySchemaEntry.getSchema().toString(true),
+              keySchema.toString(true));
+        }
+      }
+
+      /**
+       * Whether the store is old or new, there may be new system schemas the cluster doesn't know about yet.
+       * Make sure all currently known schemas are registered, excluding any experimental schemas
+       * (above the current version).
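+       * Schemas that are already registered are checked against the local definitions, and mismatches are logged.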
+       */
+      Collection<SchemaEntry> schemaEntries = admin.getValueSchemas(clusterName, storeName);
+      Map<Integer, Schema> knownSchemaMap = new HashMap<>();
+      schemaEntries.forEach(schemaEntry -> knownSchemaMap.put(schemaEntry.getId(), schemaEntry.getSchema()));
+
+      for (int valueSchemaVersion = 1; valueSchemaVersion <= protocolDefinition
+          .getCurrentProtocolVersion(); valueSchemaVersion++) {
+        Schema schemaInLocalResources = protocolSchemaMap.get(valueSchemaVersion);
+        if (schemaInLocalResources == null) {
+          throw new VeniceException(
+              "Invalid protocol definition: '" + protocolDefinition.name() + "' does not have a version "
+                  + valueSchemaVersion + " even though that is lower than the current version ("
+                  + protocolDefinition.getCurrentProtocolVersion() + ").");
+        }
+
+        Schema knownSchema = knownSchemaMap.get(valueSchemaVersion);
+
+        if (knownSchema == null) {
+          try {
+            admin.addValueSchemaInternal(
+                clusterName,
+                storeName,
+                schemaInLocalResources.toString(),
+                valueSchemaVersion,
+                DirectionalSchemaCompatibilityType.NONE,
+                false);
+          } catch (Exception e) {
+            LOGGER.error(
+                "Caught Exception when attempting to register '{}' schema version '{}'. Will bubble up.",
+                protocolDefinition.name(),
+                valueSchemaVersion,
+                e);
+            throw e;
+          }
+          LOGGER.info("Added new schema v{} to system store '{}'.", valueSchemaVersion, storeName);
+        } else {
+          if (knownSchema.equals(schemaInLocalResources)) {
+            LOGGER.info(
+                "Schema v{} in system store '{}' is already registered and consistent with the local definition.",
+                valueSchemaVersion,
+                storeName);
+          } else {
+            LOGGER.warn(
+                "Schema v{} in system store '{}' is already registered but it is INCONSISTENT with the local definition.\n"
+                    + "Already registered: {}\n" + "Local definition: {}",
+                valueSchemaVersion,
+                storeName,
+                knownSchema.toString(true),
+                schemaInLocalResources.toString(true));
+          }
+        }
+      }
+
+      if (!store.isHybrid()) {
+        admin.updateStore(clusterName, storeName, updateStoreQueryParams);
+        store = admin.getStore(clusterName, storeName);
+        if (!store.isHybrid()) {
+          throw new VeniceException("Unable to update store " + storeName + " to a hybrid store");
+        }
+        LOGGER.info("Enabled hybrid for internal store {} in cluster {}", storeName, clusterName);
+      }
+
+      if (store.getCurrentVersion() <= 0) {
+        int partitionCount = multiClusterConfigs.getControllerConfig(clusterName).getMinNumberOfPartitions();
+        int replicationFactor = admin.getReplicationFactor(clusterName, storeName);
+        Version version = admin.incrementVersionIdempotent(
+            clusterName,
+            storeName,
+            Version.guidBasedDummyPushId(),
+            partitionCount,
+            replicationFactor);
+        // SOP is already sent by incrementVersionIdempotent. No need to write again.
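+        // Writing the end-of-push marker completes this empty push so the new version can become current.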
+        admin.writeEndOfPush(clusterName, storeName, version.getNumber(), false);
+        store = admin.getStore(clusterName, storeName);
+        if (store.getVersions().isEmpty()) {
+          throw new VeniceException("Unable to initialize a version for store " + storeName);
+        }
+        LOGGER.info("Created a version for internal store {} in cluster {}", storeName, clusterName);
+      }
+    }
+  }
+}
diff --git a/services/venice-controller/src/main/java/com/linkedin/venice/controller/init/SystemSchemaInitializationRoutine.java b/services/venice-controller/src/main/java/com/linkedin/venice/controller/init/SystemSchemaInitializationRoutine.java
index ed055e6398a..11eb691e560 100644
--- a/services/venice-controller/src/main/java/com/linkedin/venice/controller/init/SystemSchemaInitializationRoutine.java
+++ b/services/venice-controller/src/main/java/com/linkedin/venice/controller/init/SystemSchemaInitializationRoutine.java
@@ -163,7 +163,7 @@ public void execute(String clusterToInit) {
 
       if (knownSchema == null) {
         try {
-          admin.addValueSchema(
+          admin.addValueSchemaInternal(
               clusterToInit,
               systemStoreName,
               schemaInLocalResources.toString(),
diff --git a/services/venice-controller/src/main/java/com/linkedin/venice/controller/kafka/consumer/AdminExecutionTask.java b/services/venice-controller/src/main/java/com/linkedin/venice/controller/kafka/consumer/AdminExecutionTask.java
index 413c281bb4d..ba63b57b6a2 100644
--- a/services/venice-controller/src/main/java/com/linkedin/venice/controller/kafka/consumer/AdminExecutionTask.java
+++ b/services/venice-controller/src/main/java/com/linkedin/venice/controller/kafka/consumer/AdminExecutionTask.java
@@ -303,7 +303,7 @@ private void handleValueSchemaCreation(ValueSchemaCreation message) {
     final boolean doUpdateSupersetSchemaID = message.doUpdateSupersetSchemaID;
 
     SchemaEntry valueSchemaEntry =
-        admin.addValueSchema(clusterName, storeName, schemaStr, schemaId, doUpdateSupersetSchemaID);
+        admin.addValueSchemaInternal(clusterName, storeName, schemaStr, schemaId, doUpdateSupersetSchemaID);
     LOGGER.info(
         "Added value schema {} to store {} in cluster {} with schema ID {} and "
             + "[update_superset_schema_ID_with_value_schema_ID == {}]",
diff --git a/services/venice-controller/src/main/java/com/linkedin/venice/controller/server/CreateVersion.java b/services/venice-controller/src/main/java/com/linkedin/venice/controller/server/CreateVersion.java
index 9946ca5fb7f..5ba9e41a000 100644
--- a/services/venice-controller/src/main/java/com/linkedin/venice/controller/server/CreateVersion.java
+++ b/services/venice-controller/src/main/java/com/linkedin/venice/controller/server/CreateVersion.java
@@ -666,6 +666,7 @@ public Route emptyPush(Admin admin) {
         responseObject.setPartitions(partitionNum);
         responseObject.setReplicas(replicationFactor);
         responseObject.setKafkaTopic(version.kafkaTopicName());
+        responseObject.setKafkaBootstrapServers(version.getPushStreamSourceAddress());
 
         admin.writeEndOfPush(clusterName, storeName, versionNumber, true);
 
diff --git a/services/venice-controller/src/main/java/com/linkedin/venice/controller/server/SchemaRoutes.java b/services/venice-controller/src/main/java/com/linkedin/venice/controller/server/SchemaRoutes.java
index 45f1aafae62..189361339eb 100644
--- a/services/venice-controller/src/main/java/com/linkedin/venice/controller/server/SchemaRoutes.java
+++ b/services/venice-controller/src/main/java/com/linkedin/venice/controller/server/SchemaRoutes.java
@@ -98,7 +98,7 @@ public Route addValueSchema(Admin admin) {
         SchemaEntry valueSchemaEntry;
         if (schemaIdString != null) {
           // Schema id is specified which suggests that the request is coming from metadata copy.
-          valueSchemaEntry = admin.addValueSchema(
+          valueSchemaEntry = admin.addValueSchemaInternal(
               responseObject.getCluster(),
               responseObject.getName(),
               request.queryParams(VALUE_SCHEMA),
diff --git a/services/venice-controller/src/test/java/com/linkedin/venice/controller/TestVeniceParentHelixAdmin.java b/services/venice-controller/src/test/java/com/linkedin/venice/controller/TestVeniceParentHelixAdmin.java
index edd2808bf15..7bd33123549 100644
--- a/services/venice-controller/src/test/java/com/linkedin/venice/controller/TestVeniceParentHelixAdmin.java
+++ b/services/venice-controller/src/test/java/com/linkedin/venice/controller/TestVeniceParentHelixAdmin.java
@@ -242,36 +242,6 @@ public Version incrementVersionIdempotent(
     }
   }
 
-  @Test(timeOut = TIMEOUT_IN_MS)
-  public void testAsyncSetupForSystemStores() {
-    String arbitraryCluster = Utils.getUniqueString("test-cluster");
-    doReturn(true).when(internalAdmin).isLeaderControllerFor(arbitraryCluster);
-    doReturn(Version.composeRealTimeTopic(PUSH_JOB_DETAILS_STORE_NAME)).when(internalAdmin)
-        .getRealTimeTopic(arbitraryCluster, PUSH_JOB_DETAILS_STORE_NAME);
-    VeniceControllerConfig asyncEnabledConfig = mockConfig(arbitraryCluster);
-    doReturn(arbitraryCluster).when(asyncEnabledConfig).getPushJobStatusStoreClusterName();
-    doReturn(true).when(asyncEnabledConfig).isParticipantMessageStoreEnabled();
-    AsyncSetupMockVeniceParentHelixAdmin mockVeniceParentHelixAdmin =
-        new AsyncSetupMockVeniceParentHelixAdmin(internalAdmin, asyncEnabledConfig);
-    mockVeniceParentHelixAdmin.setVeniceWriterForCluster(arbitraryCluster, veniceWriter);
-    mockVeniceParentHelixAdmin.setTimer(new TestMockTime());
-    try {
-      mockVeniceParentHelixAdmin.initStorageCluster(arbitraryCluster);
-      TestUtils.waitForNonDeterministicCompletion(5, TimeUnit.SECONDS, () -> {
-        Store s = mockVeniceParentHelixAdmin.getStore(arbitraryCluster, PUSH_JOB_DETAILS_STORE_NAME);
-        return s != null && !s.getVersions().isEmpty();
-      });
-      Store verifyStore = mockVeniceParentHelixAdmin.getStore(arbitraryCluster, PUSH_JOB_DETAILS_STORE_NAME);
-      Assert.assertEquals(verifyStore.getName(), PUSH_JOB_DETAILS_STORE_NAME, "Unexpected store name");
-      Assert.assertTrue(verifyStore.isHybrid(), "Store should be configured to be hybrid");
-      Assert.assertEquals(verifyStore.getVersions().size(), 1, "Store should have one version");
-    } finally {
-      mockVeniceParentHelixAdmin.stop(arbitraryCluster);
-    }
-    Assert
-        .assertFalse(mockVeniceParentHelixAdmin.isAsyncSetupRunning(arbitraryCluster), "Async setup should be stopped");
-  }
-
   @Test
   public void testAddStore() {
     doReturn(CompletableFuture.completedFuture(new SimplePubSubProduceResultImpl(topicName, partitionId, 1, -1)))