Skip to content

Commit

Permalink
feat: assert not exists topic (#9086)
Browse files Browse the repository at this point in the history
  • Loading branch information
Zara Lim authored May 2, 2022
1 parent 90a609d commit 4b57b55
Show file tree
Hide file tree
Showing 14 changed files with 184 additions and 53 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -907,7 +907,8 @@ private void printFunctionDescription(final FunctionDescriptionList describeFunc
}

private void printAssertTopic(final AssertTopicEntity assertTopic) {
writer().printf("Topic " + assertTopic.getTopicName() + " exists.\n");
final String existence = assertTopic.getExists() ? " exists" : " does not exist";
writer().printf("Topic " + assertTopic.getTopicName() + existence + ".\n");
}

private static String argToString(final ArgumentInfo arg) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1096,7 +1096,22 @@ public void shouldThrowOnInvalidCliPropertyValue() {
public void shouldPrintAssertTopicResult() {
// Given:
final KsqlEntityList entities = new KsqlEntityList(ImmutableList.of(
new AssertTopicEntity("statement", "name")
new AssertTopicEntity("statement", "name", true)
));

// When:
console.printKsqlEntityList(entities);

// Then:
final String output = terminal.getOutputString();
Approvals.verify(output, approvalOptions);
}

@Test
public void shouldPrintAssertNotExistsTopicResult() {
// Given:
final KsqlEntityList entities = new KsqlEntityList(ImmutableList.of(
new AssertTopicEntity("statement", "name", false)
));

// When:
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
[ {
"@type" : "assert_topic",
"statementText" : "statement",
"topicName" : "name",
"exists" : false,
"warnings" : [ ]
} ]
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@

Topic name does not exist.
Original file line number Diff line number Diff line change
Expand Up @@ -2,5 +2,6 @@
"@type" : "assert_topic",
"statementText" : "statement",
"topicName" : "name",
"exists" : true,
"warnings" : [ ]
} ]
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,8 @@ statement
| CREATE TYPE (IF NOT EXISTS)? identifier AS type #registerType
| DROP TYPE (IF EXISTS)? identifier #dropType
| ALTER (STREAM | TABLE) sourceName alterOption (',' alterOption)* #alterSource
| ASSERT TOPIC identifier (WITH tableProperties)? timeout? #assertTopic
| ASSERT (NOT EXISTS)? TOPIC identifier
(WITH tableProperties)? timeout? #assertTopic
;

assertStatement
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1410,7 +1410,8 @@ public Node visitAssertTopic(final AssertTopicContext context) {
context.timeout() == null
? Optional.empty()
: Optional.of(getTimeClause(
context.timeout().number(), context.timeout().windowUnit()))
context.timeout().number(), context.timeout().windowUnit())),
context.EXISTS() == null
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,17 +30,20 @@ public class AssertTopic extends Statement {
private final String topic;
private final ImmutableMap<String, Literal> config;
private final Optional<WindowTimeClause> timeout;
private final boolean exists;

public AssertTopic(
final Optional<NodeLocation> location,
final String topic,
final Map<String, Literal> config,
final Optional<WindowTimeClause> timeout
final Optional<WindowTimeClause> timeout,
final boolean exists
) {
super(location);
this.topic = Objects.requireNonNull(topic, "topic");
this.config = ImmutableMap.copyOf(Objects.requireNonNull(config, "config"));
this.timeout = Objects.requireNonNull(timeout, "timeout");
this.exists = exists;
}

public String getTopic() {
Expand All @@ -55,6 +58,10 @@ public Optional<WindowTimeClause> getTimeout() {
return timeout;
}

public boolean checkExists() {
return exists;
}

@Override
public boolean equals(final Object o) {
if (this == o) {
Expand All @@ -66,7 +73,8 @@ public boolean equals(final Object o) {
final AssertTopic that = (AssertTopic) o;
return topic.equals(that.topic)
&& Objects.equals(config, that.config)
&& timeout.equals(that.timeout);
&& timeout.equals(that.timeout)
&& exists == that.exists;
}

@Override
Expand All @@ -80,6 +88,7 @@ public String toString() {
+ "topic=" + topic
+ ",config=" + config
+ ",timeout=" + timeout
+ ",exists=" + exists
+ '}';
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -978,13 +978,14 @@ public void shouldBuildAssertTopic() {
assertThat(assertTopic.getTopic(), is("X"));
assertThat(assertTopic.getConfig().size(), is(0));
assertThat(assertTopic.getTimeout(), is(Optional.empty()));
assertThat(assertTopic.checkExists(), is(true));
}

@Test
public void shouldBuildAssertTopicWithConfigsAndTimeout() {
public void shouldBuildAssertNotExistsTopicWithConfigsAndTimeout() {
// Given:
final SingleStatementContext stmt
= givenQuery("ASSERT TOPIC X WITH (REPLICAS=1, partitions=1) TIMEOUT 10 SECONDS;");
= givenQuery("ASSERT NOT EXISTS TOPIC X WITH (REPLICAS=1, partitions=1) TIMEOUT 10 SECONDS;");

// When:
final AssertTopic assertTopic = (AssertTopic) builder.buildStatement(stmt);
Expand All @@ -995,5 +996,6 @@ public void shouldBuildAssertTopicWithConfigsAndTimeout() {
assertThat(assertTopic.getConfig().get("PARTITIONS").getValue(), is(1));
assertThat(assertTopic.getTimeout().get().getTimeUnit(), is(TimeUnit.SECONDS));
assertThat(assertTopic.getTimeout().get().getValue(), is(10L));
assertThat(assertTopic.checkExists(), is(false));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -43,14 +43,16 @@ public class AssertTopicTest {
public void shouldImplementHashCodeAndEquals() {
new EqualsTester()
.addEqualityGroup(
new AssertTopic(Optional.empty(), SOME_TOPIC, SOME_CONFIG, Optional.of(SOME_TIMEOUT)),
new AssertTopic(Optional.of(new NodeLocation(1, 1)), SOME_TOPIC, SOME_CONFIG, Optional.of(SOME_TIMEOUT)))
new AssertTopic(Optional.empty(), SOME_TOPIC, SOME_CONFIG, Optional.of(SOME_TIMEOUT), true),
new AssertTopic(Optional.of(new NodeLocation(1, 1)), SOME_TOPIC, SOME_CONFIG, Optional.of(SOME_TIMEOUT), true))
.addEqualityGroup(
new AssertTopic(Optional.empty(), "another topic", SOME_CONFIG, Optional.of(SOME_TIMEOUT)))
new AssertTopic(Optional.empty(), "another topic", SOME_CONFIG, Optional.of(SOME_TIMEOUT), true))
.addEqualityGroup(
new AssertTopic(Optional.empty(), SOME_TOPIC, ImmutableMap.of(), Optional.of(SOME_TIMEOUT)))
new AssertTopic(Optional.empty(), SOME_TOPIC, ImmutableMap.of(), Optional.of(SOME_TIMEOUT), true))
.addEqualityGroup(
new AssertTopic(Optional.empty(), SOME_TOPIC, SOME_CONFIG, Optional.empty()))
new AssertTopic(Optional.empty(), SOME_TOPIC, SOME_CONFIG, Optional.empty(), true))
.addEqualityGroup(
new AssertTopic(Optional.empty(), SOME_TOPIC, SOME_CONFIG, Optional.of(SOME_TIMEOUT), false))
.testEquals();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,10 +34,13 @@
import java.util.Map;
import java.util.Optional;
import org.apache.kafka.common.TopicPartitionInfo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public final class AssertTopicExecutor {

private static final int RETRY_MS = 100;
private static final Logger LOG = LoggerFactory.getLogger(AssertTopicExecutor.class);

private AssertTopicExecutor() {

Expand All @@ -59,52 +62,75 @@ public static StatementExecutorResponse execute(
RETRY_MS,
RETRY_MS,
() -> assertTopic(
client, statement.getStatement().getTopic(), statement.getStatement().getConfig())
client,
statement.getStatement().getTopic(),
statement.getStatement().getConfig(),
statement.getStatement().checkExists())
);
} catch (final KsqlException e) {
throw new KsqlRestException(assertionFailedError(e.getMessage()));
}
return StatementExecutorResponse.handled(Optional.of(
new AssertTopicEntity(statement.getStatementText(), statement.getStatement().getTopic())));
new AssertTopicEntity(
statement.getStatementText(),
statement.getStatement().getTopic(),
statement.getStatement().checkExists())));
}

private static void assertTopic(
final KafkaTopicClient client,
final String topic,
final Map<String, Literal> config
final Map<String, Literal> config,
final boolean assertExists
) {
final boolean topicExists;
try {
topicExists = client.isTopicExists(topic);
} catch (final Exception e) {
throw new KsqlException("Cannot check that topic exists: " + e.getMessage());
throw new KsqlException("Cannot check topic existence: " + e.getMessage());
}

if (topicExists) {
final List<TopicPartitionInfo> partitionList = client.describeTopic(topic).partitions();
final int partitions = partitionList.size();
final int replicas = partitionList.get(0).replicas().size();
final List<String> configErrors = new ArrayList<>();
config.forEach((k, v) -> {
if (k.toLowerCase().equals("partitions")) {
if (!configMatches(v.getValue(), partitions)) {
configErrors.add(
createConfigError(topic, "partitions", v.getValue().toString(), partitions));
}
} else if (k.toLowerCase().equals("replicas")) {
if (!configMatches(v.getValue(), replicas)) {
configErrors.add(
createConfigError(topic, "replicas", v.getValue().toString(), replicas));
}
} else {
configErrors.add("Cannot assert unknown topic property: " + k);
}
});
if (configErrors.size() > 0) {
throw new KsqlException(String.join("\n", configErrors));
if (!assertExists) {
if (config.size() > 0) {
LOG.warn("Will skip topic config check for topic non-existence assertion.");
}
if (topicExists) {
throw new KsqlException("Topic " + topic + " exists");
}
} else {
throw new KsqlException("Topic " + topic + " does not exist");
if (topicExists) {
final List<TopicPartitionInfo> partitionList = client.describeTopic(topic).partitions();
checkConfigs(topic, config, partitionList.size(), partitionList.get(0).replicas().size());
} else {
throw new KsqlException("Topic " + topic + " does not exist");
}
}
}

private static void checkConfigs(
final String topic,
final Map<String, Literal> config,
final int partitions,
final int replicas
) {
final List<String> configErrors = new ArrayList<>();
config.forEach((k, v) -> {
if (k.toLowerCase().equals("partitions")) {
if (!configMatches(v.getValue(), partitions)) {
configErrors.add(
createConfigError(topic, "partitions", v.getValue().toString(), partitions));
}
} else if (k.toLowerCase().equals("replicas")) {
if (!configMatches(v.getValue(), replicas)) {
configErrors.add(
createConfigError(topic, "replicas", v.getValue().toString(), replicas));
}
} else {
configErrors.add("Cannot assert unknown topic property: " + k);
}
});
if (configErrors.size() > 0) {
throw new KsqlException(String.join("\n", configErrors));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1055,6 +1055,7 @@ public void shouldAssertTopicExists() {
// Then:
assertThat(response.size(), is(1));
assertThat(((AssertTopicEntity) response.get(0)).getTopicName(), is(PAGE_VIEW_TOPIC));
assertThat(((AssertTopicEntity) response.get(0)).getExists(), is(true));
}

@Test
Expand Down Expand Up @@ -1100,6 +1101,29 @@ public void shouldStopScriptOnFailedAssert() {
assertThat(topicExists("Z"), is(false));
}

@Test
public void shouldAssertTopicDoesNotExists() {
// When:
List<KsqlEntity> response = makeKsqlRequest("ASSERT NOT EXISTS TOPIC X WITH (PARTITIONS=1) TIMEOUT 2 SECONDS;");

// Then:
assertThat(response.size(), is(1));
assertThat(((AssertTopicEntity) response.get(0)).getTopicName(), is("X"));
assertThat(((AssertTopicEntity) response.get(0)).getExists(), is(false));
}

@Test
public void shouldFailToAssertTopicDoesntExist() {
assertThatEventually(() -> {
try {
makeKsqlRequest("ASSERT NOT EXISTS TOPIC " + PAGE_VIEW_TOPIC + " WITH (PARTITIONS=1) TIMEOUT 1 SECONDS;");
return "Should have thrown 'Topioc exists' error.";
} catch (final Throwable t) {
return t.getMessage();
}
}, containsString("Topic PAGEVIEW_TOPIC exists"));
}

@Test
public void shouldTimeoutTheCorrectAmountOfTime() {
final long start = Instant.now().getEpochSecond();
Expand Down Expand Up @@ -1142,7 +1166,7 @@ public void shouldFailToAssertTopicWithNoAcls() {
} catch (final Throwable t) {
return t.getMessage();
}
}, containsString("Cannot check that topic exists: Authorization denied to Describe on topic(s): [ACLESS]"));
}, containsString("Cannot check topic existence: Authorization denied to Describe on topic(s): [ACLESS]"));
}

private boolean topicExists(final String topicName) {
Expand Down
Loading

0 comments on commit 4b57b55

Please sign in to comment.