POC for CSU Utilization Metrics (WIP) #1
Conversation
final long windowStart = (long) Math.max(0, windowEnd - windowSize);
for (KafkaStreams stream : kafkaStreams) {
  for (ThreadMetadata thread : stream.localThreadsMetadata()) {
    blockedTime = Math.min(getProcessingRatio(thread.threadName(), stream, windowStart, windowSize), windowSize);
We want to take the minimum blocked time across all the threads. Did you mean min(getProcessingRatio(...), blockedTime) here?
Ahh yup, got mixed up when I renamed variables
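To make the fix discussed above concrete, here is a minimal, self-contained sketch of accumulating the minimum blocked time across threads. The per-thread values and the `minBlockedTime` helper are hypothetical stand-ins for the `getProcessingRatio` loop in the diff; the point is only that the running `blockedTime` must feed back into the `min`:

```java
public class MinBlockedTime {
    // Take the minimum blocked time across all threads, capping each
    // per-thread sample at the window size. Starting the accumulator at
    // windowSize means the first sample always wins the first comparison.
    static long minBlockedTime(long[] perThreadBlockedMs, long windowSizeMs) {
        long blockedTime = windowSizeMs;
        for (long threadBlockedMs : perThreadBlockedMs) {
            // min(sample, windowSize) caps the sample; min(..., blockedTime)
            // keeps the running minimum, which was the missing piece above
            blockedTime = Math.min(Math.min(threadBlockedMs, windowSizeMs), blockedTime);
        }
        return blockedTime;
    }

    public static void main(String[] args) {
        long result = minBlockedTime(new long[] {25_000L, 12_000L, 40_000L}, 30_000L);
        System.out.println("min blocked time: " + result + "ms"); // 12000ms
    }
}
```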
@Override
public void run() {
  logger.info("Reporting CSU thread level metrics");
You'll probably get a more accurate measurement if you actually store the observed sample times rather than assuming the windowSize each time. So something like:
public void run() {
  while (true) {
    logger.info("the current processing ratio is " + processingRatio() + "%");
    Thread.sleep(windowSize);
  }
}

public double processingRatio() {
  long sampleTime = time.milliseconds();
  double blockedTime = sampleTime - lastSampleTime;
  long windowStart = lastSampleTime;
  long windowSize = sampleTime - lastSampleTime;
  ...
  lastSampleTime = sampleTime;
}
This way if the interval between runs is a little longer than you requested (which is more likely under heavy load) your computation won't be off.
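To illustrate how much the requested-vs-observed distinction matters, here is a small numeric sketch. The ratio formula (unblocked time as a percentage of the window) is an assumption for illustration, not the PR's actual computation:

```java
public class ObservedWindow {
    // Hypothetical processing ratio: the percentage of the window that the
    // thread was NOT blocked.
    static double processingRatio(long blockedMs, long windowMs) {
        return 100.0 * (windowMs - blockedMs) / windowMs;
    }

    public static void main(String[] args) {
        long requestedMs = 10_000L; // interval we asked the scheduler for
        long observedMs = 12_000L;  // interval that actually elapsed under load
        long blockedMs = 6_000L;

        // Dividing by the requested window understates the ratio when the
        // scheduler ran late; the observed window gives the true value.
        System.out.println(processingRatio(blockedMs, requestedMs)); // 40.0
        System.out.println(processingRatio(blockedMs, observedMs));  // 50.0
    }
}
```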
So we compute the processing ratio at the start of the window, and then sleep for the full window size, at which point the thread should call run() again, right? Won't those overlap in the ideal case (when everything is running on time)?
I get storing the sample time instead of assuming the window size, but I don't get the Thread.sleep(windowSize);
Sorry, I wrote this comment before I saw you were using a ScheduledExecutorService; I assumed you were just starting a thread. You should just be able to do:
public void run() {
  logger.info("the current processing ratio is " + processingRatio() + "%");
}
My main point was about using the observed difference between sample times rather than the requested difference (the interval we pass to the executor service).
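Putting the two halves of this thread together, a minimal runnable sketch under the ScheduledExecutorService approach might look like the following. The class name and the `observedWindow` helper are hypothetical; the point is that `run()` measures the interval that actually elapsed rather than trusting the interval passed to `scheduleAtFixedRate`:

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

public class CsuMetricsReporter implements Runnable {
    private final AtomicLong lastSampleTime;

    CsuMetricsReporter(long startMs) {
        this.lastSampleTime = new AtomicLong(startMs);
    }

    // Swap the stored sample time for "now" and return the interval that
    // actually elapsed since the previous sample.
    static long observedWindow(AtomicLong lastSampleTime, long nowMs) {
        return nowMs - lastSampleTime.getAndSet(nowMs);
    }

    @Override
    public void run() {
        // Use the observed interval, not the requested one, in the
        // processing-ratio computation.
        long windowMs = observedWindow(lastSampleTime, System.currentTimeMillis());
        System.out.println("observed window: " + windowMs + "ms");
    }

    public static void main(String[] args) throws InterruptedException {
        ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
        // The executor re-invokes run() roughly every 50ms; run() measures
        // how long each interval really took.
        executor.scheduleAtFixedRate(
            new CsuMetricsReporter(System.currentTimeMillis()), 50, 50, TimeUnit.MILLISECONDS);
        Thread.sleep(200);
        executor.shutdownNow();
    }
}
```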
@@ -284,8 +285,11 @@ private void registerQuery(
    }
  }
  allLiveQueries.add(query);
  // For the CSU metrics we need to have initialized first. It seems like initialize could throw errors though
  // so we probably want to notify the other listeners first
If you rebase on top of confluentinc#7627 (once it's merged) you won't need this.
@Override
public void onDeregister(final QueryMetadata query) {
  // Question - if we terminate a query and then restart it, will the underlying
we would not reuse the name on a terminate (it gets a new ID), so we should be good here. We do reuse the name on an upgrade. I think we should always just clear up all our samples for the query here. It's not a big deal if we miss a window because a user upgraded their query.
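Following the suggestion above (always clear a query's samples on deregister, since names are reused on upgrade), a minimal sketch might look like this. The class and method names are hypothetical, not the PR's actual types:

```java
import java.util.HashMap;
import java.util.Map;

public class QuerySampleStore {
    // Samples keyed by query name. Names are reused on upgrade (though not
    // on terminate, which assigns a new ID), so clearing everything for the
    // query on deregister is the safe default: at worst we miss one window.
    private final Map<String, Long> samplesByQuery = new HashMap<>();

    void record(String queryName, long sampleMs) {
        samplesByQuery.put(queryName, sampleMs);
    }

    void onDeregister(String queryName) {
        // Unconditionally drop the query's samples
        samplesByQuery.remove(queryName);
    }

    int size() {
        return samplesByQuery.size();
    }

    public static void main(String[] args) {
        QuerySampleStore store = new QuerySampleStore();
        store.record("CSAS_FOO_1", 42L);
        store.onDeregister("CSAS_FOO_1");
        System.out.println("samples left: " + store.size()); // 0
    }
}
```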
long sampleTime = time.milliseconds();
double blockedTime = sampleTime - lastSampleTime;

final long windowSize = Math.max((sampleTime - lastSampleTime), this.windowSize);
Why are you taking a max with the windowSize here? If it's to catch the case where sampleTime < lastSampleTime, it would be more accurate to just return 0 or throw: something has definitely gone wrong, and we shouldn't compute a value that uses the requested windowSize.
That was something I mostly did for testing: to make sure we're either getting a window size that's bigger than the set window size (because the thread is slow) or getting exactly the window size, but nothing smaller. I removed this when I did the test cleanup, though.
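A minimal sketch of the guard the reviewer suggests, returning 0 instead of substituting the requested window size. The helper name is hypothetical:

```java
public class WindowGuard {
    // Return the observed window, or 0 when the clock appears to have gone
    // backwards (sampleTime <= lastSampleTime). Per the review discussion,
    // fabricating a value from the requested window size would hide the bug.
    static long effectiveWindow(long sampleTimeMs, long lastSampleTimeMs) {
        long observed = sampleTimeMs - lastSampleTimeMs;
        if (observed <= 0) {
            return 0; // something has definitely gone wrong
        }
        return observed;
    }

    public static void main(String[] args) {
        System.out.println(effectiveWindow(2_000L, 1_000L)); // 1000
        System.out.println(effectiveWindow(1_000L, 2_000L)); // 0
    }
}
```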
@Override
public void onDeregister(final QueryMetadata query) {
  kafkaStreams.remove(query.getKafkaStreams());
  previousPollTime.remove("poll-time-total");
The keys being removed here should be thread ids, right?
Ah yeah, my bad. Did that without fully thinking. Just fixed
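For reference, a minimal sketch of the fix: remove the entries for the query's stream threads rather than a metric name like "poll-time-total". The map and helper names here are hypothetical stand-ins for `previousPollTime`:

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class DeregisterCleanup {
    // previousPollTime is keyed by thread name/id, so on deregister we must
    // remove every entry for the query's stream threads, not a metric name.
    static void onDeregister(Map<String, Long> previousPollTime, List<String> threadNames) {
        for (String threadName : threadNames) {
            previousPollTime.remove(threadName);
        }
    }

    public static void main(String[] args) {
        Map<String, Long> previousPollTime = new HashMap<>();
        previousPollTime.put("query-1-StreamThread-1", 42L);
        previousPollTime.put("query-2-StreamThread-1", 7L);
        // Deregistering query 1 removes only its threads' entries
        onDeregister(previousPollTime, List.of("query-1-StreamThread-1"));
        System.out.println("entries left: " + previousPollTime.size()); // 1
    }
}
```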