Commit

[FLINK-36149][state] Make the RocksdbCompactFilter parameters take effect directly at the statebackend level.
lexluo09 committed Aug 29, 2024
1 parent 576ec2b commit b88e876
Showing 7 changed files with 78 additions and 30 deletions.
@@ -32,7 +32,6 @@

import static org.apache.flink.api.common.state.StateTtlConfig.CleanupStrategies.EMPTY_STRATEGY;
import static org.apache.flink.api.common.state.StateTtlConfig.IncrementalCleanupStrategy.DEFAULT_INCREMENTAL_CLEANUP_STRATEGY;
import static org.apache.flink.api.common.state.StateTtlConfig.RocksdbCompactFilterCleanupStrategy.DEFAULT_ROCKSDB_COMPACT_FILTER_CLEANUP_STRATEGY;
import static org.apache.flink.api.common.state.StateTtlConfig.StateVisibility.NeverReturnExpired;
import static org.apache.flink.api.common.state.StateTtlConfig.TtlTimeCharacteristic.ProcessingTime;
import static org.apache.flink.api.common.state.StateTtlConfig.UpdateType.OnCreateAndWrite;
@@ -444,15 +443,13 @@ public IncrementalCleanupStrategy getIncrementalCleanupStrategy() {
}

public boolean inRocksdbCompactFilter() {
return getRocksdbCompactFilterCleanupStrategy() != null;
return isCleanupInBackground || getRocksdbCompactFilterCleanupStrategy() != null;
}

@Nullable
public RocksdbCompactFilterCleanupStrategy getRocksdbCompactFilterCleanupStrategy() {
RocksdbCompactFilterCleanupStrategy defaultStrategy =
isCleanupInBackground ? DEFAULT_ROCKSDB_COMPACT_FILTER_CLEANUP_STRATEGY : null;
return (RocksdbCompactFilterCleanupStrategy)
strategies.getOrDefault(Strategies.ROCKSDB_COMPACTION_FILTER, defaultStrategy);
strategies.get(Strategies.ROCKSDB_COMPACTION_FILTER);
}
}

@@ -499,10 +496,6 @@ public static class RocksdbCompactFilterCleanupStrategy
*/
static final Duration DEFAULT_PERIODIC_COMPACTION_TIME = Duration.ofDays(30);

static final RocksdbCompactFilterCleanupStrategy
DEFAULT_ROCKSDB_COMPACT_FILTER_CLEANUP_STRATEGY =
new RocksdbCompactFilterCleanupStrategy(1000L);

/**
* Number of state entries to process by compaction filter before updating current
* timestamp.
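After this change, enabling cleanupInBackground() by itself no longer creates an implicit RocksdbCompactFilterCleanupStrategy: inRocksdbCompactFilter() still reports true, but the compaction filter parameters now come from the state backend configuration unless a per-state strategy is set explicitly. A minimal sketch of both cases, assuming a Flink version whose StateTtlConfig builder offers the Time-based newBuilder and the (long, Duration) cleanupInRocksdbCompactFilter overload:

```java
import java.time.Duration;

import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.time.Time;

public class TtlCompactFilterExamples {

    public static void main(String[] args) {
        // Background cleanup only: after this commit no implicit
        // RocksdbCompactFilterCleanupStrategy is created, so the RocksDB
        // compaction filter falls back to the statebackend-level defaults.
        StateTtlConfig backendDefaults =
                StateTtlConfig.newBuilder(Time.hours(1))
                        .cleanupInBackground()
                        .build();

        // Explicit per-state strategy: still supported and still takes
        // precedence over the statebackend-level settings.
        StateTtlConfig perStateOverride =
                StateTtlConfig.newBuilder(Time.hours(1))
                        .cleanupInRocksdbCompactFilter(500L, Duration.ofDays(7))
                        .build();

        // Matches the updated test expectations: null without an explicit
        // strategy, non-null when one is configured per state.
        System.out.println(
                backendDefaults.getCleanupStrategies().getRocksdbCompactFilterCleanupStrategy());
        System.out.println(
                perStateOverride.getCleanupStrategies().getRocksdbCompactFilterCleanupStrategy());
    }
}
```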
@@ -24,7 +24,6 @@

import org.junit.jupiter.api.Test;

import java.time.Duration;
import java.util.Arrays;
import java.util.List;

@@ -67,13 +66,10 @@ void testStateTtlConfigBuildWithCleanupInBackground() {

assertThat(cleanupStrategies.isCleanupInBackground()).isTrue();
assertThat(incrementalCleanupStrategy).isNotNull();
assertThat(rocksdbCleanupStrategy).isNotNull();
assertThat(rocksdbCleanupStrategy).isNull();
assertThat(cleanupStrategies.inRocksdbCompactFilter()).isTrue();
assertThat(incrementalCleanupStrategy.getCleanupSize()).isEqualTo(5);
assertThat(incrementalCleanupStrategy.runCleanupForEveryRecord()).isFalse();
assertThat(rocksdbCleanupStrategy.getQueryTimeAfterNumEntries()).isEqualTo(1000L);
assertThat(rocksdbCleanupStrategy.getPeriodicCompactionTime())
.isEqualTo(Duration.ofDays(30));
}

@Test
@@ -25,7 +25,6 @@

import org.junit.jupiter.api.Test;

import java.time.Duration;
import java.util.concurrent.TimeUnit;

import static org.assertj.core.api.Assertions.assertThat;
@@ -115,10 +114,6 @@ void testParseStateTtlConfigFromProto() {

StateTtlConfig.RocksdbCompactFilterCleanupStrategy rocksdbCompactFilterCleanupStrategy =
cleanupStrategies.getRocksdbCompactFilterCleanupStrategy();
assertThat(rocksdbCompactFilterCleanupStrategy).isNotNull();
assertThat(rocksdbCompactFilterCleanupStrategy.getQueryTimeAfterNumEntries())
.isEqualTo(1000);
assertThat(rocksdbCompactFilterCleanupStrategy.getPeriodicCompactionTime())
.isEqualTo(Duration.ofDays(30));
assertThat(rocksdbCompactFilterCleanupStrategy).isNull();
}
}
@@ -30,6 +30,7 @@

import java.io.File;
import java.io.Serializable;
import java.time.Duration;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
@@ -333,6 +334,25 @@ public class RocksDBConfigurableOptions implements Serializable {
.withDescription(
"If true, during rescaling, the deleteFilesInRange API will be invoked to clean up the useless files so that local disk space can be reclaimed more promptly.");

public static final ConfigOption<Long> COMPACT_FILTER_QUERY_TIME_AFTER_NUM_ENTRIES =
key("state.backend.rocksdb.compaction.filter.query-time-after-num-entries")
.longType()
.defaultValue(1000L)
.withDescription(
"Number of state entries to process by compaction filter before updating current timestamp. "
+ "Updating the timestamp more often can improve cleanup speed, "
+ "but it decreases compaction performance because it uses JNI calls from native code.The default value is '1000L'.");

public static final ConfigOption<Duration> COMPACT_FILTER_PERIODIC_COMPACTION_TIME =
key("state.backend.rocksdb.compaction.filter.periodic-compaction-time")
.durationType()
.defaultValue(Duration.ofDays(30))
.withDescription(
"Periodic compaction could speed up expired state entries cleanup, especially for state"
+ " entries rarely accessed. Files older than this value will be picked up for compaction,"
+ " and re-written to the same level as they were before. It makes sure a file goes through"
+ " compaction filters periodically. 0 means turning off periodic compaction.The default value is '30days'.");

static final ConfigOption<?>[] CANDIDATE_CONFIGS =
new ConfigOption<?>[] {
// configurable DBOptions
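The two new options can be supplied like any other RocksDB state backend setting. A minimal sketch, assuming the standard EmbeddedRocksDBStateBackend from flink-statebackend-rocksdb and using the option keys defined in the hunk above (the class name CompactFilterBackendConfig is illustrative); the same keys can equally be placed in the Flink configuration file:

```java
import org.apache.flink.configuration.Configuration;
import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend;

public class CompactFilterBackendConfig {

    public static void main(String[] args) {
        Configuration config = new Configuration();

        // Keys introduced by this commit; they apply to every TTL state that
        // does not carry its own RocksdbCompactFilterCleanupStrategy.
        config.setString(
                "state.backend.rocksdb.compaction.filter.query-time-after-num-entries", "5000");
        config.setString(
                "state.backend.rocksdb.compaction.filter.periodic-compaction-time", "7 d");

        // Typical wiring: hand the configuration to the RocksDB state backend.
        EmbeddedRocksDBStateBackend backend =
                new EmbeddedRocksDBStateBackend()
                        .configure(config, CompactFilterBackendConfig.class.getClassLoader());
    }
}
```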
@@ -338,8 +338,12 @@ public RocksDBKeyedStateBackend<K> build() throws BackendBuildingException {
RocksDB db = null;
RocksDBRestoreOperation restoreOperation = null;
CompletableFuture<Void> asyncCompactAfterRestoreFuture = null;

RocksDbTtlCompactFiltersManager ttlCompactFiltersManager =
new RocksDbTtlCompactFiltersManager(ttlTimeProvider);
new RocksDbTtlCompactFiltersManager(
ttlTimeProvider,
optionsContainer.getQueryTimeAfterNumEntries(),
optionsContainer.getPeriodicCompactionTime());

ResourceGuard rocksDBResourceGuard = new ResourceGuard();
RocksDBSnapshotStrategyBase<K, ?> checkpointStrategy = null;
@@ -48,6 +48,7 @@
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
@@ -181,6 +182,18 @@ public Long getWriteBufferManagerCapacity() {
return sharedResources.getResourceHandle().getWriteBufferManagerCapacity();
}

/** Gets the "queryTimeAfterNumEntries" parameter from the configuration. */
public Long getQueryTimeAfterNumEntries() {
return internalGetOption(
RocksDBConfigurableOptions.COMPACT_FILTER_QUERY_TIME_AFTER_NUM_ENTRIES);
}

/** Gets the "getPeriodicCompactionTime" parameter from the configuration. */
public Duration getPeriodicCompactionTime() {
return internalGetOption(
RocksDBConfigurableOptions.COMPACT_FILTER_PERIODIC_COMPACTION_TIME);
}

/** Gets the RocksDB {@link ColumnFamilyOptions} to be used for all RocksDB instances. */
public ColumnFamilyOptions getColumnOptions() {
// initial options from common profile
@@ -46,6 +46,7 @@
import javax.annotation.Nonnull;

import java.io.IOException;
import java.time.Duration;
import java.util.LinkedHashMap;

/** RocksDB compaction filter utils for state with TTL. */
@@ -60,8 +61,26 @@ public class RocksDbTtlCompactFiltersManager {
/** Created column family options. */
private final LinkedHashMap<String, ColumnFamilyOptions> columnFamilyOptionsMap;

public RocksDbTtlCompactFiltersManager(TtlTimeProvider ttlTimeProvider) {
/**
* Number of state entries to process by compaction filter before updating current timestamp.
*/
private final long queryTimeAfterNumEntries;

/**
* Periodic compaction could speed up expired state entries cleanup, especially for state
* entries rarely accessed. Files older than this value will be picked up for compaction, and
* re-written to the same level as they were before. It makes sure a file goes through
* compaction filters periodically. 0 means turning off periodic compaction.
*/
private final Duration periodicCompactionTime;

public RocksDbTtlCompactFiltersManager(
TtlTimeProvider ttlTimeProvider,
long queryTimeAfterNumEntries,
Duration periodicCompactionTime) {
this.ttlTimeProvider = ttlTimeProvider;
this.queryTimeAfterNumEntries = queryTimeAfterNumEntries;
this.periodicCompactionTime = periodicCompactionTime;
this.compactionFilterFactories = new LinkedHashMap<>();
this.columnFamilyOptionsMap = new LinkedHashMap<>();
}
@@ -118,18 +137,26 @@ public void configCompactFilter(
Preconditions.checkNotNull(compactionFilterFactory);
long ttl = ttlConfig.getTtl().toMilliseconds();

StateTtlConfig.RocksdbCompactFilterCleanupStrategy rocksdbCompactFilterCleanupStrategy =
ttlConfig.getCleanupStrategies().getRocksdbCompactFilterCleanupStrategy();
Preconditions.checkNotNull(rocksdbCompactFilterCleanupStrategy);

ColumnFamilyOptions columnFamilyOptions =
columnFamilyOptionsMap.get(stateDesc.getName());
Preconditions.checkNotNull(columnFamilyOptions);
columnFamilyOptions.setPeriodicCompactionSeconds(
rocksdbCompactFilterCleanupStrategy.getPeriodicCompactionTime().getSeconds());

long queryTimeAfterNumEntries =
rocksdbCompactFilterCleanupStrategy.getQueryTimeAfterNumEntries();
StateTtlConfig.RocksdbCompactFilterCleanupStrategy rocksdbCompactFilterCleanupStrategy =
ttlConfig.getCleanupStrategies().getRocksdbCompactFilterCleanupStrategy();

Duration periodicCompactionTime = this.periodicCompactionTime;
long queryTimeAfterNumEntries = this.queryTimeAfterNumEntries;

if (rocksdbCompactFilterCleanupStrategy != null) {
periodicCompactionTime =
rocksdbCompactFilterCleanupStrategy.getPeriodicCompactionTime();
queryTimeAfterNumEntries =
rocksdbCompactFilterCleanupStrategy.getQueryTimeAfterNumEntries();
}
if (periodicCompactionTime != null) {
columnFamilyOptions.setPeriodicCompactionSeconds(
periodicCompactionTime.getSeconds());
}

FlinkCompactionFilter.Config config;
if (stateDesc instanceof ListStateDescriptor) {
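The reworked configCompactFilter gives an explicitly configured per-state strategy precedence over the statebackend-level values passed into RocksDbTtlCompactFiltersManager. A standalone sketch of that resolution order (illustrative helper, not part of the commit):

```java
import java.time.Duration;

/** Illustrative only: mirrors the precedence implemented in configCompactFilter above. */
final class CompactFilterSettingsResolution {

    /** Per-state value wins when a RocksdbCompactFilterCleanupStrategy is configured. */
    static long resolveQueryTimeAfterNumEntries(Long perStateValue, long backendDefault) {
        return perStateValue != null ? perStateValue : backendDefault;
    }

    /** Same rule for the periodic compaction interval. */
    static Duration resolvePeriodicCompactionTime(Duration perStateValue, Duration backendDefault) {
        return perStateValue != null ? perStateValue : backendDefault;
    }

    public static void main(String[] args) {
        long backendQueryTime = 1000L;                  // COMPACT_FILTER_QUERY_TIME_AFTER_NUM_ENTRIES default
        Duration backendPeriodic = Duration.ofDays(30); // COMPACT_FILTER_PERIODIC_COMPACTION_TIME default

        // No per-state strategy: the statebackend-level defaults apply.
        System.out.println(resolveQueryTimeAfterNumEntries(null, backendQueryTime));            // 1000
        // Per-state strategy present: its values override the backend defaults.
        System.out.println(resolveQueryTimeAfterNumEntries(500L, backendQueryTime));            // 500
        System.out.println(resolvePeriodicCompactionTime(Duration.ofDays(7), backendPeriodic)); // PT168H
    }
}
```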
