-
Notifications
You must be signed in to change notification settings - Fork 1k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Adding metric-collection for DESCRIBE EXTEND #475
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Left initial comments.
If I undestand correctly, this code will plugin interceptors for every streams producer and consumer running in a KSQLEngine
and compute the message rate for each topic and partition being produced/consumed to/from.
These stats are exposed via JMX, and will eventually also be exposed via DESCRIBE
statements.
Is that correct?
A general question is how do these stats decay? Is it some sort of sliding average over the last N minutes?
|
||
final Map<String, Counter> topicCounters = new HashMap<>(); | ||
private final Metrics metrics; | ||
private String id; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This probably should be named clientId
, because that's what it seems to be set to.
sensor = metrics.sensor(name); | ||
System.out.println("Registering:" + name); | ||
new RuntimeException("Register:" + name).printStackTrace(); | ||
MetricName producerRate = new MetricName(name + "-rate-per-sec", name, "consumer-stats"); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
looks like you mean consumerRate
here. just goes to show that there is a strong overalp between this Counter
and the Counter
in teh ProducerCollector
. We should look to drop the repetition.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah there def is. Could we also share the Counter
class with the ProducerCollector
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I realise this - the problem is the source collection entity is different, ProducerRecord versus ConsumerRecords. I could inject an external iterator to process them and call down to a counter, the other problem is that both of the data types contain different data which doesnt necessarily overlap. i.e. I dont know the size of ProducerRecord.bytes, whereas the consumer record we do - this means it can be used to track bandwidth as opposed to records.
|
||
public class ProducerCollector { | ||
|
||
final Map<String, Counter> topicCounters = new HashMap<>(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We should think about how we clean up this map for inactive clients, otherwise its a memory leak.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
They should get closed when the producer is closed - right? i.e, when a stream is closed the producers and consumer and their metric collectors are closed?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It will be called back via the Interceptor.Close method
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Got it, thanks.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks @bluemonk3y - we probably need tests for MetricCollector
etc, too
|
||
import java.util.Map; | ||
|
||
public class AbstractMetricCollector implements ConsumerInterceptor, ProducerInterceptor { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can we make this an interface instead and mark the methods as default
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yep
|
||
public class ConsumerCollector { | ||
|
||
final Map<String, Counter> topicCounters = new HashMap<>(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
private
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
public class ConsumerCollector { | ||
|
||
final Map<String, Counter> topicCounters = new HashMap<>(); | ||
private Metrics metrics; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
final
?
private Metrics metrics; | ||
private String id; | ||
|
||
public ConsumerCollector(Metrics metrics) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
package private?
} | ||
|
||
|
||
public ConsumerRecords onConsume(ConsumerRecords records) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ditto
} | ||
|
||
private Metrics buildMetrics() { | ||
MetricConfig metricConfig = new MetricConfig().samples(100).timeWindow(1000, TimeUnit.MILLISECONDS); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can we just replace this with new Metrics()
|
||
public class ConsumerCollectorTest { | ||
|
||
public static final String TEST_TOPIC = "TestTopic"; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
private
import org.apache.kafka.common.TopicPartition; | ||
import org.junit.Test; | ||
|
||
import java.util.*; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
import *
import java.util.List; | ||
import java.util.concurrent.TimeUnit; | ||
|
||
import static org.junit.Assert.*; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
import *
|
||
} | ||
|
||
private Metrics buildMetrics() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
as above in other test
retest this please |
1 similar comment
retest this please |
This patch should target the |
@dguy @apurvam @hjafarpour - can you review the latest commit for metrics pls |
PropertiesList propertiesList = (PropertiesList) ksqlEntityList.get(0); | ||
propertiesList.getProperties().putAll(restClient.getLocalProperties()); | ||
terminal.printKsqlEntityList( | ||
Arrays.asList(propertiesList) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Collection.singletonList(..)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yep
return statsForTopic(topic, false); | ||
} | ||
|
||
public String statsForTopic(final String topic, boolean verbose) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
package-private?
}); | ||
} | ||
|
||
private void addBandwidthSensor(String key, HashMap<String, Counter.SensorMetric<ConsumerRecord>> results) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This method and the next one look pretty similar - anyway we can share the duplicate code?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ok - done that - dropped bandwidth measurement on the consumer because the anon-class impl uses the consumerrecord.serializedbytes instead of passing in 1
class Counter<R> { | ||
|
||
private final String topic; | ||
Map<String, SensorMetric<R>> sensors = new HashMap<>(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we make this private final
and add methods to the class like findFirst
etc so that ProducerCollector
and ConsumerCollector
don't need access to the sensors
map?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ok - done
sensors.values().forEach(v -> v.close(metrics)); | ||
} | ||
|
||
public boolean isTopic(String topic) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
package private?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yep
return metric.measurable().measure(metric.config(), System.currentTimeMillis()); | ||
} | ||
|
||
public String lastEventTime() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
package private?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
} | ||
|
||
public String lastEventTime() { | ||
if (lastEvent == 0) return "No-events"; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
java formatting? i.e, ..{..}
?
metrics = new Metrics(metricConfig, reporters, new SystemTime()); | ||
} | ||
|
||
static Metrics addCollector(String id, MetricCollector collector) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
does this need to be synchronized? i.e, can multiple consumers/producers update concurrently?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yep
} | ||
|
||
@SuppressWarnings("unchecked") | ||
private void addRatePerSecond(String key, HashMap<String, Counter.SensorMetric<ProducerRecord>> sensors) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
as above in ConsumerCollector
String statsForTopic = collector.statsForTopic(TEST_TOPIC, true); | ||
assertNotNull(statsForTopic); | ||
|
||
assertTrue("Missing byres-per-sec stat:" + statsForTopic, statsForTopic.contains("bytes-per-sec.bytes-per-sec:")); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
assertThat(statsForTopic, containsString(...))
?
and elsewhere
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
a couple of minor comments otherwise, LGTM
terminal.printErrorMessage(((ErrorMessageEntity) entity).getErrorMessage()); | ||
ErrorMessageEntity errorMsg = (ErrorMessageEntity) entity; | ||
terminal.printErrorMessage(errorMsg.getErrorMessage()); | ||
LOGGER.error(errorMsg.toString()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This won't print anything useful as toString()
isn't overridden
ErrorMessageEntity errorMsg = (ErrorMessageEntity) entity; | ||
terminal.printErrorMessage(errorMsg.getErrorMessage()); | ||
LOGGER.error(errorMsg.toString()); | ||
LOGGER.error(errorMsg.getErrorMessage().getStackTrace().toString().replace(", ", ", \n ")); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
any reason why don't just log errorMsg.getErrorMessage().toString()
? it has the stack trace etc
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
oops
import java.util.HashMap; | ||
import java.util.List; | ||
import java.util.Map; | ||
import java.util.*; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
import *
you should change your ide settings
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
@apurvam @hjafarpour - guys - review pls? |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Just a minor comment, otherwise LGTM.
)).collect(Collectors.toList()); | ||
} else if (ksqlEntity instanceof SourceDescription) { | ||
List<SourceDescription.FieldSchemaInfo> fields = ((SourceDescription) ksqlEntity).getSchema(); | ||
SourceDescription ksqlEntity1 = (SourceDescription) ksqlEntity; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should we name this variable sourceDescription
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yep
return id; | ||
} | ||
|
||
public ProducerRecord onSend(ProducerRecord record) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Here and in the ConsumerCollector
, shouldn't we add the Override
annotation?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yep
public static String getCollectorStatsByTopic(final String topic) { | ||
|
||
ArrayList<Counter.Stat> allStats = new ArrayList<>(); | ||
collectorMap.values().forEach(c -> allStats.addAll(c.stats(topic.toLowerCase()))); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is this thread safe? Particularly, c.stats
iterates over a non-concurrent hashmap which may be getting updates. This may result it corrupted values being reported.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
collectorMap is concurrent
return sensors.values().stream().map(sensor -> sensor.asStat()).collect(Collectors.toList()); | ||
} | ||
|
||
public SensorMetric<R> firstSensor() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
doesn't seem to be used.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
deleted
import java.util.Map; | ||
import java.util.stream.Collectors; | ||
|
||
class Counter<R> { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It seems to me that the entire point of this class is to group sensors by topic. Is this right? If so, I don't think we should name them like this, because Counter
, Stat
, Sensor
, Metric
are all classes from apache kafka utils, and they mean very different things there.
Maybe a name like TopicSensors
would be more appropriate?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
agree
} | ||
} | ||
|
||
abstract static class SensorMetric<P> { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Perhaps I have missed something, but why can't we just have a List<Sensor>
inside the Counter
class (or whatever we rename it to)? Each sensor already has a name. Each sensor already has a list of metrics. Each metric can have a particular type of Stat.
So it seems to me that we can add utility functions to Counter
to extract existing Kafka stats in a way that make them amenable to aggregation without introducing all these classes.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sensor is used to record() data and the KafkaMetric is used to read/metric.value() 's. You need both unfortunately. You also need a way of making them a bit more open to do aggs, and reregister properly. The reporters will rely on KafkaMetric to display mbean values etc.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I guess this has been merged already, but it seems to me that the main public method in TopicSensor
are increment
and stats
.
If we didn't define SensorMetric
, and had a list of Sensor
instead, these could still be implemented as follows:
void increment(R record) {
sensors.forEach(v -> v.record(record));
}
Collection<Stat> stats() {
return sensors.stream().map(sensor -> extractValueFromSensor(sensor)).collect(Collectors.toList());
}
private List<Stat> extractStatsFromSensor(Sensor sensor) {
long now = time.now();
sensor.metrics().stream().map(metric -> new Stat(metric.name(), metric.measurableValue(now), now));
}
It seems that Stat
just helps associate metric names with values and time for aggregation. We probably could get rid of it too, since all of the data is in the underlying metric. We would just move the extractStatsFromSensor
logic to the point of aggregation.
This would help simplify a little bit, but doesn't really change any of the logic.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
oh man - why didnt I see the metrics() call sensor - i thought it was daft and instead it was me being blind.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
:) Yea, it is easy to miss. I spent a lot of time digging through the Kafka metrics code because it seemed to me that we shouldn't have to write these types of classes in general.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks LGTM
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM.
First pass at adding metrics to DESCRIBE #463
Current iteration uses commons.monitoring for metering and hooks into the Prod/Cons interceptors
Next step - wire into describe and add more metrics