Skip to content

Commit

Permalink
Addressed the comment
Browse files Browse the repository at this point in the history
  • Loading branch information
gaojieliu committed Oct 1, 2024
1 parent 269912a commit 57e2d9d
Show file tree
Hide file tree
Showing 9 changed files with 212 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -447,7 +447,8 @@ private synchronized void bootstrap() {
storageMetadataService,
ingestionService,
storageService,
blobTransferManager)
blobTransferManager,
this::getVeniceCurrentVersionNumber)
: new DefaultIngestionBackend(
storageMetadataService,
ingestionService,
Expand Down Expand Up @@ -657,6 +658,11 @@ Version getVeniceCurrentVersion(String storeName) {
}
}

int getVeniceCurrentVersionNumber(String storeName) {
Version currentVersion = getVeniceCurrentVersion(storeName);
return currentVersion == null ? -1 : currentVersion.getNumber();
}

private Version getVeniceLatestNonFaultyVersion(Store store, Set<Integer> faultyVersions) {
Version latestNonFaultyVersion = null;
for (Version version: store.getVersions()) {
Expand Down Expand Up @@ -821,7 +827,7 @@ static ExecutionStatus getDaVinciErrorStatus(Exception e, boolean useDaVinciSpec
}

public boolean hasCurrentVersionBootstrapping() {
return ingestionService.hasCurrentVersionBootstrapping();
return ingestionBackend.hasCurrentVersionBootstrapping();
}

static class BootstrappingAwareCompletableFuture {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,11 @@ public void setStorageEngineReference(
}
}

@Override
public boolean hasCurrentVersionBootstrapping() {
return getStoreIngestionService().hasCurrentVersionBootstrapping();
}

@Override
public KafkaStoreIngestionService getStoreIngestionService() {
return storeIngestionService;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,4 +54,9 @@ void dropStoragePartitionGracefully(

// setStorageEngineReference is used by Da Vinci exclusively to speed up storage engine retrieval for read path.
void setStorageEngineReference(String topicName, AtomicReference<AbstractStorageEngine> storageEngineReference);

/**
* Check whether there are any current version bootstrapping or not.
*/
boolean hasCurrentVersionBootstrapping();
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import com.linkedin.davinci.ingestion.main.MainIngestionRequestClient;
import com.linkedin.davinci.ingestion.main.MainIngestionStorageMetadataService;
import com.linkedin.davinci.ingestion.main.MainPartitionIngestionStatus;
import com.linkedin.davinci.ingestion.main.MainTopicIngestionStatus;
import com.linkedin.davinci.kafka.consumer.KafkaStoreIngestionService;
import com.linkedin.davinci.notifier.RelayNotifier;
import com.linkedin.davinci.notifier.VeniceNotifier;
Expand All @@ -19,12 +20,15 @@
import com.linkedin.venice.exceptions.VeniceException;
import com.linkedin.venice.ingestion.protocol.enums.IngestionCommandType;
import com.linkedin.venice.ingestion.protocol.enums.IngestionComponentType;
import com.linkedin.venice.meta.Version;
import com.linkedin.venice.utils.Time;
import com.linkedin.venice.utils.Utils;
import io.tehuti.metrics.MetricsRepository;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Function;
import java.util.function.Supplier;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
Expand All @@ -50,6 +54,7 @@ public class IsolatedIngestionBackend extends DefaultIngestionBackend implements
private final MainIngestionMonitorService mainIngestionMonitorService;
private final VeniceConfigLoader configLoader;
private final ExecutorService completionReportHandlingExecutor = Executors.newFixedThreadPool(10);
private final Function<String, Integer> currentVersionSupplier;
private Process isolatedIngestionServiceProcess;

public IsolatedIngestionBackend(
Expand All @@ -58,7 +63,8 @@ public IsolatedIngestionBackend(
StorageMetadataService storageMetadataService,
KafkaStoreIngestionService storeIngestionService,
StorageService storageService,
BlobTransferManager blobTransferManager) {
BlobTransferManager blobTransferManager,
Function<String, Integer> currentVersionSupplier) {
super(
storageMetadataService,
storeIngestionService,
Expand All @@ -68,6 +74,7 @@ public IsolatedIngestionBackend(
int servicePort = configLoader.getVeniceServerConfig().getIngestionServicePort();
int listenerPort = configLoader.getVeniceServerConfig().getIngestionApplicationPort();
this.configLoader = configLoader;
this.currentVersionSupplier = currentVersionSupplier;
// Create the ingestion request client.
mainIngestionRequestClient = new MainIngestionRequestClient(configLoader);
// Create the forked isolated ingestion process.
Expand Down Expand Up @@ -192,6 +199,10 @@ public MainIngestionMonitorService getMainIngestionMonitorService() {
return mainIngestionMonitorService;
}

Function<String, Integer> getCurrentVersionSupplier() {
return currentVersionSupplier;
}

public MainIngestionRequestClient getMainIngestionRequestClient() {
return mainIngestionRequestClient;
}
Expand All @@ -218,6 +229,31 @@ public void close() {
}
}

public boolean hasCurrentVersionBootstrapping() {
if (super.hasCurrentVersionBootstrapping()) {
return true;
}

Map<String, MainTopicIngestionStatus> topicIngestionStatusMap =
getMainIngestionMonitorService().getTopicIngestionStatusMap();
for (Map.Entry<String, MainTopicIngestionStatus> entry: topicIngestionStatusMap.entrySet()) {
String topicName = entry.getKey();
MainTopicIngestionStatus ingestionStatus = entry.getValue();
String storeName = Version.parseStoreFromKafkaTopicName(topicName);
int version = Version.parseVersionFromKafkaTopicName(topicName);
/**
* If the current version is still being ingested by isolated process, it means the bootstrapping hasn't finished
* yet as the ingestion task should be handled over to main process if all partitions complete ingestion.
*/
if (getCurrentVersionSupplier().apply(storeName) == version
&& ingestionStatus.hasPartitionIngestingInIsolatedProcess()) {
return true;
}
}

return false;
}

boolean isTopicPartitionHostedInMainProcess(String topicName, int partition) {
return getMainIngestionMonitorService().getTopicPartitionIngestionStatus(topicName, partition)
.equals(MainPartitionIngestionStatus.MAIN);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,4 +43,13 @@ public long getIngestingPartitionCount() {
public String getTopicName() {
return topicName;
}

public boolean hasPartitionIngestingInIsolatedProcess() {
for (Map.Entry<Integer, MainPartitionIngestionStatus> entry: ingestionStatusMap.entrySet()) {
if (entry.getValue().equals(MainPartitionIngestionStatus.ISOLATED)) {
return true;
}
}
return false;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,14 @@
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.any;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertTrue;

import com.linkedin.davinci.blobtransfer.BlobTransferManager;
import com.linkedin.davinci.config.VeniceServerConfig;
Expand All @@ -26,7 +30,6 @@
import java.util.concurrent.CompletableFuture;
import org.mockito.Mock;
import org.mockito.MockitoAnnotations;
import org.testng.Assert;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

Expand Down Expand Up @@ -108,6 +111,18 @@ public void testStartConsumptionWithBlobTransferWhenNoPeerFound() {

CompletableFuture<Void> future =
ingestionBackend.bootstrapFromBlobs(store, VERSION_NUMBER, PARTITION).toCompletableFuture();
Assert.assertTrue(future.isDone());
assertTrue(future.isDone());
}

@Test
public void testHasCurrentVersionBootstrapping() {
KafkaStoreIngestionService mockIngestionService = mock(KafkaStoreIngestionService.class);
DefaultIngestionBackend ingestionBackend =
new DefaultIngestionBackend(null, mockIngestionService, null, null, null);
doReturn(true).when(mockIngestionService).hasCurrentVersionBootstrapping();
assertTrue(ingestionBackend.hasCurrentVersionBootstrapping());

doReturn(false).when(mockIngestionService).hasCurrentVersionBootstrapping();
assertFalse(ingestionBackend.hasCurrentVersionBootstrapping());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,14 @@
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.doCallRealMethod;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertTrue;

import com.linkedin.davinci.config.VeniceConfigLoader;
import com.linkedin.davinci.config.VeniceStoreVersionConfig;
Expand All @@ -31,11 +34,13 @@
import com.linkedin.venice.utils.Pair;
import com.linkedin.venice.utils.TestUtils;
import com.linkedin.venice.utils.concurrent.VeniceConcurrentHashMap;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import java.util.function.Supplier;
import org.testng.Assert;
import org.testng.annotations.Test;
Expand Down Expand Up @@ -269,4 +274,49 @@ public void testBackendCanMaintainMetadataCorrectlyForDroppingPartition() {
Assert.assertEquals(topicIngestionStatusMap.get(topic).getPartitionIngestionStatus(partition), NOT_EXIST);
}
}

@Test
public void testHasCurrentVersionBootstrapping() {
IsolatedIngestionBackend mockBackend = mock(IsolatedIngestionBackend.class);
MainIngestionMonitorService mockMonitorService = mock(MainIngestionMonitorService.class);

Map<String, MainTopicIngestionStatus> ingestionStatusMap = new HashMap<>();
MainTopicIngestionStatus store1V1IngestionStatus = new MainTopicIngestionStatus("store1_v1");
MainTopicIngestionStatus store1V2IngestionStatus = new MainTopicIngestionStatus("store1_v2");
MainTopicIngestionStatus store2V2IngestionStatus = new MainTopicIngestionStatus("store2_v2");
ingestionStatusMap.put("store1_v1", store1V1IngestionStatus);
ingestionStatusMap.put("store1_v2", store1V2IngestionStatus);
ingestionStatusMap.put("store2_v2", store2V2IngestionStatus);

doReturn(ingestionStatusMap).when(mockMonitorService).getTopicIngestionStatusMap();
store1V1IngestionStatus.setPartitionIngestionStatusToIsolatedIngestion(1);
store1V1IngestionStatus.setPartitionIngestionStatusToLocalIngestion(2);
store1V2IngestionStatus.setPartitionIngestionStatusToLocalIngestion(1);
store1V2IngestionStatus.setPartitionIngestionStatusToLocalIngestion(2);
store2V2IngestionStatus.setPartitionIngestionStatusToLocalIngestion(1);

Function<String, Integer> currentVersionSupplier = s -> {
if (s.equals("store1")) {
return 1;
}
if (s.equals("store2")) {
return 2;
}
return 3;
};
doReturn(currentVersionSupplier).when(mockBackend).getCurrentVersionSupplier();
doReturn(mockMonitorService).when(mockBackend).getMainIngestionMonitorService();

KafkaStoreIngestionService mockIngestionService = mock(KafkaStoreIngestionService.class);
doReturn(false).when(mockIngestionService).hasCurrentVersionBootstrapping();
doReturn(mockIngestionService).when(mockBackend).getStoreIngestionService();

doCallRealMethod().when(mockBackend).hasCurrentVersionBootstrapping();

assertTrue(mockBackend.hasCurrentVersionBootstrapping());

// Move current version ingestion to main process
store1V1IngestionStatus.setPartitionIngestionStatusToLocalIngestion(1);
assertFalse(mockBackend.hasCurrentVersionBootstrapping());
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
package com.linkedin.davinci.ingestion.main;

import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertTrue;

import org.testng.annotations.Test;


public class MainTopicIngestionStatusTest {
@Test
public void testHasPartitionIngestingInIsolatedProcess() {
String topicName = "test_store_v1";
MainTopicIngestionStatus ingestionStatus = new MainTopicIngestionStatus(topicName);
ingestionStatus.setPartitionIngestionStatusToLocalIngestion(1);
ingestionStatus.setPartitionIngestionStatusToLocalIngestion(2);
assertFalse(ingestionStatus.hasPartitionIngestingInIsolatedProcess());

ingestionStatus.setPartitionIngestionStatusToIsolatedIngestion(3);
assertTrue(ingestionStatus.hasPartitionIngestingInIsolatedProcess());

ingestionStatus.removePartitionIngestionStatus(3);
assertFalse(ingestionStatus.hasPartitionIngestingInIsolatedProcess());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,10 @@
import static com.linkedin.venice.ConfigKeys.DAVINCI_PUSH_STATUS_CHECK_INTERVAL_IN_MS;
import static com.linkedin.venice.ConfigKeys.DAVINCI_PUSH_STATUS_SCAN_INTERVAL_IN_SECONDS;
import static com.linkedin.venice.ConfigKeys.DA_VINCI_CURRENT_VERSION_BOOTSTRAPPING_SPEEDUP_ENABLED;
import static com.linkedin.venice.ConfigKeys.KAFKA_FETCH_QUOTA_RECORDS_PER_SECOND;
import static com.linkedin.venice.ConfigKeys.PERSISTENCE_TYPE;
import static com.linkedin.venice.ConfigKeys.PUSH_STATUS_STORE_ENABLED;
import static com.linkedin.venice.ConfigKeys.PUSH_STATUS_STORE_HEARTBEAT_INTERVAL_IN_SECONDS;
import static com.linkedin.venice.ConfigKeys.SERVER_CONSUMER_POOL_SIZE_PER_KAFKA_CLUSTER;
import static com.linkedin.venice.ConfigKeys.SERVER_DATABASE_CHECKSUM_VERIFICATION_ENABLED;
import static com.linkedin.venice.ConfigKeys.SERVER_DATABASE_SYNC_BYTES_INTERNAL_FOR_DEFERRED_WRITE_MODE;
Expand Down Expand Up @@ -70,6 +72,7 @@
import com.linkedin.venice.integration.utils.VeniceClusterWrapper;
import com.linkedin.venice.integration.utils.VeniceRouterWrapper;
import com.linkedin.venice.meta.IngestionMetadataUpdateType;
import com.linkedin.venice.meta.IngestionMode;
import com.linkedin.venice.meta.Version;
import com.linkedin.venice.offsets.OffsetRecord;
import com.linkedin.venice.partitioner.ConstantVenicePartitioner;
Expand Down Expand Up @@ -148,6 +151,8 @@ public void setUp() {
inputDirPath = "file://" + inputDir.getAbsolutePath();
Properties clusterConfig = new Properties();
clusterConfig.put(SERVER_PROMOTION_TO_LEADER_REPLICA_DELAY_SECONDS, 1L);
clusterConfig.put(PUSH_STATUS_STORE_ENABLED, true);
clusterConfig.put(DAVINCI_PUSH_STATUS_SCAN_INTERVAL_IN_SECONDS, 3);
cluster = ServiceFactory.getVeniceCluster(1, 2, 1, 2, 100, false, false, clusterConfig);
d2Client = new D2ClientBuilder().setZkHosts(cluster.getZk().getAddress())
.setZkSessionTimeout(3, TimeUnit.SECONDS)
Expand Down Expand Up @@ -788,6 +793,58 @@ public void testHybridStore() throws Exception {
}
}

@Test(timeOut = TEST_TIMEOUT, dataProvider = "Isolated-Ingestion", dataProviderClass = DataProviderUtils.class)
public void testStatusReportDuringBoostrap(IngestionMode ingestionMode) throws Exception {
int keyCnt = 1000;
String storeName = createStoreWithMetaSystemStoreAndPushStatusSystemStore(keyCnt);
String baseDataPath = Utils.getTempDataDirectory().getAbsolutePath();
Map<String, Object> extraBackendProp = new HashMap<>();
extraBackendProp.put(DATA_BASE_PATH, baseDataPath);
extraBackendProp.put(PUSH_STATUS_STORE_HEARTBEAT_INTERVAL_IN_SECONDS, "5");
extraBackendProp.put(KAFKA_FETCH_QUOTA_RECORDS_PER_SECOND, "5");
extraBackendProp.put(PUSH_STATUS_STORE_ENABLED, "true");
DaVinciTestContext<Integer, Object> daVinciTestContext =
ServiceFactory.getGenericAvroDaVinciFactoryAndClientWithRetries(
d2Client,
new MetricsRepository(),
Optional.empty(),
cluster.getZk().getAddress(),
storeName,
new DaVinciConfig().setIsolated(ingestionMode.equals(IngestionMode.ISOLATED)),
extraBackendProp);
try (DaVinciClient<Integer, Object> client = daVinciTestContext.getDaVinciClient()) {
CompletableFuture<Void> subscribeFuture = client.subscribeAll();

/**
* Create a new version while bootstrapping.
*/
VersionCreationResponse newVersion = cluster.getNewVersion(storeName);
String topic = newVersion.getKafkaTopic();
VeniceWriterFactory vwFactory = IntegrationTestPushUtils
.getVeniceWriterFactory(cluster.getPubSubBrokerWrapper(), pubSubProducerAdapterFactory);
VeniceKafkaSerializer keySerializer = new VeniceAvroKafkaSerializer(DEFAULT_KEY_SCHEMA);
VeniceKafkaSerializer valueSerializer = new VeniceAvroKafkaSerializer(DEFAULT_VALUE_SCHEMA);
int valueSchemaId = HelixReadOnlySchemaRepository.VALUE_SCHEMA_STARTING_ID;

try (VeniceWriter<Object, Object, byte[]> batchProducer = vwFactory.createVeniceWriter(
new VeniceWriterOptions.Builder(topic).setKeySerializer(keySerializer)
.setValueSerializer(valueSerializer)
.build())) {
batchProducer.broadcastStartOfPush(Collections.emptyMap());
int keyCntForSecondVersion = 100;
Future[] writerFutures = new Future[keyCntForSecondVersion];
for (int i = 0; i < keyCntForSecondVersion; i++) {
writerFutures[i] = batchProducer.put(i, i, valueSchemaId);
}
for (int i = 0; i < keyCntForSecondVersion; i++) {
writerFutures[i].get();
}
batchProducer.broadcastEndOfPush(Collections.emptyMap());
}
subscribeFuture.get();
}
}

@Test(timeOut = TEST_TIMEOUT, dataProvider = "dv-client-config-provider", dataProviderClass = DataProviderUtils.class)
public void testBootstrap(DaVinciConfig daVinciConfig) throws Exception {
String storeName = createStoreWithMetaSystemStoreAndPushStatusSystemStore(KEY_COUNT);
Expand Down

0 comments on commit 57e2d9d

Please sign in to comment.