Merge branch 'main' into exp-flag
metonymic-smokey authored Sep 23, 2021
2 parents 4689efd + c15594a commit 4399552
Showing 26 changed files with 31,218 additions and 24,218 deletions.
6 changes: 3 additions & 3 deletions .bingo/Variables.mk
@@ -89,11 +89,11 @@ $(JSONNETFMT): $(BINGO_DIR)/jsonnetfmt.mod
 	@echo "(re)installing $(GOBIN)/jsonnetfmt-v0.17.0"
 	@cd $(BINGO_DIR) && $(GO) build -mod=mod -modfile=jsonnetfmt.mod -o=$(GOBIN)/jsonnetfmt-v0.17.0 "github.com/google/go-jsonnet/cmd/jsonnetfmt"
 
-MDOX := $(GOBIN)/mdox-v0.2.2-0.20210818122826-f16709a2bc2b
+MDOX := $(GOBIN)/mdox-v0.9.0
 $(MDOX): $(BINGO_DIR)/mdox.mod
 	@# Install binary/ries using Go 1.14+ build command. This is using bwplotka/bingo-controlled, separate go module with pinned dependencies.
-	@echo "(re)installing $(GOBIN)/mdox-v0.2.2-0.20210818122826-f16709a2bc2b"
-	@cd $(BINGO_DIR) && $(GO) build -mod=mod -modfile=mdox.mod -o=$(GOBIN)/mdox-v0.2.2-0.20210818122826-f16709a2bc2b "github.com/bwplotka/mdox"
+	@echo "(re)installing $(GOBIN)/mdox-v0.9.0"
+	@cd $(BINGO_DIR) && $(GO) build -mod=mod -modfile=mdox.mod -o=$(GOBIN)/mdox-v0.9.0 "github.com/bwplotka/mdox"
 
 MINIO := $(GOBIN)/minio-v0.0.0-20200527010300-cccf2de129da
 $(MINIO): $(BINGO_DIR)/minio.mod
2 changes: 1 addition & 1 deletion .bingo/mdox.mod
@@ -2,4 +2,4 @@ module _ // Auto generated by https://github.com/bwplotka/bingo. DO NOT EDIT
 
 go 1.16
 
-require github.com/bwplotka/mdox v0.2.2-0.20210818122826-f16709a2bc2b
+require github.com/bwplotka/mdox v0.9.0
2 changes: 1 addition & 1 deletion .bingo/variables.env
@@ -32,7 +32,7 @@ JSONNET="${GOBIN}/jsonnet-v0.17.0"
 
 JSONNETFMT="${GOBIN}/jsonnetfmt-v0.17.0"
 
-MDOX="${GOBIN}/mdox-v0.2.2-0.20210818122826-f16709a2bc2b"
+MDOX="${GOBIN}/mdox-v0.9.0"
 
 MINIO="${GOBIN}/minio-v0.0.0-20200527010300-cccf2de129da"
5 changes: 4 additions & 1 deletion .mdox.validate.yaml
@@ -16,4 +16,7 @@ validators:
     type: 'ignore'
   # 301 errors even when curl-ed.
   - regex: 'envoyproxy\.io'
-    type: 'ignore'
+    type: 'ignore'
+  # couldn't reach even when curl-ed.
+  - regex: 'cloud\.baidu\.com'
+    type: 'ignore'
14 changes: 7 additions & 7 deletions .mdox.yaml
@@ -1,7 +1,7 @@
 version: 1
 
-inputDir: "docs"
-outputDir: "website/docs-pre-processed/tip"
+inputDir: "$(INPUT_DIR)"
+outputDir: "$(OUTPUT_DIR)"
 extraInputGlobs:
   - "CHANGELOG.md"
   - "SECURITY.md"
@@ -16,7 +16,7 @@ localLinksStyle:
 
 transformations:
 
-  - glob: "../CHANGELOG.md"
+  - glob: "$(EXTERNAL_GLOB_REL)CHANGELOG.md"
     path: /thanos/CHANGELOG.md
     frontMatter:
       template: |
@@ -29,7 +29,7 @@ transformations:
         Found a typo, inconsistency or missing information in our docs?
         Help us to improve [Thanos](https://thanos.io) documentation by proposing a fix [on GitHub here](https://github.com/thanos-io/thanos/edit/main/{{ .Origin.Path }}) :heart:
-  - glob: "../MAINTAINERS.md"
+  - glob: "$(EXTERNAL_GLOB_REL)MAINTAINERS.md"
     path: /thanos/MAINTAINERS.md
     frontMatter:
       template: |
@@ -39,7 +39,7 @@ transformations:
         lastmod: "{{ .Origin.LastMod }}"
       backMatter: *docBackMatter
 
-  - glob: "../SECURITY.md"
+  - glob: "$(EXTERNAL_GLOB_REL)SECURITY.md"
     path: /thanos/SECURITY.md
     frontMatter:
       template: |
@@ -49,7 +49,7 @@ transformations:
        lastmod: "{{ .Origin.LastMod }}"
       backMatter: *docBackMatter
 
-  - glob: "../CODE_OF_CONDUCT.md"
+  - glob: "$(EXTERNAL_GLOB_REL)CODE_OF_CONDUCT.md"
     path: /contributing/CODE_OF_CONDUCT.md
     frontMatter:
       template: |
@@ -59,7 +59,7 @@ transformations:
        lastmod: "{{ .Origin.LastMod }}"
      backMatter: *docBackMatter
 
-  - glob: "../CONTRIBUTING.md"
+  - glob: "$(EXTERNAL_GLOB_REL)CONTRIBUTING.md"
     path: /contributing/CONTRIBUTING.md
     frontMatter:
       template: |
6 changes: 6 additions & 0 deletions CHANGELOG.md
@@ -10,9 +10,15 @@ We use *breaking :warning:* to mark changes that are not backward compatible (re
 
 ## Unreleased
 
+### Fixed
+
+- [#4663](https://github.com/thanos-io/thanos/pull/4663) Fetcher: Fix discovered data races
+
 ### Added
 
+- [#4680](https://github.com/thanos-io/thanos/pull/4680) Query: add `exemplar.partial-response` flag to control partial response.
 - [#4679](https://github.com/thanos-io/thanos/pull/4679) Added `enable-feature` flag to enable negative offsets and @ modifier, similar to Prometheus.
+- [#4696](https://github.com/thanos-io/thanos/pull/4696) Query: add cache name to tracing spans.
 
 ## v0.23.0 - In Progress
55 changes: 44 additions & 11 deletions pkg/block/fetcher.go
@@ -162,9 +162,11 @@ type BaseFetcher struct {
 
 	// Optional local directory to cache meta.json files.
 	cacheDir string
-	cached   map[ulid.ULID]*metadata.Meta
 	syncs    prometheus.Counter
 	g        singleflight.Group
+
+	mtx    sync.Mutex
+	cached map[ulid.ULID]*metadata.Meta
 }
 
 // NewBaseFetcher constructs BaseFetcher.
@@ -386,7 +388,10 @@ func (f *BaseFetcher) fetchMetadata(ctx context.Context) (interface{}, error) {
 	for id, m := range resp.metas {
 		cached[id] = m
 	}
+
+	f.mtx.Lock()
 	f.cached = cached
+	f.mtx.Unlock()
 
 	// Best effort cleanup of disk-cached metas.
 	if f.cacheDir != "" {
@@ -473,10 +478,17 @@ func (f *BaseFetcher) fetch(ctx context.Context, metrics *FetcherMetrics, filter
 		return metas, resp.partial, errors.Wrap(resp.metaErrs.Err(), "incomplete view")
 	}
 
-	level.Info(f.logger).Log("msg", "successfully synchronized block metadata", "duration", time.Since(start).String(), "duration_ms", time.Since(start).Milliseconds(), "cached", len(f.cached), "returned", len(metas), "partial", len(resp.partial))
+	level.Info(f.logger).Log("msg", "successfully synchronized block metadata", "duration", time.Since(start).String(), "duration_ms", time.Since(start).Milliseconds(), "cached", f.countCached(), "returned", len(metas), "partial", len(resp.partial))
 	return metas, resp.partial, nil
 }
 
+func (f *BaseFetcher) countCached() int {
+	f.mtx.Lock()
+	defer f.mtx.Unlock()
+
+	return len(f.cached)
+}
+
 type MetaFetcher struct {
 	wrapped *BaseFetcher
 	metrics *FetcherMetrics
@@ -711,7 +723,11 @@ func (r *ReplicaLabelRemover) Modify(_ context.Context, metas map[ulid.ULID]*met
 	}
 
 	for u, meta := range metas {
-		l := meta.Thanos.Labels
+		l := make(map[string]string)
+		for n, v := range meta.Thanos.Labels {
+			l[n] = v
+		}
+
 		for _, replicaLabel := range r.replicaLabels {
 			if _, exists := l[replicaLabel]; exists {
 				level.Debug(r.logger).Log("msg", "replica label removed", "label", replicaLabel)
@@ -723,7 +739,10 @@ func (r *ReplicaLabelRemover) Modify(_ context.Context, metas map[ulid.ULID]*met
 			level.Warn(r.logger).Log("msg", "block has no labels left, creating one", r.replicaLabels[0], "deduped")
 			l[r.replicaLabels[0]] = "deduped"
 		}
-		metas[u].Thanos.Labels = l
+
+		nm := *meta
+		nm.Thanos.Labels = l
+		metas[u] = &nm
 	}
 	return nil
 }
@@ -778,10 +797,12 @@ func (f *ConsistencyDelayMetaFilter) Filter(_ context.Context, metas map[ulid.UL
 // Delay is not considered when computing DeletionMarkBlocks map.
 // Not go-routine safe.
 type IgnoreDeletionMarkFilter struct {
-	logger          log.Logger
-	delay           time.Duration
-	concurrency     int
-	bkt             objstore.InstrumentedBucketReader
+	logger      log.Logger
+	delay       time.Duration
+	concurrency int
+	bkt         objstore.InstrumentedBucketReader
+
+	mtx             sync.Mutex
 	deletionMarkMap map[ulid.ULID]*metadata.DeletionMark
 }
 
@@ -797,13 +818,21 @@ func NewIgnoreDeletionMarkFilter(logger log.Logger, bkt objstore.InstrumentedBuc
 
 // DeletionMarkBlocks returns block ids that were marked for deletion.
 func (f *IgnoreDeletionMarkFilter) DeletionMarkBlocks() map[ulid.ULID]*metadata.DeletionMark {
-	return f.deletionMarkMap
+	f.mtx.Lock()
+	defer f.mtx.Unlock()
+
+	deletionMarkMap := make(map[ulid.ULID]*metadata.DeletionMark, len(f.deletionMarkMap))
+	for id, meta := range f.deletionMarkMap {
+		deletionMarkMap[id] = meta
+	}
+
+	return deletionMarkMap
 }
 
 // Filter filters out blocks that are marked for deletion after a given delay.
 // It also returns the blocks that can be deleted since they were uploaded delay duration before current time.
 func (f *IgnoreDeletionMarkFilter) Filter(ctx context.Context, metas map[ulid.ULID]*metadata.Meta, synced *extprom.TxGaugeVec) error {
-	f.deletionMarkMap = make(map[ulid.ULID]*metadata.DeletionMark)
+	deletionMarkMap := make(map[ulid.ULID]*metadata.DeletionMark)
 
 	// Make a copy of block IDs to check, in order to avoid concurrency issues
 	// between the scheduler and workers.
@@ -839,7 +868,7 @@ func (f *IgnoreDeletionMarkFilter) Filter(ctx context.Context, metas map[ulid.UL
 			// Keep track of the blocks marked for deletion and filter them out if their
 			// deletion time is greater than the configured delay.
 			mtx.Lock()
-			f.deletionMarkMap[id] = m
+			deletionMarkMap[id] = m
 			if time.Since(time.Unix(m.DeletionTime, 0)).Seconds() > f.delay.Seconds() {
 				synced.WithLabelValues(MarkedForDeletionMeta).Inc()
 				delete(metas, id)
@@ -871,6 +900,10 @@ func (f *IgnoreDeletionMarkFilter) Filter(ctx context.Context, metas map[ulid.UL
 		return errors.Wrap(err, "filter blocks marked for deletion")
 	}
 
+	f.mtx.Lock()
+	f.deletionMarkMap = deletionMarkMap
+	f.mtx.Unlock()
+
 	return nil
 }
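The common thread in these fetcher.go hunks is the data-race fix noted in the changelog (#4663): shared state such as `f.cached`, `f.deletionMarkMap`, and the per-block label maps is no longer mutated in place. Writers build a fresh map outside the lock, publish it under a mutex, and readers only ever receive a copy. A minimal, self-contained sketch of that pattern (illustrative names, not the Thanos API):

```go
package main

import (
	"fmt"
	"sync"
)

// registry mirrors the fetcher's fix: writers build fresh state locally,
// publish it under a mutex, and readers only ever get copies.
type registry struct {
	mtx   sync.Mutex
	items map[string]int
}

// refresh builds the new map without holding the lock, then swaps it in.
func (r *registry) refresh(src map[string]int) {
	items := make(map[string]int, len(src))
	for k, v := range src {
		items[k] = v
	}

	r.mtx.Lock()
	r.items = items
	r.mtx.Unlock()
}

// snapshot returns a copy, so callers can iterate without racing refresh.
func (r *registry) snapshot() map[string]int {
	r.mtx.Lock()
	defer r.mtx.Unlock()

	out := make(map[string]int, len(r.items))
	for k, v := range r.items {
		out[k] = v
	}
	return out
}

func main() {
	r := &registry{}
	var wg sync.WaitGroup
	for i := 0; i < 4; i++ {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			r.refresh(map[string]int{"blocks": i})
		}(i)
	}
	wg.Wait()
	fmt.Println(r.snapshot())
}
```

Swapping the whole map under the lock keeps the critical section tiny; the expensive work of building and copying happens outside it, which is the same trade-off the fetcher makes.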
2 changes: 2 additions & 0 deletions pkg/cache/cache.go
@@ -18,4 +18,6 @@ type Cache interface {
 	// Fetch multiple keys from cache. Returns map of input keys to data.
 	// If key isn't in the map, data for given key was not found.
 	Fetch(ctx context.Context, keys []string) map[string][]byte
+
+	Name() string
}
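Every `Cache` implementation now has to report the name it was constructed with; the following hunks wire this through `InMemoryCache`, `MemcachedCache`, and `TracingCache`. As a rough sketch, a minimal implementation of the extended interface might look like this (the no-op type is hypothetical, not part of the commit):

```go
package cache

import (
	"context"
	"time"
)

// noopCache is a hypothetical stub satisfying the extended interface:
// Store and Fetch as before, plus the new Name method.
type noopCache struct{ name string }

// Store drops all writes; a real cache would persist data for ttl.
func (c noopCache) Store(ctx context.Context, data map[string][]byte, ttl time.Duration) {}

// Fetch returns no hits; a real cache fills the map only for found keys.
func (c noopCache) Fetch(ctx context.Context, keys []string) map[string][]byte {
	return map[string][]byte{}
}

// Name reports the identifier given at construction time, which the
// tracing layer can attach to spans.
func (c noopCache) Name() string { return c.name }
```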
6 changes: 6 additions & 0 deletions pkg/cache/inmemory.go
@@ -41,6 +41,7 @@ type InMemoryCache struct {
 	logger           log.Logger
 	maxSizeBytes     uint64
 	maxItemSizeBytes uint64
+	name             string
 
 	mtx     sync.Mutex
 	curSize uint64
@@ -100,6 +101,7 @@ func NewInMemoryCacheWithConfig(name string, logger log.Logger, reg prometheus.R
 		logger:           logger,
 		maxSizeBytes:     uint64(config.MaxSize),
 		maxItemSizeBytes: uint64(config.MaxItemSize),
+		name:             name,
 	}
 
 	c.evicted = promauto.With(reg).NewCounter(prometheus.CounterOpts{
@@ -303,3 +305,7 @@ func (c *InMemoryCache) Fetch(ctx context.Context, keys []string) map[string][]b
 	}
 	return results
 }
+
+func (c *InMemoryCache) Name() string {
+	return c.name
+}
6 changes: 6 additions & 0 deletions pkg/cache/memcached.go
@@ -19,6 +19,7 @@ import (
 type MemcachedCache struct {
 	logger    log.Logger
 	memcached cacheutil.MemcachedClient
+	name      string
 
 	// Metrics.
 	requests prometheus.Counter
@@ -30,6 +31,7 @@ func NewMemcachedCache(name string, logger log.Logger, memcached cacheutil.Memca
 	c := &MemcachedCache{
 		logger:    logger,
 		memcached: memcached,
+		name:      name,
 	}
 
 	c.requests = promauto.With(reg).NewCounter(prometheus.CounterOpts{
@@ -81,3 +83,7 @@ func (c *MemcachedCache) Fetch(ctx context.Context, keys []string) map[string][]
 	c.hits.Add(float64(len(results)))
 	return results
 }
+
+func (c *MemcachedCache) Name() string {
+	return c.name
+}
5 changes: 5 additions & 0 deletions pkg/cache/tracing_cache.go
@@ -27,6 +27,7 @@ func (t TracingCache) Store(ctx context.Context, data map[string][]byte, ttl tim
 
 func (t TracingCache) Fetch(ctx context.Context, keys []string) (result map[string][]byte) {
 	tracing.DoWithSpan(ctx, "cache_fetch", func(spanCtx context.Context, span opentracing.Span) {
+		span.SetTag("name", t.Name())
 		span.LogKV("requested keys", len(keys))
 
 		result = t.c.Fetch(spanCtx, keys)
@@ -39,3 +40,7 @@ func (t TracingCache) Fetch(ctx context.Context, keys []string) (result map[stri
 	})
 	return
 }
+
+func (t TracingCache) Name() string {
+	return t.c.Name()
+}
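`TracingCache` stays a thin decorator: it tags every `cache_fetch` span with the wrapped cache's name (#4696) and otherwise delegates. A self-contained sketch of the same decorator shape, with a print statement standing in for `span.SetTag` (the types here are illustrative, not the Thanos API):

```go
package main

import "fmt"

// Cache is the minimal surface the decorator needs for this sketch.
type Cache interface {
	Fetch(keys []string) map[string][]byte
	Name() string
}

// traced wraps any Cache and tags each fetch with the backing cache's
// name, mirroring TracingCache's span.SetTag("name", t.Name()).
type traced struct{ c Cache }

func (t traced) Name() string { return t.c.Name() }

func (t traced) Fetch(keys []string) map[string][]byte {
	// Stand-in for span.SetTag("name", ...) and span.LogKV(...).
	fmt.Printf("span: name=%q requested_keys=%d\n", t.Name(), len(keys))
	return t.c.Fetch(keys)
}

// mapCache is a toy backing cache for the demo.
type mapCache struct {
	name string
	data map[string][]byte
}

func (m mapCache) Name() string { return m.name }

func (m mapCache) Fetch(keys []string) map[string][]byte {
	out := map[string][]byte{}
	for _, k := range keys {
		if v, ok := m.data[k]; ok {
			out[k] = v
		}
	}
	return out
}

func main() {
	c := traced{c: mapCache{name: "index-cache", data: map[string][]byte{"a": []byte("1")}}}
	fmt.Println(c.Fetch([]string{"a", "b"}))
}
```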
10 changes: 5 additions & 5 deletions pkg/promclient/promclient.go
@@ -704,15 +704,15 @@ func (c *Client) SeriesInGRPC(ctx context.Context, base *url.URL, matchers []*la
 	return m.Data, c.get2xxResultWithGRPCErrors(ctx, "/prom_series HTTP[client]", &u, &m)
 }
 
-// LabelNames returns all known label names constrained by the given matchers. It uses gRPC errors.
+// LabelNamesInGRPC returns all known label names constrained by the given matchers. It uses gRPC errors.
 // NOTE: This method is tested in pkg/store/prometheus_test.go against Prometheus.
-func (c *Client) LabelNamesInGRPC(ctx context.Context, base *url.URL, matchers []storepb.LabelMatcher, startTime, endTime int64) ([]string, error) {
+func (c *Client) LabelNamesInGRPC(ctx context.Context, base *url.URL, matchers []*labels.Matcher, startTime, endTime int64) ([]string, error) {
 	u := *base
 	u.Path = path.Join(u.Path, "/api/v1/labels")
 	q := u.Query()
 
 	if len(matchers) > 0 {
-		q.Add("match[]", storepb.MatchersToString(matchers...))
+		q.Add("match[]", storepb.PromMatchersToString(matchers...))
 	}
 	q.Add("start", formatTime(timestamp.Time(startTime)))
 	q.Add("end", formatTime(timestamp.Time(endTime)))
@@ -726,13 +726,13 @@ func (c *Client) LabelNamesInGRPC(ctx context.Context, base *url.URL, matchers [
 
 // LabelValuesInGRPC returns all known label values for a given label name. It uses gRPC errors.
 // NOTE: This method is tested in pkg/store/prometheus_test.go against Prometheus.
-func (c *Client) LabelValuesInGRPC(ctx context.Context, base *url.URL, label string, matchers []storepb.LabelMatcher, startTime, endTime int64) ([]string, error) {
+func (c *Client) LabelValuesInGRPC(ctx context.Context, base *url.URL, label string, matchers []*labels.Matcher, startTime, endTime int64) ([]string, error) {
 	u := *base
 	u.Path = path.Join(u.Path, "/api/v1/label/", label, "/values")
 	q := u.Query()
 
 	if len(matchers) > 0 {
-		q.Add("match[]", storepb.MatchersToString(matchers...))
+		q.Add("match[]", storepb.PromMatchersToString(matchers...))
 	}
 	q.Add("start", formatTime(timestamp.Time(startTime)))
 	q.Add("end", formatTime(timestamp.Time(endTime)))
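Callers can now pass native Prometheus matchers to both methods instead of converting to `storepb.LabelMatcher` first. A rough sketch of how the `match[]` parameter gets built from them, with a local stand-in for `storepb.PromMatchersToString` (assumed behavior, not the actual implementation):

```go
package main

import (
	"fmt"
	"strings"

	"github.com/prometheus/prometheus/pkg/labels"
)

// promMatchersToString is a local stand-in for storepb.PromMatchersToString:
// it renders native Prometheus matchers as one match[] selector.
func promMatchersToString(ms ...*labels.Matcher) string {
	parts := make([]string, 0, len(ms))
	for _, m := range ms {
		parts = append(parts, m.String())
	}
	return "{" + strings.Join(parts, ", ") + "}"
}

func main() {
	// With this change, LabelNamesInGRPC/LabelValuesInGRPC accept these directly.
	ms := []*labels.Matcher{
		labels.MustNewMatcher(labels.MatchEqual, "job", "prometheus"),
		labels.MustNewMatcher(labels.MatchRegexp, "instance", ".+:9090"),
	}
	fmt.Println(promMatchersToString(ms...)) // {job="prometheus", instance=~".+:9090"}
}
```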