Skip to content

Commit

Permalink
cleanup
Browse files Browse the repository at this point in the history
  • Loading branch information
lct45 committed Jun 7, 2021
1 parent d5c777a commit 3ddceae
Show file tree
Hide file tree
Showing 3 changed files with 24 additions and 44 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
import io.confluent.ksql.metastore.MetaStore;
import io.confluent.ksql.services.ServiceContext;
import io.confluent.ksql.util.QueryMetadata;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.processor.ThreadMetadata;
Expand All @@ -23,28 +22,29 @@
public class UtilizationMetricsListener implements Runnable, QueryEventListener {

private final String STREAM_THREAD_GROUP = "stream-thread-metrics";
private final String THREAD_ID = "thread-id";

private final List<KafkaStreams> kafkaStreams;
private final Logger logger = LoggerFactory.getLogger(UtilizationMetricsListener.class);
private final List<String> metrics;
private final Time time;
private final double windowSize;

private final Map<String, Double> previousPollTime;
private final Map<String, Double> previousRestoreConsumerPollTime;
private final Map<String, Double> previousSendTime;
private final Map<String, Double> previousFlushTime;

public UtilizationMetricsListener(){
public UtilizationMetricsListener(final double windowSize){
this.kafkaStreams = new ArrayList<>();
this.metrics = new LinkedList<>();
// we can add these here or pass it in through the constructor
metrics.add("poll-time-total");
metrics.add("restore-consumer-poll-time-total");
metrics.add("send-time-total");
metrics.add("flush-time-total");
// just for sanity checking since this metric already exists
metrics.add("poll-total");
time = Time.SYSTEM;
this.windowSize = windowSize;
previousPollTime = new HashMap<>();
previousRestoreConsumerPollTime = new HashMap<>();
previousSendTime = new HashMap<>();
Expand All @@ -70,15 +70,8 @@ public void onDeregister(final QueryMetadata query) {

@Override
public void run() {
logger.info("Reporting CSU system level metrics");
reportSystemMetrics();
logger.info("Reporting CSU thread level metrics");
reportProcessingRatio();
/*for (KafkaStreams thread : kafkaStreams) {
for (String metric : metrics) {
reportThreadMetrics(thread, metric, STREAM_THREAD_GROUP);
}
}*/
logger.info("the current processing ratio is " + processingRatio() + "%");
}

private void reportSystemMetrics() {
Expand All @@ -89,46 +82,44 @@ private void reportSystemMetrics() {
String timeKernel = columns[14];
int clockTicks = 100;

logger.info("we've got " + timeUser + " for timeuser, " + timeKernel + " for timekernel, not sure about clock ticks");
logger.info("we've got " + timeUser + " for timeuser, " + timeKernel + " for timekernel");
} catch(Exception e) {
logger.info("something went wrong finding cpu utilization metrics " + e.getMessage());
}

logger.info("we're using some disk");
}

private void reportProcessingRatio() {
final double totalTime = 120000.0;
double blockedTime = 120000.0;
// public for testing
public double processingRatio() {
double blockedTime = windowSize;

final long windowEnd = time.milliseconds();
final long windowStart = (long) Math.max(0, windowEnd - totalTime);
logger.info("--- window start: " + windowStart + " ----");
final long windowStart = (long) Math.max(0, windowEnd - windowSize);
for (KafkaStreams stream : kafkaStreams) {
for (ThreadMetadata thread : stream.localThreadsMetadata()) {
blockedTime = Math.min(getProcessingRatio(thread.threadName(), stream, windowStart, totalTime), totalTime);
blockedTime = Math.min(getProcessingRatio(thread.threadName(), stream, windowStart, windowSize), windowSize);
}
}
final double notBlocked = totalTime - blockedTime;
final double processingRatio = (notBlocked / totalTime) * 100;
logger.info("total time - blocked time = " + (totalTime - blockedTime));
logger.info("the current processing ratio is " + processingRatio + "%");
final double notBlocked = windowSize - blockedTime;
return (notBlocked / windowSize) * 100;
}

private double getProcessingRatio(final String threadName, final KafkaStreams streams, final long windowStart, final double windowSize) {
final Map<String, Double> threadMetrics = streams.metrics().values().stream()
.filter(m -> m.metricName().group().equals("stream-thread-metrics") &&
m.metricName().tags().get("thread-id").equals(threadName) &&
.filter(m -> m.metricName().group().equals(STREAM_THREAD_GROUP) &&
m.metricName().tags().get(THREAD_ID).equals(threadName) &&
metrics.contains(m.metricName().name()))
.collect(Collectors.toMap(k -> k.metricName().name(), v -> (double) v.metricValue()));
final Long threadStartTime = (Long) streams.metrics().values().stream()
.filter(m -> m.metricName().group().equals("stream-thread-metrics") &&
m.metricName().tags().get("thread-id").equals(threadName) &&
m.metricName().name().equals("thread-start-time")).collect(Collectors.toList()).get(0).metricValue();
logger.info("--- thread start: " + threadStartTime + " ----");
.filter(m -> m.metricName().group().equals(STREAM_THREAD_GROUP) &&
m.metricName().tags().get(THREAD_ID).equals(threadName) &&
m.metricName().name().equals("thread-start-time"))
.collect(Collectors.toList())
.get(0)
.metricValue();
double blockedTime = 0;
if (threadStartTime > windowStart || threadStartTime == 0.0) {
logger.info("in fact, the thread was started after the window");
if (threadStartTime > windowStart) {
blockedTime += threadStartTime - windowStart;
previousPollTime.put(threadName, 0.0);
previousRestoreConsumerPollTime.put(threadName, 0.0);
Expand All @@ -140,27 +131,17 @@ private double getProcessingRatio(final String threadName, final KafkaStreams st
final double newFlushTime = threadMetrics.getOrDefault("flush-time-total", 0.0);
final double newSendTime = threadMetrics.getOrDefault("send-time-total", 0.0);
blockedTime += Math.max(newPollTime - previousPollTime.get(threadName), 0);
logger.info("added " + (newPollTime - previousPollTime.get(threadName)) + " to blocked time");
previousPollTime.put(threadName, newPollTime);

blockedTime += Math.max(newRestorePollTime - previousRestoreConsumerPollTime.get(threadName), 0);
previousRestoreConsumerPollTime.put(threadName, newRestorePollTime);

blockedTime += Math.max(newSendTime - previousSendTime.get(threadName), 0);
previousSendTime.put(threadName, newSendTime);

blockedTime += Math.max(newFlushTime - previousFlushTime.get(threadName), 0);
previousFlushTime.put(threadName, newFlushTime);

return Math.min(windowSize, blockedTime);
}

private void reportThreadMetrics(final KafkaStreams thread, final String metric, final String group) {
final List<Metric> metricsList = new ArrayList<Metric>(thread.metrics().values()).stream()
.filter(m -> m.metricName().name().equals(metric) &&
m.metricName().group().equals(group))
.collect(Collectors.toList());
for (Metric threadMetric : metricsList) {
logger.info(metric + " has a value of " + threadMetric.metricValue() + " for stream thread " + thread);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,6 @@ public static Config getConfig(final Admin adminClient) {

public static String getKafkaClusterId(final ServiceContext serviceContext) {
try {
//-01aSZjrQGm81mcKgASjJQ
return serviceContext.getAdminClient()
.describeCluster()
.clusterId()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -685,7 +685,7 @@ static KsqlRestApplication buildApplication(
.setNameFormat("ksql-csu-metrics-reporter-%d")
.build()
);
final UtilizationMetricsListener csuMetricReporter = new UtilizationMetricsListener();
final UtilizationMetricsListener csuMetricReporter = new UtilizationMetricsListener(300000L);
// will change to 300000 when we're ready
executorService.scheduleAtFixedRate(csuMetricReporter, 0, 12000, TimeUnit.MILLISECONDS);
final List<QueryEventListener> listeners = new ArrayList <>();
Expand Down

0 comments on commit 3ddceae

Please sign in to comment.