diff --git a/pkg/bloomgateway/bloomgateway.go b/pkg/bloomgateway/bloomgateway.go index c2838e40bc892..af9f361cc20e3 100644 --- a/pkg/bloomgateway/bloomgateway.go +++ b/pkg/bloomgateway/bloomgateway.go @@ -383,6 +383,8 @@ func (g *Gateway) consumeTask(ctx context.Context, task Task, tasksCh chan<- Tas case <-ctx.Done(): // do nothing default: + // chunks may not be sorted + sort.Slice(res.Removals, func(i, j int) bool { return res.Removals[i].Less(res.Removals[j]) }) task.responses = append(task.responses, res) } } @@ -413,7 +415,7 @@ func orderedResponsesByFP(responses [][]v1.Output) v1.Iterator[v1.Output] { itrs = append(itrs, v1.NewPeekingIter(v1.NewSliceIter(r))) } return v1.NewHeapIterator[v1.Output]( - func(o1, o2 v1.Output) bool { return o1.Fp <= o2.Fp }, + func(o1, o2 v1.Output) bool { return o1.Fp < o2.Fp }, itrs..., ) } diff --git a/pkg/bloomgateway/bloomgateway_test.go b/pkg/bloomgateway/bloomgateway_test.go index 4e54ed81e5521..15c9ca2be2d85 100644 --- a/pkg/bloomgateway/bloomgateway_test.go +++ b/pkg/bloomgateway/bloomgateway_test.go @@ -640,12 +640,12 @@ func TestFilterChunkRefs(t *testing.T) { { {fp: 0, checksums: []uint32{0, 1}}, {fp: 0, checksums: []uint32{0, 1, 2}}, - {fp: 1, checksums: []uint32{1}}, + {fp: 1, checksums: []uint32{0, 2}}, {fp: 2, checksums: []uint32{1}}, }, }, expected: mkResult([]instruction{ - {fp: 1, checksums: []uint32{0, 2}}, + {fp: 1, checksums: []uint32{1}}, {fp: 2, checksums: []uint32{0, 2}}, {fp: 3, checksums: []uint32{0, 1, 2}}, }), @@ -670,6 +670,27 @@ func TestFilterChunkRefs(t *testing.T) { {fp: 3, checksums: []uint32{0, 1, 2}}, }), }, + { + desc: "unordered fingerprints", + input: mkInput(4, 3), + removals: [][]instruction{ + { + {fp: 3, checksums: []uint32{2}}, + {fp: 0, checksums: []uint32{1, 2}}, + {fp: 2, checksums: []uint32{1, 2}}, + }, + { + {fp: 1, checksums: []uint32{1}}, + {fp: 2, checksums: []uint32{0, 1}}, + {fp: 3, checksums: []uint32{0}}, + }, + }, + expected: mkResult([]instruction{ + {fp: 0, checksums: []uint32{0}}, + {fp: 1, checksums: []uint32{0, 2}}, + {fp: 3, checksums: []uint32{1}}, + }), + }, } { t.Run(tc.desc, func(t *testing.T) { res := filterChunkRefs(tc.input, mkRemovals(tc.removals)) diff --git a/pkg/bloomgateway/client.go b/pkg/bloomgateway/client.go index fe37038a988f1..45e0d3736bcc9 100644 --- a/pkg/bloomgateway/client.go +++ b/pkg/bloomgateway/client.go @@ -5,7 +5,6 @@ import ( "flag" "io" "math" - "sort" "github.com/go-kit/log" "github.com/go-kit/log/level" @@ -254,11 +253,7 @@ func (c *GatewayClient) FilterChunks(ctx context.Context, _ string, interval blo count := 0 err := concurrency.ForEachJob(ctx, len(servers), len(servers), func(ctx context.Context, i int) error { rs := servers[i] - - sort.Slice(rs.groups, func(i, j int) bool { - return rs.groups[i].Fingerprint < rs.groups[j].Fingerprint - }) - + slices.SortFunc(rs.groups, func(a, b *logproto.GroupedChunkRefs) int { return a.Cmp(b) }) return c.doForAddrs([]string{rs.addr}, func(client logproto.BloomGatewayClient) error { req := &logproto.FilterChunkRefRequest{ From: interval.Start, @@ -294,13 +289,14 @@ func mergeSeries(input [][]*logproto.GroupedChunkRefs, buf []*logproto.GroupedCh iters := make([]v1.PeekingIterator[*logproto.GroupedChunkRefs], 0, len(input)) for _, inp := range input { + // this should not be necessary + // but who knows ... + slices.SortFunc(inp, func(a, b *logproto.GroupedChunkRefs) int { return a.Cmp(b) }) iters = append(iters, v1.NewPeekingIter(v1.NewSliceIter(inp))) } heapIter := v1.NewHeapIterator[*logproto.GroupedChunkRefs]( - func(a, b *logproto.GroupedChunkRefs) bool { - return a.Fingerprint < b.Fingerprint - }, + func(a, b *logproto.GroupedChunkRefs) bool { return a.Less(b) }, iters..., ) @@ -311,10 +307,12 @@ func mergeSeries(input [][]*logproto.GroupedChunkRefs, buf []*logproto.GroupedCh v1.Identity[*logproto.GroupedChunkRefs], // merge func(a, b *logproto.GroupedChunkRefs) *logproto.GroupedChunkRefs { + slices.SortFunc(a.Refs, func(a, b *logproto.ShortRef) int { return a.Cmp(b) }) + slices.SortFunc(b.Refs, func(a, b *logproto.ShortRef) int { return a.Cmp(b) }) return &logproto.GroupedChunkRefs{ Fingerprint: a.Fingerprint, Tenant: a.Tenant, - Refs: mergeChunks(a.Refs, b.Refs), + Refs: mergeChunkSets(a.Refs, b.Refs), } }, // iterator @@ -329,18 +327,15 @@ func mergeChunks(inputs ...[]*logproto.ShortRef) []*logproto.ShortRef { return nil } - // sort all inputs - for _, inp := range inputs { - slices.SortFunc(inp, func(a, b *logproto.ShortRef) int { return a.Cmp(b) }) - } - // if only one input slice, then sort and return if len(inputs) == 1 { + slices.SortFunc(inputs[0], func(a, b *logproto.ShortRef) int { return a.Cmp(b) }) return inputs[0] } iters := make([]v1.PeekingIterator[*logproto.ShortRef], 0, len(inputs)) for _, inp := range inputs { + slices.SortFunc(inp, func(a, b *logproto.ShortRef) int { return a.Cmp(b) }) iters = append(iters, v1.NewPeekingIter(v1.NewSliceIter(inp))) } @@ -354,9 +349,7 @@ func mergeChunks(inputs ...[]*logproto.ShortRef) []*logproto.ShortRef { // iterator v1.NewPeekingIter[*logproto.ShortRef]( v1.NewHeapIterator[*logproto.ShortRef]( - func(a, b *logproto.ShortRef) bool { - return a.Less(b) - }, + func(a, b *logproto.ShortRef) bool { return a.Less(b) }, iters..., ), ), @@ -365,6 +358,39 @@ func mergeChunks(inputs ...[]*logproto.ShortRef) []*logproto.ShortRef { return merged } +// mergeChunkSets merges/deduplicates two sorted slices of shortRefs +func mergeChunkSets(s1, s2 []*logproto.ShortRef) (result []*logproto.ShortRef) { + var i, j int + for i < len(s1) && j < len(s2) { + + a, b := s1[i], s2[j] + + if a.Equal(b) { + result = append(result, a) + i++ + j++ + continue + } + + if a.Less(b) { + result = append(result, a) + i++ + } else { + result = append(result, b) + j++ + } + } + + if i < len(s1) { + result = append(result, s1[i:]...) + } + if j < len(s2) { + result = append(result, s2[j:]...) + } + + return result +} + // doForAddrs sequetially calls the provided callback function fn for each // address in given slice addrs until the callback function does not return an // error. diff --git a/pkg/logproto/compat.go b/pkg/logproto/compat.go index d89adf1eb22eb..a11467584b58f 100644 --- a/pkg/logproto/compat.go +++ b/pkg/logproto/compat.go @@ -408,51 +408,62 @@ func (m *FilterChunkRefRequest) WithStartEndForCache(start, end time.Time) resul return &clone } -func (a *ShortRef) Cmp(b *ShortRef) int { - if a == nil && b == nil { - return 0 - } - if a == nil { - return -1 - } +func (a *GroupedChunkRefs) Cmp(b *GroupedChunkRefs) int { if b == nil { + if a == nil { + return 0 + } return 1 } - if a.From.Before(b.From) { + if a.Fingerprint < b.Fingerprint { return -1 } - if a.From.After(b.From) { + if a.Fingerprint > b.Fingerprint { return 1 } - if a.From.Equal(b.From) { - if a.Through.Before(b.Through) { - return -1 - } - if a.Through.After(b.Through) { - return 1 + return 0 +} + +func (a *GroupedChunkRefs) Less(b *GroupedChunkRefs) bool { + if b == nil { + return a == nil + } + return a.Fingerprint < b.Fingerprint +} + +// Cmp returns a positive number when a > b, a negative number when a < b, and 0 when a == b +func (a *ShortRef) Cmp(b *ShortRef) int { + if b == nil { + if a == nil { + return 0 } + return 1 + } + + if a.From != b.From { + return int(a.From) - int(b.From) + } + + if a.Through != b.Through { + return int(a.Through) - int(b.Through) } - return int(b.Checksum) - int(a.Checksum) + + return int(a.Checksum) - int(b.Checksum) } func (a *ShortRef) Less(b *ShortRef) bool { if b == nil { return a == nil } - if a.From.Before(b.From) { - return true - } - if a.From.After(b.From) { - return false + + if a.From != b.From { + return a.From < b.From } - if a.From.Equal(b.From) { - if a.Through.Before(b.Through) { - return true - } - if a.Through.After(b.Through) { - return false - } + + if a.Through != b.Through { + return a.Through < b.Through } + return a.Checksum < b.Checksum }