diff --git a/ksqldb-common/src/main/java/io/confluent/ksql/properties/LocalPropertyParser.java b/ksqldb-common/src/main/java/io/confluent/ksql/properties/LocalPropertyParser.java index db76763d385d..19b0bef9f3b8 100644 --- a/ksqldb-common/src/main/java/io/confluent/ksql/properties/LocalPropertyParser.java +++ b/ksqldb-common/src/main/java/io/confluent/ksql/properties/LocalPropertyParser.java @@ -45,8 +45,7 @@ public Object parse(final String property, final Object value) { } final ConfigItem configItem = resolver.resolve(property, true) - .orElseThrow(() -> new IllegalArgumentException(String.format( - "Not recognizable as ksql, streams, consumer, or producer property: '%s'", property))); + .orElseThrow(() -> new PropertyNotFoundException(property)); final Object parsedValue = configItem.parseValue(value); diff --git a/ksqldb-common/src/main/java/io/confluent/ksql/properties/PropertiesUtil.java b/ksqldb-common/src/main/java/io/confluent/ksql/properties/PropertiesUtil.java index d31e012ed75f..7fc92aa6aa7b 100644 --- a/ksqldb-common/src/main/java/io/confluent/ksql/properties/PropertiesUtil.java +++ b/ksqldb-common/src/main/java/io/confluent/ksql/properties/PropertiesUtil.java @@ -18,11 +18,14 @@ import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableMap.Builder; import com.google.common.collect.ImmutableSet; +import io.confluent.ksql.config.PropertyParser; import io.confluent.ksql.util.KsqlException; import java.io.File; import java.io.FileInputStream; import java.io.IOException; +import java.util.Collections; import java.util.HashMap; +import java.util.List; import java.util.Map; import java.util.Properties; import java.util.Set; @@ -33,6 +36,7 @@ * Utility class for working with property files and system properties. */ public final class PropertiesUtil { + private static final PropertyParser PROPERTY_PARSER = new LocalPropertyParser(); private static final Set> BLACK_LIST = ImmutableSet .>builder() @@ -159,4 +163,52 @@ private static Map asMap(final Properties props) { props.stringPropertyNames().forEach(key -> builder.put(key, props.getProperty(key))); return builder.build(); } + + public static Map coerceTypes( + final Map streamsProperties, + final boolean ignoreUnresolved + ) { + if (streamsProperties == null) { + return Collections.emptyMap(); + } + + final Map validated = new HashMap<>(streamsProperties.size()); + for (final Map.Entry e : streamsProperties.entrySet()) { + try { + validated.put(e.getKey(), coerceType(e.getKey(), e.getValue())); + } catch (final PropertyNotFoundException p) { + if (ignoreUnresolved) { + validated.put(e.getKey(), e.getValue()); + } else { + throw p; + } + } + } + return validated; + } + + private static Object coerceType(final String key, final Object value) { + try { + final String stringValue = value == null + ? null + : value instanceof List + ? listToString((List) value) + : String.valueOf(value); + + return PROPERTY_PARSER.parse(key, stringValue); + } catch (final PropertyNotFoundException e) { + throw e; + } catch (final Exception e) { + throw new KsqlException( + "Failed to coerce type of value '" + value + "' for key '" + key + "'", + e + ); + } + } + + private static String listToString(final List value) { + return value.stream() + .map(e -> e == null ? null : e.toString()) + .collect(Collectors.joining(",")); + } } diff --git a/ksqldb-common/src/main/java/io/confluent/ksql/properties/PropertyNotFoundException.java b/ksqldb-common/src/main/java/io/confluent/ksql/properties/PropertyNotFoundException.java new file mode 100644 index 000000000000..3a98b706449a --- /dev/null +++ b/ksqldb-common/src/main/java/io/confluent/ksql/properties/PropertyNotFoundException.java @@ -0,0 +1,25 @@ +/* + * Copyright 2022 Confluent Inc. + * + * Licensed under the Confluent Community License (the "License"); you may not use + * this file except in compliance with the License. You may obtain a copy of the + * License at + * + * http://www.confluent.io/confluent-community-license + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package io.confluent.ksql.properties; + +public class PropertyNotFoundException extends IllegalArgumentException { + PropertyNotFoundException(final String property) { + super(String.format( + "Not recognizable as ksql, streams, consumer, or producer property: '%s'", + property + )); + } +} diff --git a/ksqldb-common/src/test/java/io/confluent/ksql/properties/PropertiesUtilTest.java b/ksqldb-common/src/test/java/io/confluent/ksql/properties/PropertiesUtilTest.java index 7aa2274c2714..b27804881741 100644 --- a/ksqldb-common/src/test/java/io/confluent/ksql/properties/PropertiesUtilTest.java +++ b/ksqldb-common/src/test/java/io/confluent/ksql/properties/PropertiesUtilTest.java @@ -19,7 +19,9 @@ import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.containsInAnyOrder; import static org.hamcrest.Matchers.containsString; +import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.hasItem; +import static org.hamcrest.Matchers.instanceOf; import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.not; import static org.junit.Assert.assertThrows; @@ -180,6 +182,40 @@ public void shouldFilterByKey() { assertThat(result.get("keep that"), is("v1")); } + @Test + public void shouldCoerceTypes() { + // given/when: + final Map coerced = PropertiesUtil.coerceTypes(ImmutableMap.of( + "ksql.internal.topic.replicas", 3L, + "cache.max.bytes.buffering", "0" + ), false); + + // then: + assertThat(coerced.get("ksql.internal.topic.replicas"), instanceOf(Short.class)); + assertThat(coerced.get("ksql.internal.topic.replicas"), equalTo((short) 3)); + assertThat(coerced.get("cache.max.bytes.buffering"), instanceOf(Long.class)); + assertThat(coerced.get("cache.max.bytes.buffering"), equalTo(0L)); + } + + @Test + public void shouldThrowOnUnkownPropertyFromCoerceTypes() { + // given/when: + assertThrows( + PropertyNotFoundException.class, + () -> PropertiesUtil.coerceTypes(ImmutableMap.of("foo", "bar"), false) + ); + } + + @Test + public void shouldNotThrowOnUnkownPropertyFromCoerceTypesWithIgnore() { + // given/when + final Map coerced + = PropertiesUtil.coerceTypes(ImmutableMap.of("foo", "bar"), true); + + // then: + assertThat(coerced.get("foo"), is("bar")); + } + private void givenPropsFileContains(final String contents) { try { Files.write(propsFile.toPath(), contents.getBytes(StandardCharsets.UTF_8)); diff --git a/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/computation/Command.java b/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/computation/Command.java index 7298cf665f9f..8273bd13c3c3 100644 --- a/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/computation/Command.java +++ b/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/computation/Command.java @@ -26,6 +26,7 @@ import io.confluent.ksql.config.SessionConfig; import io.confluent.ksql.engine.KsqlPlan; import io.confluent.ksql.planner.plan.ConfiguredKsqlPlan; +import io.confluent.ksql.properties.PropertiesUtil; import io.confluent.ksql.rest.server.resources.IncompatibleKsqlCommandVersionException; import io.confluent.ksql.statement.ConfiguredStatement; import java.util.Collections; @@ -117,7 +118,7 @@ public String getStatement() { justification = "overwriteProperties is unmodifiableMap()" ) public Map getOverwriteProperties() { - return overwriteProperties; + return PropertiesUtil.coerceTypes(overwriteProperties, true); } @SuppressFBWarnings( diff --git a/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/computation/CommandTest.java b/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/computation/CommandTest.java index 4df216c9f3d2..4e3e03a15851 100644 --- a/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/computation/CommandTest.java +++ b/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/computation/CommandTest.java @@ -18,11 +18,13 @@ import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.CoreMatchers.is; import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.instanceOf; import static org.junit.Assert.assertThrows; import static org.junit.Assert.assertTrue; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.exc.ValueInstantiationException; +import com.google.common.collect.ImmutableMap; import io.confluent.ksql.execution.json.PlanJsonMapper; import java.io.IOException; import java.util.Collections; @@ -30,6 +32,7 @@ import java.util.Optional; import io.confluent.ksql.rest.server.resources.IncompatibleKsqlCommandVersionException; +import org.hamcrest.Matchers; import org.junit.Test; public class CommandTest { @@ -90,7 +93,8 @@ public void shouldDeserializeCorrectlyWithVersion() throws IOException { } private void grep(final String string, final String regex) { - assertThat(string.matches(regex), is(true)); + assertThat(String.format("[%s] does not match [%s]", string, regex), string.matches(regex), is(true)); + } @Test @@ -108,4 +112,27 @@ public void shouldSerializeDeserializeCorrectly() throws IOException { final Command deserialized = mapper.readValue(serialized, Command.class); assertThat(deserialized, equalTo(command)); } + + @Test + public void shouldCoerceProperties() { + // Given/When: + final Command command = new Command( + "test statement;", + ImmutableMap.of( + "ksql.internal.topic.replicas", 3L + ), + Collections.emptyMap(), + Optional.empty() + ); + + // Then: + assertThat( + command.getOverwriteProperties().get("ksql.internal.topic.replicas"), + instanceOf(Short.class) + ); + assertThat( + command.getOverwriteProperties().get("ksql.internal.topic.replicas"), + Matchers.equalTo((short) 3) + ); + } } diff --git a/ksqldb-rest-model/src/main/java/io/confluent/ksql/rest/entity/KsqlRequest.java b/ksqldb-rest-model/src/main/java/io/confluent/ksql/rest/entity/KsqlRequest.java index ad8a60ea104d..8d7279f5a3b3 100644 --- a/ksqldb-rest-model/src/main/java/io/confluent/ksql/rest/entity/KsqlRequest.java +++ b/ksqldb-rest-model/src/main/java/io/confluent/ksql/rest/entity/KsqlRequest.java @@ -21,12 +21,7 @@ import com.fasterxml.jackson.annotation.JsonSubTypes; import com.google.common.collect.ImmutableMap; import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; -import io.confluent.ksql.config.PropertyParser; -import io.confluent.ksql.properties.LocalPropertyParser; -import io.confluent.ksql.util.KsqlException; -import java.util.Collections; -import java.util.HashMap; -import java.util.List; +import io.confluent.ksql.properties.PropertiesUtil; import java.util.Map; import java.util.Objects; import java.util.Optional; @@ -35,8 +30,6 @@ @JsonIgnoreProperties(ignoreUnknown = true) @JsonSubTypes({}) public class KsqlRequest { - private static final PropertyParser PROPERTY_PARSER = new LocalPropertyParser(); - private final String ksql; private final ImmutableMap configOverrides; private final ImmutableMap requestProperties; @@ -79,11 +72,11 @@ public String getKsql() { @JsonProperty("streamsProperties") public Map getConfigOverrides() { - return coerceTypes(configOverrides); + return PropertiesUtil.coerceTypes(configOverrides, false); } public Map getRequestProperties() { - return coerceTypes(requestProperties); + return PropertiesUtil.coerceTypes(requestProperties, false); } @SuppressFBWarnings(value = "EI_EXPOSE_REP", justification = "sessionVariables is ImmutableMap") @@ -147,34 +140,4 @@ public String toString() { return kv.getValue(); })); } - - private static Map coerceTypes(final Map streamsProperties) { - if (streamsProperties == null) { - return Collections.emptyMap(); - } - - final Map validated = new HashMap<>(streamsProperties.size()); - streamsProperties.forEach((k, v) -> validated.put(k, coerceType(k, v))); - return validated; - } - - private static Object coerceType(final String key, final Object value) { - try { - final String stringValue = value == null - ? null - : value instanceof List - ? listToString((List) value) - : String.valueOf(value); - - return PROPERTY_PARSER.parse(key, stringValue); - } catch (final Exception e) { - throw new KsqlException("Failed to set '" + key + "' to '" + value + "'", e); - } - } - - private static String listToString(final List value) { - return value.stream() - .map(e -> e == null ? null : e.toString()) - .collect(Collectors.joining(",")); - } } \ No newline at end of file