From 5fbf17188282684834ba45077b45ea65c028c9cb Mon Sep 17 00:00:00 2001 From: Rohan Date: Wed, 29 Jul 2020 21:40:20 -0700 Subject: [PATCH] fix: adds a handler to gracefully shutdown (#5895) * fix: adds a handler to gracefully shutdown service A couple of changes related to application lifecycle management. Firstly, adds a shutdown handler that gracefully shuts down the service when the JVM determines it's time to shut down (e.g. when it receives a termination signal). Secondly, this patch reorganizes some of the startup, steady-state, and shutdown code to make shutdown easier to reason about. Specifically, all these methods are now called from the same thread, so both the thread-safety and order of execution are guaranteed. All the shutdown hook does is notify the main thread and then wait for it to exit. --- .../ksql/rest/server/ConnectExecutable.java | 15 ++++--- .../ksql/rest/server/Executable.java | 17 +++++--- .../ksql/rest/server/KsqlRestApplication.java | 31 +++++++------- .../ksql/rest/server/KsqlServerMain.java | 42 ++++++++++++++----- .../ksql/rest/server/MultiExecutable.java | 9 +++- .../ksql/rest/server/StandaloneExecutor.java | 10 +++-- .../rest/server/ConnectIntegrationTest.java | 2 +- .../rest/server/KsqlRestApplicationTest.java | 4 +- .../ksql/rest/server/KsqlServerMainTest.java | 30 +++++++++++-- .../ksql/rest/server/MultiExecutableTest.java | 19 +++++++-- .../StandaloneExecutorFunctionalTest.java | 4 +- .../rest/server/StandaloneExecutorTest.java | 7 ++-- .../ksql/rest/server/TestKsqlRestApp.java | 2 +- 13 files changed, 132 insertions(+), 60 deletions(-) diff --git a/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/ConnectExecutable.java b/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/ConnectExecutable.java index c8f6f6b9b7c1..a2cc82072518 100644 --- a/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/ConnectExecutable.java +++ b/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/ConnectExecutable.java @@ 
-21,6 +21,7 @@ import java.util.Collections; import java.util.Map; import java.util.Objects; +import java.util.concurrent.CountDownLatch; import org.apache.kafka.common.utils.Utils; import org.apache.kafka.connect.cli.ConnectDistributed; import org.apache.kafka.connect.errors.ConnectException; @@ -35,6 +36,7 @@ public final class ConnectExecutable implements Executable { private final ConnectDistributed connectDistributed; private final Map workerProps; private Connect connect; + private final CountDownLatch terminateLatch = new CountDownLatch(1); public static ConnectExecutable of(final String configFile) throws IOException { final Map workerProps = !configFile.isEmpty() @@ -64,16 +66,19 @@ public void startAsync() { } @Override - public void triggerShutdown() { + public void shutdown() { if (connect != null) { connect.stop(); } } @Override - public void awaitTerminated() { - if (connect != null) { - connect.awaitStop(); - } + public void notifyTerminated() { + terminateLatch.countDown(); + } + + @Override + public void awaitTerminated() throws InterruptedException { + terminateLatch.await(); } } diff --git a/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/Executable.java b/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/Executable.java index 70a5a1385de3..b5bac199ed8f 100644 --- a/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/Executable.java +++ b/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/Executable.java @@ -21,19 +21,24 @@ public interface Executable { /** - * Starts the executable asynchronously. + * Starts the executable asynchronously. Guaranteed to be called before shutdown. */ default void startAsync() throws Exception {} /** - * Triggers a shutdown asynchronously, in order to ensure that the shutdown - * has finished use {@link #awaitTerminated()} + * Called to notify threads awaiting termination (see #awaitTerminated) + * that it's time to shutdown. 
*/ - default void triggerShutdown() throws Exception {} + default void notifyTerminated() {} /** - * Awaits the {@link #triggerShutdown()} to finish. This is a blocking - * operation. + * Shutdown the service. + */ + default void shutdown() throws Exception {} + + /** + * Awaits the {@link #notifyTerminated()} notification. This is a blocking + * operation. Guaranteed to be called before shutdown. */ default void awaitTerminated() throws InterruptedException {} } diff --git a/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/KsqlRestApplication.java b/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/KsqlRestApplication.java index a5d34ad72183..eeb073c80de6 100644 --- a/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/KsqlRestApplication.java +++ b/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/KsqlRestApplication.java @@ -128,7 +128,7 @@ import java.util.concurrent.ExecutionException; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; import java.util.function.Consumer; import java.util.function.Function; import java.util.function.Predicate; @@ -169,7 +169,6 @@ public final class KsqlRestApplication implements Executable { private final Optional heartbeatAgent; private final Optional lagReportingAgent; private final PullQueryExecutor pullQueryExecutor; - private final AtomicBoolean shuttingDown = new AtomicBoolean(false); private final ServerInfoResource serverInfoResource; private final Optional heartbeatResource; private final Optional clusterStatusResource; @@ -185,7 +184,7 @@ public final class KsqlRestApplication implements Executable { // The startup thread that can be interrupted if necessary during shutdown. This should only // happen if startup hangs. 
- private volatile Thread startAsyncThread; + private AtomicReference startAsyncThreadRef = new AtomicReference<>(null); public static SourceName getCommandsStreamName() { return COMMANDS_STREAM_NAME; @@ -305,7 +304,7 @@ public void startAsync() { pullQueryExecutor ); - startAsyncThread = Thread.currentThread(); + startAsyncThreadRef.set(Thread.currentThread()); try { final Endpoints endpoints = new KsqlServerEndpoints( ksqlEngine, @@ -340,7 +339,7 @@ public void startAsync() { } catch (AbortApplicationStartException e) { log.error("Aborting application start", e); } finally { - startAsyncThread = null; + startAsyncThreadRef.set(null); } } @@ -392,14 +391,14 @@ private void waitForPreconditions() { 1000, 30000, this::checkPreconditions, - shuttingDown::get, + terminatedFuture::isDone, predicates ); } catch (KsqlFailedPrecondition e) { log.error("Failed to meet preconditions. Exiting...", e); } - if (shuttingDown.get()) { + if (terminatedFuture.isDone()) { throw new AbortApplicationStartException( "Shutting down application during waitForPreconditions"); } @@ -441,17 +440,19 @@ private void initialize(final KsqlConfig configWithApplicationServer) { serverState.setReady(); } - @SuppressWarnings("checkstyle:NPathComplexity") @Override - public void triggerShutdown() { - log.debug("ksqlDB triggerShutdown called"); - // First, make sure the server wasn't stuck in startup. Set the shutdown flag and interrupt the - // startup thread if it's been hanging. 
- shuttingDown.set(true); + public void notifyTerminated() { + terminatedFuture.complete(null); + final Thread startAsyncThread = startAsyncThreadRef.get(); if (startAsyncThread != null) { startAsyncThread.interrupt(); } + } + @SuppressWarnings("checkstyle:NPathComplexity") + @Override + public void shutdown() { + log.info("ksqlDB shutdown called"); try { streamedQueryResource.closeMetrics(); } catch (final Exception e) { @@ -496,9 +497,7 @@ public void triggerShutdown() { shutdownAdditionalAgents(); - log.debug("ksqlDB triggerShutdown complete"); - - terminatedFuture.complete(null); + log.info("ksqlDB shutdown complete"); } @Override diff --git a/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/KsqlServerMain.java b/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/KsqlServerMain.java index b1275602c8d7..f83316e0e93a 100644 --- a/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/KsqlServerMain.java +++ b/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/KsqlServerMain.java @@ -24,6 +24,8 @@ import java.util.Map; import java.util.Objects; import java.util.Optional; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Executor; import org.apache.kafka.streams.StreamsConfig; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -32,6 +34,7 @@ public class KsqlServerMain { private static final Logger log = LoggerFactory.getLogger(KsqlServerMain.class); + private final Executor shutdownHandler; private final Executable executable; public static void main(final String[] args) { @@ -55,29 +58,46 @@ public static void main(final String[] args) { final Optional queriesFile = serverOptions.getQueriesFile(properties); final Executable executable = createExecutable( properties, queriesFile, installDir, ksqlConfig); - new KsqlServerMain(executable).tryStartApp(); + new KsqlServerMain( + executable, + r -> Runtime.getRuntime().addShutdownHook(new Thread(r)) + ).tryStartApp(); } catch (final Exception e) { 
log.error("Failed to start KSQL", e); System.exit(-1); } } - KsqlServerMain(final Executable executable) { + KsqlServerMain(final Executable executable, final Executor shutdownHandler) { this.executable = Objects.requireNonNull(executable, "executable"); + this.shutdownHandler = Objects.requireNonNull(shutdownHandler, "shutdownHandler"); } void tryStartApp() throws Exception { + final CountDownLatch latch = new CountDownLatch(1); + shutdownHandler.execute(() -> { + executable.notifyTerminated(); + try { + latch.await(); + } catch (final InterruptedException e) { + throw new RuntimeException(e); + } + }); try { - log.info("Starting server"); - executable.startAsync(); - log.info("Server up and running"); - executable.awaitTerminated(); - } catch (Throwable t) { - log.error("Unhandled exception in server startup", t); - throw t; + try { + log.info("Starting server"); + executable.startAsync(); + log.info("Server up and running"); + executable.awaitTerminated(); + } catch (Throwable t) { + log.error("Unhandled exception in server startup", t); + throw t; + } finally { + log.info("Server shutting down"); + executable.shutdown(); + } } finally { - log.info("Server shutting down"); - executable.triggerShutdown(); + latch.countDown(); } } diff --git a/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/MultiExecutable.java b/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/MultiExecutable.java index 94e7f1b03337..a22e124c5f4f 100644 --- a/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/MultiExecutable.java +++ b/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/MultiExecutable.java @@ -43,8 +43,13 @@ public void startAsync() throws Exception { } @Override - public void triggerShutdown() throws Exception { - doAction(Executable::triggerShutdown); + public void shutdown() throws Exception { + doAction(Executable::shutdown); + } + + @Override + public void notifyTerminated() { + doAction(Executable::notifyTerminated); } @Override diff 
--git a/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/StandaloneExecutor.java b/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/StandaloneExecutor.java index 26cee2614a6d..57bb95616e70 100644 --- a/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/StandaloneExecutor.java +++ b/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/StandaloneExecutor.java @@ -129,12 +129,17 @@ public void startAsync() { versionChecker.start(KsqlModuleType.SERVER, properties); } catch (final Exception e) { log.error("Failed to start KSQL Server with query file: " + queriesFile, e); - triggerShutdown(); throw e; } } - public void triggerShutdown() { + @Override + public void notifyTerminated() { + shutdownLatch.countDown(); + } + + @Override + public void shutdown() { try { ksqlEngine.close(); } catch (final Exception e) { @@ -145,7 +150,6 @@ public void triggerShutdown() { } catch (final Exception e) { log.warn("Failed to cleanly shutdown services", e); } - shutdownLatch.countDown(); } @Override diff --git a/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/ConnectIntegrationTest.java b/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/ConnectIntegrationTest.java index 739f1bcbf85d..1647b0f23d4d 100644 --- a/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/ConnectIntegrationTest.java +++ b/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/ConnectIntegrationTest.java @@ -95,7 +95,7 @@ public static void setUpClass() { @AfterClass public static void tearDownClass() { - CONNECT.triggerShutdown(); + CONNECT.shutdown(); } private KsqlRestClient ksqlRestClient; diff --git a/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/KsqlRestApplicationTest.java b/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/KsqlRestApplicationTest.java index 95aa713cfe70..f13c44144e4b 100644 --- a/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/KsqlRestApplicationTest.java +++ 
b/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/KsqlRestApplicationTest.java @@ -202,7 +202,7 @@ public void tearDown() { @Test public void shouldCloseServiceContextOnClose() { // When: - app.triggerShutdown(); + app.shutdown(); // Then: verify(serviceContext).close(); @@ -211,7 +211,7 @@ public void shouldCloseServiceContextOnClose() { @Test public void shouldCloseSecurityExtensionOnClose() { // When: - app.triggerShutdown(); + app.shutdown(); // Then: verify(securityExtension).close(); diff --git a/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/KsqlServerMainTest.java b/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/KsqlServerMainTest.java index 6e45e715d23f..132da7958a6b 100644 --- a/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/KsqlServerMainTest.java +++ b/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/KsqlServerMainTest.java @@ -15,7 +15,9 @@ package io.confluent.ksql.rest.server; +import static org.easymock.EasyMock.capture; import static org.easymock.EasyMock.expectLastCall; +import static org.easymock.EasyMock.newCapture; import static org.easymock.EasyMock.replay; import static org.easymock.EasyMock.verify; import static org.hamcrest.MatcherAssert.assertThat; @@ -27,6 +29,8 @@ import io.confluent.ksql.util.KsqlServerException; import java.io.File; +import java.util.concurrent.Executor; +import org.easymock.Capture; import org.easymock.EasyMockRunner; import org.easymock.Mock; import org.easymock.MockType; @@ -40,12 +44,14 @@ public class KsqlServerMainTest { @Mock(MockType.NICE) private Executable executable; + @Mock(MockType.NICE) + private Executor shutdownHandler; private final File mockStreamsStateDir = mock(File.class); @Before public void setUp() { - main = new KsqlServerMain(executable); + main = new KsqlServerMain(executable, shutdownHandler); when(mockStreamsStateDir.exists()).thenReturn(true); when(mockStreamsStateDir.mkdirs()).thenReturn(true); 
when(mockStreamsStateDir.isDirectory()).thenReturn(true); @@ -57,7 +63,7 @@ public void setUp() { @Test public void shouldStopAppOnJoin() throws Exception { // Given: - executable.triggerShutdown(); + executable.shutdown(); expectLastCall(); replay(executable); @@ -74,7 +80,7 @@ public void shouldStopAppOnErrorStarting() throws Exception { executable.startAsync(); expectLastCall().andThrow(new RuntimeException("Boom")); - executable.triggerShutdown(); + executable.shutdown(); expectLastCall(); replay(executable); @@ -90,6 +96,24 @@ public void shouldStopAppOnErrorStarting() throws Exception { verify(executable); } + @Test + public void shouldNotifyAppOnTerminate() throws Exception { + // Given: + final Capture captureShutdownHandler = newCapture(); + shutdownHandler.execute(capture(captureShutdownHandler)); + executable.notifyTerminated(); + expectLastCall(); + replay(shutdownHandler, executable); + main.tryStartApp(); + final Runnable handler = captureShutdownHandler.getValue(); + + // When: + handler.run(); + + // Then: + verify(executable); + } + @Test public void shouldFailIfStreamsStateDirectoryCannotBeCreated() { // Given: diff --git a/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/MultiExecutableTest.java b/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/MultiExecutableTest.java index f046752180cb..205b36585431 100644 --- a/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/MultiExecutableTest.java +++ b/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/MultiExecutableTest.java @@ -69,15 +69,28 @@ public void shouldJoinAll() throws Exception { inOrder.verifyNoMoreInteractions(); } + @Test + public void shouldNotifyAllToShutdown() throws Exception { + // When: + multiExecutable.notifyTerminated(); + + // Then: + // Then: + final InOrder inOrder = Mockito.inOrder(executable1, executable2); + inOrder.verify(executable1).notifyTerminated(); + inOrder.verify(executable2).notifyTerminated(); + 
inOrder.verifyNoMoreInteractions(); + } + @Test public void shouldStopAll() throws Exception { // When: - multiExecutable.triggerShutdown(); + multiExecutable.shutdown(); // Then: final InOrder inOrder = Mockito.inOrder(executable1, executable2); - inOrder.verify(executable1).triggerShutdown(); - inOrder.verify(executable2).triggerShutdown(); + inOrder.verify(executable1).shutdown(); + inOrder.verify(executable2).shutdown(); inOrder.verifyNoMoreInteractions(); } diff --git a/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/StandaloneExecutorFunctionalTest.java b/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/StandaloneExecutorFunctionalTest.java index b860dfc0848c..5ec13a90ead7 100644 --- a/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/StandaloneExecutorFunctionalTest.java +++ b/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/StandaloneExecutorFunctionalTest.java @@ -52,7 +52,6 @@ import org.junit.BeforeClass; import org.junit.ClassRule; import org.junit.FixMethodOrder; -import org.junit.Ignore; import org.junit.Test; import org.junit.experimental.categories.Category; import org.junit.rules.TemporaryFolder; @@ -131,8 +130,7 @@ public void setUp() throws Exception { @After public void tearDown() throws Exception { - standalone.triggerShutdown(); - standalone.awaitTerminated(); + standalone.shutdown(); } @Test diff --git a/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/StandaloneExecutorTest.java b/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/StandaloneExecutorTest.java index eec0a83717db..17317a714c86 100644 --- a/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/StandaloneExecutorTest.java +++ b/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/StandaloneExecutorTest.java @@ -361,8 +361,7 @@ public void shouldStartTheVersionCheckerAgentWithCorrectProperties() throws Inte // Then: verify(versionChecker).start(eq(KsqlModuleType.SERVER), captor.capture()); 
assertThat(captor.getValue().getProperty("confluent.support.metrics.enable"), equalTo("false")); - standaloneExecutor.triggerShutdown(); - standaloneExecutor.awaitTerminated(); + standaloneExecutor.shutdown(); } @Test @@ -712,7 +711,7 @@ public void shouldThrowIfExecuteThrows() { @Test public void shouldCloseEngineOnStop() { // When: - standaloneExecutor.triggerShutdown(); + standaloneExecutor.shutdown(); // Then: verify(ksqlEngine).close(); @@ -721,7 +720,7 @@ public void shouldCloseEngineOnStop() { @Test public void shouldCloseServiceContextOnStop() { // When: - standaloneExecutor.triggerShutdown(); + standaloneExecutor.shutdown(); // Then: verify(serviceContext).close(); diff --git a/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/TestKsqlRestApp.java b/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/TestKsqlRestApp.java index cdcb46093c16..0afc3ba3b688 100644 --- a/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/TestKsqlRestApp.java +++ b/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/TestKsqlRestApp.java @@ -268,7 +268,7 @@ protected void after() { listeners.clear(); internalListener = null; try { - ksqlRestApplication.triggerShutdown(); + ksqlRestApplication.shutdown(); } catch (final Exception e) { throw new RuntimeException(e); }