Skip to content

Commit

Permalink
feat: Expose Vert.x metrics (#5340)
Browse files Browse the repository at this point in the history
  • Loading branch information
purplefox authored May 21, 2020
1 parent 1bab911 commit e82f762
Show file tree
Hide file tree
Showing 3 changed files with 68 additions and 2 deletions.
6 changes: 6 additions & 0 deletions ksqldb-rest-app/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,12 @@
<version>${vertx.version}</version>
</dependency>

<dependency>
<groupId>io.vertx</groupId>
<artifactId>vertx-dropwizard-metrics</artifactId>
<version>${vertx.version}</version>
</dependency>

<dependency>
<groupId>io.vertx</groupId>
<artifactId>vertx-web-client</artifactId>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
/*
* Copyright 2020 Confluent Inc.
*
* Licensed under the Confluent Community License (the "License"); you may not use
* this file except in compliance with the License. You may obtain a copy of the
* License at
*
* http://www.confluent.io/confluent-community-license
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/

package io.confluent.ksql.api.impl;

import io.vertx.ext.dropwizard.Match;
import io.vertx.ext.dropwizard.MatchType;
import java.util.ArrayList;
import java.util.List;

public final class MonitoredEndpoints {

private MonitoredEndpoints() {
}

/**
* @return List of endpoint matches that we're going to provide metrics for
*/
public static List<Match> getMonitoredEndpoints() {
final List<Match> matches = new ArrayList<>();
matches.add(new Match().setValue("/ksql"));
matches.add(new Match().setValue("/ksql/terminate"));
matches.add(new Match().setValue("/query"));
matches.add(new Match().setValue("/query-stream"));
matches.add(new Match().setValue("/inserts-stream"));
matches.add(new Match().setValue("/close-query"));
matches.add(new Match().setValue("/ws/query.*").setType(MatchType.REGEX).setAlias("ws-query"));
return matches;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import io.confluent.ksql.api.auth.KsqlAuthorizationProviderHandler;
import io.confluent.ksql.api.impl.DefaultKsqlSecurityContextProvider;
import io.confluent.ksql.api.impl.KsqlSecurityContextProvider;
import io.confluent.ksql.api.impl.MonitoredEndpoints;
import io.confluent.ksql.api.server.Server;
import io.confluent.ksql.api.spi.Endpoints;
import io.confluent.ksql.engine.KsqlEngine;
Expand Down Expand Up @@ -100,6 +101,8 @@
import io.confluent.ksql.version.metrics.collector.KsqlModuleType;
import io.vertx.core.Vertx;
import io.vertx.core.VertxOptions;
import io.vertx.ext.dropwizard.DropwizardMetricsOptions;
import io.vertx.ext.dropwizard.Match;
import java.io.Console;
import java.io.OutputStreamWriter;
import java.io.PrintWriter;
Expand Down Expand Up @@ -234,8 +237,10 @@ public static SourceName getCommandsStreamName() {
this.heartbeatAgent = requireNonNull(heartbeatAgent, "heartbeatAgent");
this.lagReportingAgent = requireNonNull(lagReportingAgent, "lagReportingAgent");
this.vertx = Vertx.vertx(
new VertxOptions().setMaxWorkerExecuteTimeUnit(TimeUnit.MILLISECONDS)
.setMaxWorkerExecuteTime(Long.MAX_VALUE));
new VertxOptions()
.setMaxWorkerExecuteTimeUnit(TimeUnit.MILLISECONDS)
.setMaxWorkerExecuteTime(Long.MAX_VALUE)
.setMetricsOptions(setUpHttpMetrics(ksqlConfig)));
this.vertx.exceptionHandler(t -> log.error("Unhandled exception in Vert.x", t));

this.serverInfoResource = new ServerInfoResource(serviceContext, ksqlConfigNoPort);
Expand Down Expand Up @@ -845,6 +850,7 @@ private static Optional<AuthenticationPlugin> loadAuthenticationPlugin(
}

private void displayWelcomeMessage() {

final Console console = System.console();
if (console == null) {
return;
Expand Down Expand Up @@ -960,6 +966,18 @@ private int resolvePort(final URL listener) {
new IllegalStateException("Failed resolve port for listener: " + listener));
}

private static DropwizardMetricsOptions setUpHttpMetrics(final KsqlConfig ksqlConfig) {
final String serviceId = ksqlConfig.getString(KsqlConfig.KSQL_SERVICE_ID_CONFIG);
final DropwizardMetricsOptions metricsOptions = new DropwizardMetricsOptions()
.setJmxEnabled(true).setBaseName("_confluent-ksql-" + serviceId)
.setJmxDomain("io.confluent.ksql.metrics");
final List<Match> matches = MonitoredEndpoints.getMonitoredEndpoints();
for (Match match : matches) {
metricsOptions.addMonitoredHttpServerUri(match);
}
return metricsOptions;
}

private static KsqlRestConfig injectPathsWithoutAuthentication(final KsqlRestConfig restConfig) {
final Set<String> authenticationSkipPaths = new HashSet<>(
restConfig.getList(KsqlRestConfig.AUTHENTICATION_SKIP_PATHS_CONFIG)
Expand Down

0 comments on commit e82f762

Please sign in to comment.