Skip to content

Commit

Permalink
Prevent deletion of topics during recovery. Fixes confluentinc#2329 (…
Browse files Browse the repository at this point in the history
…partially)
  • Loading branch information
agavra committed Jan 23, 2019
1 parent 4cb75cd commit 8cc94ed
Show file tree
Hide file tree
Showing 6 changed files with 131 additions and 115 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,65 @@

package io.confluent.ksql.parser.tree;

import com.google.common.base.MoreObjects;
import java.util.Objects;
import java.util.Optional;

public abstract class AbstractStreamDropStatement extends Statement {
public AbstractStreamDropStatement(final Optional<NodeLocation> location) {

private final QualifiedName name;
private final boolean ifExists;
private final boolean deleteTopic;

public AbstractStreamDropStatement(final Optional<NodeLocation> location,
final QualifiedName name,
final boolean deleteTopic,
final boolean ifExists) {
super(location);
this.name = name;
this.deleteTopic = deleteTopic;
this.ifExists = ifExists;
}

public QualifiedName getName() {
return name;
}

public boolean isDeleteTopic() {
return deleteTopic;
}

public boolean getIfExists() {
return ifExists;
}

@Override
public boolean equals(final Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}

final AbstractStreamDropStatement that = (AbstractStreamDropStatement) o;
return deleteTopic == that.deleteTopic
&& ifExists == that.ifExists
&& Objects.equals(name, that.name);
}

public abstract boolean getIfExists();
@Override
public int hashCode() {
return Objects.hash(name, deleteTopic, ifExists);
}

@Override
public String toString() {
return MoreObjects.toStringHelper(this)
.add("name", name)
.add("deleteTopic", deleteTopic)
.add("ifExists", ifExists)
.toString();
}

public abstract QualifiedName getName();
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,18 +14,11 @@

package io.confluent.ksql.parser.tree;

import static com.google.common.base.MoreObjects.toStringHelper;

import java.util.Objects;
import java.util.Optional;

public class DropStream
extends AbstractStreamDropStatement implements ExecutableDdlStatement {

private final QualifiedName streamName;
private final boolean ifExists;
private final boolean deleteTopic;

public DropStream(
final QualifiedName tableName,
final boolean ifExists,
Expand All @@ -40,62 +33,20 @@ public DropStream(final NodeLocation location,
this(Optional.of(location), tableName, ifExists, deleteTopic);
}

private DropStream(final Optional<NodeLocation> location,
public DropStream(final Optional<NodeLocation> location,
final QualifiedName streamName,
final boolean ifExists,
final boolean deleteTopic) {
super(location);
this.streamName = streamName;
this.ifExists = ifExists;
this.deleteTopic = deleteTopic;
}

public QualifiedName getName() {
return streamName;
}

public boolean getIfExists() {
return ifExists;
super(location, streamName, deleteTopic, ifExists);
}

public QualifiedName getStreamName() {
return streamName;
}

public boolean isDeleteTopic() {
return deleteTopic;
return getName();
}

@Override
public <R, C> R accept(final AstVisitor<R, C> visitor, final C context) {
return visitor.visitDropStream(this, context);
}

@Override
public int hashCode() {
return Objects.hash(streamName, ifExists);
}

@Override
public boolean equals(final Object obj) {
if (this == obj) {
return true;
}
if ((obj == null) || (getClass() != obj.getClass())) {
return false;
}
final DropStream o = (DropStream) obj;
return Objects.equals(streamName, o.streamName)
&& (ifExists == o.ifExists)
&& (deleteTopic == o.deleteTopic);
}

@Override
public String toString() {
return toStringHelper(this)
.add("tableName", streamName)
.add("ifExists", ifExists)
.add("deleteTopic", deleteTopic)
.toString();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,17 +14,10 @@

package io.confluent.ksql.parser.tree;

import static com.google.common.base.MoreObjects.toStringHelper;

import java.util.Objects;
import java.util.Optional;

public class DropTable extends AbstractStreamDropStatement implements ExecutableDdlStatement {

private final QualifiedName tableName;
private final boolean ifExists;
private final boolean deleteTopic;

public DropTable(
final QualifiedName tableName,
final boolean ifExists,
Expand All @@ -36,58 +29,15 @@ public DropTable(final Optional<NodeLocation> location,
final QualifiedName tableName,
final boolean ifExists,
final boolean deleteTopic) {
super(location);
this.tableName = tableName;
this.ifExists = ifExists;
this.deleteTopic = deleteTopic;
}

public QualifiedName getName() {
return tableName;
}

public boolean getIfExists() {
return ifExists;
super(location, tableName, deleteTopic, ifExists);
}

public QualifiedName getTableName() {
return tableName;
}

public boolean isDeleteTopic() {
return deleteTopic;
return getName();
}

@Override
public <R, C> R accept(final AstVisitor<R, C> visitor, final C context) {
return visitor.visitDropTable(this, context);
}

@Override
public int hashCode() {
return Objects.hash(tableName, ifExists);
}

@Override
public boolean equals(final Object obj) {
if (this == obj) {
return true;
}
if ((obj == null) || (getClass() != obj.getClass())) {
return false;
}
final DropTable o = (DropTable) obj;
return Objects.equals(tableName, o.tableName)
&& (ifExists == o.ifExists)
&& (deleteTopic == o.deleteTopic);
}

@Override
public String toString() {
return toStringHelper(this)
.add("tableName", tableName)
.add("ifExists", ifExists)
.add("deleteTopic", deleteTopic)
.toString();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,12 @@
import io.confluent.ksql.parser.KsqlParser.PreparedStatement;
import io.confluent.ksql.parser.tree.CreateAsSelect;
import io.confluent.ksql.parser.tree.CreateTableAsSelect;
import io.confluent.ksql.parser.tree.DropStream;
import io.confluent.ksql.parser.tree.DropTable;
import io.confluent.ksql.parser.tree.ExecutableDdlStatement;
import io.confluent.ksql.parser.tree.InsertInto;
import io.confluent.ksql.parser.tree.RunScript;
import io.confluent.ksql.parser.tree.Statement;
import io.confluent.ksql.parser.tree.TerminateQuery;
import io.confluent.ksql.query.QueryId;
import io.confluent.ksql.rest.entity.CommandStatus;
Expand Down Expand Up @@ -187,10 +190,10 @@ private void executeStatement(
DdlCommandResult result = null;
String successMessage = "";
if (statement.getStatement() instanceof ExecutableDdlStatement) {
final ExecutableDdlStatement ddlStatement =
maybeUnsetDeleteTopic(statement.getStatement(), mode);
result = ksqlEngine.executeDdlStatement(
statement.getStatementText(),
(ExecutableDdlStatement) statement.getStatement(),
command.getOverwriteProperties());
statement.getStatementText(), ddlStatement, command.getOverwriteProperties());
} else if (statement.getStatement() instanceof CreateAsSelect) {
startQuery(statement, command, mode);
successMessage = statement.getStatement() instanceof CreateTableAsSelect
Expand All @@ -217,6 +220,28 @@ private void executeStatement(
putFinalStatus(commandId, commandStatusFuture, successStatus);
}

private ExecutableDdlStatement maybeUnsetDeleteTopic(final Statement statement, final Mode mode) {
if (mode == Mode.RESTORE && statement instanceof DropStream) {
final DropStream dropStream = (DropStream) statement;
return new DropStream(
dropStream.getLocation(),
dropStream.getName(),
dropStream.getIfExists(),
false
);
} else if (mode == Mode.RESTORE && statement instanceof DropTable) {
final DropTable dropTable = (DropTable) statement;
return new DropTable(
dropTable.getLocation(),
dropTable.getName(),
dropTable.getIfExists(),
false
);
} else {
return (ExecutableDdlStatement) statement;
}
}

private void handleRunScript(final Command command, final Mode mode) {

if (command.getOverwriteProperties().containsKey(KsqlConstants.RUN_SCRIPT_STATEMENTS_CONTENT)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import io.confluent.ksql.serde.KsqlTopicSerDe;
import io.confluent.ksql.services.ServiceContext;
import io.confluent.ksql.services.TestServiceContext;
import io.confluent.ksql.util.KafkaTopicClient;
import io.confluent.ksql.util.KsqlConfig;
import io.confluent.ksql.util.PersistentQueryMetadata;
import io.confluent.ksql.util.timestamp.TimestampExtractionPolicy;
Expand All @@ -62,6 +63,7 @@
import org.hamcrest.TypeSafeDiagnosingMatcher;
import org.junit.After;
import org.junit.Test;
import org.mockito.Mockito;

public class RecoveryTest {
private final KsqlConfig ksqlConfig = new KsqlConfig(
Expand All @@ -82,7 +84,8 @@ public void tearDown() {
serviceContext.close();
}

private KsqlEngine createKsqlEngine() {
private KsqlEngine createKsqlEngine(
final ServiceContext serviceContext) {
final KsqlEngineMetrics engineMetrics = EasyMock.niceMock(KsqlEngineMetrics.class);
EasyMock.replay(engineMetrics);
return KsqlEngineTestUtil.createKsqlEngine(
Expand Down Expand Up @@ -154,7 +157,11 @@ private class KsqlServer {
final CommandRunner commandRunner;

KsqlServer(final List<QueuedCommand> commandLog) {
this.ksqlEngine = createKsqlEngine();
this(commandLog, serviceContext);
}

KsqlServer(final List<QueuedCommand> commandLog, final ServiceContext serviceContext) {
this.ksqlEngine = createKsqlEngine(serviceContext);
this.commandIdAssigner = new CommandIdAssigner(ksqlEngine.getMetaStore());
this.fakeCommandQueue = new FakeCommandQueue(
new CommandIdAssigner(ksqlEngine.getMetaStore()),
Expand Down Expand Up @@ -541,6 +548,25 @@ public void shouldRecoverDrop() {
shouldRecover(commands);
}

@Test
public void shouldNotDeleteTopicOnRecovery() {
server1.submitCommands(
"CREATE STREAM A (COLUMN STRING) WITH (KAFKA_TOPIC='A', VALUE_FORMAT='JSON');",
"CREATE STREAM B AS SELECT * FROM A;",
"TERMINATE CSAS_B_0;",
"DROP STREAM B DELETE TOPIC;"
);

KafkaTopicClient mockKafkaClient = Mockito.mock(KafkaTopicClient.class);
Mockito.when(mockKafkaClient.isTopicExists(Mockito.anyString())).thenReturn(true);

final KsqlServer recoverServer = new KsqlServer(
commands, TestServiceContext.create(mockKafkaClient));

recoverServer.recover();
Mockito.verify(mockKafkaClient, Mockito.never()).deleteTopics(Mockito.anyCollection());
}

@Test
public void shouldRecoverLogWithRepeatedTerminates() {
server1.submitCommands(
Expand Down
Loading

0 comments on commit 8cc94ed

Please sign in to comment.