Skip to content

Commit

Permalink
Plumb overrides into remote write config generation and test
Browse files Browse the repository at this point in the history
  • Loading branch information
zalegrala committed Jun 18, 2024
1 parent 7b9898f commit 2f53a6d
Show file tree
Hide file tree
Showing 11 changed files with 112 additions and 53 deletions.
4 changes: 4 additions & 0 deletions modules/generator/instance_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -397,3 +397,7 @@ func (n noopAppender) Rollback() error { return nil }
func (n noopAppender) UpdateMetadata(prometheus_storage.SeriesRef, labels.Labels, metadata.Metadata) (prometheus_storage.SeriesRef, error) {
return 0, nil
}

func (n noopAppender) AppendCTZeroSample(ref prometheus_storage.SeriesRef, l labels.Labels, t, ct int64) (prometheus_storage.SeriesRef, error) {
return 0, nil
}
1 change: 1 addition & 0 deletions modules/generator/overrides.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ type metricsGeneratorOverrides interface {
registry.Overrides
storage.Overrides

MetricsGeneratorGenerateNativeHistograms(userID string) bool
MetricsGeneratorIngestionSlack(userID string) time.Duration
MetricsGeneratorProcessors(userID string) map[string]struct{}
MetricsGeneratorProcessorServiceGraphsHistogramBuckets(userID string) []float64
Expand Down
9 changes: 9 additions & 0 deletions modules/generator/registry/appender_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/model/metadata"
"github.com/prometheus/prometheus/storage"
prometheus_storage "github.com/prometheus/prometheus/storage"
)

type noopAppender struct{}
Expand Down Expand Up @@ -41,6 +42,10 @@ func (n noopAppender) UpdateMetadata(storage.SeriesRef, labels.Labels, metadata.
return 0, nil
}

func (n noopAppender) AppendCTZeroSample(ref prometheus_storage.SeriesRef, l labels.Labels, t, ct int64) (prometheus_storage.SeriesRef, error) {
return 0, nil
}

type capturingAppender struct {
samples []sample
exemplars []exemplarSample
Expand Down Expand Up @@ -118,3 +123,7 @@ func (c *capturingAppender) Rollback() error {
func (c *capturingAppender) UpdateMetadata(storage.SeriesRef, labels.Labels, metadata.Metadata) (storage.SeriesRef, error) {
return 0, nil
}

func (c *capturingAppender) AppendCTZeroSample(ref prometheus_storage.SeriesRef, l labels.Labels, t, ct int64) (prometheus_storage.SeriesRef, error) {
return 0, nil
}
39 changes: 17 additions & 22 deletions modules/generator/registry/native_histogram.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,14 +22,16 @@ type nativeHistogram struct {
// Downside: you need to list labels at creation time while our interfaces only pass labels at observe time, this
// will requires a bigger refactor, maybe something for a second pass?
// Might break processors that have variable amount of labels...
//promHistogram prometheus.HistogramVec
// promHistogram prometheus.HistogramVec

seriesMtx sync.Mutex
series map[uint64]*nativeHistogramSeries

onAddSerie func(count uint32) bool
onRemoveSerie func(count uint32)

buckets []float64

traceIDLabelName string
}

Expand All @@ -45,7 +47,7 @@ var (
_ metric = (*nativeHistogram)(nil)
)

func newNativeHistogram(name string, onAddSeries func(uint32) bool, onRemoveSeries func(count uint32), traceIDLabelName string) *nativeHistogram {
func newNativeHistogram(name string, buckets []float64, onAddSeries func(uint32) bool, onRemoveSeries func(count uint32), traceIDLabelName string) *nativeHistogram {
if onAddSeries == nil {
onAddSeries = func(uint32) bool {
return true
Expand All @@ -65,6 +67,7 @@ func newNativeHistogram(name string, onAddSeries func(uint32) bool, onRemoveSeri
onAddSerie: onAddSeries,
onRemoveSerie: onRemoveSeries,
traceIDLabelName: traceIDLabelName,
buckets: buckets,
}
}

Expand All @@ -85,11 +88,6 @@ func (h *nativeHistogram) ObserveWithExemplar(labelValueCombo *LabelValueCombo,
}

newSeries := h.newSeries(labelValueCombo, value, traceID, multiplier)
s, ok = h.series[hash]
if ok {
h.updateSeries(s, value, traceID, multiplier)
return
}
h.series[hash] = newSeries
}

Expand All @@ -98,11 +96,9 @@ func (h *nativeHistogram) newSeries(labelValueCombo *LabelValueCombo, value floa
// TODO move these labels in HistogramOpts.ConstLabels?
labels: labelValueCombo.getLabelPair(),
promHistogram: prometheus.NewHistogram(prometheus.HistogramOpts{
Name: h.name(),
// TODO support help text
Help: "Native histogram for metric " + h.name(),
// TODO we can set these to also emit a classic histogram
Buckets: nil,
Name: h.name(),
Help: "Native histogram for metric " + h.name(),
Buckets: h.buckets,
// TODO check if these values are sensible and break them out
NativeHistogramBucketFactor: 1.1,
NativeHistogramMaxBucketNumber: 100,
Expand Down Expand Up @@ -138,6 +134,7 @@ func (h *nativeHistogram) collectMetrics(appender storage.Appender, timeMs int64
}
lbls := make(labels.Labels, 1+len(externalLabels)+labelsCount)
lb := labels.NewBuilder(lbls)
activeSeries = len(h.series)

lb.Set(labels.MetricName, h.metricName)

Expand All @@ -164,37 +161,35 @@ func (h *nativeHistogram) collectMetrics(appender storage.Appender, timeMs int64
encodedHistogram := encodedMetric.GetHistogram()

// Decode to Prometheus representation
h := promhistogram.Histogram{
hist := promhistogram.Histogram{
Schema: encodedHistogram.GetSchema(),
Count: encodedHistogram.GetSampleCount(),
Sum: encodedHistogram.GetSampleSum(),
ZeroThreshold: encodedHistogram.GetZeroThreshold(),
ZeroCount: encodedHistogram.GetZeroCount(),
}
if len(encodedHistogram.PositiveSpan) > 0 {
h.PositiveSpans = make([]promhistogram.Span, len(encodedHistogram.PositiveSpan))
hist.PositiveSpans = make([]promhistogram.Span, len(encodedHistogram.PositiveSpan))
for i, span := range encodedHistogram.PositiveSpan {
h.PositiveSpans[i] = promhistogram.Span{
hist.PositiveSpans[i] = promhistogram.Span{
Offset: span.GetOffset(),
Length: span.GetLength(),
}
}
}
h.PositiveBuckets = encodedHistogram.PositiveDelta
hist.PositiveBuckets = encodedHistogram.PositiveDelta
if len(encodedHistogram.NegativeSpan) > 0 {
h.NegativeSpans = make([]promhistogram.Span, len(encodedHistogram.NegativeSpan))
hist.NegativeSpans = make([]promhistogram.Span, len(encodedHistogram.NegativeSpan))
for i, span := range encodedHistogram.NegativeSpan {
h.NegativeSpans[i] = promhistogram.Span{
hist.NegativeSpans[i] = promhistogram.Span{
Offset: span.GetOffset(),
Length: span.GetLength(),
}
}
}
h.NegativeBuckets = encodedHistogram.NegativeDelta

// TODO update activeSeries
hist.NegativeBuckets = encodedHistogram.NegativeDelta

_, err = appender.AppendHistogram(0, lb.Labels(), timeMs, &h, nil)
_, err = appender.AppendHistogram(0, lb.Labels(), timeMs, &hist, nil)
if err != nil {
return activeSeries, err
}
Expand Down
28 changes: 21 additions & 7 deletions modules/generator/registry/native_histogram_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,16 +7,14 @@ import (
"github.com/stretchr/testify/assert"
)

// TODO 😶

func Test_native_histogram(t *testing.T) {
var seriesAdded int
onAdd := func(count uint32) bool {
seriesAdded++
seriesAdded += int(count)
return true
}

h := newNativeHistogram("my_histogram", onAdd, nil, "trace_id")
h := newNativeHistogram("my_histogram", nil, onAdd, nil, "trace_id")

h.ObserveWithExemplar(newLabelValueCombo([]string{"label"}, []string{"value-1"}), 1.0, "trace-1", 1.0)
h.ObserveWithExemplar(newLabelValueCombo([]string{"label"}, []string{"value-2"}), 1.5, "trace-2", 1.0)
Expand All @@ -26,7 +24,7 @@ func Test_native_histogram(t *testing.T) {
collectionTimeMs := time.Now().UnixMilli()
expectedSamples := []sample{}
expectedExemplars := []exemplarSample{}
collectMetricAndAssert(t, h, collectionTimeMs, nil, 10, expectedSamples, expectedExemplars)
collectMetricAndAssert(t, h, collectionTimeMs, nil, 2, expectedSamples, expectedExemplars)

h.ObserveWithExemplar(newLabelValueCombo([]string{"label"}, []string{"value-2"}), 2.5, "trace-2.2", 1.0)
h.ObserveWithExemplar(newLabelValueCombo([]string{"label"}, []string{"value-3"}), 3.0, "trace-3", 1.0)
Expand All @@ -36,7 +34,7 @@ func Test_native_histogram(t *testing.T) {
collectionTimeMs = time.Now().UnixMilli()
expectedSamples = []sample{}
expectedExemplars = []exemplarSample{}
collectMetricAndAssert(t, h, collectionTimeMs, nil, 15, expectedSamples, expectedExemplars)
collectMetricAndAssert(t, h, collectionTimeMs, nil, 3, expectedSamples, expectedExemplars)

h.ObserveWithExemplar(newLabelValueCombo([]string{"label"}, []string{"value-2"}), 2.5, "trace-2.2", 20.0)
h.ObserveWithExemplar(newLabelValueCombo([]string{"label"}, []string{"value-3"}), 3.0, "trace-3", 13.5)
Expand All @@ -47,5 +45,21 @@ func Test_native_histogram(t *testing.T) {
collectionTimeMs = time.Now().UnixMilli()
expectedSamples = []sample{}
expectedExemplars = []exemplarSample{}
collectMetricAndAssert(t, h, collectionTimeMs, nil, 15, expectedSamples, expectedExemplars)
collectMetricAndAssert(t, h, collectionTimeMs, nil, 3, expectedSamples, expectedExemplars)
}

// Duplicate labels should not grow the series count.
func Test_ObserveWithExemplar_duplicate(t *testing.T) {
var seriesAdded int
onAdd := func(count uint32) bool {
seriesAdded += int(count)
return true
}
h := newNativeHistogram("my_histogram", []float64{0.1, 0.2}, onAdd, nil, "trace_id")

lv := newLabelValueCombo([]string{"label"}, []string{"value-1"})

h.ObserveWithExemplar(lv, 1.0, "trace-1", 1.0)
h.ObserveWithExemplar(lv, 1.1, "trace-1", 1.0)
assert.Equal(t, 1, seriesAdded)
}
2 changes: 1 addition & 1 deletion modules/generator/registry/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,7 @@ func (r *ManagedRegistry) NewHistogram(name string, buckets []float64) (h Histog
h = newHistogram(name, buckets, r.onAddMetricSeries, r.onRemoveMetricSeries, traceIDLabelName)
} else {
level.Warn(r.logger).Log("msg", "creating native histogram!", "metric", name)
h = newNativeHistogram(name, r.onAddMetricSeries, r.onRemoveMetricSeries, traceIDLabelName)
h = newNativeHistogram(name, buckets, r.onAddMetricSeries, r.onRemoveMetricSeries, traceIDLabelName)
}

r.registerMetric(h)
Expand Down
6 changes: 5 additions & 1 deletion modules/generator/storage/config_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import (

// generateTenantRemoteWriteConfigs creates a copy of the remote write configurations with the
// X-Scope-OrgID header present for the given tenant, unless Tempo is run in single tenant mode or instructed not to add X-Scope-OrgID header.
func generateTenantRemoteWriteConfigs(originalCfgs []prometheus_config.RemoteWriteConfig, tenant string, headers map[string]string, addOrgIDHeader bool, logger log.Logger) []*prometheus_config.RemoteWriteConfig {
func generateTenantRemoteWriteConfigs(originalCfgs []prometheus_config.RemoteWriteConfig, tenant string, headers map[string]string, addOrgIDHeader bool, logger log.Logger, sendNativeHistograms bool) []*prometheus_config.RemoteWriteConfig {
var cloneCfgs []*prometheus_config.RemoteWriteConfig

for _, originalCfg := range originalCfgs {
Expand Down Expand Up @@ -42,6 +42,10 @@ func generateTenantRemoteWriteConfigs(originalCfgs []prometheus_config.RemoteWri
cloneCfg.Headers[k] = v
}

cloneCfg.SendNativeHistograms = sendNativeHistograms
// TODO: enable exemplars
// cloneCfg.SendExemplars = sendExemplars

cloneCfgs = append(cloneCfgs, cloneCfg)
}

Expand Down
23 changes: 20 additions & 3 deletions modules/generator/storage/config_util_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ func Test_generateTenantRemoteWriteConfigs(t *testing.T) {

addOrgIDHeader := true

result := generateTenantRemoteWriteConfigs(original, "my-tenant", nil, addOrgIDHeader, logger)
result := generateTenantRemoteWriteConfigs(original, "my-tenant", nil, addOrgIDHeader, logger, false)

assert.Equal(t, original[0].URL, result[0].URL)
assert.Equal(t, map[string]string{}, original[0].Headers, "Original headers have been modified")
Expand Down Expand Up @@ -61,7 +61,7 @@ func Test_generateTenantRemoteWriteConfigs_singleTenant(t *testing.T) {

addOrgIDHeader := true

result := generateTenantRemoteWriteConfigs(original, util.FakeTenantID, nil, addOrgIDHeader, logger)
result := generateTenantRemoteWriteConfigs(original, util.FakeTenantID, nil, addOrgIDHeader, logger, false)

assert.Equal(t, original[0].URL, result[0].URL)

Expand Down Expand Up @@ -95,7 +95,7 @@ func Test_generateTenantRemoteWriteConfigs_addOrgIDHeader(t *testing.T) {

addOrgIDHeader := false

result := generateTenantRemoteWriteConfigs(original, "my-tenant", nil, addOrgIDHeader, logger)
result := generateTenantRemoteWriteConfigs(original, "my-tenant", nil, addOrgIDHeader, logger, false)

assert.Equal(t, original[0].URL, result[0].URL)
assert.Empty(t, original[0].Headers, "X-Scope-OrgID header is not added")
Expand All @@ -104,6 +104,23 @@ func Test_generateTenantRemoteWriteConfigs_addOrgIDHeader(t *testing.T) {
assert.Equal(t, map[string]string{"foo": "bar", "x-scope-orgid": "fake-tenant"}, result[1].Headers, "Original headers not modified")
}

func Test_generateTenantRemoteWriteConfigs_sendNativeHistograms(t *testing.T) {
logger := log.NewLogfmtLogger(log.NewSyncWriter(os.Stdout))

original := []prometheus_config.RemoteWriteConfig{
{
URL: &prometheus_common_config.URL{URL: urlMustParse("http://prometheus-1/api/prom/push")},
Headers: map[string]string{},
},
}

result := generateTenantRemoteWriteConfigs(original, "my-tenant", nil, false, logger, true)
assert.Equal(t, true, result[0].SendNativeHistograms, "SendNativeHistograms should be true")

result = generateTenantRemoteWriteConfigs(original, "my-tenant", nil, false, logger, false)
assert.Equal(t, false, result[0].SendNativeHistograms, "SendNativeHistograms should be true")
}

func Test_copyMap(t *testing.T) {
original := map[string]string{
"k1": "v1",
Expand Down
43 changes: 26 additions & 17 deletions modules/generator/storage/instance.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,10 @@ import (
tsdb_errors "github.com/prometheus/prometheus/tsdb/errors"
)

var metricStorageHeadersUpdateFailed = promauto.NewCounterVec(prometheus.CounterOpts{
var metricStorageRemoteWriteUpdateFailed = promauto.NewCounterVec(prometheus.CounterOpts{
Namespace: "tempo",
Name: "metrics_generator_storage_headers_update_failed_total",
Help: "The total number of times updating the remote write headers failed",
Name: "metrics_generator_storage_remote_write_update_failed_total",
Help: "The total number of times updating the remote write configueration failed",
}, []string{"tenant"})

type Storage interface {
Expand All @@ -40,10 +40,14 @@ type storageImpl struct {
remote *remote.Storage
storage storage.Storage

tenantID string
currentHeaders map[string]string
overrides Overrides
closeCh chan struct{}
tenantID string

// Cached from the overrides
currentHeaders map[string]string
sendNativeHistograms bool

overrides Overrides
closeCh chan struct{}

logger log.Logger
}
Expand Down Expand Up @@ -81,8 +85,10 @@ func New(cfg *Config, o Overrides, tenant string, reg prometheus.Registerer, log
remoteStorage := remote.NewStorage(log.With(logger, "component", "remote"), reg, startTimeCallback, walDir, cfg.RemoteWriteFlushDeadline, &noopScrapeManager{})

headers := o.MetricsGeneratorRemoteWriteHeaders(tenant)
sendNativeHistograms := o.MetricsGeneratorGenerateNativeHistograms(tenant)

remoteStorageConfig := &prometheus_config.Config{
RemoteWriteConfigs: generateTenantRemoteWriteConfigs(cfg.RemoteWrite, tenant, headers, cfg.RemoteWriteAddOrgIDHeader, logger),
RemoteWriteConfigs: generateTenantRemoteWriteConfigs(cfg.RemoteWrite, tenant, headers, cfg.RemoteWriteAddOrgIDHeader, logger, sendNativeHistograms),
}

err = remoteStorage.ApplyConfig(remoteStorageConfig)
Expand All @@ -102,10 +108,11 @@ func New(cfg *Config, o Overrides, tenant string, reg prometheus.Registerer, log
remote: remoteStorage,
storage: storage.NewFanout(logger, wal, remoteStorage),

tenantID: tenant,
currentHeaders: headers,
overrides: o,
closeCh: make(chan struct{}),
tenantID: tenant,
currentHeaders: headers,
sendNativeHistograms: sendNativeHistograms,
overrides: o,
closeCh: make(chan struct{}),

logger: logger,
}
Expand Down Expand Up @@ -141,15 +148,17 @@ func (s *storageImpl) watchOverrides() {
select {
case <-t.C:
newHeaders := s.overrides.MetricsGeneratorRemoteWriteHeaders(s.tenantID)
if !headersEqual(s.currentHeaders, newHeaders) {
level.Info(s.logger).Log("msg", "updating remote write headers")
newSendNativeHistograms := s.overrides.MetricsGeneratorGenerateNativeHistograms(s.tenantID)
if !headersEqual(s.currentHeaders, newHeaders) || s.sendNativeHistograms != newSendNativeHistograms {
level.Info(s.logger).Log("msg", "updating remote write configuration")
s.currentHeaders = newHeaders
s.sendNativeHistograms = newSendNativeHistograms
err := s.remote.ApplyConfig(&prometheus_config.Config{
RemoteWriteConfigs: generateTenantRemoteWriteConfigs(s.cfg.RemoteWrite, s.tenantID, newHeaders, s.cfg.RemoteWriteAddOrgIDHeader, s.logger),
RemoteWriteConfigs: generateTenantRemoteWriteConfigs(s.cfg.RemoteWrite, s.tenantID, newHeaders, s.cfg.RemoteWriteAddOrgIDHeader, s.logger, newSendNativeHistograms),
})
if err != nil {
metricStorageHeadersUpdateFailed.WithLabelValues(s.tenantID).Inc()
level.Error(s.logger).Log("msg", "Failed to update remote write headers. Remote write will continue with old headers", "err", err)
metricStorageRemoteWriteUpdateFailed.WithLabelValues(s.tenantID).Inc()
level.Error(s.logger).Log("msg", "Failed to update remote write configuration. Remote write will continue with configuration", "err", err)
}
}
case <-s.closeCh:
Expand Down
Loading

0 comments on commit 2f53a6d

Please sign in to comment.