diff --git a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/KsqlRestApplication.java b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/KsqlRestApplication.java index 658821aa422d..40b894b08d29 100644 --- a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/KsqlRestApplication.java +++ b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/KsqlRestApplication.java @@ -519,7 +519,11 @@ static KsqlRestApplication buildApplication( commandStore, maxStatementRetries, new ClusterTerminator(ksqlEngine, serviceContext, managedTopics), - serverState + serverState, + ksqlConfig.getString(KsqlConfig.KSQL_SERVICE_ID_CONFIG), + Duration.ofMillis(restConfig.getLong( + KsqlRestConfig.KSQL_COMMAND_RUNNER_BLOCKED_THRESHHOLD_ERROR_MS)), + metricsPrefix ); final KsqlResource ksqlResource = new KsqlResource( diff --git a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/KsqlRestConfig.java b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/KsqlRestConfig.java index e98aad32d6e9..a2e0e97ea7e2 100644 --- a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/KsqlRestConfig.java +++ b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/KsqlRestConfig.java @@ -77,6 +77,13 @@ public class KsqlRestConfig extends RestConfig { "Minimum time between consecutive health check evaluations. Health check queries before " + "the interval has elapsed will receive cached responses."; + static final String KSQL_COMMAND_RUNNER_BLOCKED_THRESHHOLD_ERROR_MS = + KSQL_CONFIG_PREFIX + "server.command.blocked.threshold.error.ms"; + + private static final String KSQL_COMMAND_RUNNER_BLOCKED_THRESHHOLD_ERROR_MS_DOC = + "How long to wait for the command runner to process a command from the command topic " + + "before reporting an error metric."; + private static final ConfigDef CONFIG_DEF; static { @@ -122,6 +129,13 @@ public class KsqlRestConfig extends RestConfig { 5000L, Importance.LOW, KSQL_HEALTHCHECK_INTERVAL_MS_DOC + ) + .define( + KSQL_COMMAND_RUNNER_BLOCKED_THRESHHOLD_ERROR_MS, + Type.LONG, + 15000L, + Importance.LOW, + KSQL_COMMAND_RUNNER_BLOCKED_THRESHHOLD_ERROR_MS_DOC ); } diff --git a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/computation/CommandRunner.java b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/computation/CommandRunner.java index 046a3e994d78..5adb297d554a 100644 --- a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/computation/CommandRunner.java +++ b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/computation/CommandRunner.java @@ -21,10 +21,13 @@ import io.confluent.ksql.rest.server.state.ServerState; import io.confluent.ksql.rest.util.ClusterTerminator; import io.confluent.ksql.rest.util.TerminateCluster; +import io.confluent.ksql.util.Pair; import io.confluent.ksql.util.PersistentQueryMetadata; import io.confluent.ksql.util.RetryUtil; import java.io.Closeable; +import java.time.Clock; import java.time.Duration; +import java.time.Instant; import java.util.Collections; import java.util.List; import java.util.Objects; @@ -32,6 +35,7 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; import org.apache.kafka.common.errors.WakeupException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -43,7 +47,6 @@ * Also responsible for taking care of any exceptions that occur in the process. */ public class CommandRunner implements Closeable { - private static final Logger log = LoggerFactory.getLogger(CommandRunner.class); private static final int STATEMENT_RETRY_MS = 100; @@ -59,12 +62,25 @@ public class CommandRunner implements Closeable { private final ClusterTerminator clusterTerminator; private final ServerState serverState; + private final CommandRunnerStatusMetric commandRunnerStatusMetric; + private final AtomicReference> currentCommandRef; + private final Duration commandRunnerHealthTimeout; + private final Clock clock; + + public enum CommandRunnerStatus { + RUNNING, + ERROR + } + public CommandRunner( final InteractiveStatementExecutor statementExecutor, final CommandQueue commandStore, final int maxRetries, final ClusterTerminator clusterTerminator, - final ServerState serverState + final ServerState serverState, + final String ksqlServiceId, + final Duration commandRunnerHealthTimeout, + final String metricsGroupPrefix ) { this( statementExecutor, @@ -72,7 +88,11 @@ public CommandRunner( maxRetries, clusterTerminator, Executors.newSingleThreadExecutor(r -> new Thread(r, "CommandRunner")), - serverState + serverState, + ksqlServiceId, + commandRunnerHealthTimeout, + metricsGroupPrefix, + Clock.systemUTC() ); } @@ -83,7 +103,11 @@ public CommandRunner( final int maxRetries, final ClusterTerminator clusterTerminator, final ExecutorService executor, - final ServerState serverState + final ServerState serverState, + final String ksqlServiceId, + final Duration commandRunnerHealthTimeout, + final String metricsGroupPrefix, + final Clock clock ) { this.statementExecutor = Objects.requireNonNull(statementExecutor, "statementExecutor"); this.commandStore = Objects.requireNonNull(commandStore, "commandStore"); @@ -91,6 +115,12 @@ public CommandRunner( this.clusterTerminator = Objects.requireNonNull(clusterTerminator, "clusterTerminator"); this.executor = Objects.requireNonNull(executor, "executor"); this.serverState = Objects.requireNonNull(serverState, "serverState"); + this.commandRunnerHealthTimeout = + Objects.requireNonNull(commandRunnerHealthTimeout, "commandRunnerHealthTimeout"); + this.currentCommandRef = new AtomicReference<>(null); + this.commandRunnerStatusMetric = + new CommandRunnerStatusMetric(ksqlServiceId, this, metricsGroupPrefix); + this.clock = clock; } /** @@ -114,6 +144,7 @@ public void close() { } catch (final InterruptedException e) { Thread.currentThread().interrupt(); } + commandRunnerStatusMetric.close(); commandStore.close(); } @@ -128,13 +159,17 @@ public void processPriorCommands() { return; } restoreCommands.forEach( - command -> RetryUtil.retryWithBackoff( - maxRetries, - STATEMENT_RETRY_MS, - MAX_STATEMENT_RETRY_MS, - () -> statementExecutor.handleRestore(command), - WakeupException.class - ) + command -> { + currentCommandRef.set(new Pair<>(command, clock.instant())); + RetryUtil.retryWithBackoff( + maxRetries, + STATEMENT_RETRY_MS, + MAX_STATEMENT_RETRY_MS, + () -> statementExecutor.handleRestore(command), + WakeupException.class + ); + currentCommandRef.set(null); + } ); final KsqlEngine ksqlEngine = statementExecutor.getKsqlEngine(); ksqlEngine.getPersistentQueries().forEach(PersistentQueryMetadata::start); @@ -174,6 +209,7 @@ private void executeStatement(final QueuedCommand queuedCommand) { } }; + currentCommandRef.set(new Pair<>(queuedCommand, clock.instant())); RetryUtil.retryWithBackoff( maxRetries, STATEMENT_RETRY_MS, @@ -181,6 +217,7 @@ private void executeStatement(final QueuedCommand queuedCommand) { task, WakeupException.class ); + currentCommandRef.set(null); } private static Optional findTerminateCommand( @@ -204,6 +241,21 @@ private void terminateCluster(final Command command) { log.info("The KSQL server was terminated."); } + CommandRunnerStatus checkCommandRunnerStatus() { + final Pair currentCommand = currentCommandRef.get(); + if (currentCommand == null) { + return CommandRunnerStatus.RUNNING; + } + + return Duration.between(currentCommand.right, clock.instant()).toMillis() + < commandRunnerHealthTimeout.toMillis() + ? CommandRunnerStatus.RUNNING : CommandRunnerStatus.ERROR; + } + + Pair getCurrentCommand() { + return currentCommandRef.get(); + } + private class Runner implements Runnable { @Override diff --git a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/computation/CommandRunnerStatusMetric.java b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/computation/CommandRunnerStatusMetric.java new file mode 100644 index 000000000000..ebb9ee968ea2 --- /dev/null +++ b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/computation/CommandRunnerStatusMetric.java @@ -0,0 +1,82 @@ +/* + * Copyright 2019 Confluent Inc. + * + * Licensed under the Confluent Community License (the "License"); you may not use + * this file except in compliance with the License. You may obtain a copy of the + * License at + * + * http://www.confluent.io/confluent-community-license + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package io.confluent.ksql.rest.server.computation; + +import com.google.common.annotations.VisibleForTesting; +import io.confluent.ksql.metrics.MetricCollectors; +import io.confluent.ksql.util.KsqlConstants; + +import java.io.Closeable; +import java.util.Collections; +import java.util.Objects; + +import org.apache.kafka.common.MetricName; +import org.apache.kafka.common.metrics.Gauge; +import org.apache.kafka.common.metrics.Metrics; + +/** + * Emits a JMX metric that indicates the health of the CommandRunner thread. + */ +public class CommandRunnerStatusMetric implements Closeable { + + private static final String DEFAULT_METRIC_GROUP_PREFIX = "ksql-rest-app"; + private static final String METRIC_GROUP_POST_FIX = "-command-runner"; + + private final Metrics metrics; + private final MetricName metricName; + private final String metricGroupName; + + CommandRunnerStatusMetric( + final String ksqlServiceId, + final CommandRunner commandRunner, + final String metricGroupPrefix + ) { + this( + MetricCollectors.getMetrics(), + commandRunner, + ksqlServiceId, + metricGroupPrefix.isEmpty() ? DEFAULT_METRIC_GROUP_PREFIX : metricGroupPrefix + ); + } + + @VisibleForTesting + CommandRunnerStatusMetric( + final Metrics metrics, + final CommandRunner commandRunner, + final String ksqlServiceId, + final String metricsGroupPrefix + ) { + this.metrics = Objects.requireNonNull(metrics, "metrics"); + this.metricGroupName = metricsGroupPrefix + METRIC_GROUP_POST_FIX; + this.metricName = metrics.metricName( + "status", + KsqlConstants.KSQL_INTERNAL_TOPIC_PREFIX + ksqlServiceId + metricGroupName, + "The status of the commandRunner thread as it processes the command topic.", + Collections.emptyMap() + ); + + this.metrics.addMetric(metricName, (Gauge) + (config, now) -> commandRunner.checkCommandRunnerStatus().name()); + } + + /** + * Close the metric + */ + @Override + public void close() { + metrics.removeMetric(metricName); + } +} diff --git a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/computation/CommandRunnerStatusMetricTest.java b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/computation/CommandRunnerStatusMetricTest.java new file mode 100644 index 000000000000..0e56d3da3362 --- /dev/null +++ b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/computation/CommandRunnerStatusMetricTest.java @@ -0,0 +1,108 @@ +/* + * Copyright 2019 Confluent Inc. + * + * Licensed under the Confluent Community License (the "License"); you may not use + * this file except in compliance with the License. You may obtain a copy of the + * License at + * + * http://www.confluent.io/confluent-community-license + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package io.confluent.ksql.rest.server.computation; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.is; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyMap; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.ArgumentMatchers.isA; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import com.google.common.collect.ImmutableMap; +import org.apache.kafka.common.MetricName; +import org.apache.kafka.common.metrics.Gauge; +import org.apache.kafka.common.metrics.Metrics; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.ArgumentCaptor; +import org.mockito.Captor; +import org.mockito.Mock; +import org.mockito.junit.MockitoJUnitRunner; + +import java.util.Collections; + +@RunWith(MockitoJUnitRunner.class) +public class CommandRunnerStatusMetricTest { + + private static final MetricName METRIC_NAME = + new MetricName("bob", "g1", "d1", ImmutableMap.of()); + private static final String KSQL_SERVICE_ID = "kcql-1-"; + + @Mock + private Metrics metrics; + @Mock + private CommandRunner commandRunner; + @Captor + private ArgumentCaptor> gaugeCaptor; + + private CommandRunnerStatusMetric commandRunnerStatusMetric; + + @Before + public void setUp() { + when(metrics.metricName(any(), any(), any(), anyMap())).thenReturn(METRIC_NAME); + when(commandRunner.checkCommandRunnerStatus()).thenReturn(CommandRunner.CommandRunnerStatus.RUNNING); + + commandRunnerStatusMetric = new CommandRunnerStatusMetric(metrics, commandRunner, KSQL_SERVICE_ID, "rest"); + } + + @Test + public void shouldAddMetricOnCreation() { + // When: + // Listener created in setup + + // Then: + verify(metrics).metricName("status", "_confluent-ksql-kcql-1-rest-command-runner", + "The status of the commandRunner thread as it processes the command topic.", + Collections.emptyMap()); + + verify(metrics).addMetric(eq(METRIC_NAME), isA(Gauge.class)); + } + + @Test + public void shouldInitiallyBeRunningState() { + // When: + // CommandRunnerStatusMetric created in setup + + // Then: + assertThat(currentGaugeValue(), is(CommandRunner.CommandRunnerStatus.RUNNING.name())); + } + + @Test + public void shouldUpdateToErrorState() { + // When: + when(commandRunner.checkCommandRunnerStatus()).thenReturn(CommandRunner.CommandRunnerStatus.ERROR); + + // Then: + assertThat(currentGaugeValue(), is(CommandRunner.CommandRunnerStatus.ERROR.name())); + } + + @Test + public void shouldRemoveMetricOnClose() { + // When: + commandRunnerStatusMetric.close(); + + // Then: + verify(metrics).removeMetric(METRIC_NAME); + } + + private String currentGaugeValue() { + verify(metrics).addMetric(any(), gaugeCaptor.capture()); + return gaugeCaptor.getValue().value(null, 0L); + } +} diff --git a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/computation/CommandRunnerTest.java b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/computation/CommandRunnerTest.java index 5aa9515cf176..c729d386259f 100644 --- a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/computation/CommandRunnerTest.java +++ b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/computation/CommandRunnerTest.java @@ -15,11 +15,16 @@ package io.confluent.ksql.rest.server.computation; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.not; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.notNullValue; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyList; import static org.mockito.ArgumentMatchers.anyLong; import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.ArgumentMatchers.isNotNull; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.inOrder; @@ -27,14 +32,23 @@ import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; import static org.mockito.hamcrest.MockitoHamcrest.argThat; +import static io.confluent.ksql.test.util.AssertEventually.assertThatEventually; import io.confluent.ksql.engine.KsqlEngine; +import io.confluent.ksql.metrics.MetricCollectors; import io.confluent.ksql.rest.server.state.ServerState; import io.confluent.ksql.rest.util.ClusterTerminator; import io.confluent.ksql.rest.util.TerminateCluster; + +import java.time.Clock; import java.time.Duration; +import java.time.Instant; import java.util.Arrays; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; + import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; @@ -45,6 +59,7 @@ @RunWith(MockitoJUnitRunner.class) public class CommandRunnerTest { + private static long COMMAND_RUNNER_HEALTH_TIMEOUT = 1000; @Mock private InteractiveStatementExecutor statementExecutor; @@ -57,6 +72,8 @@ public class CommandRunnerTest { @Mock private KsqlEngine ksqlEngine; @Mock + private Clock clock; + @Mock private Command command; @Mock private Command clusterTerminate; @@ -72,6 +89,7 @@ public class CommandRunnerTest { @Before public void setup() { + MetricCollectors.initialize(); when(statementExecutor.getKsqlEngine()).thenReturn(ksqlEngine); when(command.getStatement()).thenReturn("something that is not terminate"); @@ -90,7 +108,12 @@ public void setup() { 1, clusterTerminator, executor, - serverState); + serverState, + "ksql-service-id", + Duration.ofMillis(COMMAND_RUNNER_HEALTH_TIMEOUT), + "", + clock + ); } @Test @@ -167,6 +190,45 @@ public void shouldEarlyOutIfNewCommandsContainsTerminate() { verify(statementExecutor, never()).handleRestore(queuedCommand3); } + @Test + public void shouldTransitionFromRunningToError() throws InterruptedException { + // Given: + givenQueuedCommands(queuedCommand1); + + final Instant current = Instant.now(); + final CountDownLatch handleStatementLatch = new CountDownLatch(1); + final CountDownLatch commandSetLatch = new CountDownLatch(1); + when(clock.instant()).thenReturn(current) + .thenReturn(current.plusMillis(500)) + .thenReturn(current.plusMillis(1500)) + .thenReturn(current.plusMillis(2500)); + doAnswer(invocation -> { + commandSetLatch.countDown(); + handleStatementLatch.await(); + return null; + }).when(statementExecutor).handleStatement(queuedCommand1); + + // When: + AtomicReference expectedException = new AtomicReference<>(null); + final Thread commandRunnerThread = (new Thread(() -> { + try { + commandRunner.fetchAndRunCommands(); + } catch (Exception e) { + expectedException.set(e); + } + })); + + // Then: + commandRunnerThread.start(); + commandSetLatch.await(); + assertThat(commandRunner.checkCommandRunnerStatus(), is(CommandRunner.CommandRunnerStatus.RUNNING)); + assertThat(commandRunner.checkCommandRunnerStatus(), is(CommandRunner.CommandRunnerStatus.ERROR)); + handleStatementLatch.countDown(); + commandRunnerThread.join(); + assertThat(commandRunner.checkCommandRunnerStatus(), is(CommandRunner.CommandRunnerStatus.RUNNING)); + assertThat(expectedException.get(), equalTo(null)); + } + @Test public void shouldEarlyOutOnShutdown() { // Given: diff --git a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/computation/RecoveryTest.java b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/computation/RecoveryTest.java index e202dccd3e13..d885c6587ae4 100644 --- a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/computation/RecoveryTest.java +++ b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/computation/RecoveryTest.java @@ -34,6 +34,7 @@ import io.confluent.ksql.metastore.MetaStore; import io.confluent.ksql.metastore.MetaStoreImpl; import io.confluent.ksql.metastore.model.DataSource; +import io.confluent.ksql.metrics.MetricCollectors; import io.confluent.ksql.name.SourceName; import io.confluent.ksql.query.QueryId; import io.confluent.ksql.query.id.SpecificQueryIdGenerator; @@ -202,12 +203,16 @@ private class KsqlServer { queryIdGenerator ); + MetricCollectors.initialize(); this.commandRunner = new CommandRunner( statementExecutor, fakeCommandQueue, 1, mock(ClusterTerminator.class), - serverState + serverState, + "ksql-service-id", + Duration.ofMillis(2000), + "" ); this.ksqlResource = new KsqlResource(