diff --git a/CHANGELOG.md b/CHANGELOG.md index 200fefe250..742204f963 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -17,6 +17,7 @@ We use *breaking :warning:* to mark changes that are not backward compatible (re - [#6171](https://github.com/thanos-io/thanos/pull/6171) Store: fix error handling on limits. ### Changed +- [#6168](https://github.com/thanos-io/thanos/pull/6168) Receiver: Make ketama hashring fail early when configured with fewer nodes than the replication factor. ### Removed diff --git a/pkg/receive/handler_test.go b/pkg/receive/handler_test.go index dd54809ec4..f06aa2eca7 100644 --- a/pkg/receive/handler_test.go +++ b/pkg/receive/handler_test.go @@ -160,7 +160,7 @@ func (f *fakeAppender) Rollback() error { return f.rollbackErr() } -func newTestHandlerHashring(appendables []*fakeAppendable, replicationFactor uint64, hashringAlgo HashringAlgorithm) ([]*Handler, Hashring) { +func newTestHandlerHashring(appendables []*fakeAppendable, replicationFactor uint64, hashringAlgo HashringAlgorithm) ([]*Handler, Hashring, error) { var ( cfg = []HashringConfig{{Hashring: "test"}} handlers []*Handler @@ -202,11 +202,14 @@ func newTestHandlerHashring(appendables []*fakeAppendable, replicationFactor uin hashringAlgo = AlgorithmHashmod } - hashring := newMultiHashring(hashringAlgo, replicationFactor, cfg) + hashring, err := newMultiHashring(hashringAlgo, replicationFactor, cfg) + if err != nil { + return nil, nil, err + } for _, h := range handlers { h.Hashring(hashring) } - return handlers, hashring + return handlers, hashring, nil } func testReceiveQuorum(t *testing.T, hashringAlgo HashringAlgorithm, withConsistencyDelay bool) { @@ -576,7 +579,10 @@ func testReceiveQuorum(t *testing.T, hashringAlgo HashringAlgorithm, withConsist }, } { t.Run(tc.name, func(t *testing.T) { - handlers, hashring := newTestHandlerHashring(tc.appendables, tc.replicationFactor, hashringAlgo) + handlers, hashring, err := newTestHandlerHashring(tc.appendables, tc.replicationFactor, 
hashringAlgo) + if err != nil { + t.Fatalf("unable to create test handler: %v", err) + } tenant := "test" // Test from the point of view of every node // so that we know status code does not depend @@ -706,7 +712,10 @@ func TestReceiveWriteRequestLimits(t *testing.T) { appender: newFakeAppender(nil, nil, nil), }, } - handlers, _ := newTestHandlerHashring(appendables, 3, AlgorithmHashmod) + handlers, _, err := newTestHandlerHashring(appendables, 3, AlgorithmHashmod) + if err != nil { + t.Fatalf("unable to create test handler: %v", err) + } handler := handlers[0] tenant := "test" @@ -915,7 +924,10 @@ func makeSeriesWithValues(numSeries int) []prompb.TimeSeries { func benchmarkHandlerMultiTSDBReceiveRemoteWrite(b testutil.TB) { dir := b.TempDir() - handlers, _ := newTestHandlerHashring([]*fakeAppendable{nil}, 1, AlgorithmHashmod) + handlers, _, err := newTestHandlerHashring([]*fakeAppendable{nil}, 1, AlgorithmHashmod) + if err != nil { + b.Fatalf("unable to create test handler: %v", err) + } handler := handlers[0] reg := prometheus.NewRegistry() diff --git a/pkg/receive/hashring.go b/pkg/receive/hashring.go index 730a81f799..df41cdf72f 100644 --- a/pkg/receive/hashring.go +++ b/pkg/receive/hashring.go @@ -110,9 +110,14 @@ type ketamaHashring struct { numEndpoints uint64 } -func newKetamaHashring(endpoints []string, sectionsPerNode int, replicationFactor uint64) *ketamaHashring { +func newKetamaHashring(endpoints []string, sectionsPerNode int, replicationFactor uint64) (*ketamaHashring, error) { numSections := len(endpoints) * sectionsPerNode + if len(endpoints) < int(replicationFactor) { + return nil, errors.New("ketama: amount of endpoints needs to be larger than replication factor") + + } + hash := xxhash.New() ringSections := make(sections, 0, numSections) for endpointIndex, endpoint := range endpoints { @@ -135,7 +140,7 @@ func newKetamaHashring(endpoints []string, sectionsPerNode int, replicationFacto endpoints: endpoints, sections: ringSections, numEndpoints: 
uint64(len(endpoints)), - } + }, nil } // calculateSectionReplicas pre-calculates replicas for each section, @@ -234,17 +239,21 @@ func (m *multiHashring) GetN(tenant string, ts *prompb.TimeSeries, n uint64) (st // groups. // Which hashring to use for a tenant is determined // by the tenants field of the hashring configuration. -func newMultiHashring(algorithm HashringAlgorithm, replicationFactor uint64, cfg []HashringConfig) Hashring { +func newMultiHashring(algorithm HashringAlgorithm, replicationFactor uint64, cfg []HashringConfig) (Hashring, error) { m := &multiHashring{ cache: make(map[string]Hashring), } for _, h := range cfg { var hashring Hashring + var err error if h.Algorithm != "" { - hashring = newHashring(h.Algorithm, h.Endpoints, replicationFactor, h.Hashring, h.Tenants) + hashring, err = newHashring(h.Algorithm, h.Endpoints, replicationFactor, h.Hashring, h.Tenants) } else { - hashring = newHashring(algorithm, h.Endpoints, replicationFactor, h.Hashring, h.Tenants) + hashring, err = newHashring(algorithm, h.Endpoints, replicationFactor, h.Hashring, h.Tenants) + } + if err != nil { + return nil, err } m.hashrings = append(m.hashrings, hashring) var t map[string]struct{} @@ -256,7 +265,7 @@ func newMultiHashring(algorithm HashringAlgorithm, replicationFactor uint64, cfg } m.tenantSets = append(m.tenantSets, t) } - return m + return m, nil } // HashringFromConfigWatcher creates multi-tenant hashrings from a @@ -276,7 +285,11 @@ func HashringFromConfigWatcher(ctx context.Context, algorithm HashringAlgorithm, if !ok { return errors.New("hashring config watcher stopped unexpectedly") } - updates <- newMultiHashring(algorithm, replicationFactor, cfg) + h, err := newMultiHashring(algorithm, replicationFactor, cfg) + if err != nil { + return errors.Wrap(err, "unable to create new hashring from config") + } + updates <- h case <-ctx.Done(): return ctx.Err() } @@ -295,13 +308,13 @@ func HashringFromConfig(algorithm HashringAlgorithm, replicationFactor uint64, c 
return nil, errors.Wrapf(err, "failed to load configuration") } - return newMultiHashring(algorithm, replicationFactor, config), err + return newMultiHashring(algorithm, replicationFactor, config) } -func newHashring(algorithm HashringAlgorithm, endpoints []string, replicationFactor uint64, hashring string, tenants []string) Hashring { +func newHashring(algorithm HashringAlgorithm, endpoints []string, replicationFactor uint64, hashring string, tenants []string) (Hashring, error) { switch algorithm { case AlgorithmHashmod: - return simpleHashring(endpoints) + return simpleHashring(endpoints), nil case AlgorithmKetama: return newKetamaHashring(endpoints, SectionsPerNode, replicationFactor) default: @@ -309,6 +322,6 @@ func newHashring(algorithm HashringAlgorithm, endpoints []string, replicationFac level.Warn(l).Log("msg", "Unrecognizable hashring algorithm. Fall back to hashmod algorithm.", "hashring", hashring, "tenants", tenants) - return simpleHashring(endpoints) + return simpleHashring(endpoints), nil } } diff --git a/pkg/receive/hashring_test.go b/pkg/receive/hashring_test.go index 8c4de9d19e..550ce03bec 100644 --- a/pkg/receive/hashring_test.go +++ b/pkg/receive/hashring_test.go @@ -136,7 +136,9 @@ func TestHashringGet(t *testing.T) { }, }, } { - hs := newMultiHashring(AlgorithmHashmod, 3, tc.cfg) + hs, err := newMultiHashring(AlgorithmHashmod, 3, tc.cfg) + require.NoError(t, err) + h, err := hs.Get(tc.tenant, ts) if tc.nodes != nil { if err != nil { @@ -226,7 +228,8 @@ func TestKetamaHashringGet(t *testing.T) { for _, test := range tests { t.Run(test.name, func(t *testing.T) { - hashRing := newKetamaHashring(test.nodes, 10, test.n+1) + hashRing, err := newKetamaHashring(test.nodes, 10, test.n+1) + require.NoError(t, err) result, err := hashRing.GetN("tenant", test.ts, test.n) require.NoError(t, err) @@ -235,6 +238,11 @@ func TestKetamaHashringGet(t *testing.T) { } } +func TestKetamaHashringBadConfigIsRejected(t *testing.T) { + _, err := 
newKetamaHashring([]string{"node-1"}, 1, 2) + require.Error(t, err) +} + func TestKetamaHashringConsistency(t *testing.T) { series := makeSeries() @@ -348,7 +356,10 @@ func assignSeries(series []prompb.TimeSeries, nodes []string) (map[string][]prom } func assignReplicatedSeries(series []prompb.TimeSeries, nodes []string, replicas uint64) (map[string][]prompb.TimeSeries, error) { - hashRing := newKetamaHashring(nodes, SectionsPerNode, replicas) + hashRing, err := newKetamaHashring(nodes, SectionsPerNode, replicas) + if err != nil { + return nil, err + } assignments := make(map[string][]prompb.TimeSeries) for i := uint64(0); i < replicas; i++ { for _, ts := range series {