Skip to content

Commit

Permalink
[controller] Dont call backup version cleanup service on stores witho…
Browse files Browse the repository at this point in the history
…ut backup version (linkedin#963)

Since fetching the current version from all router and server is an expensive operation, we should not call backup version cleanup on stores without current version or backup version. Also add a keep-alive period of 1hr to the http client.



---------

Co-authored-by: Sourav Maji <[email protected]>
  • Loading branch information
majisourav99 and Sourav Maji authored Apr 25, 2024
1 parent d8c700e commit 1b277a6
Show file tree
Hide file tree
Showing 2 changed files with 27 additions and 31 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,21 +20,22 @@
import io.tehuti.metrics.MetricsRepository;
import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
import org.apache.commons.io.IOUtils;
import org.apache.http.HttpResponse;
import org.apache.http.HttpStatus;
import org.apache.http.client.config.RequestConfig;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.impl.client.DefaultConnectionKeepAliveStrategy;
import org.apache.http.impl.nio.client.CloseableHttpAsyncClient;
import org.apache.http.impl.nio.client.HttpAsyncClients;
import org.apache.http.protocol.HttpContext;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

Expand Down Expand Up @@ -66,8 +67,6 @@ public class StoreBackupVersionCleanupService extends AbstractVeniceService {
private final Thread cleanupThread;
private final long sleepInterval;
private final long defaultBackupVersionRetentionMs;

private final Map<String, String> urlMap = new HashMap<>();
private final AtomicBoolean stop = new AtomicBoolean(false);

private final Map<String, StoreBackupVersionCleanupServiceStats> clusterNameCleanupStatsMap =
Expand All @@ -76,7 +75,7 @@ public class StoreBackupVersionCleanupService extends AbstractVeniceService {
private final MetricsRepository metricsRepository;

private final CloseableHttpAsyncClient httpAsyncClient;

private final long keepAliveDurationMs = TimeUnit.HOURS.toMillis(1);
private final Time time;

public StoreBackupVersionCleanupService(
Expand Down Expand Up @@ -107,6 +106,12 @@ protected StoreBackupVersionCleanupService(
this.httpAsyncClient = HttpAsyncClients.custom()
.setDefaultRequestConfig(RequestConfig.custom().setSocketTimeout(10000).build())
.setSSLContext(sslFactory.map(SSLFactory::getSSLContext).orElse(null))
.setKeepAliveStrategy(new DefaultConnectionKeepAliveStrategy() {
@Override
public long getKeepAliveDuration(HttpResponse response, HttpContext context) {
return keepAliveDurationMs;
}
})
.build();
}

Expand Down Expand Up @@ -135,6 +140,9 @@ CloseableHttpAsyncClient getHttpAsyncClient() {
}

protected static boolean whetherStoreReadyToBeCleanup(Store store, long defaultBackupVersionRetentionMs, Time time) {
if (store.getCurrentVersion() == NON_EXISTING_VERSION || store.getVersions().size() < 2) {
return false;
}
long backupVersionRetentionMs = store.getBackupVersionRetentionMs();
if (backupVersionRetentionMs < 0) {
backupVersionRetentionMs = defaultBackupVersionRetentionMs;
Expand Down Expand Up @@ -230,13 +238,8 @@ protected boolean cleanupBackupVersion(Store store, String clusterName) {
}

List<Version> versions = store.getVersions();
List<Version> readyToBeRemovedVersions = new ArrayList<>();
int currentVersion = store.getCurrentVersion();

if (currentVersion == NON_EXISTING_VERSION) {
return false;
}

// Do not delete version unless all routers and all servers are on same current version
if (multiClusterConfig.getControllerConfig(clusterName).isBackupVersionMetadataFetchBasedCleanupEnabled()
&& (!validateAllRouterOnCurrentVersion(store, clusterName, currentVersion)
Expand All @@ -246,12 +249,9 @@ protected boolean cleanupBackupVersion(Store store, String clusterName) {
stats.recordBackupVersionMismatch();
return false;
}
List<Version> readyToBeRemovedVersions =
versions.stream().filter(v -> v.getNumber() < currentVersion).collect(Collectors.toList());

versions.forEach(v -> {
if (v.getNumber() < currentVersion) {
readyToBeRemovedVersions.add(v);
}
});
if (readyToBeRemovedVersions.isEmpty()) {
return false;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,17 +87,19 @@ private Store mockStore(

@Test
public void testWhetherStoreReadyToBeCleanup() {
Map<Integer, VersionStatus> versions = new HashMap<>();
versions.put(1, VersionStatus.ONLINE);
versions.put(2, VersionStatus.ONLINE);
long defaultBackupVersionRetentionMs = TimeUnit.DAYS.toMillis(7);
Store storeNotReadyForCleanupWithDefaultRetentionPolicy =
mockStore(-1, System.currentTimeMillis(), Collections.emptyMap(), -1);
Store storeNotReadyForCleanupWithDefaultRetentionPolicy = mockStore(-1, System.currentTimeMillis(), versions, -1);
Assert.assertFalse(
StoreBackupVersionCleanupService.whetherStoreReadyToBeCleanup(
storeNotReadyForCleanupWithDefaultRetentionPolicy,
defaultBackupVersionRetentionMs,
new SystemTime()));

Store storeReadyForCleanupWithDefaultRetentionPolicy =
mockStore(-1, System.currentTimeMillis() - 2 * defaultBackupVersionRetentionMs, Collections.emptyMap(), -1);
mockStore(-1, System.currentTimeMillis() - 2 * defaultBackupVersionRetentionMs, versions, -1);
Assert.assertTrue(
StoreBackupVersionCleanupService.whetherStoreReadyToBeCleanup(
storeReadyForCleanupWithDefaultRetentionPolicy,
Expand All @@ -106,18 +108,15 @@ public void testWhetherStoreReadyToBeCleanup() {

long storeBackupRetentionMs = TimeUnit.DAYS.toMillis(3);
Store storeNotReadyForCleanupWithSpecifiedRetentionPolicy =
mockStore(storeBackupRetentionMs, System.currentTimeMillis(), Collections.emptyMap(), -1);
mockStore(storeBackupRetentionMs, System.currentTimeMillis(), versions, -1);
Assert.assertFalse(
StoreBackupVersionCleanupService.whetherStoreReadyToBeCleanup(
storeNotReadyForCleanupWithSpecifiedRetentionPolicy,
defaultBackupVersionRetentionMs,
new SystemTime()));

Store storeReadyForCleanupWithSpecifiedRetentionPolicy = mockStore(
storeBackupRetentionMs,
System.currentTimeMillis() - 2 * storeBackupRetentionMs,
Collections.emptyMap(),
-1);
Store storeReadyForCleanupWithSpecifiedRetentionPolicy =
mockStore(storeBackupRetentionMs, System.currentTimeMillis() - 2 * storeBackupRetentionMs, versions, -1);
Assert.assertTrue(
StoreBackupVersionCleanupService.whetherStoreReadyToBeCleanup(
storeReadyForCleanupWithSpecifiedRetentionPolicy,
Expand All @@ -126,26 +125,23 @@ public void testWhetherStoreReadyToBeCleanup() {

long storeBackupRetentionMsZero = 0;
Store storeNotReadyForCleanupWithZeroRetentionPolicy1 =
mockStore(storeBackupRetentionMsZero, System.currentTimeMillis(), Collections.emptyMap(), -1);
mockStore(storeBackupRetentionMsZero, System.currentTimeMillis(), versions, -1);
Assert.assertFalse(
StoreBackupVersionCleanupService.whetherStoreReadyToBeCleanup(
storeNotReadyForCleanupWithZeroRetentionPolicy1,
defaultBackupVersionRetentionMs,
new SystemTime()));

Store storeNotReadyForCleanupWithZeroRetentionPolicy2 =
mockStore(storeBackupRetentionMsZero, System.currentTimeMillis() - 10, Collections.emptyMap(), -1);
mockStore(storeBackupRetentionMsZero, System.currentTimeMillis() - 10, versions, -1);
Assert.assertFalse(
StoreBackupVersionCleanupService.whetherStoreReadyToBeCleanup(
storeNotReadyForCleanupWithZeroRetentionPolicy2,
defaultBackupVersionRetentionMs,
new SystemTime()));

Store storeReadyForCleanupWithZeroRetentionPolicy = mockStore(
storeBackupRetentionMsZero,
System.currentTimeMillis() - 2 * storeBackupRetentionMs,
Collections.emptyMap(),
-1);
Store storeReadyForCleanupWithZeroRetentionPolicy =
mockStore(storeBackupRetentionMsZero, System.currentTimeMillis() - 2 * storeBackupRetentionMs, versions, -1);
Assert.assertTrue(
StoreBackupVersionCleanupService.whetherStoreReadyToBeCleanup(
storeReadyForCleanupWithZeroRetentionPolicy,
Expand Down

0 comments on commit 1b277a6

Please sign in to comment.