Skip to content

Commit

Permalink
separate metric into separate class and added tests
Browse files Browse the repository at this point in the history
  • Loading branch information
stevenpyzhang committed Dec 4, 2019
1 parent e5bd35d commit e0d55ab
Show file tree
Hide file tree
Showing 6 changed files with 220 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -520,7 +520,8 @@ static KsqlRestApplication buildApplication(
maxStatementRetries,
new ClusterTerminator(ksqlEngine, serviceContext, managedTopics),
serverState,
ksqlConfig.getString(KsqlConfig.KSQL_SERVICE_ID_CONFIG)
ksqlConfig.getString(KsqlConfig.KSQL_SERVICE_ID_CONFIG),
Duration.ofMillis(restConfig.getLong(KsqlRestConfig.KSQL_COMMAND_RUNNER_HEALTH_CHECK_MS))
);

final KsqlResource ksqlResource = new KsqlResource(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,13 @@ public class KsqlRestConfig extends RestConfig {
"Minimum time between consecutive health check evaluations. Health check queries before "
+ "the interval has elapsed will receive cached responses.";

/**
 * Config key for the command runner health-check timeout.
 *
 * <p>The value is defined as a {@code LONG} with a default of {@code 15000L}; the REST
 * application reads it with {@code getLong} and converts it via {@code Duration.ofMillis},
 * i.e. it is interpreted as milliseconds.
 */
static final String KSQL_COMMAND_RUNNER_HEALTH_CHECK_MS =
KSQL_CONFIG_PREFIX + "server.command.runner.healthcheck.ms";

// Documentation string registered in the ConfigDef alongside the key above.
private static final String KSQL_COMMAND_RUNNER_HEALTH_CHECK_MS_DOC =
"How long to wait for the command runner to process a command from the command topic "
+ "before reporting an error metric.";

private static final ConfigDef CONFIG_DEF;

static {
Expand Down Expand Up @@ -122,6 +129,13 @@ public class KsqlRestConfig extends RestConfig {
5000L,
Importance.LOW,
KSQL_HEALTHCHECK_INTERVAL_MS_DOC
)
.define(
KSQL_COMMAND_RUNNER_HEALTH_CHECK_MS,
Type.LONG,
15000L,
Importance.LOW,
KSQL_COMMAND_RUNNER_HEALTH_CHECK_MS_DOC
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

import org.apache.kafka.common.errors.WakeupException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -47,18 +46,12 @@
* Also responsible for taking care of any exceptions that occur in the process.
*/
public class CommandRunner implements Closeable {

private static final String DEFAULT_METRIC_GROUP_PREFIX = "ksql-rest-app";
private static final String METRIC_GROUP_POST_FIX = "-command-runner-status";
private static final String metricGroupName = DEFAULT_METRIC_GROUP_PREFIX + METRIC_GROUP_POST_FIX;

private static final Logger log = LoggerFactory.getLogger(CommandRunner.class);

private static final int STATEMENT_RETRY_MS = 100;
private static final int MAX_STATEMENT_RETRY_MS = 5 * 1000;
private static final Duration NEW_CMDS_TIMEOUT = Duration.ofMillis(MAX_STATEMENT_RETRY_MS);
private static final int SHUTDOWN_TIMEOUT_MS = 3 * MAX_STATEMENT_RETRY_MS;
private static final Duration COMMAND_RUNNER_HEALTH_TIMEOUT = Duration.ofMillis(15000);

private final InteractiveStatementExecutor statementExecutor;
private final CommandQueue commandStore;
Expand All @@ -67,10 +60,12 @@ public class CommandRunner implements Closeable {
private final int maxRetries;
private final ClusterTerminator clusterTerminator;
private final ServerState serverState;

private final CommandRunnerStatusMetric commandRunnerStatusMetric;
private final AtomicReference<Pair<QueuedCommand, Instant>> currentCommandRef;
private final Duration commandRunnerHealthTimeout;

protected enum CommandRunnerStatus {
public enum CommandRunnerStatus {
RUNNING,
ERROR
}
Expand All @@ -81,7 +76,8 @@ public CommandRunner(
final int maxRetries,
final ClusterTerminator clusterTerminator,
final ServerState serverState,
final String ksqlServiceId
final String ksqlServiceId,
final Duration commandRunnerHealthTimeout
) {
this(
statementExecutor,
Expand All @@ -90,7 +86,8 @@ public CommandRunner(
clusterTerminator,
Executors.newSingleThreadExecutor(r -> new Thread(r, "CommandRunner")),
serverState,
ksqlServiceId
ksqlServiceId,
commandRunnerHealthTimeout
);
}

Expand All @@ -102,14 +99,17 @@ public CommandRunner(
final ClusterTerminator clusterTerminator,
final ExecutorService executor,
final ServerState serverState,
final String ksqlServiceId
final String ksqlServiceId,
final Duration commandRunnerHealthTimeout
) {
this.statementExecutor = Objects.requireNonNull(statementExecutor, "statementExecutor");
this.commandStore = Objects.requireNonNull(commandStore, "commandStore");
this.maxRetries = maxRetries;
this.clusterTerminator = Objects.requireNonNull(clusterTerminator, "clusterTerminator");
this.executor = Objects.requireNonNull(executor, "executor");
this.serverState = Objects.requireNonNull(serverState, "serverState");
this.commandRunnerHealthTimeout =
Objects.requireNonNull(commandRunnerHealthTimeout, "commandRunnerHealthTimeout");
this.currentCommandRef = new AtomicReference<>(null);
this.commandRunnerStatusMetric = new CommandRunnerStatusMetric(ksqlServiceId, this);
}
Expand Down Expand Up @@ -232,17 +232,17 @@ private void terminateCluster(final Command command) {
log.info("The KSQL server was terminated.");
}

protected CommandRunnerStatus checkCommandRunnerStatus() {
CommandRunnerStatus checkCommandRunnerStatus() {
final Pair<QueuedCommand, Instant> currentCommand = currentCommandRef.get();
if (currentCommand == null) {
return CommandRunnerStatus.RUNNING;
}

return Duration.between(currentCommand.right, Instant.now()).toMillis()
< COMMAND_RUNNER_HEALTH_TIMEOUT.toMillis()
< commandRunnerHealthTimeout.toMillis()
? CommandRunnerStatus.RUNNING : CommandRunnerStatus.ERROR;
}

private class Runner implements Runnable {

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
/*
* Copyright 2019 Confluent Inc.
*
* Licensed under the Confluent Community License (the "License"); you may not use
* this file except in compliance with the License. You may obtain a copy of the
* License at
*
* http://www.confluent.io/confluent-community-license
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/

package io.confluent.ksql.rest.server.computation;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.is;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyMap;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.ArgumentMatchers.isA;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

import com.google.common.collect.ImmutableMap;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.metrics.Gauge;
import org.apache.kafka.common.metrics.Metrics;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.ArgumentCaptor;
import org.mockito.Captor;
import org.mockito.Mock;
import org.mockito.junit.MockitoJUnitRunner;

import java.util.Collections;

@RunWith(MockitoJUnitRunner.class)
public class CommandRunnerStatusMetricTest {

private static final MetricName METRIC_NAME =
new MetricName("bob", "g1", "d1", ImmutableMap.of());
private static final String KSQL_SERVICE_ID = "kcql-1";

@Mock
private Metrics metrics;
@Mock
private CommandRunner commandRunner;
@Captor
private ArgumentCaptor<Gauge<String>> gaugeCaptor;

private CommandRunnerStatusMetric commandRunnerStatusMetric;

@Before
public void setUp() {
when(metrics.metricName(any(), any(), any(), anyMap())).thenReturn(METRIC_NAME);
when(commandRunner.checkCommandRunnerStatus()).thenReturn(CommandRunner.CommandRunnerStatus.RUNNING);

commandRunnerStatusMetric = new CommandRunnerStatusMetric(metrics, commandRunner, KSQL_SERVICE_ID);
}

@Test
public void shouldAddMetricOnCreation() {
// When:
// Listener created in setup

// Then:
verify(metrics).metricName("status", "_confluent-ksql-kcql-1ksql-rest-app-command-runner",
"The status of the commandRunner thread as it processes the command topic.",
Collections.emptyMap());

verify(metrics).addMetric(eq(METRIC_NAME), isA(Gauge.class));
}

@Test
public void shouldInitiallyBeRunningState() {
// When:
// CommandRunnerStatusMetric created in setup

// Then:
assertThat(currentGaugeValue(), is(CommandRunner.CommandRunnerStatus.RUNNING.name()));
}

@Test
public void shouldUpdateToErrorState() {
// When:
when(commandRunner.checkCommandRunnerStatus()).thenReturn(CommandRunner.CommandRunnerStatus.ERROR);

// Then:
assertThat(currentGaugeValue(), is(CommandRunner.CommandRunnerStatus.ERROR.name()));
}

@Test
public void shouldRemoveMetricOnClose() {
// When:
commandRunnerStatusMetric.close();

// Then:
verify(metrics).removeMetric(METRIC_NAME);
}

private String currentGaugeValue() {
verify(metrics).addMetric(any(), gaugeCaptor.capture());
return gaugeCaptor.getValue().value(null, 0L);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,9 @@

package io.confluent.ksql.rest.server.computation;

import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.not;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyList;
import static org.mockito.ArgumentMatchers.anyLong;
Expand All @@ -35,7 +37,13 @@
import io.confluent.ksql.rest.util.TerminateCluster;
import java.time.Duration;
import java.util.Arrays;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicReference;

import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
Expand All @@ -46,6 +54,7 @@

@RunWith(MockitoJUnitRunner.class)
public class CommandRunnerTest {
// Health-check timeout in milliseconds handed to the CommandRunner under test (wrapped with
// Duration.ofMillis in setup). Declared final so no test can alter the shared timing base.
private static final long COMMAND_RUNNER_HEALTH_TIMEOUT = 2000;

@Mock
private InteractiveStatementExecutor statementExecutor;
Expand Down Expand Up @@ -93,7 +102,8 @@ public void setup() {
clusterTerminator,
executor,
serverState,
"ksql-service-id"
"ksql-service-id",
Duration.ofMillis(COMMAND_RUNNER_HEALTH_TIMEOUT)
);
}

Expand Down Expand Up @@ -171,6 +181,75 @@ public void shouldEarlyOutIfNewCommandsContainsTerminate() {
verify(statementExecutor, never()).handleRestore(queuedCommand3);
}

/**
 * The command takes less time than the health timeout and the status is sampled while the
 * command is still being processed, so the runner must still report RUNNING.
 *
 * <p>Exceptions are allowed to propagate: the test framework then fails the test with the
 * real cause instead of an uninformative "expected false but was true".
 */
@Test
public void shouldReportRunningIfNotStuckProcessingCommand()
    throws BrokenBarrierException, InterruptedException, ExecutionException {
  checkCommandRunnerStatus(
      COMMAND_RUNNER_HEALTH_TIMEOUT - 500,
      COMMAND_RUNNER_HEALTH_TIMEOUT - 1000,
      CommandRunner.CommandRunnerStatus.RUNNING
  );
}

/**
 * The command takes longer than the health timeout and the status is sampled after the
 * timeout has elapsed but before the command finishes, so the runner must report ERROR.
 *
 * <p>Exceptions are allowed to propagate: the test framework then fails the test with the
 * real cause instead of an uninformative "expected false but was true".
 */
@Test
public void shouldReportErrorIfStuckProcessingCommand()
    throws BrokenBarrierException, InterruptedException, ExecutionException {
  checkCommandRunnerStatus(
      COMMAND_RUNNER_HEALTH_TIMEOUT + 1000,
      COMMAND_RUNNER_HEALTH_TIMEOUT + 500,
      CommandRunner.CommandRunnerStatus.ERROR
  );
}

/**
 * Runs one command whose execution is stubbed to take {@code commandProcessingTimeMs} and, on
 * a second thread, samples the command runner status {@code timeToCheckMetricMs} after both
 * threads have been released together.
 *
 * @param commandProcessingTimeMs how long the stubbed statement execution sleeps
 * @param timeToCheckMetricMs delay before the status is sampled
 * @param expectedStatus status the command runner is expected to report at sampling time
 * @throws BrokenBarrierException if the start barrier is broken
 * @throws InterruptedException if the calling thread is interrupted while waiting
 * @throws ExecutionException if the status-sampling thread failed; the cause is its exception
 */
private void checkCommandRunnerStatus(
    final long commandProcessingTimeMs,
    final long timeToCheckMetricMs,
    final CommandRunner.CommandRunnerStatus expectedStatus
) throws BrokenBarrierException, InterruptedException, ExecutionException {
  // Given:
  givenQueuedCommands(queuedCommand1);
  doAnswer((Answer<Void>) invocation -> {
    Thread.sleep(commandProcessingTimeMs);
    return null;
  }).when(statementExecutor).handleStatement(queuedCommand1);

  // When:
  // Three parties: the runner thread, the sampling thread and this test thread.
  final CyclicBarrier gate = new CyclicBarrier(3);
  // Holds the first exception thrown on either helper thread; stays null on success.
  final AtomicReference<Exception> threadFailure = new AtomicReference<>(null);

  final Thread runnerThread = new Thread(() -> {
    try {
      gate.await();
      commandRunner.fetchAndRunCommands();
    } catch (final Exception e) {
      threadFailure.compareAndSet(null, e);
    }
  });
  runnerThread.start();

  final CompletableFuture<CommandRunner.CommandRunnerStatus> statusFuture =
      new CompletableFuture<>();
  final Thread samplerThread = new Thread(() -> {
    try {
      gate.await();
      Thread.sleep(timeToCheckMetricMs);
      statusFuture.complete(commandRunner.checkCommandRunnerStatus());
    } catch (final Exception e) {
      threadFailure.compareAndSet(null, e);
      // Without this the future would never complete and get() below would block forever.
      statusFuture.completeExceptionally(e);
    }
  });
  samplerThread.start();

  // Then:
  gate.await();
  try {
    assertThat(statusFuture.get(), equalTo(expectedStatus));
  } finally {
    // Wait for the stubbed command to finish so the runner thread neither outlives the test
    // nor fails unnoticed after the status has been sampled.
    runnerThread.join();
  }
  assertThat(threadFailure.get(), equalTo(null));
}

@Test
public void shouldEarlyOutOnShutdown() {
// Given:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -208,7 +208,8 @@ private class KsqlServer {
1,
mock(ClusterTerminator.class),
serverState,
"ksql-service-id"
"ksql-service-id",
Duration.ofMillis(2000)
);

this.ksqlResource = new KsqlResource(
Expand Down

0 comments on commit e0d55ab

Please sign in to comment.