Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add ASSERT TOPIC command #9066

Merged
merged 8 commits into from
Apr 28, 2022
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -661,6 +661,12 @@ public class KsqlConfig extends AbstractConfig {
private static final boolean KSQL_ENDPOINT_MIGRATE_QUERY_DEFAULT = true;
private static final String KSQL_ENDPOINT_MIGRATE_QUERY_DOC
= "Migrates the /query endpoint to use the same handler as /query-stream.";
public static final String KSQL_ASSERT_TOPIC_DEFAULT_TIMEOUT_MS
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need/want to document this setting?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I plan on doing docs all in one go. It saves us the trouble of having to revert things in case the full project doesn't land in the next release.

= "ksql.assert.topic.default.timeout.ms";
private static final int KSQL_ASSERT_TOPIC_DEFAULT_TIMEOUT_MS_DEFAULT = 1000;
private static final String KSQL_ASSERT_TOPIC_DEFAULT_TIMEOUT_MS_DOC
= "The default timeout for ASSERT TOPIC statements if not timeout is specified "
+ "in the statement.";

private enum ConfigGeneration {
LEGACY,
Expand Down Expand Up @@ -1428,6 +1434,13 @@ private static ConfigDef buildConfigDef(final ConfigGeneration generation) {
Importance.LOW,
KSQL_TRANSIENT_QUERY_CLEANUP_SERVICE_PERIOD_SECONDS_DOC
)
.define(
KSQL_ASSERT_TOPIC_DEFAULT_TIMEOUT_MS,
Type.INT,
KSQL_ASSERT_TOPIC_DEFAULT_TIMEOUT_MS_DEFAULT,
Importance.LOW,
KSQL_ASSERT_TOPIC_DEFAULT_TIMEOUT_MS_DOC
)
.withClientSslSupport();

for (final CompatibilityBreakingConfigDef compatibilityBreakingConfigDef
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ statement
| CREATE TYPE (IF NOT EXISTS)? identifier AS type #registerType
| DROP TYPE (IF EXISTS)? identifier #dropType
| ALTER (STREAM | TABLE) sourceName alterOption (',' alterOption)* #alterSource
| ASSERT TOPIC identifier (WITH tableProperties)? timeout? #assertTopic
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just to ask... ASSERT TOPIC needs to be here to be processed with other DDL commands instead of with assertStatement below? (That's my guess; just validating.)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yup, the ones below are only used by YATT.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are you adding the "NOT" support in this PR or a separate one?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's going in a separate one.

;

assertStatement
Expand Down Expand Up @@ -119,6 +120,10 @@ resultMaterialization
| FINAL
;

timeout
: TIMEOUT number windowUnit
;

alterOption
: ADD (COLUMN)? identifier type
;
Expand Down Expand Up @@ -414,6 +419,7 @@ nonReserved
| GRACE | PERIOD
| DEFINE | UNDEFINE | VARIABLES
| PLUGINS | SYSTEM
| TIMEOUT
;

EMIT: 'EMIT';
Expand Down Expand Up @@ -553,6 +559,7 @@ PLUGINS: 'PLUGINS';
HEADERS: 'HEADERS';
HEADER: 'HEADER';
SYSTEM: 'SYSTEM';
TIMEOUT: 'TIMEOUT';

IF: 'IF';

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@
import io.confluent.ksql.parser.SqlBaseParser.AssertStreamContext;
import io.confluent.ksql.parser.SqlBaseParser.AssertTableContext;
import io.confluent.ksql.parser.SqlBaseParser.AssertTombstoneContext;
import io.confluent.ksql.parser.SqlBaseParser.AssertTopicContext;
import io.confluent.ksql.parser.SqlBaseParser.AssertValuesContext;
import io.confluent.ksql.parser.SqlBaseParser.CreateConnectorContext;
import io.confluent.ksql.parser.SqlBaseParser.DescribeConnectorContext;
Expand Down Expand Up @@ -106,6 +107,7 @@
import io.confluent.ksql.parser.tree.AssertStatement;
import io.confluent.ksql.parser.tree.AssertStream;
import io.confluent.ksql.parser.tree.AssertTombstone;
import io.confluent.ksql.parser.tree.AssertTopic;
import io.confluent.ksql.parser.tree.AssertValues;
import io.confluent.ksql.parser.tree.ColumnConstraints;
import io.confluent.ksql.parser.tree.CreateConnector;
Expand Down Expand Up @@ -1397,6 +1399,21 @@ public Node visitRegisterType(final RegisterTypeContext context) {
);
}

@Override
public Node visitAssertTopic(final AssertTopicContext context) {
return new AssertTopic(
getLocation(context),
ParserUtil.getIdentifierText(context.identifier()),
context.WITH() == null
? ImmutableMap.of()
: processTableProperties(context.tableProperties()),
context.timeout() == null
? Optional.empty()
: Optional.of(getTimeClause(
context.timeout().number(), context.timeout().windowUnit()))
);
}

@Override
public Node visitAssertValues(final AssertValuesContext context) {
final SourceName targetName = ParserUtil.getSourceName(context.sourceName());
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
/*
* Copyright 2022 Confluent Inc.
*
* Licensed under the Confluent Community License (the "License"; you may not use
* this file except in compliance with the License. You may obtain a copy of the
* License at
*
* http://www.confluent.io/confluent-community-license
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/

package io.confluent.ksql.parser.tree;

import com.google.common.collect.ImmutableMap;
import com.google.errorprone.annotations.Immutable;
import io.confluent.ksql.execution.expression.tree.Literal;
import io.confluent.ksql.execution.windows.WindowTimeClause;
import io.confluent.ksql.parser.NodeLocation;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;

@Immutable
public class AssertTopic extends Statement {

private final String topic;
private final ImmutableMap<String, Literal> config;
private final Optional<WindowTimeClause> timeout;

public AssertTopic(
final Optional<NodeLocation> location,
final String topic,
final Map<String, Literal> config,
final Optional<WindowTimeClause> timeout
) {
super(location);
this.topic = Objects.requireNonNull(topic, "topic");
this.config = ImmutableMap.copyOf(Objects.requireNonNull(config, "config"));
this.timeout = Objects.requireNonNull(timeout, "timeout");
}

public String getTopic() {
return topic;
}

public Map<String, Literal> getConfig() {
return config;
}

public Optional<WindowTimeClause> getTimeout() {
return timeout;
}

@Override
public boolean equals(final Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
final AssertTopic that = (AssertTopic) o;
return topic.equals(that.topic)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need to check that the location is the same?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nope, location just refers to what line/position the ASSERT token was found, so it doesn't really matter here.

&& Objects.equals(config, that.config)
&& timeout.equals(that.timeout);
}

@Override
public int hashCode() {
return Objects.hash(topic, config, timeout);
}

@Override
public String toString() {
return "AssertTopic{"
+ "topic=" + topic
+ ",config=" + config
+ ",timeout=" + timeout
+ '}';
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
import io.confluent.ksql.parser.exception.ParseFailedException;
import io.confluent.ksql.parser.tree.AliasedRelation;
import io.confluent.ksql.parser.tree.AllColumns;
import io.confluent.ksql.parser.tree.AssertTopic;
import io.confluent.ksql.parser.tree.ColumnConstraints;
import io.confluent.ksql.parser.tree.CreateStream;
import io.confluent.ksql.parser.tree.CreateTable;
Expand Down Expand Up @@ -963,4 +964,36 @@ public void shouldFailOnPersistentQueryLimitClauseTable() {

assertEquals(expectedMessage, actualMessage);
}

@Test
public void shouldBuildAssertTopic() {
// Given:
final SingleStatementContext stmt
= givenQuery("ASSERT TOPIC X;");

// When:
final AssertTopic assertTopic = (AssertTopic) builder.buildStatement(stmt);

// Then:
assertThat(assertTopic.getTopic(), is("X"));
assertThat(assertTopic.getConfig().size(), is(0));
assertThat(assertTopic.getTimeout(), is(Optional.empty()));
}

@Test
public void shouldBuildAssertTopicWithConfigsAndTimeout() {
// Given:
final SingleStatementContext stmt
= givenQuery("ASSERT TOPIC X WITH (REPLICAS=1, partitions=1) TIMEOUT 10 SECONDS;");

// When:
final AssertTopic assertTopic = (AssertTopic) builder.buildStatement(stmt);

// Then:
assertThat(assertTopic.getTopic(), is("X"));
assertThat(assertTopic.getConfig().get("REPLICAS").getValue(), is(1));
assertThat(assertTopic.getConfig().get("PARTITIONS").getValue(), is(1));
assertThat(assertTopic.getTimeout().get().getTimeUnit(), is(TimeUnit.SECONDS));
assertThat(assertTopic.getTimeout().get().getValue(), is(10L));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
package io.confluent.ksql.parser.tree;

import com.google.common.collect.ImmutableMap;
import com.google.common.testing.EqualsTester;
import io.confluent.ksql.execution.windows.WindowTimeClause;
import io.confluent.ksql.parser.NodeLocation;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import org.junit.Rule;
import org.junit.Test;
import org.mockito.Mock;
import org.mockito.junit.MockitoJUnit;
/*
* Copyright 2022 Confluent Inc.
*
* Licensed under the Confluent Community License (the "License"; you may not use
* this file except in compliance with the License. You may obtain a copy of the
* License at
*
* http://www.confluent.io/confluent-community-license
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/

import org.mockito.junit.MockitoRule;

public class AssertTopicTest {

@Rule
public final MockitoRule mockitoRule = MockitoJUnit.rule();

public static final NodeLocation SOME_LOCATION = new NodeLocation(0, 0);
public static final NodeLocation OTHER_LOCATION = new NodeLocation(1, 0);
private final String SOME_TOPIC = "TOPIC";
private final Map SOME_CONFIG = ImmutableMap.of("partitions", 1);
private final WindowTimeClause SOME_TIMEOUT = new WindowTimeClause(5, TimeUnit.SECONDS);

@Test
public void shouldImplementHashCodeAndEquals() {
new EqualsTester()
.addEqualityGroup(
new AssertTopic(Optional.empty(), SOME_TOPIC, SOME_CONFIG, Optional.of(SOME_TIMEOUT)),
new AssertTopic(Optional.of(new NodeLocation(1, 1)), SOME_TOPIC, SOME_CONFIG, Optional.of(SOME_TIMEOUT)))
.addEqualityGroup(
new AssertTopic(Optional.empty(), "another topic", SOME_CONFIG, Optional.of(SOME_TIMEOUT)))
.addEqualityGroup(
new AssertTopic(Optional.empty(), SOME_TOPIC, ImmutableMap.of(), Optional.of(SOME_TIMEOUT)))
.addEqualityGroup(
new AssertTopic(Optional.empty(), SOME_TOPIC, SOME_CONFIG, Optional.empty()))
.testEquals();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ public class GrammarTokenExporterTest {
"CATALOG", "PROPERTIES", "BEGINNING", "UNSET", "RUN", "SCRIPT", "DECIMAL",
"KEY", "CONNECTOR", "CONNECTORS", "SINK", "SOURCE", "NAMESPACE", "MATERIALIZED",
"VIEW", "PRIMARY", "REPLACE", "ASSERT", "ADD", "ALTER", "VARIABLES", "PLUGINS", "HEADERS",
"HEADER", "SYSTEM", "IF", "EQ", "NEQ", "LT", "LTE", "GT", "GTE", "PLUS", "MINUS", "ASTERISK",
"HEADER", "SYSTEM", "TIMEOUT", "IF", "EQ", "NEQ", "LT", "LTE", "GT", "GTE", "PLUS", "MINUS", "ASTERISK",
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What's our procedure for adding new reserved words? I imagine we do want to note that somewhere.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

TIMEOUT is a non-reserved word. This is just a list of all the grammar tokens.

"SLASH", "PERCENT", "CONCAT", "ASSIGN", "STRING", "IDENTIFIER", "VARIABLE", "EXPONENT",
"DIGIT", "LETTER", "WS"));

Expand Down
Loading