Skip to content

Commit

Permalink
fix: Make pull query metrics apply only to pull and not also push (#6944
Browse files Browse the repository at this point in the history
)

* fix: Make pull query metrics apply only to pull and not also push
  • Loading branch information
AlanConfluent authored Mar 11, 2021
1 parent 42496c1 commit 1db18b3
Show file tree
Hide file tree
Showing 15 changed files with 214 additions and 77 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

import com.google.common.collect.ImmutableMap;
import com.google.common.util.concurrent.RateLimiter;
import io.confluent.ksql.api.server.MetricsCallbackHolder;
import io.confluent.ksql.api.server.QueryHandle;
import io.confluent.ksql.api.spi.QueryPublisher;
import io.confluent.ksql.config.SessionConfig;
Expand Down Expand Up @@ -58,7 +59,6 @@
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import org.apache.kafka.common.utils.Time;

public class QueryEndpoint {

Expand Down Expand Up @@ -99,17 +99,17 @@ public QueryPublisher createQueryPublisher(
final JsonObject properties,
final Context context,
final WorkerExecutor workerExecutor,
final ServiceContext serviceContext) {
final long startTimeNanos = Time.SYSTEM.nanoseconds();
final ServiceContext serviceContext,
final MetricsCallbackHolder metricsCallbackHolder) {
// Must be run on worker as all this stuff is slow
VertxUtils.checkIsWorker();

final ConfiguredStatement<Query> statement = createStatement(sql, properties.getMap());

if (statement.getStatement().isPullQuery()) {
return createPullQueryPublisher(
context, serviceContext, statement, pullQueryMetrics,
startTimeNanos, workerExecutor);
context, serviceContext, statement, pullQueryMetrics, workerExecutor,
metricsCallbackHolder);
} else {
return createPushQueryPublisher(context, serviceContext, statement, workerExecutor);
}
Expand Down Expand Up @@ -146,9 +146,17 @@ private QueryPublisher createPullQueryPublisher(
final ServiceContext serviceContext,
final ConfiguredStatement<Query> statement,
final Optional<PullQueryExecutorMetrics> pullQueryMetrics,
final long startTimeNanos,
final WorkerExecutor workerExecutor
final WorkerExecutor workerExecutor,
final MetricsCallbackHolder metricsCallbackHolder
) {
// First thing, set the metrics callback so that it gets called, even if we hit an error
metricsCallbackHolder.setCallback((requestBytes, responseBytes, startTimeNanos) -> {
pullQueryMetrics.ifPresent(metrics -> {
metrics.recordRequestSize(requestBytes);
metrics.recordResponseSize(responseBytes);
metrics.recordLatency(startTimeNanos);
});
});

final RoutingOptions routingOptions = new PullQueryConfigRoutingOptions(
ksqlConfig,
Expand Down Expand Up @@ -177,9 +185,6 @@ private QueryPublisher createPullQueryPublisher(

result.onCompletionOrException((v, throwable) -> {
decrementer.decrementAtMostOnce();
if (throwable == null) {
pullQueryMetrics.ifPresent(p -> p.recordLatency(startTimeNanos));
}
});

final BlockingQueryPublisher publisher = new BlockingQueryPublisher(context, workerExecutor);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
/*
* Copyright 2021 Confluent Inc.
*
* Licensed under the Confluent Community License (the "License"); you may not use
* this file except in compliance with the License. You may obtain a copy of the
* License at
*
* http://www.confluent.io/confluent-community-license
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/

package io.confluent.ksql.api.server;

/**
 * Interface for reporting metrics to a resource. A resource may choose to break things down
 * arbitrarily, e.g. /query is used for both push and pull queries so we let the resource
 * determine how to report the metrics.
 */
@FunctionalInterface
public interface MetricsCallback {

  /**
   * Called to report metrics when the request is complete, whether it ended in error or success.
   *
   * @param requestBytes The number of bytes read from the request
   * @param responseBytes The number of bytes written to the response
   * @param startTimeNanos The start time of the request in nanoseconds
   */
  void reportMetricsOnCompletion(long requestBytes, long responseBytes, long startTimeNanos);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
/*
* Copyright 2021 Confluent Inc.
*
* Licensed under the Confluent Community License (the "License"); you may not use
* this file except in compliance with the License. You may obtain a copy of the
* License at
*
* http://www.confluent.io/confluent-community-license
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/

package io.confluent.ksql.api.server;

import java.util.concurrent.atomic.AtomicReference;

/**
 * This class gives a resource the opportunity to register a set of particular callbacks based upon
 * arbitrary criteria. Once the response is complete, the callback is invoked.
 *
 * <p>Thread-safe: the callback is held in an {@link AtomicReference} because it may be set on a
 * worker thread and invoked from the server's response-completion handler.
 */
public class MetricsCallbackHolder {

  // final: the reference cell itself is never replaced, only its contents.
  private final AtomicReference<MetricsCallback> callbackRef = new AtomicReference<>(null);

  /**
   * Registers the callback to invoke once the response completes. A later call replaces any
   * previously registered callback.
   *
   * @param callback the callback to invoke on completion
   */
  public void setCallback(final MetricsCallback callback) {
    this.callbackRef.set(callback);
  }

  /**
   * Invokes the registered callback, if any, with the request/response sizes and start time.
   * A no-op when no callback was registered.
   *
   * @param requestBytes the number of bytes read from the request
   * @param responseBytes the number of bytes written to the response
   * @param startTimeNanos the start time of the request in nanoseconds
   */
  void reportMetrics(final long requestBytes, final long responseBytes, final long startTimeNanos) {
    final MetricsCallback callback = callbackRef.get();
    if (callback != null) {
      callback.reportMetricsOnCompletion(requestBytes, responseBytes, startTimeNanos);
    }
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@

import io.confluent.ksql.api.auth.ApiSecurityContext;
import io.confluent.ksql.api.auth.DefaultApiSecurityContext;
import io.confluent.ksql.internal.PullQueryExecutorMetrics;
import io.confluent.ksql.rest.EndpointResponse;
import io.confluent.ksql.rest.Errors;
import io.confluent.ksql.rest.entity.KsqlErrorMessage;
Expand Down Expand Up @@ -55,7 +54,7 @@ static <T> void handleOldApiRequest(
final Server server,
final RoutingContext routingContext,
final Class<T> requestClass,
final Optional<PullQueryExecutorMetrics> pullQueryMetrics,
final Optional<MetricsCallbackHolder> metricsCallbackHolder,
final BiFunction<T, ApiSecurityContext, CompletableFuture<EndpointResponse>> requestor) {
final long startTimeNanos = Time.SYSTEM.nanoseconds();
final T requestObject;
Expand All @@ -69,28 +68,25 @@ static <T> void handleOldApiRequest(
} else {
requestObject = null;
}
pullQueryMetrics
.ifPresent(pullQueryExecutorMetrics -> pullQueryExecutorMetrics.recordRequestSize(
routingContext.request().bytesRead()));
final CompletableFuture<EndpointResponse> completableFuture = requestor
.apply(requestObject, DefaultApiSecurityContext.create(routingContext));
completableFuture.thenAccept(endpointResponse -> {
handleOldApiResponse(
server, routingContext, endpointResponse, pullQueryMetrics, startTimeNanos);
server, routingContext, endpointResponse, metricsCallbackHolder, startTimeNanos);
}).exceptionally(t -> {
if (t instanceof CompletionException) {
t = t.getCause();
}
handleOldApiResponse(
server, routingContext, mapException(t), pullQueryMetrics, startTimeNanos);
server, routingContext, mapException(t), metricsCallbackHolder, startTimeNanos);
return null;
});
}

static void handleOldApiResponse(
final Server server, final RoutingContext routingContext,
final EndpointResponse endpointResponse,
final Optional<PullQueryExecutorMetrics> pullQueryMetrics,
final Optional<MetricsCallbackHolder> metricsCallbackHolder,
final long startTimeNanos
) {
final HttpServerResponse response = routingContext.response();
Expand All @@ -111,7 +107,8 @@ static void handleOldApiResponse(
return;
}
response.putHeader(TRANSFER_ENCODING, CHUNKED_ENCODING);
streamEndpointResponse(server, routingContext, streamingOutput);
streamEndpointResponse(server, routingContext, streamingOutput, metricsCallbackHolder,
startTimeNanos);
} else {
if (endpointResponse.getEntity() == null) {
response.end();
Expand All @@ -124,18 +121,15 @@ static void handleOldApiResponse(
}
response.end(responseBody);
}
reportMetrics(routingContext, metricsCallbackHolder, startTimeNanos);
}
pullQueryMetrics
.ifPresent(pullQueryExecutorMetrics -> pullQueryExecutorMetrics.recordResponseSize(
routingContext.response().bytesWritten()));
pullQueryMetrics.ifPresent(pullQueryExecutorMetrics -> pullQueryExecutorMetrics
.recordLatency(startTimeNanos));

}

private static void streamEndpointResponse(final Server server,
final RoutingContext routingContext,
final StreamingOutput streamingOutput) {
final StreamingOutput streamingOutput,
final Optional<MetricsCallbackHolder> metricsCallbackHolder,
final long startTimeNanos) {
final WorkerExecutor workerExecutor = server.getWorkerExecutor();
final VertxCompletableFuture<Void> vcf = new VertxCompletableFuture<>();
workerExecutor.executeBlocking(promise -> {
Expand All @@ -162,6 +156,21 @@ private static void streamEndpointResponse(final Server server,
}
}
}, vcf);
vcf.handle((v, throwable) -> {
reportMetrics(routingContext, metricsCallbackHolder, startTimeNanos);
return null;
});
}

/**
 * Forwards the request/response byte counts and the request start time to the registered
 * metrics callback holder. A no-op when no holder is present for this request.
 */
private static void reportMetrics(
    final RoutingContext routingContext,
    final Optional<MetricsCallbackHolder> metricsCallbackHolder,
    final long startTimeNanos
) {
  if (!metricsCallbackHolder.isPresent()) {
    return;
  }
  final long bytesRead = routingContext.request().bytesRead();
  final long bytesWritten = routingContext.response().bytesWritten();
  metricsCallbackHolder.get().reportMetrics(bytesRead, bytesWritten, startTimeNanos);
}

public static EndpointResponse mapException(final Throwable exception) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import io.vertx.ext.web.RoutingContext;
import java.util.Objects;
import java.util.Optional;
import org.apache.kafka.common.utils.Time;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -76,8 +77,11 @@ public void handle(final RoutingContext routingContext) {
return;
}

final MetricsCallbackHolder metricsCallbackHolder = new MetricsCallbackHolder();
final long startTimeNanos = Time.SYSTEM.nanoseconds();
endpoints.createQueryPublisher(queryStreamArgs.get().sql, queryStreamArgs.get().properties,
context, server.getWorkerExecutor(), DefaultApiSecurityContext.create(routingContext))
context, server.getWorkerExecutor(), DefaultApiSecurityContext.create(routingContext),
metricsCallbackHolder)
.thenAccept(queryPublisher -> {

final QueryResponseMetadata metadata;
Expand All @@ -88,7 +92,13 @@ public void handle(final RoutingContext routingContext) {
queryPublisher.getColumnTypes());

// When response is complete, publisher should be closed
routingContext.response().endHandler(v -> queryPublisher.close());
routingContext.response().endHandler(v -> {
queryPublisher.close();
metricsCallbackHolder.reportMetrics(
routingContext.request().bytesRead(),
routingContext.response().bytesWritten(),
startTimeNanos);
});
} else {
final PushQueryHolder query = connectionQueryManager
.createApiQuery(queryPublisher, routingContext.request());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -237,14 +237,17 @@ private void handleQueryRequest(final RoutingContext routingContext) {

final CompletableFuture<Void> connectionClosedFuture = new CompletableFuture<>();
routingContext.request().connection().closeHandler(v -> connectionClosedFuture.complete(null));
handleOldApiRequest(server, routingContext, KsqlRequest.class, pullQueryMetrics,
final MetricsCallbackHolder metricsCallbackHolder = new MetricsCallbackHolder();
handleOldApiRequest(server, routingContext, KsqlRequest.class,
Optional.of(metricsCallbackHolder),
(request, apiSecurityContext) ->
endpoints
.executeQueryRequest(
request, server.getWorkerExecutor(), connectionClosedFuture,
DefaultApiSecurityContext.create(routingContext),
isInternalRequest(routingContext),
getContentType(routingContext)
getContentType(routingContext),
metricsCallbackHolder
)

);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import io.confluent.ksql.api.auth.ApiSecurityContext;
import io.confluent.ksql.api.server.InsertResult;
import io.confluent.ksql.api.server.InsertsStreamSubscriber;
import io.confluent.ksql.api.server.MetricsCallbackHolder;
import io.confluent.ksql.rest.EndpointResponse;
import io.confluent.ksql.rest.entity.ClusterTerminateRequest;
import io.confluent.ksql.rest.entity.HeartbeatMessage;
Expand Down Expand Up @@ -51,7 +52,8 @@ public interface Endpoints {
* @return A CompletableFuture representing the future result of the operation
*/
CompletableFuture<QueryPublisher> createQueryPublisher(String sql, JsonObject properties,
Context context, WorkerExecutor workerExecutor, ApiSecurityContext apiSecurityContext);
Context context, WorkerExecutor workerExecutor, ApiSecurityContext apiSecurityContext,
MetricsCallbackHolder metricsCallbackHolder);

/**
* Create a subscriber which will receive a stream of inserts from the API server and process
Expand Down Expand Up @@ -83,7 +85,8 @@ CompletableFuture<EndpointResponse> executeQueryRequest(
KsqlRequest request, WorkerExecutor workerExecutor,
CompletableFuture<Void> connectionClosedFuture, ApiSecurityContext apiSecurityContext,
Optional<Boolean> isInternalRequest,
KsqlMediaType mediaType);
KsqlMediaType mediaType,
MetricsCallbackHolder metricsCallbackHolder);

CompletableFuture<EndpointResponse> executeInfo(ApiSecurityContext apiSecurityContext);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import io.confluent.ksql.api.impl.QueryEndpoint;
import io.confluent.ksql.api.server.InsertResult;
import io.confluent.ksql.api.server.InsertsStreamSubscriber;
import io.confluent.ksql.api.server.MetricsCallbackHolder;
import io.confluent.ksql.api.spi.Endpoints;
import io.confluent.ksql.api.spi.QueryPublisher;
import io.confluent.ksql.engine.KsqlEngine;
Expand Down Expand Up @@ -142,7 +143,8 @@ public CompletableFuture<QueryPublisher> createQueryPublisher(final String sql,
final JsonObject properties,
final Context context,
final WorkerExecutor workerExecutor,
final ApiSecurityContext apiSecurityContext) {
final ApiSecurityContext apiSecurityContext,
final MetricsCallbackHolder metricsCallbackHolder) {
final KsqlSecurityContext ksqlSecurityContext = ksqlSecurityContextProvider
.provide(apiSecurityContext);
return executeOnWorker(() -> {
Expand All @@ -155,7 +157,8 @@ public CompletableFuture<QueryPublisher> createQueryPublisher(final String sql,
properties,
context,
workerExecutor,
ksqlSecurityContext.getServiceContext());
ksqlSecurityContext.getServiceContext(),
metricsCallbackHolder);
} finally {
ksqlSecurityContext.getServiceContext().close();
}
Expand Down Expand Up @@ -194,15 +197,17 @@ public CompletableFuture<EndpointResponse> executeQueryRequest(
final CompletableFuture<Void> connectionClosedFuture,
final ApiSecurityContext apiSecurityContext,
final Optional<Boolean> isInternalRequest,
final KsqlMediaType mediaType
final KsqlMediaType mediaType,
final MetricsCallbackHolder metricsCallbackHolder
) {
return executeOldApiEndpointOnWorker(apiSecurityContext,
ksqlSecurityContext -> streamedQueryResource.streamQuery(
ksqlSecurityContext,
request,
connectionClosedFuture,
isInternalRequest,
mediaType
mediaType,
metricsCallbackHolder
), workerExecutor);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -108,15 +108,9 @@ public synchronized void subscribe(final Subscriber<Collection<StreamedRow>> sub
true
);

result.onCompletionOrException((v, t) -> decrementer.decrementAtMostOnce());
result.onCompletion(v -> {
pullQueryMetrics.ifPresent(p -> p.recordLatency(startTimeNanos));
});
result.onCompletionOrException((v, throwable) -> {
decrementer.decrementAtMostOnce();
if (throwable == null) {
pullQueryMetrics.ifPresent(p -> p.recordLatency(startTimeNanos));
}
pullQueryMetrics.ifPresent(p -> p.recordLatency(startTimeNanos));
});

final PullQuerySubscription subscription = new PullQuerySubscription(
Expand Down
Loading

0 comments on commit 1db18b3

Please sign in to comment.