diff --git a/ksql-engine/src/main/java/io/confluent/ksql/engine/KsqlEngine.java b/ksql-engine/src/main/java/io/confluent/ksql/engine/KsqlEngine.java index fd937a5d912a..7cdec5c1c471 100644 --- a/ksql-engine/src/main/java/io/confluent/ksql/engine/KsqlEngine.java +++ b/ksql-engine/src/main/java/io/confluent/ksql/engine/KsqlEngine.java @@ -238,7 +238,7 @@ public QueryMetadata executeQuery( @Override public void close() { - allLiveQueries.forEach(QueryMetadata::close); + allLiveQueries.forEach(QueryMetadata::stop); engineMetrics.close(); aggregateMetricsCollector.shutdown(); } diff --git a/ksql-engine/src/main/java/io/confluent/ksql/query/QueryExecutor.java b/ksql-engine/src/main/java/io/confluent/ksql/query/QueryExecutor.java index 970c6b7cb64d..7e9ffb196e04 100644 --- a/ksql-engine/src/main/java/io/confluent/ksql/query/QueryExecutor.java +++ b/ksql-engine/src/main/java/io/confluent/ksql/query/QueryExecutor.java @@ -236,7 +236,12 @@ public QueryMetadata buildTransientQuery( overrides, queryCloseCallback, ksqlConfig.getLong(KSQL_SHUTDOWN_TIMEOUT_MS_CONFIG) - ); + ) { + @Override + public void stop() { + close(); + } + }; } private static Optional getMaterializationInfo(final Object result) { diff --git a/ksql-engine/src/main/java/io/confluent/ksql/util/PersistentQueryMetadata.java b/ksql-engine/src/main/java/io/confluent/ksql/util/PersistentQueryMetadata.java index 6036ec1e78f3..81eed2eebe15 100644 --- a/ksql-engine/src/main/java/io/confluent/ksql/util/PersistentQueryMetadata.java +++ b/ksql-engine/src/main/java/io/confluent/ksql/util/PersistentQueryMetadata.java @@ -141,4 +141,9 @@ public Optional getMaterialization( ) { return materializationProvider.map(builder -> builder.build(queryId, contextStacker)); } + + @Override + public void stop() { + doClose(false); + } } diff --git a/ksql-engine/src/main/java/io/confluent/ksql/util/QueryMetadata.java b/ksql-engine/src/main/java/io/confluent/ksql/util/QueryMetadata.java index afceaddd9ea2..e5ee1c7a4fce 100644 --- a/ksql-engine/src/main/java/io/confluent/ksql/util/QueryMetadata.java +++ b/ksql-engine/src/main/java/io/confluent/ksql/util/QueryMetadata.java @@ -36,7 +36,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -public class QueryMetadata { +public abstract class QueryMetadata { private static final Logger LOG = LoggerFactory.getLogger(QueryMetadata.class); @@ -169,14 +169,39 @@ public boolean hasEverBeenStarted() { return everStarted; } + + /** + * Stops the query without cleaning up the external resources + * so that it can be resumed when we call {@link #start()}. + * + *

NOTE: {@link TransientQueryMetadata} overrides this method + * since any time a transient query is stopped the external resources + * should be cleaned up.

+ * + * @see #close() + */ + public abstract void stop(); + + /** + * Closes the {@code QueryMetadata} and cleans up any of + * the resources associated with it (e.g. internal topics, + * schemas, etc...). + * + * @see QueryMetadata#stop() + */ public void close() { + doClose(true); + closeCallback.accept(this); + } + + protected void doClose(final boolean cleanUp) { kafkaStreams.close(Duration.ofMillis(closeTimeout)); - kafkaStreams.cleanUp(); + if (cleanUp) { + kafkaStreams.cleanUp(); + } queryStateListener.ifPresent(QueryStateListener::close); - - closeCallback.accept(this); } public void start() { diff --git a/ksql-engine/src/main/java/io/confluent/ksql/util/TransientQueryMetadata.java b/ksql-engine/src/main/java/io/confluent/ksql/util/TransientQueryMetadata.java index 3c83876d073e..e41fe35d4f82 100644 --- a/ksql-engine/src/main/java/io/confluent/ksql/util/TransientQueryMetadata.java +++ b/ksql-engine/src/main/java/io/confluent/ksql/util/TransientQueryMetadata.java @@ -99,13 +99,18 @@ public void setLimitHandler(final LimitHandler limitHandler) { } @Override - public void close() { + public void stop() { + close(); + } + + @Override + protected void doClose(final boolean cleanUp) { // To avoid deadlock, close the queue first to ensure producer side isn't blocked trying to // write to the blocking queue, otherwise super.close call can deadlock: rowQueue.close(); // Now safe to close: - super.close(); + super.doClose(cleanUp); isRunning.set(false); } } diff --git a/ksql-engine/src/test/java/io/confluent/ksql/engine/KsqlEngineTest.java b/ksql-engine/src/test/java/io/confluent/ksql/engine/KsqlEngineTest.java index 371376f8900b..eccb3a7f9a10 100644 --- a/ksql-engine/src/test/java/io/confluent/ksql/engine/KsqlEngineTest.java +++ b/ksql-engine/src/test/java/io/confluent/ksql/engine/KsqlEngineTest.java @@ -33,6 +33,7 @@ import static org.junit.Assert.fail; import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.never; +import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verifyNoMoreInteractions; @@ -675,6 +676,63 @@ public void shouldCleanUpInternalTopicsOnClose() { verify(topicClient).deleteInternalTopics(query.getQueryApplicationId()); } + @Test + public void shouldCleanUpInternalTopicsOnEngineCloseForTransientQueries() { + // Given: + final QueryMetadata query = KsqlEngineTestUtil.executeQuery( + serviceContext, + ksqlEngine, + "select * from test1 EMIT CHANGES;", + KSQL_CONFIG, Collections.emptyMap() + ); + + query.start(); + + // When: + ksqlEngine.close(); + + // Then: + verify(topicClient).deleteInternalTopics(query.getQueryApplicationId()); + } + + @Test + public void shouldNotCleanUpInternalTopicsOnEngineCloseForPersistentQueries() { + // Given: + final List query = KsqlEngineTestUtil.execute( + serviceContext, + ksqlEngine, + "create stream persistent as select * from test1 EMIT CHANGES;", + KSQL_CONFIG, Collections.emptyMap() + ); + + query.get(0).start(); + + // When: + ksqlEngine.close(); + + // Then (there are no transient queries, so no internal topics should be deleted): + verify(topicClient, never()).deleteInternalTopics(any()); + } + + @Test + public void shouldCleanUpInternalTopicsOnQueryCloseForPersistentQueries() { + // Given: + final List query = KsqlEngineTestUtil.execute( + serviceContext, + ksqlEngine, + "create stream persistent as select * from test1 EMIT CHANGES;", + KSQL_CONFIG, Collections.emptyMap() + ); + + query.get(0).start(); + + // When: + query.get(0).close(); + + // Then (there are no transient queries, so no internal topics should be deleted): + verify(topicClient).deleteInternalTopics(query.get(0).getQueryApplicationId()); + } + @Test public void shouldNotCleanUpInternalTopicsOnCloseIfQueryNeverStarted() { // Given: diff --git a/ksql-engine/src/test/java/io/confluent/ksql/util/QueryMetadataTest.java b/ksql-engine/src/test/java/io/confluent/ksql/util/QueryMetadataTest.java index 44d89e7baee0..c25c596cac89 100644 --- a/ksql-engine/src/test/java/io/confluent/ksql/util/QueryMetadataTest.java +++ b/ksql-engine/src/test/java/io/confluent/ksql/util/QueryMetadataTest.java @@ -18,7 +18,9 @@ import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.is; import static org.mockito.Mockito.inOrder; +import static org.mockito.Mockito.never; import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyNoMoreInteractions; import static org.mockito.Mockito.when; import com.google.common.collect.ImmutableSet; @@ -60,9 +62,11 @@ public class QueryMetadataTest { @Mock private Consumer closeCallback; private QueryMetadata query; + private boolean cleanUp; @Before public void setup() { + cleanUp = false; query = new QueryMetadata( "foo", kafkaStreams, @@ -74,7 +78,12 @@ public void setup() { Collections.emptyMap(), Collections.emptyMap(), closeCallback, - closeTimeout); + closeTimeout) { + @Override + public void stop() { + doClose(cleanUp); + } + }; } @Test @@ -136,6 +145,24 @@ public void shouldCloseKStreamsAppOnCloseThenCloseCallback() { inOrder.verify(closeCallback).accept(query); } + @Test + public void shouldNotCallCloseCallbackOnStop() { + // When: + query.stop(); + + // Then: + verifyNoMoreInteractions(closeCallback); + } + + @Test + public void shouldCallKafkaStreamsCloseOnStop() { + // When: + query.stop(); + + // Then: + verify(kafkaStreams).close(Duration.ofMillis(closeTimeout)); + } + @Test public void shouldCleanUpKStreamsAppAfterCloseOnClose() { // When: @@ -147,6 +174,27 @@ public void shouldCleanUpKStreamsAppAfterCloseOnClose() { inOrder.verify(kafkaStreams).cleanUp(); } + @Test + public void shouldNotCleanUpKStreamsAppOnStop() { + // When: + query.stop(); + + // Then: + verify(kafkaStreams, never()).cleanUp(); + } + + @Test + public void shouldCallCleanupOnStopIfCleanup() { + // Given: + cleanUp = true; + + // When: + query.stop(); + + // Then: + verify(kafkaStreams).cleanUp(); + } + @Test public void shouldReturnSources() { assertThat(query.getSourceNames(), is(SOME_SOURCES)); diff --git a/ksql-engine/src/test/java/io/confluent/ksql/util/TransientQueryMetadataTest.java b/ksql-engine/src/test/java/io/confluent/ksql/util/TransientQueryMetadataTest.java index 198529e24342..74c87153ba90 100644 --- a/ksql-engine/src/test/java/io/confluent/ksql/util/TransientQueryMetadataTest.java +++ b/ksql-engine/src/test/java/io/confluent/ksql/util/TransientQueryMetadataTest.java @@ -17,6 +17,7 @@ import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.inOrder; +import static org.mockito.Mockito.verify; import io.confluent.ksql.name.SourceName; import io.confluent.ksql.query.BlockingRowQueue; @@ -87,4 +88,17 @@ public void shouldCloseQueueBeforeTopologyToAvoidDeadLock() { inOrder.verify(rowQueue).close(); inOrder.verify(kafkaStreams).close(any()); } + + @Test + public void shouldCallCloseOnStop() { + // When: + query.stop(); + + // Then: + final InOrder inOrder = inOrder(rowQueue, kafkaStreams, closeCallback); + inOrder.verify(rowQueue).close(); + inOrder.verify(kafkaStreams).close(any()); + inOrder.verify(kafkaStreams).cleanUp(); + inOrder.verify(closeCallback).accept(query); + } } \ No newline at end of file