diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/DaVinciBackend.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/DaVinciBackend.java index 577eb0f8d5..beb84ca31c 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/DaVinciBackend.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/DaVinciBackend.java @@ -447,7 +447,8 @@ private synchronized void bootstrap() { storageMetadataService, ingestionService, storageService, - blobTransferManager) + blobTransferManager, + this::getVeniceCurrentVersionNumber) : new DefaultIngestionBackend( storageMetadataService, ingestionService, @@ -657,6 +658,11 @@ Version getVeniceCurrentVersion(String storeName) { } } + int getVeniceCurrentVersionNumber(String storeName) { + Version currentVersion = getVeniceCurrentVersion(storeName); + return currentVersion == null ? -1 : currentVersion.getNumber(); + } + private Version getVeniceLatestNonFaultyVersion(Store store, Set faultyVersions) { Version latestNonFaultyVersion = null; for (Version version: store.getVersions()) { @@ -821,7 +827,7 @@ static ExecutionStatus getDaVinciErrorStatus(Exception e, boolean useDaVinciSpec } public boolean hasCurrentVersionBootstrapping() { - return ingestionService.hasCurrentVersionBootstrapping(); + return ingestionBackend.hasCurrentVersionBootstrapping(); } static class BootstrappingAwareCompletableFuture { diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/ingestion/DefaultIngestionBackend.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/ingestion/DefaultIngestionBackend.java index 7f57536048..efe31697bd 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/ingestion/DefaultIngestionBackend.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/ingestion/DefaultIngestionBackend.java @@ -188,6 +188,11 @@ public void setStorageEngineReference( } } + @Override + public boolean hasCurrentVersionBootstrapping() { + return getStoreIngestionService().hasCurrentVersionBootstrapping(); + } + @Override public KafkaStoreIngestionService getStoreIngestionService() { return storeIngestionService; diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/ingestion/IngestionBackend.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/ingestion/IngestionBackend.java index f8d6127bfb..97094e0752 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/ingestion/IngestionBackend.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/ingestion/IngestionBackend.java @@ -54,4 +54,9 @@ void dropStoragePartitionGracefully( // setStorageEngineReference is used by Da Vinci exclusively to speed up storage engine retrieval for read path. void setStorageEngineReference(String topicName, AtomicReference storageEngineReference); + + /** + * Check whether there are any current version bootstrapping or not. + */ + boolean hasCurrentVersionBootstrapping(); } diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/ingestion/IsolatedIngestionBackend.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/ingestion/IsolatedIngestionBackend.java index f9716f6a52..500374d06b 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/ingestion/IsolatedIngestionBackend.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/ingestion/IsolatedIngestionBackend.java @@ -11,6 +11,7 @@ import com.linkedin.davinci.ingestion.main.MainIngestionRequestClient; import com.linkedin.davinci.ingestion.main.MainIngestionStorageMetadataService; import com.linkedin.davinci.ingestion.main.MainPartitionIngestionStatus; +import com.linkedin.davinci.ingestion.main.MainTopicIngestionStatus; import com.linkedin.davinci.kafka.consumer.KafkaStoreIngestionService; import com.linkedin.davinci.notifier.RelayNotifier; import com.linkedin.davinci.notifier.VeniceNotifier; @@ -19,12 +20,15 @@ import com.linkedin.venice.exceptions.VeniceException; import com.linkedin.venice.ingestion.protocol.enums.IngestionCommandType; import com.linkedin.venice.ingestion.protocol.enums.IngestionComponentType; +import com.linkedin.venice.meta.Version; import com.linkedin.venice.utils.Time; import com.linkedin.venice.utils.Utils; import io.tehuti.metrics.MetricsRepository; +import java.util.Map; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.function.Function; import java.util.function.Supplier; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -50,6 +54,7 @@ public class IsolatedIngestionBackend extends DefaultIngestionBackend implements private final MainIngestionMonitorService mainIngestionMonitorService; private final VeniceConfigLoader configLoader; private final ExecutorService completionReportHandlingExecutor = Executors.newFixedThreadPool(10); + private final Function currentVersionSupplier; private Process isolatedIngestionServiceProcess; public IsolatedIngestionBackend( @@ -58,7 +63,8 @@ public IsolatedIngestionBackend( StorageMetadataService storageMetadataService, KafkaStoreIngestionService storeIngestionService, StorageService storageService, - BlobTransferManager blobTransferManager) { + BlobTransferManager blobTransferManager, + Function currentVersionSupplier) { super( storageMetadataService, storeIngestionService, @@ -68,6 +74,7 @@ public IsolatedIngestionBackend( int servicePort = configLoader.getVeniceServerConfig().getIngestionServicePort(); int listenerPort = configLoader.getVeniceServerConfig().getIngestionApplicationPort(); this.configLoader = configLoader; + this.currentVersionSupplier = currentVersionSupplier; // Create the ingestion request client. mainIngestionRequestClient = new MainIngestionRequestClient(configLoader); // Create the forked isolated ingestion process. @@ -192,6 +199,10 @@ public MainIngestionMonitorService getMainIngestionMonitorService() { return mainIngestionMonitorService; } + Function getCurrentVersionSupplier() { + return currentVersionSupplier; + } + public MainIngestionRequestClient getMainIngestionRequestClient() { return mainIngestionRequestClient; } @@ -218,6 +229,31 @@ public void close() { } } + public boolean hasCurrentVersionBootstrapping() { + if (super.hasCurrentVersionBootstrapping()) { + return true; + } + + Map topicIngestionStatusMap = + getMainIngestionMonitorService().getTopicIngestionStatusMap(); + for (Map.Entry entry: topicIngestionStatusMap.entrySet()) { + String topicName = entry.getKey(); + MainTopicIngestionStatus ingestionStatus = entry.getValue(); + String storeName = Version.parseStoreFromKafkaTopicName(topicName); + int version = Version.parseVersionFromKafkaTopicName(topicName); + /** + * If the current version is still being ingested by isolated process, it means the bootstrapping hasn't finished + * yet as the ingestion task should be handled over to main process if all partitions complete ingestion. + */ + if (getCurrentVersionSupplier().apply(storeName) == version + && ingestionStatus.hasPartitionIngestingInIsolatedProcess()) { + return true; + } + } + + return false; + } + boolean isTopicPartitionHostedInMainProcess(String topicName, int partition) { return getMainIngestionMonitorService().getTopicPartitionIngestionStatus(topicName, partition) .equals(MainPartitionIngestionStatus.MAIN); diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/ingestion/main/MainTopicIngestionStatus.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/ingestion/main/MainTopicIngestionStatus.java index 91214aa7d5..270dfc6f53 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/ingestion/main/MainTopicIngestionStatus.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/ingestion/main/MainTopicIngestionStatus.java @@ -43,4 +43,13 @@ public long getIngestingPartitionCount() { public String getTopicName() { return topicName; } + + public boolean hasPartitionIngestingInIsolatedProcess() { + for (Map.Entry entry: ingestionStatusMap.entrySet()) { + if (entry.getValue().equals(MainPartitionIngestionStatus.ISOLATED)) { + return true; + } + } + return false; + } } diff --git a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/ingestion/DefaultIngestionBackendTest.java b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/ingestion/DefaultIngestionBackendTest.java index a5dcf48fd9..bc60ed7d77 100644 --- a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/ingestion/DefaultIngestionBackendTest.java +++ b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/ingestion/DefaultIngestionBackendTest.java @@ -4,10 +4,14 @@ import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.Mockito.any; import static org.mockito.Mockito.doNothing; +import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.eq; +import static org.mockito.Mockito.mock; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; +import static org.testng.Assert.assertFalse; +import static org.testng.Assert.assertTrue; import com.linkedin.davinci.blobtransfer.BlobTransferManager; import com.linkedin.davinci.config.VeniceServerConfig; @@ -26,7 +30,6 @@ import java.util.concurrent.CompletableFuture; import org.mockito.Mock; import org.mockito.MockitoAnnotations; -import org.testng.Assert; import org.testng.annotations.BeforeMethod; import org.testng.annotations.Test; @@ -108,6 +111,18 @@ public void testStartConsumptionWithBlobTransferWhenNoPeerFound() { CompletableFuture future = ingestionBackend.bootstrapFromBlobs(store, VERSION_NUMBER, PARTITION).toCompletableFuture(); - Assert.assertTrue(future.isDone()); + assertTrue(future.isDone()); + } + + @Test + public void testHasCurrentVersionBootstrapping() { + KafkaStoreIngestionService mockIngestionService = mock(KafkaStoreIngestionService.class); + DefaultIngestionBackend ingestionBackend = + new DefaultIngestionBackend(null, mockIngestionService, null, null, null); + doReturn(true).when(mockIngestionService).hasCurrentVersionBootstrapping(); + assertTrue(ingestionBackend.hasCurrentVersionBootstrapping()); + + doReturn(false).when(mockIngestionService).hasCurrentVersionBootstrapping(); + assertFalse(ingestionBackend.hasCurrentVersionBootstrapping()); } } diff --git a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/ingestion/IsolatedIngestionBackendTest.java b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/ingestion/IsolatedIngestionBackendTest.java index c9b88f9b0e..5f52d4e578 100644 --- a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/ingestion/IsolatedIngestionBackendTest.java +++ b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/ingestion/IsolatedIngestionBackendTest.java @@ -10,11 +10,14 @@ import static org.mockito.ArgumentMatchers.anyInt; import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.Mockito.doCallRealMethod; +import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; +import static org.testng.Assert.assertFalse; +import static org.testng.Assert.assertTrue; import com.linkedin.davinci.config.VeniceConfigLoader; import com.linkedin.davinci.config.VeniceStoreVersionConfig; @@ -31,11 +34,13 @@ import com.linkedin.venice.utils.Pair; import com.linkedin.venice.utils.TestUtils; import com.linkedin.venice.utils.concurrent.VeniceConcurrentHashMap; +import java.util.HashMap; import java.util.Map; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Function; import java.util.function.Supplier; import org.testng.Assert; import org.testng.annotations.Test; @@ -269,4 +274,49 @@ public void testBackendCanMaintainMetadataCorrectlyForDroppingPartition() { Assert.assertEquals(topicIngestionStatusMap.get(topic).getPartitionIngestionStatus(partition), NOT_EXIST); } } + + @Test + public void testHasCurrentVersionBootstrapping() { + IsolatedIngestionBackend mockBackend = mock(IsolatedIngestionBackend.class); + MainIngestionMonitorService mockMonitorService = mock(MainIngestionMonitorService.class); + + Map ingestionStatusMap = new HashMap<>(); + MainTopicIngestionStatus store1V1IngestionStatus = new MainTopicIngestionStatus("store1_v1"); + MainTopicIngestionStatus store1V2IngestionStatus = new MainTopicIngestionStatus("store1_v2"); + MainTopicIngestionStatus store2V2IngestionStatus = new MainTopicIngestionStatus("store2_v2"); + ingestionStatusMap.put("store1_v1", store1V1IngestionStatus); + ingestionStatusMap.put("store1_v2", store1V2IngestionStatus); + ingestionStatusMap.put("store2_v2", store2V2IngestionStatus); + + doReturn(ingestionStatusMap).when(mockMonitorService).getTopicIngestionStatusMap(); + store1V1IngestionStatus.setPartitionIngestionStatusToIsolatedIngestion(1); + store1V1IngestionStatus.setPartitionIngestionStatusToLocalIngestion(2); + store1V2IngestionStatus.setPartitionIngestionStatusToLocalIngestion(1); + store1V2IngestionStatus.setPartitionIngestionStatusToLocalIngestion(2); + store2V2IngestionStatus.setPartitionIngestionStatusToLocalIngestion(1); + + Function currentVersionSupplier = s -> { + if (s.equals("store1")) { + return 1; + } + if (s.equals("store2")) { + return 2; + } + return 3; + }; + doReturn(currentVersionSupplier).when(mockBackend).getCurrentVersionSupplier(); + doReturn(mockMonitorService).when(mockBackend).getMainIngestionMonitorService(); + + KafkaStoreIngestionService mockIngestionService = mock(KafkaStoreIngestionService.class); + doReturn(false).when(mockIngestionService).hasCurrentVersionBootstrapping(); + doReturn(mockIngestionService).when(mockBackend).getStoreIngestionService(); + + doCallRealMethod().when(mockBackend).hasCurrentVersionBootstrapping(); + + assertTrue(mockBackend.hasCurrentVersionBootstrapping()); + + // Move current version ingestion to main process + store1V1IngestionStatus.setPartitionIngestionStatusToLocalIngestion(1); + assertFalse(mockBackend.hasCurrentVersionBootstrapping()); + } } diff --git a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/ingestion/main/MainTopicIngestionStatusTest.java b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/ingestion/main/MainTopicIngestionStatusTest.java new file mode 100644 index 0000000000..23b08673e4 --- /dev/null +++ b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/ingestion/main/MainTopicIngestionStatusTest.java @@ -0,0 +1,24 @@ +package com.linkedin.davinci.ingestion.main; + +import static org.testng.Assert.assertFalse; +import static org.testng.Assert.assertTrue; + +import org.testng.annotations.Test; + + +public class MainTopicIngestionStatusTest { + @Test + public void testHasPartitionIngestingInIsolatedProcess() { + String topicName = "test_store_v1"; + MainTopicIngestionStatus ingestionStatus = new MainTopicIngestionStatus(topicName); + ingestionStatus.setPartitionIngestionStatusToLocalIngestion(1); + ingestionStatus.setPartitionIngestionStatusToLocalIngestion(2); + assertFalse(ingestionStatus.hasPartitionIngestingInIsolatedProcess()); + + ingestionStatus.setPartitionIngestionStatusToIsolatedIngestion(3); + assertTrue(ingestionStatus.hasPartitionIngestingInIsolatedProcess()); + + ingestionStatus.removePartitionIngestionStatus(3); + assertFalse(ingestionStatus.hasPartitionIngestingInIsolatedProcess()); + } +} diff --git a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/DaVinciClientTest.java b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/DaVinciClientTest.java index 45a393de71..6e30c0f70f 100644 --- a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/DaVinciClientTest.java +++ b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/DaVinciClientTest.java @@ -11,8 +11,10 @@ import static com.linkedin.venice.ConfigKeys.DAVINCI_PUSH_STATUS_CHECK_INTERVAL_IN_MS; import static com.linkedin.venice.ConfigKeys.DAVINCI_PUSH_STATUS_SCAN_INTERVAL_IN_SECONDS; import static com.linkedin.venice.ConfigKeys.DA_VINCI_CURRENT_VERSION_BOOTSTRAPPING_SPEEDUP_ENABLED; +import static com.linkedin.venice.ConfigKeys.KAFKA_FETCH_QUOTA_RECORDS_PER_SECOND; import static com.linkedin.venice.ConfigKeys.PERSISTENCE_TYPE; import static com.linkedin.venice.ConfigKeys.PUSH_STATUS_STORE_ENABLED; +import static com.linkedin.venice.ConfigKeys.PUSH_STATUS_STORE_HEARTBEAT_INTERVAL_IN_SECONDS; import static com.linkedin.venice.ConfigKeys.SERVER_CONSUMER_POOL_SIZE_PER_KAFKA_CLUSTER; import static com.linkedin.venice.ConfigKeys.SERVER_DATABASE_CHECKSUM_VERIFICATION_ENABLED; import static com.linkedin.venice.ConfigKeys.SERVER_DATABASE_SYNC_BYTES_INTERNAL_FOR_DEFERRED_WRITE_MODE; @@ -70,6 +72,7 @@ import com.linkedin.venice.integration.utils.VeniceClusterWrapper; import com.linkedin.venice.integration.utils.VeniceRouterWrapper; import com.linkedin.venice.meta.IngestionMetadataUpdateType; +import com.linkedin.venice.meta.IngestionMode; import com.linkedin.venice.meta.Version; import com.linkedin.venice.offsets.OffsetRecord; import com.linkedin.venice.partitioner.ConstantVenicePartitioner; @@ -148,6 +151,8 @@ public void setUp() { inputDirPath = "file://" + inputDir.getAbsolutePath(); Properties clusterConfig = new Properties(); clusterConfig.put(SERVER_PROMOTION_TO_LEADER_REPLICA_DELAY_SECONDS, 1L); + clusterConfig.put(PUSH_STATUS_STORE_ENABLED, true); + clusterConfig.put(DAVINCI_PUSH_STATUS_SCAN_INTERVAL_IN_SECONDS, 3); cluster = ServiceFactory.getVeniceCluster(1, 2, 1, 2, 100, false, false, clusterConfig); d2Client = new D2ClientBuilder().setZkHosts(cluster.getZk().getAddress()) .setZkSessionTimeout(3, TimeUnit.SECONDS) @@ -788,6 +793,58 @@ public void testHybridStore() throws Exception { } } + @Test(timeOut = TEST_TIMEOUT, dataProvider = "Isolated-Ingestion", dataProviderClass = DataProviderUtils.class) + public void testStatusReportDuringBoostrap(IngestionMode ingestionMode) throws Exception { + int keyCnt = 1000; + String storeName = createStoreWithMetaSystemStoreAndPushStatusSystemStore(keyCnt); + String baseDataPath = Utils.getTempDataDirectory().getAbsolutePath(); + Map extraBackendProp = new HashMap<>(); + extraBackendProp.put(DATA_BASE_PATH, baseDataPath); + extraBackendProp.put(PUSH_STATUS_STORE_HEARTBEAT_INTERVAL_IN_SECONDS, "5"); + extraBackendProp.put(KAFKA_FETCH_QUOTA_RECORDS_PER_SECOND, "5"); + extraBackendProp.put(PUSH_STATUS_STORE_ENABLED, "true"); + DaVinciTestContext daVinciTestContext = + ServiceFactory.getGenericAvroDaVinciFactoryAndClientWithRetries( + d2Client, + new MetricsRepository(), + Optional.empty(), + cluster.getZk().getAddress(), + storeName, + new DaVinciConfig().setIsolated(ingestionMode.equals(IngestionMode.ISOLATED)), + extraBackendProp); + try (DaVinciClient client = daVinciTestContext.getDaVinciClient()) { + CompletableFuture subscribeFuture = client.subscribeAll(); + + /** + * Create a new version while bootstrapping. + */ + VersionCreationResponse newVersion = cluster.getNewVersion(storeName); + String topic = newVersion.getKafkaTopic(); + VeniceWriterFactory vwFactory = IntegrationTestPushUtils + .getVeniceWriterFactory(cluster.getPubSubBrokerWrapper(), pubSubProducerAdapterFactory); + VeniceKafkaSerializer keySerializer = new VeniceAvroKafkaSerializer(DEFAULT_KEY_SCHEMA); + VeniceKafkaSerializer valueSerializer = new VeniceAvroKafkaSerializer(DEFAULT_VALUE_SCHEMA); + int valueSchemaId = HelixReadOnlySchemaRepository.VALUE_SCHEMA_STARTING_ID; + + try (VeniceWriter batchProducer = vwFactory.createVeniceWriter( + new VeniceWriterOptions.Builder(topic).setKeySerializer(keySerializer) + .setValueSerializer(valueSerializer) + .build())) { + batchProducer.broadcastStartOfPush(Collections.emptyMap()); + int keyCntForSecondVersion = 100; + Future[] writerFutures = new Future[keyCntForSecondVersion]; + for (int i = 0; i < keyCntForSecondVersion; i++) { + writerFutures[i] = batchProducer.put(i, i, valueSchemaId); + } + for (int i = 0; i < keyCntForSecondVersion; i++) { + writerFutures[i].get(); + } + batchProducer.broadcastEndOfPush(Collections.emptyMap()); + } + subscribeFuture.get(); + } + } + @Test(timeOut = TEST_TIMEOUT, dataProvider = "dv-client-config-provider", dataProviderClass = DataProviderUtils.class) public void testBootstrap(DaVinciConfig daVinciConfig) throws Exception { String storeName = createStoreWithMetaSystemStoreAndPushStatusSystemStore(KEY_COUNT);