Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: backport fixes from query close #4662

Merged
merged 2 commits into from
Feb 27, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,7 @@ public ExecuteResult execute(

@Override
public void close() {
allLiveQueries.forEach(QueryMetadata::close);
allLiveQueries.forEach(QueryMetadata::stop);
engineMetrics.close();
aggregateMetricsCollector.shutdown();
}
Expand All @@ -201,8 +201,10 @@ private void unregisterQuery(final ServiceContext serviceContext, final QueryMet
final String applicationId = query.getQueryApplicationId();

if (!query.getState().equalsIgnoreCase("NOT_RUNNING")) {
throw new IllegalStateException("query not stopped."
+ " id " + applicationId + ", state: " + query.getState());
log.warn(
"Unregistering query that has not terminated. "
+ "This may happen when streams threads are hung. State: " + query.getState()
);
}

if (!allLiveQueries.remove(query)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -136,4 +136,9 @@ public Optional<Materialization> getMaterialization(
) {
return materializationProvider.map(builder -> builder.build(queryId, contextStacker));
}

@Override
public void stop() {
doClose(false);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class QueryMetadata {
public abstract class QueryMetadata {

private static final Logger LOG = LoggerFactory.getLogger(QueryMetadata.class);

Expand Down Expand Up @@ -140,14 +140,39 @@ public boolean hasEverBeenStarted() {
return everStarted;
}


/**
* Stops the query without cleaning up the external resources
* so that it can be resumed when we call {@link #start()}.
*
* <p>NOTE: {@link TransientQueryMetadata} overrides this method
* since any time a transient query is stopped the external resources
* should be cleaned up.</p>
*
* @see #close()
*/
public abstract void stop();

/**
* Closes the {@code QueryMetadata} and cleans up any of
* the resources associated with it (e.g. internal topics,
* schemas, etc...).
*
* @see QueryMetadata#stop()
*/
public void close() {
doClose(true);
closeCallback.accept(this);
}

protected void doClose(final boolean cleanUp) {
kafkaStreams.close();

kafkaStreams.cleanUp();
if (cleanUp) {
kafkaStreams.cleanUp();
}

queryStateListener.ifPresent(QueryStateListener::close);

closeCallback.accept(this);
}

public void start() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,13 +93,18 @@ public void setLimitHandler(final LimitHandler limitHandler) {
}

@Override
public void close() {
public void stop() {
close();
}

@Override
protected void doClose(final boolean cleanUp) {
// To avoid deadlock, close the queue first to ensure producer side isn't blocked trying to
// write to the blocking queue, otherwise super.close call can deadlock:
rowQueue.close();

// Now safe to close:
super.close();
super.doClose(cleanUp);
isRunning.set(false);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import static org.junit.Assert.fail;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoMoreInteractions;

Expand Down Expand Up @@ -675,6 +676,63 @@ public void shouldCleanUpInternalTopicsOnClose() {
verify(topicClient).deleteInternalTopics(query.getQueryApplicationId());
}

@Test
public void shouldCleanUpInternalTopicsOnEngineCloseForTransientQueries() {
// Given:
final List<QueryMetadata> query = KsqlEngineTestUtil.execute(
serviceContext,
ksqlEngine,
"select * from test1 EMIT CHANGES;",
KSQL_CONFIG, Collections.emptyMap()
);

query.get(0).start();

// When:
ksqlEngine.close();

// Then:
verify(topicClient).deleteInternalTopics(query.get(0).getQueryApplicationId());
}

@Test
public void shouldNotCleanUpInternalTopicsOnEngineCloseForPersistentQueries() {
// Given:
final List<QueryMetadata> query = KsqlEngineTestUtil.execute(
serviceContext,
ksqlEngine,
"create stream persistent as select * from test1 EMIT CHANGES;",
KSQL_CONFIG, Collections.emptyMap()
);

query.get(0).start();

// When:
ksqlEngine.close();

// Then (there are no transient queries, so no internal topics should be deleted):
verify(topicClient, never()).deleteInternalTopics(any());
}

@Test
public void shouldCleanUpInternalTopicsOnQueryCloseForPersistentQueries() {
// Given:
final List<QueryMetadata> query = KsqlEngineTestUtil.execute(
serviceContext,
ksqlEngine,
"create stream persistent as select * from test1 EMIT CHANGES;",
KSQL_CONFIG, Collections.emptyMap()
);

query.get(0).start();

// When:
query.get(0).close();

// Then (there are no transient queries, so no internal topics should be deleted):
verify(topicClient).deleteInternalTopics(query.get(0).getQueryApplicationId());
}

@Test
public void shouldNotCleanUpInternalTopicsOnCloseIfQueryNeverStarted() {
// Given:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,12 @@

package io.confluent.ksql.util;

import static io.confluent.ksql.metastore.model.DataSource.DataSourceType;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.is;
import static org.mockito.Mockito.inOrder;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoMoreInteractions;
import static org.mockito.Mockito.when;

import com.google.common.collect.ImmutableSet;
Expand Down Expand Up @@ -59,9 +60,11 @@ public class QueryMetadataTest {
@Mock
private Consumer<QueryMetadata> closeCallback;
private QueryMetadata query;
private boolean cleanUp;

@Before
public void setup() {
cleanUp = false;
query = new QueryMetadata(
"foo",
kafkaStreams,
Expand All @@ -72,8 +75,12 @@ public void setup() {
topoplogy,
Collections.emptyMap(),
Collections.emptyMap(),
closeCallback
);
closeCallback) {
@Override
public void stop() {
doClose(cleanUp);
}
};
}

@Test
Expand Down Expand Up @@ -135,6 +142,24 @@ public void shouldCloseKStreamsAppOnCloseThenCloseCallback() {
inOrder.verify(closeCallback).accept(query);
}

@Test
public void shouldNotCallCloseCallbackOnStop() {
// When:
query.stop();

// Then:
verifyNoMoreInteractions(closeCallback);
}

@Test
public void shouldCallKafkaStreamsCloseOnStop() {
// When:
query.stop();

// Then:
verify(kafkaStreams).close();
}

@Test
public void shouldCleanUpKStreamsAppAfterCloseOnClose() {
// When:
Expand All @@ -146,6 +171,27 @@ public void shouldCleanUpKStreamsAppAfterCloseOnClose() {
inOrder.verify(kafkaStreams).cleanUp();
}

@Test
public void shouldNotCleanUpKStreamsAppOnStop() {
// When:
query.stop();

// Then:
verify(kafkaStreams, never()).cleanUp();
}

@Test
public void shouldCallCleanupOnStopIfCleanup() {
// Given:
cleanUp = true;

// When:
query.stop();

// Then:
verify(kafkaStreams).cleanUp();
}

@Test
public void shouldReturnSources() {
assertThat(query.getSourceNames(), is(SOME_SOURCES));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
package io.confluent.ksql.util;

import static org.mockito.Mockito.inOrder;
import static org.mockito.Mockito.verify;

import io.confluent.ksql.name.SourceName;
import io.confluent.ksql.query.BlockingRowQueue;
Expand Down Expand Up @@ -84,4 +85,17 @@ public void shouldCloseQueueBeforeTopologyToAvoidDeadLock() {
inOrder.verify(rowQueue).close();
inOrder.verify(kafkaStreams).close();
}

@Test
public void shouldCallCloseOnStop() {
// When:
query.stop();

// Then:
final InOrder inOrder = inOrder(rowQueue, kafkaStreams, closeCallback);
inOrder.verify(rowQueue).close();
inOrder.verify(kafkaStreams).close();
inOrder.verify(kafkaStreams).cleanUp();
inOrder.verify(closeCallback).accept(query);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -366,10 +366,8 @@ private KsqlConfig buildMergedConfig(final Command command) {
private void terminateQuery(final PreparedStatement<TerminateQuery> terminateQuery) {
final QueryId queryId = terminateQuery.getStatement().getQueryId();

ksqlEngine.getPersistentQuery(queryId)
.orElseThrow(() ->
new KsqlException(String.format("No running query with id %s was found", queryId)))
.close();
final Optional<PersistentQueryMetadata> query = ksqlEngine.getPersistentQuery(queryId);
query.ifPresent(PersistentQueryMetadata::close);
}

private void maybeTerminateQueryForLegacyDropCommand(
Expand Down