feat: add persistent query saturation metric #7955
Conversation
Hey, not sure if this was ready for review, but I have a couple of comments that might affect this if the shared runtimes change gets merged first.
}

private void report(final DataPoint dataPoint) {
  final MetricName metricName
Would the group of the data point contain the QueryID?
I may be wrong, but I think the queryId would be in the tags?
yep - that's correct for metrics that are specific to a given query
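For illustration, a minimal sketch of how a per-query metric could carry the query id in its tags with the Kafka metrics API; the group, description, and tag key below are assumptions, not necessarily what this patch uses:

```java
import java.util.Map;
import org.apache.kafka.common.MetricName;

// Hypothetical sketch: the query id travels as a tag on the metric name,
// while the group identifies the metric family.
final MetricName metricName = new MetricName(
    "query-saturation",                    // metric name
    "ksqldb-utilization",                  // group (assumed)
    "Saturation of a persistent query",    // description (assumed)
    Map.of("query-id", "CSAS_MY_QUERY_1")  // query id as a tag (assumed key)
);
```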
saturation.ifPresent(s -> report(now, s));

final Set<String> appIds = queries.stream()
    .map(PersistentQueryMetadata::getQueryApplicationId)
Can you use QueryID instead? That will work better with the shared runtimes.
On the contrary I think it makes sense to track saturation at the runtime/app level. At the lowest level it's a property of a stream thread. We then take the max saturation over all stream threads to compute runtime-level saturation. And max over all runtimes to compute node-level saturation.
That makes sense. It looks like the metrics are still being created for each query; can you filter out duplicates? Otherwise I think we get an IllegalArgumentException.
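For illustration, here is a minimal sketch of the roll-up described above; the method and variable names are hypothetical:

```java
import java.util.Collection;
import java.util.Optional;

// Saturation rolls up by taking the max at each level:
// stream thread -> runtime (Kafka Streams app) -> node.
static Optional<Double> maxSaturation(final Collection<Optional<Double>> saturations) {
  return saturations.stream()
      .flatMap(Optional::stream)  // drop threads/runtimes with no good sample
      .max(Double::compare);
}

// Usage: runtime saturation is maxSaturation over its thread saturations,
// and node saturation is maxSaturation over its runtime saturations.
```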

private Optional<Double> measure(final Instant now, final PersistentQueryMetadata queryMetadata) {
  final KafkaStreamsSaturation ksSaturation = perKafkaStreamsStats.computeIfAbsent(
      queryMetadata.getQueryApplicationId(),
ditto here, or filter for duplicates if you are intending to have it be per runtime not per query
now,
QUERY_THREAD_SATURATION,
saturation,
ImmutableMap.of(THREAD_ID, name)
It seems like we would also want query-id as a tag?
we're reporting query-level below
Ahh my bad, I read thread-id as task-id.
void report(List<DataPoint> dataPoints);

/**
 * Notifies te reporter that the metric with name and tags can be cleaned up
nit: typo te -> the
LOGGER.debug("{}: record and measure with now {}, window {} ({} : {})", threadName, now, windowStart, earliest, latest);
samples.add(current);
samples.removeIf(s -> s.timestamp.isBefore(earliest));
if (!inRange(samples.get(0).timestamp, earliest, latest) && !startTime.isAfter(windowStart)) {
Why is it okay for the metric to be out of range if the thread was started after windowStart? In general, I'm a little confused about what earliest and latest are used for. I guess since we're no longer able to guarantee that the sample time happens every 5 minutes, this helps us get a good set of metrics for our window? If so, why is sampleMargin a variable instead of a fixed time we decide on?
Why is it okay for the metric to be out of range if the thread was started after windowStart?

If the thread was started after windowStart, we consider the time from windowStart to the thread start time as blocked.

I guess since we're no longer able to guarantee that the sample time happens every 5 minutes this helps us get a good set of metrics for our window?

Yep.

If so, why is sampleMargin a variable instead of a fixed time we decide on?

I wanted to leave the option open to have it be configurable. But it shouldn't ever change once the server is started.
(measured.map(Object::toString).orElse(""))
);
measured.ifPresent(m -> reportThreadSaturation(now, m, threadName, reporter));
saturation = compareSaturation(saturation, measured) > 0 ? saturation : measured;
To see if I'm following - this basically picks the larger of saturation or measured, right? So as we go through the loop to calculate thread metrics, we'll track the largest thread saturation and then ultimately report that when we exit the for loop. How would saturation ever not exist? We set it to Optional.of(0.0) to begin with, so if the first measured doesn't exist we'd return -1 and reset saturation to Optional.of(0.0), right? So is the check for whether the first parameter exists just for when we compare saturation in run()?
It may not exist if we don't have good data to compute saturation. This agent runs every minute and samples the total blocked time for each stream thread. It then computes the blocked time for the current window by looking back five minutes and taking the difference between the current value and that sample. However, it's possible that there isn't a good (as defined as a sample that's within sampleMargin) sample from five minutes ago (e.g. if the thread couldn't run for some reason). If that happens, we don't know the current saturation value so we return Optional.empty().
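To make that concrete, here is a rough sketch of that lookback; the Sample type, names, and margins below are assumptions for illustration, not the patch's actual code:

```java
import java.time.Duration;
import java.time.Instant;
import java.util.List;
import java.util.Optional;

// A sample of a stream thread's cumulative blocked time (hypothetical type).
record Sample(Instant timestamp, double totalBlockedNs) {}

// Look for a sample from ~window ago that lands within sampleMargin of the
// window start; if none exists, the saturation for this window is unknown.
static Optional<Double> saturationForWindow(
    final List<Sample> samples,  // oldest first
    final Sample current,
    final Duration window,       // e.g. 5 minutes
    final Duration sampleMargin  // e.g. 1 minute
) {
  final Instant windowStart = current.timestamp().minus(window);
  final Optional<Sample> base = samples.stream()
      .filter(s -> !s.timestamp().isBefore(windowStart.minus(sampleMargin)))
      .filter(s -> !s.timestamp().isAfter(windowStart.plus(sampleMargin)))
      .findFirst();
  if (base.isEmpty()) {
    return Optional.empty();     // no good sample -> don't report saturation
  }
  final double durationNs =
      Duration.between(base.get().timestamp(), current.timestamp()).toNanos();
  final double blockedNs =
      Math.min(current.totalBlockedNs() - base.get().totalBlockedNs(), durationNs);
  return Optional.of((durationNs - blockedNs) / durationNs);
}
```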

private void cleanup(final MetricsReporter reporter) {
  for (final String threadName : perThreadSaturation.keySet()) {
    reporter.cleanup(QUERY_THREAD_SATURATION, ImmutableMap.of(THREAD_ID, threadName));
Are the extra thread metrics not already cleaned up at the end of KafkaStreamsSaturation#measure?
we need to clean up the thread metrics in the case a thread restarts, or in the case an app is closed. This method is called in the latter case.
final Collection<PersistentQueryMetadata> queries = engine.getPersistentQueries();
final Optional<Double> saturation = queries.stream()
    .map(q -> measure(now, q))
    .max(PersistentQuerySaturationMetrics::compareSaturation)
What two metrics are we comparing here?
we are comparing the saturation across multiple KafkaStreams applications
private static final Logger LOGGER
    = LoggerFactory.getLogger(PersistentQuerySaturationMetrics.class);

private static final String QUERY_SATURATION = "query-saturation";
nit: it looks like for all the other ksql metrics we use _ and not -, imo it would be good to be consistent and use _ instead here
where do you see that? KsqlEngineMetrics for example uses - as the delimiter. It gets converted to _ by the telemetry reporter though, so it should all come out the same on the other end anyway.
Ahh, I was looking at the cc-spec-ksql allow-list and at the metrics pipeline, but I guess that's after it's cleaned, so it shouldn't matter.
private final AtomicReference<DataPoint> dataPointRef;
private final Duration staleThreshold;

private DataPointBasedGauge(
My understanding is that as the underlying stream threads within the KS runtime come and go (as we replace them), we would keep each as a separate data point and automatically "mute" those data points that are too stale, meaning the thread may already be gone. So when we apply the saturation equation we may temporarily be counting some threads that are already gone, but that would resume back to normal. Is that right?
Yep - once the saturation metric agent sees that a thread is missing, it will clean up the underlying KafkaMetric by calling cleanup on this class.
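For reference, that cleanup ultimately amounts to unregistering the metric from the Kafka Metrics registry, roughly like this (a sketch; the reporter's actual plumbing may differ):

```java
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.metrics.Metrics;

// Sketch: drop the gauge for a thread that is no longer reported. Any
// JmxReporter attached to the registry unregisters the MBean as a result.
static void cleanup(final Metrics metrics, final MetricName metricName) {
  metrics.removeMetric(metricName);
}
```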
final Map<String, Metric> metricsForThread = entry.getValue();
if (!metricsForThread.containsKey(STREAMS_TOTAL_BLOCKED_TIME)
    || !metricsForThread.containsKey(STREAMS_THREAD_START_TIME)) {
  LOGGER.info("Missing required metrics for thread: {}", threadName);
If the thread is replaced after metricsByThread is collected, we would not see the metrics since they are GC'ed and would end up here, is that right?
We really should never hit this condition. If a thread is replaced then it won't be returned by metricsByThread at all.
  measured.ifPresent(m -> reportThreadSaturation(now, m, threadName, reporter));
  saturation = compareSaturation(saturation, measured) > 0 ? saturation : measured;
}
saturation.ifPresent(s -> reportQuerySaturation(now, s, queryId, reporter));
It may be a duplicate of @wcarlson5's comment, but with the shared runtime the mapping from queries to threads is no longer one-to-many but many-to-many, so we may end up double-counting the thread saturation for some queries? Since for now it is just taking the max that may be fine, but if we ever change it to a different equation we may end up duplicating the measurements.
Also with the new shared runtime, using max over all the threads that host at least one task of the query may be largely exaggerating the saturation metrics: e.g. let's say a single thread hosting some task from query A and some task from query B, and when query B's workload becomes a beast to drag down that thread, query A's saturation metric would also report as "full". This would be bad..
In the new shared runtime, there's no good way to tell how much of the saturation of a thread was contributed by one of its hosted queries versus the other, so a second-best approach may be to just consider averaging over the threads.. thoughts @lct45 @wcarlson5
let's say a single thread hosting some task from query A and some task from query B, and when query B's workload becomes a beast to drag down that thread, query A's saturation metric would also report as "full". This would be bad..
I agree that's what would happen, but I think that's actually what we want. The saturation metric should tell you whether a given query is bottlenecked or not - not what % of the capacity that one query is using. The latter is a more challenging problem (I have an idea here around measuring the average record processing time and multiplying by the observed throughput into the query's source partitions) that we're not aiming to solve with this patch. I can see why this might be confusing though - we could leave the query-level metric internal for now.
Cool, let's table that for now and merge as-is. We can see how well it works in production for monitoring purposes and decide whether we want to do something differently. I was concerned that we may be exaggerating the saturation situation and hence adding too much monitoring noise for ourselves, where a query that was only partially saturated on one of its hosting threads but not on others would be shown as 100% saturated. But that could be totally over-thinking too :)
if (dataPoint.getTime().isAfter(Instant.ofEpochMilli(now).minus(staleThreshold))) {
  return dataPoint.getValue();
}
return null;
How would we interpret null on the other end? Let's say for some reason we do not get a newly reported value for a while and the old value is stale; would this end up showing as an unknown value on the user-facing UI / our own monitoring, or something else?
The reporter should just skip reporting the value, so we'd see missing data in the metrics store (which is what we want)
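For context, the staleness check shown above reads like the body of a Kafka Gauge; a gauge along these lines (field names assumed from the class above, not the exact patch code) would look roughly like:

```java
import java.time.Instant;
import org.apache.kafka.common.metrics.Gauge;

// Sketch: a gauge backed by the latest DataPoint returns null once that
// point is older than staleThreshold; reporters emit nothing for null,
// so the metrics store shows a gap instead of a stale value.
final Gauge<Double> gauge = (config, now) -> {
  final DataPoint dataPoint = dataPointRef.get();
  if (dataPoint.getTime().isAfter(Instant.ofEpochMilli(now).minus(staleThreshold))) {
    return dataPoint.getValue();
  }
  return null;
};
// Registered via something like: metrics.addMetric(metricName, gauge);
```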
LGTM!
LGTM
Add a persistent query saturation metric, and report it over JMX. Saturation is computed by sampling total-blocked-time for each stream thread. We sample the total blocked time every minute, and compute saturation by looking back 5 minutes and computing how long the thread was blocked in that interval. Saturation is (5 minutes - blocked time)/(5 minutes). This patch also adds a reporter that exposes reported data points over JMX. The reporter adds a new Metric for every new data point name/tag combination it sees, and implements the metric by reading the latest data point for the name/tag, with a threshold on staleness.
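As a quick worked example of that formula: a stream thread that spent 4 of the last 5 minutes blocked has saturation (5 - 4) / 5 = 0.2, while a thread with zero blocked time in the window reports 1.0 (fully saturated).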

final Duration duration = Duration.between(observedStart, current.timestamp);
final double durationNs = duration.toNanos();
return Optional.of((durationNs - Math.min(blocked, durationNs)) / durationNs);
@rodesai - why did we take the difference between the time interval and the blocked time to compute the saturation? Shouldn't this just be Math.min(blocked, durationNs) / duration?