Skip to content

Commit

Permalink
feat(executor): Allow SourceDescription returning executors to return…
Browse files Browse the repository at this point in the history
… cluster wide stream stats (#7252)

* feat: add random-beans libraries

These give a simple implementation of object mother pattern. Again
this should help us simplify testing a bit

* refactor(executor): turn RemoteDataAugmenter into an Executor

There is really no reason for this class to handle the transform.
Instea, let each call site ow modify the results returned from
RemoteHostExecutor in situ as they please.

It's fine to return the first KsqlEntity always because we run the
executor on singular statements, so KsqlEntityList
always has length 1

* feat(executor): add an executor for fetching remote source descriptions

* feat(executor): encapsulate cluster wide query metrics

ClusterQueryStats encapsulates the cluster wide
query stats for the entire cluster. It comes with one
constructor for assembling the stats divided by host

* fix(test): get to the bottom of the test flakes

A few problems fixed in this commit:
1. some tests were missing the correct mockito test runner - this
could cause trouble if some of the code called out to by these uses
mockito mocks
2. rules need to be public and annotated, otherwise they cause races
3. remove static declaration of side effects - also caused races

* feat(executor): collate cluster wide source stats on all ListSources

This means SourceDescription queries will be able to give you a per-node
breakdown of stats available on the new fields:
- clusterStatistics
- clusterErrorStats
  • Loading branch information
swist authored Mar 24, 2021
1 parent b6d80dd commit 0b30ed9
Show file tree
Hide file tree
Showing 21 changed files with 708 additions and 178 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,10 @@

package io.confluent.ksql.metrics;

import static com.google.common.collect.ImmutableMap.toImmutableMap;

import com.google.common.base.Functions;
import com.google.common.collect.ImmutableMap;
import io.confluent.common.utils.Time;
import io.confluent.ksql.util.AppInfo;
import io.confluent.ksql.util.KsqlConfig;
Expand All @@ -24,7 +28,6 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
Expand All @@ -43,11 +46,9 @@
*/
@SuppressWarnings("ClassDataAbstractionCoupling")
public final class MetricCollectors {
private static final String KSQL_JMX_PREFIX = "io.confluent.ksql.metrics";

public static final String RESOURCE_LABEL_PREFIX =
CommonClientConfigs.METRICS_CONTEXT_PREFIX + "resource.";
private static final String KSQL_RESOURCE_TYPE = "ksql";

public static final String RESOURCE_LABEL_TYPE =
RESOURCE_LABEL_PREFIX + "type";
public static final String RESOURCE_LABEL_VERSION =
Expand All @@ -58,16 +59,16 @@ public final class MetricCollectors {
RESOURCE_LABEL_PREFIX + "cluster.id";
public static final String RESOURCE_LABEL_KAFKA_CLUSTER_ID =
RESOURCE_LABEL_PREFIX + "kafka.cluster.id";

private static final String KSQL_JMX_PREFIX = "io.confluent.ksql.metrics";
private static final String KSQL_RESOURCE_TYPE = "ksql";
private static final Time time = new io.confluent.common.utils.SystemTime();
private static Map<String, MetricCollector> collectorMap;
private static Metrics metrics;

static {
initialize();
}

private static final Time time = new io.confluent.common.utils.SystemTime();

private MetricCollectors() {
}

Expand Down Expand Up @@ -156,7 +157,7 @@ static void remove(final String id) {
collectorMap.remove(id);
}

public static Map<String, TopicSensors.Stat> getStatsFor(
public static ImmutableMap<String, TopicSensors.Stat> getStatsFor(
final String topic, final boolean isError) {
return getAggregateMetrics(
collectorMap.values().stream()
Expand All @@ -171,18 +172,14 @@ public static String getAndFormatStatsFor(final String topic, final boolean isEr
isError ? "last-failed" : "last-message");
}

static Map<String, TopicSensors.Stat> getAggregateMetrics(
static ImmutableMap<String, TopicSensors.Stat> getAggregateMetrics(
final List<TopicSensors.Stat> allStats
) {
final Map<String, TopicSensors.Stat> results = new TreeMap<>();
allStats.forEach(stat -> {
results.computeIfAbsent(
stat.name(),
k -> new TopicSensors.Stat(stat.name(), 0, stat.getTimestamp())
);
results.get(stat.name()).aggregate(stat.getValue());
});
return results;
return allStats.stream().collect(toImmutableMap(
TopicSensors.Stat::name,
Functions.identity(),
(first, other) -> first.aggregate(other.getValue())
));
}

public static String format(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.metrics.stats.Rate;

public class TopicSensors<R> {
public final class TopicSensors<R> {

private final String topic;
private final List<SensorMetric<R>> sensors;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,10 @@

package io.confluent.ksql.util;

import com.fasterxml.jackson.annotation.JsonValue;
import com.google.errorprone.annotations.Immutable;
import java.util.Objects;
import org.apache.kafka.streams.state.HostInfo;


/**
Expand All @@ -25,15 +27,19 @@
*/
@Immutable
public class KsqlHostInfo {

private final String host;

private final int port;

public KsqlHostInfo(final String host, final int port) {
this.host = host;
this.port = port;
}

public static KsqlHostInfo fromHostInfo(final HostInfo hostInfo) {
return new KsqlHostInfo(hostInfo.host(), hostInfo.port());
}

@Override
public boolean equals(final Object o) {
if (this == o) {
Expand Down Expand Up @@ -61,6 +67,7 @@ public int port() {
}

@Override
@JsonValue
public String toString() {
return "KsqlHostInfo{host='" + this.host + '\'' + ", port=" + this.port + '}';
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
import java.util.Optional;

@Immutable
public class ShowColumns extends Statement {
public class ShowColumns extends StatementWithExtendedClause {

private final SourceName table;
private final boolean isExtended;
Expand All @@ -38,7 +38,7 @@ public ShowColumns(
final SourceName table,
final boolean isExtended
) {
super(location);
super(location, isExtended);
this.table = requireNonNull(table, "table");
this.isExtended = isExtended;
}
Expand Down
8 changes: 8 additions & 0 deletions ksqldb-rest-app/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,14 @@
<scope>test</scope>
</dependency>


<dependency>
<groupId>org.jeasy</groupId>
<artifactId>easy-random-core</artifactId>
<version>4.2.0</version>
<scope>test</scope>
</dependency>

</dependencies>

<build>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,18 @@

package io.confluent.ksql.rest.entity;

import static com.google.common.collect.ImmutableMap.toImmutableMap;

import com.google.common.collect.ImmutableMap;
import io.confluent.ksql.execution.timestamp.TimestampColumn;
import io.confluent.ksql.metastore.model.DataSource;
import io.confluent.ksql.metrics.MetricCollectors;
import io.confluent.ksql.metrics.TopicSensors;
import io.confluent.ksql.rest.util.EntityUtil;
import io.confluent.ksql.schema.utils.FormatOptions;
import io.confluent.ksql.util.KsqlHostInfo;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import org.apache.kafka.clients.admin.TopicDescription;

Expand All @@ -36,8 +41,32 @@ public static SourceDescription create(
final List<RunningQuery> readQueries,
final List<RunningQuery> writeQueries,
final Optional<TopicDescription> topicDescription,
final List<QueryOffsetSummary> offsetSummaries,
final List<QueryOffsetSummary> queryOffsetSummaries,
final List<String> sourceConstraints
) {
return create(
dataSource,
extended,
readQueries,
writeQueries,
topicDescription,
queryOffsetSummaries,
sourceConstraints,
ImmutableMap.of(),
ImmutableMap.of()
);
}

public static SourceDescription create(
final DataSource dataSource,
final boolean extended,
final List<RunningQuery> readQueries,
final List<RunningQuery> writeQueries,
final Optional<TopicDescription> topicDescription,
final List<QueryOffsetSummary> queryOffsetSummaries,
final List<String> sourceConstraints,
final ImmutableMap<KsqlHostInfo, ImmutableMap<String, TopicSensors.Stat>> stats,
final ImmutableMap<KsqlHostInfo, ImmutableMap<String, TopicSensors.Stat>> errorStats
) {
return new SourceDescription(
dataSource.getName().toString(FormatOptions.noEscape()),
Expand Down Expand Up @@ -66,7 +95,19 @@ public static SourceDescription create(
topicDescription.map(td -> td.partitions().size()).orElse(0),
topicDescription.map(td -> td.partitions().get(0).replicas().size()).orElse(0),
dataSource.getSqlExpression(),
offsetSummaries,
sourceConstraints);
queryOffsetSummaries,
sourceConstraints,
stats.entrySet()
.stream()
.collect(toImmutableMap(
(e) -> new KsqlHostInfoEntity(e.getKey()),
Map.Entry::getValue
)),
errorStats.entrySet()
.stream()
.collect(toImmutableMap(
(e) -> new KsqlHostInfoEntity(e.getKey()),
Map.Entry::getValue
)));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
/*
* Copyright 2021 Confluent Inc.
*
* Licensed under the Confluent Community License (the "License"; you may not use
* this file except in compliance with the License. You may obtain a copy of the
* License at
*
* http://www.confluent.io/confluent-community-license
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/

package io.confluent.ksql.rest.server.execution;

import static com.google.common.collect.ImmutableMap.toImmutableMap;

import com.google.common.collect.ImmutableMap;
import io.confluent.ksql.metrics.TopicSensors;
import io.confluent.ksql.rest.entity.SourceDescription;
import io.confluent.ksql.util.KsqlHostInfo;
import java.util.Collection;
import java.util.Map;

public final class ClusterQueryStats {
private final ImmutableMap<KsqlHostInfo, ImmutableMap<String, TopicSensors.Stat>> stats;
private final ImmutableMap<KsqlHostInfo, ImmutableMap<String, TopicSensors.Stat>> errors;
private final String sourceName;

private ClusterQueryStats(
final ImmutableMap<KsqlHostInfo, ImmutableMap<String, TopicSensors.Stat>> stats,
final ImmutableMap<KsqlHostInfo, ImmutableMap<String, TopicSensors.Stat>> errors,
final String sourceName
) {
this.stats = stats;
this.errors = errors;
this.sourceName = sourceName;
}

public static ClusterQueryStats create(
final KsqlHostInfo localHostInfo,
final SourceDescription localSourceDescription,
final Collection<RemoteSourceDescription> remoteSourceDescriptions
) {
final Map<KsqlHostInfo, SourceDescription> rds = remoteSourceDescriptions.stream()
.collect(toImmutableMap(
RemoteSourceDescription::getKsqlHostInfo,
RemoteSourceDescription::getSourceDescription
));


final ImmutableMap<KsqlHostInfo, ImmutableMap<String, TopicSensors.Stat>> remoteStats = rds
.entrySet()
.stream()
.collect(
toImmutableMap(
Map.Entry::getKey,
e -> ImmutableMap.copyOf(e.getValue().getStatisticsMap())
)
);
final ImmutableMap<KsqlHostInfo, ImmutableMap<String, TopicSensors.Stat>> remoteErrors = rds
.entrySet()
.stream()
.collect(
toImmutableMap(
Map.Entry::getKey,
e -> ImmutableMap.copyOf(e.getValue().getErrorStatsMap())
)
);


final ImmutableMap<KsqlHostInfo, ImmutableMap<String, TopicSensors.Stat>> stats = ImmutableMap
.<KsqlHostInfo, ImmutableMap<String, TopicSensors.Stat>>builder()
.put(localHostInfo, localSourceDescription.getStatisticsMap())
.putAll(remoteStats)
.build();

final ImmutableMap<KsqlHostInfo, ImmutableMap<String, TopicSensors.Stat>> errors = ImmutableMap
.<KsqlHostInfo, ImmutableMap<String, TopicSensors.Stat>>builder()
.put(localHostInfo, localSourceDescription.getErrorStatsMap())
.putAll(remoteErrors)
.build();

return new ClusterQueryStats(stats, errors, localSourceDescription.getName());
}

public ImmutableMap<KsqlHostInfo, ImmutableMap<String, TopicSensors.Stat>> getStats() {
return stats;
}

public ImmutableMap<KsqlHostInfo, ImmutableMap<String, TopicSensors.Stat>> getErrors() {
return errors;
}

public String getSourceName() {
return sourceName;
}
}
Loading

0 comments on commit 0b30ed9

Please sign in to comment.