
Commit

[FLINK-36149][state] Make the RocksdbCompactFilter parameters take effect directly at the statebackend level.
lexluo09 committed Sep 3, 2024
1 parent 339f97c commit 09b34d0
Showing 7 changed files with 99 additions and 23 deletions.

File: rocksdb_configurable_configuration.html

@@ -38,6 +38,18 @@
             <td>Boolean</td>
             <td>If true, RocksDB will use block-based filter instead of full filter, this only take effect when bloom filter is used. The default value is 'false'.</td>
         </tr>
+        <tr>
+            <td><h5>state.backend.rocksdb.compaction.filter.periodic-compaction-time</h5></td>
+            <td style="word-wrap: break-word;">30 d</td>
+            <td>Duration</td>
+            <td>Periodic compaction could speed up expired state entries cleanup, especially for state entries rarely accessed. Files older than this value will be picked up for compaction, and re-written to the same level as they were before. It makes sure a file goes through compaction filters periodically. 0 means turning off periodic compaction.The default value is '30days'.</td>
+        </tr>
+        <tr>
+            <td><h5>state.backend.rocksdb.compaction.filter.query-time-after-num-entries</h5></td>
+            <td style="word-wrap: break-word;">1000</td>
+            <td>Long</td>
+            <td>Number of state entries to process by compaction filter before updating current timestamp. Updating the timestamp more often can improve cleanup speed, but it decreases compaction performance because it uses JNI calls from native code.The default value is '1000L'.</td>
+        </tr>
         <tr>
             <td><h5>state.backend.rocksdb.compaction.level.max-size-level-base</h5></td>
             <td style="word-wrap: break-word;">256 mb</td>
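
With this change, the two options documented above can be set at the state backend level like any other RocksDB option. A minimal sketch using the string keys (the same keys as in flink-conf.yaml; the values are illustrative, not recommendations):

```java
import org.apache.flink.configuration.Configuration;

public class TtlCompactFilterConfigExample {
    public static void main(String[] args) {
        Configuration config = new Configuration();
        // These backend-level values now apply to every TTL state that enables
        // background cleanup without its own RocksdbCompactFilterCleanupStrategy.
        config.setString(
                "state.backend.rocksdb.compaction.filter.query-time-after-num-entries", "5000");
        config.setString(
                "state.backend.rocksdb.compaction.filter.periodic-compaction-time", "7 d");
    }
}
```
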
File: StateTtlConfig.java

@@ -32,7 +32,6 @@

 import static org.apache.flink.api.common.state.StateTtlConfig.CleanupStrategies.EMPTY_STRATEGY;
 import static org.apache.flink.api.common.state.StateTtlConfig.IncrementalCleanupStrategy.DEFAULT_INCREMENTAL_CLEANUP_STRATEGY;
-import static org.apache.flink.api.common.state.StateTtlConfig.RocksdbCompactFilterCleanupStrategy.DEFAULT_ROCKSDB_COMPACT_FILTER_CLEANUP_STRATEGY;
 import static org.apache.flink.api.common.state.StateTtlConfig.StateVisibility.NeverReturnExpired;
 import static org.apache.flink.api.common.state.StateTtlConfig.TtlTimeCharacteristic.ProcessingTime;
 import static org.apache.flink.api.common.state.StateTtlConfig.UpdateType.OnCreateAndWrite;

@@ -444,15 +443,13 @@ public IncrementalCleanupStrategy getIncrementalCleanupStrategy() {
         }
 
         public boolean inRocksdbCompactFilter() {
-            return getRocksdbCompactFilterCleanupStrategy() != null;
+            return isCleanupInBackground || getRocksdbCompactFilterCleanupStrategy() != null;
         }
 
         @Nullable
         public RocksdbCompactFilterCleanupStrategy getRocksdbCompactFilterCleanupStrategy() {
-            RocksdbCompactFilterCleanupStrategy defaultStrategy =
-                    isCleanupInBackground ? DEFAULT_ROCKSDB_COMPACT_FILTER_CLEANUP_STRATEGY : null;
             return (RocksdbCompactFilterCleanupStrategy)
-                    strategies.getOrDefault(Strategies.ROCKSDB_COMPACTION_FILTER, defaultStrategy);
+                    strategies.get(Strategies.ROCKSDB_COMPACTION_FILTER);
         }
     }
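
The behavioral change is easiest to see through the builder API: plain cleanupInBackground() no longer materializes a default RocksdbCompactFilterCleanupStrategy, yet the compaction filter still counts as enabled, so the backend-level options decide the parameters. A sketch of the behavior after this commit (assuming a 1.x StateTtlConfig):

```java
import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.time.Time;

public class CleanupInBackgroundExample {
    public static void main(String[] args) {
        StateTtlConfig ttlConfig =
                StateTtlConfig.newBuilder(Time.days(1))
                        .cleanupInBackground()
                        .build();

        StateTtlConfig.CleanupStrategies strategies = ttlConfig.getCleanupStrategies();
        // Before this commit this returned a default strategy (1000L / 30 days);
        // now it returns null and the backend-level options take over.
        System.out.println(strategies.getRocksdbCompactFilterCleanupStrategy()); // null
        System.out.println(strategies.inRocksdbCompactFilter()); // true
    }
}
```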

@@ -494,11 +491,18 @@ public static class RocksdbCompactFilterCleanupStrategy
         private static final long serialVersionUID = 3109278796506988980L;
 
         /**
-         * Default value is 30 days so that every file goes through the compaction process at least
-         * once every 30 days if not compacted sooner.
+         * @deprecated Use {@link
+         *     org.apache.flink.contrib.streaming.state.RocksDBConfigurableOptions#COMPACT_FILTER_PERIODIC_COMPACTION_TIME}
+         *     instead.
          */
-        static final Duration DEFAULT_PERIODIC_COMPACTION_TIME = Duration.ofDays(30);
+        @Deprecated static final Duration DEFAULT_PERIODIC_COMPACTION_TIME = Duration.ofDays(30);
 
+        /**
+         * @deprecated Use {@link
+         *     org.apache.flink.contrib.streaming.state.RocksDBConfigurableOptions#COMPACT_FILTER_QUERY_TIME_AFTER_NUM_ENTRIES}
+         *     instead.
+         */
+        @Deprecated
         static final RocksdbCompactFilterCleanupStrategy
                 DEFAULT_ROCKSDB_COMPACT_FILTER_CLEANUP_STRATEGY =
                         new RocksdbCompactFilterCleanupStrategy(1000L);
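
A strategy set explicitly on the state's StateTtlConfig still takes precedence over the backend-level options; only the implicit default constants above are deprecated. A sketch of such a per-state override, using the long-only cleanupInRocksdbCompactFilter overload (the value is illustrative):

```java
import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.time.Time;

public class PerStateOverrideExample {
    public static void main(String[] args) {
        StateTtlConfig ttlConfig =
                StateTtlConfig.newBuilder(Time.days(1))
                        .cleanupInRocksdbCompactFilter(10_000L)
                        .build();
        // The explicit strategy is kept as-is and wins over the
        // state.backend.rocksdb.compaction.filter.* options.
        System.out.println(
                ttlConfig.getCleanupStrategies()
                        .getRocksdbCompactFilterCleanupStrategy()
                        .getQueryTimeAfterNumEntries()); // 10000
    }
}
```
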
File: StateTtlConfigTest.java

@@ -24,7 +24,6 @@

 import org.junit.jupiter.api.Test;
 
-import java.time.Duration;
 import java.util.Arrays;
 import java.util.List;

@@ -67,13 +66,10 @@ void testStateTtlConfigBuildWithCleanupInBackground() {

         assertThat(cleanupStrategies.isCleanupInBackground()).isTrue();
         assertThat(incrementalCleanupStrategy).isNotNull();
-        assertThat(rocksdbCleanupStrategy).isNotNull();
+        assertThat(rocksdbCleanupStrategy).isNull();
         assertThat(cleanupStrategies.inRocksdbCompactFilter()).isTrue();
         assertThat(incrementalCleanupStrategy.getCleanupSize()).isEqualTo(5);
         assertThat(incrementalCleanupStrategy.runCleanupForEveryRecord()).isFalse();
-        assertThat(rocksdbCleanupStrategy.getQueryTimeAfterNumEntries()).isEqualTo(1000L);
-        assertThat(rocksdbCleanupStrategy.getPeriodicCompactionTime())
-                .isEqualTo(Duration.ofDays(30));
     }

@Test

File: RocksDBConfigurableOptions.java

@@ -30,6 +30,7 @@

 import java.io.File;
 import java.io.Serializable;
+import java.time.Duration;
 import java.util.Arrays;
 import java.util.HashSet;
 import java.util.List;

@@ -333,6 +334,25 @@ public class RocksDBConfigurableOptions implements Serializable {
                     .withDescription(
                             "If true, during rescaling, the deleteFilesInRange API will be invoked to clean up the useless files so that local disk space can be reclaimed more promptly.");
 
+    public static final ConfigOption<Long> COMPACT_FILTER_QUERY_TIME_AFTER_NUM_ENTRIES =
+            key("state.backend.rocksdb.compaction.filter.query-time-after-num-entries")
+                    .longType()
+                    .defaultValue(1000L)
+                    .withDescription(
+                            "Number of state entries to process by compaction filter before updating current timestamp. "
+                                    + "Updating the timestamp more often can improve cleanup speed, "
+                                    + "but it decreases compaction performance because it uses JNI calls from native code.The default value is '1000L'.");
+
+    public static final ConfigOption<Duration> COMPACT_FILTER_PERIODIC_COMPACTION_TIME =
+            key("state.backend.rocksdb.compaction.filter.periodic-compaction-time")
+                    .durationType()
+                    .defaultValue(Duration.ofDays(30))
+                    .withDescription(
+                            "Periodic compaction could speed up expired state entries cleanup, especially for state"
+                                    + " entries rarely accessed. Files older than this value will be picked up for compaction,"
+                                    + " and re-written to the same level as they were before. It makes sure a file goes through"
+                                    + " compaction filters periodically. 0 means turning off periodic compaction.The default value is '30days'.");
+
     static final ConfigOption<?>[] CANDIDATE_CONFIGS =
             new ConfigOption<?>[] {
                 // configurable DBOptions
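
Because the parameters are now ordinary ConfigOptions, they can also be set and read in a typed way. A minimal sketch against the constants defined above (values are illustrative):

```java
import org.apache.flink.configuration.Configuration;
import org.apache.flink.contrib.streaming.state.RocksDBConfigurableOptions;

import java.time.Duration;

public class TypedOptionAccessExample {
    public static void main(String[] args) {
        Configuration config = new Configuration();
        config.set(
                RocksDBConfigurableOptions.COMPACT_FILTER_QUERY_TIME_AFTER_NUM_ENTRIES, 5000L);
        config.set(
                RocksDBConfigurableOptions.COMPACT_FILTER_PERIODIC_COMPACTION_TIME,
                Duration.ofDays(7));

        // Unset keys fall back to the defaults declared above (1000L / 30 days).
        long entries =
                config.get(RocksDBConfigurableOptions.COMPACT_FILTER_QUERY_TIME_AFTER_NUM_ENTRIES);
        Duration period =
                config.get(RocksDBConfigurableOptions.COMPACT_FILTER_PERIODIC_COMPACTION_TIME);
        System.out.println(entries + " / " + period);
    }
}
```
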
File: RocksDBKeyedStateBackendBuilder.java

@@ -338,8 +338,12 @@ public RocksDBKeyedStateBackend<K> build() throws BackendBuildingException {
         RocksDB db = null;
         RocksDBRestoreOperation restoreOperation = null;
         CompletableFuture<Void> asyncCompactAfterRestoreFuture = null;
+
         RocksDbTtlCompactFiltersManager ttlCompactFiltersManager =
-                new RocksDbTtlCompactFiltersManager(ttlTimeProvider);
+                new RocksDbTtlCompactFiltersManager(
+                        ttlTimeProvider,
+                        optionsContainer.getQueryTimeAfterNumEntries(),
+                        optionsContainer.getPeriodicCompactionTime());
 
         ResourceGuard rocksDBResourceGuard = new ResourceGuard();
         RocksDBSnapshotStrategyBase<K, ?> checkpointStrategy = null;

File: RocksDBResourceContainer.java

@@ -48,6 +48,7 @@
 import java.io.IOException;
 import java.nio.file.Files;
 import java.nio.file.Path;
+import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;

@@ -181,6 +182,18 @@ public Long getWriteBufferManagerCapacity() {
         return sharedResources.getResourceHandle().getWriteBufferManagerCapacity();
     }
 
+    /** Gets the "queryTimeAfterNumEntries" parameter from the configuration. */
+    public Long getQueryTimeAfterNumEntries() {
+        return internalGetOption(
+                RocksDBConfigurableOptions.COMPACT_FILTER_QUERY_TIME_AFTER_NUM_ENTRIES);
+    }
+
+    /** Gets the "getPeriodicCompactionTime" parameter from the configuration. */
+    public Duration getPeriodicCompactionTime() {
+        return internalGetOption(
+                RocksDBConfigurableOptions.COMPACT_FILTER_PERIODIC_COMPACTION_TIME);
+    }
+
     /** Gets the RocksDB {@link ColumnFamilyOptions} to be used for all RocksDB instances. */
     public ColumnFamilyOptions getColumnOptions() {
         // initial options from common profile

File: RocksDbTtlCompactFiltersManager.java

@@ -46,6 +46,7 @@
 import javax.annotation.Nonnull;
 
 import java.io.IOException;
+import java.time.Duration;
 import java.util.LinkedHashMap;

/** RocksDB compaction filter utils for state with TTL. */

@@ -60,8 +61,26 @@ public class RocksDbTtlCompactFiltersManager {
     /** Created column family options. */
     private final LinkedHashMap<String, ColumnFamilyOptions> columnFamilyOptionsMap;
 
-    public RocksDbTtlCompactFiltersManager(TtlTimeProvider ttlTimeProvider) {
+    /**
+     * Number of state entries to process by compaction filter before updating current timestamp.
+     */
+    private final long queryTimeAfterNumEntries;
+
+    /**
+     * Periodic compaction could speed up expired state entries cleanup, especially for state
+     * entries rarely accessed. Files older than this value will be picked up for compaction, and
+     * re-written to the same level as they were before. It makes sure a file goes through
+     * compaction filters periodically. 0 means turning off periodic compaction.
+     */
+    private final Duration periodicCompactionTime;
+
+    public RocksDbTtlCompactFiltersManager(
+            TtlTimeProvider ttlTimeProvider,
+            long queryTimeAfterNumEntries,
+            Duration periodicCompactionTime) {
         this.ttlTimeProvider = ttlTimeProvider;
+        this.queryTimeAfterNumEntries = queryTimeAfterNumEntries;
+        this.periodicCompactionTime = periodicCompactionTime;
         this.compactionFilterFactories = new LinkedHashMap<>();
         this.columnFamilyOptionsMap = new LinkedHashMap<>();
     }

@@ -118,18 +137,26 @@ public void configCompactFilter(
         Preconditions.checkNotNull(compactionFilterFactory);
         long ttl = ttlConfig.getTtl().toMilliseconds();
 
-        StateTtlConfig.RocksdbCompactFilterCleanupStrategy rocksdbCompactFilterCleanupStrategy =
-                ttlConfig.getCleanupStrategies().getRocksdbCompactFilterCleanupStrategy();
-        Preconditions.checkNotNull(rocksdbCompactFilterCleanupStrategy);
-
         ColumnFamilyOptions columnFamilyOptions =
                 columnFamilyOptionsMap.get(stateDesc.getName());
         Preconditions.checkNotNull(columnFamilyOptions);
-        columnFamilyOptions.setPeriodicCompactionSeconds(
-                rocksdbCompactFilterCleanupStrategy.getPeriodicCompactionTime().getSeconds());
 
-        long queryTimeAfterNumEntries =
-                rocksdbCompactFilterCleanupStrategy.getQueryTimeAfterNumEntries();
+        StateTtlConfig.RocksdbCompactFilterCleanupStrategy rocksdbCompactFilterCleanupStrategy =
+                ttlConfig.getCleanupStrategies().getRocksdbCompactFilterCleanupStrategy();
+
+        Duration periodicCompactionTime = this.periodicCompactionTime;
+        long queryTimeAfterNumEntries = this.queryTimeAfterNumEntries;
+
+        if (rocksdbCompactFilterCleanupStrategy != null) {
+            periodicCompactionTime =
+                    rocksdbCompactFilterCleanupStrategy.getPeriodicCompactionTime();
+            queryTimeAfterNumEntries =
+                    rocksdbCompactFilterCleanupStrategy.getQueryTimeAfterNumEntries();
+        }
+        if (periodicCompactionTime != null) {
+            columnFamilyOptions.setPeriodicCompactionSeconds(
+                    periodicCompactionTime.getSeconds());
+        }
 
         FlinkCompactionFilter.Config config;
         if (stateDesc instanceof ListStateDescriptor) {
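
The resulting precedence rule in configCompactFilter is: use the per-state strategy if one is set, otherwise fall back to the backend-level values. A hypothetical standalone helper, not part of this commit, that mirrors that resolution:

```java
import org.apache.flink.api.common.state.StateTtlConfig;

import javax.annotation.Nullable;

import java.time.Duration;

public final class TtlFilterSettings {
    /** Resolves queryTimeAfterNumEntries: per-state strategy first, backend default second. */
    static long resolveQueryTimeAfterNumEntries(
            @Nullable StateTtlConfig.RocksdbCompactFilterCleanupStrategy perState,
            long backendDefault) {
        return perState != null ? perState.getQueryTimeAfterNumEntries() : backendDefault;
    }

    /** Resolves periodicCompactionTime the same way. */
    static Duration resolvePeriodicCompactionTime(
            @Nullable StateTtlConfig.RocksdbCompactFilterCleanupStrategy perState,
            Duration backendDefault) {
        return perState != null ? perState.getPeriodicCompactionTime() : backendDefault;
    }
}
```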
