From 3f54364531e43ce8b245df147e7651c18714833c Mon Sep 17 00:00:00 2001 From: Zara Lim Date: Tue, 26 Apr 2022 15:25:56 -0700 Subject: [PATCH 1/8] feat: add ASSERT TOPIC command --- .../io/confluent/ksql/util/KsqlConfig.java | 13 ++ .../io/confluent/ksql/parser/SqlBase.g4 | 6 + .../io/confluent/ksql/parser/AstBuilder.java | 18 +++ .../ksql/parser/tree/AssertTopic.java | 85 ++++++++++ .../confluent/ksql/parser/AstBuilderTest.java | 33 ++++ .../ksql/parser/tree/AssertTopicTest.java | 57 +++++++ .../server/execution/AssertTopicExecutor.java | 122 +++++++++++++++ .../server/execution/CustomExecutors.java | 2 + .../server/validation/CustomValidators.java | 2 + .../ksql/rest/integration/RestApiTest.java | 103 ++++++++++++ .../integration/RestIntegrationTestUtil.java | 2 + .../execution/AssertTopicExecutorTest.java | 146 ++++++++++++++++++ .../java/io/confluent/ksql/rest/Errors.java | 10 ++ .../ksql/rest/entity/AssertTopicEntity.java | 44 ++++++ .../ksql/rest/entity/KsqlEntity.java | 3 +- 15 files changed, 645 insertions(+), 1 deletion(-) create mode 100644 ksqldb-parser/src/main/java/io/confluent/ksql/parser/tree/AssertTopic.java create mode 100644 ksqldb-parser/src/test/java/io/confluent/ksql/parser/tree/AssertTopicTest.java create mode 100644 ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/execution/AssertTopicExecutor.java create mode 100644 ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/execution/AssertTopicExecutorTest.java create mode 100644 ksqldb-rest-model/src/main/java/io/confluent/ksql/rest/entity/AssertTopicEntity.java diff --git a/ksqldb-common/src/main/java/io/confluent/ksql/util/KsqlConfig.java b/ksqldb-common/src/main/java/io/confluent/ksql/util/KsqlConfig.java index 0da6301b188b..55a530b27646 100644 --- a/ksqldb-common/src/main/java/io/confluent/ksql/util/KsqlConfig.java +++ b/ksqldb-common/src/main/java/io/confluent/ksql/util/KsqlConfig.java @@ -661,6 +661,12 @@ public class KsqlConfig extends AbstractConfig { private static final boolean KSQL_ENDPOINT_MIGRATE_QUERY_DEFAULT = true; private static final String KSQL_ENDPOINT_MIGRATE_QUERY_DOC = "Migrates the /query endpoint to use the same handler as /query-stream."; + public static final String KSQL_ASSERT_TOPIC_DEFAULT_TIMEOUT_MS + = "ksql.assert.topic.default.timeout.ms"; + private static final int KSQL_ASSERT_TOPIC_DEFAULT_TIMEOUT_MS_DEFAULT = 1000; + private static final String KSQL_ASSERT_TOPIC_DEFAULT_TIMEOUT_MS_DOC + = "The default timeout for ASSERT TOPIC statements if not timeout is specified " + + "in the statement."; private enum ConfigGeneration { LEGACY, @@ -1428,6 +1434,13 @@ private static ConfigDef buildConfigDef(final ConfigGeneration generation) { Importance.LOW, KSQL_TRANSIENT_QUERY_CLEANUP_SERVICE_PERIOD_SECONDS_DOC ) + .define( + KSQL_ASSERT_TOPIC_DEFAULT_TIMEOUT_MS, + Type.INT, + KSQL_ASSERT_TOPIC_DEFAULT_TIMEOUT_MS_DEFAULT, + Importance.LOW, + KSQL_ASSERT_TOPIC_DEFAULT_TIMEOUT_MS_DOC + ) .withClientSslSupport(); for (final CompatibilityBreakingConfigDef compatibilityBreakingConfigDef diff --git a/ksqldb-parser/src/main/antlr4/io/confluent/ksql/parser/SqlBase.g4 b/ksqldb-parser/src/main/antlr4/io/confluent/ksql/parser/SqlBase.g4 index 1e75f1671bc0..d4fc32019b59 100644 --- a/ksqldb-parser/src/main/antlr4/io/confluent/ksql/parser/SqlBase.g4 +++ b/ksqldb-parser/src/main/antlr4/io/confluent/ksql/parser/SqlBase.g4 @@ -89,6 +89,7 @@ statement | CREATE TYPE (IF NOT EXISTS)? identifier AS type #registerType | DROP TYPE (IF EXISTS)? identifier #dropType | ALTER (STREAM | TABLE) sourceName alterOption (',' alterOption)* #alterSource + | ASSERT TOPIC identifier (WITH tableProperties)? timeout? #assertTopic ; assertStatement @@ -119,6 +120,10 @@ resultMaterialization | FINAL ; +timeout + : TIMEOUT number windowUnit + ; + alterOption : ADD (COLUMN)? identifier type ; @@ -553,6 +558,7 @@ PLUGINS: 'PLUGINS'; HEADERS: 'HEADERS'; HEADER: 'HEADER'; SYSTEM: 'SYSTEM'; +TIMEOUT: 'TIMEOUT'; IF: 'IF'; diff --git a/ksqldb-parser/src/main/java/io/confluent/ksql/parser/AstBuilder.java b/ksqldb-parser/src/main/java/io/confluent/ksql/parser/AstBuilder.java index b21ead57c774..53c7772f7bb0 100644 --- a/ksqldb-parser/src/main/java/io/confluent/ksql/parser/AstBuilder.java +++ b/ksqldb-parser/src/main/java/io/confluent/ksql/parser/AstBuilder.java @@ -73,6 +73,7 @@ import io.confluent.ksql.parser.SqlBaseParser.AssertStreamContext; import io.confluent.ksql.parser.SqlBaseParser.AssertTableContext; import io.confluent.ksql.parser.SqlBaseParser.AssertTombstoneContext; +import io.confluent.ksql.parser.SqlBaseParser.AssertTopicContext; import io.confluent.ksql.parser.SqlBaseParser.AssertValuesContext; import io.confluent.ksql.parser.SqlBaseParser.CreateConnectorContext; import io.confluent.ksql.parser.SqlBaseParser.DescribeConnectorContext; @@ -106,6 +107,7 @@ import io.confluent.ksql.parser.tree.AssertStatement; import io.confluent.ksql.parser.tree.AssertStream; import io.confluent.ksql.parser.tree.AssertTombstone; +import io.confluent.ksql.parser.tree.AssertTopic; import io.confluent.ksql.parser.tree.AssertValues; import io.confluent.ksql.parser.tree.ColumnConstraints; import io.confluent.ksql.parser.tree.CreateConnector; @@ -171,6 +173,7 @@ import io.confluent.ksql.util.Pair; import io.confluent.ksql.util.ParserUtil; import java.math.BigDecimal; +import java.time.Duration; import java.util.ArrayList; import java.util.HashSet; import java.util.List; @@ -1397,6 +1400,21 @@ public Node visitRegisterType(final RegisterTypeContext context) { ); } + @Override + public Node visitAssertTopic(final AssertTopicContext context) { + return new AssertTopic( + getLocation(context), + ParserUtil.getIdentifierText(context.identifier()), + context.WITH() == null + ? ImmutableMap.of() + : processTableProperties(context.tableProperties()), + context.timeout() == null + ? Optional.empty() + : Optional.of(getTimeClause( + context.timeout().number(), context.timeout().windowUnit())) + ); + } + @Override public Node visitAssertValues(final AssertValuesContext context) { final SourceName targetName = ParserUtil.getSourceName(context.sourceName()); diff --git a/ksqldb-parser/src/main/java/io/confluent/ksql/parser/tree/AssertTopic.java b/ksqldb-parser/src/main/java/io/confluent/ksql/parser/tree/AssertTopic.java new file mode 100644 index 000000000000..81b1870624e4 --- /dev/null +++ b/ksqldb-parser/src/main/java/io/confluent/ksql/parser/tree/AssertTopic.java @@ -0,0 +1,85 @@ +/* + * Copyright 2022 Confluent Inc. + * + * Licensed under the Confluent Community License (the "License"; you may not use + * this file except in compliance with the License. You may obtain a copy of the + * License at + * + * http://www.confluent.io/confluent-community-license + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package io.confluent.ksql.parser.tree; + +import com.google.common.collect.ImmutableMap; +import com.google.errorprone.annotations.Immutable; +import io.confluent.ksql.execution.expression.tree.Literal; +import io.confluent.ksql.execution.windows.WindowTimeClause; +import io.confluent.ksql.parser.NodeLocation; +import java.util.Map; +import java.util.Objects; +import java.util.Optional; + +@Immutable +public class AssertTopic extends Statement { + + private final String topic; + private final ImmutableMap config; + private final Optional timeout; + + public AssertTopic( + final Optional location, + final String topic, + final Map config, + final Optional timeout + ) { + super(location); + this.topic = Objects.requireNonNull(topic, "topic"); + this.config = ImmutableMap.copyOf(Objects.requireNonNull(config, "config")); + this.timeout = Objects.requireNonNull(timeout, "timeout"); + } + + public String getTopic() { + return topic; + } + + public Map getConfig() { + return config; + } + + public Optional getTimeout() { + return timeout; + } + + @Override + public boolean equals(final Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + final AssertTopic that = (AssertTopic) o; + return topic.equals(that.topic) + && Objects.equals(config, that.config) + && timeout.equals(that.timeout); + } + + @Override + public int hashCode() { + return Objects.hash(topic, config, timeout); + } + + @Override + public String toString() { + return "AssertTopic{" + + "topic=" + topic + + ",config=" + config + + ",timeout=" + timeout + + '}'; + } +} diff --git a/ksqldb-parser/src/test/java/io/confluent/ksql/parser/AstBuilderTest.java b/ksqldb-parser/src/test/java/io/confluent/ksql/parser/AstBuilderTest.java index 16c1785deec7..389117c1deaa 100644 --- a/ksqldb-parser/src/test/java/io/confluent/ksql/parser/AstBuilderTest.java +++ b/ksqldb-parser/src/test/java/io/confluent/ksql/parser/AstBuilderTest.java @@ -46,6 +46,7 @@ import io.confluent.ksql.parser.exception.ParseFailedException; import io.confluent.ksql.parser.tree.AliasedRelation; import io.confluent.ksql.parser.tree.AllColumns; +import io.confluent.ksql.parser.tree.AssertTopic; import io.confluent.ksql.parser.tree.ColumnConstraints; import io.confluent.ksql.parser.tree.CreateStream; import io.confluent.ksql.parser.tree.CreateTable; @@ -963,4 +964,36 @@ public void shouldFailOnPersistentQueryLimitClauseTable() { assertEquals(expectedMessage, actualMessage); } + + @Test + public void shouldBuildAssertTopic() { + // Given: + final SingleStatementContext stmt + = givenQuery("ASSERT TOPIC X;"); + + // When: + final AssertTopic assertTopic = (AssertTopic) builder.buildStatement(stmt); + + // Then: + assertThat(assertTopic.getTopic(), is("X")); + assertThat(assertTopic.getConfig().size(), is(0)); + assertThat(assertTopic.getTimeout(), is(Optional.empty())); + } + + @Test + public void shouldBuildAssertTopicWithConfigsAndTimeout() { + // Given: + final SingleStatementContext stmt + = givenQuery("ASSERT TOPIC X WITH (REPLICAS=1, partitions=1) TIMEOUT 10 SECONDS;"); + + // When: + final AssertTopic assertTopic = (AssertTopic) builder.buildStatement(stmt); + + // Then: + assertThat(assertTopic.getTopic(), is("X")); + assertThat(assertTopic.getConfig().get("REPLICAS").getValue(), is(1)); + assertThat(assertTopic.getConfig().get("partitions").getValue(), is(1)); + assertThat(assertTopic.getTimeout().get().getTimeUnit(), is(TimeUnit.SECONDS)); + assertThat(assertTopic.getTimeout().get().getValue(), is(10L)); + } } \ No newline at end of file diff --git a/ksqldb-parser/src/test/java/io/confluent/ksql/parser/tree/AssertTopicTest.java b/ksqldb-parser/src/test/java/io/confluent/ksql/parser/tree/AssertTopicTest.java new file mode 100644 index 000000000000..0673bb103a1f --- /dev/null +++ b/ksqldb-parser/src/test/java/io/confluent/ksql/parser/tree/AssertTopicTest.java @@ -0,0 +1,57 @@ +package io.confluent.ksql.parser.tree; + +import com.google.common.collect.ImmutableMap; +import com.google.common.testing.EqualsTester; +import io.confluent.ksql.execution.windows.WindowTimeClause; +import io.confluent.ksql.parser.NodeLocation; +import java.util.Map; +import java.util.Optional; +import java.util.concurrent.TimeUnit; +import org.junit.Rule; +import org.junit.Test; +import org.mockito.Mock; +import org.mockito.junit.MockitoJUnit; +/* + * Copyright 2022 Confluent Inc. + * + * Licensed under the Confluent Community License (the "License"; you may not use + * this file except in compliance with the License. You may obtain a copy of the + * License at + * + * http://www.confluent.io/confluent-community-license + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +import org.mockito.junit.MockitoRule; + +public class AssertTopicTest { + + @Rule + public final MockitoRule mockitoRule = MockitoJUnit.rule(); + + public static final NodeLocation SOME_LOCATION = new NodeLocation(0, 0); + public static final NodeLocation OTHER_LOCATION = new NodeLocation(1, 0); + private final String SOME_TOPIC = "TOPIC"; + private final Map SOME_CONFIG = ImmutableMap.of("partitions", 1); + private final WindowTimeClause SOME_TIMEOUT = new WindowTimeClause(5, TimeUnit.SECONDS); + + @Test + public void shouldImplementHashCodeAndEquals() { + new EqualsTester() + .addEqualityGroup( + new AssertTopic(Optional.empty(), SOME_TOPIC, SOME_CONFIG, Optional.of(SOME_TIMEOUT)), + new AssertTopic(Optional.of(new NodeLocation(1, 1)), SOME_TOPIC, SOME_CONFIG, Optional.of(SOME_TIMEOUT))) + .addEqualityGroup( + new AssertTopic(Optional.empty(), "another topic", SOME_CONFIG, Optional.of(SOME_TIMEOUT))) + .addEqualityGroup( + new AssertTopic(Optional.empty(), SOME_TOPIC, ImmutableMap.of(), Optional.of(SOME_TIMEOUT))) + .addEqualityGroup( + new AssertTopic(Optional.empty(), SOME_TOPIC, SOME_CONFIG, Optional.empty())) + .testEquals(); + } + +} diff --git a/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/execution/AssertTopicExecutor.java b/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/execution/AssertTopicExecutor.java new file mode 100644 index 000000000000..1e20acec819b --- /dev/null +++ b/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/execution/AssertTopicExecutor.java @@ -0,0 +1,122 @@ +/* + * Copyright 2019 Confluent Inc. + * + * Licensed under the Confluent Community License (the "License"); you may not use + * this file except in compliance with the License. You may obtain a copy of the + * License at + * + * http://www.confluent.io/confluent-community-license + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package io.confluent.ksql.rest.server.execution; + +import static io.confluent.ksql.rest.Errors.assertionFailedError; +import static io.confluent.ksql.util.KsqlConfig.KSQL_ASSERT_TOPIC_DEFAULT_TIMEOUT_MS; + +import io.confluent.ksql.KsqlExecutionContext; +import io.confluent.ksql.execution.expression.tree.Literal; +import io.confluent.ksql.parser.tree.AssertTopic; +import io.confluent.ksql.rest.SessionProperties; +import io.confluent.ksql.rest.entity.AssertTopicEntity; +import io.confluent.ksql.rest.server.resources.KsqlRestException; +import io.confluent.ksql.services.KafkaTopicClient; +import io.confluent.ksql.services.ServiceContext; +import io.confluent.ksql.statement.ConfiguredStatement; +import io.confluent.ksql.util.KsqlException; +import io.confluent.ksql.util.RetryUtil; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.Optional; + +public final class AssertTopicExecutor { + + private static final int RETRY_MS = 100; + + private AssertTopicExecutor() { + + } + + public static StatementExecutorResponse execute( + final ConfiguredStatement statement, + final SessionProperties sessionProperties, + final KsqlExecutionContext executionContext, + final ServiceContext serviceContext + ) { + final KafkaTopicClient client = serviceContext.getTopicClient(); + final int timeout = statement.getStatement().getTimeout().isPresent() + ? (int) statement.getStatement().getTimeout().get().toDuration().toMillis() + : executionContext.getKsqlConfig().getInt(KSQL_ASSERT_TOPIC_DEFAULT_TIMEOUT_MS); + try { + RetryUtil.retryWithBackoff( + timeout/RETRY_MS, + RETRY_MS, + RETRY_MS, + () -> assertTopic( + client, statement.getStatement().getTopic(), statement.getStatement().getConfig()) + ); + } catch (final KsqlException e) { + throw new KsqlRestException(assertionFailedError(e.getMessage())); + } + return StatementExecutorResponse.handled(Optional.of( + new AssertTopicEntity(statement.getStatementText(), statement.getStatement().getTopic()))); + } + + private static void assertTopic( + final KafkaTopicClient client, + final String topic, + final Map config + ) { + boolean topicExists; + try { + topicExists = client.isTopicExists(topic); + } catch (final Exception e) { + throw new KsqlException("Cannot check that topic exists: " + e.getMessage()); + } + + if(topicExists) { + final int partitions = client.describeTopic(topic).partitions().size(); + final int replicas = client.describeTopic(topic).partitions().get(0).replicas().size(); + final List configErrors = new ArrayList<>(); + config.forEach((k, v) -> { + if (k.toLowerCase().equals("partitions")) { + if (!configMatches(v.getValue(), partitions)) { + configErrors.add( + createConfigError(topic, "partitions", v.getValue().toString(), partitions)); + } + } else if (k.toLowerCase().equals("replicas")) { + if (!configMatches(v.getValue(), replicas)) { + configErrors.add( + createConfigError(topic, "replicas", v.getValue().toString(), replicas)); + } + } else { + configErrors.add("Cannot assert unknown topic property: " + k); + } + }); + if (configErrors.size() > 0) { + throw new KsqlException(String.join("\n", configErrors)); + } + } else { + throw new KsqlException("Topic " + topic + " does not exist"); + } + } + + private static boolean configMatches(final Object expected, final int actual) { + if (expected instanceof Integer && (Integer) expected == actual) { + return true; + } + return false; + } + + private static String createConfigError( + final String topic, final String config, final String expected, final int actual) { + return String.format( + "Mismatched configuration for topic %s: For config %s, expected %s got %d", + topic, config, expected, actual); + } +} diff --git a/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/execution/CustomExecutors.java b/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/execution/CustomExecutors.java index e04d8a4e6ab6..0885adad5fb6 100644 --- a/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/execution/CustomExecutors.java +++ b/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/execution/CustomExecutors.java @@ -17,6 +17,7 @@ import com.google.common.collect.ImmutableMap; import io.confluent.ksql.KsqlExecutionContext; +import io.confluent.ksql.parser.tree.AssertTopic; import io.confluent.ksql.parser.tree.CreateConnector; import io.confluent.ksql.parser.tree.DefineVariable; import io.confluent.ksql.parser.tree.DescribeConnector; @@ -72,6 +73,7 @@ public enum CustomExecutors { LIST_CONNECTOR_PLUGINS(ListConnectorPlugins.class, ListConnectorPluginsExecutor::execute), LIST_TYPES(ListTypes.class, ListTypesExecutor::execute), LIST_VARIABLES(ListVariables.class, ListVariablesExecutor::execute), + ASSERT_TOPIC(AssertTopic.class, AssertTopicExecutor::execute), SHOW_COLUMNS(ShowColumns.class, ListSourceExecutor::columns), EXPLAIN(Explain.class, ExplainExecutor::execute), diff --git a/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/validation/CustomValidators.java b/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/validation/CustomValidators.java index aa80450a3d7f..d81a67f21937 100644 --- a/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/validation/CustomValidators.java +++ b/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/validation/CustomValidators.java @@ -18,6 +18,7 @@ import com.google.common.collect.ImmutableMap; import io.confluent.ksql.KsqlExecutionContext; import io.confluent.ksql.parser.tree.AlterSystemProperty; +import io.confluent.ksql.parser.tree.AssertTopic; import io.confluent.ksql.parser.tree.CreateConnector; import io.confluent.ksql.parser.tree.DefineVariable; import io.confluent.ksql.parser.tree.DescribeConnector; @@ -93,6 +94,7 @@ public enum CustomValidators { LIST_TYPES(ListTypes.class, StatementValidator.NO_VALIDATION), CREATE_CONNECTOR(CreateConnector.class, ConnectExecutor::validate), DROP_CONNECTOR(DropConnector.class, StatementValidator.NO_VALIDATION), + ASSERT_TOPIC(AssertTopic.class, StatementValidator.NO_VALIDATION), LIST_VARIABLES(ListVariables.class, ListVariablesExecutor::execute), INSERT_VALUES(InsertValues.class, new InsertValuesExecutor()::execute), diff --git a/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/integration/RestApiTest.java b/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/integration/RestApiTest.java index ee703c46235d..f791da6ae029 100644 --- a/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/integration/RestApiTest.java +++ b/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/integration/RestApiTest.java @@ -35,6 +35,7 @@ import static org.apache.kafka.common.resource.ResourceType.TOPIC; import static org.apache.kafka.common.resource.ResourceType.TRANSACTIONAL_ID; import static org.hamcrest.MatcherAssert.assertThat; + import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.endsWith; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.hasSize; @@ -51,6 +52,7 @@ import io.confluent.ksql.integration.IntegrationTestHarness; import io.confluent.ksql.integration.Retry; import io.confluent.ksql.rest.ApiJsonMapper; + import io.confluent.ksql.rest.entity.AssertTopicEntity; import io.confluent.ksql.rest.entity.CommandId; import io.confluent.ksql.rest.entity.CommandId.Action; import io.confluent.ksql.rest.entity.CommandId.Type; @@ -84,6 +86,7 @@ import io.vertx.core.http.HttpMethod; import io.vertx.core.http.HttpVersion; import io.vertx.ext.web.client.HttpResponse; + import java.time.Instant; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; @@ -1039,6 +1042,106 @@ public void shouldCreateStreamWithVariableSubstitution() { assertThat(query.size(), is(1)); } + @Test + public void shouldAssertTopicExists() { + // Given: + makeKsqlRequest("CREATE STREAM X AS SELECT * FROM " + PAGE_VIEW_STREAM + ";"); + + // When: + List response = makeKsqlRequest("ASSERT TOPIC X WITH (PARTITIONS=1) TIMEOUT 2 SECONDS;"); + + // Then: + assertThat(response.size(), is(1)); + assertThat(((AssertTopicEntity) response.get(0)).getTopicName(), is("X")); + } + + @Test + public void shouldFailToAssertNonExistantTopic() { + assertThatEventually(() -> { + try { + makeKsqlRequest("ASSERT TOPIC X WITH (PARTITIONS=1) TIMEOUT 1 SECONDS;"); + return "Should have thrown 'Topic does not exist' error."; + } catch (final Throwable t) { + return t.getMessage(); + } + }, containsString("Topic X does not exist")); + } + + @Test + public void shouldFailToAssertTopicWithWrongConfigs() { + // Given: + makeKsqlRequest("CREATE STREAM X AS SELECT * FROM " + PAGE_VIEW_STREAM + ";"); + + // When: + // Then: + assertThatEventually(() -> { + try { + makeKsqlRequest("ASSERT TOPIC X WITH (PARTITIONS='apples', REPLICAS=100, FAKE_CONFIG='Hello!') TIMEOUT 2 SECONDS;"); + return "Should have thrown config mismatch error"; + } catch (final Throwable t) { + return t.getMessage(); + } + }, containsString( + "Mismatched configuration for topic X: For config partitions, expected apples got 1\n" + + "Mismatched configuration for topic X: For config replicas, expected 100 got 1\n" + + "Cannot assert unknown topic property: FAKE_CONFIG")); + } + + @Test + public void shouldStopScriptOnFailedAssert() { + // Given: + makeKsqlRequest("CREATE STREAM X AS SELECT * FROM " + PAGE_VIEW_STREAM + ";"); + + // When: + // Then: + assertThatEventually(() -> { + try { + makeKsqlRequest("ASSERT TOPIC X WITH (PARTITIONS='apples', REPLICAS=100, FAKE_CONFIG='Hello!') TIMEOUT 2 SECONDS; CREATE STREAM Y AS SELECT * FROM X;"); + return "Should have thrown config mismatch error"; + } catch (final Throwable t) { + return t.getMessage(); + } + }, containsString( + "Mismatched configuration for topic X: For config partitions, expected apples got 1\n" + + "Mismatched configuration for topic X: For config replicas, expected 100 got 1\n" + + "Cannot assert unknown topic property: FAKE_CONFIG")); + assertThat(topicExists("Y"), is(false)); + } + + @Test + public void shouldTimeoutTheCorrectAmountOfTime() { + final long start = Instant.now().getEpochSecond(); + assertThatEventually(() -> { + try { + makeKsqlRequest("ASSERT TOPIC X WITH (PARTITIONS=1) TIMEOUT 3 SECONDS;"); + return "Should have thrown 'Topic does not exist' error."; + } catch (final Throwable t) { + return t.getMessage(); + } + }, containsString("Topic X does not exist")); + final long end = Instant.now().getEpochSecond(); + + assertThat(end - start >= 3, is(true)); + } + + @Test + public void shouldTimeoutTheDefaultAmountOfTime() { + final long start = Instant.now().getEpochSecond(); + assertThatEventually(() -> { + try { + makeKsqlRequest("ASSERT TOPIC X WITH (PARTITIONS=1);"); + return "Should have thrown 'Topic does not exist' error."; + } catch (final Throwable t) { + return t.getMessage(); + } + }, containsString("Topic X does not exist")); + final long end = Instant.now().getEpochSecond(); + + assertThat(end - start >= 1, is(true)); + assertThat(end - start <= 2, is(true)); + } + + private boolean topicExists(final String topicName) { return getServiceContext().getTopicClient().isTopicExists(topicName); } diff --git a/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/integration/RestIntegrationTestUtil.java b/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/integration/RestIntegrationTestUtil.java index 4c4d91cb0e10..2d7d3629610d 100644 --- a/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/integration/RestIntegrationTestUtil.java +++ b/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/integration/RestIntegrationTestUtil.java @@ -18,6 +18,7 @@ import static io.netty.handler.codec.http.HttpHeaderNames.ACCEPT; import static io.netty.handler.codec.http.HttpHeaderNames.AUTHORIZATION; import static io.netty.handler.codec.http.HttpHeaderNames.CONTENT_TYPE; +import static io.netty.handler.codec.http.HttpHeaderNames.EXPIRES; import static io.vertx.core.http.HttpMethod.POST; import static io.vertx.core.http.HttpVersion.HTTP_1_1; @@ -555,6 +556,7 @@ public static List makeWsRequest( mediaType.ifPresent(mt -> headers.add(ACCEPT.toString(), mt)); contentType.ifPresent(ct -> headers.add(CONTENT_TYPE.toString(), ct)); + headers.add(EXPIRES.toString(), "Wed, 13 Apr 2022 07:28:00 GMT"); CompletableFuture> completableFuture = new CompletableFuture<>(); diff --git a/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/execution/AssertTopicExecutorTest.java b/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/execution/AssertTopicExecutorTest.java new file mode 100644 index 000000000000..fe79b6fcad60 --- /dev/null +++ b/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/execution/AssertTopicExecutorTest.java @@ -0,0 +1,146 @@ +/* + * Copyright 2022 Confluent Inc. + * + * Licensed under the Confluent Community License (the "License"; you may not use + * this file except in compliance with the License. You may obtain a copy of the + * License at + * + * http://www.confluent.io/confluent-community-license + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package io.confluent.ksql.rest.server.execution; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.is; +import static org.junit.Assert.assertThrows; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import io.confluent.ksql.KsqlExecutionContext; +import io.confluent.ksql.config.SessionConfig; +import io.confluent.ksql.execution.expression.tree.IntegerLiteral; +import io.confluent.ksql.execution.expression.tree.Literal; +import io.confluent.ksql.parser.KsqlParser; +import io.confluent.ksql.parser.tree.AssertTopic; +import io.confluent.ksql.rest.SessionProperties; +import io.confluent.ksql.rest.entity.AssertTopicEntity; +import io.confluent.ksql.rest.entity.KsqlEntity; +import io.confluent.ksql.rest.entity.KsqlErrorMessage; +import io.confluent.ksql.rest.server.resources.KsqlRestException; +import io.confluent.ksql.services.KafkaTopicClient; +import io.confluent.ksql.services.ServiceContext; +import io.confluent.ksql.statement.ConfiguredStatement; +import io.confluent.ksql.util.KsqlConfig; +import java.util.Map; +import java.util.Optional; +import org.apache.kafka.clients.admin.TopicDescription; +import org.apache.kafka.common.TopicPartitionInfo; +import org.apache.kafka.connect.runtime.isolation.PluginType; +import org.apache.kafka.connect.runtime.rest.entities.PluginInfo; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.Mock; +import org.mockito.junit.MockitoJUnitRunner; + +@SuppressWarnings("OptionalGetWithoutIsPresent") +@RunWith(MockitoJUnitRunner.class) +public class AssertTopicExecutorTest { + private static final PluginInfo INFO = new PluginInfo( + "org.apache.kafka.connect.file.FileStreamSinkConnector", + PluginType.SOURCE, + "2.1" + ); + + @Mock + private KsqlExecutionContext engine; + @Mock + private ServiceContext serviceContext; + @Mock + private KafkaTopicClient topicClient; + @Mock + private TopicDescription topicDescription; + @Mock + private TopicPartitionInfo partition; + private final KsqlConfig ksqlConfig = new KsqlConfig(ImmutableMap.of()); + + @Before + public void setUp() { + when(engine.getKsqlConfig()).thenReturn(ksqlConfig); + when(serviceContext.getTopicClient()).thenReturn(topicClient); + when(topicClient.isTopicExists("topicName")) + .thenReturn(true); + when(topicClient.describeTopic("topicName")) + .thenReturn(topicDescription); + when(topicDescription.partitions()) + .thenReturn(ImmutableList.of(partition)); + when(partition.replicas()) + .thenReturn(ImmutableList.of()); + } + + @Test + public void shouldAssertTopic() { + // Given: + final Map configs = ImmutableMap.of( + "partitions", new IntegerLiteral(1), "replicas", new IntegerLiteral(0)); + final AssertTopic assertTopic = new AssertTopic(Optional.empty(), "topicName", configs, Optional.empty()); + final ConfiguredStatement statement = ConfiguredStatement + .of(KsqlParser.PreparedStatement.of("", assertTopic), + SessionConfig.of(ksqlConfig, ImmutableMap.of())); + + // When: + final Optional entity = AssertTopicExecutor + .execute(statement, mock(SessionProperties.class), engine, serviceContext).getEntity(); + + // Then: + assertThat("expected response!", entity.isPresent()); + assertThat(((AssertTopicEntity) entity.get()).getTopicName(), is("topicName")); + } + + @Test + public void shouldFailToAssertNonExistingTopic() { + // Given: + final Map configs = ImmutableMap.of( + "partitions", new IntegerLiteral(1), "replicas", new IntegerLiteral(0)); + final AssertTopic assertTopic = new AssertTopic(Optional.empty(), "fakeTopic", configs, Optional.empty()); + final ConfiguredStatement statement = ConfiguredStatement + .of(KsqlParser.PreparedStatement.of("", assertTopic), + SessionConfig.of(ksqlConfig, ImmutableMap.of())); + + // When: + final KsqlRestException e = assertThrows(KsqlRestException.class, + () -> AssertTopicExecutor.execute(statement, mock(SessionProperties.class), engine, serviceContext)); + + // Then: + assertThat(e.getResponse().getStatus(), is(417)); + assertThat(((KsqlErrorMessage) e.getResponse().getEntity()).getMessage(), is("Topic fakeTopic does not exist")); + } + + @Test + public void shouldFailToAssertWrongConfigs() { + // Given: + final Map configs = ImmutableMap.of( + "partitions", new IntegerLiteral(10), "replicas", new IntegerLiteral(10), "abc", new IntegerLiteral(23)); + final AssertTopic assertTopic = new AssertTopic(Optional.empty(), "topicName", configs, Optional.empty()); + final ConfiguredStatement statement = ConfiguredStatement + .of(KsqlParser.PreparedStatement.of("", assertTopic), + SessionConfig.of(ksqlConfig, ImmutableMap.of())); + + // When: + final KsqlRestException e = assertThrows(KsqlRestException.class, + () -> AssertTopicExecutor.execute(statement, mock(SessionProperties.class), engine, serviceContext)); + + // Then: + assertThat(e.getResponse().getStatus(), is(417)); + assertThat(((KsqlErrorMessage) e.getResponse().getEntity()).getMessage(), is("Mismatched configuration for topic topicName: For config partitions, expected 10 got 1\n" + + "Mismatched configuration for topic topicName: For config replicas, expected 10 got 0\n" + + "Cannot assert unknown topic property: abc")); + } +} diff --git a/ksqldb-rest-model/src/main/java/io/confluent/ksql/rest/Errors.java b/ksqldb-rest-model/src/main/java/io/confluent/ksql/rest/Errors.java index 89191e011d34..521b3eb84a26 100644 --- a/ksqldb-rest-model/src/main/java/io/confluent/ksql/rest/Errors.java +++ b/ksqldb-rest-model/src/main/java/io/confluent/ksql/rest/Errors.java @@ -17,6 +17,7 @@ import static io.netty.handler.codec.http.HttpHeaderNames.RETRY_AFTER; import static io.netty.handler.codec.http.HttpResponseStatus.BAD_REQUEST; +import static io.netty.handler.codec.http.HttpResponseStatus.EXPECTATION_FAILED; import static io.netty.handler.codec.http.HttpResponseStatus.FORBIDDEN; import static io.netty.handler.codec.http.HttpResponseStatus.INTERNAL_SERVER_ERROR; import static io.netty.handler.codec.http.HttpResponseStatus.MISDIRECTED_REQUEST; @@ -75,6 +76,8 @@ public final class Errors { public static final int ERROR_CODE_TOO_MANY_REQUESTS = toErrorCode(TOO_MANY_REQUESTS.code()); + public static final int ERROR_CODE_ASSERTION_FAILED = toErrorCode(EXPECTATION_FAILED.code()); + private final ErrorMessages errorMessages; public static int toStatusCode(final int errorCode) { @@ -218,6 +221,13 @@ public static EndpointResponse tooManyRequests(final String msg) { .build(); } + public static EndpointResponse assertionFailedError(final String errorMsg) { + return EndpointResponse.create() + .status(EXPECTATION_FAILED.code()) + .entity(new KsqlErrorMessage(ERROR_CODE_ASSERTION_FAILED, errorMsg)) + .build(); + } + public Errors(final ErrorMessages errorMessages) { this.errorMessages = Objects.requireNonNull(errorMessages, "errorMessages"); } diff --git a/ksqldb-rest-model/src/main/java/io/confluent/ksql/rest/entity/AssertTopicEntity.java b/ksqldb-rest-model/src/main/java/io/confluent/ksql/rest/entity/AssertTopicEntity.java new file mode 100644 index 000000000000..e4f24842362c --- /dev/null +++ b/ksqldb-rest-model/src/main/java/io/confluent/ksql/rest/entity/AssertTopicEntity.java @@ -0,0 +1,44 @@ +/* + * Copyright 2022 Confluent Inc. + * + * Licensed under the Confluent Community License (the "License"); you may not use + * this file except in compliance with the License. You may obtain a copy of the + * License at + * + * http://www.confluent.io/confluent-community-license + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package io.confluent.ksql.rest.entity; + +import com.fasterxml.jackson.annotation.JsonIgnoreProperties; +import com.fasterxml.jackson.annotation.JsonProperty; +import java.util.Objects; + +@JsonIgnoreProperties(ignoreUnknown = true) +public class AssertTopicEntity extends KsqlEntity { + private final String topicName; + + public AssertTopicEntity( + @JsonProperty("statementText") final String statementText, + @JsonProperty("topicName") final String topicName + ) { + super(statementText); + this.topicName = Objects.requireNonNull(topicName, "topicName"); + } + + public String getTopicName() { + return topicName; + } + + @Override + public String toString() { + return "AssertTopicEntity{" + + "topicName='" + topicName + '\'' + + '}'; + } +} diff --git a/ksqldb-rest-model/src/main/java/io/confluent/ksql/rest/entity/KsqlEntity.java b/ksqldb-rest-model/src/main/java/io/confluent/ksql/rest/entity/KsqlEntity.java index 4ebcaabbfddb..20191b274216 100644 --- a/ksqldb-rest-model/src/main/java/io/confluent/ksql/rest/entity/KsqlEntity.java +++ b/ksqldb-rest-model/src/main/java/io/confluent/ksql/rest/entity/KsqlEntity.java @@ -51,7 +51,8 @@ @JsonSubTypes.Type(value = TypeList.class, name = "type_list"), @JsonSubTypes.Type(value = WarningEntity.class, name = "warning_entity"), @JsonSubTypes.Type(value = VariablesList.class, name = "variables"), - @JsonSubTypes.Type(value = TerminateQueryEntity.class, name = "terminate_query") + @JsonSubTypes.Type(value = TerminateQueryEntity.class, name = "terminate_query"), + @JsonSubTypes.Type(value = AssertTopicEntity.class, name = "assert_topic") }) public abstract class KsqlEntity { private final String statementText; From 82cc6412e78656e7f3fb969818b388abdf919c2f Mon Sep 17 00:00:00 2001 From: Zara Lim Date: Tue, 26 Apr 2022 15:30:54 -0700 Subject: [PATCH 2/8] make timeout nonreserved --- .../src/main/antlr4/io/confluent/ksql/parser/SqlBase.g4 | 1 + 1 file changed, 1 insertion(+) diff --git a/ksqldb-parser/src/main/antlr4/io/confluent/ksql/parser/SqlBase.g4 b/ksqldb-parser/src/main/antlr4/io/confluent/ksql/parser/SqlBase.g4 index d4fc32019b59..fc14f5a49f4c 100644 --- a/ksqldb-parser/src/main/antlr4/io/confluent/ksql/parser/SqlBase.g4 +++ b/ksqldb-parser/src/main/antlr4/io/confluent/ksql/parser/SqlBase.g4 @@ -419,6 +419,7 @@ nonReserved | GRACE | PERIOD | DEFINE | UNDEFINE | VARIABLES | PLUGINS | SYSTEM + | TIMEOUT ; EMIT: 'EMIT'; From 6f53c7a09e96cdc5b11de25b8ab09b1595a84b99 Mon Sep 17 00:00:00 2001 From: Zara Lim Date: Tue, 26 Apr 2022 16:15:08 -0700 Subject: [PATCH 3/8] checkstyle --- .../src/main/java/io/confluent/ksql/parser/AstBuilder.java | 1 - .../ksql/rest/server/execution/AssertTopicExecutor.java | 6 +++--- 2 files changed, 3 insertions(+), 4 deletions(-) diff --git a/ksqldb-parser/src/main/java/io/confluent/ksql/parser/AstBuilder.java b/ksqldb-parser/src/main/java/io/confluent/ksql/parser/AstBuilder.java index 53c7772f7bb0..3854d7436285 100644 --- a/ksqldb-parser/src/main/java/io/confluent/ksql/parser/AstBuilder.java +++ b/ksqldb-parser/src/main/java/io/confluent/ksql/parser/AstBuilder.java @@ -173,7 +173,6 @@ import io.confluent.ksql.util.Pair; import io.confluent.ksql.util.ParserUtil; import java.math.BigDecimal; -import java.time.Duration; import java.util.ArrayList; import java.util.HashSet; import java.util.List; diff --git a/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/execution/AssertTopicExecutor.java b/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/execution/AssertTopicExecutor.java index 1e20acec819b..da0365cf5cd0 100644 --- a/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/execution/AssertTopicExecutor.java +++ b/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/execution/AssertTopicExecutor.java @@ -54,7 +54,7 @@ public static StatementExecutorResponse execute( : executionContext.getKsqlConfig().getInt(KSQL_ASSERT_TOPIC_DEFAULT_TIMEOUT_MS); try { RetryUtil.retryWithBackoff( - timeout/RETRY_MS, + timeout / RETRY_MS, RETRY_MS, RETRY_MS, () -> assertTopic( @@ -72,14 +72,14 @@ private static void assertTopic( final String topic, final Map config ) { - boolean topicExists; + final boolean topicExists; try { topicExists = client.isTopicExists(topic); } catch (final Exception e) { throw new KsqlException("Cannot check that topic exists: " + e.getMessage()); } - if(topicExists) { + if (topicExists) { final int partitions = client.describeTopic(topic).partitions().size(); final int replicas = client.describeTopic(topic).partitions().get(0).replicas().size(); final List configErrors = new ArrayList<>(); From c3bd00bab9ad93aaea1c51ae5a45ad589a94d894 Mon Sep 17 00:00:00 2001 From: Zara Lim Date: Tue, 26 Apr 2022 16:54:56 -0700 Subject: [PATCH 4/8] fix unit test --- .../src/test/java/io/confluent/ksql/parser/AstBuilderTest.java | 2 +- .../java/io/confluent/ksql/util/GrammarTokenExporterTest.java | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/ksqldb-parser/src/test/java/io/confluent/ksql/parser/AstBuilderTest.java b/ksqldb-parser/src/test/java/io/confluent/ksql/parser/AstBuilderTest.java index 389117c1deaa..ab5fffa5d521 100644 --- a/ksqldb-parser/src/test/java/io/confluent/ksql/parser/AstBuilderTest.java +++ b/ksqldb-parser/src/test/java/io/confluent/ksql/parser/AstBuilderTest.java @@ -992,7 +992,7 @@ public void shouldBuildAssertTopicWithConfigsAndTimeout() { // Then: assertThat(assertTopic.getTopic(), is("X")); assertThat(assertTopic.getConfig().get("REPLICAS").getValue(), is(1)); - assertThat(assertTopic.getConfig().get("partitions").getValue(), is(1)); + assertThat(assertTopic.getConfig().get("PARTITIONS").getValue(), is(1)); assertThat(assertTopic.getTimeout().get().getTimeUnit(), is(TimeUnit.SECONDS)); assertThat(assertTopic.getTimeout().get().getValue(), is(10L)); } diff --git a/ksqldb-parser/src/test/java/io/confluent/ksql/util/GrammarTokenExporterTest.java b/ksqldb-parser/src/test/java/io/confluent/ksql/util/GrammarTokenExporterTest.java index 12949a203a8f..afc21c1a9913 100644 --- a/ksqldb-parser/src/test/java/io/confluent/ksql/util/GrammarTokenExporterTest.java +++ b/ksqldb-parser/src/test/java/io/confluent/ksql/util/GrammarTokenExporterTest.java @@ -41,7 +41,7 @@ public class GrammarTokenExporterTest { "CATALOG", "PROPERTIES", "BEGINNING", "UNSET", "RUN", "SCRIPT", "DECIMAL", "KEY", "CONNECTOR", "CONNECTORS", "SINK", "SOURCE", "NAMESPACE", "MATERIALIZED", "VIEW", "PRIMARY", "REPLACE", "ASSERT", "ADD", "ALTER", "VARIABLES", "PLUGINS", "HEADERS", - "HEADER", "SYSTEM", "IF", "EQ", "NEQ", "LT", "LTE", "GT", "GTE", "PLUS", "MINUS", "ASTERISK", + "HEADER", "SYSTEM", "TIMEOUT", "IF", "EQ", "NEQ", "LT", "LTE", "GT", "GTE", "PLUS", "MINUS", "ASTERISK", "SLASH", "PERCENT", "CONCAT", "ASSIGN", "STRING", "IDENTIFIER", "VARIABLE", "EXPONENT", "DIGIT", "LETTER", "WS")); From 925dd808f84d92d653f587dc9bb7bdddd1630bee Mon Sep 17 00:00:00 2001 From: Zara Lim Date: Wed, 27 Apr 2022 11:57:22 -0700 Subject: [PATCH 5/8] update cli --- .../confluent/ksql/cli/console/Console.java | 6 ++++++ .../ksql/cli/console/ConsoleTest.java | 20 +++++++++++++++---- ...shouldPrintAssertTopicResult.approved.json | 6 ++++++ ...uldPrintAssertTopicResult.approved.tabular | 2 ++ .../io/confluent/ksql/parser/AstBuilder.java | 2 +- 5 files changed, 31 insertions(+), 5 deletions(-) create mode 100644 ksqldb-cli/src/test/java/io/confluent/ksql/cli/console/ConsoleTest.shouldPrintAssertTopicResult.approved.json create mode 100644 ksqldb-cli/src/test/java/io/confluent/ksql/cli/console/ConsoleTest.shouldPrintAssertTopicResult.approved.tabular diff --git a/ksqldb-cli/src/main/java/io/confluent/ksql/cli/console/Console.java b/ksqldb-cli/src/main/java/io/confluent/ksql/cli/console/Console.java index ebf6e6b3d95c..37af9de2c6a3 100644 --- a/ksqldb-cli/src/main/java/io/confluent/ksql/cli/console/Console.java +++ b/ksqldb-cli/src/main/java/io/confluent/ksql/cli/console/Console.java @@ -51,6 +51,7 @@ import io.confluent.ksql.query.QueryError; import io.confluent.ksql.rest.ApiJsonMapper; import io.confluent.ksql.rest.entity.ArgumentInfo; +import io.confluent.ksql.rest.entity.AssertTopicEntity; import io.confluent.ksql.rest.entity.CommandStatusEntity; import io.confluent.ksql.rest.entity.ConnectorDescription; import io.confluent.ksql.rest.entity.ConnectorList; @@ -191,6 +192,7 @@ public class Console implements Closeable { tablePrinter(VariablesList.class, ListVariablesTableBuilder::new)) .put(TerminateQueryEntity.class, tablePrinter(TerminateQueryEntity.class, TerminateQueryTableBuilder::new)) + .put(AssertTopicEntity.class, Console::printAssertTopic) .build(); private static Handler1 tablePrinter( @@ -904,6 +906,10 @@ private void printFunctionDescription(final FunctionDescriptionList describeFunc ); } + private void printAssertTopic(final AssertTopicEntity assertTopic) { + writer().printf("Topic " + assertTopic.getTopicName() + " exists.\n"); + } + private static String argToString(final ArgumentInfo arg) { final String type = arg.getType() + (arg.getIsVariadic() ? "[]" : ""); return arg.getName().isEmpty() ? type : (arg.getName() + " " + type); diff --git a/ksqldb-cli/src/test/java/io/confluent/ksql/cli/console/ConsoleTest.java b/ksqldb-cli/src/test/java/io/confluent/ksql/cli/console/ConsoleTest.java index df43473f8e9a..49c466186796 100644 --- a/ksqldb-cli/src/test/java/io/confluent/ksql/cli/console/ConsoleTest.java +++ b/ksqldb-cli/src/test/java/io/confluent/ksql/cli/console/ConsoleTest.java @@ -26,8 +26,6 @@ import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; -import com.fasterxml.jackson.databind.ObjectMapper; -import com.google.common.base.Strings; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; @@ -44,6 +42,7 @@ import io.confluent.ksql.query.QueryId; import io.confluent.ksql.rest.Errors; import io.confluent.ksql.rest.entity.ArgumentInfo; +import io.confluent.ksql.rest.entity.AssertTopicEntity; import io.confluent.ksql.rest.entity.CommandId; import io.confluent.ksql.rest.entity.CommandStatus; import io.confluent.ksql.rest.entity.CommandStatusEntity; @@ -95,7 +94,6 @@ import io.confluent.ksql.util.KsqlConstants; import io.confluent.ksql.util.KsqlConstants.KsqlQueryStatus; import io.confluent.ksql.util.KsqlConstants.KsqlQueryType; -import java.io.IOException; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; @@ -106,7 +104,6 @@ import java.util.function.Supplier; import java.util.stream.Collectors; import java.util.stream.IntStream; -import org.apache.commons.lang3.StringUtils; import org.apache.kafka.connect.runtime.rest.entities.ConnectorStateInfo; import org.apache.kafka.connect.runtime.rest.entities.ConnectorStateInfo.ConnectorState; import org.apache.kafka.connect.runtime.rest.entities.ConnectorStateInfo.TaskState; @@ -1094,4 +1091,19 @@ public void shouldThrowOnInvalidCliPropertyValue() { assertThat(terminal.getOutputString(), containsString("Invalid value BURRITO for configuration WRAP: String must be one of: ON, OFF, null")); } + + @Test + public void shouldPrintAssertTopicResult() { + // Given: + final KsqlEntityList entities = new KsqlEntityList(ImmutableList.of( + new AssertTopicEntity("statement", "topic") + )); + + // When: + console.printKsqlEntityList(entities); + + // Then: + final String output = terminal.getOutputString(); + Approvals.verify(output, approvalOptions); + } } diff --git a/ksqldb-cli/src/test/java/io/confluent/ksql/cli/console/ConsoleTest.shouldPrintAssertTopicResult.approved.json b/ksqldb-cli/src/test/java/io/confluent/ksql/cli/console/ConsoleTest.shouldPrintAssertTopicResult.approved.json new file mode 100644 index 000000000000..43bf31d5902a --- /dev/null +++ b/ksqldb-cli/src/test/java/io/confluent/ksql/cli/console/ConsoleTest.shouldPrintAssertTopicResult.approved.json @@ -0,0 +1,6 @@ +[ { + "@type" : "assert_topic", + "statementText" : "statement", + "topicName" : "topic", + "warnings" : [ ] +} ] \ No newline at end of file diff --git a/ksqldb-cli/src/test/java/io/confluent/ksql/cli/console/ConsoleTest.shouldPrintAssertTopicResult.approved.tabular b/ksqldb-cli/src/test/java/io/confluent/ksql/cli/console/ConsoleTest.shouldPrintAssertTopicResult.approved.tabular new file mode 100644 index 000000000000..48f1327dcc59 --- /dev/null +++ b/ksqldb-cli/src/test/java/io/confluent/ksql/cli/console/ConsoleTest.shouldPrintAssertTopicResult.approved.tabular @@ -0,0 +1,2 @@ + +Topic topic exists. diff --git a/ksqldb-parser/src/main/java/io/confluent/ksql/parser/AstBuilder.java b/ksqldb-parser/src/main/java/io/confluent/ksql/parser/AstBuilder.java index 3854d7436285..956c079ef2bd 100644 --- a/ksqldb-parser/src/main/java/io/confluent/ksql/parser/AstBuilder.java +++ b/ksqldb-parser/src/main/java/io/confluent/ksql/parser/AstBuilder.java @@ -1403,7 +1403,7 @@ public Node visitRegisterType(final RegisterTypeContext context) { public Node visitAssertTopic(final AssertTopicContext context) { return new AssertTopic( getLocation(context), - ParserUtil.getIdentifierText(context.identifier()), + ParserUtil.getIdentifierText(true, context.identifier()), context.WITH() == null ? ImmutableMap.of() : processTableProperties(context.tableProperties()), From 29cff6ddae01a4d1eb727d172e55033568b4bf2e Mon Sep 17 00:00:00 2001 From: Zara Lim Date: Wed, 27 Apr 2022 16:25:16 -0700 Subject: [PATCH 6/8] fix integration test --- .../ksql/rest/integration/RestApiTest.java | 51 ++++++++++--------- 1 file changed, 27 insertions(+), 24 deletions(-) diff --git a/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/integration/RestApiTest.java b/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/integration/RestApiTest.java index f791da6ae029..84c8d69e1596 100644 --- a/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/integration/RestApiTest.java +++ b/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/integration/RestApiTest.java @@ -190,6 +190,11 @@ public class RestApiTest { resource(TOPIC, "Y"), ops(ALL) ) + .withAcl( + NORMAL_USER, + resource(TOPIC, "Z"), + ops(ALL) + ) .withAcl( NORMAL_USER, resource(TOPIC, AGG_TABLE), @@ -1044,15 +1049,12 @@ public void shouldCreateStreamWithVariableSubstitution() { @Test public void shouldAssertTopicExists() { - // Given: - makeKsqlRequest("CREATE STREAM X AS SELECT * FROM " + PAGE_VIEW_STREAM + ";"); - // When: - List response = makeKsqlRequest("ASSERT TOPIC X WITH (PARTITIONS=1) TIMEOUT 2 SECONDS;"); + List response = makeKsqlRequest("ASSERT TOPIC " + PAGE_VIEW_TOPIC + " WITH (PARTITIONS=1) TIMEOUT 2 SECONDS;"); // Then: assertThat(response.size(), is(1)); - assertThat(((AssertTopicEntity) response.get(0)).getTopicName(), is("X")); + assertThat(((AssertTopicEntity) response.get(0)).getTopicName(), is(PAGE_VIEW_TOPIC)); } @Test @@ -1069,43 +1071,33 @@ public void shouldFailToAssertNonExistantTopic() { @Test public void shouldFailToAssertTopicWithWrongConfigs() { - // Given: - makeKsqlRequest("CREATE STREAM X AS SELECT * FROM " + PAGE_VIEW_STREAM + ";"); - - // When: - // Then: assertThatEventually(() -> { try { - makeKsqlRequest("ASSERT TOPIC X WITH (PARTITIONS='apples', REPLICAS=100, FAKE_CONFIG='Hello!') TIMEOUT 2 SECONDS;"); + makeKsqlRequest("ASSERT TOPIC " + PAGE_VIEW_TOPIC + " WITH (PARTITIONS='apples', REPLICAS=100, FAKE_CONFIG='Hello!') TIMEOUT 2 SECONDS;"); return "Should have thrown config mismatch error"; } catch (final Throwable t) { return t.getMessage(); } }, containsString( - "Mismatched configuration for topic X: For config partitions, expected apples got 1\n" - + "Mismatched configuration for topic X: For config replicas, expected 100 got 1\n" + "Mismatched configuration for topic PAGEVIEW_TOPIC: For config partitions, expected apples got 1\n" + + "Mismatched configuration for topic PAGEVIEW_TOPIC: For config replicas, expected 100 got 1\n" + "Cannot assert unknown topic property: FAKE_CONFIG")); } @Test public void shouldStopScriptOnFailedAssert() { - // Given: - makeKsqlRequest("CREATE STREAM X AS SELECT * FROM " + PAGE_VIEW_STREAM + ";"); - // When: - // Then: assertThatEventually(() -> { try { - makeKsqlRequest("ASSERT TOPIC X WITH (PARTITIONS='apples', REPLICAS=100, FAKE_CONFIG='Hello!') TIMEOUT 2 SECONDS; CREATE STREAM Y AS SELECT * FROM X;"); - return "Should have thrown config mismatch error"; + makeKsqlRequest("ASSERT TOPIC X TIMEOUT 2 SECONDS; CREATE STREAM Z AS SELECT * FROM " + PAGE_VIEW_STREAM + ";"); + return "Should have thrown 'Topic does not exist' error."; } catch (final Throwable t) { return t.getMessage(); } - }, containsString( - "Mismatched configuration for topic X: For config partitions, expected apples got 1\n" - + "Mismatched configuration for topic X: For config replicas, expected 100 got 1\n" - + "Cannot assert unknown topic property: FAKE_CONFIG")); - assertThat(topicExists("Y"), is(false)); + }, containsString("Topic X does not exist")); + + // Then: + assertThat(topicExists("Z"), is(false)); } @Test @@ -1141,6 +1133,17 @@ public void shouldTimeoutTheDefaultAmountOfTime() { assertThat(end - start <= 2, is(true)); } + @Test + public void shouldFailToAssertTopicWithNoAcls() { + assertThatEventually(() -> { + try { + makeKsqlRequest("ASSERT TOPIC ACLESS WITH (PARTITIONS=1);"); + return "Should have thrown an error."; + } catch (final Throwable t) { + return t.getMessage(); + } + }, containsString("Cannot check that topic exists: Authorization denied to Describe on topic(s): [ACLESS]")); + } private boolean topicExists(final String topicName) { return getServiceContext().getTopicClient().isTopicExists(topicName); From c1dceac371bca79270ee0b9db995d5000c6cac72 Mon Sep 17 00:00:00 2001 From: Zara Lim Date: Wed, 27 Apr 2022 23:38:38 -0700 Subject: [PATCH 7/8] fix qtt --- .../src/test/resources/query-validation-tests/asserts.json | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ksqldb-functional-tests/src/test/resources/query-validation-tests/asserts.json b/ksqldb-functional-tests/src/test/resources/query-validation-tests/asserts.json index a481267c6a4f..4a9c0884b1fc 100644 --- a/ksqldb-functional-tests/src/test/resources/query-validation-tests/asserts.json +++ b/ksqldb-functional-tests/src/test/resources/query-validation-tests/asserts.json @@ -7,7 +7,7 @@ ], "expectedException": { "type": "io.confluent.ksql.parser.exception.ParseFailedException", - "message": "mismatched input 'ASSERT'" + "message": "mismatched input 'VALUES'" } }, { From 022417c348d4423928c3e3ab8a9da58ca58fc606 Mon Sep 17 00:00:00 2001 From: Zara Lim Date: Thu, 28 Apr 2022 11:11:04 -0700 Subject: [PATCH 8/8] address review comments --- .../java/io/confluent/ksql/cli/console/ConsoleTest.java | 2 +- .../ConsoleTest.shouldPrintAssertTopicResult.approved.json | 2 +- ...onsoleTest.shouldPrintAssertTopicResult.approved.tabular | 2 +- .../ksql/rest/server/execution/AssertTopicExecutor.java | 6 ++++-- .../ksql/rest/integration/RestIntegrationTestUtil.java | 1 - 5 files changed, 7 insertions(+), 6 deletions(-) diff --git a/ksqldb-cli/src/test/java/io/confluent/ksql/cli/console/ConsoleTest.java b/ksqldb-cli/src/test/java/io/confluent/ksql/cli/console/ConsoleTest.java index 49c466186796..799f4609ef94 100644 --- a/ksqldb-cli/src/test/java/io/confluent/ksql/cli/console/ConsoleTest.java +++ b/ksqldb-cli/src/test/java/io/confluent/ksql/cli/console/ConsoleTest.java @@ -1096,7 +1096,7 @@ public void shouldThrowOnInvalidCliPropertyValue() { public void shouldPrintAssertTopicResult() { // Given: final KsqlEntityList entities = new KsqlEntityList(ImmutableList.of( - new AssertTopicEntity("statement", "topic") + new AssertTopicEntity("statement", "name") )); // When: diff --git a/ksqldb-cli/src/test/java/io/confluent/ksql/cli/console/ConsoleTest.shouldPrintAssertTopicResult.approved.json b/ksqldb-cli/src/test/java/io/confluent/ksql/cli/console/ConsoleTest.shouldPrintAssertTopicResult.approved.json index 43bf31d5902a..0b2601e7dc97 100644 --- a/ksqldb-cli/src/test/java/io/confluent/ksql/cli/console/ConsoleTest.shouldPrintAssertTopicResult.approved.json +++ b/ksqldb-cli/src/test/java/io/confluent/ksql/cli/console/ConsoleTest.shouldPrintAssertTopicResult.approved.json @@ -1,6 +1,6 @@ [ { "@type" : "assert_topic", "statementText" : "statement", - "topicName" : "topic", + "topicName" : "name", "warnings" : [ ] } ] \ No newline at end of file diff --git a/ksqldb-cli/src/test/java/io/confluent/ksql/cli/console/ConsoleTest.shouldPrintAssertTopicResult.approved.tabular b/ksqldb-cli/src/test/java/io/confluent/ksql/cli/console/ConsoleTest.shouldPrintAssertTopicResult.approved.tabular index 48f1327dcc59..d18e0b2bdd25 100644 --- a/ksqldb-cli/src/test/java/io/confluent/ksql/cli/console/ConsoleTest.shouldPrintAssertTopicResult.approved.tabular +++ b/ksqldb-cli/src/test/java/io/confluent/ksql/cli/console/ConsoleTest.shouldPrintAssertTopicResult.approved.tabular @@ -1,2 +1,2 @@ -Topic topic exists. +Topic name exists. diff --git a/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/execution/AssertTopicExecutor.java b/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/execution/AssertTopicExecutor.java index da0365cf5cd0..6dcfc35e9237 100644 --- a/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/execution/AssertTopicExecutor.java +++ b/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/execution/AssertTopicExecutor.java @@ -33,6 +33,7 @@ import java.util.List; import java.util.Map; import java.util.Optional; +import org.apache.kafka.common.TopicPartitionInfo; public final class AssertTopicExecutor { @@ -80,8 +81,9 @@ private static void assertTopic( } if (topicExists) { - final int partitions = client.describeTopic(topic).partitions().size(); - final int replicas = client.describeTopic(topic).partitions().get(0).replicas().size(); + final List partitionList = client.describeTopic(topic).partitions(); + final int partitions = partitionList.size(); + final int replicas = partitionList.get(0).replicas().size(); final List configErrors = new ArrayList<>(); config.forEach((k, v) -> { if (k.toLowerCase().equals("partitions")) { diff --git a/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/integration/RestIntegrationTestUtil.java b/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/integration/RestIntegrationTestUtil.java index 2d7d3629610d..14b23e38f387 100644 --- a/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/integration/RestIntegrationTestUtil.java +++ b/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/integration/RestIntegrationTestUtil.java @@ -556,7 +556,6 @@ public static List makeWsRequest( mediaType.ifPresent(mt -> headers.add(ACCEPT.toString(), mt)); contentType.ifPresent(ct -> headers.add(CONTENT_TYPE.toString(), ct)); - headers.add(EXPIRES.toString(), "Wed, 13 Apr 2022 07:28:00 GMT"); CompletableFuture> completableFuture = new CompletableFuture<>();