Add time based partitioning to store component #957

Closed
wants to merge 5 commits
25 changes: 25 additions & 0 deletions cmd/thanos/flags.go
@@ -145,6 +145,31 @@ func modelDuration(flags *kingpin.FlagClause) *model.Duration {
return value
}

type flagTime struct {
Member: So I believe this is quite fixed... We essentially need a duration, right? A now-3months to now-2h style range.
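For illustration, the relative style suggested here could reuse the modelDuration helper above and resolve the offset against the wall clock at startup. This is a hypothetical sketch, not part of this PR; the flag name and wiring are made up:

```go
// Hypothetical sketch: a relative lower bound, read as "now minus d",
// giving the now-3months / now-2h style bounds discussed above.
minAge := modelDuration(cmd.Flag("min-time-relative",
	"Oldest data to serve, as an offset from now (e.g. 2160h for ~90 days)."))

// Resolved once at startup; model.Duration converts to time.Duration directly.
minTime := time.Now().Add(-time.Duration(*minAge))
```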

Member: Yeah, I agree. I need relative time as well; it would essentially replace the functionality I have in #930.

Member: Yes, that would be amazing!

Author: I think both would be useful. We're intending to use this for horizontal scale-out, and we want the ability to specify exact time ranges so we can size the blocks served to the capacity of each Thanos store host.

Member: 👍 I think both are useful.

Member: 👍

One question: I don't get why someone would need a specific time. I don't think it makes sense. What's the use case?

> We're intending to use this for horizontal scale-out, and we want the ability to specify exact time ranges so we can size the blocks served to the capacity of each Thanos store host.

@claytono hmm, are you sure you want fixed time ranges for that? Why?

Author: We intend to have a handful of Thanos store nodes, each serving a portion of a shared bucket, with one of them having an open-ended time range for all new metrics. For now, we intend to have an outside process periodically analyze the bucket and generate time ranges for each Thanos store process based on the index size of each block. We want to provision these nodes so that they're all fairly full from a memory standpoint without being over-provisioned. For us, the major expense of running Thanos is the memory on compute instances; the S3 storage is nearly free in comparison.

With metric ingest rates changing over time (new apps, seasonality, etc.) and the activity of the compactor, I think partitioning the bucket time ranges with relative times would be error-prone and/or lead to inefficient use of the hardware.

Member: It's kind of odd from my perspective, but if you find this useful, sure (: happy to accept that.

Author: It's what we've come up with for horizontal scaling of the Thanos store nodes. I'd love to hear how other people are managing scaling out.

@xjewer (Contributor), Mar 27, 2019:

> For us, the major expense of running Thanos is the memory on compute instances

It seems you're trying to solve a separate problem with absolute time ranges.

I'd like to have relative time as well.

time time.Time
}

func (ft *flagTime) Set(s string) error {
var err error
ft.time, err = time.Parse(time.RFC3339, s)
return err
}

func (ft flagTime) String() string {
return ft.time.String()
}

func (ft flagTime) Time() time.Time {
return ft.time
}

func timeFlag(flags *kingpin.FlagClause) *flagTime {
var value = new(flagTime)
flags.SetValue(value)

return value
}
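A quick sketch of the parsing behavior, for illustration (assumes fmt is imported; not part of the diff):

```go
var ft flagTime
if err := ft.Set("2019-03-27T00:00:00Z"); err != nil {
	panic(err) // time.Parse rejects anything that is not valid RFC3339
}
fmt.Println(ft.Time().UTC()) // 2019-03-27 00:00:00 +0000 UTC
```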

type pathOrContent struct {
fileFlagName string
contentFlagName string
13 changes: 13 additions & 0 deletions cmd/thanos/store.go
@@ -17,6 +17,7 @@ import (
"github.com/opentracing/opentracing-go"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/prometheus/pkg/timestamp"
"google.golang.org/grpc"
"gopkg.in/alecthomas/kingpin.v2"
)
@@ -44,6 +45,12 @@ func registerStore(m map[string]setupFunc, app *kingpin.Application, name string
blockSyncConcurrency := cmd.Flag("block-sync-concurrency", "Number of goroutines to use when syncing blocks from object storage.").
Default("20").Int()

minTime := timeFlag(cmd.Flag("min-time", "Start of time range limit to serve").
Default("0000-01-01T00:00:00Z"))

maxTime := timeFlag(cmd.Flag("max-time", "End of time range limit to serve").
Default("9999-12-31T23:59:59Z"))

m[name] = func(g *run.Group, logger log.Logger, reg *prometheus.Registry, tracer opentracing.Tracer, debugLogging bool) error {
peer, err := newPeerFn(logger, reg, false, "", false)
if err != nil {
@@ -67,6 +74,8 @@ func registerStore(m map[string]setupFunc, app *kingpin.Application, name string
debugLogging,
*syncInterval,
*blockSyncConcurrency,
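// timestamp.FromTime converts to Unix milliseconds, the unit block
// metadata uses for MinTime and MaxTime.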
timestamp.FromTime(minTime.Time()),
timestamp.FromTime(maxTime.Time()),
)
}
}
@@ -91,6 +100,8 @@ func runStore(
verbose bool,
syncInterval time.Duration,
blockSyncConcurrency int,
minTime int64,
maxTime int64,
) error {
{
confContentYaml, err := objStoreConfig.Content()
@@ -119,6 +130,8 @@
chunkPoolSizeBytes,
verbose,
blockSyncConcurrency,
minTime,
maxTime,
)
if err != nil {
return errors.Wrap(err, "create object storage store")
4 changes: 4 additions & 0 deletions docs/components/store.md
@@ -115,5 +115,9 @@ Flags:
--block-sync-concurrency=20
Number of goroutines to use when syncing blocks
from object storage.
--min-time=0000-01-01T00:00:00Z
Start of time range limit to serve
--max-time=9999-12-31T23:59:59Z
End of time range limit to serve

```
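For example, two store instances could split one bucket by time as below. The ranges are illustrative, the `--objstore.config-file` flag is assumed from the rest of the store setup, and the extreme defaults above effectively disable filtering. A block is assigned to the instance whose window contains that block's MinTime.

```
# Serves blocks whose MinTime falls in 2018 or earlier.
thanos store --objstore.config-file=bucket.yml \
  --min-time=0000-01-01T00:00:00Z \
  --max-time=2018-12-31T23:59:59Z

# Serves blocks whose MinTime falls in 2019 or later.
thanos store --objstore.config-file=bucket.yml \
  --min-time=2019-01-01T00:00:00Z \
  --max-time=9999-12-31T23:59:59Z
```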
33 changes: 33 additions & 0 deletions pkg/store/bucket.go
@@ -173,6 +173,11 @@ type BucketStore struct {
// Number of goroutines to use when syncing blocks from object storage.
blockSyncConcurrency int

// Start and end of the time range served by this store process. Loaded
// meta.json files are filtered against this range.
minTime int64
maxTime int64

partitioner partitioner
}

@@ -187,6 +192,8 @@ func NewBucketStore(
maxChunkPoolBytes uint64,
debugLogging bool,
blockSyncConcurrency int,
minTime int64,
maxTime int64,
) (*BucketStore, error) {
if logger == nil {
logger = log.NewNopLogger()
@@ -212,6 +219,8 @@
blockSets: map[uint64]*bucketBlockSet{},
debugLogging: debugLogging,
blockSyncConcurrency: blockSyncConcurrency,
minTime: minTime,
maxTime: maxTime,
partitioner: gapBasedPartitioner{maxGapSize: maxGapSize},
}
s.metrics = newBucketStoreMetrics(reg)
@@ -359,11 +368,20 @@ func (s *BucketStore) addBlock(ctx context.Context, id ulid.ULID) (err error) {
dir,
s.indexCache,
s.chunkPool,
s.minTime,
s.maxTime,
s.partitioner,
)
if err != nil {
return errors.Wrap(err, "new bucket block")
}

// newBucketBlock returns a nil block when the block's MinTime falls outside
// the store's time range; in that case there is nothing to add, so return early.
if b == nil {
return nil
}

s.mtx.Lock()
defer s.mtx.Unlock()

@@ -1001,6 +1019,8 @@ func newBucketBlock(
dir string,
indexCache *indexCache,
chunkPool *pool.BytesPool,
minTime int64,
maxTime int64,
p partitioner,
) (b *bucketBlock, err error) {
b = &bucketBlock{
@@ -1015,6 +1035,19 @@
if err = b.loadMeta(ctx, id); err != nil {
return nil, errors.Wrap(err, "load meta")
}

// We want to make sure a single store owns each block. To do that, we check
// whether the block's MinTime is contained within our time range. We're more
// concerned with having exactly one owning store per block than with
// perfectly containing blocks within the start and end times. If the MinTime
// is outside our time range, clean up the downloaded files and return early.
if b.meta.MinTime < minTime || b.meta.MinTime > maxTime {
if err = os.RemoveAll(dir); err != nil {
return nil, err
}
return nil, nil
}

if err = b.loadIndexCache(ctx); err != nil {
return nil, errors.Wrap(err, "load index cache")
}
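To make the ownership rule concrete (an illustrative sketch, not code from this PR): assignment follows only the block's MinTime, so a store can end up serving data that extends past its configured max-time.

```go
// Illustrative only: ownership follows the block's MinTime, mirroring the
// check in newBucketBlock above.
func owns(blockMinTime, storeMinTime, storeMaxTime int64) bool {
	return blockMinTime >= storeMinTime && blockMinTime <= storeMaxTime
}

// Example: a block covering [10:00, 12:00) belongs to a store limited to
// [08:00, 11:00], because the block's MinTime (10:00) falls inside that
// window, even though its data runs past the store's max-time.
```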
38 changes: 32 additions & 6 deletions pkg/store/bucket_e2e_test.go
@@ -3,6 +3,7 @@ package store
import (
"context"
"io/ioutil"
"math"
"os"
"path/filepath"
"sync"
@@ -35,7 +36,7 @@ func (s *storeSuite) Close() {
s.wg.Wait()
}

func prepareStoreWithTestBlocks(t testing.TB, dir string, bkt objstore.Bucket, manyParts bool) *storeSuite {
func prepareStoreWithTestBlocks(t testing.TB, dir string, bkt objstore.Bucket, startTime time.Time, manyParts bool) *storeSuite {
series := []labels.Labels{
labels.FromStrings("a", "1", "b", "1"),
labels.FromStrings("a", "1", "b", "2"),
@@ -48,8 +49,7 @@ func prepareStoreWithTestBlocks(t testing.TB, dir string, bkt objstore.Bucket, m
}
extLset := labels.FromStrings("ext1", "value1")

start := time.Now()
now := start
now := startTime

ctx, cancel := context.WithCancel(context.Background())
s := &storeSuite{cancel: cancel}
@@ -87,7 +87,7 @@
testutil.Ok(t, os.RemoveAll(dir2))
}

store, err := NewBucketStore(log.NewLogfmtLogger(os.Stderr), nil, bkt, dir, 100, 0, false, 20)
store, err := NewBucketStore(log.NewLogfmtLogger(os.Stderr), nil, bkt, dir, 100, 0, false, 20, math.MinInt64, math.MaxInt64)
testutil.Ok(t, err)

s.store = store
@@ -334,7 +334,7 @@ func TestBucketStore_e2e(t *testing.T) {
testutil.Ok(t, err)
defer func() { testutil.Ok(t, os.RemoveAll(dir)) }()

s := prepareStoreWithTestBlocks(t, dir, bkt, false)
s := prepareStoreWithTestBlocks(t, dir, bkt, time.Now(), false)
defer s.Close()

testBucketStore_e2e(t, ctx, s)
@@ -363,9 +363,35 @@ func TestBucketStore_ManyParts_e2e(t *testing.T) {
testutil.Ok(t, err)
defer func() { testutil.Ok(t, os.RemoveAll(dir)) }()

s := prepareStoreWithTestBlocks(t, dir, bkt, true)
s := prepareStoreWithTestBlocks(t, dir, bkt, time.Now(), true)
defer s.Close()

testBucketStore_e2e(t, ctx, s)
})
}

func TestBucketStore_timeRanges(t *testing.T) {
objtesting.ForeachStore(t, func(t testing.TB, bkt objstore.Bucket) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

dir, err := ioutil.TempDir("", "test_bucketstore_e2e")
testutil.Ok(t, err)
defer func() { testutil.Ok(t, os.RemoveAll(dir)) }()

startTime := time.Now()
s := prepareStoreWithTestBlocks(t, dir, bkt, startTime, false)
s.Close()

// Store is now populated, so create a new bucket store instance with a
// time limit and validate that it only returns the two blocks in the first time range.
minTimeLimit := timestamp.FromTime(startTime.Add(-time.Minute))
maxTimeLimit := timestamp.FromTime(startTime.Add(time.Minute))
store, err := NewBucketStore(log.NewLogfmtLogger(os.Stderr), nil, bkt, dir, 100, 0, false, 20, minTimeLimit, maxTimeLimit)
testutil.Ok(t, err)

err = store.SyncBlocks(ctx)
testutil.Ok(t, err)
testutil.Equals(t, 2, store.numBlocks())
})
}
2 changes: 1 addition & 1 deletion pkg/store/bucket_test.go
@@ -283,7 +283,7 @@ func TestBucketStore_Info(t *testing.T) {
dir, err := ioutil.TempDir("", "prometheus-test")
testutil.Ok(t, err)

bucketStore, err := NewBucketStore(nil, nil, nil, dir, 2e5, 2e5, false, 20)
bucketStore, err := NewBucketStore(nil, nil, nil, dir, 2e5, 2e5, false, 20, math.MinInt64, math.MaxInt64)
testutil.Ok(t, err)

resp, err := bucketStore.Info(ctx, &storepb.InfoRequest{})