Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

receive: Include current config hash in forward requests #3138

Closed
wants to merge 4 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions cmd/thanos/receive.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/prometheus/prometheus/pkg/labels"
"github.com/prometheus/prometheus/tsdb"

"github.com/thanos-io/thanos/pkg/extkingpin"

"github.com/thanos-io/thanos/pkg/component"
Expand Down Expand Up @@ -85,6 +86,7 @@ func registerReceive(app *extkingpin.App) {

tsdbMinBlockDuration := extkingpin.ModelDuration(cmd.Flag("tsdb.min-block-duration", "Min duration for local TSDB blocks").Default("2h").Hidden())
tsdbMaxBlockDuration := extkingpin.ModelDuration(cmd.Flag("tsdb.max-block-duration", "Max duration for local TSDB blocks").Default("2h").Hidden())

walCompression := cmd.Flag("tsdb.wal-compression", "Compress the tsdb WAL.").Default("true").Bool()
noLockFile := cmd.Flag("tsdb.no-lockfile", "Do not create lockfile in TSDB data directory. In any case, the lockfiles will be deleted on next startup.").Default("false").Bool()

Expand Down
26 changes: 20 additions & 6 deletions pkg/receive/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ const (
DefaultTenantLabel = "tenant_id"
// DefaultReplicaHeader is the default header used to designate the replica count of a write request.
DefaultReplicaHeader = "THANOS-REPLICA"

// Labels for metrics.
labelSuccess = "success"
labelError = "error"
Expand Down Expand Up @@ -94,9 +95,10 @@ type Handler struct {
expBackoff backoff.Backoff
peerStates map[string]*retryState

forwardRequests *prometheus.CounterVec
replications *prometheus.CounterVec
replicationFactor prometheus.Gauge
forwardRequests *prometheus.CounterVec
forwardRequestConfigurationMismatch prometheus.Counter
replications *prometheus.CounterVec
replicationFactor prometheus.Gauge
}

func NewHandler(logger log.Logger, o *Options) *Handler {
Expand All @@ -122,6 +124,12 @@ func NewHandler(logger log.Logger, o *Options) *Handler {
Help: "The number of forward requests.",
}, []string{"result"},
),
forwardRequestConfigurationMismatch: promauto.With(o.Registry).NewCounter(
prometheus.CounterOpts{
Name: "thanos_receive_forward_request_config_mismatches_total",
Help: "The number of forward requests that have mismatching configurations.",
},
),
replications: promauto.With(o.Registry).NewCounterVec(
prometheus.CounterOpts{
Name: "thanos_receive_replications_total",
Expand Down Expand Up @@ -250,12 +258,17 @@ type replica struct {
replicated bool
}

func (h *Handler) handleRequest(ctx context.Context, rep uint64, tenant string, wreq *prompb.WriteRequest) error {
func (h *Handler) handleRequest(ctx context.Context, rep uint64, tenant string, config string, wreq *prompb.WriteRequest) error {
// The replica value in the header is one-indexed, thus we need >.
if rep > h.options.ReplicationFactor {
return errBadReplica
}

if h.hashring.ConfigHash() != config {
h.forwardRequestConfigurationMismatch.Inc()
level.Warn(h.logger).Log("msg", "hasring configuration mismatch", "current", h.hashring.ConfigHash(), "received", config)
}

r := replica{
n: rep,
replicated: rep != 0,
Expand Down Expand Up @@ -315,7 +328,7 @@ func (h *Handler) receiveHTTP(w http.ResponseWriter, r *http.Request) {
tenant = h.options.DefaultTenantID
}

err = h.handleRequest(ctx, rep, tenant, &wreq)
err = h.handleRequest(ctx, rep, tenant, h.hashring.ConfigHash(), &wreq)
switch err {
case nil:
return
Expand Down Expand Up @@ -513,6 +526,7 @@ func (h *Handler) fanoutForward(pctx context.Context, tenant string, replicas ma
Tenant: tenant,
// Increment replica since on-the-wire format is 1-indexed and 0 indicates un-replicated.
Replica: int64(replicas[endpoint].n + 1),
Config: h.hashring.ConfigHash(),
})
})
if err != nil {
Expand Down Expand Up @@ -634,7 +648,7 @@ func (h *Handler) RemoteWrite(ctx context.Context, r *storepb.WriteRequest) (*st
span, ctx := tracing.StartSpan(ctx, "receive_grpc")
defer span.Finish()

err := h.handleRequest(ctx, uint64(r.Replica), r.Tenant, &prompb.WriteRequest{Timeseries: r.Timeseries})
err := h.handleRequest(ctx, uint64(r.Replica), r.Tenant, r.Config, &prompb.WriteRequest{Timeseries: r.Timeseries})
switch err {
case nil:
return &storepb.WriteResponse{}, nil
Expand Down
27 changes: 27 additions & 0 deletions pkg/receive/hashring.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,14 @@ package receive

import (
"context"
"crypto/sha256"
"fmt"
"sort"
"sync"

"github.com/cespare/xxhash"
"github.com/pkg/errors"

"github.com/thanos-io/thanos/pkg/store/storepb/prompb"
)

Expand All @@ -36,6 +38,8 @@ type Hashring interface {
Get(tenant string, timeSeries *prompb.TimeSeries) (string, error)
// GetN returns the nth node that should handle the given tenant and time series.
GetN(tenant string, timeSeries *prompb.TimeSeries, n uint64) (string, error)
// ConfigHash string returns the hash of the loaded configuration.
ConfigHash() string
}

// hash returns a hash for the given tenant and time series.
Expand Down Expand Up @@ -71,6 +75,11 @@ func (s SingleNodeHashring) GetN(_ string, _ *prompb.TimeSeries, n uint64) (stri
return string(s), nil
}

// ConfigHash implements the Hashring interface.
func (s SingleNodeHashring) ConfigHash() string {
return string(s)
}

// simpleHashring represents a group of nodes handling write requests.
type simpleHashring []string

Expand All @@ -87,6 +96,15 @@ func (s simpleHashring) GetN(tenant string, ts *prompb.TimeSeries, n uint64) (st
return s[(hash(tenant, ts)+n)%uint64(len(s))], nil
}

// ConfigHash string returns the hash of the loaded configuration.
func (s simpleHashring) ConfigHash() string {
h := sha256.New()
for _, v := range s {
_, _ = h.Write([]byte(v))
}
return string(h.Sum(nil))
}

// multiHashring represents a set of hashrings.
// Which hashring to use for a tenant is determined
// by the tenants field of the hashring configuration.
Expand Down Expand Up @@ -135,6 +153,15 @@ func (m *multiHashring) GetN(tenant string, ts *prompb.TimeSeries, n uint64) (st
return "", errors.New("no matching hashring to handle tenant")
}

// ConfigHash string returns the hash of the loaded configuration.
func (m *multiHashring) ConfigHash() string {
h := sha256.New()
for _, v := range m.hashrings {
_, _ = h.Write([]byte(v.ConfigHash()))
}
return string(h.Sum(nil))
}

// newMultiHashring creates a multi-tenant hashring for a given slice of
// groups.
// Which hashring to use for a tenant is determined
Expand Down
Loading