Skip to content

Commit

Permalink
feat: add ASSERT TOPIC command (#9066)
Browse files Browse the repository at this point in the history
* feat: add ASSERT TOPIC command

* make timeout nonreserved

* checkstyle

* fix unit test

* update cli

* fix integration test

* fix qtt

* address review comments
  • Loading branch information
Zara Lim authored Apr 28, 2022
1 parent 99f8186 commit cd5254f
Show file tree
Hide file tree
Showing 21 changed files with 681 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@
import io.confluent.ksql.query.QueryError;
import io.confluent.ksql.rest.ApiJsonMapper;
import io.confluent.ksql.rest.entity.ArgumentInfo;
import io.confluent.ksql.rest.entity.AssertTopicEntity;
import io.confluent.ksql.rest.entity.CommandStatusEntity;
import io.confluent.ksql.rest.entity.ConnectorDescription;
import io.confluent.ksql.rest.entity.ConnectorList;
Expand Down Expand Up @@ -191,6 +192,7 @@ public class Console implements Closeable {
tablePrinter(VariablesList.class, ListVariablesTableBuilder::new))
.put(TerminateQueryEntity.class,
tablePrinter(TerminateQueryEntity.class, TerminateQueryTableBuilder::new))
.put(AssertTopicEntity.class, Console::printAssertTopic)
.build();

private static <T extends KsqlEntity> Handler1<KsqlEntity, Console> tablePrinter(
Expand Down Expand Up @@ -904,6 +906,10 @@ private void printFunctionDescription(final FunctionDescriptionList describeFunc
);
}

private void printAssertTopic(final AssertTopicEntity assertTopic) {
writer().printf("Topic " + assertTopic.getTopicName() + " exists.\n");
}

private static String argToString(final ArgumentInfo arg) {
final String type = arg.getType() + (arg.getIsVariadic() ? "[]" : "");
return arg.getName().isEmpty() ? type : (arg.getName() + " " + type);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,6 @@
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Strings;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
Expand All @@ -44,6 +42,7 @@
import io.confluent.ksql.query.QueryId;
import io.confluent.ksql.rest.Errors;
import io.confluent.ksql.rest.entity.ArgumentInfo;
import io.confluent.ksql.rest.entity.AssertTopicEntity;
import io.confluent.ksql.rest.entity.CommandId;
import io.confluent.ksql.rest.entity.CommandStatus;
import io.confluent.ksql.rest.entity.CommandStatusEntity;
Expand Down Expand Up @@ -95,7 +94,6 @@
import io.confluent.ksql.util.KsqlConstants;
import io.confluent.ksql.util.KsqlConstants.KsqlQueryStatus;
import io.confluent.ksql.util.KsqlConstants.KsqlQueryType;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
Expand All @@ -106,7 +104,6 @@
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.apache.commons.lang3.StringUtils;
import org.apache.kafka.connect.runtime.rest.entities.ConnectorStateInfo;
import org.apache.kafka.connect.runtime.rest.entities.ConnectorStateInfo.ConnectorState;
import org.apache.kafka.connect.runtime.rest.entities.ConnectorStateInfo.TaskState;
Expand Down Expand Up @@ -1094,4 +1091,19 @@ public void shouldThrowOnInvalidCliPropertyValue() {
assertThat(terminal.getOutputString(),
containsString("Invalid value BURRITO for configuration WRAP: String must be one of: ON, OFF, null"));
}

@Test
public void shouldPrintAssertTopicResult() {
// Given:
final KsqlEntityList entities = new KsqlEntityList(ImmutableList.of(
new AssertTopicEntity("statement", "name")
));

// When:
console.printKsqlEntityList(entities);

// Then:
final String output = terminal.getOutputString();
Approvals.verify(output, approvalOptions);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
[ {
"@type" : "assert_topic",
"statementText" : "statement",
"topicName" : "name",
"warnings" : [ ]
} ]
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@

Topic name exists.
13 changes: 13 additions & 0 deletions ksqldb-common/src/main/java/io/confluent/ksql/util/KsqlConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -661,6 +661,12 @@ public class KsqlConfig extends AbstractConfig {
private static final boolean KSQL_ENDPOINT_MIGRATE_QUERY_DEFAULT = true;
private static final String KSQL_ENDPOINT_MIGRATE_QUERY_DOC
= "Migrates the /query endpoint to use the same handler as /query-stream.";
public static final String KSQL_ASSERT_TOPIC_DEFAULT_TIMEOUT_MS
= "ksql.assert.topic.default.timeout.ms";
private static final int KSQL_ASSERT_TOPIC_DEFAULT_TIMEOUT_MS_DEFAULT = 1000;
private static final String KSQL_ASSERT_TOPIC_DEFAULT_TIMEOUT_MS_DOC
= "The default timeout for ASSERT TOPIC statements if not timeout is specified "
+ "in the statement.";

private enum ConfigGeneration {
LEGACY,
Expand Down Expand Up @@ -1428,6 +1434,13 @@ private static ConfigDef buildConfigDef(final ConfigGeneration generation) {
Importance.LOW,
KSQL_TRANSIENT_QUERY_CLEANUP_SERVICE_PERIOD_SECONDS_DOC
)
.define(
KSQL_ASSERT_TOPIC_DEFAULT_TIMEOUT_MS,
Type.INT,
KSQL_ASSERT_TOPIC_DEFAULT_TIMEOUT_MS_DEFAULT,
Importance.LOW,
KSQL_ASSERT_TOPIC_DEFAULT_TIMEOUT_MS_DOC
)
.withClientSslSupport();

for (final CompatibilityBreakingConfigDef compatibilityBreakingConfigDef
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
],
"expectedException": {
"type": "io.confluent.ksql.parser.exception.ParseFailedException",
"message": "mismatched input 'ASSERT'"
"message": "mismatched input 'VALUES'"
}
},
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ statement
| CREATE TYPE (IF NOT EXISTS)? identifier AS type #registerType
| DROP TYPE (IF EXISTS)? identifier #dropType
| ALTER (STREAM | TABLE) sourceName alterOption (',' alterOption)* #alterSource
| ASSERT TOPIC identifier (WITH tableProperties)? timeout? #assertTopic
;

assertStatement
Expand Down Expand Up @@ -119,6 +120,10 @@ resultMaterialization
| FINAL
;

timeout
: TIMEOUT number windowUnit
;

alterOption
: ADD (COLUMN)? identifier type
;
Expand Down Expand Up @@ -414,6 +419,7 @@ nonReserved
| GRACE | PERIOD
| DEFINE | UNDEFINE | VARIABLES
| PLUGINS | SYSTEM
| TIMEOUT
;

EMIT: 'EMIT';
Expand Down Expand Up @@ -553,6 +559,7 @@ PLUGINS: 'PLUGINS';
HEADERS: 'HEADERS';
HEADER: 'HEADER';
SYSTEM: 'SYSTEM';
TIMEOUT: 'TIMEOUT';

IF: 'IF';

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@
import io.confluent.ksql.parser.SqlBaseParser.AssertStreamContext;
import io.confluent.ksql.parser.SqlBaseParser.AssertTableContext;
import io.confluent.ksql.parser.SqlBaseParser.AssertTombstoneContext;
import io.confluent.ksql.parser.SqlBaseParser.AssertTopicContext;
import io.confluent.ksql.parser.SqlBaseParser.AssertValuesContext;
import io.confluent.ksql.parser.SqlBaseParser.CreateConnectorContext;
import io.confluent.ksql.parser.SqlBaseParser.DescribeConnectorContext;
Expand Down Expand Up @@ -106,6 +107,7 @@
import io.confluent.ksql.parser.tree.AssertStatement;
import io.confluent.ksql.parser.tree.AssertStream;
import io.confluent.ksql.parser.tree.AssertTombstone;
import io.confluent.ksql.parser.tree.AssertTopic;
import io.confluent.ksql.parser.tree.AssertValues;
import io.confluent.ksql.parser.tree.ColumnConstraints;
import io.confluent.ksql.parser.tree.CreateConnector;
Expand Down Expand Up @@ -1397,6 +1399,21 @@ public Node visitRegisterType(final RegisterTypeContext context) {
);
}

@Override
public Node visitAssertTopic(final AssertTopicContext context) {
return new AssertTopic(
getLocation(context),
ParserUtil.getIdentifierText(true, context.identifier()),
context.WITH() == null
? ImmutableMap.of()
: processTableProperties(context.tableProperties()),
context.timeout() == null
? Optional.empty()
: Optional.of(getTimeClause(
context.timeout().number(), context.timeout().windowUnit()))
);
}

@Override
public Node visitAssertValues(final AssertValuesContext context) {
final SourceName targetName = ParserUtil.getSourceName(context.sourceName());
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
/*
* Copyright 2022 Confluent Inc.
*
* Licensed under the Confluent Community License (the "License"; you may not use
* this file except in compliance with the License. You may obtain a copy of the
* License at
*
* http://www.confluent.io/confluent-community-license
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/

package io.confluent.ksql.parser.tree;

import com.google.common.collect.ImmutableMap;
import com.google.errorprone.annotations.Immutable;
import io.confluent.ksql.execution.expression.tree.Literal;
import io.confluent.ksql.execution.windows.WindowTimeClause;
import io.confluent.ksql.parser.NodeLocation;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;

@Immutable
public class AssertTopic extends Statement {

private final String topic;
private final ImmutableMap<String, Literal> config;
private final Optional<WindowTimeClause> timeout;

public AssertTopic(
final Optional<NodeLocation> location,
final String topic,
final Map<String, Literal> config,
final Optional<WindowTimeClause> timeout
) {
super(location);
this.topic = Objects.requireNonNull(topic, "topic");
this.config = ImmutableMap.copyOf(Objects.requireNonNull(config, "config"));
this.timeout = Objects.requireNonNull(timeout, "timeout");
}

public String getTopic() {
return topic;
}

public Map<String, Literal> getConfig() {
return config;
}

public Optional<WindowTimeClause> getTimeout() {
return timeout;
}

@Override
public boolean equals(final Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
final AssertTopic that = (AssertTopic) o;
return topic.equals(that.topic)
&& Objects.equals(config, that.config)
&& timeout.equals(that.timeout);
}

@Override
public int hashCode() {
return Objects.hash(topic, config, timeout);
}

@Override
public String toString() {
return "AssertTopic{"
+ "topic=" + topic
+ ",config=" + config
+ ",timeout=" + timeout
+ '}';
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
import io.confluent.ksql.parser.exception.ParseFailedException;
import io.confluent.ksql.parser.tree.AliasedRelation;
import io.confluent.ksql.parser.tree.AllColumns;
import io.confluent.ksql.parser.tree.AssertTopic;
import io.confluent.ksql.parser.tree.ColumnConstraints;
import io.confluent.ksql.parser.tree.CreateStream;
import io.confluent.ksql.parser.tree.CreateTable;
Expand Down Expand Up @@ -963,4 +964,36 @@ public void shouldFailOnPersistentQueryLimitClauseTable() {

assertEquals(expectedMessage, actualMessage);
}

@Test
public void shouldBuildAssertTopic() {
// Given:
final SingleStatementContext stmt
= givenQuery("ASSERT TOPIC X;");

// When:
final AssertTopic assertTopic = (AssertTopic) builder.buildStatement(stmt);

// Then:
assertThat(assertTopic.getTopic(), is("X"));
assertThat(assertTopic.getConfig().size(), is(0));
assertThat(assertTopic.getTimeout(), is(Optional.empty()));
}

@Test
public void shouldBuildAssertTopicWithConfigsAndTimeout() {
// Given:
final SingleStatementContext stmt
= givenQuery("ASSERT TOPIC X WITH (REPLICAS=1, partitions=1) TIMEOUT 10 SECONDS;");

// When:
final AssertTopic assertTopic = (AssertTopic) builder.buildStatement(stmt);

// Then:
assertThat(assertTopic.getTopic(), is("X"));
assertThat(assertTopic.getConfig().get("REPLICAS").getValue(), is(1));
assertThat(assertTopic.getConfig().get("PARTITIONS").getValue(), is(1));
assertThat(assertTopic.getTimeout().get().getTimeUnit(), is(TimeUnit.SECONDS));
assertThat(assertTopic.getTimeout().get().getValue(), is(10L));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
package io.confluent.ksql.parser.tree;

import com.google.common.collect.ImmutableMap;
import com.google.common.testing.EqualsTester;
import io.confluent.ksql.execution.windows.WindowTimeClause;
import io.confluent.ksql.parser.NodeLocation;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import org.junit.Rule;
import org.junit.Test;
import org.mockito.Mock;
import org.mockito.junit.MockitoJUnit;
/*
* Copyright 2022 Confluent Inc.
*
* Licensed under the Confluent Community License (the "License"; you may not use
* this file except in compliance with the License. You may obtain a copy of the
* License at
*
* http://www.confluent.io/confluent-community-license
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/

import org.mockito.junit.MockitoRule;

public class AssertTopicTest {

@Rule
public final MockitoRule mockitoRule = MockitoJUnit.rule();

public static final NodeLocation SOME_LOCATION = new NodeLocation(0, 0);
public static final NodeLocation OTHER_LOCATION = new NodeLocation(1, 0);
private final String SOME_TOPIC = "TOPIC";
private final Map SOME_CONFIG = ImmutableMap.of("partitions", 1);
private final WindowTimeClause SOME_TIMEOUT = new WindowTimeClause(5, TimeUnit.SECONDS);

@Test
public void shouldImplementHashCodeAndEquals() {
new EqualsTester()
.addEqualityGroup(
new AssertTopic(Optional.empty(), SOME_TOPIC, SOME_CONFIG, Optional.of(SOME_TIMEOUT)),
new AssertTopic(Optional.of(new NodeLocation(1, 1)), SOME_TOPIC, SOME_CONFIG, Optional.of(SOME_TIMEOUT)))
.addEqualityGroup(
new AssertTopic(Optional.empty(), "another topic", SOME_CONFIG, Optional.of(SOME_TIMEOUT)))
.addEqualityGroup(
new AssertTopic(Optional.empty(), SOME_TOPIC, ImmutableMap.of(), Optional.of(SOME_TIMEOUT)))
.addEqualityGroup(
new AssertTopic(Optional.empty(), SOME_TOPIC, SOME_CONFIG, Optional.empty()))
.testEquals();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ public class GrammarTokenExporterTest {
"CATALOG", "PROPERTIES", "BEGINNING", "UNSET", "RUN", "SCRIPT", "DECIMAL",
"KEY", "CONNECTOR", "CONNECTORS", "SINK", "SOURCE", "NAMESPACE", "MATERIALIZED",
"VIEW", "PRIMARY", "REPLACE", "ASSERT", "ADD", "ALTER", "VARIABLES", "PLUGINS", "HEADERS",
"HEADER", "SYSTEM", "IF", "EQ", "NEQ", "LT", "LTE", "GT", "GTE", "PLUS", "MINUS", "ASTERISK",
"HEADER", "SYSTEM", "TIMEOUT", "IF", "EQ", "NEQ", "LT", "LTE", "GT", "GTE", "PLUS", "MINUS", "ASTERISK",
"SLASH", "PERCENT", "CONCAT", "ASSIGN", "STRING", "IDENTIFIER", "VARIABLE", "EXPONENT",
"DIGIT", "LETTER", "WS"));

Expand Down
Loading

0 comments on commit cd5254f

Please sign in to comment.