Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added self documentation to the KSQL config public config variables. #422

Merged
7 changes: 2 additions & 5 deletions ksql-cli/src/test/java/io/confluent/ksql/CliTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.errors.LogAndContinueExceptionHandler;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.ClassRule;
Expand Down Expand Up @@ -147,8 +146,6 @@ private static Map<String, Object> genDefaultConfigMap() {
configMap.put("commit.interval.ms", 0);
configMap.put("cache.max.bytes.buffering", 0);
configMap.put("auto.offset.reset", "earliest");
configMap.put("ksql.command.topic.suffix", "commands");

return configMap;
}

Expand All @@ -163,9 +160,9 @@ private static Map<String, Object> validStartUpConfigs() {
Map<String, Object> startConfigs = genDefaultConfigMap();
startConfigs.put("num.stream.threads", 4);

startConfigs.put(SINK_NUMBER_OF_REPLICATIONS_PROPERTY, 1);
startConfigs.put(SINK_NUMBER_OF_REPLICAS_PROPERTY, 1);
startConfigs.put(SINK_NUMBER_OF_PARTITIONS_PROPERTY, 4);
startConfigs.put(SINK_WINDOW_CHANGE_LOG_ADDITIONAL_RETENTION_PROPERTY, 1000000);
startConfigs.put(SINK_WINDOW_CHANGE_LOG_ADDITIONAL_RETENTION_MS_PROPERTY, 1000000);

startConfigs.put(KSQL_TRANSIENT_QUERY_NAME_PREFIX_CONFIG, KSQL_TRANSIENT_QUERY_NAME_PREFIX_DEFAULT);
startConfigs.put(KSQL_SERVICE_ID_CONFIG, KSQL_SERVICE_ID_DEFAULT);
Expand Down
153 changes: 69 additions & 84 deletions ksql-common/src/main/java/io/confluent/ksql/util/KsqlConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -32,22 +32,14 @@ public class KsqlConfig extends AbstractConfig implements Cloneable {
public static final String KSQL_CONFIG_PREPERTY_PREFIX = "ksql.";

public static final String KSQL_TIMESTAMP_COLUMN_INDEX = "ksq.timestamp.column.index";
public static final String SINK_TIMESTAMP_COLUMN_NAME = "TIMESTAMP";

public static final String SINK_NUMBER_OF_PARTITIONS = "PARTITIONS";
public static final String SINK_NUMBER_OF_PARTITIONS_PROPERTY = "ksql.sink.partitions";
public static final String DEFAULT_SINK_NUMBER_OF_PARTITIONS = "ksql.sink.partitions.default";

public static final String SINK_NUMBER_OF_REPLICATIONS = "REPLICATIONS";
public static final String SINK_NUMBER_OF_REPLICATIONS_PROPERTY = "ksql.sink.replications";
public static final String DEFAULT_SINK_NUMBER_OF_REPLICATIONS = "ksql.sink.replications.default";
public static final String SINK_NUMBER_OF_REPLICAS_PROPERTY = "ksql.sink.replicas";

public static final String SINK_WINDOW_CHANGE_LOG_ADDITIONAL_RETENTION =
"WINDOW_CHANGE_LOG_ADDITIONAL_RETENTION";
public static final String SINK_WINDOW_CHANGE_LOG_ADDITIONAL_RETENTION_PROPERTY =

public static final String SINK_WINDOW_CHANGE_LOG_ADDITIONAL_RETENTION_MS_PROPERTY =
"ksql.sink.window.change.log.additional.retention";
public static final String DEFAULT_SINK_WINDOW_CHANGE_LOG_ADDITIONAL_RETENTION =
"ksql.sink.window.change.log.additional.retention.default";

public static final String STREAM_INTERNAL_CHANGELOG_TOPIC_SUFFIX = "-changelog";

Expand All @@ -57,111 +49,104 @@ public class KsqlConfig extends AbstractConfig implements Cloneable {

public static final String
KSQL_SERVICE_ID_CONFIG = "ksql.service.id";
public static final ConfigDef.Type
KSQL_SERVICE_ID_TYPE = ConfigDef.Type.STRING;
public static final String
KSQL_SERVICE_ID_DEFAULT = "ksql_";
public static final ConfigDef.Importance
KSQL_SERVICE_ID_IMPORTANCE = ConfigDef.Importance.MEDIUM;
public static final String
KSQL_SERVICE_ID_DOC =
"Indicates the ID of the ksql service. It will be used as prefix for all KSQL queries in "
+ "this service.";

public static final String
KSQL_PERSISTENT_QUERY_NAME_PREFIX_CONFIG = "ksql.persistent.prefix";
public static final ConfigDef.Type
KSQL_PERSISTENT_QUERY_NAME_PREFIX_TYPE = ConfigDef.Type.STRING;
public static final String
KSQL_PERSISTENT_QUERY_NAME_PREFIX_DEFAULT = "query_";
public static final ConfigDef.Importance
KSQL_PERSISTENT_QUERY_NAME_PREFIX_IMPORTANCE = ConfigDef.Importance.MEDIUM;
public static final String
KSQL_PERSISTENT_QUERY_NAME_PREFIX_DOC =
"Second part of the prefix for persitent queries.";

public static final String
KSQL_TRANSIENT_QUERY_NAME_PREFIX_CONFIG = "ksql.transient.prefix";
public static final ConfigDef.Type
KSQL_TRANSIENT_QUERY_NAME_PREFIX_TYPE = ConfigDef.Type.STRING;
public static final String
KSQL_TRANSIENT_QUERY_NAME_PREFIX_DEFAULT = "transient_";
public static final ConfigDef.Importance
KSQL_TRANSIENT_QUERY_NAME_PREFIX_IMPORTANCE = ConfigDef.Importance.MEDIUM;
public static final String
KSQL_TRANSIENT_QUERY_NAME_PREFIX_DOC =
"Second part of the prefix for transient queries.";

public static final String
KSQL_TABLE_STATESTORE_NAME_SUFFIX_CONFIG = "ksql.statestore.suffix";
public static final ConfigDef.Type
KSQL_TABLE_STATESTORE_NAME_SUFFIX_TYPE = ConfigDef.Type.STRING;
public static final String
KSQL_TABLE_STATESTORE_NAME_SUFFIX_DEFAULT = "transient_";
public static final ConfigDef.Importance
KSQL_TABLE_STATESTORE_NAME_SUFFIX_IMPORTANCE = ConfigDef.Importance.MEDIUM;
public static final String
KSQL_TABLE_STATESTORE_NAME_SUFFIX_DOC =
"Suffix for state store names in Tables.";
KSQL_TABLE_STATESTORE_NAME_SUFFIX_DEFAULT = "_ksql_statestore";

public int defaultSinkNumberOfPartitions = 4;
public short defaultSinkNumberOfReplications = 1;
// TODO: Find out the best default value.
public long defaultSinkWindowChangeLogAdditionalRetention = 1000000;

public String defaultAutoOffsetRestConfig = "latest";
public long defaultCommitIntervalMsConfig = 2000;
public long defaultCacheMaxBytesBufferingConfig = 10000000;
public int defaultNumberOfStreamsThreads = 4;

Map<String, Object> ksqlConfigProps;
Map<String, Object> ksqlStreamConfigProps;

private static final ConfigDef CONFIG_DEF = new ConfigDef();
private static final ConfigDef CONFIG_DEF;

static {
CONFIG_DEF = new ConfigDef()
.define(KSQL_SERVICE_ID_CONFIG,
ConfigDef.Type.STRING,
KSQL_SERVICE_ID_DEFAULT,
ConfigDef.Importance.MEDIUM,
"Indicates the ID of the ksql service. It will be used as prefix for all KSQL queries in "
+ "this service.")

.define(KSQL_PERSISTENT_QUERY_NAME_PREFIX_CONFIG,
ConfigDef.Type.STRING,
KSQL_PERSISTENT_QUERY_NAME_PREFIX_DEFAULT,
ConfigDef.Importance.MEDIUM,
"Second part of the prefix for persitent queries. For instance if the prefix is "
+ "query_ the query name will be ksql_query_1.")
.define(KSQL_TRANSIENT_QUERY_NAME_PREFIX_CONFIG,
ConfigDef.Type.STRING,
KSQL_TRANSIENT_QUERY_NAME_PREFIX_DEFAULT,
ConfigDef.Importance.MEDIUM,
"Second part of the prefix for transient queries. For instance if the prefix is "
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for updating the example. It makes sense now!

+ "transient_ the query name would be "
+ "ksql_transient_4120896722607083946_1509389010601 where 'ksql_' is the first prefix"
+ " and '_transient' is the second part of the prefix for the query id the third and "
+ "4th parts are a random long value and the current timestamp. ")
.define(KSQL_TABLE_STATESTORE_NAME_SUFFIX_CONFIG,
ConfigDef.Type.STRING,
KSQL_TABLE_STATESTORE_NAME_SUFFIX_DEFAULT,
ConfigDef.Importance.MEDIUM,
"Suffix for state store names in Tables. For instance if the suffix is _ksql_statestore the state "
+ "store name would be ksql_query_1_ksql_statestore _ksql_statestore ")
.define(SINK_NUMBER_OF_PARTITIONS_PROPERTY,
ConfigDef.Type.INT,
KsqlConstants.defaultSinkNumberOfPartitions,
ConfigDef.Importance.MEDIUM,
"The default number of partitions for the topics created by KSQL.")
.define(SINK_NUMBER_OF_REPLICAS_PROPERTY,
ConfigDef.Type.SHORT,
KsqlConstants.defaultSinkNumberOfReplications,
ConfigDef.Importance.MEDIUM,
"The default number of replicas for the topics created by KSQL."
)
.define(SINK_WINDOW_CHANGE_LOG_ADDITIONAL_RETENTION_MS_PROPERTY,
ConfigDef.Type.LONG,
KsqlConstants.defaultSinkWindowChangeLogAdditionalRetention,
ConfigDef.Importance.MEDIUM,
"The default window change log additional retention time. This is a streams "
+ "config value which will be added to a windows maintainMs to ensure data is not "
+ "deleted from the log prematurely. Allows for clock drift. Default is 1 day"
)
;
}


public KsqlConfig(Map<?, ?> props) {
super(CONFIG_DEF, props);

ksqlConfigProps = new HashMap<>();
ksqlStreamConfigProps = new HashMap<>();
ksqlConfigProps.put(KSQL_SERVICE_ID_CONFIG, KSQL_SERVICE_ID_DEFAULT);
ksqlConfigProps.put(KSQL_PERSISTENT_QUERY_NAME_PREFIX_CONFIG, KSQL_PERSISTENT_QUERY_NAME_PREFIX_DEFAULT);
ksqlConfigProps.put(KSQL_TRANSIENT_QUERY_NAME_PREFIX_CONFIG, KSQL_TRANSIENT_QUERY_NAME_PREFIX_DEFAULT);
ksqlConfigProps.put(KSQL_TABLE_STATESTORE_NAME_SUFFIX_CONFIG, KSQL_TABLE_STATESTORE_NAME_SUFFIX_DEFAULT);

if (props.containsKey(DEFAULT_SINK_NUMBER_OF_PARTITIONS)) {
ksqlConfigProps.put(SINK_NUMBER_OF_PARTITIONS_PROPERTY,
Integer.parseInt(props.get(DEFAULT_SINK_NUMBER_OF_PARTITIONS).toString()));
} else {
ksqlConfigProps.put(SINK_NUMBER_OF_PARTITIONS_PROPERTY, defaultSinkNumberOfPartitions);
}

if (props.containsKey(DEFAULT_SINK_NUMBER_OF_REPLICATIONS)) {
ksqlConfigProps.put(SINK_NUMBER_OF_REPLICATIONS_PROPERTY,
Short.parseShort(props.get(DEFAULT_SINK_NUMBER_OF_REPLICATIONS).toString()));
} else {
ksqlConfigProps.put(SINK_NUMBER_OF_REPLICATIONS_PROPERTY, defaultSinkNumberOfReplications);
}

if (props.containsKey(DEFAULT_SINK_WINDOW_CHANGE_LOG_ADDITIONAL_RETENTION)) {
ksqlConfigProps.put(SINK_WINDOW_CHANGE_LOG_ADDITIONAL_RETENTION_PROPERTY,
Long.parseLong(props.get(DEFAULT_SINK_WINDOW_CHANGE_LOG_ADDITIONAL_RETENTION).toString()));
} else {
ksqlConfigProps.put(SINK_WINDOW_CHANGE_LOG_ADDITIONAL_RETENTION_PROPERTY,
defaultSinkWindowChangeLogAdditionalRetention);
}
ksqlConfigProps.putAll(super.values());

ksqlStreamConfigProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, defaultAutoOffsetRestConfig);
ksqlStreamConfigProps.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, defaultCommitIntervalMsConfig);
ksqlStreamConfigProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, KsqlConstants
.defaultAutoOffsetRestConfig);
ksqlStreamConfigProps.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, KsqlConstants
.defaultCommitIntervalMsConfig);
ksqlStreamConfigProps.put(
StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, defaultCacheMaxBytesBufferingConfig);
ksqlStreamConfigProps.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, defaultNumberOfStreamsThreads);
StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, KsqlConstants
.defaultCacheMaxBytesBufferingConfig);
ksqlStreamConfigProps.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, KsqlConstants
.defaultNumberOfStreamsThreads);

for (Map.Entry<?, ?> entry : props.entrySet()) {
for (Map.Entry<?, ?> entry : originals().entrySet()) {
final String key = entry.getKey().toString();
if (key.toLowerCase().startsWith(KSQL_CONFIG_PREPERTY_PREFIX)) {
ksqlConfigProps.put(key, entry.getValue());
} else {
if (!key.toLowerCase().startsWith(KSQL_CONFIG_PREPERTY_PREFIX)) {
ksqlStreamConfigProps.put(key, entry.getValue());
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
/**
* Copyright 2017 Confluent Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
**/

package io.confluent.ksql.util;

public class KsqlConstants {

public static final String SINK_NUMBER_OF_PARTITIONS = "PARTITIONS";
public static final String SINK_NUMBER_OF_REPLICAS = "REPLICAS";

public static final String SINK_WINDOW_CHANGE_LOG_ADDITIONAL_RETENTION =
"WINDOW_CHANGE_LOG_ADDITIONAL_RETENTION";
public static final String SINK_TIMESTAMP_COLUMN_NAME = "TIMESTAMP";

public static int defaultSinkNumberOfPartitions = 4;
public static short defaultSinkNumberOfReplications = 1;
// TODO: Find out the best default value.
public static long defaultSinkWindowChangeLogAdditionalRetention = 1000000;

public static String defaultAutoOffsetRestConfig = "latest";
public static long defaultCommitIntervalMsConfig = 2000;
public static long defaultCacheMaxBytesBufferingConfig = 10000000;
public static int defaultNumberOfStreamsThreads = 4;

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
/**
* Copyright 2017 Confluent Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
**/

package io.confluent.ksql.util;

import org.apache.kafka.streams.StreamsConfig;
import org.junit.Test;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.MatcherAssert.assertThat;

import java.util.HashMap;
import java.util.Map;

public class KsqlConfigTest {

@Test
public void shouldSetInitialValuesCorrectly() {
Map<String, Object> initialProps = new HashMap<>();
initialProps.put(KsqlConfig.SINK_NUMBER_OF_PARTITIONS_PROPERTY, 10);
initialProps.put(KsqlConfig.SINK_NUMBER_OF_REPLICAS_PROPERTY, (short) 3);
initialProps.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 800);
initialProps.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 5);

KsqlConfig ksqlConfig = new KsqlConfig(initialProps);

assertThat(ksqlConfig.get(KsqlConfig.SINK_NUMBER_OF_PARTITIONS_PROPERTY), equalTo(10));
assertThat(ksqlConfig.get(KsqlConfig.SINK_NUMBER_OF_REPLICAS_PROPERTY), equalTo((short) 3));

}


}
Loading