Skip to content

Commit

Permalink
Merge pull request stolostron#1 from fpetkovski/change-proxy-response…
Browse files Browse the repository at this point in the history
…-heap-less-func

Reuse buffers for label comparison
  • Loading branch information
wallee94 authored Apr 28, 2023
2 parents 5a7c764 + b8977d0 commit 380b1e4
Show file tree
Hide file tree
Showing 2 changed files with 113 additions and 15 deletions.
73 changes: 58 additions & 15 deletions pkg/store/proxy_heap.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,24 +163,37 @@ func (d *dedupResponseHeap) At() *storepb.SeriesResponse {
// This is O(n*logk) but can be Theta(n*logk). However,
// tournament trees need n-1 auxiliary nodes so there
// might not be much of a difference.
type ProxyResponseHeap []ProxyResponseHeapNode
type ProxyResponseHeap struct {
// nodes holds the heap elements; Len, Swap, Push, Pop and Min all operate on this slice.
nodes []ProxyResponseHeapNode
// iLblsScratch and jLblsScratch are scratch label buffers owned by the heap.
// Less overwrites them on every comparison (one buffer per compared node), so
// their contents are only meaningful for the duration of a single Less call.
iLblsScratch labels.Labels
jLblsScratch labels.Labels
}

func (h *ProxyResponseHeap) Less(i, j int) bool {
iResp := (*h)[i].rs.At()
jResp := (*h)[j].rs.At()
iResp := h.nodes[i].rs.At()
jResp := h.nodes[j].rs.At()

if iResp.GetSeries() != nil && jResp.GetSeries() != nil {
// Response sets are sorted before adding external labels.
// This comparison excludes those labels to keep the same order.
iStoreLbls := (*h)[i].rs.StoreLabels()
jStoreLbls := (*h)[j].rs.StoreLabels()
iStoreLbls := h.nodes[i].rs.StoreLabels()
jStoreLbls := h.nodes[j].rs.StoreLabels()

iLbls := labelpb.ZLabelsToPromLabels(iResp.GetSeries().Labels)
jLbls := labelpb.ZLabelsToPromLabels(jResp.GetSeries().Labels)
c := labels.Compare(rmLabels(iLbls.Copy(), iStoreLbls), rmLabels(jLbls.Copy(), jStoreLbls))
if c == 0 {
c = labels.Compare(iLbls, jLbls)

copyLabels(&h.iLblsScratch, iLbls)
copyLabels(&h.jLblsScratch, jLbls)

var iExtLbls, jExtLbls labels.Labels
h.iLblsScratch, iExtLbls = dropLabels(h.iLblsScratch, iStoreLbls)
h.jLblsScratch, jExtLbls = dropLabels(h.jLblsScratch, jStoreLbls)

c := labels.Compare(h.iLblsScratch, h.jLblsScratch)
if c != 0 {
return c < 0
}
return c < 0
return labels.Compare(iExtLbls, jExtLbls) < 0
} else if iResp.GetSeries() == nil && jResp.GetSeries() != nil {
return true
} else if iResp.GetSeries() != nil && jResp.GetSeries() == nil {
Expand All @@ -193,19 +206,19 @@ func (h *ProxyResponseHeap) Less(i, j int) bool {
}

// Len returns the number of nodes currently held in the heap.
func (h *ProxyResponseHeap) Len() int {
return len(*h)
return len(h.nodes)
}

// Swap exchanges the nodes stored at indexes i and j.
// Both indexes must be in range; an out-of-range index panics.
func (h *ProxyResponseHeap) Swap(i, j int) {
(*h)[i], (*h)[j] = (*h)[j], (*h)[i]
h.nodes[i], h.nodes[j] = h.nodes[j], h.nodes[i]
}

// Push appends x to the end of the underlying node slice.
// x must hold a ProxyResponseHeapNode; any other dynamic type makes the
// type assertion panic. Push itself does not restore heap order.
// NOTE(review): presumably called through container/heap, which re-establishes
// the ordering after the append — confirm against the callers.
func (h *ProxyResponseHeap) Push(x interface{}) {
*h = append(*h, x.(ProxyResponseHeapNode))
h.nodes = append(h.nodes, x.(ProxyResponseHeapNode))
}

// Pop removes the last node from the underlying slice and returns it as v.
// There is no emptiness check: calling Pop with no nodes panics on the
// out-of-range slice expression.
func (h *ProxyResponseHeap) Pop() (v interface{}) {
*h, v = (*h)[:h.Len()-1], (*h)[h.Len()-1]
h.nodes, v = h.nodes[:h.Len()-1], h.nodes[h.Len()-1]
return
}

Expand All @@ -214,7 +227,7 @@ func (h *ProxyResponseHeap) Empty() bool {
}

// Min returns a pointer to the node stored at index 0. The pointer aliases the
// slice element, so it is invalidated by later swaps or appends on the heap.
// It panics when the heap holds no nodes.
// NOTE(review): index 0 is presumably the smallest node according to Less once
// the heap invariant holds — confirm against the code that maintains the heap.
func (h *ProxyResponseHeap) Min() *ProxyResponseHeapNode {
return &(*h)[0]
return &h.nodes[0]
}

type ProxyResponseHeapNode struct {
Expand All @@ -224,7 +237,9 @@ type ProxyResponseHeapNode struct {
// NewProxyResponseHeap returns heap that k-way merge series together.
// It's agnostic to duplicates and overlaps, it forwards all duplicated series in random order.
func NewProxyResponseHeap(seriesSets ...respSet) *ProxyResponseHeap {
ret := make(ProxyResponseHeap, 0, len(seriesSets))
ret := ProxyResponseHeap{
nodes: make([]ProxyResponseHeapNode, 0, len(seriesSets)),
}

for _, ss := range seriesSets {
if ss.Empty() {
Expand Down Expand Up @@ -774,6 +789,34 @@ func rmLabels(l labels.Labels, labelsToRemove map[string]struct{}) labels.Labels
return l
}

// dropLabels partitions l in place into the labels to keep and the labels to
// drop, and returns both parts (kept first, dropped second).
//
// A label is dropped when its name is a key of labelsToDrop. Both returned
// slices share l's backing array: the kept labels occupy the front and the
// dropped labels the tail, each group preserving its original relative order.
// The caller's slice contents are therefore reordered by this call.
func dropLabels(l labels.Labels, labelsToDrop map[string]struct{}) (labels.Labels, labels.Labels) {
	// kept is the length of the prefix that has not been moved to the tail;
	// entries at index >= kept are labels that were already dropped.
	kept := len(l)
	for pos := 0; pos < kept; {
		if _, drop := labelsToDrop[l[pos].Name]; !drop {
			pos++
			continue
		}

		// Shift the remainder one slot to the left and park the dropped label
		// in the last slot. pos is not advanced because a new, unexamined
		// label now sits at this index.
		moved := l[pos]
		copy(l[pos:], l[pos+1:])
		l[len(l)-1] = moved
		kept--
	}

	return l[:kept], l[kept:]
}

// copyLabels copies src into *dest, reusing dest's backing array when it is
// large enough and allocating a new one only when it is not.
//
// On return *dest has length len(src) and holds the same labels as src.
// Any previous contents of *dest are overwritten.
func copyLabels(dest *labels.Labels, src labels.Labels) {
	// The reuse decision must compare dest's capacity with the length that is
	// actually needed. dest is a scratch buffer whose length may have been
	// shortened by the caller since the last copy, so checking len(*dest)
	// (or src's capacity) would force a fresh allocation on most calls even
	// though the existing backing array is big enough.
	if cap(*dest) < len(src) {
		*dest = make([]labels.Label, len(src))
	}
	*dest = (*dest)[:len(src)]
	copy(*dest, src)
}

// sortWithoutLabels removes given labels from series and re-sorts the series responses that the same
// series with different labels are coming right after each other. Other types of responses are moved to front.
func sortWithoutLabels(set []*storepb.SeriesResponse, labelsToRemove map[string]struct{}) {
Expand Down
55 changes: 55 additions & 0 deletions pkg/store/proxy_heap_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (

"github.com/efficientgo/core/testutil"
"github.com/prometheus/prometheus/model/labels"

"github.com/thanos-io/thanos/pkg/dedup"
"github.com/thanos-io/thanos/pkg/errors"
"github.com/thanos-io/thanos/pkg/store/storepb"
Expand Down Expand Up @@ -135,6 +136,60 @@ func TestProxyResponseHeapSort(t *testing.T) {
storeSeriesResponse(t, labelsFromStrings("a", "1", "b", "2", "ext2", "9")),
},
},
{
title: "merge series with external labels at beginning of series",
input: []respSet{
&eagerRespSet{
wg: &sync.WaitGroup{},
bufferedResponses: []*storepb.SeriesResponse{
storeSeriesResponse(t, labelsFromStrings("a", "1", "b", "2", "c", "3")),
storeSeriesResponse(t, labelsFromStrings("a", "1", "c", "2")),
},
storeLabels: map[string]struct{}{"a": {}},
},
&eagerRespSet{
wg: &sync.WaitGroup{},
bufferedResponses: []*storepb.SeriesResponse{
storeSeriesResponse(t, labelsFromStrings("a", "1", "b", "1", "c", "3")),
storeSeriesResponse(t, labelsFromStrings("a", "1", "c", "3")),
},
storeLabels: map[string]struct{}{"a": {}},
},
},
exp: []*storepb.SeriesResponse{
storeSeriesResponse(t, labelsFromStrings("a", "1", "b", "1", "c", "3")),
storeSeriesResponse(t, labelsFromStrings("a", "1", "b", "2", "c", "3")),
storeSeriesResponse(t, labelsFromStrings("a", "1", "c", "2")),
storeSeriesResponse(t, labelsFromStrings("a", "1", "c", "3")),
},
},
{
title: "merge series in stores with external labels not present in series (e.g. stripped during dedup)",
input: []respSet{
&eagerRespSet{
wg: &sync.WaitGroup{},
bufferedResponses: []*storepb.SeriesResponse{
storeSeriesResponse(t, labelsFromStrings("a", "1", "b", "2", "ext2", "9")),
storeSeriesResponse(t, labelsFromStrings("a", "1", "b", "2", "ext2", "9")),
},
storeLabels: map[string]struct{}{"ext2": {}, "replica": {}},
},
&eagerRespSet{
wg: &sync.WaitGroup{},
bufferedResponses: []*storepb.SeriesResponse{
storeSeriesResponse(t, labelsFromStrings("a", "1", "b", "2", "ext1", "5", "ext2", "9")),
storeSeriesResponse(t, labelsFromStrings("a", "1", "b", "2", "ext1", "5", "ext2", "9")),
},
storeLabels: map[string]struct{}{"ext1": {}, "ext2": {}, "replica": {}},
},
},
exp: []*storepb.SeriesResponse{
storeSeriesResponse(t, labelsFromStrings("a", "1", "b", "2", "ext1", "5", "ext2", "9")),
storeSeriesResponse(t, labelsFromStrings("a", "1", "b", "2", "ext1", "5", "ext2", "9")),
storeSeriesResponse(t, labelsFromStrings("a", "1", "b", "2", "ext2", "9")),
storeSeriesResponse(t, labelsFromStrings("a", "1", "b", "2", "ext2", "9")),
},
},
} {
t.Run(tcase.title, func(t *testing.T) {
h := NewProxyResponseHeap(tcase.input...)
Expand Down

0 comments on commit 380b1e4

Please sign in to comment.