From a52a87ba43c902a486aaee1d380f6c85eb5af08b Mon Sep 17 00:00:00 2001 From: Joshua Powers Date: Thu, 23 Sep 2021 16:46:13 -0600 Subject: [PATCH] Revert "Reset the flush interval timer when flush is requested or batch is ready. (#8953)" This reverts commit a6d2c4f254dbe9f7353961d892f8b91d907423ea. --- agent/agent.go | 12 ++++++++---- agent/tick.go | 19 +++++++------------ 2 files changed, 15 insertions(+), 16 deletions(-) diff --git a/agent/agent.go b/agent/agent.go index 78097bcd47731..7bd6b108df048 100644 --- a/agent/agent.go +++ b/agent/agent.go @@ -775,7 +775,7 @@ func (a *Agent) runOutputs( func (a *Agent) flushLoop( ctx context.Context, output *models.RunningOutput, - ticker *RollingTicker, + ticker Ticker, ) { logError := func(err error) { if err != nil { @@ -804,11 +804,15 @@ func (a *Agent) flushLoop( case <-ticker.Elapsed(): logError(a.flushOnce(output, ticker, output.Write)) case <-flushRequested: - ticker.Reset() logError(a.flushOnce(output, ticker, output.Write)) case <-output.BatchReady: - ticker.Reset() - logError(a.flushOnce(output, ticker, output.WriteBatch)) + // Favor the ticker over batch ready + select { + case <-ticker.Elapsed(): + logError(a.flushOnce(output, ticker, output.Write)) + default: + logError(a.flushOnce(output, ticker, output.WriteBatch)) + } } } } diff --git a/agent/tick.go b/agent/tick.go index 9696cd2c18c16..16233ba6d4adb 100644 --- a/agent/tick.go +++ b/agent/tick.go @@ -214,7 +214,6 @@ type RollingTicker struct { ch chan time.Time cancel context.CancelFunc wg sync.WaitGroup - timer *clock.Timer } func NewRollingTicker(interval, jitter time.Duration) *RollingTicker { @@ -231,12 +230,12 @@ func newRollingTicker(interval, jitter time.Duration, clock clock.Clock) *Rollin } d := t.next() - t.timer = clock.Timer(d) + timer := clock.Timer(d) t.wg.Add(1) go func() { defer t.wg.Done() - t.run(ctx) + t.run(ctx, timer) }() return t @@ -246,28 +245,24 @@ func (t *RollingTicker) next() time.Duration { return t.interval + internal.RandomDuration(t.jitter) } -func (t *RollingTicker) run(ctx context.Context) { +func (t *RollingTicker) run(ctx context.Context, timer *clock.Timer) { for { select { case <-ctx.Done(): - t.timer.Stop() + timer.Stop() return - case now := <-t.timer.C: + case now := <-timer.C: select { case t.ch <- now: default: } - t.Reset() + d := t.next() + timer.Reset(d) } } } -// Reset the ticker to the next interval + jitter. -func (t *RollingTicker) Reset() { - t.timer.Reset(t.next()) -} - func (t *RollingTicker) Elapsed() <-chan time.Time { return t.ch }