Skip to content

Commit

Permalink
fix: move misplaced query-level configs to the correct list (#9144)
Browse files Browse the repository at this point in the history
* move props

* checkstyle

* checkstyle

* include query-level properties as editable, change list to set for looking up properties efficiently, and fix/expand javadocs

* add pull query configs to queryLevelProperties

* checkstyle again

* more checkstyle ahhh
  • Loading branch information
ableegoldman authored May 31, 2022
1 parent 1a0e06b commit 86d4fbb
Show file tree
Hide file tree
Showing 3 changed files with 31 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -329,7 +329,7 @@ private static void throwOnNonQueryLevelConfigs(final Map<String, Object> overri
final KsqlConfigResolver resolver = new KsqlConfigResolver();
final Optional<ConfigItem> resolvedItem = resolver.resolve(s, false);
return resolvedItem.map(configItem ->
!PropertiesList.QueryLevelPropertyList
!PropertiesList.QueryLevelProperties
.contains(configItem.getPropertyName())).orElse(true);
})
.distinct()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -250,7 +250,7 @@ public EndpointResponse isValidProperty(final String property) {
final Optional<ConfigItem> resolvedItem = resolver.resolve(property, false);
if (ksqlEngine.getKsqlConfig().getBoolean(KsqlConfig.KSQL_SHARED_RUNTIME_ENABLED)
&& resolvedItem.isPresent()) {
if (!PropertiesList.QueryLevelPropertyList.contains(resolvedItem.get().getPropertyName())) {
if (!PropertiesList.QueryLevelProperties.contains(resolvedItem.get().getPropertyName())) {
throw new KsqlException(String.format("When shared runtimes are enabled, the"
+ " config %s can only be set for the entire cluster and all queries currently"
+ " running in it, and not configurable for individual queries."
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
import static io.confluent.ksql.util.KsqlConfig.FAIL_ON_DESERIALIZATION_ERROR_CONFIG;
import static io.confluent.ksql.util.KsqlConfig.KSQL_NESTED_ERROR_HANDLING_CONFIG;
import static io.confluent.ksql.util.KsqlConfig.KSQL_QUERY_ERROR_MAX_QUEUE_SIZE;
import static io.confluent.ksql.util.KsqlConfig.KSQL_QUERY_PULL_MAX_ALLOWED_OFFSET_LAG_CONFIG;
import static io.confluent.ksql.util.KsqlConfig.KSQL_QUERY_PULL_TABLE_SCAN_ENABLED;
import static io.confluent.ksql.util.KsqlConfig.KSQL_QUERY_RETRY_BACKOFF_INITIAL_MS;
import static io.confluent.ksql.util.KsqlConfig.KSQL_QUERY_RETRY_BACKOFF_MAX_MS;
import static io.confluent.ksql.util.KsqlConfig.KSQL_STRING_CASE_CONFIG_TOGGLE;
Expand Down Expand Up @@ -71,6 +73,7 @@
import static org.apache.kafka.clients.producer.ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG;
import static org.apache.kafka.streams.StreamsConfig.ACCEPTABLE_RECOVERY_LAG_CONFIG;
import static org.apache.kafka.streams.StreamsConfig.APPLICATION_SERVER_CONFIG;
import static org.apache.kafka.streams.StreamsConfig.BUFFERED_RECORDS_PER_PARTITION_CONFIG;
import static org.apache.kafka.streams.StreamsConfig.BUILT_IN_METRICS_VERSION_CONFIG;
import static org.apache.kafka.streams.StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG;
import static org.apache.kafka.streams.StreamsConfig.COMMIT_INTERVAL_MS_CONFIG;
Expand All @@ -88,6 +91,7 @@
import static org.apache.kafka.streams.StreamsConfig.REPLICATION_FACTOR_CONFIG;
import static org.apache.kafka.streams.StreamsConfig.ROCKSDB_CONFIG_SETTER_CLASS_CONFIG;
import static org.apache.kafka.streams.StreamsConfig.SECURITY_PROTOCOL_CONFIG;
import static org.apache.kafka.streams.StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG;
import static org.apache.kafka.streams.StreamsConfig.STATE_CLEANUP_DELAY_MS_CONFIG;
import static org.apache.kafka.streams.StreamsConfig.STATE_DIR_CONFIG;
import static org.apache.kafka.streams.StreamsConfig.TASK_TIMEOUT_MS_CONFIG;
Expand All @@ -98,37 +102,54 @@
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import io.confluent.ksql.config.ConfigItem;
import io.confluent.ksql.config.KsqlConfigResolver;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;

@JsonIgnoreProperties(ignoreUnknown = true)
public class PropertiesList extends KsqlEntity {
public static final List<String> QueryLevelPropertyList = ImmutableList.of(

/**
* The set of query-level properties that can be configured via the `SET` command. They can also
* use the `ALTER SYSTEM` command to set a default value for queries without an explicit override.
* NOTE: IF YOU ADD A NEW CONFIG AND WANT IT TO BE CONFIGURABLE PER-QUERY YOU MUST ADD IT HERE.
*/
@SuppressWarnings("deprecation")
public static final Set<String> QueryLevelProperties = ImmutableSet.of(
AUTO_OFFSET_RESET_CONFIG,
BUFFERED_RECORDS_PER_PARTITION_CONFIG,
CACHE_MAX_BYTES_BUFFERING_CONFIG,
DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG,
DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG,
FAIL_ON_DESERIALIZATION_ERROR_CONFIG,
KSQL_STRING_CASE_CONFIG_TOGGLE,
KSQL_NESTED_ERROR_HANDLING_CONFIG,
KSQL_QUERY_ERROR_MAX_QUEUE_SIZE,
KSQL_QUERY_RETRY_BACKOFF_INITIAL_MS,
KSQL_QUERY_RETRY_BACKOFF_MAX_MS,
KSQL_QUERY_PULL_MAX_ALLOWED_OFFSET_LAG_CONFIG,
KSQL_QUERY_PULL_TABLE_SCAN_ENABLED,
KSQL_TIMESTAMP_THROW_ON_INVALID,
FAIL_ON_DESERIALIZATION_ERROR_CONFIG
MAX_TASK_IDLE_MS_CONFIG,
STATESTORE_CACHE_MAX_BYTES_CONFIG,
TASK_TIMEOUT_MS_CONFIG
);

/**
* List os properties that can be changes via `ALTER SYSTEM` command.
* The set of system properties that can be changed via the `ALTER SYSTEM` command.
* We use this "allow list" for security reasons.
* (Independent of LD.)
*/
public static final List<String> EditablePropertyList = ImmutableList.of(
public static final Set<String> MutableSystemProperties = ImmutableSet.of(
MAX_POLL_RECORDS_CONFIG,
MAX_POLL_INTERVAL_MS_CONFIG,
SESSION_TIMEOUT_MS_CONFIG,
HEARTBEAT_INTERVAL_MS_CONFIG,
AUTO_OFFSET_RESET_CONFIG,
FETCH_MIN_BYTES_CONFIG,
FETCH_MAX_BYTES_CONFIG,
FETCH_MAX_WAIT_MS_CONFIG,
Expand Down Expand Up @@ -174,14 +195,10 @@ public class PropertiesList extends KsqlEntity {
ACCEPTABLE_RECOVERY_LAG_CONFIG,
APPLICATION_SERVER_CONFIG,
BUILT_IN_METRICS_VERSION_CONFIG,
CACHE_MAX_BYTES_BUFFERING_CONFIG,
COMMIT_INTERVAL_MS_CONFIG,
DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG,
DEFAULT_PRODUCTION_EXCEPTION_HANDLER_CLASS_CONFIG,
DEFAULT_KEY_SERDE_CLASS_CONFIG,
DEFAULT_VALUE_SERDE_CLASS_CONFIG,
DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG,
MAX_TASK_IDLE_MS_CONFIG,
MAX_WARMUP_REPLICAS_CONFIG,
NUM_STANDBY_REPLICAS_CONFIG,
POLL_MS_CONFIG,
Expand All @@ -192,7 +209,6 @@ public class PropertiesList extends KsqlEntity {
SECURITY_PROTOCOL_CONFIG,
STATE_CLEANUP_DELAY_MS_CONFIG,
STATE_DIR_CONFIG,
TASK_TIMEOUT_MS_CONFIG,
WINDOW_SIZE_MS_CONFIG,
UPGRADE_FROM_CONFIG
);
Expand All @@ -219,7 +235,7 @@ public Property(
this.scope = scope;
this.value = value;
this.editable = Property.isEditable(name);
this.level = PropertiesList.QueryLevelPropertyList.contains(name) ? "QUERY" : "SERVER";
this.level = PropertiesList.QueryLevelProperties.contains(name) ? "QUERY" : "SERVER";
}

@SuppressWarnings("checkstyle:MissingJavadocMethod")
Expand All @@ -228,7 +244,8 @@ public static boolean isEditable(final String propertyName) {
final Optional<ConfigItem> resolvedItem = resolver.resolve(propertyName, false);

return resolvedItem.isPresent()
&& PropertiesList.EditablePropertyList.contains(resolvedItem.get().getPropertyName());
&& (PropertiesList.MutableSystemProperties.contains(resolvedItem.get().getPropertyName())
|| PropertiesList.QueryLevelProperties.contains(resolvedItem.get().getPropertyName()));
}

public String getLevel() {
Expand Down

0 comments on commit 86d4fbb

Please sign in to comment.