Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added self documentation to the KSQL config public config variables. #422

Merged
2 changes: 1 addition & 1 deletion ksql-cli/src/test/java/io/confluent/ksql/CliTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,7 @@ private static Map<String, Object> validStartUpConfigs() {
Map<String, Object> startConfigs = genDefaultConfigMap();
startConfigs.put("num.stream.threads", 4);

startConfigs.put(SINK_NUMBER_OF_REPLICATIONS_PROPERTY, 1);
startConfigs.put(SINK_NUMBER_OF_REPLICAS_PROPERTY, 1);
startConfigs.put(SINK_NUMBER_OF_PARTITIONS_PROPERTY, 4);
startConfigs.put(SINK_WINDOW_CHANGE_LOG_ADDITIONAL_RETENTION_PROPERTY, 1000000);

Expand Down
133 changes: 62 additions & 71 deletions ksql-common/src/main/java/io/confluent/ksql/util/KsqlConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -36,18 +36,14 @@ public class KsqlConfig extends AbstractConfig implements Cloneable {

public static final String SINK_NUMBER_OF_PARTITIONS = "PARTITIONS";
public static final String SINK_NUMBER_OF_PARTITIONS_PROPERTY = "ksql.sink.partitions";
public static final String DEFAULT_SINK_NUMBER_OF_PARTITIONS = "ksql.sink.partitions.default";

public static final String SINK_NUMBER_OF_REPLICATIONS = "REPLICATIONS";
public static final String SINK_NUMBER_OF_REPLICATIONS_PROPERTY = "ksql.sink.replications";
public static final String DEFAULT_SINK_NUMBER_OF_REPLICATIONS = "ksql.sink.replications.default";
public static final String SINK_NUMBER_OF_REPLICAS = "REPLICAS";
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The name and the value of this property don't seem to line up. What does the REPLICAS value mean exactly? Would it be more appropriate to have a numeric value? This applies to the SINK_NUMBER_OF_PARTITIONS property above as well. Also where is this SINK_NUMBER_OF_REPLICAS string used?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These are not part of the config variables that can be set by the user via the KsqlConfig but they will be set in the query to specify the number of partitions, replications and the timestamp column for the result stream/table. This is just a constant to represent the name of the attributes.
Here is an example:
CREATE STREAM enrichedpv1 with (timestamp='pageid', partitions = 3) AS SELECT users.userid AS userid, pageid as pageid, region, gender FROM pageview LEFT JOIN users ON pageview.userid = users.userid;

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm. Then why is it in KsqlConfig? There is also a SINK_NUMBER_OF_REPLICAS_PROPERTY which I assume does go in the ConfigDef? It would be better to have the literals that are in the query moved to a separate file.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good idea, I'll move them to a new file, KsqlConstants.

public static final String SINK_NUMBER_OF_REPLICAS_PROPERTY = "ksql.sink.replicas";

public static final String SINK_WINDOW_CHANGE_LOG_ADDITIONAL_RETENTION =
"WINDOW_CHANGE_LOG_ADDITIONAL_RETENTION";
public static final String SINK_WINDOW_CHANGE_LOG_ADDITIONAL_RETENTION_PROPERTY =
"ksql.sink.window.change.log.additional.retention";
public static final String DEFAULT_SINK_WINDOW_CHANGE_LOG_ADDITIONAL_RETENTION =
"ksql.sink.window.change.log.additional.retention.default";

public static final String STREAM_INTERNAL_CHANGELOG_TOPIC_SUFFIX = "-changelog";

Expand All @@ -57,107 +53,102 @@ public class KsqlConfig extends AbstractConfig implements Cloneable {

public static final String
KSQL_SERVICE_ID_CONFIG = "ksql.service.id";
public static final ConfigDef.Type
KSQL_SERVICE_ID_TYPE = ConfigDef.Type.STRING;
public static final String
KSQL_SERVICE_ID_DEFAULT = "ksql_";
public static final ConfigDef.Importance
KSQL_SERVICE_ID_IMPORTANCE = ConfigDef.Importance.MEDIUM;
public static final String
KSQL_SERVICE_ID_DOC =
"Indicates the ID of the ksql service. It will be used as prefix for all KSQL queries in "
+ "this service.";

public static final String
KSQL_PERSISTENT_QUERY_NAME_PREFIX_CONFIG = "ksql.persistent.prefix";
public static final ConfigDef.Type
KSQL_PERSISTENT_QUERY_NAME_PREFIX_TYPE = ConfigDef.Type.STRING;
public static final String
KSQL_PERSISTENT_QUERY_NAME_PREFIX_DEFAULT = "query_";
public static final ConfigDef.Importance
KSQL_PERSISTENT_QUERY_NAME_PREFIX_IMPORTANCE = ConfigDef.Importance.MEDIUM;
public static final String
KSQL_PERSISTENT_QUERY_NAME_PREFIX_DOC =
"Second part of the prefix for persitent queries.";

public static final String
KSQL_TRANSIENT_QUERY_NAME_PREFIX_CONFIG = "ksql.transient.prefix";
public static final ConfigDef.Type
KSQL_TRANSIENT_QUERY_NAME_PREFIX_TYPE = ConfigDef.Type.STRING;
public static final String
KSQL_TRANSIENT_QUERY_NAME_PREFIX_DEFAULT = "transient_";
public static final ConfigDef.Importance
KSQL_TRANSIENT_QUERY_NAME_PREFIX_IMPORTANCE = ConfigDef.Importance.MEDIUM;
public static final String
KSQL_TRANSIENT_QUERY_NAME_PREFIX_DOC =
"Second part of the prefix for transient queries.";

public static final String
KSQL_TABLE_STATESTORE_NAME_SUFFIX_CONFIG = "ksql.statestore.suffix";
public static final ConfigDef.Type
KSQL_TABLE_STATESTORE_NAME_SUFFIX_TYPE = ConfigDef.Type.STRING;
public static final String
KSQL_TABLE_STATESTORE_NAME_SUFFIX_DEFAULT = "transient_";
public static final ConfigDef.Importance
KSQL_TABLE_STATESTORE_NAME_SUFFIX_IMPORTANCE = ConfigDef.Importance.MEDIUM;
public static final String
KSQL_TABLE_STATESTORE_NAME_SUFFIX_DOC =
"Suffix for state store names in Tables.";
KSQL_TABLE_STATESTORE_NAME_SUFFIX_DEFAULT = "_ksql_statestore";

public int defaultSinkNumberOfPartitions = 4;
public short defaultSinkNumberOfReplications = 1;
private static int defaultSinkNumberOfPartitions = 4;
private static short defaultSinkNumberOfReplications = 1;
// TODO: Find out the best default value.
public long defaultSinkWindowChangeLogAdditionalRetention = 1000000;
private static long defaultSinkWindowChangeLogAdditionalRetention = 1000000;

public String defaultAutoOffsetRestConfig = "latest";
public long defaultCommitIntervalMsConfig = 2000;
public long defaultCacheMaxBytesBufferingConfig = 10000000;
public int defaultNumberOfStreamsThreads = 4;
private static String defaultAutoOffsetRestConfig = "latest";
private static long defaultCommitIntervalMsConfig = 2000;
private static long defaultCacheMaxBytesBufferingConfig = 10000000;
private static int defaultNumberOfStreamsThreads = 4;

Map<String, Object> ksqlConfigProps;
Map<String, Object> ksqlStreamConfigProps;

private static final ConfigDef CONFIG_DEF = new ConfigDef();
private static final ConfigDef CONFIG_DEF;

static {
CONFIG_DEF = new ConfigDef()
.define(KSQL_SERVICE_ID_CONFIG,
ConfigDef.Type.STRING,
KSQL_SERVICE_ID_DEFAULT,
ConfigDef.Importance.MEDIUM,
"Indicates the ID of the ksql service. It will be used as prefix for all KSQL queries in "
+ "this service.")

.define(KSQL_PERSISTENT_QUERY_NAME_PREFIX_CONFIG,
ConfigDef.Type.STRING,
KSQL_PERSISTENT_QUERY_NAME_PREFIX_DEFAULT,
ConfigDef.Importance.MEDIUM,
"Second part of the prefix for persitent queries. For instance if the prefix is transient_"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Did you mean transient_ here? IT doesn't fit in with the rest of the description.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's the second part. The first part come from KSQL_SERVICE_ID_CONFIG(ksql.service.id)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok. I am still not sure I follow. I think it would make sense for the example to include the full chain, so that it is clearer to the user how it all comes together at each stage.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added more details with an example.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The doc string here as written is:

Second part of the prefix for persitent queries. For instance if the prefix is transient_query_ the query name will be ksql_query_1.

Did you really mean for the example prefix to be 'transient_query_'?

+ "query_ the query name will be ksql_query_1.")
.define(KSQL_TRANSIENT_QUERY_NAME_PREFIX_CONFIG,
ConfigDef.Type.STRING,
KSQL_TRANSIENT_QUERY_NAME_PREFIX_DEFAULT,
ConfigDef.Importance.MEDIUM,
"Second part of the prefix for transient queries. For instance if the prefix is "
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for updating the example. It makes sense now!

+ "transient_ the query name would be ksql_transient_4120896722607083946_1509389010601")
.define(KSQL_TABLE_STATESTORE_NAME_SUFFIX_CONFIG,
ConfigDef.Type.STRING,
KSQL_TABLE_STATESTORE_NAME_SUFFIX_DEFAULT,
ConfigDef.Importance.MEDIUM,
"Suffix for state store names in Tables. For instance if the suffix is _ksql_statestore the state "
+ "store name would be ksql_query_1_ksql_statestore"
+ "_ksql_statestore ")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Did you add this extra line by mistake?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good catch for 'transient_query_', it should be 'query_'. Fixed it.
IntelliJ added this, fixed it :)

.define(SINK_NUMBER_OF_PARTITIONS_PROPERTY,
ConfigDef.Type.INT,
defaultSinkNumberOfPartitions,
ConfigDef.Importance.MEDIUM,
"The default number of partitions for the topics created by KSQL.")
.define(SINK_NUMBER_OF_REPLICAS_PROPERTY,
ConfigDef.Type.SHORT,
defaultSinkNumberOfReplications,
ConfigDef.Importance.MEDIUM,
"The default number of replicas for the topics created by KSQL."
)
.define(SINK_WINDOW_CHANGE_LOG_ADDITIONAL_RETENTION_PROPERTY,
ConfigDef.Type.LONG,
defaultSinkWindowChangeLogAdditionalRetention,
ConfigDef.Importance.MEDIUM,
"The default window change log additional retention time."
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You probably want to drop the 'default' here. Also, it may make sense to expand the description. It doesn't make sense as is. Perhaps a sentence like 'The amount of time to retain window change logs'. Actually I am still not sure what that means Also, there should be a time unit in the name of the config and the description.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good point, added the time unit and more details to the description.

)
;
}


public KsqlConfig(Map<?, ?> props) {
super(CONFIG_DEF, props);

ksqlConfigProps = new HashMap<>();
ksqlStreamConfigProps = new HashMap<>();
ksqlConfigProps.put(KSQL_SERVICE_ID_CONFIG, KSQL_SERVICE_ID_DEFAULT);
ksqlConfigProps.put(KSQL_PERSISTENT_QUERY_NAME_PREFIX_CONFIG, KSQL_PERSISTENT_QUERY_NAME_PREFIX_DEFAULT);
ksqlConfigProps.put(KSQL_TRANSIENT_QUERY_NAME_PREFIX_CONFIG, KSQL_TRANSIENT_QUERY_NAME_PREFIX_DEFAULT);
ksqlConfigProps.put(KSQL_TABLE_STATESTORE_NAME_SUFFIX_CONFIG, KSQL_TABLE_STATESTORE_NAME_SUFFIX_DEFAULT);

if (props.containsKey(DEFAULT_SINK_NUMBER_OF_PARTITIONS)) {
ksqlConfigProps.put(SINK_NUMBER_OF_PARTITIONS_PROPERTY,
Integer.parseInt(props.get(DEFAULT_SINK_NUMBER_OF_PARTITIONS).toString()));
} else {
ksqlConfigProps.put(SINK_NUMBER_OF_PARTITIONS_PROPERTY, defaultSinkNumberOfPartitions);
}

if (props.containsKey(DEFAULT_SINK_NUMBER_OF_REPLICATIONS)) {
ksqlConfigProps.put(SINK_NUMBER_OF_REPLICATIONS_PROPERTY,
Short.parseShort(props.get(DEFAULT_SINK_NUMBER_OF_REPLICATIONS).toString()));
} else {
ksqlConfigProps.put(SINK_NUMBER_OF_REPLICATIONS_PROPERTY, defaultSinkNumberOfReplications);
}

if (props.containsKey(DEFAULT_SINK_WINDOW_CHANGE_LOG_ADDITIONAL_RETENTION)) {
ksqlConfigProps.put(SINK_WINDOW_CHANGE_LOG_ADDITIONAL_RETENTION_PROPERTY,
Long.parseLong(props.get(DEFAULT_SINK_WINDOW_CHANGE_LOG_ADDITIONAL_RETENTION).toString()));
} else {
ksqlConfigProps.put(SINK_WINDOW_CHANGE_LOG_ADDITIONAL_RETENTION_PROPERTY,
defaultSinkWindowChangeLogAdditionalRetention);
}
ksqlConfigProps.putAll(super.values());

ksqlStreamConfigProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, defaultAutoOffsetRestConfig);
ksqlStreamConfigProps.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, defaultCommitIntervalMsConfig);
ksqlStreamConfigProps.put(
StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, defaultCacheMaxBytesBufferingConfig);
ksqlStreamConfigProps.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, defaultNumberOfStreamsThreads);

for (Map.Entry<?, ?> entry : props.entrySet()) {
for (Map.Entry<?, ?> entry : originals().entrySet()) {
final String key = entry.getKey().toString();
if (key.toLowerCase().startsWith(KSQL_CONFIG_PREPERTY_PREFIX)) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i think we'd already have all the properties in the ksqlConfigProps by virtue of constructing the map from super.values(), so we could skip updating the ksqlConfigProps and only update ksqlStreamConfigProps if the key doesn't start with KSQL_CONFIG_PROPERTY_PREFIX

ksqlConfigProps.put(key, entry.getValue());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -466,16 +466,16 @@ private void setIntoProperties(final StructuredDataSource into, final Table node
}
}

if (node.getProperties().get(KsqlConfig.SINK_NUMBER_OF_REPLICATIONS) != null) {
if (node.getProperties().get(KsqlConfig.SINK_NUMBER_OF_REPLICAS) != null) {
try {
short numberOfReplications =
Short.parseShort(node.getProperties().get(KsqlConfig.SINK_NUMBER_OF_REPLICATIONS)
Short.parseShort(node.getProperties().get(KsqlConfig.SINK_NUMBER_OF_REPLICAS)
.toString());
analysis.getIntoProperties()
.put(KsqlConfig.SINK_NUMBER_OF_REPLICATIONS_PROPERTY, numberOfReplications);
.put(KsqlConfig.SINK_NUMBER_OF_REPLICAS_PROPERTY, numberOfReplications);
} catch (NumberFormatException e) {
throw new KsqlException("Invalid number of replications in WITH clause: " + node
.getProperties().get(KsqlConfig.SINK_NUMBER_OF_REPLICATIONS).toString());
.getProperties().get(KsqlConfig.SINK_NUMBER_OF_REPLICAS).toString());
}
}
}
Expand Down Expand Up @@ -541,7 +541,7 @@ private void validateWithClause(Set<String> withClauseVariables) {
validSet.add(DdlConfig.PARTITION_BY_PROPERTY.toUpperCase());
validSet.add(KsqlConfig.SINK_TIMESTAMP_COLUMN_NAME.toUpperCase());
validSet.add(KsqlConfig.SINK_NUMBER_OF_PARTITIONS.toUpperCase());
validSet.add(KsqlConfig.SINK_NUMBER_OF_REPLICATIONS.toUpperCase());
validSet.add(KsqlConfig.SINK_NUMBER_OF_REPLICAS.toUpperCase());

for (String withVariable: withClauseVariables) {
if (!validSet.contains(withVariable.toUpperCase())) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,9 +110,9 @@ public SchemaKStream buildStream(final StreamsBuilder builder,
ksqlConfig.put(KsqlConfig.SINK_NUMBER_OF_PARTITIONS_PROPERTY,
outputProperties.get(KsqlConfig.SINK_NUMBER_OF_PARTITIONS_PROPERTY));
}
if (outputProperties.containsKey(KsqlConfig.SINK_NUMBER_OF_REPLICATIONS_PROPERTY)) {
ksqlConfig.put(KsqlConfig.SINK_NUMBER_OF_REPLICATIONS_PROPERTY,
outputProperties.get(KsqlConfig.SINK_NUMBER_OF_REPLICATIONS_PROPERTY
if (outputProperties.containsKey(KsqlConfig.SINK_NUMBER_OF_REPLICAS_PROPERTY)) {
ksqlConfig.put(KsqlConfig.SINK_NUMBER_OF_REPLICAS_PROPERTY,
outputProperties.get(KsqlConfig.SINK_NUMBER_OF_REPLICAS_PROPERTY
));
}

Expand Down Expand Up @@ -181,7 +181,7 @@ private void addAvroSchemaToResultTopic(final KsqlStructuredDataOutputNode.Build

private void createSinkTopic(final String kafkaTopicName, KsqlConfig ksqlConfig, KafkaTopicClient kafkaTopicClient) {
int numberOfPartitions = (Integer) ksqlConfig.get(KsqlConfig.SINK_NUMBER_OF_PARTITIONS_PROPERTY);
short numberOfReplications = (Short) ksqlConfig.get(KsqlConfig.SINK_NUMBER_OF_REPLICATIONS_PROPERTY);
short numberOfReplications = (Short) ksqlConfig.get(KsqlConfig.SINK_NUMBER_OF_REPLICAS_PROPERTY);
kafkaTopicClient.createTopic(kafkaTopicName, numberOfPartitions, numberOfReplications);
}
public Field getTimestampField() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ public class KsqlStructuredDataOutputNodeTest {
public void before() {
final Map<String, Object> props = new HashMap<>();
props.put(KsqlConfig.SINK_NUMBER_OF_PARTITIONS_PROPERTY, 4);
props.put(KsqlConfig.SINK_NUMBER_OF_REPLICATIONS_PROPERTY, (short)3);
props.put(KsqlConfig.SINK_NUMBER_OF_REPLICAS_PROPERTY, (short)3);
createOutputNode(props);
topicClient.createTopic(eq("output"), anyInt(), anyShort());
EasyMock.expectLastCall();
Expand Down Expand Up @@ -156,7 +156,7 @@ public void shouldHaveCorrectOutputNodeSchema() {

@Test
public void shouldUpdateReplicationPartitionsInConfig() {
assertThat(ksqlConfig.get(KsqlConfig.SINK_NUMBER_OF_REPLICATIONS_PROPERTY), equalTo(Integer.valueOf(3).shortValue()));
assertThat(ksqlConfig.get(KsqlConfig.SINK_NUMBER_OF_REPLICAS_PROPERTY), equalTo(Integer.valueOf(3).shortValue()));
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -218,9 +218,9 @@ public static KsqlRestApplication buildApplication(KsqlRestConfig restConfig, bo

try {
short replicationFactor = 1;
if(restConfig.getOriginals().containsKey(KsqlConfig.DEFAULT_SINK_NUMBER_OF_REPLICATIONS)) {
if(restConfig.getOriginals().containsKey(KsqlConfig.SINK_NUMBER_OF_REPLICAS)) {
replicationFactor = Short.parseShort(restConfig.getOriginals()
.get(KsqlConfig.DEFAULT_SINK_NUMBER_OF_REPLICATIONS).toString());
.get(KsqlConfig.SINK_NUMBER_OF_REPLICAS).toString());
}
client.createTopic(commandTopic, 1, replicationFactor);
} catch (KafkaTopicException e) {
Expand Down