diff --git a/docs/layouts/shortcodes/generated/rocksdb_configurable_configuration.html b/docs/layouts/shortcodes/generated/rocksdb_configurable_configuration.html index bafad2db137f8..b4c1a749fc8bf 100644 --- a/docs/layouts/shortcodes/generated/rocksdb_configurable_configuration.html +++ b/docs/layouts/shortcodes/generated/rocksdb_configurable_configuration.html @@ -38,6 +38,18 @@ Boolean If true, RocksDB will use block-based filter instead of full filter, this only take effect when bloom filter is used. The default value is 'false'. + +
state.backend.rocksdb.compaction.filter.periodic-compaction-time
+ 30 d + Duration + Periodic compaction could speed up expired state entries cleanup, especially for state entries rarely accessed. Files older than this value will be picked up for compaction, and re-written to the same level as they were before. It makes sure a file goes through compaction filters periodically. 0 means turning off periodic compaction. The default value is '30days'. + +
state.backend.rocksdb.compaction.filter.query-time-after-num-entries
+ 1000 + Long + Number of state entries to process by compaction filter before updating current timestamp. Updating the timestamp more often can improve cleanup speed, but it decreases compaction performance because it uses JNI calls from native code. The default value is '1000L'. +
state.backend.rocksdb.compaction.level.max-size-level-base
256 mb diff --git a/flink-core/src/main/java/org/apache/flink/api/common/state/StateTtlConfig.java b/flink-core/src/main/java/org/apache/flink/api/common/state/StateTtlConfig.java index d8cc99a573594..c7009bf7c5ede 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/state/StateTtlConfig.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/state/StateTtlConfig.java @@ -32,7 +32,6 @@ import static org.apache.flink.api.common.state.StateTtlConfig.CleanupStrategies.EMPTY_STRATEGY; import static org.apache.flink.api.common.state.StateTtlConfig.IncrementalCleanupStrategy.DEFAULT_INCREMENTAL_CLEANUP_STRATEGY; -import static org.apache.flink.api.common.state.StateTtlConfig.RocksdbCompactFilterCleanupStrategy.DEFAULT_ROCKSDB_COMPACT_FILTER_CLEANUP_STRATEGY; import static org.apache.flink.api.common.state.StateTtlConfig.StateVisibility.NeverReturnExpired; import static org.apache.flink.api.common.state.StateTtlConfig.TtlTimeCharacteristic.ProcessingTime; import static org.apache.flink.api.common.state.StateTtlConfig.UpdateType.OnCreateAndWrite; @@ -444,15 +443,13 @@ public IncrementalCleanupStrategy getIncrementalCleanupStrategy() { } public boolean inRocksdbCompactFilter() { - return getRocksdbCompactFilterCleanupStrategy() != null; + return isCleanupInBackground || getRocksdbCompactFilterCleanupStrategy() != null; } @Nullable public RocksdbCompactFilterCleanupStrategy getRocksdbCompactFilterCleanupStrategy() { - RocksdbCompactFilterCleanupStrategy defaultStrategy = - isCleanupInBackground ? DEFAULT_ROCKSDB_COMPACT_FILTER_CLEANUP_STRATEGY : null; return (RocksdbCompactFilterCleanupStrategy) - strategies.getOrDefault(Strategies.ROCKSDB_COMPACTION_FILTER, defaultStrategy); + strategies.get(Strategies.ROCKSDB_COMPACTION_FILTER); } } @@ -494,11 +491,18 @@ public static class RocksdbCompactFilterCleanupStrategy private static final long serialVersionUID = 3109278796506988980L; /** - * Default value is 30 days so that every file goes through the compaction process at least - * once every 30 days if not compacted sooner. + * @deprecated Use {@link + * org.apache.flink.contrib.streaming.state.RocksDBConfigurableOptions#COMPACT_FILTER_PERIODIC_COMPACTION_TIME} + * instead. */ - static final Duration DEFAULT_PERIODIC_COMPACTION_TIME = Duration.ofDays(30); + @Deprecated static final Duration DEFAULT_PERIODIC_COMPACTION_TIME = Duration.ofDays(30); + /** + * @deprecated Use {@link + * org.apache.flink.contrib.streaming.state.RocksDBConfigurableOptions#COMPACT_FILTER_QUERY_TIME_AFTER_NUM_ENTRIES} + * instead. 
+ */ + @Deprecated static final RocksdbCompactFilterCleanupStrategy DEFAULT_ROCKSDB_COMPACT_FILTER_CLEANUP_STRATEGY = new RocksdbCompactFilterCleanupStrategy(1000L); diff --git a/flink-core/src/test/java/org/apache/flink/api/common/state/StateTtlConfigTest.java b/flink-core/src/test/java/org/apache/flink/api/common/state/StateTtlConfigTest.java index 3762abc16c848..8a473b74eaa9d 100644 --- a/flink-core/src/test/java/org/apache/flink/api/common/state/StateTtlConfigTest.java +++ b/flink-core/src/test/java/org/apache/flink/api/common/state/StateTtlConfigTest.java @@ -24,7 +24,6 @@ import org.junit.jupiter.api.Test; -import java.time.Duration; import java.util.Arrays; import java.util.List; @@ -67,13 +66,10 @@ void testStateTtlConfigBuildWithCleanupInBackground() { assertThat(cleanupStrategies.isCleanupInBackground()).isTrue(); assertThat(incrementalCleanupStrategy).isNotNull(); - assertThat(rocksdbCleanupStrategy).isNotNull(); + assertThat(rocksdbCleanupStrategy).isNull(); assertThat(cleanupStrategies.inRocksdbCompactFilter()).isTrue(); assertThat(incrementalCleanupStrategy.getCleanupSize()).isEqualTo(5); assertThat(incrementalCleanupStrategy.runCleanupForEveryRecord()).isFalse(); - assertThat(rocksdbCleanupStrategy.getQueryTimeAfterNumEntries()).isEqualTo(1000L); - assertThat(rocksdbCleanupStrategy.getPeriodicCompactionTime()) - .isEqualTo(Duration.ofDays(30)); } @Test diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBConfigurableOptions.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBConfigurableOptions.java index 19c424ed74f34..09849942f6ae6 100644 --- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBConfigurableOptions.java +++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBConfigurableOptions.java @@ -30,6 +30,7 @@ import java.io.File; import java.io.Serializable; +import java.time.Duration; import java.util.Arrays; import java.util.HashSet; import java.util.List; @@ -333,6 +334,25 @@ public class RocksDBConfigurableOptions implements Serializable { .withDescription( "If true, during rescaling, the deleteFilesInRange API will be invoked to clean up the useless files so that local disk space can be reclaimed more promptly."); + public static final ConfigOption COMPACT_FILTER_QUERY_TIME_AFTER_NUM_ENTRIES = + key("state.backend.rocksdb.compaction.filter.query-time-after-num-entries") + .longType() + .defaultValue(1000L) + .withDescription( + "Number of state entries to process by compaction filter before updating current timestamp. " + + "Updating the timestamp more often can improve cleanup speed, " + + "but it decreases compaction performance because it uses JNI calls from native code.The default value is '1000L'."); + + public static final ConfigOption COMPACT_FILTER_PERIODIC_COMPACTION_TIME = + key("state.backend.rocksdb.compaction.filter.periodic-compaction-time") + .durationType() + .defaultValue(Duration.ofDays(30)) + .withDescription( + "Periodic compaction could speed up expired state entries cleanup, especially for state" + + " entries rarely accessed. Files older than this value will be picked up for compaction," + + " and re-written to the same level as they were before. It makes sure a file goes through" + + " compaction filters periodically. 
0 means turning off periodic compaction.The default value is '30days'."); + static final ConfigOption[] CANDIDATE_CONFIGS = new ConfigOption[] { // configurable DBOptions diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackendBuilder.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackendBuilder.java index 9902889b20a86..948c1de27438d 100644 --- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackendBuilder.java +++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackendBuilder.java @@ -338,8 +338,12 @@ public RocksDBKeyedStateBackend build() throws BackendBuildingException { RocksDB db = null; RocksDBRestoreOperation restoreOperation = null; CompletableFuture asyncCompactAfterRestoreFuture = null; + RocksDbTtlCompactFiltersManager ttlCompactFiltersManager = - new RocksDbTtlCompactFiltersManager(ttlTimeProvider); + new RocksDbTtlCompactFiltersManager( + ttlTimeProvider, + optionsContainer.getQueryTimeAfterNumEntries(), + optionsContainer.getPeriodicCompactionTime()); ResourceGuard rocksDBResourceGuard = new ResourceGuard(); RocksDBSnapshotStrategyBase checkpointStrategy = null; diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBResourceContainer.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBResourceContainer.java index c74a41cb7a13f..b37efa8dfba4b 100644 --- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBResourceContainer.java +++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBResourceContainer.java @@ -48,6 +48,7 @@ import java.io.IOException; import java.nio.file.Files; import java.nio.file.Path; +import java.time.Duration; import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; @@ -181,6 +182,18 @@ public Long getWriteBufferManagerCapacity() { return sharedResources.getResourceHandle().getWriteBufferManagerCapacity(); } + /** Gets the "queryTimeAfterNumEntries" parameter from the configuration. */ + public Long getQueryTimeAfterNumEntries() { + return internalGetOption( + RocksDBConfigurableOptions.COMPACT_FILTER_QUERY_TIME_AFTER_NUM_ENTRIES); + } + + /** Gets the "getPeriodicCompactionTime" parameter from the configuration. */ + public Duration getPeriodicCompactionTime() { + return internalGetOption( + RocksDBConfigurableOptions.COMPACT_FILTER_PERIODIC_COMPACTION_TIME); + } + /** Gets the RocksDB {@link ColumnFamilyOptions} to be used for all RocksDB instances. 
*/ public ColumnFamilyOptions getColumnOptions() { // initial options from common profile diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/ttl/RocksDbTtlCompactFiltersManager.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/ttl/RocksDbTtlCompactFiltersManager.java index 168da2f2af171..d442962372710 100644 --- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/ttl/RocksDbTtlCompactFiltersManager.java +++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/ttl/RocksDbTtlCompactFiltersManager.java @@ -46,6 +46,7 @@ import javax.annotation.Nonnull; import java.io.IOException; +import java.time.Duration; import java.util.LinkedHashMap; /** RocksDB compaction filter utils for state with TTL. */ @@ -60,8 +61,26 @@ public class RocksDbTtlCompactFiltersManager { /** Created column family options. */ private final LinkedHashMap columnFamilyOptionsMap; - public RocksDbTtlCompactFiltersManager(TtlTimeProvider ttlTimeProvider) { + /** + * Number of state entries to process by compaction filter before updating current timestamp. + */ + private final long queryTimeAfterNumEntries; + + /** + * Periodic compaction could speed up expired state entries cleanup, especially for state + * entries rarely accessed. Files older than this value will be picked up for compaction, and + * re-written to the same level as they were before. It makes sure a file goes through + * compaction filters periodically. 0 means turning off periodic compaction. + */ + private final Duration periodicCompactionTime; + + public RocksDbTtlCompactFiltersManager( + TtlTimeProvider ttlTimeProvider, + long queryTimeAfterNumEntries, + Duration periodicCompactionTime) { this.ttlTimeProvider = ttlTimeProvider; + this.queryTimeAfterNumEntries = queryTimeAfterNumEntries; + this.periodicCompactionTime = periodicCompactionTime; this.compactionFilterFactories = new LinkedHashMap<>(); this.columnFamilyOptionsMap = new LinkedHashMap<>(); } @@ -118,18 +137,26 @@ public void configCompactFilter( Preconditions.checkNotNull(compactionFilterFactory); long ttl = ttlConfig.getTtl().toMilliseconds(); - StateTtlConfig.RocksdbCompactFilterCleanupStrategy rocksdbCompactFilterCleanupStrategy = - ttlConfig.getCleanupStrategies().getRocksdbCompactFilterCleanupStrategy(); - Preconditions.checkNotNull(rocksdbCompactFilterCleanupStrategy); - ColumnFamilyOptions columnFamilyOptions = columnFamilyOptionsMap.get(stateDesc.getName()); Preconditions.checkNotNull(columnFamilyOptions); - columnFamilyOptions.setPeriodicCompactionSeconds( - rocksdbCompactFilterCleanupStrategy.getPeriodicCompactionTime().getSeconds()); - long queryTimeAfterNumEntries = - rocksdbCompactFilterCleanupStrategy.getQueryTimeAfterNumEntries(); + StateTtlConfig.RocksdbCompactFilterCleanupStrategy rocksdbCompactFilterCleanupStrategy = + ttlConfig.getCleanupStrategies().getRocksdbCompactFilterCleanupStrategy(); + + Duration periodicCompactionTime = this.periodicCompactionTime; + long queryTimeAfterNumEntries = this.queryTimeAfterNumEntries; + + if (rocksdbCompactFilterCleanupStrategy != null) { + periodicCompactionTime = + rocksdbCompactFilterCleanupStrategy.getPeriodicCompactionTime(); + queryTimeAfterNumEntries = + rocksdbCompactFilterCleanupStrategy.getQueryTimeAfterNumEntries(); + } + if (periodicCompactionTime != null) { + 
columnFamilyOptions.setPeriodicCompactionSeconds( + periodicCompactionTime.getSeconds()); + } FlinkCompactionFilter.Config config; if (stateDesc instanceof ListStateDescriptor) {
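The change to StateTtlConfig and StateTtlConfigTest implies an observable behavior shift: cleanupInBackground() alone no longer materializes a default RocksdbCompactFilterCleanupStrategy, yet the compaction filter is still reported as enabled. The sketch below restates that outside the patch, assuming AssertJ and JUnit 5 as already used in StateTtlConfigTest; the class and method names are hypothetical.

    import static org.assertj.core.api.Assertions.assertThat;

    import org.apache.flink.api.common.state.StateTtlConfig;
    import org.apache.flink.api.common.time.Time;
    import org.junit.jupiter.api.Test;

    class CleanupInBackgroundBehaviorSketch {

        @Test
        void compactFilterEnabledWithoutMaterializedStrategy() {
            StateTtlConfig ttlConfig =
                    StateTtlConfig.newBuilder(Time.days(1)).cleanupInBackground().build();

            StateTtlConfig.CleanupStrategies strategies = ttlConfig.getCleanupStrategies();

            // The compaction filter is still reported as enabled ...
            assertThat(strategies.inRocksdbCompactFilter()).isTrue();
            // ... but no default strategy object is created any more; the query time and
            // periodic compaction interval now come from RocksDBConfigurableOptions.
            assertThat(strategies.getRocksdbCompactFilterCleanupStrategy()).isNull();
        }
    }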
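Usage sketch for the two new options: the option keys and the StateTtlConfig builder calls come from this patch and the existing API, while the class name, the concrete values, and the programmatic Configuration wiring are illustrative assumptions; in practice the keys would typically be set in flink-conf.yaml and picked up by RocksDBResourceContainer.

    import org.apache.flink.api.common.state.StateTtlConfig;
    import org.apache.flink.api.common.time.Time;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class TtlCompactFilterDefaultsExample {
        public static void main(String[] args) {
            Configuration config = new Configuration();
            // Backend-wide defaults for the TTL compaction filter (illustrative values).
            config.setString(
                    "state.backend.rocksdb.compaction.filter.query-time-after-num-entries", "5000");
            config.setString(
                    "state.backend.rocksdb.compaction.filter.periodic-compaction-time", "7d");

            StreamExecutionEnvironment env =
                    StreamExecutionEnvironment.getExecutionEnvironment(config);

            // Relies on the backend-wide defaults above: cleanupInBackground() no longer
            // creates a per-state RocksdbCompactFilterCleanupStrategy.
            StateTtlConfig ttlUsingDefaults =
                    StateTtlConfig.newBuilder(Time.days(1)).cleanupInBackground().build();

            // Per-state override: an explicit strategy still takes precedence over the
            // configuration, per the fallback logic in
            // RocksDbTtlCompactFiltersManager#configCompactFilter.
            StateTtlConfig ttlWithOverride =
                    StateTtlConfig.newBuilder(Time.days(1))
                            .cleanupInRocksdbCompactFilter(10_000L)
                            .build();
        }
    }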