Skip to content

Commit

Permalink
receive: add support for globbing tenant specifiers
Browse files Browse the repository at this point in the history
We want to be able to route all tenants which begin with certain letters
to some receivers so we need to have some kind of globbing/regex support
in the hashring. This PR adds that functionality. We've been using this
in prod successfully.

Signed-off-by: Giedrius Statkevičius <[email protected]>
  • Loading branch information
GiedriusS committed Feb 22, 2024
1 parent 1723d1d commit 402a264
Show file tree
Hide file tree
Showing 5 changed files with 135 additions and 11 deletions.
33 changes: 33 additions & 0 deletions docs/components/receive.md
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,39 @@ The example content of `hashring.json`:

With such configuration any receive listens for remote write on `<ip>10908/api/v1/receive` and will forward to correct one in hashring if needed for tenancy and replication.

It is possible to only match certain `tenant`s inside of a hashring file. For example:

```json
[
{
"tenants": ["foobar"],
"endpoints": [
"127.0.0.1:1234",
"127.0.0.1:12345",
"127.0.0.1:1235"
]
}
]
```

The specified endpoints will be used if the tenant is set to `foobar`. It is possible to use glob matching through the parameter `tenant_matcher_type`. It can have the value `glob`. In this case, the strings inside the array are taken as glob patterns and matched against the `tenant` inside of a remote-write request. For instance:

```json
[
{
"tenants": ["foo*"],
"tenant_matcher_type": "glob",
"endpoints": [
"127.0.0.1:1234",
"127.0.0.1:12345",
"127.0.0.1:1235"
]
}
]
```

This will still match the tenant `foobar` and any other tenant which begins with the letters `foo`.

### AZ-aware Ketama hashring (experimental)

In order to ensure even spread for replication over nodes in different availability-zones, you can choose to include az definition in your hashring config. If we for example have a 6 node cluster, spread over 3 different availability zones; A, B and C, we could use the following example `hashring.json`:
Expand Down
4 changes: 4 additions & 0 deletions docs/components/rule.md
Original file line number Diff line number Diff line change
Expand Up @@ -401,6 +401,10 @@ Flags:
--query.default-step=1s Default range query step to use. This is
only used in stateless Ruler and alert state
restoration.
--query.enable-x-functions
Whether to enable extended rate functions
(xrate, xincrease and xdelta). Only has effect
when used with Thanos engine.
--query.http-method=POST HTTP method to use when sending queries.
Possible options: [GET, POST]
--query.sd-dns-interval=30s
Expand Down
24 changes: 19 additions & 5 deletions pkg/receive/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,11 +64,25 @@ func (e *Endpoint) UnmarshalJSON(data []byte) error {
// HashringConfig represents the configuration for a hashring
// a receive node knows about.
type HashringConfig struct {
Hashring string `json:"hashring,omitempty"`
Tenants []string `json:"tenants,omitempty"`
Endpoints []Endpoint `json:"endpoints"`
Algorithm HashringAlgorithm `json:"algorithm,omitempty"`
ExternalLabels labels.Labels `json:"external_labels,omitempty"`
Hashring string `json:"hashring,omitempty"`
Tenants []string `json:"tenants,omitempty"`
TenantMatcherType tenantMatcher `json:"tenant_matcher_type,omitempty"`
Endpoints []Endpoint `json:"endpoints"`
Algorithm HashringAlgorithm `json:"algorithm,omitempty"`
ExternalLabels labels.Labels `json:"external_labels,omitempty"`
}

type tenantMatcher string

const (
// TenantMatcherTypeExact matches tenants exactly. This is also the default one.
TenantMatcherTypeExact tenantMatcher = "exact"
// TenantMatcherGlob matches tenants using glob patterns.
TenantMatcherGlob tenantMatcher = "glob"
)

func isExactMatcher(m tenantMatcher) bool {
return m == TenantMatcherTypeExact || m == ""
}

// ConfigWatcher is able to watch a file containing a hashring configuration
Expand Down
35 changes: 29 additions & 6 deletions pkg/receive/hashring.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ package receive
import (
"fmt"
"math"
"path/filepath"
"sort"
"strconv"
"sync"
Expand Down Expand Up @@ -249,7 +250,7 @@ func (c ketamaHashring) GetN(tenant string, ts *prompb.TimeSeries, n uint64) (st
type multiHashring struct {
cache map[string]Hashring
hashrings []Hashring
tenantSets []map[string]struct{}
tenantSets []map[string]tenantMatcher

// We need a mutex to guard concurrent access
// to the cache map, as this is both written to
Expand All @@ -273,15 +274,37 @@ func (m *multiHashring) GetN(tenant string, ts *prompb.TimeSeries, n uint64) (st
return h.GetN(tenant, ts, n)
}
var found bool

// If the tenant is not in the cache, then we need to check
// every tenant in the configuration.
for i, t := range m.tenantSets {
// If the hashring has no tenants, then it is
// considered a default hashring and matches everything.
if t == nil {
found = true
} else if _, ok := t[tenant]; ok {
found = true
} else {
// Fast path for the common case of direct match.
if mt, ok := t[tenant]; ok && isExactMatcher(mt) {
found = true
} else {
for tenantPattern, matcherType := range t {
switch matcherType {
case TenantMatcherGlob:
matches, err := filepath.Match(tenantPattern, tenant)
if err != nil {
return "", fmt.Errorf("error matching tenant pattern %s (tenant %s): %w", tenantPattern, tenant, err)
}
found = matches
case TenantMatcherTypeExact:
// Already checked above, skipping.
fallthrough
default:
continue
}

}
}

}
if found {
m.mu.Lock()
Expand Down Expand Up @@ -320,12 +343,12 @@ func NewMultiHashring(algorithm HashringAlgorithm, replicationFactor uint64, cfg
}
m.nodes = append(m.nodes, hashring.Nodes()...)
m.hashrings = append(m.hashrings, hashring)
var t map[string]struct{}
var t map[string]tenantMatcher
if len(h.Tenants) != 0 {
t = make(map[string]struct{})
t = make(map[string]tenantMatcher)
}
for _, tenant := range h.Tenants {
t[tenant] = struct{}{}
t[tenant] = h.TenantMatcherType
}
m.tenantSets = append(m.tenantSets, t)
}
Expand Down
50 changes: 50 additions & 0 deletions test/e2e/receive_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,20 +9,27 @@ import (
"log"
"net/http"
"net/http/httputil"
"os"
"testing"
"time"

"github.com/efficientgo/core/backoff"
"github.com/efficientgo/e2e"
e2edb "github.com/efficientgo/e2e/db"
e2emon "github.com/efficientgo/e2e/monitoring"
"github.com/efficientgo/e2e/monitoring/matchers"
logkit "github.com/go-kit/log"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/model/relabel"
"github.com/prometheus/prometheus/prompb"

"github.com/stretchr/testify/require"

"github.com/efficientgo/core/testutil"

"github.com/thanos-io/thanos/pkg/promclient"
"github.com/thanos-io/thanos/pkg/receive"
"github.com/thanos-io/thanos/pkg/runutil"
"github.com/thanos-io/thanos/test/e2e/e2ethanos"
)

Expand Down Expand Up @@ -968,3 +975,46 @@ test_metric{a="2", b="2"} 1`)
})
})
}

func TestReceiveGlob(t *testing.T) {
e, err := e2e.NewDockerEnvironment("receive-glob")
testutil.Ok(t, err)
t.Cleanup(e2ethanos.CleanScenario(t, e))

i := e2ethanos.NewReceiveBuilder(e, "ingestor").WithIngestionEnabled().Init()
testutil.Ok(t, e2e.StartAndWaitReady(i))

h := receive.HashringConfig{
TenantMatcherType: "glob",
Tenants: []string{
"default*",
},
Endpoints: []receive.Endpoint{
{Address: i.InternalEndpoint("grpc")},
},
}

r := e2ethanos.NewReceiveBuilder(e, "router").WithRouting(1, h).Init()
testutil.Ok(t, e2e.StartAndWaitReady(r))

q := e2ethanos.NewQuerierBuilder(e, "1", i.InternalEndpoint("grpc")).Init()
testutil.Ok(t, e2e.StartAndWaitReady(q))

require.NoError(t, runutil.RetryWithLog(logkit.NewLogfmtLogger(os.Stdout), 1*time.Second, make(<-chan struct{}), func() error {
return storeWriteRequest(context.Background(), "http://"+r.Endpoint("remote-write")+"/api/v1/receive", &prompb.WriteRequest{
Timeseries: []prompb.TimeSeries{
{
Labels: []prompb.Label{
{Name: "aa", Value: "bb"},
},
Samples: []prompb.Sample{
{Value: 1, Timestamp: time.Now().UnixMilli()},
},
},
},
})
}))

testutil.Ok(t, i.WaitSumMetricsWithOptions(e2emon.Equals(0), []string{"prometheus_tsdb_blocks_loaded"}, e2emon.WithLabelMatchers(matchers.MustNewMatcher(matchers.MatchEqual, "tenant", "default-tenant")), e2emon.WaitMissingMetrics()))

}

0 comments on commit 402a264

Please sign in to comment.