Skip to content

Commit

Permalink
feat: add JMX metric for commandRunner status (#4019)
Browse files Browse the repository at this point in the history
* feat: add metric for commandRunner status

* separate metric into separate class and added tests

* add clock
  • Loading branch information
stevenpyzhang authored Dec 5, 2019
1 parent cc7e4ee commit 55d75f2
Show file tree
Hide file tree
Showing 7 changed files with 341 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -519,7 +519,11 @@ static KsqlRestApplication buildApplication(
commandStore,
maxStatementRetries,
new ClusterTerminator(ksqlEngine, serviceContext, managedTopics),
serverState
serverState,
ksqlConfig.getString(KsqlConfig.KSQL_SERVICE_ID_CONFIG),
Duration.ofMillis(restConfig.getLong(
KsqlRestConfig.KSQL_COMMAND_RUNNER_BLOCKED_THRESHHOLD_ERROR_MS)),
metricsPrefix
);

final KsqlResource ksqlResource = new KsqlResource(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,13 @@ public class KsqlRestConfig extends RestConfig {
"Minimum time between consecutive health check evaluations. Health check queries before "
+ "the interval has elapsed will receive cached responses.";

static final String KSQL_COMMAND_RUNNER_BLOCKED_THRESHHOLD_ERROR_MS =
KSQL_CONFIG_PREFIX + "server.command.blocked.threshold.error.ms";

private static final String KSQL_COMMAND_RUNNER_BLOCKED_THRESHHOLD_ERROR_MS_DOC =
"How long to wait for the command runner to process a command from the command topic "
+ "before reporting an error metric.";

private static final ConfigDef CONFIG_DEF;

static {
Expand Down Expand Up @@ -122,6 +129,13 @@ public class KsqlRestConfig extends RestConfig {
5000L,
Importance.LOW,
KSQL_HEALTHCHECK_INTERVAL_MS_DOC
)
.define(
KSQL_COMMAND_RUNNER_BLOCKED_THRESHHOLD_ERROR_MS,
Type.LONG,
15000L,
Importance.LOW,
KSQL_COMMAND_RUNNER_BLOCKED_THRESHHOLD_ERROR_MS_DOC
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,17 +21,21 @@
import io.confluent.ksql.rest.server.state.ServerState;
import io.confluent.ksql.rest.util.ClusterTerminator;
import io.confluent.ksql.rest.util.TerminateCluster;
import io.confluent.ksql.util.Pair;
import io.confluent.ksql.util.PersistentQueryMetadata;
import io.confluent.ksql.util.RetryUtil;
import java.io.Closeable;
import java.time.Clock;
import java.time.Duration;
import java.time.Instant;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.kafka.common.errors.WakeupException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -43,7 +47,6 @@
* Also responsible for taking care of any exceptions that occur in the process.
*/
public class CommandRunner implements Closeable {

private static final Logger log = LoggerFactory.getLogger(CommandRunner.class);

private static final int STATEMENT_RETRY_MS = 100;
Expand All @@ -59,20 +62,37 @@ public class CommandRunner implements Closeable {
private final ClusterTerminator clusterTerminator;
private final ServerState serverState;

private final CommandRunnerStatusMetric commandRunnerStatusMetric;
private final AtomicReference<Pair<QueuedCommand, Instant>> currentCommandRef;
private final Duration commandRunnerHealthTimeout;
private final Clock clock;

public enum CommandRunnerStatus {
RUNNING,
ERROR
}

public CommandRunner(
final InteractiveStatementExecutor statementExecutor,
final CommandQueue commandStore,
final int maxRetries,
final ClusterTerminator clusterTerminator,
final ServerState serverState
final ServerState serverState,
final String ksqlServiceId,
final Duration commandRunnerHealthTimeout,
final String metricsGroupPrefix
) {
this(
statementExecutor,
commandStore,
maxRetries,
clusterTerminator,
Executors.newSingleThreadExecutor(r -> new Thread(r, "CommandRunner")),
serverState
serverState,
ksqlServiceId,
commandRunnerHealthTimeout,
metricsGroupPrefix,
Clock.systemUTC()
);
}

Expand All @@ -83,14 +103,24 @@ public CommandRunner(
final int maxRetries,
final ClusterTerminator clusterTerminator,
final ExecutorService executor,
final ServerState serverState
final ServerState serverState,
final String ksqlServiceId,
final Duration commandRunnerHealthTimeout,
final String metricsGroupPrefix,
final Clock clock
) {
this.statementExecutor = Objects.requireNonNull(statementExecutor, "statementExecutor");
this.commandStore = Objects.requireNonNull(commandStore, "commandStore");
this.maxRetries = maxRetries;
this.clusterTerminator = Objects.requireNonNull(clusterTerminator, "clusterTerminator");
this.executor = Objects.requireNonNull(executor, "executor");
this.serverState = Objects.requireNonNull(serverState, "serverState");
this.commandRunnerHealthTimeout =
Objects.requireNonNull(commandRunnerHealthTimeout, "commandRunnerHealthTimeout");
this.currentCommandRef = new AtomicReference<>(null);
this.commandRunnerStatusMetric =
new CommandRunnerStatusMetric(ksqlServiceId, this, metricsGroupPrefix);
this.clock = clock;
}

/**
Expand All @@ -114,6 +144,7 @@ public void close() {
} catch (final InterruptedException e) {
Thread.currentThread().interrupt();
}
commandRunnerStatusMetric.close();
commandStore.close();
}

Expand All @@ -128,13 +159,17 @@ public void processPriorCommands() {
return;
}
restoreCommands.forEach(
command -> RetryUtil.retryWithBackoff(
maxRetries,
STATEMENT_RETRY_MS,
MAX_STATEMENT_RETRY_MS,
() -> statementExecutor.handleRestore(command),
WakeupException.class
)
command -> {
currentCommandRef.set(new Pair<>(command, clock.instant()));
RetryUtil.retryWithBackoff(
maxRetries,
STATEMENT_RETRY_MS,
MAX_STATEMENT_RETRY_MS,
() -> statementExecutor.handleRestore(command),
WakeupException.class
);
currentCommandRef.set(null);
}
);
final KsqlEngine ksqlEngine = statementExecutor.getKsqlEngine();
ksqlEngine.getPersistentQueries().forEach(PersistentQueryMetadata::start);
Expand Down Expand Up @@ -174,13 +209,15 @@ private void executeStatement(final QueuedCommand queuedCommand) {
}
};

currentCommandRef.set(new Pair<>(queuedCommand, clock.instant()));
RetryUtil.retryWithBackoff(
maxRetries,
STATEMENT_RETRY_MS,
MAX_STATEMENT_RETRY_MS,
task,
WakeupException.class
);
currentCommandRef.set(null);
}

private static Optional<QueuedCommand> findTerminateCommand(
Expand All @@ -204,6 +241,21 @@ private void terminateCluster(final Command command) {
log.info("The KSQL server was terminated.");
}

CommandRunnerStatus checkCommandRunnerStatus() {
final Pair<QueuedCommand, Instant> currentCommand = currentCommandRef.get();
if (currentCommand == null) {
return CommandRunnerStatus.RUNNING;
}

return Duration.between(currentCommand.right, clock.instant()).toMillis()
< commandRunnerHealthTimeout.toMillis()
? CommandRunnerStatus.RUNNING : CommandRunnerStatus.ERROR;
}

Pair<QueuedCommand, Instant> getCurrentCommand() {
return currentCommandRef.get();
}

private class Runner implements Runnable {

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
/*
* Copyright 2019 Confluent Inc.
*
* Licensed under the Confluent Community License (the "License"); you may not use
* this file except in compliance with the License. You may obtain a copy of the
* License at
*
* http://www.confluent.io/confluent-community-license
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/

package io.confluent.ksql.rest.server.computation;

import com.google.common.annotations.VisibleForTesting;
import io.confluent.ksql.metrics.MetricCollectors;
import io.confluent.ksql.util.KsqlConstants;

import java.io.Closeable;
import java.util.Collections;
import java.util.Objects;

import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.metrics.Gauge;
import org.apache.kafka.common.metrics.Metrics;

/**
* Emits a JMX metric that indicates the health of the CommandRunner thread.
*/
public class CommandRunnerStatusMetric implements Closeable {

private static final String DEFAULT_METRIC_GROUP_PREFIX = "ksql-rest-app";
private static final String METRIC_GROUP_POST_FIX = "-command-runner";

private final Metrics metrics;
private final MetricName metricName;
private final String metricGroupName;

CommandRunnerStatusMetric(
final String ksqlServiceId,
final CommandRunner commandRunner,
final String metricGroupPrefix
) {
this(
MetricCollectors.getMetrics(),
commandRunner,
ksqlServiceId,
metricGroupPrefix.isEmpty() ? DEFAULT_METRIC_GROUP_PREFIX : metricGroupPrefix
);
}

@VisibleForTesting
CommandRunnerStatusMetric(
final Metrics metrics,
final CommandRunner commandRunner,
final String ksqlServiceId,
final String metricsGroupPrefix
) {
this.metrics = Objects.requireNonNull(metrics, "metrics");
this.metricGroupName = metricsGroupPrefix + METRIC_GROUP_POST_FIX;
this.metricName = metrics.metricName(
"status",
KsqlConstants.KSQL_INTERNAL_TOPIC_PREFIX + ksqlServiceId + metricGroupName,
"The status of the commandRunner thread as it processes the command topic.",
Collections.emptyMap()
);

this.metrics.addMetric(metricName, (Gauge<String>)
(config, now) -> commandRunner.checkCommandRunnerStatus().name());
}

/**
* Close the metric
*/
@Override
public void close() {
metrics.removeMetric(metricName);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
/*
* Copyright 2019 Confluent Inc.
*
* Licensed under the Confluent Community License (the "License"); you may not use
* this file except in compliance with the License. You may obtain a copy of the
* License at
*
* http://www.confluent.io/confluent-community-license
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/

package io.confluent.ksql.rest.server.computation;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.is;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyMap;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.ArgumentMatchers.isA;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

import com.google.common.collect.ImmutableMap;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.metrics.Gauge;
import org.apache.kafka.common.metrics.Metrics;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.ArgumentCaptor;
import org.mockito.Captor;
import org.mockito.Mock;
import org.mockito.junit.MockitoJUnitRunner;

import java.util.Collections;

@RunWith(MockitoJUnitRunner.class)
public class CommandRunnerStatusMetricTest {

private static final MetricName METRIC_NAME =
new MetricName("bob", "g1", "d1", ImmutableMap.of());
private static final String KSQL_SERVICE_ID = "kcql-1-";

@Mock
private Metrics metrics;
@Mock
private CommandRunner commandRunner;
@Captor
private ArgumentCaptor<Gauge<String>> gaugeCaptor;

private CommandRunnerStatusMetric commandRunnerStatusMetric;

@Before
public void setUp() {
when(metrics.metricName(any(), any(), any(), anyMap())).thenReturn(METRIC_NAME);
when(commandRunner.checkCommandRunnerStatus()).thenReturn(CommandRunner.CommandRunnerStatus.RUNNING);

commandRunnerStatusMetric = new CommandRunnerStatusMetric(metrics, commandRunner, KSQL_SERVICE_ID, "rest");
}

@Test
public void shouldAddMetricOnCreation() {
// When:
// Listener created in setup

// Then:
verify(metrics).metricName("status", "_confluent-ksql-kcql-1-rest-command-runner",
"The status of the commandRunner thread as it processes the command topic.",
Collections.emptyMap());

verify(metrics).addMetric(eq(METRIC_NAME), isA(Gauge.class));
}

@Test
public void shouldInitiallyBeRunningState() {
// When:
// CommandRunnerStatusMetric created in setup

// Then:
assertThat(currentGaugeValue(), is(CommandRunner.CommandRunnerStatus.RUNNING.name()));
}

@Test
public void shouldUpdateToErrorState() {
// When:
when(commandRunner.checkCommandRunnerStatus()).thenReturn(CommandRunner.CommandRunnerStatus.ERROR);

// Then:
assertThat(currentGaugeValue(), is(CommandRunner.CommandRunnerStatus.ERROR.name()));
}

@Test
public void shouldRemoveMetricOnClose() {
// When:
commandRunnerStatusMetric.close();

// Then:
verify(metrics).removeMetric(METRIC_NAME);
}

private String currentGaugeValue() {
verify(metrics).addMetric(any(), gaugeCaptor.capture());
return gaugeCaptor.getValue().value(null, 0L);
}
}
Loading

0 comments on commit 55d75f2

Please sign in to comment.