Skip to content

Commit

Permalink
Sort series (again?)
Browse files Browse the repository at this point in the history
Signed-off-by: Christian Haudum <[email protected]>
  • Loading branch information
chaudum committed Apr 29, 2024
1 parent 689d49c commit 48f986a
Show file tree
Hide file tree
Showing 4 changed files with 109 additions and 49 deletions.
4 changes: 3 additions & 1 deletion pkg/bloomgateway/bloomgateway.go
Original file line number Diff line number Diff line change
Expand Up @@ -383,6 +383,8 @@ func (g *Gateway) consumeTask(ctx context.Context, task Task, tasksCh chan<- Tas
case <-ctx.Done():
// do nothing
default:
// chunks may not be sorted
sort.Slice(res.Removals, func(i, j int) bool { return res.Removals[i].Less(res.Removals[j]) })
task.responses = append(task.responses, res)
}
}
Expand Down Expand Up @@ -413,7 +415,7 @@ func orderedResponsesByFP(responses [][]v1.Output) v1.Iterator[v1.Output] {
itrs = append(itrs, v1.NewPeekingIter(v1.NewSliceIter(r)))
}
return v1.NewHeapIterator[v1.Output](
func(o1, o2 v1.Output) bool { return o1.Fp <= o2.Fp },
func(o1, o2 v1.Output) bool { return o1.Fp < o2.Fp },
itrs...,
)
}
Expand Down
25 changes: 23 additions & 2 deletions pkg/bloomgateway/bloomgateway_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -640,12 +640,12 @@ func TestFilterChunkRefs(t *testing.T) {
{
{fp: 0, checksums: []uint32{0, 1}},
{fp: 0, checksums: []uint32{0, 1, 2}},
{fp: 1, checksums: []uint32{1}},
{fp: 1, checksums: []uint32{0, 2}},
{fp: 2, checksums: []uint32{1}},
},
},
expected: mkResult([]instruction{
{fp: 1, checksums: []uint32{0, 2}},
{fp: 1, checksums: []uint32{1}},
{fp: 2, checksums: []uint32{0, 2}},
{fp: 3, checksums: []uint32{0, 1, 2}},
}),
Expand All @@ -670,6 +670,27 @@ func TestFilterChunkRefs(t *testing.T) {
{fp: 3, checksums: []uint32{0, 1, 2}},
}),
},
{
desc: "unordered fingerprints",
input: mkInput(4, 3),
removals: [][]instruction{
{
{fp: 3, checksums: []uint32{2}},
{fp: 0, checksums: []uint32{1, 2}},
{fp: 2, checksums: []uint32{1, 2}},
},
{
{fp: 1, checksums: []uint32{1}},
{fp: 2, checksums: []uint32{0, 1}},
{fp: 3, checksums: []uint32{0}},
},
},
expected: mkResult([]instruction{
{fp: 0, checksums: []uint32{0}},
{fp: 1, checksums: []uint32{0, 2}},
{fp: 3, checksums: []uint32{1}},
}),
},
} {
t.Run(tc.desc, func(t *testing.T) {
res := filterChunkRefs(tc.input, mkRemovals(tc.removals))
Expand Down
62 changes: 44 additions & 18 deletions pkg/bloomgateway/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import (
"flag"
"io"
"math"
"sort"

"github.com/go-kit/log"
"github.com/go-kit/log/level"
Expand Down Expand Up @@ -254,11 +253,7 @@ func (c *GatewayClient) FilterChunks(ctx context.Context, _ string, interval blo
count := 0
err := concurrency.ForEachJob(ctx, len(servers), len(servers), func(ctx context.Context, i int) error {
rs := servers[i]

sort.Slice(rs.groups, func(i, j int) bool {
return rs.groups[i].Fingerprint < rs.groups[j].Fingerprint
})

slices.SortFunc(rs.groups, func(a, b *logproto.GroupedChunkRefs) int { return a.Cmp(b) })
return c.doForAddrs([]string{rs.addr}, func(client logproto.BloomGatewayClient) error {
req := &logproto.FilterChunkRefRequest{
From: interval.Start,
Expand Down Expand Up @@ -294,13 +289,14 @@ func mergeSeries(input [][]*logproto.GroupedChunkRefs, buf []*logproto.GroupedCh

iters := make([]v1.PeekingIterator[*logproto.GroupedChunkRefs], 0, len(input))
for _, inp := range input {
// this should not be necessary
// but who knows ...
slices.SortFunc(inp, func(a, b *logproto.GroupedChunkRefs) int { return a.Cmp(b) })
iters = append(iters, v1.NewPeekingIter(v1.NewSliceIter(inp)))
}

heapIter := v1.NewHeapIterator[*logproto.GroupedChunkRefs](
func(a, b *logproto.GroupedChunkRefs) bool {
return a.Fingerprint < b.Fingerprint
},
func(a, b *logproto.GroupedChunkRefs) bool { return a.Less(b) },
iters...,
)

Expand All @@ -311,10 +307,12 @@ func mergeSeries(input [][]*logproto.GroupedChunkRefs, buf []*logproto.GroupedCh
v1.Identity[*logproto.GroupedChunkRefs],
// merge
func(a, b *logproto.GroupedChunkRefs) *logproto.GroupedChunkRefs {
slices.SortFunc(a.Refs, func(a, b *logproto.ShortRef) int { return a.Cmp(b) })
slices.SortFunc(b.Refs, func(a, b *logproto.ShortRef) int { return a.Cmp(b) })
return &logproto.GroupedChunkRefs{
Fingerprint: a.Fingerprint,
Tenant: a.Tenant,
Refs: mergeChunks(a.Refs, b.Refs),
Refs: mergeChunkSets(a.Refs, b.Refs),
}
},
// iterator
Expand All @@ -329,18 +327,15 @@ func mergeChunks(inputs ...[]*logproto.ShortRef) []*logproto.ShortRef {
return nil
}

// sort all inputs
for _, inp := range inputs {
slices.SortFunc(inp, func(a, b *logproto.ShortRef) int { return a.Cmp(b) })
}

// if only one input slice, then sort and return
if len(inputs) == 1 {
slices.SortFunc(inputs[0], func(a, b *logproto.ShortRef) int { return a.Cmp(b) })
return inputs[0]
}

iters := make([]v1.PeekingIterator[*logproto.ShortRef], 0, len(inputs))
for _, inp := range inputs {
slices.SortFunc(inp, func(a, b *logproto.ShortRef) int { return a.Cmp(b) })
iters = append(iters, v1.NewPeekingIter(v1.NewSliceIter(inp)))
}

Expand All @@ -354,9 +349,7 @@ func mergeChunks(inputs ...[]*logproto.ShortRef) []*logproto.ShortRef {
// iterator
v1.NewPeekingIter[*logproto.ShortRef](
v1.NewHeapIterator[*logproto.ShortRef](
func(a, b *logproto.ShortRef) bool {
return a.Less(b)
},
func(a, b *logproto.ShortRef) bool { return a.Less(b) },
iters...,
),
),
Expand All @@ -365,6 +358,39 @@ func mergeChunks(inputs ...[]*logproto.ShortRef) []*logproto.ShortRef {
return merged
}

// mergeChunkSets merges/deduplicates two sorted slices of shortRefs
func mergeChunkSets(s1, s2 []*logproto.ShortRef) (result []*logproto.ShortRef) {
var i, j int
for i < len(s1) && j < len(s2) {

a, b := s1[i], s2[j]

if a.Equal(b) {
result = append(result, a)
i++
j++
continue
}

if a.Less(b) {
result = append(result, a)
i++
} else {
result = append(result, b)
j++
}
}

if i < len(s1) {
result = append(result, s1[i:]...)
}
if j < len(s2) {
result = append(result, s2[j:]...)
}

return result
}

// doForAddrs sequetially calls the provided callback function fn for each
// address in given slice addrs until the callback function does not return an
// error.
Expand Down
67 changes: 39 additions & 28 deletions pkg/logproto/compat.go
Original file line number Diff line number Diff line change
Expand Up @@ -408,51 +408,62 @@ func (m *FilterChunkRefRequest) WithStartEndForCache(start, end time.Time) resul
return &clone
}

func (a *ShortRef) Cmp(b *ShortRef) int {
if a == nil && b == nil {
return 0
}
if a == nil {
return -1
}
func (a *GroupedChunkRefs) Cmp(b *GroupedChunkRefs) int {
if b == nil {
if a == nil {
return 0
}
return 1
}
if a.From.Before(b.From) {
if a.Fingerprint < b.Fingerprint {
return -1
}
if a.From.After(b.From) {
if a.Fingerprint > b.Fingerprint {
return 1
}
if a.From.Equal(b.From) {
if a.Through.Before(b.Through) {
return -1
}
if a.Through.After(b.Through) {
return 1
return 0
}

func (a *GroupedChunkRefs) Less(b *GroupedChunkRefs) bool {
if b == nil {
return a == nil
}
return a.Fingerprint < b.Fingerprint
}

// Cmp returns a positive number when a > b, a negative number when a < b, and 0 when a == b
func (a *ShortRef) Cmp(b *ShortRef) int {
if b == nil {
if a == nil {
return 0
}
return 1
}

if a.From != b.From {
return int(a.From) - int(b.From)
}

if a.Through != b.Through {
return int(a.Through) - int(b.Through)
}
return int(b.Checksum) - int(a.Checksum)

return int(a.Checksum) - int(b.Checksum)
}

func (a *ShortRef) Less(b *ShortRef) bool {
if b == nil {
return a == nil
}
if a.From.Before(b.From) {
return true
}
if a.From.After(b.From) {
return false

if a.From != b.From {
return a.From < b.From
}
if a.From.Equal(b.From) {
if a.Through.Before(b.Through) {
return true
}
if a.Through.After(b.Through) {
return false
}

if a.Through != b.Through {
return a.Through < b.Through
}

return a.Checksum < b.Checksum
}

Expand Down

0 comments on commit 48f986a

Please sign in to comment.