diff --git a/flink-core/src/main/java/org/apache/flink/api/common/state/StateTtlConfig.java b/flink-core/src/main/java/org/apache/flink/api/common/state/StateTtlConfig.java index d8cc99a573594b..954becebcd5b2c 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/state/StateTtlConfig.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/state/StateTtlConfig.java @@ -501,7 +501,7 @@ public static class RocksdbCompactFilterCleanupStrategy static final RocksdbCompactFilterCleanupStrategy DEFAULT_ROCKSDB_COMPACT_FILTER_CLEANUP_STRATEGY = - new RocksdbCompactFilterCleanupStrategy(1000L); + new RocksdbCompactFilterCleanupStrategy(0L, null); /** * Number of state entries to process by compaction filter before updating current diff --git a/flink-core/src/test/java/org/apache/flink/api/common/state/StateTtlConfigTest.java b/flink-core/src/test/java/org/apache/flink/api/common/state/StateTtlConfigTest.java index 3762abc16c8481..599e48ea601ae5 100644 --- a/flink-core/src/test/java/org/apache/flink/api/common/state/StateTtlConfigTest.java +++ b/flink-core/src/test/java/org/apache/flink/api/common/state/StateTtlConfigTest.java @@ -24,7 +24,6 @@ import org.junit.jupiter.api.Test; -import java.time.Duration; import java.util.Arrays; import java.util.List; @@ -71,9 +70,6 @@ void testStateTtlConfigBuildWithCleanupInBackground() { assertThat(cleanupStrategies.inRocksdbCompactFilter()).isTrue(); assertThat(incrementalCleanupStrategy.getCleanupSize()).isEqualTo(5); assertThat(incrementalCleanupStrategy.runCleanupForEveryRecord()).isFalse(); - assertThat(rocksdbCleanupStrategy.getQueryTimeAfterNumEntries()).isEqualTo(1000L); - assertThat(rocksdbCleanupStrategy.getPeriodicCompactionTime()) - .isEqualTo(Duration.ofDays(30)); } @Test diff --git a/flink-python/src/test/java/org/apache/flink/streaming/api/utils/ProtoUtilsTest.java b/flink-python/src/test/java/org/apache/flink/streaming/api/utils/ProtoUtilsTest.java index 5e81a4b9fefc68..2b144ef358b509 100644 --- a/flink-python/src/test/java/org/apache/flink/streaming/api/utils/ProtoUtilsTest.java +++ b/flink-python/src/test/java/org/apache/flink/streaming/api/utils/ProtoUtilsTest.java @@ -116,9 +116,5 @@ void testParseStateTtlConfigFromProto() { StateTtlConfig.RocksdbCompactFilterCleanupStrategy rocksdbCompactFilterCleanupStrategy = cleanupStrategies.getRocksdbCompactFilterCleanupStrategy(); assertThat(rocksdbCompactFilterCleanupStrategy).isNotNull(); - assertThat(rocksdbCompactFilterCleanupStrategy.getQueryTimeAfterNumEntries()) - .isEqualTo(1000); - assertThat(rocksdbCompactFilterCleanupStrategy.getPeriodicCompactionTime()) - .isEqualTo(Duration.ofDays(30)); } } diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBConfigurableOptions.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBConfigurableOptions.java index 19c424ed74f346..09849942f6ae68 100644 --- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBConfigurableOptions.java +++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBConfigurableOptions.java @@ -30,6 +30,7 @@ import java.io.File; import java.io.Serializable; +import java.time.Duration; import java.util.Arrays; import java.util.HashSet; import java.util.List; @@ -333,6 +334,25 @@ public class RocksDBConfigurableOptions implements Serializable { .withDescription( "If true, during rescaling, the deleteFilesInRange API will be invoked to clean up the useless files so that local disk space can be reclaimed more promptly."); + public static final ConfigOption COMPACT_FILTER_QUERY_TIME_AFTER_NUM_ENTRIES = + key("state.backend.rocksdb.compaction.filter.query-time-after-num-entries") + .longType() + .defaultValue(1000L) + .withDescription( + "Number of state entries to process by compaction filter before updating current timestamp. " + + "Updating the timestamp more often can improve cleanup speed, " + + "but it decreases compaction performance because it uses JNI calls from native code.The default value is '1000L'."); + + public static final ConfigOption COMPACT_FILTER_PERIODIC_COMPACTION_TIME = + key("state.backend.rocksdb.compaction.filter.periodic-compaction-time") + .durationType() + .defaultValue(Duration.ofDays(30)) + .withDescription( + "Periodic compaction could speed up expired state entries cleanup, especially for state" + + " entries rarely accessed. Files older than this value will be picked up for compaction," + + " and re-written to the same level as they were before. It makes sure a file goes through" + + " compaction filters periodically. 0 means turning off periodic compaction.The default value is '30days'."); + static final ConfigOption[] CANDIDATE_CONFIGS = new ConfigOption[] { // configurable DBOptions diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackendBuilder.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackendBuilder.java index 9902889b20a868..948c1de27438dc 100644 --- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackendBuilder.java +++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackendBuilder.java @@ -338,8 +338,12 @@ public RocksDBKeyedStateBackend build() throws BackendBuildingException { RocksDB db = null; RocksDBRestoreOperation restoreOperation = null; CompletableFuture asyncCompactAfterRestoreFuture = null; + RocksDbTtlCompactFiltersManager ttlCompactFiltersManager = - new RocksDbTtlCompactFiltersManager(ttlTimeProvider); + new RocksDbTtlCompactFiltersManager( + ttlTimeProvider, + optionsContainer.getQueryTimeAfterNumEntries(), + optionsContainer.getPeriodicCompactionTime()); ResourceGuard rocksDBResourceGuard = new ResourceGuard(); RocksDBSnapshotStrategyBase checkpointStrategy = null; diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBResourceContainer.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBResourceContainer.java index c74a41cb7a13f2..b37efa8dfba4b7 100644 --- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBResourceContainer.java +++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBResourceContainer.java @@ -48,6 +48,7 @@ import java.io.IOException; import java.nio.file.Files; import java.nio.file.Path; +import java.time.Duration; import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; @@ -181,6 +182,18 @@ public Long getWriteBufferManagerCapacity() { return sharedResources.getResourceHandle().getWriteBufferManagerCapacity(); } + /** Gets the "queryTimeAfterNumEntries" parameter from the configuration. */ + public Long getQueryTimeAfterNumEntries() { + return internalGetOption( + RocksDBConfigurableOptions.COMPACT_FILTER_QUERY_TIME_AFTER_NUM_ENTRIES); + } + + /** Gets the "getPeriodicCompactionTime" parameter from the configuration. */ + public Duration getPeriodicCompactionTime() { + return internalGetOption( + RocksDBConfigurableOptions.COMPACT_FILTER_PERIODIC_COMPACTION_TIME); + } + /** Gets the RocksDB {@link ColumnFamilyOptions} to be used for all RocksDB instances. */ public ColumnFamilyOptions getColumnOptions() { // initial options from common profile diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/ttl/RocksDbTtlCompactFiltersManager.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/ttl/RocksDbTtlCompactFiltersManager.java index 168da2f2af1714..7746b435c1c4dd 100644 --- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/ttl/RocksDbTtlCompactFiltersManager.java +++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/ttl/RocksDbTtlCompactFiltersManager.java @@ -46,6 +46,7 @@ import javax.annotation.Nonnull; import java.io.IOException; +import java.time.Duration; import java.util.LinkedHashMap; /** RocksDB compaction filter utils for state with TTL. */ @@ -60,8 +61,26 @@ public class RocksDbTtlCompactFiltersManager { /** Created column family options. */ private final LinkedHashMap columnFamilyOptionsMap; - public RocksDbTtlCompactFiltersManager(TtlTimeProvider ttlTimeProvider) { + /** + * Number of state entries to process by compaction filter before updating current timestamp. + */ + private final long queryTimeAfterNumEntries; + + /** + * Periodic compaction could speed up expired state entries cleanup, especially for state + * entries rarely accessed. Files older than this value will be picked up for compaction, and + * re-written to the same level as they were before. It makes sure a file goes through + * compaction filters periodically. 0 means turning off periodic compaction. + */ + private final Duration periodicCompactionTime; + + public RocksDbTtlCompactFiltersManager( + TtlTimeProvider ttlTimeProvider, + long queryTimeAfterNumEntries, + Duration periodicCompactionTime) { this.ttlTimeProvider = ttlTimeProvider; + this.queryTimeAfterNumEntries = queryTimeAfterNumEntries; + this.periodicCompactionTime = periodicCompactionTime; this.compactionFilterFactories = new LinkedHashMap<>(); this.columnFamilyOptionsMap = new LinkedHashMap<>(); } @@ -125,11 +144,19 @@ public void configCompactFilter( ColumnFamilyOptions columnFamilyOptions = columnFamilyOptionsMap.get(stateDesc.getName()); Preconditions.checkNotNull(columnFamilyOptions); - columnFamilyOptions.setPeriodicCompactionSeconds( - rocksdbCompactFilterCleanupStrategy.getPeriodicCompactionTime().getSeconds()); + + Duration periodicCompactionTime = + rocksdbCompactFilterCleanupStrategy.getPeriodicCompactionTime(); + if (periodicCompactionTime == null) { + periodicCompactionTime = this.periodicCompactionTime; + } + columnFamilyOptions.setPeriodicCompactionSeconds(periodicCompactionTime.getSeconds()); long queryTimeAfterNumEntries = rocksdbCompactFilterCleanupStrategy.getQueryTimeAfterNumEntries(); + if (queryTimeAfterNumEntries <= 0) { + queryTimeAfterNumEntries = this.queryTimeAfterNumEntries; + } FlinkCompactionFilter.Config config; if (stateDesc instanceof ListStateDescriptor) { diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecRank.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecRank.java index 608e4e1cfc1432..898986c09cfe89 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecRank.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecRank.java @@ -232,6 +232,7 @@ protected Transformation translateToPlanInternal( long stateRetentionTime = StateMetadata.getStateTtlForOneInputOperator(config, stateMetadataList); + StateTtlConfig ttlConfig = StateConfigUtil.createTtlConfig(stateRetentionTime); AbstractTopNFunction processFunction;