diff --git a/ksql-parser/src/main/java/io/confluent/ksql/parser/rewrite/StatementRewriter.java b/ksql-parser/src/main/java/io/confluent/ksql/parser/rewrite/StatementRewriter.java index 758710592a86..b4ac9199059d 100644 --- a/ksql-parser/src/main/java/io/confluent/ksql/parser/rewrite/StatementRewriter.java +++ b/ksql-parser/src/main/java/io/confluent/ksql/parser/rewrite/StatementRewriter.java @@ -1026,7 +1026,7 @@ protected Node visitInsertInto(final InsertInto node, final Object context) { protected Node visitDropTable(final DropTable node, final Object context) { return new DropTable(node.getLocation(), - node.getTableName(), + node.getName(), node.getIfExists(), node.isDeleteTopic()); } diff --git a/ksql-parser/src/main/java/io/confluent/ksql/parser/tree/AbstractStreamDropStatement.java b/ksql-parser/src/main/java/io/confluent/ksql/parser/tree/AbstractStreamDropStatement.java index df6fa8551799..2ebad0aa6d27 100644 --- a/ksql-parser/src/main/java/io/confluent/ksql/parser/tree/AbstractStreamDropStatement.java +++ b/ksql-parser/src/main/java/io/confluent/ksql/parser/tree/AbstractStreamDropStatement.java @@ -14,14 +14,66 @@ package io.confluent.ksql.parser.tree; +import com.google.common.base.MoreObjects; +import java.util.Objects; import java.util.Optional; public abstract class AbstractStreamDropStatement extends Statement { - public AbstractStreamDropStatement(final Optional location) { + + private final QualifiedName name; + private final boolean ifExists; + private final boolean deleteTopic; + + AbstractStreamDropStatement( + final Optional location, + final QualifiedName name, + final boolean deleteTopic, + final boolean ifExists) { super(location); + this.name = Objects.requireNonNull(name, "name can't be null"); + this.deleteTopic = deleteTopic; + this.ifExists = ifExists; + } + + public QualifiedName getName() { + return name; + } + + public boolean isDeleteTopic() { + return deleteTopic; + } + + public boolean getIfExists() { + return ifExists; + } + + @Override + public boolean equals(final Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + + final AbstractStreamDropStatement that = (AbstractStreamDropStatement) o; + return deleteTopic == that.deleteTopic + && ifExists == that.ifExists + && Objects.equals(name, that.name); } - public abstract boolean getIfExists(); + @Override + public int hashCode() { + return Objects.hash(name, deleteTopic, ifExists); + } + + @Override + public String toString() { + return MoreObjects.toStringHelper(this) + .add("name", name) + .add("deleteTopic", deleteTopic) + .add("ifExists", ifExists) + .toString(); + } - public abstract QualifiedName getName(); } diff --git a/ksql-parser/src/main/java/io/confluent/ksql/parser/tree/DropStream.java b/ksql-parser/src/main/java/io/confluent/ksql/parser/tree/DropStream.java index 9bfe35981bce..bca24e193df0 100644 --- a/ksql-parser/src/main/java/io/confluent/ksql/parser/tree/DropStream.java +++ b/ksql-parser/src/main/java/io/confluent/ksql/parser/tree/DropStream.java @@ -14,18 +14,11 @@ package io.confluent.ksql.parser.tree; -import static com.google.common.base.MoreObjects.toStringHelper; - -import java.util.Objects; import java.util.Optional; public class DropStream extends AbstractStreamDropStatement implements ExecutableDdlStatement { - private final QualifiedName streamName; - private final boolean ifExists; - private final boolean deleteTopic; - public DropStream( final QualifiedName tableName, final boolean ifExists, @@ -40,30 +33,11 @@ public DropStream(final NodeLocation location, this(Optional.of(location), tableName, ifExists, deleteTopic); } - private DropStream(final Optional location, + public DropStream(final Optional location, final QualifiedName streamName, final boolean ifExists, final boolean deleteTopic) { - super(location); - this.streamName = streamName; - this.ifExists = ifExists; - this.deleteTopic = deleteTopic; - } - - public QualifiedName getName() { - return streamName; - } - - public boolean getIfExists() { - return ifExists; - } - - public QualifiedName getStreamName() { - return streamName; - } - - public boolean isDeleteTopic() { - return deleteTopic; + super(location, streamName, deleteTopic, ifExists); } @Override @@ -71,31 +45,4 @@ public R accept(final AstVisitor visitor, final C context) { return visitor.visitDropStream(this, context); } - @Override - public int hashCode() { - return Objects.hash(streamName, ifExists); - } - - @Override - public boolean equals(final Object obj) { - if (this == obj) { - return true; - } - if ((obj == null) || (getClass() != obj.getClass())) { - return false; - } - final DropStream o = (DropStream) obj; - return Objects.equals(streamName, o.streamName) - && (ifExists == o.ifExists) - && (deleteTopic == o.deleteTopic); - } - - @Override - public String toString() { - return toStringHelper(this) - .add("tableName", streamName) - .add("ifExists", ifExists) - .add("deleteTopic", deleteTopic) - .toString(); - } } diff --git a/ksql-parser/src/main/java/io/confluent/ksql/parser/tree/DropTable.java b/ksql-parser/src/main/java/io/confluent/ksql/parser/tree/DropTable.java index acd3265825af..8ca18aa87a58 100644 --- a/ksql-parser/src/main/java/io/confluent/ksql/parser/tree/DropTable.java +++ b/ksql-parser/src/main/java/io/confluent/ksql/parser/tree/DropTable.java @@ -14,17 +14,10 @@ package io.confluent.ksql.parser.tree; -import static com.google.common.base.MoreObjects.toStringHelper; - -import java.util.Objects; import java.util.Optional; public class DropTable extends AbstractStreamDropStatement implements ExecutableDdlStatement { - private final QualifiedName tableName; - private final boolean ifExists; - private final boolean deleteTopic; - public DropTable( final QualifiedName tableName, final boolean ifExists, @@ -36,58 +29,11 @@ public DropTable(final Optional location, final QualifiedName tableName, final boolean ifExists, final boolean deleteTopic) { - super(location); - this.tableName = tableName; - this.ifExists = ifExists; - this.deleteTopic = deleteTopic; - } - - public QualifiedName getName() { - return tableName; - } - - public boolean getIfExists() { - return ifExists; - } - - public QualifiedName getTableName() { - return tableName; - } - - public boolean isDeleteTopic() { - return deleteTopic; + super(location, tableName, deleteTopic, ifExists); } @Override public R accept(final AstVisitor visitor, final C context) { return visitor.visitDropTable(this, context); } - - @Override - public int hashCode() { - return Objects.hash(tableName, ifExists); - } - - @Override - public boolean equals(final Object obj) { - if (this == obj) { - return true; - } - if ((obj == null) || (getClass() != obj.getClass())) { - return false; - } - final DropTable o = (DropTable) obj; - return Objects.equals(tableName, o.tableName) - && (ifExists == o.ifExists) - && (deleteTopic == o.deleteTopic); - } - - @Override - public String toString() { - return toStringHelper(this) - .add("tableName", tableName) - .add("ifExists", ifExists) - .add("deleteTopic", deleteTopic) - .toString(); - } } diff --git a/ksql-parser/src/test/java/io/confluent/ksql/parser/tree/AbstractStreamDropStatementTest.java b/ksql-parser/src/test/java/io/confluent/ksql/parser/tree/AbstractStreamDropStatementTest.java new file mode 100644 index 000000000000..1bda9391ff4f --- /dev/null +++ b/ksql-parser/src/test/java/io/confluent/ksql/parser/tree/AbstractStreamDropStatementTest.java @@ -0,0 +1,27 @@ +package io.confluent.ksql.parser.tree; + +import com.google.common.testing.EqualsTester; +import java.util.Optional; +import org.junit.Test; + +public class AbstractStreamDropStatementTest { + + @Test + public void testHashAndEquals() { + new EqualsTester() + // equals should ignore NodeLocation + .addEqualityGroup( + new DropStream(Optional.empty(), QualifiedName.of("a"), true, true), + new DropStream(Optional.empty(), QualifiedName.of("a"), true, true), + new DropStream(Optional.of(new NodeLocation(0, 0)), QualifiedName.of("a"), true, true), + new DropStream(Optional.of(new NodeLocation(0, 1)), QualifiedName.of("a"), true, true)) + // equals should check qualified names + .addEqualityGroup(new DropStream(Optional.empty(), QualifiedName.of("b"), true, true)) + // equals should check ifExists + .addEqualityGroup(new DropStream(Optional.empty(), QualifiedName.of("a"), false, true)) + // equals should check deleteTopic + .addEqualityGroup(new DropStream(Optional.empty(), QualifiedName.of("a"), true, false)) + .testEquals(); + } + +} \ No newline at end of file diff --git a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/computation/StatementExecutor.java b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/computation/StatementExecutor.java index 9343b49a5b37..5f0cc5ccb622 100644 --- a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/computation/StatementExecutor.java +++ b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/computation/StatementExecutor.java @@ -22,9 +22,12 @@ import io.confluent.ksql.parser.KsqlParser.PreparedStatement; import io.confluent.ksql.parser.tree.CreateAsSelect; import io.confluent.ksql.parser.tree.CreateTableAsSelect; +import io.confluent.ksql.parser.tree.DropStream; +import io.confluent.ksql.parser.tree.DropTable; import io.confluent.ksql.parser.tree.ExecutableDdlStatement; import io.confluent.ksql.parser.tree.InsertInto; import io.confluent.ksql.parser.tree.RunScript; +import io.confluent.ksql.parser.tree.Statement; import io.confluent.ksql.parser.tree.TerminateQuery; import io.confluent.ksql.query.QueryId; import io.confluent.ksql.rest.entity.CommandStatus; @@ -187,10 +190,10 @@ private void executeStatement( DdlCommandResult result = null; String successMessage = ""; if (statement.getStatement() instanceof ExecutableDdlStatement) { + final ExecutableDdlStatement ddlStatement = + maybeUnsetDeleteTopic(statement.getStatement(), mode); result = ksqlEngine.executeDdlStatement( - statement.getStatementText(), - (ExecutableDdlStatement) statement.getStatement(), - command.getOverwriteProperties()); + statement.getStatementText(), ddlStatement, command.getOverwriteProperties()); } else if (statement.getStatement() instanceof CreateAsSelect) { startQuery(statement, command, mode); successMessage = statement.getStatement() instanceof CreateTableAsSelect @@ -217,6 +220,28 @@ private void executeStatement( putFinalStatus(commandId, commandStatusFuture, successStatus); } + private ExecutableDdlStatement maybeUnsetDeleteTopic(final Statement statement, final Mode mode) { + if (mode == Mode.RESTORE && statement instanceof DropStream) { + final DropStream dropStream = (DropStream) statement; + return new DropStream( + dropStream.getLocation(), + dropStream.getName(), + dropStream.getIfExists(), + false + ); + } else if (mode == Mode.RESTORE && statement instanceof DropTable) { + final DropTable dropTable = (DropTable) statement; + return new DropTable( + dropTable.getLocation(), + dropTable.getName(), + dropTable.getIfExists(), + false + ); + } else { + return (ExecutableDdlStatement) statement; + } + } + private void handleRunScript(final Command command, final Mode mode) { if (command.getOverwriteProperties().containsKey(KsqlConstants.RUN_SCRIPT_STATEMENTS_CONTENT)) { diff --git a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/computation/RecoveryTest.java b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/computation/RecoveryTest.java index d369cb278ad3..02b9223e659a 100644 --- a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/computation/RecoveryTest.java +++ b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/computation/RecoveryTest.java @@ -40,6 +40,7 @@ import io.confluent.ksql.serde.KsqlTopicSerDe; import io.confluent.ksql.services.ServiceContext; import io.confluent.ksql.services.TestServiceContext; +import io.confluent.ksql.util.KafkaTopicClient; import io.confluent.ksql.util.KsqlConfig; import io.confluent.ksql.util.PersistentQueryMetadata; import io.confluent.ksql.util.timestamp.TimestampExtractionPolicy; @@ -62,6 +63,7 @@ import org.hamcrest.TypeSafeDiagnosingMatcher; import org.junit.After; import org.junit.Test; +import org.mockito.Mockito; public class RecoveryTest { private final KsqlConfig ksqlConfig = new KsqlConfig( @@ -82,7 +84,8 @@ public void tearDown() { serviceContext.close(); } - private KsqlEngine createKsqlEngine() { + private KsqlEngine createKsqlEngine( + final ServiceContext serviceContext) { final KsqlEngineMetrics engineMetrics = EasyMock.niceMock(KsqlEngineMetrics.class); EasyMock.replay(engineMetrics); return KsqlEngineTestUtil.createKsqlEngine( @@ -154,7 +157,11 @@ private class KsqlServer { final CommandRunner commandRunner; KsqlServer(final List commandLog) { - this.ksqlEngine = createKsqlEngine(); + this(commandLog, serviceContext); + } + + KsqlServer(final List commandLog, final ServiceContext serviceContext) { + this.ksqlEngine = createKsqlEngine(serviceContext); this.commandIdAssigner = new CommandIdAssigner(ksqlEngine.getMetaStore()); this.fakeCommandQueue = new FakeCommandQueue( new CommandIdAssigner(ksqlEngine.getMetaStore()), @@ -541,6 +548,25 @@ public void shouldRecoverDrop() { shouldRecover(commands); } + @Test + public void shouldNotDeleteTopicOnRecovery() { + server1.submitCommands( + "CREATE STREAM A (COLUMN STRING) WITH (KAFKA_TOPIC='A', VALUE_FORMAT='JSON');", + "CREATE STREAM B AS SELECT * FROM A;", + "TERMINATE CSAS_B_0;", + "DROP STREAM B DELETE TOPIC;" + ); + + KafkaTopicClient mockKafkaClient = Mockito.mock(KafkaTopicClient.class); + Mockito.when(mockKafkaClient.isTopicExists(Mockito.anyString())).thenReturn(true); + + final KsqlServer recoverServer = new KsqlServer( + commands, TestServiceContext.create(mockKafkaClient)); + + recoverServer.recover(); + Mockito.verify(mockKafkaClient, Mockito.never()).deleteTopics(Mockito.anyCollection()); + } + @Test public void shouldRecoverLogWithRepeatedTerminates() { server1.submitCommands( diff --git a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/computation/StatementExecutorTest.java b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/computation/StatementExecutorTest.java index e96aaad969c8..583ed9b7f7fd 100644 --- a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/computation/StatementExecutorTest.java +++ b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/computation/StatementExecutorTest.java @@ -18,6 +18,7 @@ import static org.easymock.EasyMock.eq; import static org.easymock.EasyMock.expect; import static org.easymock.EasyMock.expectLastCall; +import static org.easymock.EasyMock.matches; import static org.easymock.EasyMock.replay; import static org.easymock.EasyMock.reportMatcher; import static org.easymock.EasyMock.verify; @@ -30,6 +31,7 @@ import static org.hamcrest.Matchers.not; import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; import io.confluent.ksql.KsqlEngine; import io.confluent.ksql.ddl.commands.DdlCommandResult; @@ -498,7 +500,7 @@ private CreateStreamAsSelect mockCSAS(final String name) { private DropStream mockDropStream(final String name) { final DropStream mockDropStream = mock(DropStream.class); expect(mockDropStream.getName()).andStubReturn(QualifiedName.of(name)); - expect(mockDropStream.getStreamName()).andStubReturn(QualifiedName.of(name)); + expect(mockDropStream.getName()).andStubReturn(QualifiedName.of(name)); expect(mockParser.parseSingleStatement("DROP")) .andReturn(new PreparedStatement<>("DROP", mockDropStream)); return mockDropStream; @@ -570,6 +572,10 @@ public void shouldSkipStartWhenReplayingLog() { public void shouldCascade4Dot1DropStreamCommand() { // Given: final DropStream mockDropStream = mockDropStream("foo"); + expect(mockDropStream.getLocation()).andStubReturn(Optional.empty()); + expect(mockDropStream.isDeleteTopic()).andStubReturn(false); + expect(mockDropStream.getIfExists()).andStubReturn(true); + expect(mockMetaStore.getSource("foo")) .andStubReturn(mock(StructuredDataSource.class)); expect(mockMetaStore.getQueriesWithSink("foo")) @@ -578,7 +584,8 @@ public void shouldCascade4Dot1DropStreamCommand() { expect(mockEngine.getPersistentQuery(new QueryId("query-id"))).andReturn(Optional.of(mockQueryMetadata)); mockQueryMetadata.close(); expectLastCall(); - expect(mockEngine.executeDdlStatement("DROP", mockDropStream, Collections.emptyMap())) + expect(mockEngine.executeDdlStatement( + matches("DROP"), anyObject(DropStream.class), eq(ImmutableMap.of()))) .andReturn(new DdlCommandResult(true, "SUCCESS")); replayAll(); @@ -599,8 +606,13 @@ public void shouldNotCascadeDropStreamCommand() { // Given: final String drop = "DROP"; final DropStream mockDropStream = mockDropStream("foo"); - expect(mockEngine.executeDdlStatement(drop, mockDropStream, Collections.emptyMap())) - .andReturn(new DdlCommandResult(true, "SUCCESS")); + expect(mockDropStream.getLocation()).andStubReturn(Optional.empty()); + expect(mockDropStream.isDeleteTopic()).andStubReturn(false); + expect(mockDropStream.getIfExists()).andStubReturn(true); + + expect(mockEngine.executeDdlStatement( + matches("DROP"), anyObject(DropStream.class), eq(ImmutableMap.of()))) + .andReturn(new DdlCommandResult(true, "SUCCESS"));; replayAll(); // When: