Skip to content

Commit

Permalink
[FLINK-36149][table]In Rank operators, add the cleanupInRocksdbCompac…
Browse files Browse the repository at this point in the history
…tFilter(queryTimeAfterNumEntries) parameter setting
  • Loading branch information
lexluo09 committed Aug 26, 2024
1 parent 26c2be3 commit eeb478f
Show file tree
Hide file tree
Showing 4 changed files with 34 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -498,10 +498,12 @@ public static class RocksdbCompactFilterCleanupStrategy
* once every 30 days if not compacted sooner.
*/
static final Duration DEFAULT_PERIODIC_COMPACTION_TIME = Duration.ofDays(30);
/**
 * Default number of state entries to process by the compaction filter before updating the
 * current timestamp. Used to build {@code DEFAULT_ROCKSDB_COMPACT_FILTER_CLEANUP_STRATEGY}.
 */
public static final long DEFAULT_QUERY_TIME_AFTER_NUM_ENTRIES = 1000L;


static final RocksdbCompactFilterCleanupStrategy
DEFAULT_ROCKSDB_COMPACT_FILTER_CLEANUP_STRATEGY =
new RocksdbCompactFilterCleanupStrategy(1000L);
new RocksdbCompactFilterCleanupStrategy(DEFAULT_QUERY_TIME_AFTER_NUM_ENTRIES);

/**
* Number of state entries to process by compaction filter before updating current
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -556,6 +556,17 @@ public class ExecutionConfigOptions {
+ "to reduce state access. Cache size is the number of records "
+ "in each ranking task.");

// Number of state entries the RocksDB compaction filter processes in Rank operators before it
// refreshes the current timestamp used for TTL checks.
// No deprecated key is registered: this option is new, and listing its own key as a deprecated
// fallback would be redundant and misleading.
@Documentation.TableOption(execMode = Documentation.ExecMode.STREAMING)
public static final ConfigOption<Long>
        TABLE_EXEC_RANK_ROCKSDB_CLEANUP_QUERY_TIME_AFTER_NUM_ENTRIES =
                ConfigOptions.key("table.exec.rank.rocksdb-cleanup-query-time-after-num-entries")
                        .longType()
                        .defaultValue(1000L)
                        .withDescription(
                                "number of state entries to process by compaction filter before "
                                        + "updating current timestamp in Rank operators."
                                        + " Default value of 1000L. Updating the timestamp more often can improve cleanup speed, "
                                        + "but it decreases compaction performance because it uses JNI calls from native code.");

@Documentation.TableOption(execMode = Documentation.ExecMode.BATCH_STREAMING)
public static final ConfigOption<Boolean> TABLE_EXEC_SIMPLIFY_OPERATOR_NAME_ENABLED =
key("table.exec.simplify-operator-name-enabled")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@
import java.util.List;
import java.util.stream.IntStream;

import static org.apache.flink.table.api.config.ExecutionConfigOptions.TABLE_EXEC_RANK_ROCKSDB_CLEANUP_QUERY_TIME_AFTER_NUM_ENTRIES;
import static org.apache.flink.table.api.config.ExecutionConfigOptions.TABLE_EXEC_RANK_TOPN_CACHE_SIZE;
import static org.apache.flink.util.Preconditions.checkArgument;
import static org.apache.flink.util.Preconditions.checkNotNull;
Expand Down Expand Up @@ -229,10 +230,13 @@ protected Transformation<RowData> translateToPlanInternal(
RowType.of(sortSpec.getFieldTypes(inputType)),
sortSpecInSortKey);
long cacheSize = config.get(TABLE_EXEC_RANK_TOPN_CACHE_SIZE);
long queryTimeAfterNumEntries = config.get(TABLE_EXEC_RANK_ROCKSDB_CLEANUP_QUERY_TIME_AFTER_NUM_ENTRIES);

long stateRetentionTime =
StateMetadata.getStateTtlForOneInputOperator(config, stateMetadataList);
StateTtlConfig ttlConfig = StateConfigUtil.createTtlConfig(stateRetentionTime);

StateTtlConfig ttlConfig = StateConfigUtil.createTtlConfig(stateRetentionTime,
queryTimeAfterNumEntries);

AbstractTopNFunction processFunction;
if (rankStrategy instanceof RankProcessStrategy.AppendFastStrategy) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,25 @@ public class StateConfigUtil {
* @param retentionTime State ttl time which unit is MILLISECONDS.
*/
public static StateTtlConfig createTtlConfig(long retentionTime) {
    // Delegate to the two-argument overload, using the default compaction-filter query interval.
    final long defaultQueryTimeAfterNumEntries =
            StateTtlConfig.RocksdbCompactFilterCleanupStrategy
                    .DEFAULT_QUERY_TIME_AFTER_NUM_ENTRIES;
    return createTtlConfig(retentionTime, defaultQueryTimeAfterNumEntries);
}

/**
* Creates a {@link StateTtlConfig} based on the given retentionTime and queryTimeAfterNumEntries.
*
* @param retentionTime State ttl time which unit is MILLISECONDS.
* @param queryTimeAfterNumEntries number of state entries to process by compaction filter
* before updating current timestamp
*/
public static StateTtlConfig createTtlConfig(
long retentionTime,
long queryTimeAfterNumEntries) {
if (retentionTime > 0) {
return StateTtlConfig.newBuilder(Time.milliseconds(retentionTime))
.setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
.setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
.cleanupFullSnapshot()
.cleanupInRocksdbCompactFilter(queryTimeAfterNumEntries)
.build();
} else {
return StateTtlConfig.DISABLED;
Expand Down

0 comments on commit eeb478f

Please sign in to comment.