Skip to content

Commit

Permalink
[FLINK-36149][table] Make the RocksdbCompactFilter parameters take ef…
Browse files Browse the repository at this point in the history
…fect directly at the statebackend level.
  • Loading branch information
lexluo09 committed Aug 28, 2024
1 parent 0daca7b commit 13d41d7
Show file tree
Hide file tree
Showing 8 changed files with 70 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -501,7 +501,7 @@ public static class RocksdbCompactFilterCleanupStrategy

static final RocksdbCompactFilterCleanupStrategy
DEFAULT_ROCKSDB_COMPACT_FILTER_CLEANUP_STRATEGY =
new RocksdbCompactFilterCleanupStrategy(1000L);
new RocksdbCompactFilterCleanupStrategy(0L, null);

/**
* Number of state entries to process by compaction filter before updating current
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@

import org.junit.jupiter.api.Test;

import java.time.Duration;
import java.util.Arrays;
import java.util.List;

Expand Down Expand Up @@ -71,9 +70,6 @@ void testStateTtlConfigBuildWithCleanupInBackground() {
assertThat(cleanupStrategies.inRocksdbCompactFilter()).isTrue();
assertThat(incrementalCleanupStrategy.getCleanupSize()).isEqualTo(5);
assertThat(incrementalCleanupStrategy.runCleanupForEveryRecord()).isFalse();
assertThat(rocksdbCleanupStrategy.getQueryTimeAfterNumEntries()).isEqualTo(1000L);
assertThat(rocksdbCleanupStrategy.getPeriodicCompactionTime())
.isEqualTo(Duration.ofDays(30));
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -116,9 +116,5 @@ void testParseStateTtlConfigFromProto() {
StateTtlConfig.RocksdbCompactFilterCleanupStrategy rocksdbCompactFilterCleanupStrategy =
cleanupStrategies.getRocksdbCompactFilterCleanupStrategy();
assertThat(rocksdbCompactFilterCleanupStrategy).isNotNull();
assertThat(rocksdbCompactFilterCleanupStrategy.getQueryTimeAfterNumEntries())
.isEqualTo(1000);
assertThat(rocksdbCompactFilterCleanupStrategy.getPeriodicCompactionTime())
.isEqualTo(Duration.ofDays(30));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@

import java.io.File;
import java.io.Serializable;
import java.time.Duration;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
Expand Down Expand Up @@ -333,6 +334,25 @@ public class RocksDBConfigurableOptions implements Serializable {
.withDescription(
"If true, during rescaling, the deleteFilesInRange API will be invoked to clean up the useless files so that local disk space can be reclaimed more promptly.");

public static final ConfigOption<Long> COMPACT_FILTER_QUERY_TIME_AFTER_NUM_ENTRIES =
key("state.backend.rocksdb.compaction.filter.query-time-after-num-entries")
.longType()
.defaultValue(1000L)
.withDescription(
"Number of state entries to process by compaction filter before updating current timestamp. "
+ "Updating the timestamp more often can improve cleanup speed, "
+ "but it decreases compaction performance because it uses JNI calls from native code.The default value is '1000L'.");

public static final ConfigOption<Duration> COMPACT_FILTER_PERIODIC_COMPACTION_TIME =
key("state.backend.rocksdb.compaction.filter.periodic-compaction-time")
.durationType()
.defaultValue(Duration.ofDays(30))
.withDescription(
"Periodic compaction could speed up expired state entries cleanup, especially for state"
+ " entries rarely accessed. Files older than this value will be picked up for compaction,"
+ " and re-written to the same level as they were before. It makes sure a file goes through"
+ " compaction filters periodically. 0 means turning off periodic compaction.The default value is '30days'.");

static final ConfigOption<?>[] CANDIDATE_CONFIGS =
new ConfigOption<?>[] {
// configurable DBOptions
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -338,8 +338,12 @@ public RocksDBKeyedStateBackend<K> build() throws BackendBuildingException {
RocksDB db = null;
RocksDBRestoreOperation restoreOperation = null;
CompletableFuture<Void> asyncCompactAfterRestoreFuture = null;

RocksDbTtlCompactFiltersManager ttlCompactFiltersManager =
new RocksDbTtlCompactFiltersManager(ttlTimeProvider);
new RocksDbTtlCompactFiltersManager(
ttlTimeProvider,
optionsContainer.getQueryTimeAfterNumEntries(),
optionsContainer.getPeriodicCompactionTime());

ResourceGuard rocksDBResourceGuard = new ResourceGuard();
RocksDBSnapshotStrategyBase<K, ?> checkpointStrategy = null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
Expand Down Expand Up @@ -181,6 +182,18 @@ public Long getWriteBufferManagerCapacity() {
return sharedResources.getResourceHandle().getWriteBufferManagerCapacity();
}

/** Gets the "queryTimeAfterNumEntries" parameter from the configuration. */
public Long getQueryTimeAfterNumEntries() {
return internalGetOption(
RocksDBConfigurableOptions.COMPACT_FILTER_QUERY_TIME_AFTER_NUM_ENTRIES);
}

/** Gets the "getPeriodicCompactionTime" parameter from the configuration. */
public Duration getPeriodicCompactionTime() {
return internalGetOption(
RocksDBConfigurableOptions.COMPACT_FILTER_PERIODIC_COMPACTION_TIME);
}

/** Gets the RocksDB {@link ColumnFamilyOptions} to be used for all RocksDB instances. */
public ColumnFamilyOptions getColumnOptions() {
// initial options from common profile
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
import javax.annotation.Nonnull;

import java.io.IOException;
import java.time.Duration;
import java.util.LinkedHashMap;

/** RocksDB compaction filter utils for state with TTL. */
Expand All @@ -60,8 +61,26 @@ public class RocksDbTtlCompactFiltersManager {
/** Created column family options. */
private final LinkedHashMap<String, ColumnFamilyOptions> columnFamilyOptionsMap;

public RocksDbTtlCompactFiltersManager(TtlTimeProvider ttlTimeProvider) {
/**
* Number of state entries to process by compaction filter before updating current timestamp.
*/
private final long queryTimeAfterNumEntries;

/**
* Periodic compaction could speed up expired state entries cleanup, especially for state
* entries rarely accessed. Files older than this value will be picked up for compaction, and
* re-written to the same level as they were before. It makes sure a file goes through
* compaction filters periodically. 0 means turning off periodic compaction.
*/
private final Duration periodicCompactionTime;

public RocksDbTtlCompactFiltersManager(
TtlTimeProvider ttlTimeProvider,
long queryTimeAfterNumEntries,
Duration periodicCompactionTime) {
this.ttlTimeProvider = ttlTimeProvider;
this.queryTimeAfterNumEntries = queryTimeAfterNumEntries;
this.periodicCompactionTime = periodicCompactionTime;
this.compactionFilterFactories = new LinkedHashMap<>();
this.columnFamilyOptionsMap = new LinkedHashMap<>();
}
Expand Down Expand Up @@ -125,11 +144,19 @@ public void configCompactFilter(
ColumnFamilyOptions columnFamilyOptions =
columnFamilyOptionsMap.get(stateDesc.getName());
Preconditions.checkNotNull(columnFamilyOptions);
columnFamilyOptions.setPeriodicCompactionSeconds(
rocksdbCompactFilterCleanupStrategy.getPeriodicCompactionTime().getSeconds());

Duration periodicCompactionTime =
rocksdbCompactFilterCleanupStrategy.getPeriodicCompactionTime();
if (periodicCompactionTime == null) {
periodicCompactionTime = this.periodicCompactionTime;
}
columnFamilyOptions.setPeriodicCompactionSeconds(periodicCompactionTime.getSeconds());

long queryTimeAfterNumEntries =
rocksdbCompactFilterCleanupStrategy.getQueryTimeAfterNumEntries();
if (queryTimeAfterNumEntries <= 0) {
queryTimeAfterNumEntries = this.queryTimeAfterNumEntries;
}

FlinkCompactionFilter.Config config;
if (stateDesc instanceof ListStateDescriptor) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -232,6 +232,7 @@ protected Transformation<RowData> translateToPlanInternal(

long stateRetentionTime =
StateMetadata.getStateTtlForOneInputOperator(config, stateMetadataList);

StateTtlConfig ttlConfig = StateConfigUtil.createTtlConfig(stateRetentionTime);

AbstractTopNFunction processFunction;
Expand Down

0 comments on commit 13d41d7

Please sign in to comment.