Skip to content

Commit

Permalink
Cleanup
Browse files Browse the repository at this point in the history
Signed-off-by: Christian Haudum <[email protected]>
  • Loading branch information
chaudum committed Apr 29, 2024
1 parent 48f986a commit d0b04e0
Show file tree
Hide file tree
Showing 2 changed files with 24 additions and 51 deletions.
51 changes: 9 additions & 42 deletions pkg/bloomgateway/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"flag"
"io"
"math"
"sort"

"github.com/go-kit/log"
"github.com/go-kit/log/level"
Expand Down Expand Up @@ -253,7 +254,11 @@ func (c *GatewayClient) FilterChunks(ctx context.Context, _ string, interval blo
count := 0
err := concurrency.ForEachJob(ctx, len(servers), len(servers), func(ctx context.Context, i int) error {
rs := servers[i]
slices.SortFunc(rs.groups, func(a, b *logproto.GroupedChunkRefs) int { return a.Cmp(b) })

sort.Slice(rs.groups, func(i, j int) bool {
return rs.groups[i].Fingerprint < rs.groups[j].Fingerprint
})

return c.doForAddrs([]string{rs.addr}, func(client logproto.BloomGatewayClient) error {
req := &logproto.FilterChunkRefRequest{
From: interval.Start,
Expand Down Expand Up @@ -289,14 +294,12 @@ func mergeSeries(input [][]*logproto.GroupedChunkRefs, buf []*logproto.GroupedCh

iters := make([]v1.PeekingIterator[*logproto.GroupedChunkRefs], 0, len(input))
for _, inp := range input {
// this should not be necessary
// but who knows ...
slices.SortFunc(inp, func(a, b *logproto.GroupedChunkRefs) int { return a.Cmp(b) })
sort.Slice(inp, func(i, j int) bool { return inp[i].Fingerprint < inp[j].Fingerprint })
iters = append(iters, v1.NewPeekingIter(v1.NewSliceIter(inp)))
}

heapIter := v1.NewHeapIterator[*logproto.GroupedChunkRefs](
func(a, b *logproto.GroupedChunkRefs) bool { return a.Less(b) },
func(a, b *logproto.GroupedChunkRefs) bool { return a.Fingerprint < b.Fingerprint },
iters...,
)

Expand All @@ -322,43 +325,7 @@ func mergeSeries(input [][]*logproto.GroupedChunkRefs, buf []*logproto.GroupedCh
return v1.CollectInto(dedupeIter, buf)
}

func mergeChunks(inputs ...[]*logproto.ShortRef) []*logproto.ShortRef {
if len(inputs) == 0 {
return nil
}

// if only one input slice, then sort and return
if len(inputs) == 1 {
slices.SortFunc(inputs[0], func(a, b *logproto.ShortRef) int { return a.Cmp(b) })
return inputs[0]
}

iters := make([]v1.PeekingIterator[*logproto.ShortRef], 0, len(inputs))
for _, inp := range inputs {
slices.SortFunc(inp, func(a, b *logproto.ShortRef) int { return a.Cmp(b) })
iters = append(iters, v1.NewPeekingIter(v1.NewSliceIter(inp)))
}

chunkDedupe := v1.NewDedupingIter[*logproto.ShortRef, *logproto.ShortRef](
// eq
func(a, b *logproto.ShortRef) bool { return a.Equal(b) },
// from
v1.Identity[*logproto.ShortRef],
// merge
func(a, b *logproto.ShortRef) *logproto.ShortRef { return a },
// iterator
v1.NewPeekingIter[*logproto.ShortRef](
v1.NewHeapIterator[*logproto.ShortRef](
func(a, b *logproto.ShortRef) bool { return a.Less(b) },
iters...,
),
),
)
merged, _ := v1.Collect(chunkDedupe)
return merged
}

// mergeChunkSets merges/deduplicates two sorted slices of shortRefs
// mergeChunkSets merges and deduplicates two sorted slices of shortRefs
func mergeChunkSets(s1, s2 []*logproto.ShortRef) (result []*logproto.ShortRef) {
var i, j int
for i < len(s1) && j < len(s2) {
Expand Down
24 changes: 15 additions & 9 deletions pkg/bloomgateway/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,20 +71,26 @@ func TestGatewayClient_MergeSeries(t *testing.T) {
require.Equal(t, expected, result)
}

func TestGatewayClient_MergeChunks(t *testing.T) {
inputs := [][]*logproto.ShortRef{
{shortRef(2, 3, 2), shortRef(1, 3, 3)},
{shortRef(2, 3, 2), shortRef(1, 3, 3), shortRef(1, 2, 1)},
{shortRef(1, 3, 3), shortRef(1, 2, 1)},
{shortRef(1, 2, 1)},
func TestGatewayClient_MergeChunkSets(t *testing.T) {
inp1 := []*logproto.ShortRef{
shortRef(1, 3, 1),
shortRef(2, 3, 2),
shortRef(4, 5, 3),
}
inp2 := []*logproto.ShortRef{
shortRef(2, 3, 2),
shortRef(3, 4, 4),
shortRef(5, 6, 5),
}

expected := []*logproto.ShortRef{
shortRef(1, 2, 1),
shortRef(1, 3, 3),
shortRef(1, 3, 1),
shortRef(2, 3, 2),
shortRef(3, 4, 4),
shortRef(4, 5, 3),
shortRef(5, 6, 5),
}

result := mergeChunks(inputs...)
result := mergeChunkSets(inp1, inp2)
require.Equal(t, expected, result)
}

0 comments on commit d0b04e0

Please sign in to comment.