diff --git a/ksql-api/src/main/java/io/confluent/ksql/api/server/ApiServerConfig.java b/ksql-api/src/main/java/io/confluent/ksql/api/server/ApiServerConfig.java index 38a195b0ac0a..754e68259f61 100644 --- a/ksql-api/src/main/java/io/confluent/ksql/api/server/ApiServerConfig.java +++ b/ksql-api/src/main/java/io/confluent/ksql/api/server/ApiServerConfig.java @@ -15,12 +15,16 @@ package io.confluent.ksql.api.server; -import io.confluent.common.config.AbstractConfig; -import io.confluent.common.config.ConfigDef; -import io.confluent.common.config.ConfigDef.Importance; -import io.confluent.common.config.ConfigDef.Type; +import static io.confluent.ksql.configdef.ConfigValidators.zeroOrPositive; + import io.confluent.ksql.util.KsqlConfig; import java.util.Map; +import org.apache.kafka.common.config.AbstractConfig; +import org.apache.kafka.common.config.ConfigDef; +import org.apache.kafka.common.config.ConfigDef.Importance; +import org.apache.kafka.common.config.ConfigDef.Type; +import org.apache.kafka.common.config.ConfigDef.Validator; +import org.apache.kafka.common.config.ConfigException; /** * Config for the API server @@ -91,6 +95,7 @@ private static String propertyName(final String name) { VERTICLE_INSTANCES, Type.INT, DEFAULT_VERTICLE_INSTANCES, + oneOrMore(), Importance.MEDIUM, VERTICLE_INSTANCES_DOC) .define( @@ -103,6 +108,7 @@ private static String propertyName(final String name) { LISTEN_PORT, Type.INT, DEFAULT_LISTEN_PORT, + zeroOrPositive(), Importance.MEDIUM, LISTEN_PORT_DOC) .define( @@ -145,12 +151,14 @@ private static String propertyName(final String name) { WORKER_POOL_SIZE, Type.INT, DEFAULT_WORKER_POOL_SIZE, + zeroOrPositive(), Importance.MEDIUM, WORKER_POOL_DOC) .define( MAX_PUSH_QUERIES, Type.INT, DEFAULT_MAX_PUSH_QUERIES, + zeroOrPositive(), Importance.MEDIUM, MAX_PUSH_QUERIES_DOC); @@ -158,4 +166,20 @@ public ApiServerConfig(final Map map) { super(CONFIG_DEF, map); } + private static Validator oneOrMore() { + return (name, val) -> { + if (val instanceof Long) { + if (((Long) val) < 1) { + throw new ConfigException(name, val, "Not >= 1"); + } + } else if (val instanceof Integer) { + if (((Integer) val) < 1) { + throw new ConfigException(name, val, "Not >= 1"); + } + } else { + throw new IllegalArgumentException("validator should only be used with int, long"); + } + }; + } + }