-
Notifications
You must be signed in to change notification settings - Fork 1k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat: add ASSERT TOPIC command #9066
Changes from 7 commits
3f54364
82cc641
6f53c7a
c3bd00b
925dd80
29cff6d
c1dceac
022417c
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,6 @@ | ||
[ { | ||
"@type" : "assert_topic", | ||
"statementText" : "statement", | ||
"topicName" : "topic", | ||
"warnings" : [ ] | ||
} ] |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,2 @@ | ||
|
||
Topic topic exists. |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -661,6 +661,12 @@ public class KsqlConfig extends AbstractConfig { | |
private static final boolean KSQL_ENDPOINT_MIGRATE_QUERY_DEFAULT = true; | ||
private static final String KSQL_ENDPOINT_MIGRATE_QUERY_DOC | ||
= "Migrates the /query endpoint to use the same handler as /query-stream."; | ||
public static final String KSQL_ASSERT_TOPIC_DEFAULT_TIMEOUT_MS | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Do we need/want to document this setting? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I plan on doing docs all in one go. It saves us the trouble of having to revert things in case the full project doesn't land in the next release. |
||
= "ksql.assert.topic.default.timeout.ms"; | ||
private static final int KSQL_ASSERT_TOPIC_DEFAULT_TIMEOUT_MS_DEFAULT = 1000; | ||
private static final String KSQL_ASSERT_TOPIC_DEFAULT_TIMEOUT_MS_DOC | ||
= "The default timeout for ASSERT TOPIC statements if not timeout is specified " | ||
+ "in the statement."; | ||
|
||
private enum ConfigGeneration { | ||
LEGACY, | ||
|
@@ -1428,6 +1434,13 @@ private static ConfigDef buildConfigDef(final ConfigGeneration generation) { | |
Importance.LOW, | ||
KSQL_TRANSIENT_QUERY_CLEANUP_SERVICE_PERIOD_SECONDS_DOC | ||
) | ||
.define( | ||
KSQL_ASSERT_TOPIC_DEFAULT_TIMEOUT_MS, | ||
Type.INT, | ||
KSQL_ASSERT_TOPIC_DEFAULT_TIMEOUT_MS_DEFAULT, | ||
Importance.LOW, | ||
KSQL_ASSERT_TOPIC_DEFAULT_TIMEOUT_MS_DOC | ||
) | ||
.withClientSslSupport(); | ||
|
||
for (final CompatibilityBreakingConfigDef compatibilityBreakingConfigDef | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -89,6 +89,7 @@ statement | |
| CREATE TYPE (IF NOT EXISTS)? identifier AS type #registerType | ||
| DROP TYPE (IF EXISTS)? identifier #dropType | ||
| ALTER (STREAM | TABLE) sourceName alterOption (',' alterOption)* #alterSource | ||
| ASSERT TOPIC identifier (WITH tableProperties)? timeout? #assertTopic | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Just to ask... There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yup, the ones below are only used by YATT. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Are you adding the "NOT" support in this PR or a separate one? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It's going in a separate one. |
||
; | ||
|
||
assertStatement | ||
|
@@ -119,6 +120,10 @@ resultMaterialization | |
| FINAL | ||
; | ||
|
||
timeout | ||
: TIMEOUT number windowUnit | ||
; | ||
|
||
alterOption | ||
: ADD (COLUMN)? identifier type | ||
; | ||
|
@@ -414,6 +419,7 @@ nonReserved | |
| GRACE | PERIOD | ||
| DEFINE | UNDEFINE | VARIABLES | ||
| PLUGINS | SYSTEM | ||
| TIMEOUT | ||
; | ||
|
||
EMIT: 'EMIT'; | ||
|
@@ -553,6 +559,7 @@ PLUGINS: 'PLUGINS'; | |
HEADERS: 'HEADERS'; | ||
HEADER: 'HEADER'; | ||
SYSTEM: 'SYSTEM'; | ||
TIMEOUT: 'TIMEOUT'; | ||
|
||
IF: 'IF'; | ||
|
||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,85 @@ | ||
/* | ||
* Copyright 2022 Confluent Inc. | ||
* | ||
* Licensed under the Confluent Community License (the "License"; you may not use | ||
* this file except in compliance with the License. You may obtain a copy of the | ||
* License at | ||
* | ||
* http://www.confluent.io/confluent-community-license | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT | ||
* WARRANTIES OF ANY KIND, either express or implied. See the License for the | ||
* specific language governing permissions and limitations under the License. | ||
*/ | ||
|
||
package io.confluent.ksql.parser.tree; | ||
|
||
import com.google.common.collect.ImmutableMap; | ||
import com.google.errorprone.annotations.Immutable; | ||
import io.confluent.ksql.execution.expression.tree.Literal; | ||
import io.confluent.ksql.execution.windows.WindowTimeClause; | ||
import io.confluent.ksql.parser.NodeLocation; | ||
import java.util.Map; | ||
import java.util.Objects; | ||
import java.util.Optional; | ||
|
||
@Immutable | ||
public class AssertTopic extends Statement { | ||
|
||
private final String topic; | ||
private final ImmutableMap<String, Literal> config; | ||
private final Optional<WindowTimeClause> timeout; | ||
|
||
public AssertTopic( | ||
final Optional<NodeLocation> location, | ||
final String topic, | ||
final Map<String, Literal> config, | ||
final Optional<WindowTimeClause> timeout | ||
) { | ||
super(location); | ||
this.topic = Objects.requireNonNull(topic, "topic"); | ||
this.config = ImmutableMap.copyOf(Objects.requireNonNull(config, "config")); | ||
this.timeout = Objects.requireNonNull(timeout, "timeout"); | ||
} | ||
|
||
public String getTopic() { | ||
return topic; | ||
} | ||
|
||
public Map<String, Literal> getConfig() { | ||
return config; | ||
} | ||
|
||
public Optional<WindowTimeClause> getTimeout() { | ||
return timeout; | ||
} | ||
|
||
@Override | ||
public boolean equals(final Object o) { | ||
if (this == o) { | ||
return true; | ||
} | ||
if (o == null || getClass() != o.getClass()) { | ||
return false; | ||
} | ||
final AssertTopic that = (AssertTopic) o; | ||
return topic.equals(that.topic) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Do we need to check that the There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Nope, |
||
&& Objects.equals(config, that.config) | ||
&& timeout.equals(that.timeout); | ||
} | ||
|
||
@Override | ||
public int hashCode() { | ||
return Objects.hash(topic, config, timeout); | ||
} | ||
|
||
@Override | ||
public String toString() { | ||
return "AssertTopic{" | ||
+ "topic=" + topic | ||
+ ",config=" + config | ||
+ ",timeout=" + timeout | ||
+ '}'; | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,57 @@ | ||
package io.confluent.ksql.parser.tree; | ||
|
||
import com.google.common.collect.ImmutableMap; | ||
import com.google.common.testing.EqualsTester; | ||
import io.confluent.ksql.execution.windows.WindowTimeClause; | ||
import io.confluent.ksql.parser.NodeLocation; | ||
import java.util.Map; | ||
import java.util.Optional; | ||
import java.util.concurrent.TimeUnit; | ||
import org.junit.Rule; | ||
import org.junit.Test; | ||
import org.mockito.Mock; | ||
import org.mockito.junit.MockitoJUnit; | ||
/* | ||
* Copyright 2022 Confluent Inc. | ||
* | ||
* Licensed under the Confluent Community License (the "License"; you may not use | ||
* this file except in compliance with the License. You may obtain a copy of the | ||
* License at | ||
* | ||
* http://www.confluent.io/confluent-community-license | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT | ||
* WARRANTIES OF ANY KIND, either express or implied. See the License for the | ||
* specific language governing permissions and limitations under the License. | ||
*/ | ||
|
||
import org.mockito.junit.MockitoRule; | ||
|
||
public class AssertTopicTest { | ||
|
||
@Rule | ||
public final MockitoRule mockitoRule = MockitoJUnit.rule(); | ||
|
||
public static final NodeLocation SOME_LOCATION = new NodeLocation(0, 0); | ||
public static final NodeLocation OTHER_LOCATION = new NodeLocation(1, 0); | ||
private final String SOME_TOPIC = "TOPIC"; | ||
private final Map SOME_CONFIG = ImmutableMap.of("partitions", 1); | ||
private final WindowTimeClause SOME_TIMEOUT = new WindowTimeClause(5, TimeUnit.SECONDS); | ||
|
||
@Test | ||
public void shouldImplementHashCodeAndEquals() { | ||
new EqualsTester() | ||
.addEqualityGroup( | ||
new AssertTopic(Optional.empty(), SOME_TOPIC, SOME_CONFIG, Optional.of(SOME_TIMEOUT)), | ||
new AssertTopic(Optional.of(new NodeLocation(1, 1)), SOME_TOPIC, SOME_CONFIG, Optional.of(SOME_TIMEOUT))) | ||
.addEqualityGroup( | ||
new AssertTopic(Optional.empty(), "another topic", SOME_CONFIG, Optional.of(SOME_TIMEOUT))) | ||
.addEqualityGroup( | ||
new AssertTopic(Optional.empty(), SOME_TOPIC, ImmutableMap.of(), Optional.of(SOME_TIMEOUT))) | ||
.addEqualityGroup( | ||
new AssertTopic(Optional.empty(), SOME_TOPIC, SOME_CONFIG, Optional.empty())) | ||
.testEquals(); | ||
} | ||
|
||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -41,7 +41,7 @@ public class GrammarTokenExporterTest { | |
"CATALOG", "PROPERTIES", "BEGINNING", "UNSET", "RUN", "SCRIPT", "DECIMAL", | ||
"KEY", "CONNECTOR", "CONNECTORS", "SINK", "SOURCE", "NAMESPACE", "MATERIALIZED", | ||
"VIEW", "PRIMARY", "REPLACE", "ASSERT", "ADD", "ALTER", "VARIABLES", "PLUGINS", "HEADERS", | ||
"HEADER", "SYSTEM", "IF", "EQ", "NEQ", "LT", "LTE", "GT", "GTE", "PLUS", "MINUS", "ASTERISK", | ||
"HEADER", "SYSTEM", "TIMEOUT", "IF", "EQ", "NEQ", "LT", "LTE", "GT", "GTE", "PLUS", "MINUS", "ASTERISK", | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. What's our procedure for adding new reserved words? I imagine we do want to note that somewhere. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
|
||
"SLASH", "PERCENT", "CONCAT", "ASSIGN", "STRING", "IDENTIFIER", "VARIABLE", "EXPONENT", | ||
"DIGIT", "LETTER", "WS")); | ||
|
||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
As an optional thing, it may be worth considering a new other than "topic" for the topic. That avoids the approved version being "Topic topic exists."
shrugs
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
lol okay. Done.