Skip to content

Commit

Permalink
separate tests since metrics get mixed
Browse files Browse the repository at this point in the history
  • Loading branch information
vpapavas committed Sep 27, 2021
1 parent 8ea8a6f commit 6be273c
Show file tree
Hide file tree
Showing 3 changed files with 545 additions and 154 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,17 +15,13 @@

package io.confluent.ksql.rest.integration;

import static io.confluent.ksql.test.util.EmbeddedSingleNodeKafkaCluster.VALID_USER1;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.is;

import com.google.common.collect.ImmutableMap;
import io.confluent.ksql.api.utils.QueryResponse;
import io.confluent.ksql.engine.KsqlEngine;
import io.confluent.ksql.integration.IntegrationTestHarness;
import io.confluent.ksql.metrics.MetricCollectors;
import io.confluent.ksql.name.ColumnName;
import io.confluent.ksql.rest.entity.StreamedRow;
import io.confluent.ksql.rest.server.TestKsqlRestApp;
Expand All @@ -35,36 +31,21 @@
import io.confluent.ksql.serde.Format;
import io.confluent.ksql.serde.FormatFactory;
import io.confluent.ksql.serde.SerdeFeatures;
import io.confluent.ksql.test.util.secure.Credentials;
import io.confluent.ksql.util.KsqlConfig;
import io.confluent.ksql.util.PageViewDataProvider;
import io.confluent.ksql.util.VertxCompletableFuture;
import io.vertx.core.Vertx;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.http.HttpVersion;
import io.vertx.core.json.JsonObject;
import io.vertx.ext.web.client.HttpResponse;
import io.vertx.ext.web.client.WebClient;
import io.vertx.ext.web.client.WebClientOptions;
import java.util.List;
import java.util.Optional;
import javax.ws.rs.HEAD;
import javax.ws.rs.core.MediaType;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.metrics.KafkaMetric;
import org.apache.kafka.common.metrics.Metrics;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Ignore;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.RuleChain;
import org.junit.rules.Timeout;

@Ignore
public class PullQueryMetricsFunctionalTest {

private static final PageViewDataProvider PAGE_VIEWS_PROVIDER = new PageViewDataProvider();
Expand All @@ -75,7 +56,6 @@ public class PullQueryMetricsFunctionalTest {
private static final String AGG_TABLE = "AGG_TABLE";
private static final String AN_AGG_KEY = "USER_1";
private static final String A_STREAM_KEY = "PAGE_1";
private static final Credentials SUPER_USER = VALID_USER1;

private static final PhysicalSchema AGGREGATE_SCHEMA = PhysicalSchema.from(
LogicalSchema.builder()
Expand Down Expand Up @@ -184,8 +164,6 @@ public class PullQueryMetricsFunctionalTest {
@Rule
public final Timeout timeout = Timeout.seconds(60);

private Vertx vertx;
private WebClient client;
private Metrics metrics;

@BeforeClass
Expand Down Expand Up @@ -220,22 +198,6 @@ public static void setUpClass() {
@Before
public void setUp() {
  // Fresh per-test transport: one event loop plus an HTTP/2 client bound to it.
  this.vertx = Vertx.vertx();
  this.client = createClient();
  // Handle to the engine's metrics registry; the tests assert against it directly.
  this.metrics = ((KsqlEngine) REST_APP.getEngine()).getEngineMetrics().getMetrics();
}

@After
public void tearDown() {
  // Release per-test resources in reverse order of creation: client first, then
  // the Vert.x instance that owns its event loop. Null checks guard against a
  // setUp() that failed part-way through.
  if (this.client != null) {
    this.client.close();
  }
  if (this.vertx != null) {
    this.vertx.close();
  }
}

@AfterClass
public static void classTearDown() {
  // Intentionally empty: class-level fixtures visible here (REST_APP and the
  // Kafka cluster rules) manage their own lifecycle, so there is nothing to
  // tear down manually. Kept as an explicit hook for symmetry with setUpClass.
}

@Test
Expand Down Expand Up @@ -282,120 +244,4 @@ public void shouldVerifyMetrics() {
assertThat(totalRequestsStreamMetric.metricValue(), is(1.0));
assertThat((Double)requestDistributionStreamMetric.metricValue(), greaterThan(1.0));
}

@Test
public void shouldVerifyMetricsHttp2() {
  // Given: capture the pull-query metric handles for both the table and the stream.
  final KafkaMetric tableRecordsReturned = metrics.metric(recordsReturnedTable);
  final KafkaMetric tableLatency = metrics.metric(latencyTable);
  final KafkaMetric tableResponseSize = metrics.metric(responseSizeTable);
  final KafkaMetric tableTotalRequests = metrics.metric(totalRequestsTable);
  final KafkaMetric tableRequestDistribution = metrics.metric(requestDistributionTable);

  final KafkaMetric streamRecordsReturned = metrics.metric(recordsReturnedStream);
  final KafkaMetric streamLatency = metrics.metric(latencyStream);
  final KafkaMetric streamResponseSize = metrics.metric(responseSizeStream);
  final KafkaMetric streamTotalRequests = metrics.metric(totalRequestsStream);
  final KafkaMetric streamRequestDistribution = metrics.metric(requestDistributionStream);

  // When: issue one pull query against the table and one against the stream over HTTP/2.
  final String sqlTable = "SELECT COUNT, USERID from " + AGG_TABLE + " WHERE USERID='" + AN_AGG_KEY + "';";
  final QueryResponse tableResponse = executeQuery(sqlTable);
  assertThat(tableResponse.rows, hasSize(1));

  final String sqlStream = "SELECT * from " + PAGE_VIEW_STREAM + " WHERE PAGEID='" + A_STREAM_KEY + "';";
  final QueryResponse streamResponse = executeQuery(sqlStream);
  assertThat(streamResponse.rows, hasSize(1));

  // Then: the table-side counters reflect the expected totals...
  assertThat(tableRecordsReturned.metricValue(), is(2.0));
  assertThat((Double) tableLatency.metricValue(), greaterThan(1.0));
  assertThat((Double) tableResponseSize.metricValue(), greaterThan(1.0));
  assertThat(tableTotalRequests.metricValue(), is(2.0));
  assertThat((Double) tableRequestDistribution.metricValue(), greaterThan(1.0));

  // ...and so do the stream-side counters.
  assertThat(streamRecordsReturned.metricValue(), is(2.0));
  assertThat((Double) streamLatency.metricValue(), greaterThan(1.0));
  assertThat((Double) streamResponseSize.metricValue(), greaterThan(1.0));
  assertThat(streamTotalRequests.metricValue(), is(2.0));
  assertThat((Double) streamRequestDistribution.metricValue(), greaterThan(1.0));
}

@Test
public void shouldVerifyMetricsWS() {
  // Given: metric handles for the table- and stream-side pull-query sensors.
  final KafkaMetric tableRecordsReturned = metrics.metric(recordsReturnedTable);
  final KafkaMetric tableLatency = metrics.metric(latencyTable);
  final KafkaMetric tableTotalRequests = metrics.metric(totalRequestsTable);
  final KafkaMetric tableRequestDistribution = metrics.metric(requestDistributionTable);

  final KafkaMetric streamRecordsReturned = metrics.metric(recordsReturnedStream);
  final KafkaMetric streamLatency = metrics.metric(latencyStream);
  final KafkaMetric streamTotalRequests = metrics.metric(totalRequestsStream);
  final KafkaMetric streamRequestDistribution = metrics.metric(requestDistributionStream);

  // When: run one table pull query and one stream pull query over the WebSocket endpoint.
  RestIntegrationTestUtil.makeWsRequest(
      REST_APP.getWsListener(),
      "SELECT COUNT, USERID from " + AGG_TABLE + " WHERE USERID='" + AN_AGG_KEY + "';",
      Optional.of(MediaType.APPLICATION_JSON),
      Optional.of(MediaType.APPLICATION_JSON),
      Optional.of(SUPER_USER)
  );

  RestIntegrationTestUtil.makeWsRequest(
      REST_APP.getWsListener(),
      "SELECT * from " + PAGE_VIEW_STREAM + " WHERE PAGEID='" + A_STREAM_KEY + "';",
      Optional.of(MediaType.APPLICATION_JSON),
      Optional.of(MediaType.APPLICATION_JSON),
      Optional.of(SUPER_USER)
  );

  // Then: each side recorded exactly one request returning one record.
  assertThat(tableRecordsReturned.metricValue(), is(1.0));
  assertThat((Double) tableLatency.metricValue(), greaterThan(1.0));
  assertThat(tableTotalRequests.metricValue(), is(1.0));
  assertThat((Double) tableRequestDistribution.metricValue(), greaterThan(1.0));

  assertThat(streamRecordsReturned.metricValue(), is(1.0));
  assertThat((Double) streamLatency.metricValue(), greaterThan(1.0));
  assertThat(streamTotalRequests.metricValue(), is(1.0));
  assertThat((Double) streamRequestDistribution.metricValue(), greaterThan(1.0));
}

// Runs the given pull query over HTTP/2 with no session variables; see
// executeQueryWithVariables for the full request shape.
private QueryResponse executeQuery(final String sql) {
return executeQueryWithVariables(sql, new JsonObject());
}

// Posts the statement to /query-stream and parses the streamed response body.
private QueryResponse executeQueryWithVariables(final String sql, final JsonObject variables) {
  // Request payload: the statement, an (empty) properties map, and session variables.
  final JsonObject requestBody = new JsonObject();
  requestBody.put("sql", sql);
  requestBody.put("properties", new JsonObject());
  requestBody.put("sessionVariables", variables);
  final HttpResponse<Buffer> response = sendRequest("/query-stream", requestBody.toBuffer());
  return new QueryResponse(response.bodyAsString());
}

// Builds an HTTP/2 client (prior knowledge, no h2c upgrade) targeting the
// app's first listener on localhost.
private WebClient createClient() {
  final int port = REST_APP.getListeners().get(0).getPort();
  final WebClientOptions options = new WebClientOptions()
      .setProtocolVersion(HttpVersion.HTTP_2)
      .setHttp2ClearTextUpgrade(false)
      .setDefaultHost("localhost")
      .setDefaultPort(port);
  return WebClient.create(vertx, options);
}

// Convenience overload: sends the request on the per-test client created in setUp().
private HttpResponse<Buffer> sendRequest(final String uri, final Buffer requestBody) {
return sendRequest(client, uri, requestBody);
}

/**
 * POSTs {@code requestBody} to {@code uri} on the given client and blocks until the
 * response arrives.
 *
 * <p>Failures (including interruption) are rethrown as {@link RuntimeException} with
 * the original exception preserved as the cause.
 */
private HttpResponse<Buffer> sendRequest(final WebClient client, final String uri,
    final Buffer requestBody) {
  final VertxCompletableFuture<HttpResponse<Buffer>> requestFuture = new VertxCompletableFuture<>();
  client
      .post(uri)
      .sendBuffer(requestBody, requestFuture);
  try {
    return requestFuture.get();
  } catch (final InterruptedException e) {
    // Restore the interrupt flag so callers up the stack can observe the interruption.
    Thread.currentThread().interrupt();
    throw new RuntimeException(e);
  } catch (final Exception e) {
    throw new RuntimeException(e);
  }
}
}
Loading

0 comments on commit 6be273c

Please sign in to comment.