Skip to content

Commit

Permalink
feat: add health check endpoint (#3501)
Browse files Browse the repository at this point in the history
  • Loading branch information
vcrfxia authored Oct 16, 2019
1 parent 594ea44 commit 2308686
Show file tree
Hide file tree
Showing 20 changed files with 1,123 additions and 87 deletions.
24 changes: 23 additions & 1 deletion docs/developer-guide/api.rst
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ use the ``curl`` command to query the ``/info`` endpoint:
Your output should resemble:

.. codewithvars:: bash
.. codewithvars:: json

{
"KsqlServerInfo": {
Expand All @@ -92,6 +92,28 @@ Your output should resemble:
}
}

You can also check the health of your KSQL server via the ``/healthcheck`` resource:

.. code:: bash

    curl -sX GET "http://localhost:8088/healthcheck" | jq '.'

Your output should resemble:

.. codewithvars:: json

{
"isHealthy": true,
"details": {
"metastore": {
"isHealthy": true
},
"kafka": {
"isHealthy": true
}
}
}

Run a KSQL Statement
--------------------

Expand Down
8 changes: 4 additions & 4 deletions docs/operations.rst
Original file line number Diff line number Diff line change
Expand Up @@ -40,11 +40,11 @@ ksql-server-start
ksql-server-stop
This script stops the KSQL server. It is located in the ``/bin`` directory of your |cp| installation.

============
Healthchecks
============
=============
Health Checks
=============

- The KSQL REST API supports a "server info" request at ``http://<server>:8088/info``.
- The KSQL REST API supports a "server info" request at ``http://<server>:8088/info`` and a basic server health check endpoint at ``http://<server>:8088/healthcheck``.
- Check runtime stats for the KSQL server that you are connected to via ``DESCRIBE EXTENDED <stream or table>`` and
``EXPLAIN <name of query>``.
- Run ``ksql-print-metrics`` on a KSQL server. For example, see this `blog post <https://www.confluent.io/blog/ksql-january-release-streaming-sql-apache-kafka/>`_.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
/*
* Copyright 2019 Confluent Inc.
*
* Licensed under the Confluent Community License (the "License"); you may not use
* this file except in compliance with the License. You may obtain a copy of the
* License at
*
* http://www.confluent.io/confluent-community-license
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/

package io.confluent.ksql.rest.healthcheck;

import com.google.common.collect.ImmutableList;
import io.confluent.ksql.rest.client.RestResponse;
import io.confluent.ksql.rest.entity.HealthCheckResponse;
import io.confluent.ksql.rest.entity.HealthCheckResponseDetail;
import io.confluent.ksql.rest.entity.KsqlEntityList;
import io.confluent.ksql.rest.server.KsqlRestConfig;
import io.confluent.ksql.services.SimpleKsqlClient;
import io.confluent.rest.RestConfig;
import java.net.URI;
import java.net.URL;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.stream.Collectors;
import org.apache.kafka.common.config.ConfigException;

/**
 * Evaluates the health of a KSQL server by running a fixed set of checks, each of which issues
 * KSQL statements to the server through a {@link SimpleKsqlClient}.
 *
 * <p>Requests are sent to the first address listed under {@link RestConfig#LISTENERS_CONFIG}.
 */
public class HealthCheckAgent {

  public static final String METASTORE_CHECK_NAME = "metastore";
  public static final String KAFKA_CHECK_NAME = "kafka";

  // Checks run on every call to checkHealth(); each result is keyed by the check's name.
  private static final List<Check> DEFAULT_CHECKS = ImmutableList.of(
      new ExecuteStatementCheck(METASTORE_CHECK_NAME, "list streams; list tables; list queries;"),
      new ExecuteStatementCheck(KAFKA_CHECK_NAME, "list topics extended;")
  );

  private final SimpleKsqlClient ksqlClient;
  private final URI serverEndpoint;

  /**
   * Creates an agent that sends its check statements via the supplied client.
   *
   * @param ksqlClient client used to issue the check statements; must not be null
   * @param restConfig config from which the server's listener address is read; must not be null
   * @throws NullPointerException if either argument is null
   * @throws ConfigException if no listener is configured or the first listener is not a valid URL
   */
  public HealthCheckAgent(
      final SimpleKsqlClient ksqlClient,
      final KsqlRestConfig restConfig
  ) {
    this.ksqlClient = Objects.requireNonNull(ksqlClient, "ksqlClient");
    this.serverEndpoint = getServerAddress(Objects.requireNonNull(restConfig, "restConfig"));
  }

  /**
   * Runs every default check and aggregates the results.
   *
   * @return a response whose overall flag is true only if every individual check is healthy,
   *     together with the per-check details keyed by check name
   */
  public HealthCheckResponse checkHealth() {
    final Map<String, HealthCheckResponseDetail> results = DEFAULT_CHECKS.stream()
        .collect(Collectors.toMap(
            Check::getName,
            check -> check.check(ksqlClient, serverEndpoint)
        ));
    final boolean allHealthy = results.values().stream()
        .allMatch(HealthCheckResponseDetail::getIsHealthy);
    return new HealthCheckResponse(allHealthy, results);
  }

  /**
   * Resolves the endpoint to send check requests to: the first configured listener, trimmed.
   *
   * @throws ConfigException if the listener list is empty or the first entry cannot be parsed
   */
  private static URI getServerAddress(final KsqlRestConfig restConfig) {
    final List<String> listeners = restConfig.getList(RestConfig.LISTENERS_CONFIG);
    final String address = listeners.stream()
        .map(String::trim)
        .findFirst()
        .orElseThrow(() -> invalidAddressException(listeners, "value cannot be empty"));

    try {
      return new URL(address).toURI();
    } catch (final Exception e) {
      // The config exception is built without a cause, so attach the parsing failure
      // explicitly; otherwise its type and stack trace would be lost to the caller.
      final RuntimeException invalid = invalidAddressException(listeners, e.getMessage());
      invalid.initCause(e);
      throw invalid;
    }
  }

  /** Builds the exception reported for an unusable {@link RestConfig#LISTENERS_CONFIG} value. */
  private static RuntimeException invalidAddressException(
      final List<String> serverAddresses,
      final String message
  ) {
    return new ConfigException(RestConfig.LISTENERS_CONFIG, serverAddresses, message);
  }

  /** A single named health check. */
  private interface Check {
    String getName();

    HealthCheckResponseDetail check(SimpleKsqlClient ksqlClient, URI serverEndpoint);
  }

  /** A check that is healthy when the server successfully executes a given KSQL statement. */
  private static class ExecuteStatementCheck implements Check {
    private final String name;
    private final String ksqlStatement;

    ExecuteStatementCheck(final String name, final String ksqlStatement) {
      this.name = Objects.requireNonNull(name, "name");
      this.ksqlStatement = Objects.requireNonNull(ksqlStatement, "ksqlStatement");
    }

    @Override
    public String getName() {
      return name;
    }

    /**
     * Sends the statement to the server and reports whether the response was successful.
     * Exceptions thrown by the client are not caught here and propagate to the caller.
     */
    @Override
    public HealthCheckResponseDetail check(
        final SimpleKsqlClient ksqlClient,
        final URI serverEndpoint
    ) {
      final RestResponse<KsqlEntityList> response =
          ksqlClient.makeKsqlRequest(serverEndpoint, ksqlStatement);
      return new HealthCheckResponseDetail(response.isSuccessful());
    }
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
import io.confluent.ksql.rest.server.computation.StatementExecutor;
import io.confluent.ksql.rest.server.context.KsqlRestServiceContextBinder;
import io.confluent.ksql.rest.server.filters.KsqlAuthorizationFilter;
import io.confluent.ksql.rest.server.resources.HealthCheckResource;
import io.confluent.ksql.rest.server.resources.KsqlConfigurable;
import io.confluent.ksql.rest.server.resources.KsqlExceptionMapper;
import io.confluent.ksql.rest.server.resources.KsqlResource;
Expand Down Expand Up @@ -201,6 +202,7 @@ public void setupResources(final Configurable<?> config, final KsqlRestConfig ap
config.register(statusResource);
config.register(ksqlResource);
config.register(streamedQueryResource);
config.register(HealthCheckResource.create(ksqlResource, serviceContext, this.config));
config.register(new KsqlExceptionMapper());
config.register(new ServerStateDynamicBinding(serverState));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,12 @@ public class KsqlRestConfig extends RestConfig {
"Whether or not to set KsqlUncaughtExceptionHandler as the UncaughtExceptionHandler "
+ "for all threads in the application (this can be overridden). Default is false.";

public static final String KSQL_HEALTHCHECK_INTERVAL_MS_CONFIG =
KSQL_CONFIG_PREFIX + "healthcheck.interval.ms";
private static final String KSQL_HEALTHCHECK_INTERVAL_MS_DOC =
"Minimum time between consecutive health check evaluations. Health check queries before "
+ "the interval has elapsed will receive cached responses.";

private static final ConfigDef CONFIG_DEF;

static {
Expand Down Expand Up @@ -102,14 +108,20 @@ public class KsqlRestConfig extends RestConfig {
KSQL_SERVER_PRECONDITIONS,
Type.LIST,
"",
Importance.LOW,
KSQL_SERVER_PRECONDITIONS_DOC
Importance.LOW,
KSQL_SERVER_PRECONDITIONS_DOC
).define(
KSQL_SERVER_ENABLE_UNCAUGHT_EXCEPTION_HANDLER,
ConfigDef.Type.BOOLEAN,
false,
Importance.LOW,
KSQL_SERVER_UNCAUGHT_EXCEPTION_HANDLER_DOC
).define(
KSQL_HEALTHCHECK_INTERVAL_MS_CONFIG,
Type.LONG,
5000L,
Importance.LOW,
KSQL_HEALTHCHECK_INTERVAL_MS_DOC
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
package io.confluent.ksql.rest.server.filters;

import io.confluent.ksql.rest.Errors;
import io.confluent.ksql.rest.server.resources.HealthCheckResource;
import io.confluent.ksql.rest.server.resources.ServerMetadataResource;
import io.confluent.ksql.security.KsqlAuthorizationProvider;
import java.lang.reflect.Method;
Expand All @@ -39,8 +40,10 @@
public class KsqlAuthorizationFilter implements ContainerRequestFilter {
private static final Logger log = LoggerFactory.getLogger(KsqlAuthorizationFilter.class);

private static final Set<String> PATHS_WITHOUT_AUTHORIZATION =
getPathsFrom(ServerMetadataResource.class);
private static final Set<String> PATHS_WITHOUT_AUTHORIZATION = getPathsFrom(
ServerMetadataResource.class,
HealthCheckResource.class
);

private final KsqlAuthorizationProvider authorizationProvider;

Expand Down Expand Up @@ -75,17 +78,20 @@ private boolean requiresAuthorization(final String path) {
return !PATHS_WITHOUT_AUTHORIZATION.contains(path);
}

private static Set<String> getPathsFrom(final Class<?> resourceClass) {
private static Set<String> getPathsFrom(final Class<?> ...resourceClass) {

final Set<String> paths = new HashSet<>();
final String mainPath = StringUtils.stripEnd(
resourceClass.getAnnotation(Path.class).value(), "/"
);
for (final Class<?> clazz : resourceClass) {
final String mainPath = StringUtils.stripEnd(
clazz.getAnnotation(Path.class).value(), "/"
);

paths.add(mainPath);
for (Method m : resourceClass.getMethods()) {
if (m.isAnnotationPresent(Path.class)) {
paths.add(mainPath + "/"
+ StringUtils.strip(m.getAnnotation(Path.class).value(), "/"));
paths.add(mainPath);
for (Method m : clazz.getMethods()) {
if (m.isAnnotationPresent(Path.class)) {
paths.add(mainPath + "/"
+ StringUtils.strip(m.getAnnotation(Path.class).value(), "/"));
}
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
/*
* Copyright 2019 Confluent Inc.
*
* Licensed under the Confluent Community License (the "License"); you may not use
* this file except in compliance with the License. You may obtain a copy of the
* License at
*
* http://www.confluent.io/confluent-community-license
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/

package io.confluent.ksql.rest.server.resources;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
import io.confluent.ksql.rest.entity.HealthCheckResponse;
import io.confluent.ksql.rest.entity.Versions;
import io.confluent.ksql.rest.healthcheck.HealthCheckAgent;
import io.confluent.ksql.rest.server.KsqlRestConfig;
import io.confluent.ksql.rest.server.services.ServerInternalKsqlClient;
import io.confluent.ksql.services.ServiceContext;
import java.time.Duration;
import java.util.Objects;
import java.util.concurrent.TimeUnit;
import javax.annotation.Nonnull;
import javax.ws.rs.GET;
import javax.ws.rs.Path;
import javax.ws.rs.Produces;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;

/**
 * REST resource exposing the server health check at {@code /healthcheck}.
 *
 * <p>Results are held in a single-entry cache that expires a configured duration after being
 * written, so requests arriving within that window are answered from the cached response.
 */
@Path("/healthcheck")
@Produces({Versions.KSQL_V1_JSON, MediaType.APPLICATION_JSON})
public class HealthCheckResource {

  // The cache only ever holds one entry; this is the sole key used to look it up.
  private static final Boolean KEY = Boolean.TRUE;

  private final LoadingCache<Boolean, HealthCheckResponse> responseCache;

  @VisibleForTesting
  HealthCheckResource(
      final HealthCheckAgent healthCheckAgent,
      final Duration healthCheckInterval
  ) {
    Objects.requireNonNull(healthCheckAgent, "healthCheckAgent");
    Objects.requireNonNull(healthCheckInterval, "healthCheckInterval");
    this.responseCache = createResponseCache(healthCheckAgent, healthCheckInterval);
  }

  /**
   * Builds a resource whose agent talks to the server through an internal client and whose
   * cache duration comes from {@link KsqlRestConfig#KSQL_HEALTHCHECK_INTERVAL_MS_CONFIG}.
   */
  public static HealthCheckResource create(
      final KsqlResource ksqlResource,
      final ServiceContext serviceContext,
      final KsqlRestConfig restConfig
  ) {
    final ServerInternalKsqlClient internalClient =
        new ServerInternalKsqlClient(ksqlResource, serviceContext);
    final HealthCheckAgent agent = new HealthCheckAgent(internalClient, restConfig);
    final long intervalMs =
        restConfig.getLong(KsqlRestConfig.KSQL_HEALTHCHECK_INTERVAL_MS_CONFIG);
    return new HealthCheckResource(agent, Duration.ofMillis(intervalMs));
  }

  /** Handles {@code GET /healthcheck}, returning the current (possibly cached) health status. */
  @GET
  public Response checkHealth() {
    // The lookup delegates to the agent's checkHealth() when no unexpired result is cached.
    final HealthCheckResponse healthCheckResponse = responseCache.getUnchecked(KEY);
    return Response.ok(healthCheckResponse).build();
  }

  private static LoadingCache<Boolean, HealthCheckResponse> createResponseCache(
      final HealthCheckAgent healthCheckAgent,
      final Duration cacheDuration
  ) {
    final long expiryMs = cacheDuration.toMillis();
    return CacheBuilder.newBuilder()
        .expireAfterWrite(expiryMs, TimeUnit.MILLISECONDS)
        .build(new AgentBackedLoader(healthCheckAgent));
  }

  /** Cache loader that produces a fresh response by asking the agent to check health. */
  private static final class AgentBackedLoader
      extends CacheLoader<Boolean, HealthCheckResponse> {

    private final HealthCheckAgent agent;

    AgentBackedLoader(final HealthCheckAgent agent) {
      this.agent = agent;
    }

    @Override
    public HealthCheckResponse load(@Nonnull final Boolean key) {
      if (key.equals(KEY)) {
        return agent.checkHealth();
      }
      throw new IllegalArgumentException("Unexpected response cache key: " + key);
    }
  }
}
Loading

0 comments on commit 2308686

Please sign in to comment.