Skip to content

Commit

Permalink
Phone home change (#548)
Browse files Browse the repository at this point in the history
Two changes:
- allow ClickHouse to enter idle by not querying it during phone home
when there is no traffic
- reset counters after each phone home (every hour)

CC @nablaone
  • Loading branch information
jakozaur authored Jul 19, 2024
1 parent 8f76d82 commit 393c3ab
Show file tree
Hide file tree
Showing 6 changed files with 44 additions and 23 deletions.
11 changes: 9 additions & 2 deletions quesma/telemetry/duration.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ type Span interface {

type DurationMeasurement interface {
Begin() Span
Aggregate() DurationStats
AggregateAndReset() DurationStats
}

type DurationStats struct {
Expand Down Expand Up @@ -111,7 +111,7 @@ func (a *durationMeasurement) ingestLoop() {

}

func (a *durationMeasurement) Aggregate() (stats DurationStats) {
func (a *durationMeasurement) AggregateAndReset() (stats DurationStats) {
a.m.Lock()
defer a.m.Unlock()

Expand All @@ -126,6 +126,13 @@ func (a *durationMeasurement) Aggregate() (stats DurationStats) {
stats.OverThresholds = a.computeOverThresholdCounter()
stats.Percentiles = a.computePercentiles()

a.count = 0
a.sum = 0
a.failed = 0
for _, threshold := range overThresholds {
a.overThresholdCounters[threshold] = 0
}

return stats
}

Expand Down
13 changes: 9 additions & 4 deletions quesma/telemetry/duration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ func TestDurationMeasurement_Aggregate(t *testing.T) {
}
}

stats := measurement.Aggregate()
stats := measurement.AggregateAndReset()

assert.Equal(t, int64(100), stats.Count)
assert.Equal(t, int64(10), stats.Failed)
Expand Down Expand Up @@ -78,10 +78,12 @@ func TestDurationMeasurement_Percentiles(t *testing.T) {
}
}

stats := measurement.Aggregate()
stats := measurement.AggregateAndReset()

assert.Equal(t, float32(50.0), stats.Percentiles["50"])

stats2 := measurement.AggregateAndReset()
assert.Equal(t, float32(0.0), stats2.Percentiles["50"])
}

func TestDurationMeasurement_Percentiles_no_samples(t *testing.T) {
Expand All @@ -91,7 +93,7 @@ func TestDurationMeasurement_Percentiles_no_samples(t *testing.T) {

measurement := newDurationMeasurement(ctx)

stats := measurement.Aggregate()
stats := measurement.AggregateAndReset()

assert.Equal(t, float32(0.0), stats.Percentiles["50"])
}
Expand All @@ -115,7 +117,10 @@ func TestDurationMeasurement_Percentiles_single_sample(t *testing.T) {
t.Errorf("ingest did not complete in time")
}

stats := measurement.Aggregate()
stats := measurement.AggregateAndReset()

assert.Equal(t, float32(1.0), stats.Percentiles["50"])

stats2 := measurement.AggregateAndReset()
assert.Equal(t, float32(0.0), stats2.Percentiles["50"])
}
6 changes: 3 additions & 3 deletions quesma/telemetry/empty.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ func (d emptyTimer) Begin() Span {
return emptySpan{}
}

func (d emptyTimer) Aggregate() DurationStats {
func (d emptyTimer) AggregateAndReset() DurationStats {
return DurationStats{}
}

Expand All @@ -29,11 +29,11 @@ func (d emptyMultiCounter) Add(key string, value int64) {
// do nothing
}

func (d emptyMultiCounter) Aggregate() MultiCounterStats {
func (d emptyMultiCounter) AggregateAndReset() MultiCounterStats {
return MultiCounterStats{}
}

func (d emptyMultiCounter) AggregateTopValues() MultiCounterTopValuesStats {
func (d emptyMultiCounter) AggregateTopValuesAndReset() MultiCounterTopValuesStats {
return MultiCounterTopValuesStats{}
}

Expand Down
9 changes: 5 additions & 4 deletions quesma/telemetry/multi_counter.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,8 @@ type MultiCounterTopValuesStats []string

type MultiCounter interface {
Add(key string, value int64)
Aggregate() MultiCounterStats
AggregateTopValues() MultiCounterTopValuesStats
AggregateAndReset() MultiCounterStats
AggregateTopValuesAndReset() MultiCounterTopValuesStats
}

type sampleMultiCounter struct {
Expand Down Expand Up @@ -71,17 +71,18 @@ func (mc *multiCounter) Add(key string, value int64) {
mc.ingest <- sampleMultiCounter{key, value}
}

func (mc *multiCounter) Aggregate() (stats MultiCounterStats) {
func (mc *multiCounter) AggregateAndReset() (stats MultiCounterStats) {
mc.m.Lock()
defer mc.m.Unlock()
stats = make(map[string]int64, len(mc.counters))
for k, v := range mc.counters {
stats[k] = v
}
mc.counters = make(map[string]int64)
return stats
}

func (mc *multiCounter) AggregateTopValues() (stats MultiCounterTopValuesStats) {
func (mc *multiCounter) AggregateTopValuesAndReset() (stats MultiCounterTopValuesStats) {
mc.m.Lock()
defer mc.m.Unlock()
stats = make(MultiCounterTopValuesStats, 0, len(mc.counters))
Expand Down
5 changes: 4 additions & 1 deletion quesma/telemetry/multi_counter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,12 @@ func TestMultiCounter_Add(t *testing.T) {
}
}

stats := mc.Aggregate()
stats := mc.AggregateAndReset()

assert.Equal(t, int64(4), stats["key1"])
assert.Equal(t, int64(2), stats["key2"])
assert.Equal(t, 2, len(stats))

stats2 := mc.AggregateAndReset()
assert.Empty(t, stats2)
}
23 changes: 14 additions & 9 deletions quesma/telemetry/phone_home.go
Original file line number Diff line number Diff line change
Expand Up @@ -514,18 +514,23 @@ func (a *agent) collect(ctx context.Context, reportType string) (stats PhoneHome
stats.NumberOfPanics = recovery.PanicCounter.Load()
stats.InstanceID = a.instanceId

stats.ClickHouse = a.CollectClickHouse(ctx)
stats.ClickHouseQueriesDuration = a.ClickHouseQueryDuration().AggregateAndReset()
stats.ClickHouseInsertsDuration = a.ClickHouseInsertDuration().AggregateAndReset()
stats.ElasticReadsDuration = a.ElasticReadRequestsDuration().AggregateAndReset()
stats.ElasticWritesDuration = a.ElasticWriteRequestsDuration().AggregateAndReset()
stats.ElasticBypassedReadsDuration = a.ElasticBypassedReadRequestsDuration().AggregateAndReset()
stats.ElasticBypassedWritesDuration = a.ElasticBypassedWriteRequestsDuration().AggregateAndReset()
stats.UserAgentCounters = a.userAgentCounters.AggregateTopValuesAndReset()

stats.Elasticsearch = a.CollectElastic(ctx)

stats.ClickHouseQueriesDuration = a.ClickHouseQueryDuration().Aggregate()
stats.ClickHouseInsertsDuration = a.ClickHouseInsertDuration().Aggregate()
stats.ElasticReadsDuration = a.ElasticReadRequestsDuration().Aggregate()
stats.ElasticWritesDuration = a.ElasticWriteRequestsDuration().Aggregate()
stats.ElasticBypassedReadsDuration = a.ElasticBypassedReadRequestsDuration().Aggregate()
stats.ElasticBypassedWritesDuration = a.ElasticBypassedWriteRequestsDuration().Aggregate()
stats.UserAgentCounters = a.userAgentCounters.AggregateTopValues()
if stats.ClickHouseInsertsDuration.Count > 0 || stats.ClickHouseQueriesDuration.Count > 0 {
stats.ClickHouse = a.CollectClickHouse(ctx)
} else {
stats.ClickHouse = ClickHouseStats{Status: "paused"}
}

stats.IngestCounters = a.ingestCounters.Aggregate()
stats.IngestCounters = a.ingestCounters.AggregateAndReset()

stats.RuntimeStats = a.runtimeStats()
stats.TopErrors = a.topErrors()
Expand Down

0 comments on commit 393c3ab

Please sign in to comment.