Skip to content

Commit

Permalink
fix: coerce property values to correct type (#8765)
Browse files Browse the repository at this point in the history
* coerce property values to correct type

This patch changes Command to coerce overwritten properties to the right
types. Previously, if a user provided a property that deserializes to a
different type by default (e.g. a property that is declared as Short will
deserialize to Integer by default), the command would fail to execute due
to a type mismatch.

* checkstyle

* fix test

* fix exception type

* review feedback

* fix test bug
  • Loading branch information
rodesai authored Mar 2, 2022
1 parent f85b8e6 commit 7f7a076
Show file tree
Hide file tree
Showing 7 changed files with 147 additions and 44 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,7 @@ public Object parse(final String property, final Object value) {
}

final ConfigItem configItem = resolver.resolve(property, true)
.orElseThrow(() -> new IllegalArgumentException(String.format(
"Not recognizable as ksql, streams, consumer, or producer property: '%s'", property)));
.orElseThrow(() -> new PropertyNotFoundException(property));

final Object parsedValue = configItem.parseValue(value);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,14 @@
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableMap.Builder;
import com.google.common.collect.ImmutableSet;
import io.confluent.ksql.config.PropertyParser;
import io.confluent.ksql.util.KsqlException;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
Expand All @@ -33,6 +36,7 @@
* Utility class for working with property files and system properties.
*/
public final class PropertiesUtil {
private static final PropertyParser PROPERTY_PARSER = new LocalPropertyParser();

private static final Set<Predicate<String>> BLACK_LIST = ImmutableSet
.<Predicate<String>>builder()
Expand Down Expand Up @@ -159,4 +163,52 @@ private static Map<String, String> asMap(final Properties props) {
props.stringPropertyNames().forEach(key -> builder.put(key, props.getProperty(key)));
return builder.build();
}

public static Map<String, Object> coerceTypes(
final Map<String, Object> streamsProperties,
final boolean ignoreUnresolved
) {
if (streamsProperties == null) {
return Collections.emptyMap();
}

final Map<String, Object> validated = new HashMap<>(streamsProperties.size());
for (final Map.Entry<String, Object> e : streamsProperties.entrySet()) {
try {
validated.put(e.getKey(), coerceType(e.getKey(), e.getValue()));
} catch (final PropertyNotFoundException p) {
if (ignoreUnresolved) {
validated.put(e.getKey(), e.getValue());
} else {
throw p;
}
}
}
return validated;
}

private static Object coerceType(final String key, final Object value) {
try {
final String stringValue = value == null
? null
: value instanceof List
? listToString((List<?>) value)
: String.valueOf(value);

return PROPERTY_PARSER.parse(key, stringValue);
} catch (final PropertyNotFoundException e) {
throw e;
} catch (final Exception e) {
throw new KsqlException(
"Failed to coerce type of value '" + value + "' for key '" + key + "'",
e
);
}
}

private static String listToString(final List<?> value) {
return value.stream()
.map(e -> e == null ? null : e.toString())
.collect(Collectors.joining(","));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
/*
* Copyright 2022 Confluent Inc.
*
* Licensed under the Confluent Community License (the "License"); you may not use
* this file except in compliance with the License. You may obtain a copy of the
* License at
*
* http://www.confluent.io/confluent-community-license
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/

package io.confluent.ksql.properties;

public class PropertyNotFoundException extends IllegalArgumentException {
PropertyNotFoundException(final String property) {
super(String.format(
"Not recognizable as ksql, streams, consumer, or producer property: '%s'",
property
));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,9 @@
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.hasItem;
import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.not;
import static org.junit.Assert.assertThrows;
Expand Down Expand Up @@ -180,6 +182,40 @@ public void shouldFilterByKey() {
assertThat(result.get("keep that"), is("v1"));
}

@Test
public void shouldCoerceTypes() {
// given/when:
final Map<String, Object> coerced = PropertiesUtil.coerceTypes(ImmutableMap.of(
"ksql.internal.topic.replicas", 3L,
"cache.max.bytes.buffering", "0"
), false);

// then:
assertThat(coerced.get("ksql.internal.topic.replicas"), instanceOf(Short.class));
assertThat(coerced.get("ksql.internal.topic.replicas"), equalTo((short) 3));
assertThat(coerced.get("cache.max.bytes.buffering"), instanceOf(Long.class));
assertThat(coerced.get("cache.max.bytes.buffering"), equalTo(0L));
}

@Test
public void shouldThrowOnUnkownPropertyFromCoerceTypes() {
// given/when:
assertThrows(
PropertyNotFoundException.class,
() -> PropertiesUtil.coerceTypes(ImmutableMap.of("foo", "bar"), false)
);
}

@Test
public void shouldNotThrowOnUnkownPropertyFromCoerceTypesWithIgnore() {
// given/when
final Map<String, Object> coerced
= PropertiesUtil.coerceTypes(ImmutableMap.of("foo", "bar"), true);

// then:
assertThat(coerced.get("foo"), is("bar"));
}

private void givenPropsFileContains(final String contents) {
try {
Files.write(propsFile.toPath(), contents.getBytes(StandardCharsets.UTF_8));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import io.confluent.ksql.config.SessionConfig;
import io.confluent.ksql.engine.KsqlPlan;
import io.confluent.ksql.planner.plan.ConfiguredKsqlPlan;
import io.confluent.ksql.properties.PropertiesUtil;
import io.confluent.ksql.rest.server.resources.IncompatibleKsqlCommandVersionException;
import io.confluent.ksql.statement.ConfiguredStatement;
import java.util.Collections;
Expand Down Expand Up @@ -117,7 +118,7 @@ public String getStatement() {
justification = "overwriteProperties is unmodifiableMap()"
)
public Map<String, Object> getOverwriteProperties() {
return overwriteProperties;
return PropertiesUtil.coerceTypes(overwriteProperties, true);
}

@SuppressFBWarnings(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,18 +18,21 @@
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.instanceOf;
import static org.junit.Assert.assertThrows;
import static org.junit.Assert.assertTrue;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.exc.ValueInstantiationException;
import com.google.common.collect.ImmutableMap;
import io.confluent.ksql.execution.json.PlanJsonMapper;
import java.io.IOException;
import java.util.Collections;
import java.util.Map;
import java.util.Optional;

import io.confluent.ksql.rest.server.resources.IncompatibleKsqlCommandVersionException;
import org.hamcrest.Matchers;
import org.junit.Test;

public class CommandTest {
Expand Down Expand Up @@ -90,7 +93,8 @@ public void shouldDeserializeCorrectlyWithVersion() throws IOException {
}

private void grep(final String string, final String regex) {
assertThat(string.matches(regex), is(true));
assertThat(String.format("[%s] does not match [%s]", string, regex), string.matches(regex), is(true));

}

@Test
Expand All @@ -108,4 +112,27 @@ public void shouldSerializeDeserializeCorrectly() throws IOException {
final Command deserialized = mapper.readValue(serialized, Command.class);
assertThat(deserialized, equalTo(command));
}

@Test
public void shouldCoerceProperties() {
// Given/When:
final Command command = new Command(
"test statement;",
ImmutableMap.of(
"ksql.internal.topic.replicas", 3L
),
Collections.emptyMap(),
Optional.empty()
);

// Then:
assertThat(
command.getOverwriteProperties().get("ksql.internal.topic.replicas"),
instanceOf(Short.class)
);
assertThat(
command.getOverwriteProperties().get("ksql.internal.topic.replicas"),
Matchers.equalTo((short) 3)
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,7 @@
import com.fasterxml.jackson.annotation.JsonSubTypes;
import com.google.common.collect.ImmutableMap;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import io.confluent.ksql.config.PropertyParser;
import io.confluent.ksql.properties.LocalPropertyParser;
import io.confluent.ksql.util.KsqlException;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import io.confluent.ksql.properties.PropertiesUtil;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
Expand All @@ -35,8 +30,6 @@
@JsonIgnoreProperties(ignoreUnknown = true)
@JsonSubTypes({})
public class KsqlRequest {
private static final PropertyParser PROPERTY_PARSER = new LocalPropertyParser();

private final String ksql;
private final ImmutableMap<String, Object> configOverrides;
private final ImmutableMap<String, Object> requestProperties;
Expand Down Expand Up @@ -79,11 +72,11 @@ public String getKsql() {

@JsonProperty("streamsProperties")
public Map<String, Object> getConfigOverrides() {
return coerceTypes(configOverrides);
return PropertiesUtil.coerceTypes(configOverrides, false);
}

public Map<String, Object> getRequestProperties() {
return coerceTypes(requestProperties);
return PropertiesUtil.coerceTypes(requestProperties, false);
}

@SuppressFBWarnings(value = "EI_EXPOSE_REP", justification = "sessionVariables is ImmutableMap")
Expand Down Expand Up @@ -147,34 +140,4 @@ public String toString() {
return kv.getValue();
}));
}

private static Map<String, Object> coerceTypes(final Map<String, Object> streamsProperties) {
if (streamsProperties == null) {
return Collections.emptyMap();
}

final Map<String, Object> validated = new HashMap<>(streamsProperties.size());
streamsProperties.forEach((k, v) -> validated.put(k, coerceType(k, v)));
return validated;
}

private static Object coerceType(final String key, final Object value) {
try {
final String stringValue = value == null
? null
: value instanceof List
? listToString((List<?>) value)
: String.valueOf(value);

return PROPERTY_PARSER.parse(key, stringValue);
} catch (final Exception e) {
throw new KsqlException("Failed to set '" + key + "' to '" + value + "'", e);
}
}

private static String listToString(final List<?> value) {
return value.stream()
.map(e -> e == null ? null : e.toString())
.collect(Collectors.joining(","));
}
}

0 comments on commit 7f7a076

Please sign in to comment.