From 1fe0a743c41ebdf6a6ebaf8572990eb3db083ee6 Mon Sep 17 00:00:00 2001 From: Matej Gera Date: Fri, 14 Oct 2022 16:47:43 +0200 Subject: [PATCH 01/11] Ignore threshold for conflict errors If there are only conlifct errors present during the error determination, it can be safely assumed other requests succeeded and therefore we can directly return conflict. Signed-off-by: Matej Gera --- pkg/receive/handler.go | 6 ++++++ pkg/receive/handler_test.go | 9 +++++++++ 2 files changed, 15 insertions(+) diff --git a/pkg/receive/handler.go b/pkg/receive/handler.go index 156bb74566..bb5910d806 100644 --- a/pkg/receive/handler.go +++ b/pkg/receive/handler.go @@ -966,7 +966,13 @@ func determineWriteErrorCause(err error, threshold int) error { exp.count++ } } + + // If conflict only errors, return it directly regardless of threshold. + if exp.err == errConflict && exp.count == len(errs) { + return errConflict + } } + // Determine which error occurred most. sort.Sort(sort.Reverse(expErrs)) if exp := expErrs[0]; exp.count >= threshold { diff --git a/pkg/receive/handler_test.go b/pkg/receive/handler_test.go index 44076de141..b183ea6722 100644 --- a/pkg/receive/handler_test.go +++ b/pkg/receive/handler_test.go @@ -226,6 +226,15 @@ func TestDetermineWriteErrorCause(t *testing.T) { threshold: 1, exp: errors.New("baz: 3 errors: 3 errors: qux; rpc error: code = AlreadyExists desc = conflict; rpc error: code = AlreadyExists desc = conflict; foo; bar"), }, + { + name: "below threshold but only conflict errors", + err: errutil.NonNilMultiError([]error{ + status.Error(codes.AlreadyExists, "conflict"), + status.Error(codes.AlreadyExists, "conflict"), + }), + threshold: 3, + exp: errConflict, + }, } { err := determineWriteErrorCause(tc.err, tc.threshold) if tc.exp != nil { From df79779cb3512c3989e9b454117bfcba6bba20d1 Mon Sep 17 00:00:00 2001 From: Matej Gera Date: Fri, 14 Oct 2022 16:49:36 +0200 Subject: [PATCH 02/11] Decide write quorum on the basis on number of succesfull replica requests Instead of deciding on the basis of individual requests, this change takes into consideration the fact that there can be multiple requests per each replica number (for simplification I refer to these as 'replica groups'). For confirming quorum it is then decisive to assert whether ALL requests succeeded from AT LEAST the number of replica groups equal to the write quorom. Signed-off-by: Matej Gera --- pkg/receive/handler.go | 79 +++++++++++++++++++++++++++++++----------- 1 file changed, 59 insertions(+), 20 deletions(-) diff --git a/pkg/receive/handler.go b/pkg/receive/handler.go index bb5910d806..89c709cc30 100644 --- a/pkg/receive/handler.go +++ b/pkg/receive/handler.go @@ -607,7 +607,15 @@ func (h *Handler) fanoutForward(pctx context.Context, tenant string, wreqs map[e tLogger = log.With(h.logger, logTags) } - ec := make(chan error) + // Create channel to send errors together with replica on which it occurred. + ec := make(chan errWithReplicaFunc) + + // replicaGroupReqs counts the number of requests that will be made on behalf + // on each replica. This is used to determine write success. + replicaGroupReqs := make([]int, h.options.ReplicationFactor) + for er := range wreqs { + replicaGroupReqs[er.replica.n]++ + } var wg sync.WaitGroup for er := range wreqs { @@ -629,12 +637,12 @@ func (h *Handler) fanoutForward(pctx context.Context, tenant string, wreqs map[e }) if err != nil { h.replications.WithLabelValues(labelError).Inc() - ec <- errors.Wrapf(err, "replicate write request for endpoint %v", endpoint) + sendErrWithReplica(ec, r.n, errors.Wrapf(err, "replicate write request for endpoint %v", endpoint)) return } h.replications.WithLabelValues(labelSuccess).Inc() - ec <- nil + sendErrWithReplica(ec, r.n, nil) }(endpoint) continue @@ -658,10 +666,10 @@ func (h *Handler) fanoutForward(pctx context.Context, tenant string, wreqs map[e // When a MultiError is added to another MultiError, the error slices are concatenated, not nested. // To avoid breaking the counting logic, we need to flatten the error. level.Debug(tLogger).Log("msg", "local tsdb write failed", "err", err.Error()) - ec <- errors.Wrapf(determineWriteErrorCause(err, 1), "store locally for endpoint %v", endpoint) + sendErrWithReplica(ec, r.n, errors.Wrapf(determineWriteErrorCause(err, 1), "store locally for endpoint %v", endpoint)) return } - ec <- nil + sendErrWithReplica(ec, r.n, nil) }(endpoint) continue @@ -686,7 +694,7 @@ func (h *Handler) fanoutForward(pctx context.Context, tenant string, wreqs map[e cl, err = h.peers.get(fctx, endpoint) if err != nil { - ec <- errors.Wrapf(err, "get peer connection for endpoint %v", endpoint) + sendErrWithReplica(ec, r.n, errors.Wrapf(err, "get peer connection for endpoint %v", endpoint)) return } @@ -695,7 +703,7 @@ func (h *Handler) fanoutForward(pctx context.Context, tenant string, wreqs map[e if ok { if time.Now().Before(b.nextAllowed) { h.mtx.RUnlock() - ec <- errors.Wrapf(errUnavailable, "backing off forward request for endpoint %v", endpoint) + sendErrWithReplica(ec, r.n, errors.Wrapf(errUnavailable, "backing off forward request for endpoint %v", endpoint)) return } } @@ -727,14 +735,14 @@ func (h *Handler) fanoutForward(pctx context.Context, tenant string, wreqs map[e h.mtx.Unlock() } } - ec <- errors.Wrapf(err, "forwarding request to endpoint %v", endpoint) + sendErrWithReplica(ec, r.n, errors.Wrapf(err, "forwarding request to endpoint %v", endpoint)) return } h.mtx.Lock() delete(h.peerStates, endpoint) h.mtx.Unlock() - ec <- nil + sendErrWithReplica(ec, r.n, nil) }(endpoint) } @@ -747,7 +755,8 @@ func (h *Handler) fanoutForward(pctx context.Context, tenant string, wreqs map[e // This is needed if context is canceled or if we reached success of fail quorum faster. defer func() { go func() { - for err := range ec { + for errReplicaFn := range ec { + _, err := errReplicaFn() if err != nil { level.Debug(tLogger).Log("msg", "request failed, but not needed to achieve quorum", "err", err) } @@ -755,26 +764,46 @@ func (h *Handler) fanoutForward(pctx context.Context, tenant string, wreqs map[e }() }() - var success int + var ( + // success counts how many requests per replica have succedeed. + success = make([]int, h.options.ReplicationFactor) + // replicaMultiError contains all requests errors per replica. + replicaMultiError = make([]errutil.MultiError, h.options.ReplicationFactor) + // replicaGroupSuccess counts how many replica groups have succeeded. + // This needs to amount to the success threshold (quorum), in order to + // claim replication success. + replicaGroupSuccess int + ) + for { select { case <-fctx.Done(): return fctx.Err() - case err, more := <-ec: + case errReplicaFn, more := <-ec: if !more { + for i, rme := range replicaMultiError { + // Only if we can determine same error for all requests within replica group, + // we return the cause, otherwise fallback to original original. + errs.Add(determineWriteErrorCause(rme.Err(), replicaGroupReqs[i])) + } return errs.Err() } + replica, err := errReplicaFn() if err == nil { - success++ - if success >= successThreshold { - // In case the success threshold is lower than the total - // number of requests, then we can finish early here. This - // is the case for quorum writes for example. - return nil + success[replica]++ + for i := range success { + if success[i] >= replicaGroupReqs[i] { + // If enough replica groups succeed, we can finish early (quorum + // is guaranteed). + replicaGroupSuccess++ + if replicaGroupSuccess == successThreshold { + return nil + } + } } continue } - errs.Add(err) + replicaMultiError[replica].Add(err) } } } @@ -970,7 +999,7 @@ func determineWriteErrorCause(err error, threshold int) error { // If conflict only errors, return it directly regardless of threshold. if exp.err == errConflict && exp.count == len(errs) { return errConflict - } + } } // Determine which error occurred most. @@ -982,6 +1011,16 @@ func determineWriteErrorCause(err error, threshold int) error { return err } +// errWithReplicaFunc is a type to enable sending replica number and err over channel. +type errWithReplicaFunc func() (uint64, error) + +// sendErrWithReplica is a utility func to send replica number and error over channel. +// Replica number is used to determine write quorum, based on the number of requests, +// see the comments above. +func sendErrWithReplica(f chan errWithReplicaFunc, r uint64, err error) { + f <- func() (uint64, error) { return r, err } +} + func newPeerGroup(dialOpts ...grpc.DialOption) *peerGroup { return &peerGroup{ dialOpts: dialOpts, From 0a81ff90db3fb4239620b6ab34ed49e09a411d26 Mon Sep 17 00:00:00 2001 From: Matej Gera Date: Fri, 14 Oct 2022 16:53:47 +0200 Subject: [PATCH 03/11] Extend receiver E2E replication tests Extends the E2E tests to cover both hashing algorithms. Signed-off-by: Matej Gera --- test/e2e/e2ethanos/services.go | 22 ++- test/e2e/receive_test.go | 265 ++++++++++++--------------------- 2 files changed, 120 insertions(+), 167 deletions(-) diff --git a/test/e2e/e2ethanos/services.go b/test/e2e/e2ethanos/services.go index c9f0403dec..d9c6293a53 100644 --- a/test/e2e/e2ethanos/services.go +++ b/test/e2e/e2ethanos/services.go @@ -461,6 +461,8 @@ type ReceiveBuilder struct { relabelConfigs []*relabel.Config replication int image string + hashringAlgo receive.HashringAlgorithm + tracingConfig string } func NewReceiveBuilder(e e2e.Environment, name string) *ReceiveBuilder { @@ -511,6 +513,16 @@ func (r *ReceiveBuilder) WithValidationEnabled(limit int, metaMonitoring string, return r } +func (r *ReceiveBuilder) WithHashingAlgorithm(algo receive.HashringAlgorithm) *ReceiveBuilder { + r.hashringAlgo = algo + return r +} + +func (r *ReceiveBuilder) WithTracingConfig(tracingConfig string) *ReceiveBuilder { + r.tracingConfig = tracingConfig + return r +} + // Init creates a Thanos Receive instance. // If ingestion is enabled it will be configured for ingesting samples. // If routing is configured (i.e. hashring configuration is provided) it routes samples to other receivers. @@ -528,7 +540,7 @@ func (r *ReceiveBuilder) Init() *e2emon.InstrumentedRunnable { "--remote-write.address": ":8081", "--label": fmt.Sprintf(`receive="%s"`, r.Name()), "--tsdb.path": filepath.Join(r.InternalDir(), "data"), - "--log.level": infoLogLevel, + "--log.level": "debug", "--tsdb.max-exemplars": fmt.Sprintf("%v", r.maxExemplars), } @@ -581,6 +593,14 @@ func (r *ReceiveBuilder) Init() *e2emon.InstrumentedRunnable { args["--receive.replication-factor"] = strconv.Itoa(r.replication) } + if len(r.hashringAlgo) > 0 { + args["--receive.hashrings-algorithm"] = string(r.hashringAlgo) + } + + if len(r.tracingConfig) > 0 { + args["--tracing.config"] = r.tracingConfig + } + if len(r.relabelConfigs) > 0 { relabelConfigBytes, err := yaml.Marshal(r.relabelConfigs) if err != nil { diff --git a/test/e2e/receive_test.go b/test/e2e/receive_test.go index ba1537e895..b3bb897ed2 100644 --- a/test/e2e/receive_test.go +++ b/test/e2e/receive_test.go @@ -385,184 +385,29 @@ func TestReceive(t *testing.T) { }) }) - t.Run("replication", func(t *testing.T) { + t.Run("replication_hashmod", func(t *testing.T) { t.Parallel() - e, err := e2e.NewDockerEnvironment("replication") - testutil.Ok(t, err) - t.Cleanup(e2ethanos.CleanScenario(t, e)) - - // The replication suite creates three receivers but only one - // receives Prometheus remote-written data. The querier queries all - // receivers and the test verifies that the time series are - // replicated to all of the nodes. - - r1 := e2ethanos.NewReceiveBuilder(e, "1").WithIngestionEnabled() - r2 := e2ethanos.NewReceiveBuilder(e, "2").WithIngestionEnabled() - r3 := e2ethanos.NewReceiveBuilder(e, "3").WithIngestionEnabled() - - h := receive.HashringConfig{ - Endpoints: []string{ - r1.InternalEndpoint("grpc"), - r2.InternalEndpoint("grpc"), - r3.InternalEndpoint("grpc"), - }, - } - - // Create with hashring config. - r1Runnable := r1.WithRouting(3, h).Init() - r2Runnable := r2.WithRouting(3, h).Init() - r3Runnable := r3.WithRouting(3, h).Init() - testutil.Ok(t, e2e.StartAndWaitReady(r1Runnable, r2Runnable, r3Runnable)) - - prom1 := e2ethanos.NewPrometheus(e, "1", e2ethanos.DefaultPromConfig("prom1", 0, e2ethanos.RemoteWriteEndpoint(r1.InternalEndpoint("remote-write")), "", e2ethanos.LocalPrometheusTarget), "", e2ethanos.DefaultPrometheusImage()) - testutil.Ok(t, e2e.StartAndWaitReady(prom1)) - - q := e2ethanos.NewQuerierBuilder(e, "1", r1.InternalEndpoint("grpc"), r2.InternalEndpoint("grpc"), r3.InternalEndpoint("grpc")).Init() - testutil.Ok(t, e2e.StartAndWaitReady(q)) - - ctx, cancel := context.WithTimeout(context.Background(), 3*time.Minute) - t.Cleanup(cancel) - - testutil.Ok(t, q.WaitSumMetricsWithOptions(e2emon.Equals(3), []string{"thanos_store_nodes_grpc_connections"}, e2emon.WaitMissingMetrics())) - - queryAndAssertSeries(t, ctx, q.Endpoint("http"), e2ethanos.QueryUpWithoutInstance, time.Now, promclient.QueryOptions{ - Deduplicate: false, - }, []model.Metric{ - { - "job": "myself", - "prometheus": "prom1", - "receive": "receive-1", - "replica": "0", - "tenant_id": "default-tenant", - }, - { - "job": "myself", - "prometheus": "prom1", - "receive": "receive-2", - "replica": "0", - "tenant_id": "default-tenant", - }, - { - "job": "myself", - "prometheus": "prom1", - "receive": "receive-3", - "replica": "0", - "tenant_id": "default-tenant", - }, - }) + runReplicationTest(t, receive.AlgorithmHashmod, false) }) - t.Run("replication_with_outage", func(t *testing.T) { + t.Run("replication_hashmod_with_outage", func(t *testing.T) { t.Parallel() - e, err := e2e.NewDockerEnvironment("outage") - testutil.Ok(t, err) - t.Cleanup(e2ethanos.CleanScenario(t, e)) - - // The replication suite creates a three-node hashring but one of the - // receivers is dead. In this case, replication should still - // succeed and the time series should be replicated to the other nodes. - - r1 := e2ethanos.NewReceiveBuilder(e, "1").WithIngestionEnabled() - r2 := e2ethanos.NewReceiveBuilder(e, "2").WithIngestionEnabled() - r3 := e2ethanos.NewReceiveBuilder(e, "3").WithIngestionEnabled() + runReplicationTest(t, receive.AlgorithmHashmod, true) - h := receive.HashringConfig{ - Endpoints: []string{ - r1.InternalEndpoint("grpc"), - r2.InternalEndpoint("grpc"), - r3.InternalEndpoint("grpc"), - }, - } - - // Create with hashring config. - r1Runnable := r1.WithRouting(3, h).Init() - r2Runnable := r2.WithRouting(3, h).Init() - testutil.Ok(t, e2e.StartAndWaitReady(r1Runnable, r2Runnable)) - - prom1 := e2ethanos.NewPrometheus(e, "1", e2ethanos.DefaultPromConfig("prom1", 0, e2ethanos.RemoteWriteEndpoint(r1.InternalEndpoint("remote-write")), "", e2ethanos.LocalPrometheusTarget), "", e2ethanos.DefaultPrometheusImage()) - testutil.Ok(t, e2e.StartAndWaitReady(prom1)) - - q := e2ethanos.NewQuerierBuilder(e, "1", r1.InternalEndpoint("grpc"), r2.InternalEndpoint("grpc")).Init() - testutil.Ok(t, e2e.StartAndWaitReady(q)) - - ctx, cancel := context.WithTimeout(context.Background(), 3*time.Minute) - t.Cleanup(cancel) - - testutil.Ok(t, q.WaitSumMetricsWithOptions(e2emon.Equals(2), []string{"thanos_store_nodes_grpc_connections"}, e2emon.WaitMissingMetrics())) - - queryAndAssertSeries(t, ctx, q.Endpoint("http"), e2ethanos.QueryUpWithoutInstance, time.Now, promclient.QueryOptions{ - Deduplicate: false, - }, []model.Metric{ - { - "job": "myself", - "prometheus": "prom1", - "receive": "receive-1", - "replica": "0", - "tenant_id": "default-tenant", - }, - { - "job": "myself", - "prometheus": "prom1", - "receive": "receive-2", - "replica": "0", - "tenant_id": "default-tenant", - }, - }) }) - t.Run("multitenancy", func(t *testing.T) { + t.Run("replication_ketama", func(t *testing.T) { t.Parallel() - e, err := e2e.NewDockerEnvironment("multitenancy") - testutil.Ok(t, err) - t.Cleanup(e2ethanos.CleanScenario(t, e)) - - r1 := e2ethanos.NewReceiveBuilder(e, "1").WithIngestionEnabled() - - h := receive.HashringConfig{ - Endpoints: []string{ - r1.InternalEndpoint("grpc"), - }, - } - - // Create with hashring config. - r1Runnable := r1.WithRouting(1, h).Init() - testutil.Ok(t, e2e.StartAndWaitReady(r1Runnable)) - - rp1 := e2ethanos.NewReverseProxy(e, "1", "tenant-1", "http://"+r1.InternalEndpoint("remote-write")) - rp2 := e2ethanos.NewReverseProxy(e, "2", "tenant-2", "http://"+r1.InternalEndpoint("remote-write")) - testutil.Ok(t, e2e.StartAndWaitReady(rp1, rp2)) - - prom1 := e2ethanos.NewPrometheus(e, "1", e2ethanos.DefaultPromConfig("prom1", 0, "http://"+rp1.InternalEndpoint("http")+"/api/v1/receive", "", e2ethanos.LocalPrometheusTarget), "", e2ethanos.DefaultPrometheusImage()) - prom2 := e2ethanos.NewPrometheus(e, "2", e2ethanos.DefaultPromConfig("prom2", 0, "http://"+rp2.InternalEndpoint("http")+"/api/v1/receive", "", e2ethanos.LocalPrometheusTarget), "", e2ethanos.DefaultPrometheusImage()) - testutil.Ok(t, e2e.StartAndWaitReady(prom1, prom2)) + runReplicationTest(t, receive.AlgorithmKetama, false) + }) - q := e2ethanos.NewQuerierBuilder(e, "1", r1.InternalEndpoint("grpc")).Init() - testutil.Ok(t, e2e.StartAndWaitReady(q)) - ctx, cancel := context.WithTimeout(context.Background(), 3*time.Minute) - t.Cleanup(cancel) + t.Run("replication_ketama_with_outage", func(t *testing.T) { + t.Parallel() - testutil.Ok(t, q.WaitSumMetricsWithOptions(e2emon.Equals(1), []string{"thanos_store_nodes_grpc_connections"}, e2emon.WaitMissingMetrics())) - queryAndAssertSeries(t, ctx, q.Endpoint("http"), e2ethanos.QueryUpWithoutInstance, time.Now, promclient.QueryOptions{ - Deduplicate: false, - }, []model.Metric{ - { - "job": "myself", - "prometheus": "prom1", - "receive": "receive-1", - "replica": "0", - "tenant_id": "tenant-1", - }, - { - "job": "myself", - "prometheus": "prom2", - "receive": "receive-1", - "replica": "0", - "tenant_id": "tenant-2", - }, - }) + runReplicationTest(t, receive.AlgorithmKetama, true) }) t.Run("relabel", func(t *testing.T) { @@ -608,7 +453,6 @@ func TestReceive(t *testing.T) { }) t.Run("multitenant_active_series_limiting", func(t *testing.T) { - /* The multitenant_active_series_limiting suite configures a hashring with two avalanche writers and dedicated meta-monitoring. @@ -787,3 +631,92 @@ func TestReceive(t *testing.T) { }) }) } + +func runReplicationTest(t *testing.T, hashing receive.HashringAlgorithm, withOutage bool) { + e, err := e2e.NewDockerEnvironment(fmt.Sprintf("%s-%v", hashing, withOutage)) + testutil.Ok(t, err) + t.Cleanup(e2ethanos.CleanScenario(t, e)) + + // The replication suite creates three receivers but only one + // receives Prometheus remote-written data. The querier queries all + // receivers and the test verifies that the time series are + // replicated to all of the nodes. + + var rg receiverGroup + + r1 := e2ethanos.NewReceiveBuilder(e, "1").WithHashingAlgorithm(hashing).WithIngestionEnabled() + rg = append(rg, r1) + r2 := e2ethanos.NewReceiveBuilder(e, "2").WithHashingAlgorithm(hashing).WithIngestionEnabled() + rg = append(rg, r2) + + if !withOutage { + r3 := e2ethanos.NewReceiveBuilder(e, "3").WithHashingAlgorithm(hashing).WithIngestionEnabled() + rg = append(rg, r3) + } + + h := receive.HashringConfig{ + Endpoints: rg.endpoints(withOutage), + } + + // Create with hashring config. + rg.initAndRun(t, 3, h) + + prom1 := e2ethanos.NewPrometheus(e, "1", e2ethanos.DefaultPromConfig("prom1", 0, e2ethanos.RemoteWriteEndpoint(r1.InternalEndpoint("remote-write")), "", e2ethanos.LocalPrometheusTarget), "", e2ethanos.DefaultPrometheusImage()) + testutil.Ok(t, e2e.StartAndWaitReady(prom1)) + + q := e2ethanos.NewQuerierBuilder(e, "1", rg.endpoints(withOutage)...).Init() + testutil.Ok(t, e2e.StartAndWaitReady(q)) + + ctx, cancel := context.WithTimeout(context.Background(), 3*time.Minute) + t.Cleanup(cancel) + + testutil.Ok(t, q.WaitSumMetricsWithOptions(e2emon.Equals(float64(len(rg))), []string{"thanos_store_nodes_grpc_connections"}, e2emon.WaitMissingMetrics())) + + queryAndAssertSeries(t, ctx, q.Endpoint("http"), e2ethanos.QueryUpWithoutInstance, time.Now, promclient.QueryOptions{ + Deduplicate: false, + }, rg.expectedMetrics()) +} + +type receiverGroup []*e2ethanos.ReceiveBuilder + +func (r receiverGroup) endpoints(withOutage bool) []string { + var e []string + + for _, rg := range r { + e = append(e, rg.InternalEndpoint("grpc")) + } + + // If we want to simulate an outage, add one node + // that won't succeed in replication. + if withOutage { + e = append(e, "fake:1234") + } + + return e +} + +func (r receiverGroup) initAndRun(t *testing.T, replication int, hashringConfigs ...receive.HashringConfig) { + var ir []e2e.Runnable + + for _, rg := range r { + ir = append(ir, rg.WithRouting(replication, hashringConfigs...).Init()) + } + + testutil.Ok(t, e2e.StartAndWaitReady(ir...)) +} + +func (r receiverGroup) expectedMetrics() []model.Metric { + var m []model.Metric + + for _, rg := range r { + m = append(m, model.Metric{ + "job": "myself", + "prometheus": "prom1", + "receive": model.LabelValue(rg.Name()), + "replica": "0", + "tenant_id": "default-tenant", + }) + } + + return m +} From b967d50be077b8f478ff11ec12043e6d8230348c Mon Sep 17 00:00:00 2001 From: Matej Gera Date: Fri, 14 Oct 2022 16:54:36 +0200 Subject: [PATCH 04/11] Add interactive test for receivers Signed-off-by: Matej Gera --- examples/interactive/interactive_test.go | 78 ++++++++++++++++++++++-- test/e2e/e2ethanos/services.go | 2 +- 2 files changed, 75 insertions(+), 5 deletions(-) diff --git a/examples/interactive/interactive_test.go b/examples/interactive/interactive_test.go index 7a40ea97ce..adf14a4a30 100644 --- a/examples/interactive/interactive_test.go +++ b/examples/interactive/interactive_test.go @@ -21,6 +21,7 @@ import ( "github.com/thanos-io/objstore/client" "github.com/thanos-io/objstore/providers/s3" + "github.com/thanos-io/thanos/pkg/receive" "github.com/thanos-io/thanos/pkg/testutil" tracingclient "github.com/thanos-io/thanos/pkg/tracing/client" "github.com/thanos-io/thanos/pkg/tracing/jaeger" @@ -248,8 +249,6 @@ func TestReadOnlyThanosSetup(t *testing.T) { sidecarHA1 := e2edb.NewThanosSidecar(e, "sidecar-prom-ha1", promHA1, e2edb.WithImage("thanos:latest"), e2edb.WithFlagOverride(map[string]string{"--tracing.config": string(jaegerConfig)})) sidecar2 := e2edb.NewThanosSidecar(e, "sidecar2", prom2, e2edb.WithImage("thanos:latest")) - receive1 := e2ethanos.NewReceiveBuilder(e, "receiver-1").WithIngestionEnabled().Init() - testutil.Ok(t, exec("cp", "-r", prom1Data+"/.", promHA0.Dir())) testutil.Ok(t, exec("sh", "-c", "find "+prom1Data+"/ -maxdepth 1 -type d | tail -5 | xargs -I {} cp -r {} "+promHA1.Dir())) // Copy only 5 blocks from 9 to mimic replica 1 with partial data set. testutil.Ok(t, exec("cp", "-r", prom2Data+"/.", prom2.Dir())) @@ -280,7 +279,7 @@ func TestReadOnlyThanosSetup(t *testing.T) { })) testutil.Ok(t, e2e.StartAndWaitReady(m1)) - testutil.Ok(t, e2e.StartAndWaitReady(promHA0, promHA1, prom2, sidecarHA0, sidecarHA1, sidecar2, store1, store2, receive1)) + testutil.Ok(t, e2e.StartAndWaitReady(promHA0, promHA1, prom2, sidecarHA0, sidecarHA1, sidecar2, store1, store2)) // Let's start query on top of all those 6 store APIs (global query engine). // @@ -333,7 +332,6 @@ func TestReadOnlyThanosSetup(t *testing.T) { sidecarHA0.InternalEndpoint("grpc"), sidecarHA1.InternalEndpoint("grpc"), sidecar2.InternalEndpoint("grpc"), - receive1.InternalEndpoint("grpc"), }, e2edb.WithImage("thanos:latest"), e2edb.WithFlagOverride(map[string]string{"--tracing.config": string(jaegerConfig)}), @@ -353,3 +351,75 @@ func TestReadOnlyThanosSetup(t *testing.T) { testutil.Ok(t, m.OpenUserInterfaceInBrowser()) testutil.Ok(t, e2einteractive.RunUntilEndpointHit()) } + +// TestReadWriteThanosSetup sets up a Thanos deployment with 6 receiver replicas and replication factor 3 for writing. +// On top of that, the written metrics are queryable via querier. +func TestReadWriteThanosSetup(t *testing.T) { + t.Skip("This is interactive test - it will run until you will kill it or curl 'finish' endpoint. Comment and run as normal test to use it!") + + e, err := e2e.NewDockerEnvironment("interactive") + testutil.Ok(t, err) + t.Cleanup(e.Close) + + // Setup Jaeger. + j := e.Runnable("tracing").WithPorts(map[string]int{"http-front": 16686, "jaeger.thrift": 14268}).Init(e2e.StartOptions{Image: "jaegertracing/all-in-one:1.25"}) + testutil.Ok(t, e2e.StartAndWaitReady(j)) + + jaegerConfig, err := yaml.Marshal(tracingclient.TracingConfig{ + Type: tracingclient.Jaeger, + Config: jaeger.Config{ + ServiceName: "thanos", + SamplerType: "const", + SamplerParam: 1, + Endpoint: "http://" + j.InternalEndpoint("jaeger.thrift") + "/api/traces", + }, + }) + testutil.Ok(t, err) + + receive1 := e2ethanos.NewReceiveBuilder(e, "receiver-1").WithHashingAlgorithm(receive.AlgorithmKetama).WithIngestionEnabled().WithTracingConfig(string(jaegerConfig)) + receive2 := e2ethanos.NewReceiveBuilder(e, "receiver-2").WithHashingAlgorithm(receive.AlgorithmKetama).WithIngestionEnabled().WithTracingConfig(string(jaegerConfig)) + receive3 := e2ethanos.NewReceiveBuilder(e, "receiver-3").WithHashingAlgorithm(receive.AlgorithmKetama).WithIngestionEnabled().WithTracingConfig(string(jaegerConfig)) + receive4 := e2ethanos.NewReceiveBuilder(e, "receiver-4").WithHashingAlgorithm(receive.AlgorithmKetama).WithIngestionEnabled().WithTracingConfig(string(jaegerConfig)) + receive5 := e2ethanos.NewReceiveBuilder(e, "receiver-5").WithHashingAlgorithm(receive.AlgorithmKetama).WithIngestionEnabled().WithTracingConfig(string(jaegerConfig)) + receive6 := e2ethanos.NewReceiveBuilder(e, "receiver-6").WithHashingAlgorithm(receive.AlgorithmKetama).WithIngestionEnabled().WithTracingConfig(string(jaegerConfig)) + + h := receive.HashringConfig{ + Endpoints: []string{ + receive1.InternalEndpoint("grpc"), + receive2.InternalEndpoint("grpc"), + receive3.InternalEndpoint("grpc"), + receive4.InternalEndpoint("grpc"), + receive5.InternalEndpoint("grpc"), + receive6.InternalEndpoint("grpc"), + }, + } + + testutil.Ok(t, e2e.StartAndWaitReady( + receive1.WithRouting(3, h).Init(), receive2.WithRouting(3, h).Init(), receive3.WithRouting(3, h).Init(), + receive4.WithRouting(3, h).Init(), receive5.WithRouting(3, h).Init(), receive6.WithRouting(3, h).Init(), + )) + + query1 := e2edb.NewThanosQuerier( + e, + "query1", + []string{ + receive1.InternalEndpoint("grpc"), + receive2.InternalEndpoint("grpc"), + receive3.InternalEndpoint("grpc"), + receive4.InternalEndpoint("grpc"), + receive5.InternalEndpoint("grpc"), + receive6.InternalEndpoint("grpc"), + }, + e2edb.WithImage("thanos:latest"), + e2edb.WithFlagOverride(map[string]string{"--tracing.config": string(jaegerConfig)}), + ) + testutil.Ok(t, e2e.StartAndWaitReady(query1)) + + // Wait until we have 6 gRPC connections. + testutil.Ok(t, query1.WaitSumMetricsWithOptions(e2emon.Equals(6), []string{"thanos_store_nodes_grpc_connections"}, e2emon.WaitMissingMetrics())) + testutil.Ok(t, e2einteractive.OpenInBrowser(fmt.Sprintf("http://%s", query1.Endpoint("http")))) + + // Tracing endpoint. + testutil.Ok(t, e2einteractive.OpenInBrowser("http://"+j.Endpoint("http-front"))) + testutil.Ok(t, e2einteractive.RunUntilEndpointHit()) +} diff --git a/test/e2e/e2ethanos/services.go b/test/e2e/e2ethanos/services.go index d9c6293a53..250d68873b 100644 --- a/test/e2e/e2ethanos/services.go +++ b/test/e2e/e2ethanos/services.go @@ -540,7 +540,7 @@ func (r *ReceiveBuilder) Init() *e2emon.InstrumentedRunnable { "--remote-write.address": ":8081", "--label": fmt.Sprintf(`receive="%s"`, r.Name()), "--tsdb.path": filepath.Join(r.InternalDir(), "data"), - "--log.level": "debug", + "--log.level": infoLogLevel, "--tsdb.max-exemplars": fmt.Sprintf("%v", r.maxExemplars), } From 89e8be145b05f59126778397bffc75446325b5ba Mon Sep 17 00:00:00 2001 From: Matej Gera Date: Fri, 21 Oct 2022 10:26:00 +0200 Subject: [PATCH 05/11] Add second success mode; fix replica success group count - Adds second success mode - even if requests from multiple replica group fails, we can still claim quorum as long as the failed requests are from max permissible number of endpoints (this depends on quorum). - Fix replica success group count from previous change - the count should be local to the success counting loop - Adjust error write determination for requests counted per replica- we need to either ensure there is only single error cause or fallback to a single error to signal failed replication; this is because if there's a mixed bag of errors, we cannot guarantee write success and we simply need to return that this particular replication failed - Add more tags to traces Signed-off-by: Matej Gera --- pkg/receive/handler.go | 152 ++++++++++++++++++++++++++++------------- 1 file changed, 104 insertions(+), 48 deletions(-) diff --git a/pkg/receive/handler.go b/pkg/receive/handler.go index 89c709cc30..84f4bf9427 100644 --- a/pkg/receive/handler.go +++ b/pkg/receive/handler.go @@ -513,7 +513,7 @@ func (h *Handler) receiveHTTP(w http.ResponseWriter, r *http.Request) { responseStatusCode := http.StatusOK if err = h.handleRequest(ctx, rep, tenant, &wreq); err != nil { level.Debug(tLogger).Log("msg", "failed to handle request", "err", err) - switch determineWriteErrorCause(err, 1) { + switch determineWriteErrorCause(err, 1, false) { case errNotReady: responseStatusCode = http.StatusServiceUnavailable case errUnavailable: @@ -586,7 +586,20 @@ func (h *Handler) writeQuorum() int { // fanoutForward fans out concurrently given set of write requests. It returns status immediately when quorum of // requests succeeds or fails or if context is canceled. func (h *Handler) fanoutForward(pctx context.Context, tenant string, wreqs map[endpointReplica]*prompb.WriteRequest, successThreshold int) error { - var errs errutil.MultiError + var ( + errs errutil.MultiError + + // Create channel to send errors together with endpoint and replica on which it occurred. + ec = make(chan errWithReplicaFunc) + + // replicaGroupReqs counts the number of requests that will be made on behalf + // on each replica. This is used to determine write success. + replicaGroupReqs = make([]int, h.options.ReplicationFactor) + ) + + for er := range wreqs { + replicaGroupReqs[er.replica.n]++ + } fctx, cancel := context.WithTimeout(tracing.CopyTraceContext(context.Background(), pctx), h.options.ForwardTimeout) defer func() { @@ -607,16 +620,6 @@ func (h *Handler) fanoutForward(pctx context.Context, tenant string, wreqs map[e tLogger = log.With(h.logger, logTags) } - // Create channel to send errors together with replica on which it occurred. - ec := make(chan errWithReplicaFunc) - - // replicaGroupReqs counts the number of requests that will be made on behalf - // on each replica. This is used to determine write success. - replicaGroupReqs := make([]int, h.options.ReplicationFactor) - for er := range wreqs { - replicaGroupReqs[er.replica.n]++ - } - var wg sync.WaitGroup for er := range wreqs { er := er @@ -637,12 +640,12 @@ func (h *Handler) fanoutForward(pctx context.Context, tenant string, wreqs map[e }) if err != nil { h.replications.WithLabelValues(labelError).Inc() - sendErrWithReplica(ec, r.n, errors.Wrapf(err, "replicate write request for endpoint %v", endpoint)) + sendErrWithReplica(ec, r.n, endpoint, errors.Wrapf(err, "replicate write request for endpoint %v", endpoint)) return } h.replications.WithLabelValues(labelSuccess).Inc() - sendErrWithReplica(ec, r.n, nil) + sendErrWithReplica(ec, r.n, endpoint, nil) }(endpoint) continue @@ -666,10 +669,10 @@ func (h *Handler) fanoutForward(pctx context.Context, tenant string, wreqs map[e // When a MultiError is added to another MultiError, the error slices are concatenated, not nested. // To avoid breaking the counting logic, we need to flatten the error. level.Debug(tLogger).Log("msg", "local tsdb write failed", "err", err.Error()) - sendErrWithReplica(ec, r.n, errors.Wrapf(determineWriteErrorCause(err, 1), "store locally for endpoint %v", endpoint)) + sendErrWithReplica(ec, r.n, endpoint, errors.Wrapf(determineWriteErrorCause(err, 1, false), "store locally for endpoint %v", endpoint)) return } - sendErrWithReplica(ec, r.n, nil) + sendErrWithReplica(ec, r.n, endpoint, nil) }(endpoint) continue @@ -694,7 +697,7 @@ func (h *Handler) fanoutForward(pctx context.Context, tenant string, wreqs map[e cl, err = h.peers.get(fctx, endpoint) if err != nil { - sendErrWithReplica(ec, r.n, errors.Wrapf(err, "get peer connection for endpoint %v", endpoint)) + sendErrWithReplica(ec, r.n, endpoint, errors.Wrapf(err, "get peer connection for endpoint %v", endpoint)) return } @@ -703,7 +706,7 @@ func (h *Handler) fanoutForward(pctx context.Context, tenant string, wreqs map[e if ok { if time.Now().Before(b.nextAllowed) { h.mtx.RUnlock() - sendErrWithReplica(ec, r.n, errors.Wrapf(errUnavailable, "backing off forward request for endpoint %v", endpoint)) + sendErrWithReplica(ec, r.n, endpoint, errors.Wrapf(errUnavailable, "backing off forward request for endpoint %v", endpoint)) return } } @@ -735,14 +738,14 @@ func (h *Handler) fanoutForward(pctx context.Context, tenant string, wreqs map[e h.mtx.Unlock() } } - sendErrWithReplica(ec, r.n, errors.Wrapf(err, "forwarding request to endpoint %v", endpoint)) + sendErrWithReplica(ec, r.n, endpoint, errors.Wrapf(err, "forwarding request to endpoint %v", endpoint)) return } h.mtx.Lock() delete(h.peerStates, endpoint) h.mtx.Unlock() - sendErrWithReplica(ec, r.n, nil) + sendErrWithReplica(ec, r.n, endpoint, nil) }(endpoint) } @@ -756,7 +759,7 @@ func (h *Handler) fanoutForward(pctx context.Context, tenant string, wreqs map[e defer func() { go func() { for errReplicaFn := range ec { - _, err := errReplicaFn() + _, _, err := errReplicaFn() if err != nil { level.Debug(tLogger).Log("msg", "request failed, but not needed to achieve quorum", "err", err) } @@ -769,10 +772,9 @@ func (h *Handler) fanoutForward(pctx context.Context, tenant string, wreqs map[e success = make([]int, h.options.ReplicationFactor) // replicaMultiError contains all requests errors per replica. replicaMultiError = make([]errutil.MultiError, h.options.ReplicationFactor) - // replicaGroupSuccess counts how many replica groups have succeeded. - // This needs to amount to the success threshold (quorum), in order to - // claim replication success. - replicaGroupSuccess int + + endpointFailures []string + maxEndpointFailures = calculateMaxEndpointFailures(int(h.options.ReplicationFactor), successThreshold) ) for { @@ -783,27 +785,37 @@ func (h *Handler) fanoutForward(pctx context.Context, tenant string, wreqs map[e if !more { for i, rme := range replicaMultiError { // Only if we can determine same error for all requests within replica group, - // we return the cause, otherwise fallback to original original. - errs.Add(determineWriteErrorCause(rme.Err(), replicaGroupReqs[i])) + // we return the cause, otherwise inform that replication failed. + errs.Add(determineWriteErrorCause(rme.Err(), replicaGroupReqs[i], replicaGroupReqs[i] > 1)) + } + finalErr := errs.Err() + // Second success mode - it is permissible to fail all requests to a number of endpoints, + // in which case we still know quorum is reached. + // This is useful if e.g. one or few nodes are unresponsive. + if len(endpointFailures) <= maxEndpointFailures { + level.Debug(tLogger).Log("msg", "some requests failed, but not needed to achieve quorum", "err", finalErr) + return nil } - return errs.Err() + return finalErr } - replica, err := errReplicaFn() + replica, endpoint, err := errReplicaFn() if err == nil { + var replicaGroupSuccess int success[replica]++ for i := range success { - if success[i] >= replicaGroupReqs[i] { - // If enough replica groups succeed, we can finish early (quorum - // is guaranteed). + // First success mode - if enough replica groups succeed, we can finish early (quorum + // is guaranteed). + if success[i] == replicaGroupReqs[i] { replicaGroupSuccess++ - if replicaGroupSuccess == successThreshold { - return nil - } + } + if replicaGroupSuccess == successThreshold { + return nil } } continue } replicaMultiError[replica].Add(err) + endpointFailures = countEndpointFailures(endpointFailures, endpoint) } } } @@ -849,7 +861,7 @@ func (h *Handler) replicate(ctx context.Context, tenant string, wreq *prompb.Wri quorum := h.writeQuorum() // fanoutForward only returns an error if successThreshold (quorum) is not reached. if err := h.fanoutForward(ctx, tenant, replicatedRequests, quorum); err != nil { - return errors.Wrap(determineWriteErrorCause(err, quorum), "quorum not reached") + return errors.Wrap(determineWriteErrorCause(err, quorum, false), "quorum not reached") } return nil } @@ -863,7 +875,7 @@ func (h *Handler) RemoteWrite(ctx context.Context, r *storepb.WriteRequest) (*st if err != nil { level.Debug(h.logger).Log("msg", "failed to handle request", "err", err) } - switch determineWriteErrorCause(err, 1) { + switch determineWriteErrorCause(err, 1, false) { case nil: return &storepb.WriteResponse{}, nil case errNotReady: @@ -896,6 +908,27 @@ func (h *Handler) relabel(wreq *prompb.WriteRequest) { wreq.Timeseries = timeSeries } +// countEndpointFailures count how many distinct endpoints has responded with an error. +func countEndpointFailures(endpointFailures []string, endpoint string) []string { + for _, ef := range endpointFailures { + if ef == endpoint { + return endpointFailures + } + } + + return append(endpointFailures, endpoint) +} + +// calculateMaxEndpointFailures returns maximum number of permissible distinct endpoints +// that can fail while we can still claim quorum. +func calculateMaxEndpointFailures(replicationFactor int, quorum int) int { + if quorum == 1 { + return 0 + } + + return replicationFactor - quorum +} + // isConflict returns whether or not the given error represents a conflict. func isConflict(err error) bool { if err == nil { @@ -962,17 +995,31 @@ func (a expectedErrors) Len() int { return len(a) } func (a expectedErrors) Swap(i, j int) { a[i], a[j] = a[j], a[i] } func (a expectedErrors) Less(i, j int) bool { return a[i].count < a[j].count } +func (a expectedErrors) moreThanOneCause() bool { + var oneCauseFound bool + for _, ee := range a { + if oneCauseFound && ee.count > 0 { + return true + } else if ee.count > 0 { + oneCauseFound = true + } + } + + return false +} + // determineWriteErrorCause extracts a sentinel error that has occurred more than the given threshold from a given fan-out error. -// It will inspect the error's cause if the error is a MultiError, -// It will return cause of each contained error but will not traverse any deeper. -func determineWriteErrorCause(err error, threshold int) error { +// It will inspect the error's cause if the error is a MultiError. It will return cause of each contained error but will not traverse any deeper. +// If no cause can be determined, the original error is returned. When forReplication is true, +// it will return failed replication error on any unknown error cause. +func determineWriteErrorCause(err error, threshold int, forReplication bool) error { if err == nil { return nil } unwrappedErr := errors.Cause(err) - errs, ok := unwrappedErr.(errutil.NonNilMultiError) - if !ok { + errs, isMultiError := unwrappedErr.(errutil.NonNilMultiError) + if !isMultiError { errs = []error{unwrappedErr} } if len(errs) == 0 { @@ -988,6 +1035,7 @@ func determineWriteErrorCause(err error, threshold int) error { {err: errNotReady, cause: isNotReady}, {err: errUnavailable, cause: isUnavailable}, } + var globalCount int for _, exp := range expErrs { exp.count = 0 for _, err := range errs { @@ -997,9 +1045,17 @@ func determineWriteErrorCause(err error, threshold int) error { } // If conflict only errors, return it directly regardless of threshold. - if exp.err == errConflict && exp.count == len(errs) { + if isMultiError && exp.err == errConflict && exp.count == len(errs) { return errConflict } + + globalCount += exp.count + } + + // If we're determining for a replication request, we want to + // respond with failure on any single unknown error or if there are mixed error causes. + if forReplication && (globalCount != len(errs) || expErrs.moreThanOneCause()) { + return errors.Wrapf(err, "replicating request failed") } // Determine which error occurred most. @@ -1012,13 +1068,13 @@ func determineWriteErrorCause(err error, threshold int) error { } // errWithReplicaFunc is a type to enable sending replica number and err over channel. -type errWithReplicaFunc func() (uint64, error) +type errWithReplicaFunc func() (uint64, string, error) -// sendErrWithReplica is a utility func to send replica number and error over channel. -// Replica number is used to determine write quorum, based on the number of requests, +// sendErrWithReplica is a utility func to send replica number, endpoint and error over channel. +// Replica number and endpoint is used to determine write quorum, based on the number of requests, // see the comments above. -func sendErrWithReplica(f chan errWithReplicaFunc, r uint64, err error) { - f <- func() (uint64, error) { return r, err } +func sendErrWithReplica(f chan errWithReplicaFunc, r uint64, e string, err error) { + f <- func() (uint64, string, error) { return r, e, err } } func newPeerGroup(dialOpts ...grpc.DialOption) *peerGroup { From 8dfe57358eeda8e73424476ecce6115757dea488 Mon Sep 17 00:00:00 2001 From: Matej Gera Date: Fri, 21 Oct 2022 10:37:30 +0200 Subject: [PATCH 06/11] Extended error determination and quorum confirmation unit tests - Add new unit tests for error determination - Simplify and unify quorum unit tests; run for both hashing algorithms; extended unit tests (add tests with 6 nodes and higher repl factor) Signed-off-by: Matej Gera --- pkg/receive/handler_test.go | 638 +++++++++++++----------------------- 1 file changed, 229 insertions(+), 409 deletions(-) diff --git a/pkg/receive/handler_test.go b/pkg/receive/handler_test.go index b183ea6722..1310408d63 100644 --- a/pkg/receive/handler_test.go +++ b/pkg/receive/handler_test.go @@ -34,10 +34,6 @@ import ( "github.com/prometheus/prometheus/model/relabel" "github.com/prometheus/prometheus/storage" "github.com/prometheus/prometheus/tsdb" - "google.golang.org/grpc" - "google.golang.org/grpc/codes" - "google.golang.org/grpc/status" - "github.com/thanos-io/thanos/pkg/block/metadata" "github.com/thanos-io/thanos/pkg/errutil" "github.com/thanos-io/thanos/pkg/runutil" @@ -45,14 +41,18 @@ import ( "github.com/thanos-io/thanos/pkg/store/storepb" "github.com/thanos-io/thanos/pkg/store/storepb/prompb" "github.com/thanos-io/thanos/pkg/testutil" + "google.golang.org/grpc" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" ) func TestDetermineWriteErrorCause(t *testing.T) { for _, tc := range []struct { - name string - err error - threshold int - exp error + name string + err error + threshold int + forReplication bool + exp error }{ { name: "nil", @@ -235,8 +235,53 @@ func TestDetermineWriteErrorCause(t *testing.T) { threshold: 3, exp: errConflict, }, + { + name: "below threshold but only conflict errors", + err: errutil.NonNilMultiError([]error{ + status.Error(codes.AlreadyExists, "conflict"), + status.Error(codes.AlreadyExists, "conflict"), + }), + threshold: 3, + exp: errConflict, + }, + { + name: "for replication with single error cause", + err: errutil.NonNilMultiError([]error{ + errNotReady, + errNotReady, + errNotReady, + errNotReady, + }), + threshold: 3, + forReplication: true, + exp: errNotReady, + }, + { + name: "for replication with mixed error causes", + err: errutil.NonNilMultiError([]error{ + errNotReady, + errConflict, + errNotReady, + errNotReady, + }), + threshold: 3, + forReplication: true, + exp: errors.New("replicating request failed: 4 errors: target not ready; conflict; target not ready; target not ready"), + }, + { + name: "for replication with unknown error", + err: errutil.NonNilMultiError([]error{ + storage.ErrOutOfOrderSample, + storage.ErrOutOfOrderSample, + storage.ErrOutOfOrderSample, + errors.New("foo"), + }), + threshold: 3, + forReplication: true, + exp: errors.New("replicating request failed: 4 errors: out of order sample; out of order sample; out of order sample; foo"), + }, } { - err := determineWriteErrorCause(tc.err, tc.threshold) + err := determineWriteErrorCause(tc.err, tc.threshold, tc.forReplication) if tc.exp != nil { testutil.NotOk(t, err) testutil.Equals(t, tc.exp.Error(), err.Error()) @@ -352,7 +397,7 @@ func (f *fakeAppender) Rollback() error { return f.rollbackErr() } -func newTestHandlerHashring(appendables []*fakeAppendable, replicationFactor uint64) ([]*Handler, Hashring) { +func newTestHandlerHashring(appendables []*fakeAppendable, replicationFactor uint64, hashringAlgo HashringAlgorithm) ([]*Handler, Hashring) { var ( cfg = []HashringConfig{{Hashring: "test"}} handlers []*Handler @@ -387,43 +432,26 @@ func newTestHandlerHashring(appendables []*fakeAppendable, replicationFactor uin cfg[0].Endpoints = append(cfg[0].Endpoints, h.options.Endpoint) peers.cache[addr] = &fakeRemoteWriteGRPCServer{h: h} } - hashring := newMultiHashring(AlgorithmHashmod, replicationFactor, cfg) + // Use hashmod as default. + if hashringAlgo == "" { + hashringAlgo = AlgorithmHashmod + } + + hashring := newMultiHashring(hashringAlgo, replicationFactor, cfg) for _, h := range handlers { h.Hashring(hashring) } return handlers, hashring } -func TestReceiveQuorum(t *testing.T) { +func testReceiveQuorum(t *testing.T, hashringAlgo HashringAlgorithm, withConsistencyDelay bool) { appenderErrFn := func() error { return errors.New("failed to get appender") } conflictErrFn := func() error { return storage.ErrOutOfBounds } commitErrFn := func() error { return errors.New("failed to commit") } - wreq1 := &prompb.WriteRequest{ - Timeseries: []prompb.TimeSeries{ - { - Labels: []labelpb.ZLabel{ - { - Name: "foo", - Value: "bar", - }, - }, - Samples: []prompb.Sample{ - { - Value: 1, - Timestamp: 1, - }, - { - Value: 2, - Timestamp: 2, - }, - { - Value: 3, - Timestamp: 3, - }, - }, - }, - }, + wreq := &prompb.WriteRequest{ + Timeseries: makeSeriesWithValues(50), } + for _, tc := range []struct { name string status int @@ -435,7 +463,7 @@ func TestReceiveQuorum(t *testing.T) { name: "size 1 success", status: http.StatusOK, replicationFactor: 1, - wreq: wreq1, + wreq: wreq, appendables: []*fakeAppendable{ { appender: newFakeAppender(nil, nil, nil), @@ -446,7 +474,7 @@ func TestReceiveQuorum(t *testing.T) { name: "size 1 commit error", status: http.StatusInternalServerError, replicationFactor: 1, - wreq: wreq1, + wreq: wreq, appendables: []*fakeAppendable{ { appender: newFakeAppender(nil, commitErrFn, nil), @@ -457,7 +485,7 @@ func TestReceiveQuorum(t *testing.T) { name: "size 1 conflict", status: http.StatusConflict, replicationFactor: 1, - wreq: wreq1, + wreq: wreq, appendables: []*fakeAppendable{ { appender: newFakeAppender(conflictErrFn, nil, nil), @@ -468,7 +496,7 @@ func TestReceiveQuorum(t *testing.T) { name: "size 2 success", status: http.StatusOK, replicationFactor: 1, - wreq: wreq1, + wreq: wreq, appendables: []*fakeAppendable{ { appender: newFakeAppender(nil, nil, nil), @@ -482,7 +510,7 @@ func TestReceiveQuorum(t *testing.T) { name: "size 3 success", status: http.StatusOK, replicationFactor: 1, - wreq: wreq1, + wreq: wreq, appendables: []*fakeAppendable{ { appender: newFakeAppender(nil, nil, nil), @@ -499,7 +527,7 @@ func TestReceiveQuorum(t *testing.T) { name: "size 3 success with replication", status: http.StatusOK, replicationFactor: 3, - wreq: wreq1, + wreq: wreq, appendables: []*fakeAppendable{ { appender: newFakeAppender(nil, nil, nil), @@ -516,7 +544,7 @@ func TestReceiveQuorum(t *testing.T) { name: "size 3 commit error", status: http.StatusInternalServerError, replicationFactor: 1, - wreq: wreq1, + wreq: wreq, appendables: []*fakeAppendable{ { appender: newFakeAppender(nil, commitErrFn, nil), @@ -533,7 +561,7 @@ func TestReceiveQuorum(t *testing.T) { name: "size 3 commit error with replication", status: http.StatusInternalServerError, replicationFactor: 3, - wreq: wreq1, + wreq: wreq, appendables: []*fakeAppendable{ { appender: newFakeAppender(nil, commitErrFn, nil), @@ -550,7 +578,7 @@ func TestReceiveQuorum(t *testing.T) { name: "size 3 appender error with replication", status: http.StatusInternalServerError, replicationFactor: 3, - wreq: wreq1, + wreq: wreq, appendables: []*fakeAppendable{ { appender: newFakeAppender(nil, nil, nil), @@ -570,7 +598,7 @@ func TestReceiveQuorum(t *testing.T) { name: "size 3 conflict with replication", status: http.StatusConflict, replicationFactor: 3, - wreq: wreq1, + wreq: wreq, appendables: []*fakeAppendable{ { appender: newFakeAppender(conflictErrFn, nil, nil), @@ -587,7 +615,7 @@ func TestReceiveQuorum(t *testing.T) { name: "size 3 conflict and commit error with replication", status: http.StatusConflict, replicationFactor: 3, - wreq: wreq1, + wreq: wreq, appendables: []*fakeAppendable{ { appender: newFakeAppender(conflictErrFn, commitErrFn, nil), @@ -604,7 +632,7 @@ func TestReceiveQuorum(t *testing.T) { name: "size 3 with replication and one faulty", status: http.StatusOK, replicationFactor: 3, - wreq: wreq1, + wreq: wreq, appendables: []*fakeAppendable{ { appender: newFakeAppender(cycleErrors([]error{storage.ErrOutOfBounds, storage.ErrOutOfOrderSample, storage.ErrDuplicateSampleForTimestamp}), nil, nil), @@ -621,7 +649,7 @@ func TestReceiveQuorum(t *testing.T) { name: "size 3 with replication and one commit error", status: http.StatusOK, replicationFactor: 3, - wreq: wreq1, + wreq: wreq, appendables: []*fakeAppendable{ { appender: newFakeAppender(nil, commitErrFn, nil), @@ -638,7 +666,7 @@ func TestReceiveQuorum(t *testing.T) { name: "size 3 with replication and two conflicts", status: http.StatusConflict, replicationFactor: 3, - wreq: wreq1, + wreq: wreq, appendables: []*fakeAppendable{ { appender: newFakeAppender(cycleErrors([]error{storage.ErrOutOfBounds, storage.ErrOutOfOrderSample, storage.ErrDuplicateSampleForTimestamp}), nil, nil), @@ -655,11 +683,28 @@ func TestReceiveQuorum(t *testing.T) { name: "size 3 with replication one conflict and one commit error", status: http.StatusInternalServerError, replicationFactor: 3, - wreq: wreq1, + wreq: wreq, appendables: []*fakeAppendable{ + { + appender: newFakeAppender(nil, commitErrFn, nil), + }, + { + appender: newFakeAppender(nil, nil, nil), + }, { appender: newFakeAppender(cycleErrors([]error{storage.ErrOutOfBounds, storage.ErrOutOfOrderSample, storage.ErrDuplicateSampleForTimestamp}), nil, nil), }, + }, + }, + { + name: "size 3 with replication two commit errors", + status: http.StatusInternalServerError, + replicationFactor: 3, + wreq: wreq, + appendables: []*fakeAppendable{ + { + appender: newFakeAppender(nil, commitErrFn, nil), + }, { appender: newFakeAppender(nil, commitErrFn, nil), }, @@ -669,10 +714,62 @@ func TestReceiveQuorum(t *testing.T) { }, }, { - name: "size 3 with replication two commit errors", + name: "size 6 with replication 3", + status: http.StatusOK, + replicationFactor: 3, + wreq: wreq, + appendables: []*fakeAppendable{ + { + appender: newFakeAppender(nil, nil, nil), + }, + { + appender: newFakeAppender(nil, nil, nil), + }, + { + appender: newFakeAppender(nil, nil, nil), + }, + { + appender: newFakeAppender(nil, nil, nil), + }, + { + appender: newFakeAppender(nil, nil, nil), + }, + { + appender: newFakeAppender(nil, nil, nil), + }, + }, + }, + { + name: "size 6 with replication 3 one commit and two conflict error", status: http.StatusInternalServerError, replicationFactor: 3, - wreq: wreq1, + wreq: wreq, + appendables: []*fakeAppendable{ + { + appender: newFakeAppender(nil, commitErrFn, nil), + }, + { + appender: newFakeAppender(nil, conflictErrFn, nil), + }, + { + appender: newFakeAppender(nil, conflictErrFn, nil), + }, + { + appender: newFakeAppender(nil, nil, nil), + }, + { + appender: newFakeAppender(nil, nil, nil), + }, + { + appender: newFakeAppender(nil, nil, nil), + }, + }, + }, + { + name: "size 6 with replication 5 two commit errors", + status: http.StatusOK, + replicationFactor: 5, + wreq: wreq, appendables: []*fakeAppendable{ { appender: newFakeAppender(nil, commitErrFn, nil), @@ -683,11 +780,20 @@ func TestReceiveQuorum(t *testing.T) { { appender: newFakeAppender(nil, nil, nil), }, + { + appender: newFakeAppender(nil, nil, nil), + }, + { + appender: newFakeAppender(nil, nil, nil), + }, + { + appender: newFakeAppender(nil, nil, nil), + }, }, }, } { t.Run(tc.name, func(t *testing.T) { - handlers, hashring := newTestHandlerHashring(tc.appendables, tc.replicationFactor) + handlers, hashring := newTestHandlerHashring(tc.appendables, tc.replicationFactor, hashringAlgo) tenant := "test" // Test from the point of view of every node // so that we know status code does not depend @@ -702,6 +808,11 @@ func TestReceiveQuorum(t *testing.T) { t.Errorf("handler %d: got unexpected HTTP status code: expected %d, got %d; body: %s", i, tc.status, rec.Code, rec.Body.String()) } } + + if withConsistencyDelay { + time.Sleep(50 * time.Millisecond) + } + // Test that each time series is stored // the correct amount of times in each fake DB. for _, ts := range tc.wreq.Timeseries { @@ -713,16 +824,30 @@ func TestReceiveQuorum(t *testing.T) { } } for j, a := range tc.appendables { - var expectedMin int - n := a.appender.(*fakeAppender).Get(lset) - got := uint64(len(n)) - if a.appenderErr == nil && endpointHit(t, hashring, tc.replicationFactor, handlers[j].options.Endpoint, tenant, &ts) { - // We have len(handlers) copies of each sample because the test case - // is run once for each handler and they all use the same appender. - expectedMin = int((tc.replicationFactor/2)+1) * len(ts.Samples) - } - if uint64(expectedMin) > got { - t.Errorf("handler: %d, labels %q: expected minimum of %d samples, got %d", j, lset.String(), expectedMin, got) + if withConsistencyDelay { + var expected int + n := a.appender.(*fakeAppender).Get(lset) + got := uint64(len(n)) + if a.appenderErr == nil && endpointHit(t, hashring, tc.replicationFactor, handlers[j].options.Endpoint, tenant, &ts) { + // We have len(handlers) copies of each sample because the test case + // is run once for each handler and they all use the same appender. + expected = len(handlers) * len(ts.Samples) + } + if uint64(expected) != got { + t.Errorf("handler: %d, labels %q: expected %d samples, got %d", j, lset.String(), expected, got) + } + } else { + var expectedMin int + n := a.appender.(*fakeAppender).Get(lset) + got := uint64(len(n)) + if a.appenderErr == nil && endpointHit(t, hashring, tc.replicationFactor, handlers[j].options.Endpoint, tenant, &ts) { + // We have len(handlers) copies of each sample because the test case + // is run once for each handler and they all use the same appender. + expectedMin = int((tc.replicationFactor/2)+1) * len(ts.Samples) + } + if uint64(expectedMin) > got { + t.Errorf("handler: %d, labels %q: expected minimum of %d samples, got %d", j, lset.String(), expectedMin, got) + } } } } @@ -730,6 +855,22 @@ func TestReceiveQuorum(t *testing.T) { } } +func TestReceiveQuorumHashmod(t *testing.T) { + testReceiveQuorum(t, AlgorithmHashmod, false) +} + +func TestReceiveQuorumKetama(t *testing.T) { + testReceiveQuorum(t, AlgorithmKetama, false) +} + +func TestReceiveWithConsistencyDelayHashmod(t *testing.T) { + testReceiveQuorum(t, AlgorithmHashmod, true) +} + +func TestReceiveWithConsistencyDelayKetama(t *testing.T) { + testReceiveQuorum(t, AlgorithmKetama, true) +} + func TestReceiveWriteRequestLimits(t *testing.T) { for _, tc := range []struct { name string @@ -782,7 +923,7 @@ func TestReceiveWriteRequestLimits(t *testing.T) { appender: newFakeAppender(nil, nil, nil), }, } - handlers, _ := newTestHandlerHashring(appendables, 3) + handlers, _ := newTestHandlerHashring(appendables, 3, AlgorithmHashmod) handler := handlers[0] tenant := "test" handler.limiter = NewLimiter( @@ -831,348 +972,6 @@ func TestReceiveWriteRequestLimits(t *testing.T) { } } -func TestReceiveWithConsistencyDelay(t *testing.T) { - appenderErrFn := func() error { return errors.New("failed to get appender") } - conflictErrFn := func() error { return storage.ErrOutOfBounds } - commitErrFn := func() error { return errors.New("failed to commit") } - wreq1 := &prompb.WriteRequest{ - Timeseries: []prompb.TimeSeries{ - { - Labels: []labelpb.ZLabel{ - { - Name: "foo", - Value: "bar", - }, - }, - Samples: []prompb.Sample{ - { - Value: 1, - Timestamp: 1, - }, - { - Value: 2, - Timestamp: 2, - }, - { - Value: 3, - Timestamp: 3, - }, - }, - }, - }, - } - for _, tc := range []struct { - name string - status int - replicationFactor uint64 - wreq *prompb.WriteRequest - appendables []*fakeAppendable - }{ - { - name: "size 1 success", - status: http.StatusOK, - replicationFactor: 1, - wreq: wreq1, - appendables: []*fakeAppendable{ - { - appender: newFakeAppender(nil, nil, nil), - }, - }, - }, - { - name: "size 1 commit error", - status: http.StatusInternalServerError, - replicationFactor: 1, - wreq: wreq1, - appendables: []*fakeAppendable{ - { - appender: newFakeAppender(nil, commitErrFn, nil), - }, - }, - }, - { - name: "size 1 conflict", - status: http.StatusConflict, - replicationFactor: 1, - wreq: wreq1, - appendables: []*fakeAppendable{ - { - appender: newFakeAppender(conflictErrFn, nil, nil), - }, - }, - }, - { - name: "size 2 success", - status: http.StatusOK, - replicationFactor: 1, - wreq: wreq1, - appendables: []*fakeAppendable{ - { - appender: newFakeAppender(nil, nil, nil), - }, - { - appender: newFakeAppender(nil, nil, nil), - }, - }, - }, - { - name: "size 3 success", - status: http.StatusOK, - replicationFactor: 1, - wreq: wreq1, - appendables: []*fakeAppendable{ - { - appender: newFakeAppender(nil, nil, nil), - }, - { - appender: newFakeAppender(nil, nil, nil), - }, - { - appender: newFakeAppender(nil, nil, nil), - }, - }, - }, - { - name: "size 3 success with replication", - status: http.StatusOK, - replicationFactor: 3, - wreq: wreq1, - appendables: []*fakeAppendable{ - { - appender: newFakeAppender(nil, nil, nil), - }, - { - appender: newFakeAppender(nil, nil, nil), - }, - { - appender: newFakeAppender(nil, nil, nil), - }, - }, - }, - { - name: "size 3 commit error", - status: http.StatusInternalServerError, - replicationFactor: 1, - wreq: wreq1, - appendables: []*fakeAppendable{ - { - appender: newFakeAppender(nil, commitErrFn, nil), - }, - { - appender: newFakeAppender(nil, commitErrFn, nil), - }, - { - appender: newFakeAppender(nil, commitErrFn, nil), - }, - }, - }, - { - name: "size 3 commit error with replication", - status: http.StatusInternalServerError, - replicationFactor: 3, - wreq: wreq1, - appendables: []*fakeAppendable{ - { - appender: newFakeAppender(nil, commitErrFn, nil), - }, - { - appender: newFakeAppender(nil, commitErrFn, nil), - }, - { - appender: newFakeAppender(nil, commitErrFn, nil), - }, - }, - }, - { - name: "size 3 appender error with replication", - status: http.StatusInternalServerError, - replicationFactor: 3, - wreq: wreq1, - appendables: []*fakeAppendable{ - { - appender: newFakeAppender(nil, nil, nil), - appenderErr: appenderErrFn, - }, - { - appender: newFakeAppender(nil, nil, nil), - appenderErr: appenderErrFn, - }, - { - appender: newFakeAppender(nil, nil, nil), - appenderErr: appenderErrFn, - }, - }, - }, - { - name: "size 3 conflict with replication", - status: http.StatusConflict, - replicationFactor: 3, - wreq: wreq1, - appendables: []*fakeAppendable{ - { - appender: newFakeAppender(conflictErrFn, nil, nil), - }, - { - appender: newFakeAppender(conflictErrFn, nil, nil), - }, - { - appender: newFakeAppender(conflictErrFn, nil, nil), - }, - }, - }, - { - name: "size 3 conflict and commit error with replication", - status: http.StatusConflict, - replicationFactor: 3, - wreq: wreq1, - appendables: []*fakeAppendable{ - { - appender: newFakeAppender(conflictErrFn, commitErrFn, nil), - }, - { - appender: newFakeAppender(conflictErrFn, commitErrFn, nil), - }, - { - appender: newFakeAppender(conflictErrFn, commitErrFn, nil), - }, - }, - }, - { - name: "size 3 with replication and one faulty", - status: http.StatusOK, - replicationFactor: 3, - wreq: wreq1, - appendables: []*fakeAppendable{ - { - appender: newFakeAppender(cycleErrors([]error{storage.ErrOutOfBounds, storage.ErrOutOfOrderSample, storage.ErrDuplicateSampleForTimestamp}), nil, nil), - }, - { - appender: newFakeAppender(nil, nil, nil), - }, - { - appender: newFakeAppender(nil, nil, nil), - }, - }, - }, - { - name: "size 3 with replication and one commit error", - status: http.StatusOK, - replicationFactor: 3, - wreq: wreq1, - appendables: []*fakeAppendable{ - { - appender: newFakeAppender(nil, commitErrFn, nil), - }, - { - appender: newFakeAppender(nil, nil, nil), - }, - { - appender: newFakeAppender(nil, nil, nil), - }, - }, - }, - { - name: "size 3 with replication and two conflicts", - status: http.StatusConflict, - replicationFactor: 3, - wreq: wreq1, - appendables: []*fakeAppendable{ - { - appender: newFakeAppender(cycleErrors([]error{storage.ErrOutOfBounds, storage.ErrOutOfOrderSample, storage.ErrDuplicateSampleForTimestamp}), nil, nil), - }, - { - appender: newFakeAppender(conflictErrFn, nil, nil), - }, - { - appender: newFakeAppender(nil, nil, nil), - }, - }, - }, - { - name: "size 3 with replication one conflict and one commit error", - status: http.StatusInternalServerError, - replicationFactor: 3, - wreq: wreq1, - appendables: []*fakeAppendable{ - { - appender: newFakeAppender(cycleErrors([]error{storage.ErrOutOfBounds, storage.ErrOutOfOrderSample, storage.ErrDuplicateSampleForTimestamp}), nil, nil), - }, - { - appender: newFakeAppender(nil, commitErrFn, nil), - }, - { - appender: newFakeAppender(nil, nil, nil), - }, - }, - }, - { - name: "size 3 with replication two commit errors", - status: http.StatusInternalServerError, - replicationFactor: 3, - wreq: wreq1, - appendables: []*fakeAppendable{ - { - appender: newFakeAppender(nil, commitErrFn, nil), - }, - { - appender: newFakeAppender(nil, commitErrFn, nil), - }, - { - appender: newFakeAppender(nil, nil, nil), - }, - }, - }, - } { - // Run the quorum tests with consistency delay, which should allow us - // to see all requests completing all the time, since we're using local - // network we are not expecting anything to go wrong with these. - t.Run(tc.name, func(t *testing.T) { - handlers, hashring := newTestHandlerHashring(tc.appendables, tc.replicationFactor) - tenant := "test" - // Test from the point of view of every node - // so that we know status code does not depend - // on which node is erroring and which node is receiving. - for i, handler := range handlers { - // Test that the correct status is returned. - rec, err := makeRequest(handler, tenant, tc.wreq) - if err != nil { - t.Fatalf("handler %d: unexpectedly failed making HTTP request: %v", tc.status, err) - } - if rec.Code != tc.status { - t.Errorf("handler %d: got unexpected HTTP status code: expected %d, got %d; body: %s", i, tc.status, rec.Code, rec.Body.String()) - } - } - - time.Sleep(50 * time.Millisecond) - - // Test that each time series is stored - // the correct amount of times in each fake DB. - for _, ts := range tc.wreq.Timeseries { - lset := make(labels.Labels, len(ts.Labels)) - for j := range ts.Labels { - lset[j] = labels.Label{ - Name: ts.Labels[j].Name, - Value: ts.Labels[j].Value, - } - } - for j, a := range tc.appendables { - var expected int - n := a.appender.(*fakeAppender).Get(lset) - got := uint64(len(n)) - if a.appenderErr == nil && endpointHit(t, hashring, tc.replicationFactor, handlers[j].options.Endpoint, tenant, &ts) { - // We have len(handlers) copies of each sample because the test case - // is run once for each handler and they all use the same appender. - expected = len(handlers) * len(ts.Samples) - } - if uint64(expected) != got { - t.Errorf("handler: %d, labels %q: expected %d samples, got %d", j, lset.String(), expected, got) - } - } - } - }) - } -} - // endpointHit is a helper to determine if a given endpoint in a hashring would be selected // for a given time series, tenant, and replication factor. func endpointHit(t *testing.T, h Hashring, rf uint64, endpoint, tenant string, timeSeries *prompb.TimeSeries) bool { @@ -1301,10 +1100,31 @@ func serializeSeriesWithOneSample(t testing.TB, series [][]labelpb.ZLabel) []byt return snappy.Encode(nil, body) } +func makeSeriesWithValues(numSeries int) []prompb.TimeSeries { + series := make([]prompb.TimeSeries, numSeries) + for i := 0; i < numSeries; i++ { + series[i] = prompb.TimeSeries{ + Labels: []labelpb.ZLabel{ + { + Name: fmt.Sprintf("pod-%d", i), + Value: fmt.Sprintf("nginx-%d", i), + }, + }, + Samples: []prompb.Sample{ + { + Value: float64(i), + Timestamp: 10, + }, + }, + } + } + return series +} + func benchmarkHandlerMultiTSDBReceiveRemoteWrite(b testutil.TB) { dir := b.TempDir() - handlers, _ := newTestHandlerHashring([]*fakeAppendable{nil}, 1) + handlers, _ := newTestHandlerHashring([]*fakeAppendable{nil}, 1, AlgorithmHashmod) handler := handlers[0] reg := prometheus.NewRegistry() From e6ca767c7d6f2b99fae2b658211f0fa1a6ba357f Mon Sep 17 00:00:00 2001 From: Matej Gera Date: Fri, 21 Oct 2022 13:47:21 +0200 Subject: [PATCH 07/11] Address feedback (typos, naming) Signed-off-by: Matej Gera --- pkg/receive/handler.go | 25 +++++++++++++------------ 1 file changed, 13 insertions(+), 12 deletions(-) diff --git a/pkg/receive/handler.go b/pkg/receive/handler.go index 84f4bf9427..2d42e2fdee 100644 --- a/pkg/receive/handler.go +++ b/pkg/receive/handler.go @@ -592,13 +592,13 @@ func (h *Handler) fanoutForward(pctx context.Context, tenant string, wreqs map[e // Create channel to send errors together with endpoint and replica on which it occurred. ec = make(chan errWithReplicaFunc) - // replicaGroupReqs counts the number of requests that will be made on behalf - // on each replica. This is used to determine write success. - replicaGroupReqs = make([]int, h.options.ReplicationFactor) + // replicaReqs counts the number of requests that will be made on behalf + // of each replica. This is used to determine write success. + replicaReqs = make([]int, h.options.ReplicationFactor) ) for er := range wreqs { - replicaGroupReqs[er.replica.n]++ + replicaReqs[er.replica.n]++ } fctx, cancel := context.WithTimeout(tracing.CopyTraceContext(context.Background(), pctx), h.options.ForwardTimeout) @@ -768,11 +768,12 @@ func (h *Handler) fanoutForward(pctx context.Context, tenant string, wreqs map[e }() var ( - // success counts how many requests per replica have succedeed. - success = make([]int, h.options.ReplicationFactor) + // perReplicaSuccess is used to determine if enough requests succeeded for given replica number. + preReplicaSuccess = make([]int, h.options.ReplicationFactor) // replicaMultiError contains all requests errors per replica. replicaMultiError = make([]errutil.MultiError, h.options.ReplicationFactor) + // endpointFailures keeps list of endpoints to which at least one request failed. endpointFailures []string maxEndpointFailures = calculateMaxEndpointFailures(int(h.options.ReplicationFactor), successThreshold) ) @@ -786,11 +787,11 @@ func (h *Handler) fanoutForward(pctx context.Context, tenant string, wreqs map[e for i, rme := range replicaMultiError { // Only if we can determine same error for all requests within replica group, // we return the cause, otherwise inform that replication failed. - errs.Add(determineWriteErrorCause(rme.Err(), replicaGroupReqs[i], replicaGroupReqs[i] > 1)) + errs.Add(determineWriteErrorCause(rme.Err(), replicaReqs[i], replicaReqs[i] > 1)) } finalErr := errs.Err() - // Second success mode - it is permissible to fail all requests to a number of endpoints, - // in which case we still know quorum is reached. + // Second success mode - it is permissible to fail all requests to specific endpoints, + // up to maxEndpointFailures, in which case we still know quorum is reached. // This is useful if e.g. one or few nodes are unresponsive. if len(endpointFailures) <= maxEndpointFailures { level.Debug(tLogger).Log("msg", "some requests failed, but not needed to achieve quorum", "err", finalErr) @@ -801,11 +802,11 @@ func (h *Handler) fanoutForward(pctx context.Context, tenant string, wreqs map[e replica, endpoint, err := errReplicaFn() if err == nil { var replicaGroupSuccess int - success[replica]++ - for i := range success { + preReplicaSuccess[replica]++ + for i := range preReplicaSuccess { // First success mode - if enough replica groups succeed, we can finish early (quorum // is guaranteed). - if success[i] == replicaGroupReqs[i] { + if preReplicaSuccess[i] == replicaReqs[i] { replicaGroupSuccess++ } if replicaGroupSuccess == successThreshold { From ac81f1c097118e7fea6022b3b11aa9eaa812ef5f Mon Sep 17 00:00:00 2001 From: Matej Gera Date: Fri, 21 Oct 2022 14:00:05 +0200 Subject: [PATCH 08/11] Add CHANGELOG Signed-off-by: Matej Gera --- CHANGELOG.md | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index fa50179c71..866022ae6d 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -18,6 +18,7 @@ We use *breaking :warning:* to mark changes that are not backward compatible (re - [#5702](https://github.com/thanos-io/thanos/pull/5702) Store: Upgrade minio-go/v7 to fix panic caused by leaked goroutines. - [#5736](https://github.com/thanos-io/thanos/pull/5736) Compact: Fix crash in GatherNoCompactionMarkFilter.NoCompactMarkedBlocks. - [#5763](https://github.com/thanos-io/thanos/pull/5763) Compact: Enable metadata cache. +- [#5791](https://github.com/thanos-io/thanos/pull/5791) Receive: Fix quorum validation for `ketama` hashing. ### Added * [#5654](https://github.com/thanos-io/thanos/pull/5654) Query: add `--grpc-compression` flag that controls the compression used in gRPC client. With the flag it is now possible to compress the traffic between Query and StoreAPI nodes - you get lower network usage in exchange for a bit higher CPU/RAM usage. From b0ed83ad9bf3a618e209496598b564bf1fa455f7 Mon Sep 17 00:00:00 2001 From: Matej Gera Date: Mon, 24 Oct 2022 17:17:46 +0200 Subject: [PATCH 09/11] Make node addresses in quorum tests non-random Signed-off-by: Matej Gera --- pkg/receive/handler_test.go | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/pkg/receive/handler_test.go b/pkg/receive/handler_test.go index 1310408d63..6838a6882e 100644 --- a/pkg/receive/handler_test.go +++ b/pkg/receive/handler_test.go @@ -9,7 +9,6 @@ import ( "fmt" "io" "math" - "math/rand" "net/http" "net/http/httptest" "os" @@ -416,6 +415,7 @@ func newTestHandlerHashring(appendables []*fakeAppendable, replicationFactor uin }, } + ag := addrGen{} for i := range appendables { h := NewHandler(nil, &Options{ TenantHeader: DefaultTenantHeader, @@ -427,7 +427,7 @@ func newTestHandlerHashring(appendables []*fakeAppendable, replicationFactor uin }) handlers = append(handlers, h) h.peers = peers - addr := randomAddr() + addr := ag.newAddr() h.options.Endpoint = addr cfg[0].Endpoints = append(cfg[0].Endpoints, h.options.Endpoint) peers.cache[addr] = &fakeRemoteWriteGRPCServer{h: h} @@ -1022,8 +1022,11 @@ func makeRequest(h *Handler, tenant string, wreq *prompb.WriteRequest) (*httptes return rec, nil } -func randomAddr() string { - return fmt.Sprintf("http://%d.%d.%d.%d:%d", rand.Intn(256), rand.Intn(256), rand.Intn(256), rand.Intn(256), rand.Intn(35000)+30000) +type addrGen struct{ n int } + +func (a *addrGen) newAddr() string { + a.n++ + return fmt.Sprintf("http://node-%d:%d", a.n, 12345+a.n) } type fakeRemoteWriteGRPCServer struct { From f0ab5f5dea7d4e36f4b9361c58522be5f57e08fd Mon Sep 17 00:00:00 2001 From: Matej Gera Date: Wed, 26 Oct 2022 10:38:55 +0200 Subject: [PATCH 10/11] More naming improvements Signed-off-by: Matej Gera --- pkg/receive/handler.go | 79 ++++++++++++++++++++++-------------------- 1 file changed, 42 insertions(+), 37 deletions(-) diff --git a/pkg/receive/handler.go b/pkg/receive/handler.go index 2d42e2fdee..9b3f367706 100644 --- a/pkg/receive/handler.go +++ b/pkg/receive/handler.go @@ -341,11 +341,12 @@ func (h *Handler) Run() error { return httpSrv.Serve(h.listener) } -// replica encapsulates the replica number of a request and if the request is -// already replicated. +// replica encapsulates the replication index of a request and if the request is +// already replicated. Replication index represents the number of logical replica to which +// the request belongs. type replica struct { - n uint64 - replicated bool + replicationIndex uint64 + replicated bool } // endpointReplica is a pair of a receive endpoint and a write request replica. @@ -373,11 +374,11 @@ func (h *Handler) handleRequest(ctx context.Context, rep uint64, tenant string, return errBadReplica } - r := replica{n: rep, replicated: rep != 0} + r := replica{replicationIndex: rep, replicated: rep != 0} // On the wire, format is 1-indexed and in-code is 0-indexed, so we decrement the value if it was already replicated. if r.replicated { - r.n-- + r.replicationIndex-- } // Forward any time series as necessary. All time series @@ -561,7 +562,7 @@ func (h *Handler) forward(ctx context.Context, tenant string, r replica, wreq *p // one request per time series. wreqs := make(map[endpointReplica]*prompb.WriteRequest) for i := range wreq.Timeseries { - endpoint, err := h.hashring.GetN(tenant, &wreq.Timeseries[i], r.n) + endpoint, err := h.hashring.GetN(tenant, &wreq.Timeseries[i], r.replicationIndex) if err != nil { h.mtx.RUnlock() return err @@ -592,13 +593,13 @@ func (h *Handler) fanoutForward(pctx context.Context, tenant string, wreqs map[e // Create channel to send errors together with endpoint and replica on which it occurred. ec = make(chan errWithReplicaFunc) - // replicaReqs counts the number of requests that will be made on behalf - // of each replica. This is used to determine write success. - replicaReqs = make([]int, h.options.ReplicationFactor) + // requestsPerReplicationIndex counts the number of requests that will be made on behalf + // of each logical replica. This is used to determine write success. + requestsPerReplicationIndex = make([]int, h.options.ReplicationFactor) ) for er := range wreqs { - replicaReqs[er.replica.n]++ + requestsPerReplicationIndex[er.replica.replicationIndex]++ } fctx, cancel := context.WithTimeout(tracing.CopyTraceContext(context.Background(), pctx), h.options.ForwardTimeout) @@ -640,12 +641,12 @@ func (h *Handler) fanoutForward(pctx context.Context, tenant string, wreqs map[e }) if err != nil { h.replications.WithLabelValues(labelError).Inc() - sendErrWithReplica(ec, r.n, endpoint, errors.Wrapf(err, "replicate write request for endpoint %v", endpoint)) + sendErrWithReplica(ec, r.replicationIndex, endpoint, errors.Wrapf(err, "replicate write request for endpoint %v", endpoint)) return } h.replications.WithLabelValues(labelSuccess).Inc() - sendErrWithReplica(ec, r.n, endpoint, nil) + sendErrWithReplica(ec, r.replicationIndex, endpoint, nil) }(endpoint) continue @@ -669,10 +670,10 @@ func (h *Handler) fanoutForward(pctx context.Context, tenant string, wreqs map[e // When a MultiError is added to another MultiError, the error slices are concatenated, not nested. // To avoid breaking the counting logic, we need to flatten the error. level.Debug(tLogger).Log("msg", "local tsdb write failed", "err", err.Error()) - sendErrWithReplica(ec, r.n, endpoint, errors.Wrapf(determineWriteErrorCause(err, 1, false), "store locally for endpoint %v", endpoint)) + sendErrWithReplica(ec, r.replicationIndex, endpoint, errors.Wrapf(determineWriteErrorCause(err, 1, false), "store locally for endpoint %v", endpoint)) return } - sendErrWithReplica(ec, r.n, endpoint, nil) + sendErrWithReplica(ec, r.replicationIndex, endpoint, nil) }(endpoint) continue @@ -697,7 +698,7 @@ func (h *Handler) fanoutForward(pctx context.Context, tenant string, wreqs map[e cl, err = h.peers.get(fctx, endpoint) if err != nil { - sendErrWithReplica(ec, r.n, endpoint, errors.Wrapf(err, "get peer connection for endpoint %v", endpoint)) + sendErrWithReplica(ec, r.replicationIndex, endpoint, errors.Wrapf(err, "get peer connection for endpoint %v", endpoint)) return } @@ -706,7 +707,7 @@ func (h *Handler) fanoutForward(pctx context.Context, tenant string, wreqs map[e if ok { if time.Now().Before(b.nextAllowed) { h.mtx.RUnlock() - sendErrWithReplica(ec, r.n, endpoint, errors.Wrapf(errUnavailable, "backing off forward request for endpoint %v", endpoint)) + sendErrWithReplica(ec, r.replicationIndex, endpoint, errors.Wrapf(errUnavailable, "backing off forward request for endpoint %v", endpoint)) return } } @@ -719,7 +720,7 @@ func (h *Handler) fanoutForward(pctx context.Context, tenant string, wreqs map[e Timeseries: wreqs[er].Timeseries, Tenant: tenant, // Increment replica since on-the-wire format is 1-indexed and 0 indicates un-replicated. - Replica: int64(r.n + 1), + Replica: int64(r.replicationIndex + 1), }) }) if err != nil { @@ -738,14 +739,14 @@ func (h *Handler) fanoutForward(pctx context.Context, tenant string, wreqs map[e h.mtx.Unlock() } } - sendErrWithReplica(ec, r.n, endpoint, errors.Wrapf(err, "forwarding request to endpoint %v", endpoint)) + sendErrWithReplica(ec, r.replicationIndex, endpoint, errors.Wrapf(err, "forwarding request to endpoint %v", endpoint)) return } h.mtx.Lock() delete(h.peerStates, endpoint) h.mtx.Unlock() - sendErrWithReplica(ec, r.n, endpoint, nil) + sendErrWithReplica(ec, r.replicationIndex, endpoint, nil) }(endpoint) } @@ -768,10 +769,14 @@ func (h *Handler) fanoutForward(pctx context.Context, tenant string, wreqs map[e }() var ( - // perReplicaSuccess is used to determine if enough requests succeeded for given replica number. - preReplicaSuccess = make([]int, h.options.ReplicationFactor) - // replicaMultiError contains all requests errors per replica. - replicaMultiError = make([]errutil.MultiError, h.options.ReplicationFactor) + // replicaRequestsSuccess tracks how many successful requests have been sent for each logical replica. + // Each index in the slice represents given replication index. + replicaRequestsSuccess = make([]int, h.options.ReplicationFactor) + + // replicaRequestsError records errors that resulted from replication requests. Each error is recorded + // into a multi error that corresponds to given logical replica. + // Multi error at each index in the slice represents given replication index. + replicaRequestsErrs = make([]errutil.MultiError, h.options.ReplicationFactor) // endpointFailures keeps list of endpoints to which at least one request failed. endpointFailures []string @@ -784,10 +789,10 @@ func (h *Handler) fanoutForward(pctx context.Context, tenant string, wreqs map[e return fctx.Err() case errReplicaFn, more := <-ec: if !more { - for i, rme := range replicaMultiError { + for i, rme := range replicaRequestsErrs { // Only if we can determine same error for all requests within replica group, // we return the cause, otherwise inform that replication failed. - errs.Add(determineWriteErrorCause(rme.Err(), replicaReqs[i], replicaReqs[i] > 1)) + errs.Add(determineWriteErrorCause(rme.Err(), requestsPerReplicationIndex[i], requestsPerReplicationIndex[i] > 1)) } finalErr := errs.Err() // Second success mode - it is permissible to fail all requests to specific endpoints, @@ -801,21 +806,21 @@ func (h *Handler) fanoutForward(pctx context.Context, tenant string, wreqs map[e } replica, endpoint, err := errReplicaFn() if err == nil { - var replicaGroupSuccess int - preReplicaSuccess[replica]++ - for i := range preReplicaSuccess { - // First success mode - if enough replica groups succeed, we can finish early (quorum - // is guaranteed). - if preReplicaSuccess[i] == replicaReqs[i] { - replicaGroupSuccess++ + var replicationsSuccessful int + replicaRequestsSuccess[replica]++ + for i := range replicaRequestsSuccess { + // First success mode - if enough logical replica requests succeed, + // we can finish early (quorum is guaranteed). + if replicaRequestsSuccess[i] == requestsPerReplicationIndex[i] { + replicationsSuccessful++ } - if replicaGroupSuccess == successThreshold { + if replicationsSuccessful == successThreshold { return nil } } continue } - replicaMultiError[replica].Add(err) + replicaRequestsErrs[replica].Add(err) endpointFailures = countEndpointFailures(endpointFailures, endpoint) } } @@ -845,7 +850,7 @@ func (h *Handler) replicate(ctx context.Context, tenant string, wreq *prompb.Wri er := endpointReplica{ endpoint: endpoint, - replica: replica{n: i, replicated: true}, + replica: replica{replicationIndex: i, replicated: true}, } replicatedRequest, ok := replicatedRequests[er] if !ok { @@ -1071,7 +1076,7 @@ func determineWriteErrorCause(err error, threshold int, forReplication bool) err // errWithReplicaFunc is a type to enable sending replica number and err over channel. type errWithReplicaFunc func() (uint64, string, error) -// sendErrWithReplica is a utility func to send replica number, endpoint and error over channel. +// sendErrWithReplica is a utility func to send replication index, endpoint and error over channel. // Replica number and endpoint is used to determine write quorum, based on the number of requests, // see the comments above. func sendErrWithReplica(f chan errWithReplicaFunc, r uint64, e string, err error) { From 9a8e22f47510786f124b0d4c1db43264450b7153 Mon Sep 17 00:00:00 2001 From: Matej Gera Date: Thu, 27 Oct 2022 09:38:54 +0200 Subject: [PATCH 11/11] Bump to fix CI Signed-off-by: Matej Gera