Skip to content

Commit

Permalink
[Controller] Implement dual read for Push Status System Store (linked…
Browse files Browse the repository at this point in the history
…in#909)

With changes in this PR, controller will try to compose a version level
key and use it to read from push status system store when checking batch
push status on DaVinci side:
{
  keyStrings: {10} // Version 10
  messageType: 1 // 1: Full Push
}

If controller gets null response for the new key, it will assume that
DaVinci is not upgraded yet, and fall back to partition level keys:
{
  keyStrings: {10, 0} // Version 10, partition 0
  messageType: 1 // 1: Full Push
}

Roll out steps in order:
1. After the controller changes are rolled out to production and stabilize,
   deprecate all previous backend releases.
2. Release DaVinci changes to start producing a version level batch updates;
   stabilize the client release and deprecate all previous client releases.
3. Clean up tech debts in controller - remove all partition level keys related
   logic.

Other changes:
Previously the entire instance map would be logged whenever a
push status query returns, the log is removed in this PR, because the
map is too big and would flood the log with thousands of instance names.
  • Loading branch information
huangminchn authored Apr 9, 2024
1 parent 3269186 commit 99ce218
Show file tree
Hide file tree
Showing 6 changed files with 313 additions and 37 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -221,6 +221,7 @@ protected void recordLags(
}

protected void record() {

recordLags(
leaderHeartbeatTimeStamps,
((storeName, version, region, heartbeatTs) -> versionStatsReporter
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import com.linkedin.venice.pushstatus.PushStatusKey;
import java.util.Arrays;
import java.util.Collections;
import java.util.Optional;


Expand All @@ -25,6 +26,10 @@ public static PushStatusKey getHeartbeatKey(String instanceName) {
return pushStatusKey;
}

public static PushStatusKey getPushKey(int version) {
return getFullPushKey(version);
}

public static PushStatusKey getPushKey(int version, int partitionId, Optional<String> incrementalPushVersion) {
return getPushKey(version, partitionId, incrementalPushVersion, Optional.empty());
}
Expand All @@ -47,14 +52,21 @@ public static PushStatusKey getPushKey(
return getFullPushKey(version, partitionId);
}

public static PushStatusKey getFullPushKey(int version, int partitionId) {
private static PushStatusKey getFullPushKey(int version) {
PushStatusKey pushStatusKey = new PushStatusKey();
pushStatusKey.keyStrings = Collections.singletonList(version);
pushStatusKey.messageType = PushStatusKeyType.FULL_PUSH.ordinal();
return pushStatusKey;
}

private static PushStatusKey getFullPushKey(int version, int partitionId) {
PushStatusKey pushStatusKey = new PushStatusKey();
pushStatusKey.keyStrings = Arrays.asList(version, partitionId);
pushStatusKey.messageType = PushStatusKeyType.FULL_PUSH.ordinal();
return pushStatusKey;
}

public static PushStatusKey getIncrementalPushKey(int version, int partitionId, String incrementalPushVersion) {
private static PushStatusKey getIncrementalPushKey(int version, int partitionId, String incrementalPushVersion) {
PushStatusKey pushStatusKey = new PushStatusKey();
pushStatusKey.keyStrings = Arrays.asList(version, partitionId, incrementalPushVersion);
pushStatusKey.messageType = PushStatusKeyType.INCREMENTAL_PUSH.ordinal();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.apache.logging.log4j.LogManager;
Expand Down Expand Up @@ -53,6 +54,24 @@ public PushStatusStoreReader(
this.heartbeatExpirationTimeInSeconds = heartbeatExpirationTimeInSeconds;
}

public Map<CharSequence, Integer> getVersionStatus(String storeName, int version) {
AvroSpecificStoreClient<PushStatusKey, PushStatusValue> client = getVeniceClient(storeName);
PushStatusKey pushStatusKey = PushStatusStoreUtils.getPushKey(version);
try {
PushStatusValue pushStatusValue = client.get(pushStatusKey).get(60, TimeUnit.SECONDS);
if (pushStatusValue == null) {
// Don't return empty map yet, because caller cannot differentiate between DaVinci not migrated to new mode and
// DaVinci writing empty map.
return null;
} else {
return pushStatusValue.instances;
}
} catch (Exception e) {
LOGGER.error("Failed to read push status of store:{} version:{}", storeName, version, e);
throw new VeniceException(e);
}
}

public Map<CharSequence, Integer> getPartitionStatus(
String storeName,
int version,
Expand All @@ -71,7 +90,7 @@ public Map<CharSequence, Integer> getPartitionStatus(
PushStatusKey pushStatusKey =
PushStatusStoreUtils.getPushKey(version, partitionId, incrementalPushVersion, incrementalPushPrefix);
try {
PushStatusValue pushStatusValue = client.get(pushStatusKey).get();
PushStatusValue pushStatusValue = client.get(pushStatusKey).get(60, TimeUnit.SECONDS);
if (pushStatusValue == null) {
return Collections.emptyMap();
} else {
Expand Down Expand Up @@ -157,14 +176,14 @@ public Map<Integer, Map<CharSequence, Integer>> getPartitionStatuses(
completableFutures.add(completableFuture);
}
for (CompletableFuture<Map<PushStatusKey, PushStatusValue>> completableFuture: completableFutures) {
Map<PushStatusKey, PushStatusValue> statuses = completableFuture.get();
Map<PushStatusKey, PushStatusValue> statuses = completableFuture.get(60, TimeUnit.SECONDS);
if (statuses == null) {
LOGGER.warn("Failed to get incremental push status of some partitions. BatchGet returned null.");
throw new VeniceException("Failed to get incremental push status of some partitions");
}
pushStatusMap.putAll(statuses);
}
} catch (InterruptedException | ExecutionException | VeniceClientException e) {
} catch (InterruptedException | ExecutionException | VeniceClientException | TimeoutException e) {
LOGGER.error(
"Failed to get statuses of partitions. store:{}, storeVersion:{} incrementalPushVersion:{} "
+ "partitionIds:{} Exception:{}",
Expand Down Expand Up @@ -217,8 +236,8 @@ public Map<CharSequence, Integer> getSupposedlyOngoingIncrementalPushVersions(St
PushStatusKey pushStatusKey = PushStatusStoreUtils.getOngoingIncrementalPushStatusesKey(storeVersion);
PushStatusValue pushStatusValue;
try {
pushStatusValue = storeClient.get(pushStatusKey).get();
} catch (InterruptedException | ExecutionException | VeniceException e) {
pushStatusValue = storeClient.get(pushStatusKey).get(60, TimeUnit.SECONDS);
} catch (InterruptedException | ExecutionException | VeniceException | TimeoutException e) {
LOGGER.error("Failed to get ongoing incremental pushes for store:{}.", storeName, e);
throw new VeniceException(e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import static com.linkedin.venice.common.PushStatusStoreUtils.SERVER_INCREMENTAL_PUSH_PREFIX;
import static com.linkedin.venice.common.PushStatusStoreUtils.getServerIncrementalPushKey;
import static com.linkedin.venice.pushmonitor.ExecutionStatus.END_OF_INCREMENTAL_PUSH_RECEIVED;
import static org.mockito.ArgumentMatchers.*;
import static org.mockito.Mockito.any;
import static org.mockito.Mockito.anySet;
import static org.mockito.Mockito.doReturn;
Expand Down Expand Up @@ -31,6 +32,9 @@
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.testng.Assert;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

Expand Down Expand Up @@ -75,7 +79,7 @@ private Map<PushStatusKey, PushStatusValue> getPushStatusInstanceData(

@Test(description = "Expect empty results when push status info is not available for any of the partition")
public void testGetPartitionStatusesWhenPushStatusesAreNotAvailable()
throws ExecutionException, InterruptedException {
throws ExecutionException, InterruptedException, TimeoutException {
Map<PushStatusKey, PushStatusValue> pushStatusMap =
getPushStatusInstanceData(storeVersion, incPushVersion, partitionCount, replicationFactor);

Expand All @@ -86,7 +90,7 @@ public void testGetPartitionStatusesWhenPushStatusesAreNotAvailable()
doReturn(storeClientMock).when(storeReaderSpy).getVeniceClient(any());
when(storeClientMock.batchGet(pushStatusMap.keySet())).thenReturn(completableFutureMock);
// simulate store client returns null for given keys
when(completableFutureMock.get()).thenReturn(Collections.emptyMap());
when(completableFutureMock.get(anyLong(), any())).thenReturn(Collections.emptyMap());

Map<Integer, Map<CharSequence, Integer>> result =
storeReaderSpy.getPartitionStatuses(storeName, storeVersion, incPushVersion, partitionCount);
Expand All @@ -98,7 +102,7 @@ public void testGetPartitionStatusesWhenPushStatusesAreNotAvailable()

@Test(expectedExceptions = VeniceException.class, description = "Expect exception when result when push status read fails for some partitions")
public void testGetPartitionStatusesWhenPushStatusReadFailsForSomePartitions()
throws ExecutionException, InterruptedException {
throws ExecutionException, InterruptedException, TimeoutException {
Map<PushStatusKey, PushStatusValue> pushStatusMap =
getPushStatusInstanceData(storeVersion, incPushVersion, partitionCount, replicationFactor);

Expand All @@ -109,15 +113,16 @@ public void testGetPartitionStatusesWhenPushStatusReadFailsForSomePartitions()
doReturn(storeClientMock).when(storeReaderSpy).getVeniceClient(any());
when(storeClientMock.batchGet(pushStatusMap.keySet())).thenReturn(completableFutureMock);
// simulate store client returns null for given keys
when(completableFutureMock.get()).thenReturn(null);
when(completableFutureMock.get(anyLong(), any())).thenReturn(null);

Map<Integer, Map<CharSequence, Integer>> result =
storeReaderSpy.getPartitionStatuses(storeName, storeVersion, incPushVersion, partitionCount);
assertEqualsDeep(result, Collections.emptyMap());
}

@Test(expectedExceptions = VeniceException.class, description = "Expect an exception when push status store client throws an exception")
public void testGetPartitionStatusesWhenStoreClientThrowsException() throws ExecutionException, InterruptedException {
public void testGetPartitionStatusesWhenStoreClientThrowsException()
throws ExecutionException, InterruptedException, TimeoutException {
Map<PushStatusKey, PushStatusValue> pushStatusMap =
getPushStatusInstanceData(storeVersion, incPushVersion, partitionCount, replicationFactor);

Expand All @@ -128,14 +133,15 @@ public void testGetPartitionStatusesWhenStoreClientThrowsException() throws Exec
doReturn(storeClientMock).when(storeReaderSpy).getVeniceClient(any());
when(storeClientMock.batchGet(pushStatusMap.keySet())).thenReturn(completableFutureMock);
// simulate store client returns an exception when fetching status info for given keys
when(completableFutureMock.get()).thenThrow(new ExecutionException(new Throwable("Mock execution exception")));
when(completableFutureMock.get(anyLong(), any()))
.thenThrow(new ExecutionException(new Throwable("Mock execution exception")));

storeReaderSpy.getPartitionStatuses(storeName, storeVersion, incPushVersion, partitionCount);
}

@Test(description = "Expect statuses of all replicas when store returns all replica statuses")
public void testGetPartitionStatusesWhenStoreReturnStatusesOfAllReplicas()
throws ExecutionException, InterruptedException {
throws ExecutionException, InterruptedException, TimeoutException {
Map<PushStatusKey, PushStatusValue> pushStatusMap =
getPushStatusInstanceData(storeVersion, incPushVersion, partitionCount, replicationFactor);

Expand All @@ -145,7 +151,7 @@ public void testGetPartitionStatusesWhenStoreReturnStatusesOfAllReplicas()

doReturn(storeClientMock).when(storeReaderSpy).getVeniceClient(any());
when(storeClientMock.batchGet(pushStatusMap.keySet())).thenReturn(completableFutureMock);
when(completableFutureMock.get()).thenReturn(pushStatusMap);
when(completableFutureMock.get(anyLong(), any())).thenReturn(pushStatusMap);

Map<Integer, Map<CharSequence, Integer>> result =
storeReaderSpy.getPartitionStatuses(storeName, storeVersion, incPushVersion, partitionCount);
Expand All @@ -158,7 +164,8 @@ public void testGetPartitionStatusesWhenStoreReturnStatusesOfAllReplicas()
}

@Test(description = "Expect empty status when statuses for replicas of a partition is missing")
public void testGetPartitionStatusesWhenStatusOfPartitionIsMissing() throws ExecutionException, InterruptedException {
public void testGetPartitionStatusesWhenStatusOfPartitionIsMissing()
throws ExecutionException, InterruptedException, TimeoutException {
Map<PushStatusKey, PushStatusValue> pushStatusMap =
getPushStatusInstanceData(storeVersion, incPushVersion, partitionCount, replicationFactor);
// erase status of partitionId 0
Expand All @@ -173,7 +180,7 @@ public void testGetPartitionStatusesWhenStatusOfPartitionIsMissing() throws Exec

doReturn(storeClientMock).when(storeReaderSpy).getVeniceClient(any());
when(storeClientMock.batchGet(pushStatusMap.keySet())).thenReturn(completableFutureMock);
when(completableFutureMock.get()).thenReturn(pushStatusMap);
when(completableFutureMock.get(anyLong(), any())).thenReturn(pushStatusMap);

Map<Integer, Map<CharSequence, Integer>> result =
storeReaderSpy.getPartitionStatuses(storeName, storeVersion, incPushVersion, partitionCount);
Expand All @@ -189,7 +196,7 @@ public void testGetPartitionStatusesWhenStatusOfPartitionIsMissing() throws Exec

@Test(description = "Expect empty status when instance info for replicas of a partition is missing")
public void testGetPartitionStatusesWhenInstanceInfoOfPartitionIsMissing()
throws ExecutionException, InterruptedException {
throws ExecutionException, InterruptedException, TimeoutException {
Map<PushStatusKey, PushStatusValue> pushStatusMap =
getPushStatusInstanceData(storeVersion, incPushVersion, partitionCount, replicationFactor);
// set empty status for partitionId 0
Expand All @@ -204,7 +211,7 @@ public void testGetPartitionStatusesWhenInstanceInfoOfPartitionIsMissing()

doReturn(storeClientMock).when(storeReaderSpy).getVeniceClient(any());
when(storeClientMock.batchGet(pushStatusMap.keySet())).thenReturn(completableFutureMock);
when(completableFutureMock.get()).thenReturn(pushStatusMap);
when(completableFutureMock.get(anyLong(), any())).thenReturn(pushStatusMap);

Map<Integer, Map<CharSequence, Integer>> result =
storeReaderSpy.getPartitionStatuses(storeName, storeVersion, incPushVersion, partitionCount);
Expand All @@ -221,7 +228,7 @@ public void testGetPartitionStatusesWhenInstanceInfoOfPartitionIsMissing()

@Test(description = "Expect all statuses even when number of partitions are greater than the batchGetLimit")
public void testGetPartitionStatusesWhenNumberOfPartitionsAreGreaterThanBatchGetLimit()
throws ExecutionException, InterruptedException {
throws ExecutionException, InterruptedException, TimeoutException {
int partitionCount = 1055;
int batchGetLimit = 256;

Expand All @@ -245,7 +252,7 @@ public void testGetPartitionStatusesWhenNumberOfPartitionsAreGreaterThanBatchGet
keySets.add(statuses.keySet());
CompletableFuture<Map<PushStatusKey, PushStatusValue>> completableFutureMock = mock(CompletableFuture.class);
when(storeClientMock.batchGet(eq(statuses.keySet()))).thenReturn(completableFutureMock);
when(completableFutureMock.get()).thenReturn(statuses);
when(completableFutureMock.get(anyLong(), any())).thenReturn(statuses);
}

Map<Integer, Map<CharSequence, Integer>> result =
Expand Down Expand Up @@ -280,26 +287,26 @@ public void testGetSupposedlyOngoingIncrementalPushVersionsWithClientException()

@Test(description = "Expect an empty result when key-value for ongoing incremental pushes doesn't exist")
public void testGetSupposedlyOngoingIncrementalPushVersionsWhenIncPushVersionsDoesNotExist()
throws ExecutionException, InterruptedException {
throws ExecutionException, InterruptedException, TimeoutException {
PushStatusKey pushStatusKey = PushStatusStoreUtils.getOngoingIncrementalPushStatusesKey(storeVersion);
PushStatusStoreReader storeReaderSpy =
spy(new PushStatusStoreReader(d2ClientMock, CLUSTER_DISCOVERY_D2_SERVICE_NAME, 10));
CompletableFuture<PushStatusValue> completableFutureMock = mock(CompletableFuture.class);

doReturn(storeClientMock).when(storeReaderSpy).getVeniceClient(any());
when(storeClientMock.get(pushStatusKey)).thenReturn(completableFutureMock);
when(completableFutureMock.get()).thenReturn(null);
when(completableFutureMock.get(anyLong(), any())).thenReturn(null);

assertEqualsDeep(
storeReaderSpy.getSupposedlyOngoingIncrementalPushVersions(storeName, storeVersion),
Collections.emptyMap());
verify(completableFutureMock).get();
verify(completableFutureMock).get(60, TimeUnit.SECONDS);
verify(storeClientMock).get(pushStatusKey);
}

@Test(description = "Expect an empty result when inc push versions are missing in the returned result")
public void testGetSupposedlyOngoingIncrementalPushVersionsWhenIncPushVersionsAreMissing()
throws ExecutionException, InterruptedException {
throws ExecutionException, InterruptedException, TimeoutException {
PushStatusKey pushStatusKey = PushStatusStoreUtils.getOngoingIncrementalPushStatusesKey(storeVersion);
PushStatusValue pushStatusValue = new PushStatusValue();
pushStatusValue.instances = null; // to make intentions clear explicitly setting it to null
Expand All @@ -309,18 +316,18 @@ public void testGetSupposedlyOngoingIncrementalPushVersionsWhenIncPushVersionsAr

doReturn(storeClientMock).when(storeReaderSpy).getVeniceClient(any());
when(storeClientMock.get(pushStatusKey)).thenReturn(completableFutureMock);
when(completableFutureMock.get()).thenReturn(pushStatusValue);
when(completableFutureMock.get(anyLong(), any())).thenReturn(pushStatusValue);

assertEqualsDeep(
storeReaderSpy.getSupposedlyOngoingIncrementalPushVersions(storeName, storeVersion),
Collections.emptyMap());
verify(completableFutureMock).get();
verify(completableFutureMock).get(60, TimeUnit.SECONDS);
verify(storeClientMock).get(pushStatusKey);
}

@Test(description = "Expect all inc push versions when inc push versions are found in push status store")
public void testGetSupposedlyOngoingIncrementalPushVersionsWhenIncPushVersionsAreAvailable()
throws ExecutionException, InterruptedException {
throws ExecutionException, InterruptedException, TimeoutException {
PushStatusKey pushStatusKey = PushStatusStoreUtils.getOngoingIncrementalPushStatusesKey(storeVersion);
PushStatusValue pushStatusValue = new PushStatusValue();
pushStatusValue.instances = new HashMap<>();
Expand All @@ -333,12 +340,29 @@ public void testGetSupposedlyOngoingIncrementalPushVersionsWhenIncPushVersionsAr

doReturn(storeClientMock).when(storeReaderSpy).getVeniceClient(any());
when(storeClientMock.get(pushStatusKey)).thenReturn(completableFutureMock);
when(completableFutureMock.get()).thenReturn(pushStatusValue);
when(completableFutureMock.get(anyLong(), any())).thenReturn(pushStatusValue);

assertEqualsDeep(
storeReaderSpy.getSupposedlyOngoingIncrementalPushVersions(storeName, storeVersion),
pushStatusValue.instances);
verify(completableFutureMock).get();
verify(completableFutureMock).get(60, TimeUnit.SECONDS);
verify(storeClientMock).get(pushStatusKey);
}

@Test
public void testNullResponseWhenVersionLevelKeyIsNotWritten()
throws ExecutionException, InterruptedException, TimeoutException {
PushStatusKey pushStatusKey = PushStatusStoreUtils.getPushKey(storeVersion);
PushStatusStoreReader storeReaderSpy =
spy(new PushStatusStoreReader(d2ClientMock, CLUSTER_DISCOVERY_D2_SERVICE_NAME, 10));

doReturn(storeClientMock).when(storeReaderSpy).getVeniceClient(any());
CompletableFuture<PushStatusValue> completableFutureMock = mock(CompletableFuture.class);
when(storeClientMock.get(pushStatusKey)).thenReturn(completableFutureMock);
// simulate store client returns null for given keys
when(completableFutureMock.get(anyLong(), any())).thenReturn(null);

// Test that push status store reader will also return null instead of empty map in this case
Assert.assertNull(storeReaderSpy.getVersionStatus(storeName, storeVersion));
}
}
Loading

0 comments on commit 99ce218

Please sign in to comment.