diff --git a/CHANGELOG.md b/CHANGELOG.md index bd2f15ae02..838bd05916 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -11,6 +11,7 @@ We use *breaking :warning:* to mark changes that are not backward compatible (re ## Unreleased ### Fixed +- [#5791](https://github.com/thanos-io/thanos/pull/5791) Receive: Fix quorum validation for `ketama` hashing. ### Added diff --git a/examples/interactive/interactive_test.go b/examples/interactive/interactive_test.go index 7a40ea97ce..adf14a4a30 100644 --- a/examples/interactive/interactive_test.go +++ b/examples/interactive/interactive_test.go @@ -21,6 +21,7 @@ import ( "github.com/thanos-io/objstore/client" "github.com/thanos-io/objstore/providers/s3" + "github.com/thanos-io/thanos/pkg/receive" "github.com/thanos-io/thanos/pkg/testutil" tracingclient "github.com/thanos-io/thanos/pkg/tracing/client" "github.com/thanos-io/thanos/pkg/tracing/jaeger" @@ -248,8 +249,6 @@ func TestReadOnlyThanosSetup(t *testing.T) { sidecarHA1 := e2edb.NewThanosSidecar(e, "sidecar-prom-ha1", promHA1, e2edb.WithImage("thanos:latest"), e2edb.WithFlagOverride(map[string]string{"--tracing.config": string(jaegerConfig)})) sidecar2 := e2edb.NewThanosSidecar(e, "sidecar2", prom2, e2edb.WithImage("thanos:latest")) - receive1 := e2ethanos.NewReceiveBuilder(e, "receiver-1").WithIngestionEnabled().Init() - testutil.Ok(t, exec("cp", "-r", prom1Data+"/.", promHA0.Dir())) testutil.Ok(t, exec("sh", "-c", "find "+prom1Data+"/ -maxdepth 1 -type d | tail -5 | xargs -I {} cp -r {} "+promHA1.Dir())) // Copy only 5 blocks from 9 to mimic replica 1 with partial data set. testutil.Ok(t, exec("cp", "-r", prom2Data+"/.", prom2.Dir())) @@ -280,7 +279,7 @@ func TestReadOnlyThanosSetup(t *testing.T) { })) testutil.Ok(t, e2e.StartAndWaitReady(m1)) - testutil.Ok(t, e2e.StartAndWaitReady(promHA0, promHA1, prom2, sidecarHA0, sidecarHA1, sidecar2, store1, store2, receive1)) + testutil.Ok(t, e2e.StartAndWaitReady(promHA0, promHA1, prom2, sidecarHA0, sidecarHA1, sidecar2, store1, store2)) // Let's start query on top of all those 6 store APIs (global query engine). // @@ -333,7 +332,6 @@ func TestReadOnlyThanosSetup(t *testing.T) { sidecarHA0.InternalEndpoint("grpc"), sidecarHA1.InternalEndpoint("grpc"), sidecar2.InternalEndpoint("grpc"), - receive1.InternalEndpoint("grpc"), }, e2edb.WithImage("thanos:latest"), e2edb.WithFlagOverride(map[string]string{"--tracing.config": string(jaegerConfig)}), @@ -353,3 +351,75 @@ func TestReadOnlyThanosSetup(t *testing.T) { testutil.Ok(t, m.OpenUserInterfaceInBrowser()) testutil.Ok(t, e2einteractive.RunUntilEndpointHit()) } + +// TestReadWriteThanosSetup sets up a Thanos deployment with 6 receiver replicas and replication factor 3 for writing. +// On top of that, the written metrics are queryable via querier. +func TestReadWriteThanosSetup(t *testing.T) { + t.Skip("This is interactive test - it will run until you will kill it or curl 'finish' endpoint. Comment and run as normal test to use it!") + + e, err := e2e.NewDockerEnvironment("interactive") + testutil.Ok(t, err) + t.Cleanup(e.Close) + + // Setup Jaeger. 
+ j := e.Runnable("tracing").WithPorts(map[string]int{"http-front": 16686, "jaeger.thrift": 14268}).Init(e2e.StartOptions{Image: "jaegertracing/all-in-one:1.25"}) + testutil.Ok(t, e2e.StartAndWaitReady(j)) + + jaegerConfig, err := yaml.Marshal(tracingclient.TracingConfig{ + Type: tracingclient.Jaeger, + Config: jaeger.Config{ + ServiceName: "thanos", + SamplerType: "const", + SamplerParam: 1, + Endpoint: "http://" + j.InternalEndpoint("jaeger.thrift") + "/api/traces", + }, + }) + testutil.Ok(t, err) + + receive1 := e2ethanos.NewReceiveBuilder(e, "receiver-1").WithHashingAlgorithm(receive.AlgorithmKetama).WithIngestionEnabled().WithTracingConfig(string(jaegerConfig)) + receive2 := e2ethanos.NewReceiveBuilder(e, "receiver-2").WithHashingAlgorithm(receive.AlgorithmKetama).WithIngestionEnabled().WithTracingConfig(string(jaegerConfig)) + receive3 := e2ethanos.NewReceiveBuilder(e, "receiver-3").WithHashingAlgorithm(receive.AlgorithmKetama).WithIngestionEnabled().WithTracingConfig(string(jaegerConfig)) + receive4 := e2ethanos.NewReceiveBuilder(e, "receiver-4").WithHashingAlgorithm(receive.AlgorithmKetama).WithIngestionEnabled().WithTracingConfig(string(jaegerConfig)) + receive5 := e2ethanos.NewReceiveBuilder(e, "receiver-5").WithHashingAlgorithm(receive.AlgorithmKetama).WithIngestionEnabled().WithTracingConfig(string(jaegerConfig)) + receive6 := e2ethanos.NewReceiveBuilder(e, "receiver-6").WithHashingAlgorithm(receive.AlgorithmKetama).WithIngestionEnabled().WithTracingConfig(string(jaegerConfig)) + + h := receive.HashringConfig{ + Endpoints: []string{ + receive1.InternalEndpoint("grpc"), + receive2.InternalEndpoint("grpc"), + receive3.InternalEndpoint("grpc"), + receive4.InternalEndpoint("grpc"), + receive5.InternalEndpoint("grpc"), + receive6.InternalEndpoint("grpc"), + }, + } + + testutil.Ok(t, e2e.StartAndWaitReady( + receive1.WithRouting(3, h).Init(), receive2.WithRouting(3, h).Init(), receive3.WithRouting(3, h).Init(), + receive4.WithRouting(3, h).Init(), receive5.WithRouting(3, h).Init(), receive6.WithRouting(3, h).Init(), + )) + + query1 := e2edb.NewThanosQuerier( + e, + "query1", + []string{ + receive1.InternalEndpoint("grpc"), + receive2.InternalEndpoint("grpc"), + receive3.InternalEndpoint("grpc"), + receive4.InternalEndpoint("grpc"), + receive5.InternalEndpoint("grpc"), + receive6.InternalEndpoint("grpc"), + }, + e2edb.WithImage("thanos:latest"), + e2edb.WithFlagOverride(map[string]string{"--tracing.config": string(jaegerConfig)}), + ) + testutil.Ok(t, e2e.StartAndWaitReady(query1)) + + // Wait until we have 6 gRPC connections. + testutil.Ok(t, query1.WaitSumMetricsWithOptions(e2emon.Equals(6), []string{"thanos_store_nodes_grpc_connections"}, e2emon.WaitMissingMetrics())) + testutil.Ok(t, e2einteractive.OpenInBrowser(fmt.Sprintf("http://%s", query1.Endpoint("http")))) + + // Tracing endpoint. + testutil.Ok(t, e2einteractive.OpenInBrowser("http://"+j.Endpoint("http-front"))) + testutil.Ok(t, e2einteractive.RunUntilEndpointHit()) +} diff --git a/pkg/receive/handler.go b/pkg/receive/handler.go index 12afb752b8..79b192c6a2 100644 --- a/pkg/receive/handler.go +++ b/pkg/receive/handler.go @@ -340,11 +340,12 @@ func (h *Handler) Run() error { return httpSrv.Serve(h.listener) } -// replica encapsulates the replica number of a request and if the request is -// already replicated. +// replica encapsulates the replication index of a request and if the request is +// already replicated. Replication index represents the number of logical replica to which +// the request belongs. 
type replica struct { - n uint64 - replicated bool + replicationIndex uint64 + replicated bool } // endpointReplica is a pair of a receive endpoint and a write request replica. @@ -372,11 +373,11 @@ func (h *Handler) handleRequest(ctx context.Context, rep uint64, tenant string, return errBadReplica } - r := replica{n: rep, replicated: rep != 0} + r := replica{replicationIndex: rep, replicated: rep != 0} // On the wire, format is 1-indexed and in-code is 0-indexed, so we decrement the value if it was already replicated. if r.replicated { - r.n-- + r.replicationIndex-- } // Forward any time series as necessary. All time series @@ -513,7 +514,7 @@ func (h *Handler) receiveHTTP(w http.ResponseWriter, r *http.Request) { responseStatusCode := http.StatusOK if err = h.handleRequest(ctx, rep, tenant, &wreq); err != nil { level.Debug(tLogger).Log("msg", "failed to handle request", "err", err) - switch determineWriteErrorCause(err, 1) { + switch determineWriteErrorCause(err, 1, false) { case errNotReady: responseStatusCode = http.StatusServiceUnavailable case errUnavailable: @@ -561,7 +562,7 @@ func (h *Handler) forward(ctx context.Context, tenant string, r replica, wreq *p // one request per time series. wreqs := make(map[endpointReplica]*prompb.WriteRequest) for i := range wreq.Timeseries { - endpoint, err := h.hashring.GetN(tenant, &wreq.Timeseries[i], r.n) + endpoint, err := h.hashring.GetN(tenant, &wreq.Timeseries[i], r.replicationIndex) if err != nil { h.mtx.RUnlock() return err @@ -586,7 +587,20 @@ func (h *Handler) writeQuorum() int { // fanoutForward fans out concurrently given set of write requests. It returns status immediately when quorum of // requests succeeds or fails or if context is canceled. func (h *Handler) fanoutForward(pctx context.Context, tenant string, wreqs map[endpointReplica]*prompb.WriteRequest, successThreshold int) error { - var errs errutil.MultiError + var ( + errs errutil.MultiError + + // Create channel to send errors together with endpoint and replica on which it occurred. + ec = make(chan errWithReplicaFunc) + + // requestsPerReplicationIndex counts the number of requests that will be made on behalf + // of each logical replica. This is used to determine write success. + requestsPerReplicationIndex = make([]int, h.options.ReplicationFactor) + ) + + for er := range wreqs { + requestsPerReplicationIndex[er.replica.replicationIndex]++ + } fctx, cancel := context.WithTimeout(tracing.CopyTraceContext(context.Background(), pctx), h.options.ForwardTimeout) defer func() { @@ -607,8 +621,6 @@ func (h *Handler) fanoutForward(pctx context.Context, tenant string, wreqs map[e tLogger = log.With(h.logger, logTags) } - ec := make(chan error) - var wg sync.WaitGroup for er := range wreqs { er := er @@ -629,12 +641,12 @@ func (h *Handler) fanoutForward(pctx context.Context, tenant string, wreqs map[e }) if err != nil { h.replications.WithLabelValues(labelError).Inc() - ec <- errors.Wrapf(err, "replicate write request for endpoint %v", endpoint) + sendErrWithReplica(ec, r.replicationIndex, endpoint, errors.Wrapf(err, "replicate write request for endpoint %v", endpoint)) return } h.replications.WithLabelValues(labelSuccess).Inc() - ec <- nil + sendErrWithReplica(ec, r.replicationIndex, endpoint, nil) }(endpoint) continue @@ -658,10 +670,10 @@ func (h *Handler) fanoutForward(pctx context.Context, tenant string, wreqs map[e // When a MultiError is added to another MultiError, the error slices are concatenated, not nested. 
 					// To avoid breaking the counting logic, we need to flatten the error.
 					level.Debug(tLogger).Log("msg", "local tsdb write failed", "err", err.Error())
-					ec <- errors.Wrapf(determineWriteErrorCause(err, 1), "store locally for endpoint %v", endpoint)
+					sendErrWithReplica(ec, r.replicationIndex, endpoint, errors.Wrapf(determineWriteErrorCause(err, 1, false), "store locally for endpoint %v", endpoint))
 					return
 				}
-				ec <- nil
+				sendErrWithReplica(ec, r.replicationIndex, endpoint, nil)
 			}(endpoint)

 			continue
@@ -686,7 +698,7 @@ func (h *Handler) fanoutForward(pctx context.Context, tenant string, wreqs map[e

 			cl, err = h.peers.get(fctx, endpoint)
 			if err != nil {
-				ec <- errors.Wrapf(err, "get peer connection for endpoint %v", endpoint)
+				sendErrWithReplica(ec, r.replicationIndex, endpoint, errors.Wrapf(err, "get peer connection for endpoint %v", endpoint))
 				return
 			}

@@ -695,7 +707,7 @@ func (h *Handler) fanoutForward(pctx context.Context, tenant string, wreqs map[e
 			if ok {
 				if time.Now().Before(b.nextAllowed) {
 					h.mtx.RUnlock()
-					ec <- errors.Wrapf(errUnavailable, "backing off forward request for endpoint %v", endpoint)
+					sendErrWithReplica(ec, r.replicationIndex, endpoint, errors.Wrapf(errUnavailable, "backing off forward request for endpoint %v", endpoint))
 					return
 				}
 			}
@@ -708,7 +720,7 @@ func (h *Handler) fanoutForward(pctx context.Context, tenant string, wreqs map[e
 					Timeseries: wreqs[er].Timeseries,
 					Tenant:     tenant,
 					// Increment replica since on-the-wire format is 1-indexed and 0 indicates un-replicated.
-					Replica: int64(r.n + 1),
+					Replica: int64(r.replicationIndex + 1),
 				})
 			})
 			if err != nil {
@@ -727,14 +739,14 @@ func (h *Handler) fanoutForward(pctx context.Context, tenant string, wreqs map[e
 						h.mtx.Unlock()
 					}
 				}
-				ec <- errors.Wrapf(err, "forwarding request to endpoint %v", endpoint)
+				sendErrWithReplica(ec, r.replicationIndex, endpoint, errors.Wrapf(err, "forwarding request to endpoint %v", endpoint))
 				return
 			}
 			h.mtx.Lock()
 			delete(h.peerStates, endpoint)
 			h.mtx.Unlock()

-			ec <- nil
+			sendErrWithReplica(ec, r.replicationIndex, endpoint, nil)
 		}(endpoint)
 	}

@@ -747,7 +759,8 @@ func (h *Handler) fanoutForward(pctx context.Context, tenant string, wreqs map[e
 	// This is needed if context is canceled or if we reached success of fail quorum faster.
 	defer func() {
 		go func() {
-			for err := range ec {
+			for errReplicaFn := range ec {
+				_, _, err := errReplicaFn()
 				if err != nil {
 					level.Debug(tLogger).Log("msg", "request failed, but not needed to achieve quorum", "err", err)
 				}
@@ -755,26 +768,60 @@ func (h *Handler) fanoutForward(pctx context.Context, tenant string, wreqs map[e
 		}()
 	}()

-	var success int
+	var (
+		// replicaRequestsSuccess tracks how many successful requests have been sent for each logical replica.
+		// Each index in the slice corresponds to a replication index.
+		replicaRequestsSuccess = make([]int, h.options.ReplicationFactor)
+
+		// replicaRequestsErrs records errors that resulted from replication requests. Each error is recorded
+		// into the multi-error that corresponds to the given logical replica.
+		// The multi-error at each index in the slice corresponds to a replication index.
+		replicaRequestsErrs = make([]errutil.MultiError, h.options.ReplicationFactor)
+
+		// endpointFailures keeps the list of endpoints to which at least one request failed.
+		endpointFailures    []string
+		maxEndpointFailures = calculateMaxEndpointFailures(int(h.options.ReplicationFactor), successThreshold)
+	)
+
 	for {
 		select {
 		case <-fctx.Done():
 			return fctx.Err()
-		case err, more := <-ec:
+		case errReplicaFn, more := <-ec:
 			if !more {
-				return errs.Err()
+				for i, rme := range replicaRequestsErrs {
+					// Only if we can determine the same error cause for all requests within a replica group
+					// do we return that cause; otherwise we report that replication failed.
+					errs.Add(determineWriteErrorCause(rme.Err(), requestsPerReplicationIndex[i], requestsPerReplicationIndex[i] > 1))
+				}
+				finalErr := errs.Err()
+				// Second success mode - it is permissible for requests to at most maxEndpointFailures
+				// distinct endpoints to fail; in that case we still know quorum is reached.
+				// This is useful if e.g. one or a few nodes are unresponsive.
+				if len(endpointFailures) <= maxEndpointFailures {
+					level.Debug(tLogger).Log("msg", "some requests failed, but not needed to achieve quorum", "err", finalErr)
+					return nil
+				}
+				return finalErr
 			}
+			replica, endpoint, err := errReplicaFn()
 			if err == nil {
-				success++
-				if success >= successThreshold {
-					// In case the success threshold is lower than the total
-					// number of requests, then we can finish early here. This
-					// is the case for quorum writes for example.
-					return nil
+				var replicationsSuccessful int
+				replicaRequestsSuccess[replica]++
+				for i := range replicaRequestsSuccess {
+					// First success mode - once enough logical replicas have had all of their requests succeed,
+					// we can finish early (quorum is guaranteed).
+					if replicaRequestsSuccess[i] == requestsPerReplicationIndex[i] {
+						replicationsSuccessful++
+					}
+					if replicationsSuccessful == successThreshold {
+						return nil
+					}
 				}
 				continue
 			}
-			errs.Add(err)
+			replicaRequestsErrs[replica].Add(err)
+			endpointFailures = countEndpointFailures(endpointFailures, endpoint)
 		}
 	}
 }
@@ -803,7 +850,7 @@ func (h *Handler) replicate(ctx context.Context, tenant string, wreq *prompb.Wri

 		er := endpointReplica{
 			endpoint: endpoint,
-			replica:  replica{n: i, replicated: true},
+			replica:  replica{replicationIndex: i, replicated: true},
 		}
 		replicatedRequest, ok := replicatedRequests[er]
 		if !ok {
@@ -820,7 +867,7 @@ func (h *Handler) replicate(ctx context.Context, tenant string, wreq *prompb.Wri
 	quorum := h.writeQuorum()
 	// fanoutForward only returns an error if successThreshold (quorum) is not reached.
 	if err := h.fanoutForward(ctx, tenant, replicatedRequests, quorum); err != nil {
-		return errors.Wrap(determineWriteErrorCause(err, quorum), "quorum not reached")
+		return errors.Wrap(determineWriteErrorCause(err, quorum, false), "quorum not reached")
 	}
 	return nil
 }
@@ -834,7 +881,7 @@ func (h *Handler) RemoteWrite(ctx context.Context, r *storepb.WriteRequest) (*st
 	if err != nil {
 		level.Debug(h.logger).Log("msg", "failed to handle request", "err", err)
 	}
-	switch determineWriteErrorCause(err, 1) {
+	switch determineWriteErrorCause(err, 1, false) {
 	case nil:
 		return &storepb.WriteResponse{}, nil
 	case errNotReady:
@@ -867,6 +914,27 @@ func (h *Handler) relabel(wreq *prompb.WriteRequest) {
 	wreq.Timeseries = timeSeries
 }

+// countEndpointFailures adds the given endpoint to the list of failed endpoints, recording each distinct endpoint at most once.
+func countEndpointFailures(endpointFailures []string, endpoint string) []string {
+	for _, ef := range endpointFailures {
+		if ef == endpoint {
+			return endpointFailures
+		}
+	}
+
+	return append(endpointFailures, endpoint)
+}
+
+// calculateMaxEndpointFailures returns the maximum number of distinct endpoints
+// that are permitted to fail while we can still claim quorum.
+func calculateMaxEndpointFailures(replicationFactor int, quorum int) int {
+	if quorum == 1 {
+		return 0
+	}
+
+	return replicationFactor - quorum
+}
+
 // isConflict returns whether or not the given error represents a conflict.
 func isConflict(err error) bool {
 	if err == nil {
@@ -933,17 +1001,31 @@ func (a expectedErrors) Len() int      { return len(a) }
 func (a expectedErrors) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
 func (a expectedErrors) Less(i, j int) bool { return a[i].count < a[j].count }

+func (a expectedErrors) moreThanOneCause() bool {
+	var oneCauseFound bool
+	for _, ee := range a {
+		if oneCauseFound && ee.count > 0 {
+			return true
+		} else if ee.count > 0 {
+			oneCauseFound = true
+		}
+	}
+
+	return false
+}
+
 // determineWriteErrorCause extracts a sentinel error that has occurred more than the given threshold from a given fan-out error.
-// It will inspect the error's cause if the error is a MultiError,
-// It will return cause of each contained error but will not traverse any deeper.
-func determineWriteErrorCause(err error, threshold int) error {
+// It will inspect the error's cause if the error is a MultiError. It will return the cause of each contained error but will not traverse any deeper.
+// If no cause can be determined, the original error is returned. When forReplication is true,
+// it will return a replication-failed error if any contained error has an unknown cause or the causes are mixed.
+func determineWriteErrorCause(err error, threshold int, forReplication bool) error {
 	if err == nil {
 		return nil
 	}

 	unwrappedErr := errors.Cause(err)
-	errs, ok := unwrappedErr.(errutil.NonNilMultiError)
-	if !ok {
+	errs, isMultiError := unwrappedErr.(errutil.NonNilMultiError)
+	if !isMultiError {
 		errs = []error{unwrappedErr}
 	}
 	if len(errs) == 0 {
@@ -959,6 +1041,7 @@ func determineWriteErrorCause(err error, threshold int) error {
 		{err: errNotReady, cause: isNotReady},
 		{err: errUnavailable, cause: isUnavailable},
 	}
+	var globalCount int
 	for _, exp := range expErrs {
 		exp.count = 0
 		for _, err := range errs {
@@ -966,7 +1049,21 @@ func determineWriteErrorCause(err error, threshold int) error {
 				exp.count++
 			}
 		}
+
+		// If all errors are conflicts, return errConflict directly regardless of the threshold.
+		if isMultiError && exp.err == errConflict && exp.count == len(errs) {
+			return errConflict
+		}
+
+		globalCount += exp.count
+	}
+
+	// If we're determining the cause for a replication request, we want to
+	// respond with failure if any error has an unknown cause or if there are mixed error causes.
+	if forReplication && (globalCount != len(errs) || expErrs.moreThanOneCause()) {
+		return errors.Wrapf(err, "replicating request failed")
 	}
+
 	// Determine which error occurred most.
 	sort.Sort(sort.Reverse(expErrs))
 	if exp := expErrs[0]; exp.count >= threshold {
@@ -976,6 +1073,16 @@ func determineWriteErrorCause(err error, threshold int) error {
 	return err
 }

+// errWithReplicaFunc is a type to enable sending the replication index, endpoint and error over a channel.
+type errWithReplicaFunc func() (uint64, string, error)
+
+// sendErrWithReplica is a utility func to send the replication index, endpoint and error over a channel.
+// Replica number and endpoint is used to determine write quorum, based on the number of requests, +// see the comments above. +func sendErrWithReplica(f chan errWithReplicaFunc, r uint64, e string, err error) { + f <- func() (uint64, string, error) { return r, e, err } +} + func newPeerGroup(dialOpts ...grpc.DialOption) *peerGroup { return &peerGroup{ dialOpts: dialOpts, diff --git a/pkg/receive/handler_test.go b/pkg/receive/handler_test.go index 4a2a536038..dce7716c89 100644 --- a/pkg/receive/handler_test.go +++ b/pkg/receive/handler_test.go @@ -9,7 +9,6 @@ import ( "fmt" "io" "math" - "math/rand" "net/http" "net/http/httptest" "os" @@ -37,10 +36,6 @@ import ( "github.com/prometheus/prometheus/model/relabel" "github.com/prometheus/prometheus/storage" "github.com/prometheus/prometheus/tsdb" - "google.golang.org/grpc" - "google.golang.org/grpc/codes" - "google.golang.org/grpc/status" - "github.com/thanos-io/thanos/pkg/block/metadata" "github.com/thanos-io/thanos/pkg/errutil" "github.com/thanos-io/thanos/pkg/extkingpin" @@ -49,14 +44,18 @@ import ( "github.com/thanos-io/thanos/pkg/store/storepb" "github.com/thanos-io/thanos/pkg/store/storepb/prompb" "github.com/thanos-io/thanos/pkg/testutil" + "google.golang.org/grpc" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" ) func TestDetermineWriteErrorCause(t *testing.T) { for _, tc := range []struct { - name string - err error - threshold int - exp error + name string + err error + threshold int + forReplication bool + exp error }{ { name: "nil", @@ -230,8 +229,62 @@ func TestDetermineWriteErrorCause(t *testing.T) { threshold: 1, exp: errors.New("baz: 3 errors: 3 errors: qux; rpc error: code = AlreadyExists desc = conflict; rpc error: code = AlreadyExists desc = conflict; foo; bar"), }, + { + name: "below threshold but only conflict errors", + err: errutil.NonNilMultiError([]error{ + status.Error(codes.AlreadyExists, "conflict"), + status.Error(codes.AlreadyExists, "conflict"), + }), + threshold: 3, + exp: errConflict, + }, + { + name: "below threshold but only conflict errors", + err: errutil.NonNilMultiError([]error{ + status.Error(codes.AlreadyExists, "conflict"), + status.Error(codes.AlreadyExists, "conflict"), + }), + threshold: 3, + exp: errConflict, + }, + { + name: "for replication with single error cause", + err: errutil.NonNilMultiError([]error{ + errNotReady, + errNotReady, + errNotReady, + errNotReady, + }), + threshold: 3, + forReplication: true, + exp: errNotReady, + }, + { + name: "for replication with mixed error causes", + err: errutil.NonNilMultiError([]error{ + errNotReady, + errConflict, + errNotReady, + errNotReady, + }), + threshold: 3, + forReplication: true, + exp: errors.New("replicating request failed: 4 errors: target not ready; conflict; target not ready; target not ready"), + }, + { + name: "for replication with unknown error", + err: errutil.NonNilMultiError([]error{ + storage.ErrOutOfOrderSample, + storage.ErrOutOfOrderSample, + storage.ErrOutOfOrderSample, + errors.New("foo"), + }), + threshold: 3, + forReplication: true, + exp: errors.New("replicating request failed: 4 errors: out of order sample; out of order sample; out of order sample; foo"), + }, } { - err := determineWriteErrorCause(tc.err, tc.threshold) + err := determineWriteErrorCause(tc.err, tc.threshold, tc.forReplication) if tc.exp != nil { testutil.NotOk(t, err) testutil.Equals(t, tc.exp.Error(), err.Error()) @@ -347,7 +400,7 @@ func (f *fakeAppender) Rollback() error { return f.rollbackErr() } -func 
newTestHandlerHashring(appendables []*fakeAppendable, replicationFactor uint64) ([]*Handler, Hashring) { +func newTestHandlerHashring(appendables []*fakeAppendable, replicationFactor uint64, hashringAlgo HashringAlgorithm) ([]*Handler, Hashring) { var ( cfg = []HashringConfig{{Hashring: "test"}} handlers []*Handler @@ -366,6 +419,7 @@ func newTestHandlerHashring(appendables []*fakeAppendable, replicationFactor uin }, } + ag := addrGen{} limiter, _ := NewLimiter(NewNopConfig(), nil, RouterIngestor, log.NewNopLogger()) for i := range appendables { h := NewHandler(nil, &Options{ @@ -378,48 +432,31 @@ func newTestHandlerHashring(appendables []*fakeAppendable, replicationFactor uin }) handlers = append(handlers, h) h.peers = peers - addr := randomAddr() + addr := ag.newAddr() h.options.Endpoint = addr cfg[0].Endpoints = append(cfg[0].Endpoints, h.options.Endpoint) peers.cache[addr] = &fakeRemoteWriteGRPCServer{h: h} } - hashring := newMultiHashring(AlgorithmHashmod, replicationFactor, cfg) + // Use hashmod as default. + if hashringAlgo == "" { + hashringAlgo = AlgorithmHashmod + } + + hashring := newMultiHashring(hashringAlgo, replicationFactor, cfg) for _, h := range handlers { h.Hashring(hashring) } return handlers, hashring } -func TestReceiveQuorum(t *testing.T) { +func testReceiveQuorum(t *testing.T, hashringAlgo HashringAlgorithm, withConsistencyDelay bool) { appenderErrFn := func() error { return errors.New("failed to get appender") } conflictErrFn := func() error { return storage.ErrOutOfBounds } commitErrFn := func() error { return errors.New("failed to commit") } - wreq1 := &prompb.WriteRequest{ - Timeseries: []prompb.TimeSeries{ - { - Labels: []labelpb.ZLabel{ - { - Name: "foo", - Value: "bar", - }, - }, - Samples: []prompb.Sample{ - { - Value: 1, - Timestamp: 1, - }, - { - Value: 2, - Timestamp: 2, - }, - { - Value: 3, - Timestamp: 3, - }, - }, - }, - }, + wreq := &prompb.WriteRequest{ + Timeseries: makeSeriesWithValues(50), } + for _, tc := range []struct { name string status int @@ -431,7 +468,7 @@ func TestReceiveQuorum(t *testing.T) { name: "size 1 success", status: http.StatusOK, replicationFactor: 1, - wreq: wreq1, + wreq: wreq, appendables: []*fakeAppendable{ { appender: newFakeAppender(nil, nil, nil), @@ -442,7 +479,7 @@ func TestReceiveQuorum(t *testing.T) { name: "size 1 commit error", status: http.StatusInternalServerError, replicationFactor: 1, - wreq: wreq1, + wreq: wreq, appendables: []*fakeAppendable{ { appender: newFakeAppender(nil, commitErrFn, nil), @@ -453,7 +490,7 @@ func TestReceiveQuorum(t *testing.T) { name: "size 1 conflict", status: http.StatusConflict, replicationFactor: 1, - wreq: wreq1, + wreq: wreq, appendables: []*fakeAppendable{ { appender: newFakeAppender(conflictErrFn, nil, nil), @@ -464,7 +501,7 @@ func TestReceiveQuorum(t *testing.T) { name: "size 2 success", status: http.StatusOK, replicationFactor: 1, - wreq: wreq1, + wreq: wreq, appendables: []*fakeAppendable{ { appender: newFakeAppender(nil, nil, nil), @@ -478,7 +515,7 @@ func TestReceiveQuorum(t *testing.T) { name: "size 3 success", status: http.StatusOK, replicationFactor: 1, - wreq: wreq1, + wreq: wreq, appendables: []*fakeAppendable{ { appender: newFakeAppender(nil, nil, nil), @@ -495,7 +532,7 @@ func TestReceiveQuorum(t *testing.T) { name: "size 3 success with replication", status: http.StatusOK, replicationFactor: 3, - wreq: wreq1, + wreq: wreq, appendables: []*fakeAppendable{ { appender: newFakeAppender(nil, nil, nil), @@ -512,7 +549,7 @@ func TestReceiveQuorum(t *testing.T) { name: 
"size 3 commit error", status: http.StatusInternalServerError, replicationFactor: 1, - wreq: wreq1, + wreq: wreq, appendables: []*fakeAppendable{ { appender: newFakeAppender(nil, commitErrFn, nil), @@ -529,7 +566,7 @@ func TestReceiveQuorum(t *testing.T) { name: "size 3 commit error with replication", status: http.StatusInternalServerError, replicationFactor: 3, - wreq: wreq1, + wreq: wreq, appendables: []*fakeAppendable{ { appender: newFakeAppender(nil, commitErrFn, nil), @@ -546,7 +583,7 @@ func TestReceiveQuorum(t *testing.T) { name: "size 3 appender error with replication", status: http.StatusInternalServerError, replicationFactor: 3, - wreq: wreq1, + wreq: wreq, appendables: []*fakeAppendable{ { appender: newFakeAppender(nil, nil, nil), @@ -566,7 +603,7 @@ func TestReceiveQuorum(t *testing.T) { name: "size 3 conflict with replication", status: http.StatusConflict, replicationFactor: 3, - wreq: wreq1, + wreq: wreq, appendables: []*fakeAppendable{ { appender: newFakeAppender(conflictErrFn, nil, nil), @@ -583,7 +620,7 @@ func TestReceiveQuorum(t *testing.T) { name: "size 3 conflict and commit error with replication", status: http.StatusConflict, replicationFactor: 3, - wreq: wreq1, + wreq: wreq, appendables: []*fakeAppendable{ { appender: newFakeAppender(conflictErrFn, commitErrFn, nil), @@ -600,7 +637,7 @@ func TestReceiveQuorum(t *testing.T) { name: "size 3 with replication and one faulty", status: http.StatusOK, replicationFactor: 3, - wreq: wreq1, + wreq: wreq, appendables: []*fakeAppendable{ { appender: newFakeAppender(cycleErrors([]error{storage.ErrOutOfBounds, storage.ErrOutOfOrderSample, storage.ErrDuplicateSampleForTimestamp}), nil, nil), @@ -617,7 +654,7 @@ func TestReceiveQuorum(t *testing.T) { name: "size 3 with replication and one commit error", status: http.StatusOK, replicationFactor: 3, - wreq: wreq1, + wreq: wreq, appendables: []*fakeAppendable{ { appender: newFakeAppender(nil, commitErrFn, nil), @@ -634,7 +671,7 @@ func TestReceiveQuorum(t *testing.T) { name: "size 3 with replication and two conflicts", status: http.StatusConflict, replicationFactor: 3, - wreq: wreq1, + wreq: wreq, appendables: []*fakeAppendable{ { appender: newFakeAppender(cycleErrors([]error{storage.ErrOutOfBounds, storage.ErrOutOfOrderSample, storage.ErrDuplicateSampleForTimestamp}), nil, nil), @@ -651,11 +688,28 @@ func TestReceiveQuorum(t *testing.T) { name: "size 3 with replication one conflict and one commit error", status: http.StatusInternalServerError, replicationFactor: 3, - wreq: wreq1, + wreq: wreq, appendables: []*fakeAppendable{ + { + appender: newFakeAppender(nil, commitErrFn, nil), + }, + { + appender: newFakeAppender(nil, nil, nil), + }, { appender: newFakeAppender(cycleErrors([]error{storage.ErrOutOfBounds, storage.ErrOutOfOrderSample, storage.ErrDuplicateSampleForTimestamp}), nil, nil), }, + }, + }, + { + name: "size 3 with replication two commit errors", + status: http.StatusInternalServerError, + replicationFactor: 3, + wreq: wreq, + appendables: []*fakeAppendable{ + { + appender: newFakeAppender(nil, commitErrFn, nil), + }, { appender: newFakeAppender(nil, commitErrFn, nil), }, @@ -665,10 +719,62 @@ func TestReceiveQuorum(t *testing.T) { }, }, { - name: "size 3 with replication two commit errors", + name: "size 6 with replication 3", + status: http.StatusOK, + replicationFactor: 3, + wreq: wreq, + appendables: []*fakeAppendable{ + { + appender: newFakeAppender(nil, nil, nil), + }, + { + appender: newFakeAppender(nil, nil, nil), + }, + { + appender: newFakeAppender(nil, nil, nil), 
+ }, + { + appender: newFakeAppender(nil, nil, nil), + }, + { + appender: newFakeAppender(nil, nil, nil), + }, + { + appender: newFakeAppender(nil, nil, nil), + }, + }, + }, + { + name: "size 6 with replication 3 one commit and two conflict error", status: http.StatusInternalServerError, replicationFactor: 3, - wreq: wreq1, + wreq: wreq, + appendables: []*fakeAppendable{ + { + appender: newFakeAppender(nil, commitErrFn, nil), + }, + { + appender: newFakeAppender(nil, conflictErrFn, nil), + }, + { + appender: newFakeAppender(nil, conflictErrFn, nil), + }, + { + appender: newFakeAppender(nil, nil, nil), + }, + { + appender: newFakeAppender(nil, nil, nil), + }, + { + appender: newFakeAppender(nil, nil, nil), + }, + }, + }, + { + name: "size 6 with replication 5 two commit errors", + status: http.StatusOK, + replicationFactor: 5, + wreq: wreq, appendables: []*fakeAppendable{ { appender: newFakeAppender(nil, commitErrFn, nil), @@ -679,11 +785,20 @@ func TestReceiveQuorum(t *testing.T) { { appender: newFakeAppender(nil, nil, nil), }, + { + appender: newFakeAppender(nil, nil, nil), + }, + { + appender: newFakeAppender(nil, nil, nil), + }, + { + appender: newFakeAppender(nil, nil, nil), + }, }, }, } { t.Run(tc.name, func(t *testing.T) { - handlers, hashring := newTestHandlerHashring(tc.appendables, tc.replicationFactor) + handlers, hashring := newTestHandlerHashring(tc.appendables, tc.replicationFactor, hashringAlgo) tenant := "test" // Test from the point of view of every node // so that we know status code does not depend @@ -698,6 +813,11 @@ func TestReceiveQuorum(t *testing.T) { t.Errorf("handler %d: got unexpected HTTP status code: expected %d, got %d; body: %s", i, tc.status, rec.Code, rec.Body.String()) } } + + if withConsistencyDelay { + time.Sleep(50 * time.Millisecond) + } + // Test that each time series is stored // the correct amount of times in each fake DB. for _, ts := range tc.wreq.Timeseries { @@ -709,16 +829,30 @@ func TestReceiveQuorum(t *testing.T) { } } for j, a := range tc.appendables { - var expectedMin int - n := a.appender.(*fakeAppender).Get(lset) - got := uint64(len(n)) - if a.appenderErr == nil && endpointHit(t, hashring, tc.replicationFactor, handlers[j].options.Endpoint, tenant, &ts) { - // We have len(handlers) copies of each sample because the test case - // is run once for each handler and they all use the same appender. - expectedMin = int((tc.replicationFactor/2)+1) * len(ts.Samples) - } - if uint64(expectedMin) > got { - t.Errorf("handler: %d, labels %q: expected minimum of %d samples, got %d", j, lset.String(), expectedMin, got) + if withConsistencyDelay { + var expected int + n := a.appender.(*fakeAppender).Get(lset) + got := uint64(len(n)) + if a.appenderErr == nil && endpointHit(t, hashring, tc.replicationFactor, handlers[j].options.Endpoint, tenant, &ts) { + // We have len(handlers) copies of each sample because the test case + // is run once for each handler and they all use the same appender. + expected = len(handlers) * len(ts.Samples) + } + if uint64(expected) != got { + t.Errorf("handler: %d, labels %q: expected %d samples, got %d", j, lset.String(), expected, got) + } + } else { + var expectedMin int + n := a.appender.(*fakeAppender).Get(lset) + got := uint64(len(n)) + if a.appenderErr == nil && endpointHit(t, hashring, tc.replicationFactor, handlers[j].options.Endpoint, tenant, &ts) { + // We have len(handlers) copies of each sample because the test case + // is run once for each handler and they all use the same appender. 
+ expectedMin = int((tc.replicationFactor/2)+1) * len(ts.Samples) + } + if uint64(expectedMin) > got { + t.Errorf("handler: %d, labels %q: expected minimum of %d samples, got %d", j, lset.String(), expectedMin, got) + } } } } @@ -726,6 +860,22 @@ func TestReceiveQuorum(t *testing.T) { } } +func TestReceiveQuorumHashmod(t *testing.T) { + testReceiveQuorum(t, AlgorithmHashmod, false) +} + +func TestReceiveQuorumKetama(t *testing.T) { + testReceiveQuorum(t, AlgorithmKetama, false) +} + +func TestReceiveWithConsistencyDelayHashmod(t *testing.T) { + testReceiveQuorum(t, AlgorithmHashmod, true) +} + +func TestReceiveWithConsistencyDelayKetama(t *testing.T) { + testReceiveQuorum(t, AlgorithmKetama, true) +} + func TestReceiveWriteRequestLimits(t *testing.T) { for _, tc := range []struct { name string @@ -778,7 +928,7 @@ func TestReceiveWriteRequestLimits(t *testing.T) { appender: newFakeAppender(nil, nil, nil), }, } - handlers, _ := newTestHandlerHashring(appendables, 3) + handlers, _ := newTestHandlerHashring(appendables, 3, AlgorithmHashmod) handler := handlers[0] tenant := "test" @@ -832,348 +982,6 @@ func TestReceiveWriteRequestLimits(t *testing.T) { } } -func TestReceiveWithConsistencyDelay(t *testing.T) { - appenderErrFn := func() error { return errors.New("failed to get appender") } - conflictErrFn := func() error { return storage.ErrOutOfBounds } - commitErrFn := func() error { return errors.New("failed to commit") } - wreq1 := &prompb.WriteRequest{ - Timeseries: []prompb.TimeSeries{ - { - Labels: []labelpb.ZLabel{ - { - Name: "foo", - Value: "bar", - }, - }, - Samples: []prompb.Sample{ - { - Value: 1, - Timestamp: 1, - }, - { - Value: 2, - Timestamp: 2, - }, - { - Value: 3, - Timestamp: 3, - }, - }, - }, - }, - } - for _, tc := range []struct { - name string - status int - replicationFactor uint64 - wreq *prompb.WriteRequest - appendables []*fakeAppendable - }{ - { - name: "size 1 success", - status: http.StatusOK, - replicationFactor: 1, - wreq: wreq1, - appendables: []*fakeAppendable{ - { - appender: newFakeAppender(nil, nil, nil), - }, - }, - }, - { - name: "size 1 commit error", - status: http.StatusInternalServerError, - replicationFactor: 1, - wreq: wreq1, - appendables: []*fakeAppendable{ - { - appender: newFakeAppender(nil, commitErrFn, nil), - }, - }, - }, - { - name: "size 1 conflict", - status: http.StatusConflict, - replicationFactor: 1, - wreq: wreq1, - appendables: []*fakeAppendable{ - { - appender: newFakeAppender(conflictErrFn, nil, nil), - }, - }, - }, - { - name: "size 2 success", - status: http.StatusOK, - replicationFactor: 1, - wreq: wreq1, - appendables: []*fakeAppendable{ - { - appender: newFakeAppender(nil, nil, nil), - }, - { - appender: newFakeAppender(nil, nil, nil), - }, - }, - }, - { - name: "size 3 success", - status: http.StatusOK, - replicationFactor: 1, - wreq: wreq1, - appendables: []*fakeAppendable{ - { - appender: newFakeAppender(nil, nil, nil), - }, - { - appender: newFakeAppender(nil, nil, nil), - }, - { - appender: newFakeAppender(nil, nil, nil), - }, - }, - }, - { - name: "size 3 success with replication", - status: http.StatusOK, - replicationFactor: 3, - wreq: wreq1, - appendables: []*fakeAppendable{ - { - appender: newFakeAppender(nil, nil, nil), - }, - { - appender: newFakeAppender(nil, nil, nil), - }, - { - appender: newFakeAppender(nil, nil, nil), - }, - }, - }, - { - name: "size 3 commit error", - status: http.StatusInternalServerError, - replicationFactor: 1, - wreq: wreq1, - appendables: []*fakeAppendable{ - { - appender: 
newFakeAppender(nil, commitErrFn, nil), - }, - { - appender: newFakeAppender(nil, commitErrFn, nil), - }, - { - appender: newFakeAppender(nil, commitErrFn, nil), - }, - }, - }, - { - name: "size 3 commit error with replication", - status: http.StatusInternalServerError, - replicationFactor: 3, - wreq: wreq1, - appendables: []*fakeAppendable{ - { - appender: newFakeAppender(nil, commitErrFn, nil), - }, - { - appender: newFakeAppender(nil, commitErrFn, nil), - }, - { - appender: newFakeAppender(nil, commitErrFn, nil), - }, - }, - }, - { - name: "size 3 appender error with replication", - status: http.StatusInternalServerError, - replicationFactor: 3, - wreq: wreq1, - appendables: []*fakeAppendable{ - { - appender: newFakeAppender(nil, nil, nil), - appenderErr: appenderErrFn, - }, - { - appender: newFakeAppender(nil, nil, nil), - appenderErr: appenderErrFn, - }, - { - appender: newFakeAppender(nil, nil, nil), - appenderErr: appenderErrFn, - }, - }, - }, - { - name: "size 3 conflict with replication", - status: http.StatusConflict, - replicationFactor: 3, - wreq: wreq1, - appendables: []*fakeAppendable{ - { - appender: newFakeAppender(conflictErrFn, nil, nil), - }, - { - appender: newFakeAppender(conflictErrFn, nil, nil), - }, - { - appender: newFakeAppender(conflictErrFn, nil, nil), - }, - }, - }, - { - name: "size 3 conflict and commit error with replication", - status: http.StatusConflict, - replicationFactor: 3, - wreq: wreq1, - appendables: []*fakeAppendable{ - { - appender: newFakeAppender(conflictErrFn, commitErrFn, nil), - }, - { - appender: newFakeAppender(conflictErrFn, commitErrFn, nil), - }, - { - appender: newFakeAppender(conflictErrFn, commitErrFn, nil), - }, - }, - }, - { - name: "size 3 with replication and one faulty", - status: http.StatusOK, - replicationFactor: 3, - wreq: wreq1, - appendables: []*fakeAppendable{ - { - appender: newFakeAppender(cycleErrors([]error{storage.ErrOutOfBounds, storage.ErrOutOfOrderSample, storage.ErrDuplicateSampleForTimestamp}), nil, nil), - }, - { - appender: newFakeAppender(nil, nil, nil), - }, - { - appender: newFakeAppender(nil, nil, nil), - }, - }, - }, - { - name: "size 3 with replication and one commit error", - status: http.StatusOK, - replicationFactor: 3, - wreq: wreq1, - appendables: []*fakeAppendable{ - { - appender: newFakeAppender(nil, commitErrFn, nil), - }, - { - appender: newFakeAppender(nil, nil, nil), - }, - { - appender: newFakeAppender(nil, nil, nil), - }, - }, - }, - { - name: "size 3 with replication and two conflicts", - status: http.StatusConflict, - replicationFactor: 3, - wreq: wreq1, - appendables: []*fakeAppendable{ - { - appender: newFakeAppender(cycleErrors([]error{storage.ErrOutOfBounds, storage.ErrOutOfOrderSample, storage.ErrDuplicateSampleForTimestamp}), nil, nil), - }, - { - appender: newFakeAppender(conflictErrFn, nil, nil), - }, - { - appender: newFakeAppender(nil, nil, nil), - }, - }, - }, - { - name: "size 3 with replication one conflict and one commit error", - status: http.StatusInternalServerError, - replicationFactor: 3, - wreq: wreq1, - appendables: []*fakeAppendable{ - { - appender: newFakeAppender(cycleErrors([]error{storage.ErrOutOfBounds, storage.ErrOutOfOrderSample, storage.ErrDuplicateSampleForTimestamp}), nil, nil), - }, - { - appender: newFakeAppender(nil, commitErrFn, nil), - }, - { - appender: newFakeAppender(nil, nil, nil), - }, - }, - }, - { - name: "size 3 with replication two commit errors", - status: http.StatusInternalServerError, - replicationFactor: 3, - wreq: wreq1, - appendables: 
[]*fakeAppendable{ - { - appender: newFakeAppender(nil, commitErrFn, nil), - }, - { - appender: newFakeAppender(nil, commitErrFn, nil), - }, - { - appender: newFakeAppender(nil, nil, nil), - }, - }, - }, - } { - // Run the quorum tests with consistency delay, which should allow us - // to see all requests completing all the time, since we're using local - // network we are not expecting anything to go wrong with these. - t.Run(tc.name, func(t *testing.T) { - handlers, hashring := newTestHandlerHashring(tc.appendables, tc.replicationFactor) - tenant := "test" - // Test from the point of view of every node - // so that we know status code does not depend - // on which node is erroring and which node is receiving. - for i, handler := range handlers { - // Test that the correct status is returned. - rec, err := makeRequest(handler, tenant, tc.wreq) - if err != nil { - t.Fatalf("handler %d: unexpectedly failed making HTTP request: %v", tc.status, err) - } - if rec.Code != tc.status { - t.Errorf("handler %d: got unexpected HTTP status code: expected %d, got %d; body: %s", i, tc.status, rec.Code, rec.Body.String()) - } - } - - time.Sleep(50 * time.Millisecond) - - // Test that each time series is stored - // the correct amount of times in each fake DB. - for _, ts := range tc.wreq.Timeseries { - lset := make(labels.Labels, len(ts.Labels)) - for j := range ts.Labels { - lset[j] = labels.Label{ - Name: ts.Labels[j].Name, - Value: ts.Labels[j].Value, - } - } - for j, a := range tc.appendables { - var expected int - n := a.appender.(*fakeAppender).Get(lset) - got := uint64(len(n)) - if a.appenderErr == nil && endpointHit(t, hashring, tc.replicationFactor, handlers[j].options.Endpoint, tenant, &ts) { - // We have len(handlers) copies of each sample because the test case - // is run once for each handler and they all use the same appender. - expected = len(handlers) * len(ts.Samples) - } - if uint64(expected) != got { - t.Errorf("handler: %d, labels %q: expected %d samples, got %d", j, lset.String(), expected, got) - } - } - } - }) - } -} - // endpointHit is a helper to determine if a given endpoint in a hashring would be selected // for a given time series, tenant, and replication factor. 
func endpointHit(t *testing.T, h Hashring, rf uint64, endpoint, tenant string, timeSeries *prompb.TimeSeries) bool { @@ -1224,8 +1032,11 @@ func makeRequest(h *Handler, tenant string, wreq *prompb.WriteRequest) (*httptes return rec, nil } -func randomAddr() string { - return fmt.Sprintf("http://%d.%d.%d.%d:%d", rand.Intn(256), rand.Intn(256), rand.Intn(256), rand.Intn(256), rand.Intn(35000)+30000) +type addrGen struct{ n int } + +func (a *addrGen) newAddr() string { + a.n++ + return fmt.Sprintf("http://node-%d:%d", a.n, 12345+a.n) } type fakeRemoteWriteGRPCServer struct { @@ -1302,10 +1113,31 @@ func serializeSeriesWithOneSample(t testing.TB, series [][]labelpb.ZLabel) []byt return snappy.Encode(nil, body) } +func makeSeriesWithValues(numSeries int) []prompb.TimeSeries { + series := make([]prompb.TimeSeries, numSeries) + for i := 0; i < numSeries; i++ { + series[i] = prompb.TimeSeries{ + Labels: []labelpb.ZLabel{ + { + Name: fmt.Sprintf("pod-%d", i), + Value: fmt.Sprintf("nginx-%d", i), + }, + }, + Samples: []prompb.Sample{ + { + Value: float64(i), + Timestamp: 10, + }, + }, + } + } + return series +} + func benchmarkHandlerMultiTSDBReceiveRemoteWrite(b testutil.TB) { dir := b.TempDir() - handlers, _ := newTestHandlerHashring([]*fakeAppendable{nil}, 1) + handlers, _ := newTestHandlerHashring([]*fakeAppendable{nil}, 1, AlgorithmHashmod) handler := handlers[0] reg := prometheus.NewRegistry() diff --git a/test/e2e/e2ethanos/services.go b/test/e2e/e2ethanos/services.go index c9f0403dec..250d68873b 100644 --- a/test/e2e/e2ethanos/services.go +++ b/test/e2e/e2ethanos/services.go @@ -461,6 +461,8 @@ type ReceiveBuilder struct { relabelConfigs []*relabel.Config replication int image string + hashringAlgo receive.HashringAlgorithm + tracingConfig string } func NewReceiveBuilder(e e2e.Environment, name string) *ReceiveBuilder { @@ -511,6 +513,16 @@ func (r *ReceiveBuilder) WithValidationEnabled(limit int, metaMonitoring string, return r } +func (r *ReceiveBuilder) WithHashingAlgorithm(algo receive.HashringAlgorithm) *ReceiveBuilder { + r.hashringAlgo = algo + return r +} + +func (r *ReceiveBuilder) WithTracingConfig(tracingConfig string) *ReceiveBuilder { + r.tracingConfig = tracingConfig + return r +} + // Init creates a Thanos Receive instance. // If ingestion is enabled it will be configured for ingesting samples. // If routing is configured (i.e. hashring configuration is provided) it routes samples to other receivers. @@ -581,6 +593,14 @@ func (r *ReceiveBuilder) Init() *e2emon.InstrumentedRunnable { args["--receive.replication-factor"] = strconv.Itoa(r.replication) } + if len(r.hashringAlgo) > 0 { + args["--receive.hashrings-algorithm"] = string(r.hashringAlgo) + } + + if len(r.tracingConfig) > 0 { + args["--tracing.config"] = r.tracingConfig + } + if len(r.relabelConfigs) > 0 { relabelConfigBytes, err := yaml.Marshal(r.relabelConfigs) if err != nil { diff --git a/test/e2e/receive_test.go b/test/e2e/receive_test.go index ba1537e895..b3bb897ed2 100644 --- a/test/e2e/receive_test.go +++ b/test/e2e/receive_test.go @@ -385,184 +385,29 @@ func TestReceive(t *testing.T) { }) }) - t.Run("replication", func(t *testing.T) { + t.Run("replication_hashmod", func(t *testing.T) { t.Parallel() - e, err := e2e.NewDockerEnvironment("replication") - testutil.Ok(t, err) - t.Cleanup(e2ethanos.CleanScenario(t, e)) - - // The replication suite creates three receivers but only one - // receives Prometheus remote-written data. 
The querier queries all - // receivers and the test verifies that the time series are - // replicated to all of the nodes. - - r1 := e2ethanos.NewReceiveBuilder(e, "1").WithIngestionEnabled() - r2 := e2ethanos.NewReceiveBuilder(e, "2").WithIngestionEnabled() - r3 := e2ethanos.NewReceiveBuilder(e, "3").WithIngestionEnabled() - - h := receive.HashringConfig{ - Endpoints: []string{ - r1.InternalEndpoint("grpc"), - r2.InternalEndpoint("grpc"), - r3.InternalEndpoint("grpc"), - }, - } - - // Create with hashring config. - r1Runnable := r1.WithRouting(3, h).Init() - r2Runnable := r2.WithRouting(3, h).Init() - r3Runnable := r3.WithRouting(3, h).Init() - testutil.Ok(t, e2e.StartAndWaitReady(r1Runnable, r2Runnable, r3Runnable)) - - prom1 := e2ethanos.NewPrometheus(e, "1", e2ethanos.DefaultPromConfig("prom1", 0, e2ethanos.RemoteWriteEndpoint(r1.InternalEndpoint("remote-write")), "", e2ethanos.LocalPrometheusTarget), "", e2ethanos.DefaultPrometheusImage()) - testutil.Ok(t, e2e.StartAndWaitReady(prom1)) - - q := e2ethanos.NewQuerierBuilder(e, "1", r1.InternalEndpoint("grpc"), r2.InternalEndpoint("grpc"), r3.InternalEndpoint("grpc")).Init() - testutil.Ok(t, e2e.StartAndWaitReady(q)) - - ctx, cancel := context.WithTimeout(context.Background(), 3*time.Minute) - t.Cleanup(cancel) - - testutil.Ok(t, q.WaitSumMetricsWithOptions(e2emon.Equals(3), []string{"thanos_store_nodes_grpc_connections"}, e2emon.WaitMissingMetrics())) - - queryAndAssertSeries(t, ctx, q.Endpoint("http"), e2ethanos.QueryUpWithoutInstance, time.Now, promclient.QueryOptions{ - Deduplicate: false, - }, []model.Metric{ - { - "job": "myself", - "prometheus": "prom1", - "receive": "receive-1", - "replica": "0", - "tenant_id": "default-tenant", - }, - { - "job": "myself", - "prometheus": "prom1", - "receive": "receive-2", - "replica": "0", - "tenant_id": "default-tenant", - }, - { - "job": "myself", - "prometheus": "prom1", - "receive": "receive-3", - "replica": "0", - "tenant_id": "default-tenant", - }, - }) + runReplicationTest(t, receive.AlgorithmHashmod, false) }) - t.Run("replication_with_outage", func(t *testing.T) { + t.Run("replication_hashmod_with_outage", func(t *testing.T) { t.Parallel() - e, err := e2e.NewDockerEnvironment("outage") - testutil.Ok(t, err) - t.Cleanup(e2ethanos.CleanScenario(t, e)) - - // The replication suite creates a three-node hashring but one of the - // receivers is dead. In this case, replication should still - // succeed and the time series should be replicated to the other nodes. - - r1 := e2ethanos.NewReceiveBuilder(e, "1").WithIngestionEnabled() - r2 := e2ethanos.NewReceiveBuilder(e, "2").WithIngestionEnabled() - r3 := e2ethanos.NewReceiveBuilder(e, "3").WithIngestionEnabled() + runReplicationTest(t, receive.AlgorithmHashmod, true) - h := receive.HashringConfig{ - Endpoints: []string{ - r1.InternalEndpoint("grpc"), - r2.InternalEndpoint("grpc"), - r3.InternalEndpoint("grpc"), - }, - } - - // Create with hashring config. 
- r1Runnable := r1.WithRouting(3, h).Init() - r2Runnable := r2.WithRouting(3, h).Init() - testutil.Ok(t, e2e.StartAndWaitReady(r1Runnable, r2Runnable)) - - prom1 := e2ethanos.NewPrometheus(e, "1", e2ethanos.DefaultPromConfig("prom1", 0, e2ethanos.RemoteWriteEndpoint(r1.InternalEndpoint("remote-write")), "", e2ethanos.LocalPrometheusTarget), "", e2ethanos.DefaultPrometheusImage()) - testutil.Ok(t, e2e.StartAndWaitReady(prom1)) - - q := e2ethanos.NewQuerierBuilder(e, "1", r1.InternalEndpoint("grpc"), r2.InternalEndpoint("grpc")).Init() - testutil.Ok(t, e2e.StartAndWaitReady(q)) - - ctx, cancel := context.WithTimeout(context.Background(), 3*time.Minute) - t.Cleanup(cancel) - - testutil.Ok(t, q.WaitSumMetricsWithOptions(e2emon.Equals(2), []string{"thanos_store_nodes_grpc_connections"}, e2emon.WaitMissingMetrics())) - - queryAndAssertSeries(t, ctx, q.Endpoint("http"), e2ethanos.QueryUpWithoutInstance, time.Now, promclient.QueryOptions{ - Deduplicate: false, - }, []model.Metric{ - { - "job": "myself", - "prometheus": "prom1", - "receive": "receive-1", - "replica": "0", - "tenant_id": "default-tenant", - }, - { - "job": "myself", - "prometheus": "prom1", - "receive": "receive-2", - "replica": "0", - "tenant_id": "default-tenant", - }, - }) }) - t.Run("multitenancy", func(t *testing.T) { + t.Run("replication_ketama", func(t *testing.T) { t.Parallel() - e, err := e2e.NewDockerEnvironment("multitenancy") - testutil.Ok(t, err) - t.Cleanup(e2ethanos.CleanScenario(t, e)) - - r1 := e2ethanos.NewReceiveBuilder(e, "1").WithIngestionEnabled() - - h := receive.HashringConfig{ - Endpoints: []string{ - r1.InternalEndpoint("grpc"), - }, - } - - // Create with hashring config. - r1Runnable := r1.WithRouting(1, h).Init() - testutil.Ok(t, e2e.StartAndWaitReady(r1Runnable)) - - rp1 := e2ethanos.NewReverseProxy(e, "1", "tenant-1", "http://"+r1.InternalEndpoint("remote-write")) - rp2 := e2ethanos.NewReverseProxy(e, "2", "tenant-2", "http://"+r1.InternalEndpoint("remote-write")) - testutil.Ok(t, e2e.StartAndWaitReady(rp1, rp2)) - - prom1 := e2ethanos.NewPrometheus(e, "1", e2ethanos.DefaultPromConfig("prom1", 0, "http://"+rp1.InternalEndpoint("http")+"/api/v1/receive", "", e2ethanos.LocalPrometheusTarget), "", e2ethanos.DefaultPrometheusImage()) - prom2 := e2ethanos.NewPrometheus(e, "2", e2ethanos.DefaultPromConfig("prom2", 0, "http://"+rp2.InternalEndpoint("http")+"/api/v1/receive", "", e2ethanos.LocalPrometheusTarget), "", e2ethanos.DefaultPrometheusImage()) - testutil.Ok(t, e2e.StartAndWaitReady(prom1, prom2)) + runReplicationTest(t, receive.AlgorithmKetama, false) + }) - q := e2ethanos.NewQuerierBuilder(e, "1", r1.InternalEndpoint("grpc")).Init() - testutil.Ok(t, e2e.StartAndWaitReady(q)) - ctx, cancel := context.WithTimeout(context.Background(), 3*time.Minute) - t.Cleanup(cancel) + t.Run("replication_ketama_with_outage", func(t *testing.T) { + t.Parallel() - testutil.Ok(t, q.WaitSumMetricsWithOptions(e2emon.Equals(1), []string{"thanos_store_nodes_grpc_connections"}, e2emon.WaitMissingMetrics())) - queryAndAssertSeries(t, ctx, q.Endpoint("http"), e2ethanos.QueryUpWithoutInstance, time.Now, promclient.QueryOptions{ - Deduplicate: false, - }, []model.Metric{ - { - "job": "myself", - "prometheus": "prom1", - "receive": "receive-1", - "replica": "0", - "tenant_id": "tenant-1", - }, - { - "job": "myself", - "prometheus": "prom2", - "receive": "receive-1", - "replica": "0", - "tenant_id": "tenant-2", - }, - }) + runReplicationTest(t, receive.AlgorithmKetama, true) }) t.Run("relabel", func(t *testing.T) { @@ -608,7 
+453,6 @@ func TestReceive(t *testing.T) { }) t.Run("multitenant_active_series_limiting", func(t *testing.T) { - /* The multitenant_active_series_limiting suite configures a hashring with two avalanche writers and dedicated meta-monitoring. @@ -787,3 +631,92 @@ func TestReceive(t *testing.T) { }) }) } + +func runReplicationTest(t *testing.T, hashing receive.HashringAlgorithm, withOutage bool) { + e, err := e2e.NewDockerEnvironment(fmt.Sprintf("%s-%v", hashing, withOutage)) + testutil.Ok(t, err) + t.Cleanup(e2ethanos.CleanScenario(t, e)) + + // The replication suite creates three receivers but only one + // receives Prometheus remote-written data. The querier queries all + // receivers and the test verifies that the time series are + // replicated to all of the nodes. + + var rg receiverGroup + + r1 := e2ethanos.NewReceiveBuilder(e, "1").WithHashingAlgorithm(hashing).WithIngestionEnabled() + rg = append(rg, r1) + r2 := e2ethanos.NewReceiveBuilder(e, "2").WithHashingAlgorithm(hashing).WithIngestionEnabled() + rg = append(rg, r2) + + if !withOutage { + r3 := e2ethanos.NewReceiveBuilder(e, "3").WithHashingAlgorithm(hashing).WithIngestionEnabled() + rg = append(rg, r3) + } + + h := receive.HashringConfig{ + Endpoints: rg.endpoints(withOutage), + } + + // Create with hashring config. + rg.initAndRun(t, 3, h) + + prom1 := e2ethanos.NewPrometheus(e, "1", e2ethanos.DefaultPromConfig("prom1", 0, e2ethanos.RemoteWriteEndpoint(r1.InternalEndpoint("remote-write")), "", e2ethanos.LocalPrometheusTarget), "", e2ethanos.DefaultPrometheusImage()) + testutil.Ok(t, e2e.StartAndWaitReady(prom1)) + + q := e2ethanos.NewQuerierBuilder(e, "1", rg.endpoints(withOutage)...).Init() + testutil.Ok(t, e2e.StartAndWaitReady(q)) + + ctx, cancel := context.WithTimeout(context.Background(), 3*time.Minute) + t.Cleanup(cancel) + + testutil.Ok(t, q.WaitSumMetricsWithOptions(e2emon.Equals(float64(len(rg))), []string{"thanos_store_nodes_grpc_connections"}, e2emon.WaitMissingMetrics())) + + queryAndAssertSeries(t, ctx, q.Endpoint("http"), e2ethanos.QueryUpWithoutInstance, time.Now, promclient.QueryOptions{ + Deduplicate: false, + }, rg.expectedMetrics()) +} + +type receiverGroup []*e2ethanos.ReceiveBuilder + +func (r receiverGroup) endpoints(withOutage bool) []string { + var e []string + + for _, rg := range r { + e = append(e, rg.InternalEndpoint("grpc")) + } + + // If we want to simulate an outage, add one node + // that won't succeed in replication. + if withOutage { + e = append(e, "fake:1234") + } + + return e +} + +func (r receiverGroup) initAndRun(t *testing.T, replication int, hashringConfigs ...receive.HashringConfig) { + var ir []e2e.Runnable + + for _, rg := range r { + ir = append(ir, rg.WithRouting(replication, hashringConfigs...).Init()) + } + + testutil.Ok(t, e2e.StartAndWaitReady(ir...)) +} + +func (r receiverGroup) expectedMetrics() []model.Metric { + var m []model.Metric + + for _, rg := range r { + m = append(m, model.Metric{ + "job": "myself", + "prometheus": "prom1", + "receive": model.LabelValue(rg.Name()), + "replica": "0", + "tenant_id": "default-tenant", + }) + } + + return m +}
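
For reference, the quorum accounting that fanoutForward performs above boils down to two success modes: either enough logical replicas have every request routed to them succeed, or all failures are confined to at most replicationFactor-quorum distinct endpoints. The following is a minimal standalone Go sketch of that accounting, not part of the patch: the names quorumReached and successPerReplica and the sample endpoint string are illustrative, requestsPerReplica and quorum mirror requestsPerReplicationIndex and successThreshold in the handler, and the handler only evaluates the endpoint-failure mode once all requests have completed.

package main

import "fmt"

// maxEndpointFailures mirrors calculateMaxEndpointFailures above: with quorum 1 every
// request must succeed; otherwise up to replicationFactor-quorum whole endpoints may
// fail while quorum remains reachable.
func maxEndpointFailures(replicationFactor, quorum int) int {
	if quorum == 1 {
		return 0
	}
	return replicationFactor - quorum
}

// quorumReached sketches the two success modes: enough logical replicas fully written,
// or all failures confined to a tolerable number of distinct endpoints.
func quorumReached(successPerReplica, requestsPerReplica []int, failedEndpoints map[string]struct{}, replicationFactor, quorum int) bool {
	fullReplicas := 0
	for i := range requestsPerReplica {
		// A logical replica is complete once its successes match the requests routed for it.
		if successPerReplica[i] == requestsPerReplica[i] {
			fullReplicas++
		}
	}
	if fullReplicas >= quorum {
		return true
	}
	return len(failedEndpoints) <= maxEndpointFailures(replicationFactor, quorum)
}

func main() {
	// Replication factor 3, quorum 2: losing one whole endpoint is tolerable.
	failed := map[string]struct{}{"receiver-3:10901": {}}
	fmt.Println(quorumReached([]int{2, 2, 0}, []int{2, 2, 2}, failed, 3, 2)) // true
	fmt.Println(maxEndpointFailures(3, 2))                                  // 1
}

With replication factor 3 and quorum 2 this tolerates one completely unreachable receiver, which is the situation the replication_ketama_with_outage end-to-end test above exercises.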