Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: backport fixes from query close #4819

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 5 additions & 3 deletions ksql-engine/src/main/java/io/confluent/ksql/KsqlEngine.java
Original file line number Diff line number Diff line change
Expand Up @@ -229,7 +229,7 @@ public ExecuteResult execute(

@Override
public void close() {
allLiveQueries.forEach(QueryMetadata::close);
allLiveQueries.forEach(QueryMetadata::stop);
engineMetrics.close();
aggregateMetricsCollector.shutdown();
}
Expand All @@ -255,8 +255,10 @@ private void unregisterQuery(final QueryMetadata query) {
final String applicationId = query.getQueryApplicationId();

if (!query.getState().equalsIgnoreCase("NOT_RUNNING")) {
throw new IllegalStateException("query not stopped."
+ " id " + applicationId + ", state: " + query.getState());
log.warn(
"Unregistering query that has not terminated. "
+ "This may happen when streams threads are hung. State: " + query.getState()
);
}

if (!allLiveQueries.remove(query)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,4 +103,9 @@ public Set<String> getSinkNames() {
public DataSource.DataSourceSerDe getResultTopicSerde() {
return resultTopic.getKsqlTopicSerDe().getSerDe();
}

@Override
public void stop() {
doClose(false);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class QueryMetadata {
public abstract class QueryMetadata {

private static final Logger LOG = LoggerFactory.getLogger(QueryMetadata.class);

Expand Down Expand Up @@ -154,14 +154,39 @@ public boolean hasEverBeenStarted() {
return everStarted;
}


/**
* Stops the query without cleaning up the external resources
* so that it can be resumed when we call {@link #start()}.
*
* <p>NOTE: {@link QueuedQueryMetadata} overrides this method
* since any time a transient query is stopped the external resources
* should be cleaned up.</p>
*
* @see #close()
*/
public abstract void stop();

/**
* Closes the {@code QueryMetadata} and cleans up any of
* the resources associated with it (e.g. internal topics,
* schemas, etc...).
*
* @see QueryMetadata#stop()
*/
public void close() {
doClose(true);
closeCallback.accept(this);
}

protected void doClose(final boolean cleanUp) {
kafkaStreams.close();

kafkaStreams.cleanUp();
if (cleanUp) {
kafkaStreams.cleanUp();
}

queryStateListener.ifPresent(QueryStateListener::close);

closeCallback.accept(this);
}

public void start() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,11 @@ public void setLimitHandler(final OutputNode.LimitHandler limitHandler) {
getOutputNode().setLimitHandler(limitHandler);
}

@Override
public void stop() {
close();
}

@Override
public void close() {
super.close();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.atLeastOnce;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;

import com.google.common.collect.ImmutableMap;
Expand Down Expand Up @@ -640,7 +641,7 @@ public void shouldNotDeleteSchemaNorTopicForTable() throws Exception {
}

@Test
public void shouldCleanUpInternalTopicsOnClose() {
public void shouldCleanUpInternalTopicsOnEngineCloseForTransientQueries() {
// Given:
final QueryMetadata query = KsqlEngineTestUtil.execute(ksqlEngine,
"select * from test1;",
Expand All @@ -655,6 +656,42 @@ public void shouldCleanUpInternalTopicsOnClose() {
verify(topicClient).deleteInternalTopics(query.getQueryApplicationId());
}

@Test
public void shouldNotCleanUpInternalTopicsOnEngineCloseForPersistentQueries() {
// Given:
final List<QueryMetadata> query = KsqlEngineTestUtil.execute(
ksqlEngine,
"create stream persistent as select * from test1;",
KSQL_CONFIG, Collections.emptyMap()
);

query.get(0).start();

// When:
ksqlEngine.close();

// Then (there are no transient queries, so no internal topics should be deleted):
verify(topicClient, never()).deleteInternalTopics(any());
}

@Test
public void shouldCleanUpInternalTopicsOnQueryCloseForPersistentQueries() {
// Given:
final List<QueryMetadata> query = KsqlEngineTestUtil.execute(
ksqlEngine,
"create stream persistent as select * from test1;",
KSQL_CONFIG, Collections.emptyMap()
);

query.get(0).start();

// When:
query.get(0).close();

// Then (there are no transient queries, so no internal topics should be deleted):
verify(topicClient).deleteInternalTopics(query.get(0).getQueryApplicationId());
}

@Test
public void shouldNotCleanUpInternalTopicsOnCloseIfQueryNeverStarted() {
// Given:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,9 @@
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.is;
import static org.mockito.Mockito.inOrder;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoMoreInteractions;
import static org.mockito.Mockito.when;

import io.confluent.ksql.internal.QueryStateListener;
Expand Down Expand Up @@ -52,9 +54,11 @@ public class QueryMetadataTest {
@Mock
private Consumer<QueryMetadata> closeCallback;
private QueryMetadata query;
private boolean cleanUp;

@Before
public void setup() {
cleanUp = false;
query = new QueryMetadata(
"foo",
kafkaStreams,
Expand All @@ -66,7 +70,12 @@ public void setup() {
Collections.emptyMap(),
Collections.emptyMap(),
closeCallback
);
) {
@Override
public void stop() {
doClose(cleanUp);
}
};
}

@Test
Expand Down Expand Up @@ -128,6 +137,24 @@ public void shouldCloseKStreamsAppOnCloseThenCloseCallback() {
inOrder.verify(closeCallback).accept(query);
}

@Test
public void shouldNotCallCloseCallbackOnStop() {
// When:
query.stop();

// Then:
verifyNoMoreInteractions(closeCallback);
}

@Test
public void shouldCallKafkaStreamsCloseOnStop() {
// When:
query.stop();

// Then:
verify(kafkaStreams).close();
}

@Test
public void shouldCleanUpKStreamsAppAfterCloseOnClose() {
// When:
Expand All @@ -138,4 +165,25 @@ public void shouldCleanUpKStreamsAppAfterCloseOnClose() {
inOrder.verify(kafkaStreams).close();
inOrder.verify(kafkaStreams).cleanUp();
}

@Test
public void shouldNotCleanUpKStreamsAppOnStop() {
// When:
query.stop();

// Then:
verify(kafkaStreams, never()).cleanUp();
}

@Test
public void shouldCallCleanupOnStopIfCleanup() {
// Given:
cleanUp = true;

// When:
query.stop();

// Then:
verify(kafkaStreams).cleanUp();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -306,10 +306,8 @@ private KsqlConfig buildMergedConfig(final Command command) {
private void terminateQuery(final PreparedStatement<TerminateQuery> terminateQuery) {
final QueryId queryId = terminateQuery.getStatement().getQueryId();

ksqlEngine.getPersistentQuery(queryId)
.orElseThrow(() ->
new KsqlException(String.format("No running query with id %s was found", queryId)))
.close();
final Optional<PersistentQueryMetadata> query = ksqlEngine.getPersistentQuery(queryId);
query.ifPresent(PersistentQueryMetadata::close);
}

private void maybeTerminateQueryForLegacyDropCommand(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

package io.confluent.ksql.rest.server.computation;

import static java.util.Collections.emptyMap;
import static org.easymock.EasyMock.anyObject;
import static org.easymock.EasyMock.eq;
import static org.easymock.EasyMock.expect;
Expand Down Expand Up @@ -721,6 +722,39 @@ public void shouldRestoreLegacyRunScriptCommand() {
verify(mockParser, mockEngine, mockQuery);
}

@Test
public void shouldDoIdempotentTerminate() {
// Given:
final String queryStatement = "a persistent query";

final TerminateQuery terminate = mock(TerminateQuery.class);
expect(terminate.getQueryId()).andStubReturn(new QueryId("foo"));

expect(mockParser.parseSingleStatement(queryStatement))
.andStubReturn(PreparedStatement.of(queryStatement, terminate));

final PersistentQueryMetadata query = mock(PersistentQueryMetadata.class);
query.close();
expectLastCall();

expect(mockEngine.getPersistentQuery(new QueryId("foo"))).andReturn(Optional.of(query)).once();
expect(mockEngine.getPersistentQuery(new QueryId("foo"))).andReturn(Optional.empty()).once();

replayAll();
final QueuedCommand cmd = new QueuedCommand(
new CommandId(Type.TERMINATE, "-", Action.EXECUTE),
new Command(queryStatement, emptyMap(), emptyMap()),
Optional.empty()
);

// When:
statementExecutorWithMocks.handleStatement(cmd);
statementExecutorWithMocks.handleStatement(cmd);

// Then should not throw
verify(mockParser, mockEngine);
}

private void createStreamsAndStartTwoPersistentQueries() {
final Command csCommand = new Command(
"CREATE STREAM pageview ("
Expand Down