diff --git a/services/venice-controller/src/main/java/com/linkedin/venice/controller/StoreBackupVersionCleanupService.java b/services/venice-controller/src/main/java/com/linkedin/venice/controller/StoreBackupVersionCleanupService.java index 78c27bf3e6..1b41f927cc 100644 --- a/services/venice-controller/src/main/java/com/linkedin/venice/controller/StoreBackupVersionCleanupService.java +++ b/services/venice-controller/src/main/java/com/linkedin/venice/controller/StoreBackupVersionCleanupService.java @@ -20,21 +20,22 @@ import io.tehuti.metrics.MetricsRepository; import java.io.IOException; import java.io.InputStream; -import java.util.ArrayList; -import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Optional; import java.util.Set; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.stream.Collectors; import org.apache.commons.io.IOUtils; import org.apache.http.HttpResponse; import org.apache.http.HttpStatus; import org.apache.http.client.config.RequestConfig; import org.apache.http.client.methods.HttpGet; +import org.apache.http.impl.client.DefaultConnectionKeepAliveStrategy; import org.apache.http.impl.nio.client.CloseableHttpAsyncClient; import org.apache.http.impl.nio.client.HttpAsyncClients; +import org.apache.http.protocol.HttpContext; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -66,8 +67,6 @@ public class StoreBackupVersionCleanupService extends AbstractVeniceService { private final Thread cleanupThread; private final long sleepInterval; private final long defaultBackupVersionRetentionMs; - - private final Map urlMap = new HashMap<>(); private final AtomicBoolean stop = new AtomicBoolean(false); private final Map clusterNameCleanupStatsMap = @@ -76,7 +75,7 @@ public class StoreBackupVersionCleanupService extends AbstractVeniceService { private final MetricsRepository metricsRepository; private final CloseableHttpAsyncClient httpAsyncClient; - + private final long keepAliveDurationMs = TimeUnit.HOURS.toMillis(1); private final Time time; public StoreBackupVersionCleanupService( @@ -107,6 +106,12 @@ protected StoreBackupVersionCleanupService( this.httpAsyncClient = HttpAsyncClients.custom() .setDefaultRequestConfig(RequestConfig.custom().setSocketTimeout(10000).build()) .setSSLContext(sslFactory.map(SSLFactory::getSSLContext).orElse(null)) + .setKeepAliveStrategy(new DefaultConnectionKeepAliveStrategy() { + @Override + public long getKeepAliveDuration(HttpResponse response, HttpContext context) { + return keepAliveDurationMs; + } + }) .build(); } @@ -135,6 +140,9 @@ CloseableHttpAsyncClient getHttpAsyncClient() { } protected static boolean whetherStoreReadyToBeCleanup(Store store, long defaultBackupVersionRetentionMs, Time time) { + if (store.getCurrentVersion() == NON_EXISTING_VERSION || store.getVersions().size() < 2) { + return false; + } long backupVersionRetentionMs = store.getBackupVersionRetentionMs(); if (backupVersionRetentionMs < 0) { backupVersionRetentionMs = defaultBackupVersionRetentionMs; @@ -230,13 +238,8 @@ protected boolean cleanupBackupVersion(Store store, String clusterName) { } List versions = store.getVersions(); - List readyToBeRemovedVersions = new ArrayList<>(); int currentVersion = store.getCurrentVersion(); - if (currentVersion == NON_EXISTING_VERSION) { - return false; - } - // Do not delete version unless all routers and all servers are on same current version if (multiClusterConfig.getControllerConfig(clusterName).isBackupVersionMetadataFetchBasedCleanupEnabled() && (!validateAllRouterOnCurrentVersion(store, clusterName, currentVersion) @@ -246,12 +249,9 @@ protected boolean cleanupBackupVersion(Store store, String clusterName) { stats.recordBackupVersionMismatch(); return false; } + List readyToBeRemovedVersions = + versions.stream().filter(v -> v.getNumber() < currentVersion).collect(Collectors.toList()); - versions.forEach(v -> { - if (v.getNumber() < currentVersion) { - readyToBeRemovedVersions.add(v); - } - }); if (readyToBeRemovedVersions.isEmpty()) { return false; } diff --git a/services/venice-controller/src/test/java/com/linkedin/venice/controller/TestStoreBackupVersionCleanupService.java b/services/venice-controller/src/test/java/com/linkedin/venice/controller/TestStoreBackupVersionCleanupService.java index ee1fdd89d2..50510f044f 100644 --- a/services/venice-controller/src/test/java/com/linkedin/venice/controller/TestStoreBackupVersionCleanupService.java +++ b/services/venice-controller/src/test/java/com/linkedin/venice/controller/TestStoreBackupVersionCleanupService.java @@ -87,9 +87,11 @@ private Store mockStore( @Test public void testWhetherStoreReadyToBeCleanup() { + Map versions = new HashMap<>(); + versions.put(1, VersionStatus.ONLINE); + versions.put(2, VersionStatus.ONLINE); long defaultBackupVersionRetentionMs = TimeUnit.DAYS.toMillis(7); - Store storeNotReadyForCleanupWithDefaultRetentionPolicy = - mockStore(-1, System.currentTimeMillis(), Collections.emptyMap(), -1); + Store storeNotReadyForCleanupWithDefaultRetentionPolicy = mockStore(-1, System.currentTimeMillis(), versions, -1); Assert.assertFalse( StoreBackupVersionCleanupService.whetherStoreReadyToBeCleanup( storeNotReadyForCleanupWithDefaultRetentionPolicy, @@ -97,7 +99,7 @@ public void testWhetherStoreReadyToBeCleanup() { new SystemTime())); Store storeReadyForCleanupWithDefaultRetentionPolicy = - mockStore(-1, System.currentTimeMillis() - 2 * defaultBackupVersionRetentionMs, Collections.emptyMap(), -1); + mockStore(-1, System.currentTimeMillis() - 2 * defaultBackupVersionRetentionMs, versions, -1); Assert.assertTrue( StoreBackupVersionCleanupService.whetherStoreReadyToBeCleanup( storeReadyForCleanupWithDefaultRetentionPolicy, @@ -106,18 +108,15 @@ public void testWhetherStoreReadyToBeCleanup() { long storeBackupRetentionMs = TimeUnit.DAYS.toMillis(3); Store storeNotReadyForCleanupWithSpecifiedRetentionPolicy = - mockStore(storeBackupRetentionMs, System.currentTimeMillis(), Collections.emptyMap(), -1); + mockStore(storeBackupRetentionMs, System.currentTimeMillis(), versions, -1); Assert.assertFalse( StoreBackupVersionCleanupService.whetherStoreReadyToBeCleanup( storeNotReadyForCleanupWithSpecifiedRetentionPolicy, defaultBackupVersionRetentionMs, new SystemTime())); - Store storeReadyForCleanupWithSpecifiedRetentionPolicy = mockStore( - storeBackupRetentionMs, - System.currentTimeMillis() - 2 * storeBackupRetentionMs, - Collections.emptyMap(), - -1); + Store storeReadyForCleanupWithSpecifiedRetentionPolicy = + mockStore(storeBackupRetentionMs, System.currentTimeMillis() - 2 * storeBackupRetentionMs, versions, -1); Assert.assertTrue( StoreBackupVersionCleanupService.whetherStoreReadyToBeCleanup( storeReadyForCleanupWithSpecifiedRetentionPolicy, @@ -126,7 +125,7 @@ public void testWhetherStoreReadyToBeCleanup() { long storeBackupRetentionMsZero = 0; Store storeNotReadyForCleanupWithZeroRetentionPolicy1 = - mockStore(storeBackupRetentionMsZero, System.currentTimeMillis(), Collections.emptyMap(), -1); + mockStore(storeBackupRetentionMsZero, System.currentTimeMillis(), versions, -1); Assert.assertFalse( StoreBackupVersionCleanupService.whetherStoreReadyToBeCleanup( storeNotReadyForCleanupWithZeroRetentionPolicy1, @@ -134,18 +133,15 @@ public void testWhetherStoreReadyToBeCleanup() { new SystemTime())); Store storeNotReadyForCleanupWithZeroRetentionPolicy2 = - mockStore(storeBackupRetentionMsZero, System.currentTimeMillis() - 10, Collections.emptyMap(), -1); + mockStore(storeBackupRetentionMsZero, System.currentTimeMillis() - 10, versions, -1); Assert.assertFalse( StoreBackupVersionCleanupService.whetherStoreReadyToBeCleanup( storeNotReadyForCleanupWithZeroRetentionPolicy2, defaultBackupVersionRetentionMs, new SystemTime())); - Store storeReadyForCleanupWithZeroRetentionPolicy = mockStore( - storeBackupRetentionMsZero, - System.currentTimeMillis() - 2 * storeBackupRetentionMs, - Collections.emptyMap(), - -1); + Store storeReadyForCleanupWithZeroRetentionPolicy = + mockStore(storeBackupRetentionMsZero, System.currentTimeMillis() - 2 * storeBackupRetentionMs, versions, -1); Assert.assertTrue( StoreBackupVersionCleanupService.whetherStoreReadyToBeCleanup( storeReadyForCleanupWithZeroRetentionPolicy,