Skip to content

Commit

Permalink
Addressed review comments
Browse files Browse the repository at this point in the history
Signed-off-by: Marco Pracucci <[email protected]>
  • Loading branch information
pracucci committed Nov 12, 2020
1 parent f3f0b42 commit bc167bd
Show file tree
Hide file tree
Showing 5 changed files with 43 additions and 8 deletions.
3 changes: 2 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,8 @@ We use *breaking :warning:* to mark changes that are not backward compatible (re
- [#3277](https://github.com/thanos-io/thanos/pull/3277) Thanos Query: Introduce dynamic lookback interval. This allows queries with large step to make use of downsampled data.
- [#3409](https://github.com/thanos-io/thanos/pull/3409) Compactor: Added support for no-compact-mark.json which excludes the block from compaction.
- [#3245](https://github.com/thanos-io/thanos/pull/3245) Query Frontend: Add `query-frontend.org-id-header` flag to specify HTTP header(s) to populate slow query log (e.g. X-Grafana-User).
- [#3431](https://github.com/thanos-io/thanos/pull/3431) Store: Added experimental support to lazy load index-headers at query time. When enabled via `--store.enable-index-header-lazy-reader=true` flag, the store-gateway will load into memory an index-header only once required at query time and will be automatically released after `--store.index-header-lazy-reader-idle-timeout` of inactivity.
- [#3431](https://github.com/thanos-io/thanos/pull/3431) Store: Added experimental support to lazy load index-headers at query time. When enabled via `--store.enable-index-header-lazy-reader` flag, the store-gateway will load into memory an index-header only once it's required at query time. Index-header will be automatically released after `--store.index-header-lazy-reader-idle-timeout` of inactivity.
* This, generally, reduces baseline memory usage of store when inactive, as well as a total number of mapped files (which is limited to 64k in some systems.

### Fixed

Expand Down
4 changes: 2 additions & 2 deletions cmd/thanos/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,8 +107,8 @@ func registerStore(app *extkingpin.App) {
"Default is 24h, half of the default value for --delete-delay on compactor.").
Default("24h"))

lazyIndexReaderEnabled := cmd.Flag("store.enable-index-header-lazy-reader", "If true, Store Gateway will lazy memory map index-header only once required by a query.").
Hidden().Default("false").Bool()
lazyIndexReaderEnabled := cmd.Flag("store.enable-index-header-lazy-reader", "If true, Store Gateway will lazy memory map index-header only once the block is required by a query.").
Default("false").Bool()

lazyIndexReaderIdleTimeout := cmd.Flag("store.index-header-lazy-reader-idle-timeout", "If index-header lazy reader is enabled and this idle timeout setting is > 0, memory map-ed index-headers will be automatically released after 'idle timeout' inactivity.").
Hidden().Default("5m").Duration()
Expand Down
4 changes: 4 additions & 0 deletions docs/components/store.md
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,10 @@ Flags:
before being deleted from bucket. Default is
24h, half of the default value for
--delete-delay on compactor.
--store.enable-index-header-lazy-reader
If true, Store Gateway will lazy memory map
index-header only once the block is required by
a query.
--web.external-prefix="" Static prefix for all HTML links and redirect
URLs in the bucket web UI interface. Actual
endpoints are still served on / or the
Expand Down
26 changes: 21 additions & 5 deletions pkg/block/indexheader/lazy_binary_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"github.com/thanos-io/thanos/pkg/objstore"
)

// LazyBinaryReaderMetrics holds metrics tracked by LazyBinaryReader.
type LazyBinaryReaderMetrics struct {
loadCount prometheus.Counter
loadFailedCount prometheus.Counter
Expand All @@ -30,6 +31,7 @@ type LazyBinaryReaderMetrics struct {
loadDuration prometheus.Histogram
}

// NewLazyBinaryReaderMetrics makes new LazyBinaryReaderMetrics.
func NewLazyBinaryReaderMetrics(reg prometheus.Registerer) *LazyBinaryReaderMetrics {
return &LazyBinaryReaderMetrics{
loadCount: promauto.With(reg).NewCounter(prometheus.CounterOpts{
Expand All @@ -56,6 +58,8 @@ func NewLazyBinaryReaderMetrics(reg prometheus.Registerer) *LazyBinaryReaderMetr
}
}

// LazyBinaryReader wraps BinaryReader and loads (mmap) the index-header only upon
// the first Reader function is called.
type LazyBinaryReader struct {
ctx context.Context
logger log.Logger
Expand All @@ -71,6 +75,10 @@ type LazyBinaryReader struct {
readerErr error
}

// NewLazyBinaryReader makes a new LazyBinaryReader. If the index-header does not exist
// on the local disk at dir location, this function will build it downloading required
// sections from the full index stored in the bucket. However, this function doesn't load
// (mmap) the index-header; it will be loaded at first Reader function call.
func NewLazyBinaryReader(
ctx context.Context,
logger log.Logger,
Expand Down Expand Up @@ -110,6 +118,8 @@ func NewLazyBinaryReader(
}, nil
}

// Close implements Reader. It unloads the index-header from memory (releasing the mmap
// area), but a subsequent call to any other Reader function will automatically reload it.
func (r *LazyBinaryReader) Close() error {
r.readerMx.Lock()
defer r.readerMx.Unlock()
Expand All @@ -129,6 +139,7 @@ func (r *LazyBinaryReader) Close() error {
return nil
}

// IndexVersion implements Reader.
func (r *LazyBinaryReader) IndexVersion() (int, error) {
r.readerMx.RLock()
defer r.readerMx.RUnlock()
Expand All @@ -140,6 +151,7 @@ func (r *LazyBinaryReader) IndexVersion() (int, error) {
return r.reader.IndexVersion()
}

// PostingsOffset implements Reader.
func (r *LazyBinaryReader) PostingsOffset(name string, value string) (index.Range, error) {
r.readerMx.RLock()
defer r.readerMx.RUnlock()
Expand All @@ -151,6 +163,7 @@ func (r *LazyBinaryReader) PostingsOffset(name string, value string) (index.Rang
return r.reader.PostingsOffset(name, value)
}

// LookupSymbol implements Reader.
func (r *LazyBinaryReader) LookupSymbol(o uint32) (string, error) {
r.readerMx.RLock()
defer r.readerMx.RUnlock()
Expand All @@ -162,6 +175,7 @@ func (r *LazyBinaryReader) LookupSymbol(o uint32) (string, error) {
return r.reader.LookupSymbol(o)
}

// LabelValues implements Reader.
func (r *LazyBinaryReader) LabelValues(name string) ([]string, error) {
r.readerMx.RLock()
defer r.readerMx.RUnlock()
Expand All @@ -173,6 +187,7 @@ func (r *LazyBinaryReader) LabelValues(name string) ([]string, error) {
return r.reader.LabelValues(name)
}

// LabelNames implements Reader.
func (r *LazyBinaryReader) LabelNames() ([]string, error) {
r.readerMx.RLock()
defer r.readerMx.RUnlock()
Expand All @@ -190,7 +205,8 @@ func (r *LazyBinaryReader) open() error {
// Nothing to do if we already tried opening it.
if r.reader != nil {
return nil
} else if r.readerErr != nil {
}
if r.readerErr != nil {
return r.readerErr
}

Expand All @@ -204,7 +220,8 @@ func (r *LazyBinaryReader) open() error {
// Ensure none else tried to open it in the meanwhile.
if r.reader != nil {
return nil
} else if r.readerErr != nil {
}
if r.readerErr != nil {
return r.readerErr
}

Expand All @@ -214,14 +231,13 @@ func (r *LazyBinaryReader) open() error {

reader, err := NewBinaryReader(r.ctx, r.logger, r.bkt, r.dir, r.id, r.postingOffsetsInMemSampling)
if err != nil {
level.Error(r.logger).Log("msg", "failed to lazy load index-header file", "path", r.filepath, "err", err)
r.metrics.loadFailedCount.Inc()
r.readerErr = err
return err
return errors.Wrapf(err, "lazy load index-header file at %s", r.filepath)
}

r.reader = reader
level.Info(r.logger).Log("msg", "lazy loaded index-header file", "path", r.filepath, "elapsed", time.Since(startTime))
level.Debug(r.logger).Log("msg", "lazy loaded index-header file", "path", r.filepath, "elapsed", time.Since(startTime))
r.metrics.loadDuration.Observe(time.Since(startTime).Seconds())

return nil
Expand Down
14 changes: 14 additions & 0 deletions pkg/block/indexheader/reader_pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,10 @@ import (
"github.com/thanos-io/thanos/pkg/objstore"
)

// ReaderPool is used to istantiate new index-header readers and keep track of them.
// When the lazy reader is enabled, the pool keeps track of all instantiated readers
// and automatically close them once the idle timeout is reached. A closed lazy reader
// will be automatically re-opened upon next usage.
type ReaderPool struct {
lazyReaderEnabled bool
lazyReaderIdleTimeout time.Duration
Expand All @@ -32,6 +36,7 @@ type ReaderPool struct {
readers map[*readerTracker]struct{}
}

// NewReaderPool makes a new ReaderPool.
func NewReaderPool(logger log.Logger, lazyReaderEnabled bool, lazyReaderIdleTimeout time.Duration, reg prometheus.Registerer) *ReaderPool {
p := &ReaderPool{
logger: logger,
Expand Down Expand Up @@ -61,6 +66,9 @@ func NewReaderPool(logger log.Logger, lazyReaderEnabled bool, lazyReaderIdleTime
return p
}

// NewBinaryReader creates and returns a new binary reader. If the pool has been configured
// with lazy reader enabled, this function will return a lazy reader. The returned lazy reader
// is tracked by the pool and automatically closed once the idle timeout expires.
func (p *ReaderPool) NewBinaryReader(ctx context.Context, logger log.Logger, bkt objstore.BucketReader, dir string, id ulid.ULID, postingOffsetsInMemSampling int) (Reader, error) {
var reader Reader
var err error
Expand Down Expand Up @@ -156,31 +164,37 @@ type readerTracker struct {
usedAt *atomic.Int64
}

// Close implements Reader.
func (r *readerTracker) Close() error {
r.pool.onReaderClosed(r)
return r.reader.Close()
}

// IndexVersion implements Reader.
func (r *readerTracker) IndexVersion() (int, error) {
r.usedAt.Store(time.Now().UnixNano())
return r.reader.IndexVersion()
}

// PostingsOffset implements Reader.
func (r *readerTracker) PostingsOffset(name string, value string) (index.Range, error) {
r.usedAt.Store(time.Now().UnixNano())
return r.reader.PostingsOffset(name, value)
}

// LookupSymbol implements Reader.
func (r *readerTracker) LookupSymbol(o uint32) (string, error) {
r.usedAt.Store(time.Now().UnixNano())
return r.reader.LookupSymbol(o)
}

// LabelValues implements Reader.
func (r *readerTracker) LabelValues(name string) ([]string, error) {
r.usedAt.Store(time.Now().UnixNano())
return r.reader.LabelValues(name)
}

// LabelNames implements Reader.
func (r *readerTracker) LabelNames() ([]string, error) {
r.usedAt.Store(time.Now().UnixNano())
return r.reader.LabelNames()
Expand Down

0 comments on commit bc167bd

Please sign in to comment.