Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adding metric-collection for DESCRIBE EXTEND #475

Merged
merged 15 commits into from
Dec 1, 2017
14 changes: 6 additions & 8 deletions ksql-cli/src/main/java/io/confluent/ksql/cli/Cli.java
Original file line number Diff line number Diff line change
Expand Up @@ -52,12 +52,7 @@
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.Scanner;
import java.util.*;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
Expand Down Expand Up @@ -371,7 +366,7 @@ private void listProperties(String statementText) throws IOException {
PropertiesList propertiesList = (PropertiesList) ksqlEntityList.get(0);
propertiesList.getProperties().putAll(restClient.getLocalProperties());
terminal.printKsqlEntityList(
Arrays.asList(propertiesList)
Collections.singletonList(propertiesList)
);
}

Expand All @@ -381,7 +376,10 @@ private void printKsqlResponse(RestResponse<KsqlEntityList> response) throws IOE
boolean noErrorFromServer = true;
for (KsqlEntity entity : ksqlEntities) {
if (entity instanceof ErrorMessageEntity) {
terminal.printErrorMessage(((ErrorMessageEntity) entity).getErrorMessage());
ErrorMessageEntity errorMsg = (ErrorMessageEntity) entity;
terminal.printErrorMessage(errorMsg.getErrorMessage());
LOGGER.error(errorMsg.toString());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This won't print anything useful as toString() isn't overridden

LOGGER.error(errorMsg.getErrorMessage().getStackTrace().toString().replace(", ", ", \n "));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

any reason why we don't just log errorMsg.getErrorMessage().toString()? It has the stack trace etc.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

oops

noErrorFromServer = false;
} else if (entity instanceof CommandStatusEntity &&
(((CommandStatusEntity) entity).getCommandStatus().getStatus() == CommandStatus.Status.ERROR)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,19 +16,20 @@

package io.confluent.ksql.metrics;

import com.google.common.collect.ImmutableMap;
import io.confluent.common.utils.Time;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.metrics.KafkaMetric;
import org.apache.kafka.common.metrics.MeasurableStat;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.metrics.stats.Rate;
import org.apache.kafka.common.metrics.stats.Total;

import java.util.*;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

Expand Down Expand Up @@ -60,15 +61,19 @@ public class ConsumerCollector implements MetricCollector {
private final Map<String, Counter> topicPartitionCounters = new HashMap<>();
private Metrics metrics;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

final?

private String id;
private Time time;

public void configure(Map<String, ?> map) {
String id = (String) map.get(ConsumerConfig.GROUP_ID_CONFIG);
configure(MetricCollectors.addCollector(id, this), id);
if (id == null) id = (String) map.get(ConsumerConfig.CLIENT_ID_CONFIG);
if (id.contains(""))
configure(MetricCollectors.getMetrics(), MetricCollectors.addCollector(id, this), MetricCollectors.getTime());
}

ConsumerCollector configure(final Metrics metrics, final String id) {
ConsumerCollector configure(final Metrics metrics, final String id, final Time time) {
this.id = id;
this.metrics = metrics;
this.time = time;
return this;
}

Expand All @@ -85,87 +90,42 @@ public ConsumerRecords onConsume(ConsumerRecords records) {
@SuppressWarnings("unchecked")
private void collect(ConsumerRecords consumerRecords) {
Stream<ConsumerRecord> stream = StreamSupport.stream(consumerRecords.spliterator(), false);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

add @SuppressWarnings("unchecked")

stream.forEach((record) -> topicPartitionCounters.computeIfAbsent(getKey(record.topic().toLowerCase()), k ->
stream.forEach((record) -> topicPartitionCounters.computeIfAbsent(getCounterKey(record.topic().toLowerCase()), k ->
new Counter<>(record.topic().toLowerCase(), buildSensors(k))
).increment(record));
}

private String getKey(String topic) {
private String getCounterKey(String topic) {
return topic;
}

private Map<String, Counter.SensorMetric<ConsumerRecord>> buildSensors(String key) {

HashMap<String, Counter.SensorMetric<ConsumerRecord>> results = new HashMap<>();
HashMap<String, Counter.SensorMetric<ConsumerRecord>> sensors = new HashMap<>();

// Note: synchronized due to metrics registry not handling concurrent add/check-exists activity in a reliable way
synchronized (this.metrics) {
addRateSensor(key, results);
addBandwidthSensor(key, results);
addTotalSensor(key, results);
addSensor(key, "events-per-sec", new Rate(), sensors);
addSensor(key, "total-events", new Total(), sensors);
}
return results;
return sensors;
}

private void addRateSensor(String key, HashMap<String, Counter.SensorMetric<ConsumerRecord>> results) {
String name = "cons-" + key + "-rate-per-sec";
private void addSensor(String key, String metricNameString, MeasurableStat stat, HashMap<String, Counter.SensorMetric<ConsumerRecord>> sensors) {
String name = "cons-" + key + "-" + metricNameString + "-" + id;

//noinspection unchecked
MetricName metricName = new MetricName("consume rate-per-sec", name, "consumer-rate-per-sec", Collections.EMPTY_MAP);
MetricName metricName = new MetricName(metricNameString, "consumer-metrics", "consumer-" + name, ImmutableMap.of("key", key, "id", id));
Sensor existingSensor = metrics.getSensor(name);
Sensor sensor = metrics.sensor(name);

// re-use the existing measurable stats to share between consumers
if (existingSensor == null) {
sensor.add(metricName, new Rate(TimeUnit.SECONDS));
if (existingSensor == null || metrics.metrics().get(metricName) == null) {
sensor.add(metricName, stat);
}

KafkaMetric rate = metrics.metrics().get(metricName);
results.put(metricName.name(), new Counter.SensorMetric<ConsumerRecord>(sensor, rate) {
void record(ConsumerRecord record) {
sensor.record(1);
super.record(record);
}
});
}

private void addBandwidthSensor(String key, HashMap<String, Counter.SensorMetric<ConsumerRecord>> results) {
String name = "cons-" + key + "-bytes-per-sec";

//noinspection unchecked
MetricName metricName = new MetricName("bytes-per-sec", name, "consumer-bytes-per-sec", Collections.EMPTY_MAP);
Sensor existingSensor = metrics.getSensor(name);
Sensor sensor = metrics.sensor(name);

// re-use the existing measurable stats to share between consumers
if (existingSensor == null) {
sensor.add(metricName, new Rate(TimeUnit.SECONDS));
}
KafkaMetric metric = metrics.metrics().get(metricName);

results.put(metricName.name(), new Counter.SensorMetric<ConsumerRecord>(sensor, metric) {
void record(ConsumerRecord record) {
sensor.record(record.serializedValueSize());
super.record(record);
}
});
}

private void addTotalSensor(String key, HashMap<String, Counter.SensorMetric<ConsumerRecord>> sensors) {
String name = "cons-" + key + "-total-events";

//noinspection unchecked
MetricName metricName = new MetricName("total-events", name, "consumer-total-events", Collections.EMPTY_MAP);
Sensor existingSensor = metrics.getSensor(name);
Sensor sensor = metrics.sensor(name);

// re-use the existing measurable stats to share between consumers
if (existingSensor == null) {
sensor.add(metricName, new Total());
}
KafkaMetric metric = metrics.metrics().get(metricName);

sensors.put(metricName.name(), new Counter.SensorMetric<ConsumerRecord>(sensor, metric) {
sensors.put(metricName.name(), new Counter.SensorMetric<ConsumerRecord>(sensor, metric, time) {
void record(ConsumerRecord record) {
sensor.record(1);
super.record(record);
Expand All @@ -178,21 +138,10 @@ public void close() {
topicPartitionCounters.values().forEach(v -> v.close(metrics));
}

public String statsForTopic(String topic) {
return statsForTopic(topic, false);
}

public String statsForTopic(final String topic, boolean verbose) {
List<Counter> last = new ArrayList<>();

String stats = topicPartitionCounters.values().stream().filter(counter -> (counter.isTopic(topic) && last.add(counter))).map(record -> record.statsAsString(verbose)).collect(Collectors.joining(", "));

// Add timestamp information
if (!last.isEmpty()) {
Counter.SensorMetric sensor = (Counter.SensorMetric) last.stream().findFirst().get().sensors.values().stream().findFirst().get();
stats += " " + sensor.lastEventTime();
}
return stats;
/**
 * Collects the stats of every counter registered for the given topic.
 *
 * @param topic the topic name to match (counters are keyed by lower-cased topic)
 * @return the accumulated stats from all matching counters; empty if none match
 */
public Collection<Counter.Stat> stats(String topic) {
  final List<Counter.Stat> results = new ArrayList<>();
  for (Counter counter : topicPartitionCounters.values()) {
    if (counter.isTopic(topic)) {
      results.addAll(counter.stats());
    }
  }
  return results;
}

@Override
Expand Down
114 changes: 92 additions & 22 deletions ksql-engine/src/main/java/io/confluent/ksql/metrics/Counter.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,21 +15,22 @@
**/
package io.confluent.ksql.metrics;

import io.confluent.common.utils.Time;
import org.apache.kafka.common.metrics.KafkaMetric;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.Sensor;

import java.text.SimpleDateFormat;
import java.util.Collection;
import java.util.Date;
import java.util.HashMap;
import java.util.Locale;
import java.util.Map;
import java.util.stream.Collectors;

class Counter<R> {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems to me that the entire point of this class is to group sensors by topic. Is this right? If so, I don't think we should name them like this, because Counter, Stat, Sensor, Metric are all classes from apache kafka utils, and they mean very different things there.

Maybe a name like TopicSensors would be more appropriate?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

agree


private final String topic;
Map<String, SensorMetric<R>> sensors = new HashMap<>();
private final Map<String, SensorMetric<R>> sensors;

Counter(String topic, final Map<String, SensorMetric<R>> sensors) {
this.topic = topic.toLowerCase();
Expand All @@ -44,57 +45,126 @@ public void close(Metrics metrics) {
sensors.values().forEach(v -> v.close(metrics));
}

public boolean isTopic(String topic) {
boolean isTopic(String topic) {
return this.topic.equals(topic);
}

public String statsAsString(boolean verbose) {
return sensors.values().stream().map(sensor -> sensor.toString(verbose)).collect(Collectors.joining(" "));
Collection<Stat> stats() {
return sensors.values().stream().map(sensor -> sensor.asStat()).collect(Collectors.toList());
}

public SensorMetric<R> firstSensor() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

doesn't seem to be used.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

deleted

if (sensors.isEmpty()) throw new RuntimeException("Sensors is empty");
return this.sensors.values().iterator().next();
}

/**
 * An immutable-named, aggregatable (name, value, timestamp) triple extracted from a
 * Kafka metric, used to report and merge per-topic throughput numbers.
 */
static class Stat {
  private final String name;
  // Mutable on purpose: aggregate() accumulates into it, so equals/hashCode results
  // can change over the object's lifetime — do not use a Stat as a hash-map key.
  private double value;
  private final long timestamp;

  public Stat(String name, double value, long timestamp) {
    this.name = name;
    this.value = value;
    this.timestamp = timestamp;
  }

  /** Renders as "name:<value>" with the value right-padded to 10 chars, 2 decimals. */
  public String formatted() {
    return String.format("%s:%10.2f", name, value);
  }

  /** Human-readable timestamp; 3 = DateFormat.SHORT (date style), 1 = DateFormat.LONG (time style). */
  public String timestamp() {
    return SimpleDateFormat.getDateTimeInstance(3, 1, Locale.getDefault()).format(new Date(timestamp));
  }

  @Override
  public boolean equals(Object o) {
    if (this == o) return true;
    if (o == null || getClass() != o.getClass()) return false;

    Stat stat = (Stat) o;

    if (Double.compare(stat.value, value) != 0) return false;
    // timestamp is a long: compare directly rather than via Double.compare, which
    // silently loses precision for magnitudes beyond 2^53.
    if (stat.timestamp != timestamp) return false;
    return name != null ? name.equals(stat.name) : stat.name == null;
  }

  @Override
  public int hashCode() {
    int result = name != null ? name.hashCode() : 0;
    long temp = Double.doubleToLongBits(value);
    result = 31 * result + (int) (temp ^ (temp >>> 32));
    // Hash the long timestamp directly instead of round-tripping it through double,
    // keeping hashCode consistent with the equals fix above.
    result = 31 * result + (int) (timestamp ^ (timestamp >>> 32));
    return result;
  }

  @Override
  public String toString() {
    return "Stat{" +
        "name='" + name + '\'' +
        ", value=" + value +
        ", timestamp=" + timestamp +
        '}';
  }

  public String name() {
    return name;
  }

  public double getValue() {
    return value;
  }

  public long getTimestamp() {
    return timestamp;
  }

  /** Adds the given value into this stat (merging per-partition stats); returns this for chaining. */
  public Stat aggregate(double value) {
    this.value += value;
    return this;
  }
}

abstract static class SensorMetric<P> {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Perhaps I have missed something, but why can't we just have a List<Sensor> inside the Counter class (or whatever we rename it to)? Each sensor already has a name. Each sensor already has a list of metrics. Each metric can have a particular type of Stat.

So it seems to me that we can add utility functions to Counter to extract existing Kafka stats in a way that make them amenable to aggregation without introducing all these classes.

Copy link
Author

@bluemonk3y bluemonk3y Dec 1, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sensor is used to record() data and the KafkaMetric is used to read/metric.value() 's. You need both unfortunately. You also need a way of making them a bit more open to do aggs, and reregister properly. The reporters will rely on KafkaMetric to display mbean values etc.

Copy link
Contributor

@apurvam apurvam Dec 1, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess this has been merged already, but it seems to me that the main public methods in TopicSensor are increment and stats.

If we didn't define SensorMetric, and had a list of Sensor instead, these could still be implemented as follows:

 void increment(R record) {
    sensors.forEach(v -> v.record(record));
 }

Collection<Stat> stats() {
    return sensors.stream().map(sensor -> extractValueFromSensor(sensor)).collect(Collectors.toList());
  }

private List<Stat> extractStatsFromSensor(Sensor sensor) {
   long now = time.now();
    return sensor.metrics().stream().map(metric -> new Stat(metric.name(), metric.measurableValue(now), now)).collect(Collectors.toList());
}

It seems that Stat just helps associate metric names with values and time for aggregation. We probably could get rid of it too, since all of the data is in the underlying metric. We would just move the extractStatsFromSensor logic to the point of aggregation.

This would help simplify a little bit, but doesn't really change any of the logic.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

oh man - why didn't I see the metrics() call on Sensor - I thought it was daft and instead it was me being blind.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

:) Yea, it is easy to miss. I spent a lot of time digging through the Kafka metrics code because it seemed to me that we shouldn't have to write these types of classes in general.

private final Sensor sensor;
private final KafkaMetric metric;
private Time time;
private long lastEvent = 0;

SensorMetric(Sensor sensor, KafkaMetric metric) {
SensorMetric(Sensor sensor, KafkaMetric metric, Time time) {
this.sensor = sensor;
this.metric = metric;
this.time = time;
}

/**
* Anon class must call down to this for timestamp recording
* @param object
*/
void record(P object) {
this.lastEvent = System.currentTimeMillis();
this.lastEvent = time.milliseconds();
}

public double value() {
return metric.measurable().measure(metric.config(), System.currentTimeMillis());
}

public String lastEventTime() {
if (lastEvent == 0) return "No-events";
return "Last-event: " + SimpleDateFormat.getDateTimeInstance(3, 1, Locale.getDefault()).format(new Date(lastEvent));
return metric.measurable().measure(metric.config(), time.milliseconds());
}

public void close(Metrics metrics) {
metrics.removeSensor(sensor.name());
metrics.removeMetric(metric.metricName());
}

public String toString(boolean verbose) {
if (verbose) {
return metric.metricName().group() + "." + metric.metricName().name() + ":" + String.format("%10.2f", value());
} else {
return metric.metricName().name() + ":" + String.format("%10.2f", value());
}
}

@Override
public String toString() {
return toString(false);
return super.toString() + " " + asStat().toString();
}

public Stat asStat() {
return new Stat(metric.metricName().name(), value(), lastEvent);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

import java.util.Collection;
import java.util.Map;

interface MetricCollector extends ConsumerInterceptor, ProducerInterceptor {
Expand All @@ -42,5 +43,5 @@ default void configure(Map<String, ?> map) { }

String getId();

String statsForTopic(String topic);
Collection<Counter.Stat> stats(String topic);
}
Loading