diff --git a/quesma/telemetry/duration.go b/quesma/telemetry/duration.go index e19e251be..c047df3fd 100644 --- a/quesma/telemetry/duration.go +++ b/quesma/telemetry/duration.go @@ -24,7 +24,7 @@ type Span interface { type DurationMeasurement interface { Begin() Span - Aggregate() DurationStats + AggregateAndReset() DurationStats } type DurationStats struct { @@ -111,7 +111,7 @@ func (a *durationMeasurement) ingestLoop() { } -func (a *durationMeasurement) Aggregate() (stats DurationStats) { +func (a *durationMeasurement) AggregateAndReset() (stats DurationStats) { a.m.Lock() defer a.m.Unlock() @@ -126,6 +126,13 @@ func (a *durationMeasurement) Aggregate() (stats DurationStats) { stats.OverThresholds = a.computeOverThresholdCounter() stats.Percentiles = a.computePercentiles() + a.count = 0 + a.sum = 0 + a.failed = 0 + for _, threshold := range overThresholds { + a.overThresholdCounters[threshold] = 0 + } + return stats } diff --git a/quesma/telemetry/duration_test.go b/quesma/telemetry/duration_test.go index 542fc9917..946de7f7d 100644 --- a/quesma/telemetry/duration_test.go +++ b/quesma/telemetry/duration_test.go @@ -38,7 +38,7 @@ func TestDurationMeasurement_Aggregate(t *testing.T) { } } - stats := measurement.Aggregate() + stats := measurement.AggregateAndReset() assert.Equal(t, int64(100), stats.Count) assert.Equal(t, int64(10), stats.Failed) @@ -78,10 +78,12 @@ func TestDurationMeasurement_Percentiles(t *testing.T) { } } - stats := measurement.Aggregate() + stats := measurement.AggregateAndReset() assert.Equal(t, float32(50.0), stats.Percentiles["50"]) + stats2 := measurement.AggregateAndReset() + assert.Equal(t, float32(0.0), stats2.Percentiles["50"]) } func TestDurationMeasurement_Percentiles_no_samples(t *testing.T) { @@ -91,7 +93,7 @@ func TestDurationMeasurement_Percentiles_no_samples(t *testing.T) { measurement := newDurationMeasurement(ctx) - stats := measurement.Aggregate() + stats := measurement.AggregateAndReset() assert.Equal(t, float32(0.0), stats.Percentiles["50"]) } @@ -115,7 +117,10 @@ func TestDurationMeasurement_Percentiles_single_sample(t *testing.T) { t.Errorf("ingest did not complete in time") } - stats := measurement.Aggregate() + stats := measurement.AggregateAndReset() assert.Equal(t, float32(1.0), stats.Percentiles["50"]) + + stats2 := measurement.AggregateAndReset() + assert.Equal(t, float32(0.0), stats2.Percentiles["50"]) } diff --git a/quesma/telemetry/empty.go b/quesma/telemetry/empty.go index a3121f592..dc576255b 100644 --- a/quesma/telemetry/empty.go +++ b/quesma/telemetry/empty.go @@ -19,7 +19,7 @@ func (d emptyTimer) Begin() Span { return emptySpan{} } -func (d emptyTimer) Aggregate() DurationStats { +func (d emptyTimer) AggregateAndReset() DurationStats { return DurationStats{} } @@ -29,11 +29,11 @@ func (d emptyMultiCounter) Add(key string, value int64) { // do nothing } -func (d emptyMultiCounter) Aggregate() MultiCounterStats { +func (d emptyMultiCounter) AggregateAndReset() MultiCounterStats { return MultiCounterStats{} } -func (d emptyMultiCounter) AggregateTopValues() MultiCounterTopValuesStats { +func (d emptyMultiCounter) AggregateTopValuesAndReset() MultiCounterTopValuesStats { return MultiCounterTopValuesStats{} } diff --git a/quesma/telemetry/multi_counter.go b/quesma/telemetry/multi_counter.go index 40181c155..03c4a3d2c 100644 --- a/quesma/telemetry/multi_counter.go +++ b/quesma/telemetry/multi_counter.go @@ -14,8 +14,8 @@ type MultiCounterTopValuesStats []string type MultiCounter interface { Add(key string, value int64) - Aggregate() MultiCounterStats - AggregateTopValues() MultiCounterTopValuesStats + AggregateAndReset() MultiCounterStats + AggregateTopValuesAndReset() MultiCounterTopValuesStats } type sampleMultiCounter struct { @@ -71,17 +71,18 @@ func (mc *multiCounter) Add(key string, value int64) { mc.ingest <- sampleMultiCounter{key, value} } -func (mc *multiCounter) Aggregate() (stats MultiCounterStats) { +func (mc *multiCounter) AggregateAndReset() (stats MultiCounterStats) { mc.m.Lock() defer mc.m.Unlock() stats = make(map[string]int64, len(mc.counters)) for k, v := range mc.counters { stats[k] = v } + mc.counters = make(map[string]int64) return stats } -func (mc *multiCounter) AggregateTopValues() (stats MultiCounterTopValuesStats) { +func (mc *multiCounter) AggregateTopValuesAndReset() (stats MultiCounterTopValuesStats) { mc.m.Lock() defer mc.m.Unlock() stats = make(MultiCounterTopValuesStats, 0, len(mc.counters)) diff --git a/quesma/telemetry/multi_counter_test.go b/quesma/telemetry/multi_counter_test.go index c9bb2fb2d..609b38d32 100644 --- a/quesma/telemetry/multi_counter_test.go +++ b/quesma/telemetry/multi_counter_test.go @@ -31,9 +31,12 @@ func TestMultiCounter_Add(t *testing.T) { } } - stats := mc.Aggregate() + stats := mc.AggregateAndReset() assert.Equal(t, int64(4), stats["key1"]) assert.Equal(t, int64(2), stats["key2"]) assert.Equal(t, 2, len(stats)) + + stats2 := mc.AggregateAndReset() + assert.Empty(t, stats2) } diff --git a/quesma/telemetry/phone_home.go b/quesma/telemetry/phone_home.go index 405990602..d524ca71e 100644 --- a/quesma/telemetry/phone_home.go +++ b/quesma/telemetry/phone_home.go @@ -514,18 +514,23 @@ func (a *agent) collect(ctx context.Context, reportType string) (stats PhoneHome stats.NumberOfPanics = recovery.PanicCounter.Load() stats.InstanceID = a.instanceId - stats.ClickHouse = a.CollectClickHouse(ctx) + stats.ClickHouseQueriesDuration = a.ClickHouseQueryDuration().AggregateAndReset() + stats.ClickHouseInsertsDuration = a.ClickHouseInsertDuration().AggregateAndReset() + stats.ElasticReadsDuration = a.ElasticReadRequestsDuration().AggregateAndReset() + stats.ElasticWritesDuration = a.ElasticWriteRequestsDuration().AggregateAndReset() + stats.ElasticBypassedReadsDuration = a.ElasticBypassedReadRequestsDuration().AggregateAndReset() + stats.ElasticBypassedWritesDuration = a.ElasticBypassedWriteRequestsDuration().AggregateAndReset() + stats.UserAgentCounters = a.userAgentCounters.AggregateTopValuesAndReset() + stats.Elasticsearch = a.CollectElastic(ctx) - stats.ClickHouseQueriesDuration = a.ClickHouseQueryDuration().Aggregate() - stats.ClickHouseInsertsDuration = a.ClickHouseInsertDuration().Aggregate() - stats.ElasticReadsDuration = a.ElasticReadRequestsDuration().Aggregate() - stats.ElasticWritesDuration = a.ElasticWriteRequestsDuration().Aggregate() - stats.ElasticBypassedReadsDuration = a.ElasticBypassedReadRequestsDuration().Aggregate() - stats.ElasticBypassedWritesDuration = a.ElasticBypassedWriteRequestsDuration().Aggregate() - stats.UserAgentCounters = a.userAgentCounters.AggregateTopValues() + if stats.ClickHouseInsertsDuration.Count > 0 || stats.ClickHouseQueriesDuration.Count > 0 { + stats.ClickHouse = a.CollectClickHouse(ctx) + } else { + stats.ClickHouse = ClickHouseStats{Status: "paused"} + } - stats.IngestCounters = a.ingestCounters.Aggregate() + stats.IngestCounters = a.ingestCounters.AggregateAndReset() stats.RuntimeStats = a.runtimeStats() stats.TopErrors = a.topErrors()