From 7ad53abda4ae361524d2f722a2e0f1ef974aab63 Mon Sep 17 00:00:00 2001 From: Thibault Duplessis Date: Wed, 25 Sep 2024 10:10:02 +0200 Subject: [PATCH] monitor traffic shaping --- src/main/scala/Monitor.scala | 2 ++ src/main/scala/netty/ActorChannelConnector.scala | 6 +++++- 2 files changed, 7 insertions(+), 1 deletion(-) diff --git a/src/main/scala/Monitor.scala b/src/main/scala/Monitor.scala index 68e612a..a228bb0 100644 --- a/src/main/scala/Monitor.scala +++ b/src/main/scala/Monitor.scala @@ -188,3 +188,5 @@ object Monitor: val step = Kamon.gauge("connector.flush.config.step").withoutTags() val interval = Kamon.gauge("connector.flush.config.interval").withoutTags() val maxDelay = Kamon.gauge("connector.flush.config.maxDelay").withoutTags() + val qSize = Kamon.histogram("connector.flush.qSize").withoutTags() + val channelsToFlush = Kamon.histogram("connector.flush.channelsToFlush").withoutTags() diff --git a/src/main/scala/netty/ActorChannelConnector.scala b/src/main/scala/netty/ActorChannelConnector.scala index c0a6d47..ed61c88 100644 --- a/src/main/scala/netty/ActorChannelConnector.scala +++ b/src/main/scala/netty/ActorChannelConnector.scala @@ -62,8 +62,12 @@ final private class ActorChannelConnector( flushQ.add(channel) private def flush(): Unit = + val qSize = flushQ.size val maxDelayFactor = maxDelay.get().toDouble / interval.get() - var channelsToFlush = step.get().atLeast((flushQ.size * maxDelayFactor).toInt) + var channelsToFlush = step.get().atLeast((qSize * maxDelayFactor).toInt) + + monitor.qSize.record(qSize) + monitor.channelsToFlush.record(channelsToFlush) while channelsToFlush > 0 do Option(flushQ.poll()) match