Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added self documentation to the KSQL config public config variables. #422

Merged
2 changes: 1 addition & 1 deletion ksql-cli/src/test/java/io/confluent/ksql/CliTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,7 @@ private static Map<String, Object> validStartUpConfigs() {
Map<String, Object> startConfigs = genDefaultConfigMap();
startConfigs.put("num.stream.threads", 4);

startConfigs.put(SINK_NUMBER_OF_REPLICATIONS_PROPERTY, 1);
startConfigs.put(SINK_NUMBER_OF_REPLICAS_PROPERTY, 1);
startConfigs.put(SINK_NUMBER_OF_PARTITIONS_PROPERTY, 4);
startConfigs.put(SINK_WINDOW_CHANGE_LOG_ADDITIONAL_RETENTION_PROPERTY, 1000000);

Expand Down
43 changes: 7 additions & 36 deletions ksql-common/src/main/java/io/confluent/ksql/util/KsqlConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -36,18 +36,14 @@ public class KsqlConfig extends AbstractConfig implements Cloneable {

public static final String SINK_NUMBER_OF_PARTITIONS = "PARTITIONS";
public static final String SINK_NUMBER_OF_PARTITIONS_PROPERTY = "ksql.sink.partitions";
public static final String DEFAULT_SINK_NUMBER_OF_PARTITIONS = "ksql.sink.partitions.default";

public static final String SINK_NUMBER_OF_REPLICATIONS = "REPLICATIONS";
public static final String SINK_NUMBER_OF_REPLICATIONS_PROPERTY = "ksql.sink.replications";
public static final String DEFAULT_SINK_NUMBER_OF_REPLICATIONS = "ksql.sink.replications.default";
public static final String SINK_NUMBER_OF_REPLICAS = "REPLICAS";
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The name and the value of this property don't seem to line up. What does the REPLICAS value mean exactly? Would it be more appropriate to have a numeric value? This applies to the SINK_NUMBER_OF_PARTITIONS property above as well. Also where is this SINK_NUMBER_OF_REPLICAS string used?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These are not part of the config variables that can be set by the user via the KsqlConfig but they will be set in the query to specify the number of partitions, replications and the timestamp column for the result stream/table. This is just a constant to represent the name of the attributes.
Here is an example:
CREATE STREAM enrichedpv1 with (timestamp='pageid', partitions = 3) AS SELECT users.userid AS userid, pageid as pageid, region, gender FROM pageview LEFT JOIN users ON pageview.userid = users.userid;

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm. Then why is it in KsqlConfig? There is also a SINK_NUMBER_OF_REPLICAS_PROPERTY which I assume does go in the ConfigDef? It would be better to have the literals that are in the query moved to a separate file.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good idea, I'll move them to a new file, KsqlConstants.

public static final String SINK_NUMBER_OF_REPLICAS_PROPERTY = "ksql.sink.replicas";

public static final String SINK_WINDOW_CHANGE_LOG_ADDITIONAL_RETENTION =
"WINDOW_CHANGE_LOG_ADDITIONAL_RETENTION";
public static final String SINK_WINDOW_CHANGE_LOG_ADDITIONAL_RETENTION_PROPERTY =
"ksql.sink.window.change.log.additional.retention";
public static final String DEFAULT_SINK_WINDOW_CHANGE_LOG_ADDITIONAL_RETENTION =
"ksql.sink.window.change.log.additional.retention.default";

public static final String STREAM_INTERNAL_CHANGELOG_TOPIC_SUFFIX = "-changelog";

Expand Down Expand Up @@ -118,18 +114,18 @@ public class KsqlConfig extends AbstractConfig implements Cloneable {
"Suffix for state store names in Tables. For instance if the suffix is _ksql_statestore the state "
+ "store name would be ksql_query_1_ksql_statestore"
+ "_ksql_statestore ")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Did you add this extra line by mistake?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good catch for 'transient_query_', it should be 'query_'. Fixed it.
IntelliJ added this, fixed it :)

.define(DEFAULT_SINK_NUMBER_OF_PARTITIONS,
.define(SINK_NUMBER_OF_PARTITIONS_PROPERTY,
ConfigDef.Type.INT,
defaultSinkNumberOfPartitions,
ConfigDef.Importance.MEDIUM,
"The default number of partitions for the topics created by KSQL.")
.define(DEFAULT_SINK_NUMBER_OF_REPLICATIONS,
.define(SINK_NUMBER_OF_REPLICAS_PROPERTY,
ConfigDef.Type.SHORT,
defaultSinkNumberOfReplications,
ConfigDef.Importance.MEDIUM,
"The default number of replicas for the topics created by KSQL."
)
.define(DEFAULT_SINK_WINDOW_CHANGE_LOG_ADDITIONAL_RETENTION,
.define(SINK_WINDOW_CHANGE_LOG_ADDITIONAL_RETENTION_PROPERTY,
ConfigDef.Type.LONG,
defaultSinkWindowChangeLogAdditionalRetention,
ConfigDef.Importance.MEDIUM,
Expand All @@ -144,40 +140,15 @@ public KsqlConfig(Map<?, ?> props) {

ksqlConfigProps = new HashMap<>();
ksqlStreamConfigProps = new HashMap<>();
ksqlConfigProps.put(KSQL_SERVICE_ID_CONFIG, KSQL_SERVICE_ID_DEFAULT);
ksqlConfigProps.put(KSQL_PERSISTENT_QUERY_NAME_PREFIX_CONFIG, KSQL_PERSISTENT_QUERY_NAME_PREFIX_DEFAULT);
ksqlConfigProps.put(KSQL_TRANSIENT_QUERY_NAME_PREFIX_CONFIG, KSQL_TRANSIENT_QUERY_NAME_PREFIX_DEFAULT);
ksqlConfigProps.put(KSQL_TABLE_STATESTORE_NAME_SUFFIX_CONFIG, KSQL_TABLE_STATESTORE_NAME_SUFFIX_DEFAULT);

if (props.containsKey(DEFAULT_SINK_NUMBER_OF_PARTITIONS)) {
ksqlConfigProps.put(SINK_NUMBER_OF_PARTITIONS_PROPERTY,
Integer.parseInt(props.get(DEFAULT_SINK_NUMBER_OF_PARTITIONS).toString()));
} else {
ksqlConfigProps.put(SINK_NUMBER_OF_PARTITIONS_PROPERTY, defaultSinkNumberOfPartitions);
}

if (props.containsKey(DEFAULT_SINK_NUMBER_OF_REPLICATIONS)) {
ksqlConfigProps.put(SINK_NUMBER_OF_REPLICATIONS_PROPERTY,
Short.parseShort(props.get(DEFAULT_SINK_NUMBER_OF_REPLICATIONS).toString()));
} else {
ksqlConfigProps.put(SINK_NUMBER_OF_REPLICATIONS_PROPERTY, defaultSinkNumberOfReplications);
}

if (props.containsKey(DEFAULT_SINK_WINDOW_CHANGE_LOG_ADDITIONAL_RETENTION)) {
ksqlConfigProps.put(SINK_WINDOW_CHANGE_LOG_ADDITIONAL_RETENTION_PROPERTY,
Long.parseLong(props.get(DEFAULT_SINK_WINDOW_CHANGE_LOG_ADDITIONAL_RETENTION).toString()));
} else {
ksqlConfigProps.put(SINK_WINDOW_CHANGE_LOG_ADDITIONAL_RETENTION_PROPERTY,
defaultSinkWindowChangeLogAdditionalRetention);
}
ksqlConfigProps.putAll(super.values());

ksqlStreamConfigProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, defaultAutoOffsetRestConfig);
ksqlStreamConfigProps.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, defaultCommitIntervalMsConfig);
ksqlStreamConfigProps.put(
StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, defaultCacheMaxBytesBufferingConfig);
ksqlStreamConfigProps.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, defaultNumberOfStreamsThreads);

for (Map.Entry<?, ?> entry : props.entrySet()) {
for (Map.Entry<?, ?> entry : originals().entrySet()) {
final String key = entry.getKey().toString();
if (key.toLowerCase().startsWith(KSQL_CONFIG_PREPERTY_PREFIX)) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i think we'd already have all the properties in the ksqlConfigProps by virtue of constructing the map from super.values(), so we could skip updating the ksqlConfigProps and only update ksqlStreamConfigProps if the key doesn't start with KSQL_CONFIG_PROPERTY_PREFIX

ksqlConfigProps.put(key, entry.getValue());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -466,16 +466,16 @@ private void setIntoProperties(final StructuredDataSource into, final Table node
}
}

if (node.getProperties().get(KsqlConfig.SINK_NUMBER_OF_REPLICATIONS) != null) {
if (node.getProperties().get(KsqlConfig.SINK_NUMBER_OF_REPLICAS) != null) {
try {
short numberOfReplications =
Short.parseShort(node.getProperties().get(KsqlConfig.SINK_NUMBER_OF_REPLICATIONS)
Short.parseShort(node.getProperties().get(KsqlConfig.SINK_NUMBER_OF_REPLICAS)
.toString());
analysis.getIntoProperties()
.put(KsqlConfig.SINK_NUMBER_OF_REPLICATIONS_PROPERTY, numberOfReplications);
.put(KsqlConfig.SINK_NUMBER_OF_REPLICAS_PROPERTY, numberOfReplications);
} catch (NumberFormatException e) {
throw new KsqlException("Invalid number of replications in WITH clause: " + node
.getProperties().get(KsqlConfig.SINK_NUMBER_OF_REPLICATIONS).toString());
.getProperties().get(KsqlConfig.SINK_NUMBER_OF_REPLICAS).toString());
}
}
}
Expand Down Expand Up @@ -541,7 +541,7 @@ private void validateWithClause(Set<String> withClauseVariables) {
validSet.add(DdlConfig.PARTITION_BY_PROPERTY.toUpperCase());
validSet.add(KsqlConfig.SINK_TIMESTAMP_COLUMN_NAME.toUpperCase());
validSet.add(KsqlConfig.SINK_NUMBER_OF_PARTITIONS.toUpperCase());
validSet.add(KsqlConfig.SINK_NUMBER_OF_REPLICATIONS.toUpperCase());
validSet.add(KsqlConfig.SINK_NUMBER_OF_REPLICAS.toUpperCase());

for (String withVariable: withClauseVariables) {
if (!validSet.contains(withVariable.toUpperCase())) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,9 +110,9 @@ public SchemaKStream buildStream(final StreamsBuilder builder,
ksqlConfig.put(KsqlConfig.SINK_NUMBER_OF_PARTITIONS_PROPERTY,
outputProperties.get(KsqlConfig.SINK_NUMBER_OF_PARTITIONS_PROPERTY));
}
if (outputProperties.containsKey(KsqlConfig.SINK_NUMBER_OF_REPLICATIONS_PROPERTY)) {
ksqlConfig.put(KsqlConfig.SINK_NUMBER_OF_REPLICATIONS_PROPERTY,
outputProperties.get(KsqlConfig.SINK_NUMBER_OF_REPLICATIONS_PROPERTY
if (outputProperties.containsKey(KsqlConfig.SINK_NUMBER_OF_REPLICAS_PROPERTY)) {
ksqlConfig.put(KsqlConfig.SINK_NUMBER_OF_REPLICAS_PROPERTY,
outputProperties.get(KsqlConfig.SINK_NUMBER_OF_REPLICAS_PROPERTY
));
}

Expand Down Expand Up @@ -181,7 +181,7 @@ private void addAvroSchemaToResultTopic(final KsqlStructuredDataOutputNode.Build

private void createSinkTopic(final String kafkaTopicName, KsqlConfig ksqlConfig, KafkaTopicClient kafkaTopicClient) {
int numberOfPartitions = (Integer) ksqlConfig.get(KsqlConfig.SINK_NUMBER_OF_PARTITIONS_PROPERTY);
short numberOfReplications = (Short) ksqlConfig.get(KsqlConfig.SINK_NUMBER_OF_REPLICATIONS_PROPERTY);
short numberOfReplications = (Short) ksqlConfig.get(KsqlConfig.SINK_NUMBER_OF_REPLICAS_PROPERTY);
kafkaTopicClient.createTopic(kafkaTopicName, numberOfPartitions, numberOfReplications);
}
public Field getTimestampField() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ public class KsqlStructuredDataOutputNodeTest {
public void before() {
final Map<String, Object> props = new HashMap<>();
props.put(KsqlConfig.SINK_NUMBER_OF_PARTITIONS_PROPERTY, 4);
props.put(KsqlConfig.SINK_NUMBER_OF_REPLICATIONS_PROPERTY, (short)3);
props.put(KsqlConfig.SINK_NUMBER_OF_REPLICAS_PROPERTY, (short)3);
createOutputNode(props);
topicClient.createTopic(eq("output"), anyInt(), anyShort());
EasyMock.expectLastCall();
Expand Down Expand Up @@ -156,7 +156,7 @@ public void shouldHaveCorrectOutputNodeSchema() {

@Test
public void shouldUpdateReplicationPartitionsInConfig() {
assertThat(ksqlConfig.get(KsqlConfig.SINK_NUMBER_OF_REPLICATIONS_PROPERTY), equalTo(Integer.valueOf(3).shortValue()));
assertThat(ksqlConfig.get(KsqlConfig.SINK_NUMBER_OF_REPLICAS_PROPERTY), equalTo(Integer.valueOf(3).shortValue()));
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -218,9 +218,9 @@ public static KsqlRestApplication buildApplication(KsqlRestConfig restConfig, bo

try {
short replicationFactor = 1;
if(restConfig.getOriginals().containsKey(KsqlConfig.DEFAULT_SINK_NUMBER_OF_REPLICATIONS)) {
if(restConfig.getOriginals().containsKey(KsqlConfig.SINK_NUMBER_OF_REPLICAS)) {
replicationFactor = Short.parseShort(restConfig.getOriginals()
.get(KsqlConfig.DEFAULT_SINK_NUMBER_OF_REPLICATIONS).toString());
.get(KsqlConfig.SINK_NUMBER_OF_REPLICAS).toString());
}
client.createTopic(commandTopic, 1, replicationFactor);
} catch (KafkaTopicException e) {
Expand Down