diff --git a/.circleci/config.yml b/.circleci/config.yml index f476096349..fc0bc0bd63 100644 --- a/.circleci/config.yml +++ b/.circleci/config.yml @@ -23,7 +23,7 @@ jobs: echo "Awesome! GCS integration tests are enabled." fi - run: make check-go-mod - - run: make errcheck + - run: make lint - run: make check-docs - run: make format - run: @@ -131,4 +131,4 @@ workflows: tags: only: /^v[0-9]+(\.[0-9]+){2}(-.+|[^-.]*)$/ branches: - ignore: /.*/ \ No newline at end of file + ignore: /.*/ diff --git a/.golangci.yml b/.golangci.yml new file mode 100644 index 0000000000..95a92fb61a --- /dev/null +++ b/.golangci.yml @@ -0,0 +1,21 @@ +linters-settings: + errcheck: + exclude: ./.errcheck_excludes.txt + +issues: + exclude-rules: + # TODO: move away from deprecated prometheus.InstrumentHandler{,Func} + - linters: + - staticcheck + text: "SA1019:" + # These are not being checked since these methods exist + # so that no one else could implement them. + - linters: + - unused + text: "SourceStoreAPI.implementsStoreAPI" + - linters: + - unused + text: "SourceStoreAPI.producesBlocks" + - linters: + - unused + text: "Source.producesBlocks" diff --git a/Makefile b/Makefile index 8c92d49b4d..f797f11d4f 100644 --- a/Makefile +++ b/Makefile @@ -1,5 +1,4 @@ PREFIX ?= $(shell pwd) -FILES_TO_FMT ?= $(shell find . -path ./vendor -prune -o -name '*.go' -print) DOCKER_IMAGE_NAME ?= thanos DOCKER_IMAGE_TAG ?= $(subst /,-,$(shell git rev-parse --abbrev-ref HEAD))-$(shell date +%Y-%m-%d)-$(shell git rev-parse --short HEAD) @@ -15,13 +14,8 @@ export GOPROXY EMBEDMD ?= $(GOBIN)/embedmd-$(EMBEDMD_VERSION) # v2.0.0 EMBEDMD_VERSION ?= 97c13d6e41602fc6e397eb51c45f38069371a969 -ERRCHECK ?= $(GOBIN)/errcheck-$(ERRCHECK_VERSION) -# v1.2.0 -ERRCHECK_VERSION ?= e14f8d59a22d460d56c5ee92507cd94c78fbf274 LICHE ?= $(GOBIN)/liche-$(LICHE_VERSION) LICHE_VERSION ?= 2a2e6e56f6c615c17b2e116669c4cdb31b5453f3 -GOIMPORTS ?= $(GOBIN)/goimports-$(GOIMPORTS_VERSION) -GOIMPORTS_VERSION ?= 9d4d845e86f14303813298ede731a971dd65b593 PROMU ?= $(GOBIN)/promu-$(PROMU_VERSION) PROMU_VERSION ?= 9583e5a6448f97c6294dca72dd1d173e28f8d4a4 PROTOC ?= $(GOBIN)/protoc-$(PROTOC_VERSION) @@ -33,6 +27,10 @@ HUGO ?= $(GOBIN)/hugo-$(HUGO_VERSION) GOBINDATA_VERSION ?= a9c83481b38ebb1c4eb8f0168fd4b10ca1d3c523 GOBINDATA ?= $(GOBIN)/go-bindata-$(GOBINDATA_VERSION) GIT ?= $(shell which git) +# golangci-lint which includes errcheck, goimports +# and more. v1.16.0 +GOLANGCILINT_VERSION ?= 97ea1cbb21bbf5e4d0e8bcc0f9243385e9262dcc +GOLANGCILINT ?= $(GOBIN)/golangci-lint-$(GOLANGCILINT_VERSION) WEB_DIR ?= website WEBSITE_BASE_URL ?= https://thanos.io @@ -151,19 +149,14 @@ check-docs: $(EMBEDMD) $(LICHE) build @$(LICHE) --recursive docs --exclude "cloud.tencent.com" --document-root . @$(LICHE) --exclude "cloud.tencent.com" --document-root . *.md -# errcheck performs static analysis and returns error if any of the errors is not checked. -.PHONY: errcheck -errcheck: $(ERRCHECK) - @echo ">> errchecking the code" - $(ERRCHECK) -verbose -exclude .errcheck_excludes.txt ./cmd/... ./pkg/... ./test/... - # format formats the code (including imports format). # NOTE: format requires deps to not remove imports that are used, just not resolved. # This is not encoded, because it is often used in IDE onSave logic.
.PHONY: format -format: $(GOIMPORTS) +format: check-git $(GOLANGCILINT) @echo ">> formatting code" - @$(GOIMPORTS) -w $(FILES_TO_FMT) + @$(GOLANGCILINT) run --disable-all -E goimports ./... + # proto generates golang files from Thanos proto files. .PHONY: proto @@ -195,12 +188,6 @@ test-deps: $(call fetch_go_bin_version,github.com/prometheus/alertmanager/cmd/alertmanager,$(ALERTMANAGER_VERSION)) $(call fetch_go_bin_version,github.com/minio/minio,$(MINIO_SERVER_VERSION)) -# vet vets the code. -.PHONY: vet -vet: check-git - @echo ">> vetting code" - @go vet ./... - # go mod related .PHONY: go-mod-tidy go-mod-tidy: check-git @@ -231,6 +218,11 @@ web: web-pre-process $(HUGO) # TODO(bwplotka): Make it --gc @cd $(WEB_DIR) && HUGO_ENV=production $(HUGO) --config hugo.yaml --minify -v -b $(WEBSITE_BASE_URL) +.PHONY: lint +lint: check-git $(GOLANGCILINT) + @echo ">> linting all of the Go files" + @$(GOLANGCILINT) run ./... + .PHONY: web-serve web-serve: web-pre-process $(HUGO) @echo ">> serving documentation website" @@ -240,9 +232,6 @@ web-serve: web-pre-process $(HUGO) $(EMBEDMD): $(call fetch_go_bin_version,github.com/campoy/embedmd,$(EMBEDMD_VERSION)) -$(ERRCHECK): - $(call fetch_go_bin_version,github.com/kisielk/errcheck,$(ERRCHECK_VERSION)) - $(GOIMPORTS): $(call fetch_go_bin_version,golang.org/x/tools/cmd/goimports,$(GOIMPORTS_VERSION)) @@ -260,6 +249,9 @@ $(HUGO): $(GOBINDATA): $(call fetch_go_bin_version,github.com/go-bindata/go-bindata/go-bindata,$(GOBINDATA_VERSION)) +$(GOLANGCILINT): + $(call fetch_go_bin_version,github.com/golangci/golangci-lint/cmd/golangci-lint,$(GOLANGCILINT_VERSION)) + $(PROTOC): @mkdir -p $(TMP_GOPATH) @echo ">> fetching protoc@${PROTOC_VERSION}" diff --git a/cmd/thanos/bucket.go b/cmd/thanos/bucket.go index 65f2e2f411..d8a3f53b72 100644 --- a/cmd/thanos/bucket.go +++ b/cmd/thanos/bucket.go @@ -60,7 +60,6 @@ func registerBucket(m map[string]setupFunc, app *kingpin.Application, name strin registerBucketLs(m, cmd, name, objStoreConfig) registerBucketInspect(m, cmd, name, objStoreConfig) registerBucketWeb(m, cmd, name, objStoreConfig) - return } func registerBucketVerify(m map[string]setupFunc, root *kingpin.CmdClause, name string, objStoreConfig *pathOrContent) { diff --git a/cmd/thanos/compact.go b/cmd/thanos/compact.go index d799dc50b9..57c234e6c9 100644 --- a/cmd/thanos/compact.go +++ b/cmd/thanos/compact.go @@ -206,6 +206,7 @@ func runCompact( // are in milliseconds. comp, err := tsdb.NewLeveledCompactor(ctx, reg, logger, levels, downsample.NewPool()) if err != nil { + cancel() return errors.Wrap(err, "create compactor") } @@ -216,11 +217,13 @@ func runCompact( ) if err := os.RemoveAll(downsamplingDir); err != nil { + cancel() return errors.Wrap(err, "clean working downsample directory") } compactor, err := compact.NewBucketCompactor(logger, sy, comp, compactDir, bkt, concurrency) if err != nil { + cancel() return errors.Wrap(err, "create bucket compactor") } diff --git a/cmd/thanos/downsample.go b/cmd/thanos/downsample.go index 0d4b1a60db..ba15f1c8df 100644 --- a/cmd/thanos/downsample.go +++ b/cmd/thanos/downsample.go @@ -298,8 +298,6 @@ func processDownsampling(ctx context.Context, logger log.Logger, bkt objstore.Bu level.Info(logger).Log("msg", "uploaded block", "id", id, "duration", time.Since(begin)) - begin = time.Now() - // It is not harmful if these fails. 
if err := os.RemoveAll(bdir); err != nil { level.Warn(logger).Log("msg", "failed to clean directory", "dir", bdir, "err", err) diff --git a/cmd/thanos/main.go b/cmd/thanos/main.go index 2a72a79d03..9257a3cc8c 100644 --- a/cmd/thanos/main.go +++ b/cmd/thanos/main.go @@ -145,6 +145,10 @@ func main() { var closer io.Closer var confContentYaml []byte confContentYaml, err = tracingConfig.Content() + if err != nil { + level.Error(logger).Log("msg", "getting tracing config failed", "err", err) + os.Exit(1) + } if len(confContentYaml) == 0 { level.Info(logger).Log("msg", "Tracing will be disabled") diff --git a/cmd/thanos/receive.go b/cmd/thanos/receive.go index 281168599f..152fe1732a 100644 --- a/cmd/thanos/receive.go +++ b/cmd/thanos/receive.go @@ -257,10 +257,7 @@ func runReceive( err error ) g.Add(func() error { - select { - case <-dbOpen: - break - } + <-dbOpen l, err = net.Listen("tcp", grpcBindAddr) if err != nil { diff --git a/cmd/thanos/rule.go b/cmd/thanos/rule.go index d5b739d616..548ae900f9 100644 --- a/cmd/thanos/rule.go +++ b/cmd/thanos/rule.go @@ -762,14 +762,14 @@ func queryFunc( if err != nil { level.Error(logger).Log("err", err, "query", q) + } else { + if len(warns) > 0 { + ruleEvalWarnings.WithLabelValues(strings.ToLower(partialResponseStrategy.String())).Inc() + // TODO(bwplotka): Propagate those to UI, probably requires changing rule manager code ): + level.Warn(logger).Log("warnings", strings.Join(warns, ", "), "query", q) + } + return v, nil } - - if err == nil && len(warns) > 0 { - ruleEvalWarnings.WithLabelValues(strings.ToLower(partialResponseStrategy.String())).Inc() - // TODO(bwplotka): Propagate those to UI, probably requires changing rule manager code ): - level.Warn(logger).Log("warnings", strings.Join(warns, ", "), "query", q) - } - return v, err } return nil, errors.Errorf("no query peer reachable") } diff --git a/pkg/alert/alert.go b/pkg/alert/alert.go index 87ceab7052..ca49e4f9b0 100644 --- a/pkg/alert/alert.go +++ b/pkg/alert/alert.go @@ -360,7 +360,6 @@ func (s *Sender) Send(ctx context.Context, alerts []*Alert) { s.dropped.Add(float64(len(alerts))) level.Warn(s.logger).Log("msg", "failed to send alerts to all alertmanagers", "alertmanagers", amrs, "alerts", string(b)) - return } func (s *Sender) sendOne(ctx context.Context, url string, b []byte) error { diff --git a/pkg/block/index.go b/pkg/block/index.go index cb2c7b1ed6..39ca48d4d9 100644 --- a/pkg/block/index.go +++ b/pkg/block/index.go @@ -382,7 +382,7 @@ func GatherIndexIssueStats(logger log.Logger, fn string, minTime int64, maxTime stats.OutOfOrderLabels++ level.Warn(logger).Log("msg", "out-of-order label set: known bug in Prometheus 2.8.0 and below", - "labelset", fmt.Sprintf("%s", lset), + "labelset", lset.String(), "series", fmt.Sprintf("%d", id), ) } @@ -727,11 +727,6 @@ func (ss stringset) set(s string) { ss[s] = struct{}{} } -func (ss stringset) has(s string) bool { - _, ok := ss[s] - return ok -} - func (ss stringset) String() string { return strings.Join(ss.slice(), ",") } diff --git a/pkg/block/metadata/meta.go b/pkg/block/metadata/meta.go index 3f48f36bd7..ed5ef8d185 100644 --- a/pkg/block/metadata/meta.go +++ b/pkg/block/metadata/meta.go @@ -44,8 +44,6 @@ const ( // Meta describes the a block's meta. It wraps the known TSDB meta structure and // extends it by Thanos-specific fields. 
type Meta struct { - Version int `json:"version"` - tsdb.BlockMeta Thanos Thanos `json:"thanos"` diff --git a/pkg/compact/compact.go b/pkg/compact/compact.go index 61a9f0be1d..cb54302826 100644 --- a/pkg/compact/compact.go +++ b/pkg/compact/compact.go @@ -62,9 +62,6 @@ type syncerMetrics struct { garbageCollectionDuration prometheus.Histogram compactions *prometheus.CounterVec compactionFailures *prometheus.CounterVec - indexCacheBlocks prometheus.Counter - indexCacheTraverse prometheus.Counter - indexCacheFailures prometheus.Counter } func newSyncerMetrics(reg prometheus.Registerer) *syncerMetrics { @@ -985,6 +982,7 @@ func (c *BucketCompactor) Compact(ctx context.Context) error { finishedAllGroups = true mtx sync.Mutex ) + defer workCtxCancel() // Set up workers who will compact the groups when the groups are ready. // They will compact available groups until they encounter an error, after which they will stop. diff --git a/pkg/compact/downsample/downsample.go b/pkg/compact/downsample/downsample.go index e7b09437ec..9a9fceced1 100644 --- a/pkg/compact/downsample/downsample.go +++ b/pkg/compact/downsample/downsample.go @@ -241,7 +241,6 @@ func (a *aggregator) add(v float64) { // aggrChunkBuilder builds chunks for multiple different aggregates. type aggrChunkBuilder struct { mint, maxt int64 - isCounter bool added int chunks [5]chunkenc.Chunk diff --git a/pkg/compact/retention_test.go b/pkg/compact/retention_test.go index 1a26b1e5db..b07bc97aad 100644 --- a/pkg/compact/retention_test.go +++ b/pkg/compact/retention_test.go @@ -254,11 +254,11 @@ func TestApplyRetentionPolicyByResolution(t *testing.T) { func uploadMockBlock(t *testing.T, bkt objstore.Bucket, id string, minTime, maxTime time.Time, resolutionLevel int64) { t.Helper() meta1 := metadata.Meta{ - Version: 1, BlockMeta: tsdb.BlockMeta{ ULID: ulid.MustParse(id), MinTime: minTime.Unix() * 1000, MaxTime: maxTime.Unix() * 1000, + Version: 1, }, Thanos: metadata.Thanos{ Downsample: metadata.ThanosDownsample{ diff --git a/pkg/discovery/dns/provider.go b/pkg/discovery/dns/provider.go index 68fb0636eb..76d358fe32 100644 --- a/pkg/discovery/dns/provider.go +++ b/pkg/discovery/dns/provider.go @@ -97,7 +97,6 @@ func (p *Provider) Resolve(ctx context.Context, addrs []string) { p.resolverLookupsCount.Inc() if err != nil { // The DNS resolution failed. Continue without modifying the old records. - p.resolved[addr] = p.resolved[addr]
p.resolverFailuresCount.Inc() level.Error(p.logger).Log("msg", "dns resolution failed", "addr", addr, "err", err) continue diff --git a/pkg/objstore/azure/helpers.go b/pkg/objstore/azure/helpers.go index 0c54cc62b5..d1dda2c076 100644 --- a/pkg/objstore/azure/helpers.go +++ b/pkg/objstore/azure/helpers.go @@ -25,7 +25,7 @@ func getContainerURL(ctx context.Context, conf Config) (blob.ContainerURL, error MaxTries: int32(conf.MaxRetries), } if deadline, ok := ctx.Deadline(); ok { - retryOptions.TryTimeout = deadline.Sub(time.Now()) + retryOptions.TryTimeout = time.Until(deadline) } p := blob.NewPipeline(c, blob.PipelineOptions{ @@ -73,7 +73,7 @@ func getBlobURL(ctx context.Context, conf Config, blobName string) (blob.BlockBl func parseError(errorCode string) string { match := errorCodeRegex.FindStringSubmatch(errorCode) - if match != nil && len(match) == 2 { + if len(match) == 2 { return match[1] } return errorCode diff --git a/pkg/objstore/gcs/gcs.go b/pkg/objstore/gcs/gcs.go index e4f6a7295a..9445e08f33 100644 --- a/pkg/objstore/gcs/gcs.go +++ b/pkg/objstore/gcs/gcs.go @@ -168,6 +168,7 @@ func (b *Bucket) Close() error { // In a close function it empties and deletes the bucket. func NewTestBucket(t testing.TB, project string) (objstore.Bucket, func(), error) { ctx, cancel := context.WithCancel(context.Background()) + defer cancel() src := rand.NewSource(time.Now().UnixNano()) gTestConfig := Config{ Bucket: fmt.Sprintf("test_%s_%x", strings.ToLower(t.Name()), src.Int63()), @@ -180,12 +181,10 @@ func NewTestBucket(t testing.TB, project string) (objstore.Bucket, func(), error b, err := NewBucket(ctx, log.NewNopLogger(), bc, "thanos-e2e-test") if err != nil { - cancel() return nil, nil, err } if err = b.bkt.Create(ctx, project, nil); err != nil { - cancel() _ = b.Close() return nil, nil, err } @@ -196,7 +195,6 @@ func NewTestBucket(t testing.TB, project string) (objstore.Bucket, func(), error if err := b.bkt.Delete(ctx); err != nil { t.Logf("deleting bucket failed: %s", err) } - cancel() if err := b.Close(); err != nil { t.Logf("closing bucket failed: %s", err) } diff --git a/pkg/pool/pool.go b/pkg/pool/pool.go index b8ef90a0c8..d7aa43a0d8 100644 --- a/pkg/pool/pool.go +++ b/pkg/pool/pool.go @@ -16,7 +16,7 @@ type BytesPool struct { maxTotal uint64 usedTotal uint64 - new func(s int) []byte + new func(s int) *[]byte } // NewBytesPool returns a new BytesPool with size buckets for minSize to maxSize @@ -42,8 +42,9 @@ func NewBytesPool(minSize, maxSize int, factor float64, maxTotal uint64) (*Bytes buckets: make([]sync.Pool, len(sizes)), sizes: sizes, maxTotal: maxTotal, - new: func(sz int) []byte { - return make([]byte, 0, sz) + new: func(sz int) *[]byte { + s := make([]byte, 0, sz) + return &s }, } return p, nil @@ -53,7 +54,7 @@ func NewBytesPool(minSize, maxSize int, factor float64, maxTotal uint64) (*Bytes var ErrPoolExhausted = errors.New("pool exhausted") // Get returns a new byte slices that fits the given size. 
-func (p *BytesPool) Get(sz int) ([]byte, error) { +func (p *BytesPool) Get(sz int) (*[]byte, error) { used := atomic.LoadUint64(&p.usedTotal) if p.maxTotal > 0 && used+uint64(sz) > p.maxTotal { @@ -63,11 +64,11 @@ func (p *BytesPool) Get(sz int) ([]byte, error) { if sz > bktSize { continue } - b, ok := p.buckets[i].Get().([]byte) + b, ok := p.buckets[i].Get().(*[]byte) if !ok { b = p.new(bktSize) } - atomic.AddUint64(&p.usedTotal, uint64(cap(b))) + atomic.AddUint64(&p.usedTotal, uint64(cap(*b))) return b, nil } @@ -77,12 +78,16 @@ } // Put returns a byte slice to the right bucket in the pool. -func (p *BytesPool) Put(b []byte) { + func (p *BytesPool) Put(b *[]byte) { + if b == nil { + return + } for i, bktSize := range p.sizes { - if cap(b) > bktSize { + if cap(*b) > bktSize { continue } - p.buckets[i].Put(b[:0]) + *b = (*b)[:0] + p.buckets[i].Put(b) break } atomic.AddUint64(&p.usedTotal, ^uint64(cap(*b)-1)) diff --git a/pkg/pool/pool_test.go b/pkg/pool/pool_test.go index ae26e6536a..e439ed8272 100644 --- a/pkg/pool/pool_test.go +++ b/pkg/pool/pool_test.go @@ -10,13 +10,6 @@ func TestBytesPool(t *testing.T) { chunkPool, err := NewBytesPool(10, 100, 2, 1000) testutil.Ok(t, err) - // Inject alloc counter. - allocs := uint64(0) - wrapped := chunkPool.new - chunkPool.new = func(sz int) []byte { - allocs++ - return wrapped(sz) - } testutil.Equals(t, []int{10, 20, 40, 80}, chunkPool.sizes) for i := 0; i < 10; i++ { @@ -27,7 +20,7 @@ if i%2 == 0 { for j := 0; j < 6; j++ { - b = append(b, []byte{'1', '2', '3', '4', '5'}...) + *b = append(*b, []byte{'1', '2', '3', '4', '5'}...) } } chunkPool.Put(b) @@ -56,5 +49,4 @@ chunkPool.Put(b2) testutil.Equals(t, uint64(0), chunkPool.usedTotal) - testutil.Equals(t, uint64(4), allocs) } diff --git a/pkg/promclient/promclient_e2e_test.go b/pkg/promclient/promclient_e2e_test.go index 9f30996dde..0f4929653e 100644 --- a/pkg/promclient/promclient_e2e_test.go +++ b/pkg/promclient/promclient_e2e_test.go @@ -140,10 +140,10 @@ func TestRule_UnmarshalScalarResponse(t *testing.T) { testutil.Equals(t, vectorResult.String(), expectedVector.String()) // Test invalid length of scalar data structure. - vectorResult, err = convertScalarJSONToVector(invalidLengthScalarJSONResult) + _, err = convertScalarJSONToVector(invalidLengthScalarJSONResult) testutil.NotOk(t, err) // Test invalid format of scalar data. - vectorResult, err = convertScalarJSONToVector(invalidDataScalarJSONResult) + _, err = convertScalarJSONToVector(invalidDataScalarJSONResult) testutil.NotOk(t, err) } diff --git a/pkg/query/api/v1.go b/pkg/query/api/v1.go index f9dcfb20b7..0d3645eab1 100644 --- a/pkg/query/api/v1.go +++ b/pkg/query/api/v1.go @@ -47,18 +47,18 @@ type status string const ( statusSuccess status = "success" - statusError = "error" + statusError status = "error" ) type ErrorType string const ( errorNone ErrorType = "" - errorTimeout = "timeout" - errorCanceled = "canceled" - errorExec = "execution" - errorBadData = "bad_data" - ErrorInternal = "internal" + errorTimeout ErrorType = "timeout" + errorCanceled ErrorType = "canceled" + errorExec ErrorType = "execution" + errorBadData ErrorType = "bad_data" + ErrorInternal ErrorType = "internal" ) var corsHeaders = map[string]string{ @@ -289,7 +289,7 @@ func (api *API) query(r *http.Request) (interface{}, []error, *ApiError) { } // We are starting promQL tracing span here, because we have no control over promQL code. 
- span, ctx := tracing.StartSpan(r.Context(), "promql_instant_query") + span, ctx := tracing.StartSpan(ctx, "promql_instant_query") defer span.Finish() begin := api.now() @@ -387,7 +387,7 @@ func (api *API) queryRange(r *http.Request) (interface{}, []error, *ApiError) { } // We are starting promQL tracing span here, because we have no control over promQL code. - span, ctx := tracing.StartSpan(r.Context(), "promql_range_query") + span, ctx := tracing.StartSpan(ctx, "promql_range_query") defer span.Finish() begin := api.now() diff --git a/pkg/query/api/v1_test.go b/pkg/query/api/v1_test.go index 696778ce85..c5fbd88742 100644 --- a/pkg/query/api/v1_test.go +++ b/pkg/query/api/v1_test.go @@ -576,7 +576,7 @@ func TestEndpoints(t *testing.T) { } return } - if apiErr == nil && test.errType != errorNone { + if test.errType != errorNone { t.Fatalf("Expected error of type %q but got none", test.errType) } diff --git a/pkg/query/iter.go b/pkg/query/iter.go index 8e54d61e68..bc0b767db0 100644 --- a/pkg/query/iter.go +++ b/pkg/query/iter.go @@ -395,7 +395,6 @@ func (s *dedupSeries) Iterator() (it storage.SeriesIterator) { type dedupSeriesIterator struct { a, b storage.SeriesIterator - i int aok, bok bool lastT int64 diff --git a/pkg/query/querier_test.go b/pkg/query/querier_test.go index 18218eeabe..0ae92c0dba 100644 --- a/pkg/query/querier_test.go +++ b/pkg/query/querier_test.go @@ -46,10 +46,10 @@ func TestQuerier_DownsampledData(t *testing.T) { defer leaktest.CheckTimeout(t, 10*time.Second)() testProxy := &storeServer{ resps: []*storepb.SeriesResponse{ - storeSeriesResponse(t, labels.FromStrings("__name__", "a", "zzz", "a", "aaa", "bbb"), []sample{{99, 1}, {199, 5}}), // Downsampled chunk from Store - storeSeriesResponse(t, labels.FromStrings("__name__", "a", "zzz", "b", "bbbb", "eee"), []sample{{99, 3}, {199, 8}}), // Downsampled chunk from Store - storeSeriesResponse(t, labels.FromStrings("__name__", "a", "zzz", "c", "qwe", "wqeqw"), []sample{{99, 5}, {199, 15}}), // Downsampled chunk from Store - storeSeriesResponse(t, labels.FromStrings("__name__", "a", "zzz", "c", "htgtreytr", "vbnbv"), []sample{{99, 123}, {199, 15}}), // Downsampled chunk from Store + storeSeriesResponse(t, labels.FromStrings("__name__", "a", "zzz", "a", "aaa", "bbb"), []sample{{99, 1}, {199, 5}}), // Downsampled chunk from Store + storeSeriesResponse(t, labels.FromStrings("__name__", "a", "zzz", "b", "bbbb", "eee"), []sample{{99, 3}, {199, 8}}), // Downsampled chunk from Store + storeSeriesResponse(t, labels.FromStrings("__name__", "a", "zzz", "c", "qwe", "wqeqw"), []sample{{99, 5}, {199, 15}}), // Downsampled chunk from Store + storeSeriesResponse(t, labels.FromStrings("__name__", "a", "zzz", "c", "htgtreytr", "vbnbv"), []sample{{99, 123}, {199, 15}}), // Downsampled chunk from Store storeSeriesResponse(t, labels.FromStrings("__name__", "a", "zzz", "d", "asdsad", "qweqwewq"), []sample{{22, 5}, {44, 8}, {199, 15}}), // Raw chunk from Sidecar storeSeriesResponse(t, labels.FromStrings("__name__", "a", "zzz", "d", "asdsad", "qweqwebb"), []sample{{22, 5}, {44, 8}, {199, 15}}), // Raw chunk from Sidecar }, @@ -315,23 +315,23 @@ func TestDedupSeriesSet(t *testing.T) { vals []sample }{ { - lset: labels.Labels{{"a", "1"}, {"c", "3"}}, + lset: labels.Labels{{Name: "a", Value: "1"}, {Name: "c", Value: "3"}}, vals: []sample{{10000, 1}, {20000, 2}, {60000, 3}, {70000, 4}, {200000, 5}, {210000, 6}}, }, { - lset: labels.Labels{{"a", "1"}, {"c", "3"}, {"d", "4"}}, + lset: labels.Labels{{Name: "a", Value: "1"}, {Name: "c", Value: "3"}, 
{Name: "d", Value: "4"}}, vals: []sample{{10000, 1}, {20000, 2}}, }, { - lset: labels.Labels{{"a", "1"}, {"c", "3"}}, + lset: labels.Labels{{Name: "a", Value: "1"}, {Name: "c", Value: "3"}}, vals: []sample{{10000, 1}, {20000, 2}}, }, { - lset: labels.Labels{{"a", "1"}, {"c", "4"}}, + lset: labels.Labels{{Name: "a", Value: "1"}, {Name: "c", Value: "4"}}, vals: []sample{{10000, 1}, {20000, 2}}, }, { - lset: labels.Labels{{"a", "2"}, {"c", "3"}}, + lset: labels.Labels{{Name: "a", Value: "2"}, {Name: "c", Value: "3"}}, vals: []sample{{10000, 1}, {20000, 2}, {60000, 3}, {70000, 4}}, }, } @@ -540,7 +540,7 @@ func storeSeriesResponse(t testing.TB, lset labels.Labels, smplChunks ...[]sampl ch := storepb.AggrChunk{ MinTime: smpls[0].t, MaxTime: smpls[len(smpls)-1].t, - Raw: &storepb.Chunk{Type: storepb.Chunk_XOR, Data: c.Bytes()}, + Raw: &storepb.Chunk{Type: storepb.Chunk_XOR, Data: c.Bytes()}, } s.Chunks = append(s.Chunks, ch) diff --git a/pkg/query/storeset.go b/pkg/query/storeset.go index af638403bc..49a36d8981 100644 --- a/pkg/query/storeset.go +++ b/pkg/query/storeset.go @@ -64,7 +64,7 @@ func (s *grpcStoreSpec) Addr() string { // Metadata method for gRPC store API tries to reach host Info method until context timeout. If we are unable to get metadata after // that time, we assume that the host is unhealthy and return error. func (s *grpcStoreSpec) Metadata(ctx context.Context, client storepb.StoreClient) (labelSets []storepb.LabelSet, mint int64, maxt int64, err error) { - resp, err := client.Info(ctx, &storepb.InfoRequest{}, grpc.FailFast(false)) + resp, err := client.Info(ctx, &storepb.InfoRequest{}, grpc.WaitForReady(true)) if err != nil { return nil, 0, 0, errors.Wrapf(err, "fetching store info from %s", s.addr) } @@ -146,9 +146,9 @@ func NewStoreSet( storeNodeConnections: storeNodeConnections, gRPCInfoCallTimeout: 10 * time.Second, externalLabelOccurrencesInStores: map[string]int{}, - stores: make(map[string]*storeRef), - storeStatuses: make(map[string]*StoreStatus), - unhealthyStoreTimeout: unhealthyStoreTimeout, + stores: make(map[string]*storeRef), + storeStatuses: make(map[string]*StoreStatus), + unhealthyStoreTimeout: unhealthyStoreTimeout, } storeNodeCollector := &storeSetNodeCollector{externalLabelOccurrences: ss.externalLabelOccurrences} @@ -315,7 +315,7 @@ func (s *StoreSet) getHealthyStores(ctx context.Context) map[string]*storeRef { store = &storeRef{StoreClient: storepb.NewStoreClient(conn), cc: conn, addr: addr, logger: s.logger} // Initial info call for all types of stores to check gRPC StoreAPI. 
- resp, err := store.StoreClient.Info(ctx, &storepb.InfoRequest{}, grpc.FailFast(false)) + resp, err := store.StoreClient.Info(ctx, &storepb.InfoRequest{}, grpc.WaitForReady(true)) if err != nil { store.close() s.updateStoreStatus(store, err) diff --git a/pkg/query/test_print.go b/pkg/query/test_print.go deleted file mode 100644 index 70bc292439..0000000000 --- a/pkg/query/test_print.go +++ /dev/null @@ -1,34 +0,0 @@ -package query - -import ( - "fmt" - - "github.com/prometheus/prometheus/storage" -) - -type printSeriesSet struct { - set storage.SeriesSet -} - -func newPrintSeriesSet(set storage.SeriesSet) storage.SeriesSet { - return &printSeriesSet{set: set} -} - -func (s *printSeriesSet) Next() bool { - return s.set.Next() -} - -func (s *printSeriesSet) At() storage.Series { - at := s.set.At() - fmt.Println("Series", at.Labels()) - - i := at.Iterator() - for i.Next() { - fmt.Println(i.At()) - } - return at -} - -func (s *printSeriesSet) Err() error { - return s.set.Err() -} diff --git a/pkg/reloader/example_test.go b/pkg/reloader/example_test.go index 7b046734d9..7a1f8575ce 100644 --- a/pkg/reloader/example_test.go +++ b/pkg/reloader/example_test.go @@ -35,7 +35,6 @@ func ExampleReloader() { if _, err := io.WriteString(w, "Reloaded\n"); err != nil { http.Error(w, err.Error(), http.StatusInternalServerError) } - return } http.HandleFunc("/-/reload", reloadHandler) diff --git a/pkg/reloader/reloader_test.go b/pkg/reloader/reloader_test.go index dcf636e7d1..257e32aed2 100644 --- a/pkg/reloader/reloader_test.go +++ b/pkg/reloader/reloader_test.go @@ -139,6 +139,7 @@ config: } }() err = reloader.Watch(ctx) + testutil.Ok(t, err) cancel() g.Wait() testutil.Equals(t, 2, reloads.Load().(int)) diff --git a/pkg/rule/api/v1.go b/pkg/rule/api/v1.go index 62a0c5ea46..7a654a711f 100644 --- a/pkg/rule/api/v1.go +++ b/pkg/rule/api/v1.go @@ -104,7 +104,7 @@ func (api *API) rules(r *http.Request) (interface{}, []error, *qapi.ApiError) { } default: err := fmt.Errorf("failed to assert type of rule '%v'", rule.Name()) - return nil, nil, &qapi.ApiError{qapi.ErrorInternal, err} + return nil, nil, &qapi.ApiError{Typ: qapi.ErrorInternal, Err: err} } apiRuleGroup.Rules = append(apiRuleGroup.Rules, enrichedRule) diff --git a/pkg/rule/api/v1_test.go b/pkg/rule/api/v1_test.go index f3e2c8d585..cd30377867 100644 --- a/pkg/rule/api/v1_test.go +++ b/pkg/rule/api/v1_test.go @@ -138,7 +138,6 @@ func testEndpoints(t *testing.T, api *API) { params map[string]string query url.Values response interface{} - errType qapi.ErrorType } var tests = []test{ { diff --git a/pkg/shipper/shipper.go b/pkg/shipper/shipper.go index e0790d179b..db05786b62 100644 --- a/pkg/shipper/shipper.go +++ b/pkg/shipper/shipper.go @@ -77,7 +77,6 @@ func newMetrics(r prometheus.Registerer, uploadCompacted bool) *metrics { type Shipper struct { logger log.Logger dir string - workDir string metrics *metrics bucket objstore.Bucket labels func() labels.Labels diff --git a/pkg/shipper/shipper_e2e_test.go b/pkg/shipper/shipper_e2e_test.go index 62c6b65d02..cd2acb1fd2 100644 --- a/pkg/shipper/shipper_e2e_test.go +++ b/pkg/shipper/shipper_e2e_test.go @@ -58,7 +58,6 @@ func TestShipper_SyncBlocks_e2e(t *testing.T) { testutil.Ok(t, os.Mkdir(tmp, 0777)) meta := metadata.Meta{ - Version: 1, BlockMeta: tsdb.BlockMeta{ Version: 1, ULID: id, @@ -190,8 +189,6 @@ func TestShipper_SyncBlocksWithMigrating_e2e(t *testing.T) { defer upcancel() testutil.Ok(t, p.WaitPrometheusUp(upctx)) - shipper := NewWithCompacted(log.NewLogfmtLogger(os.Stderr), nil, dir, bkt, func() 
labels.Labels { return extLset }, metadata.TestSource) - p.DisableCompaction() testutil.Ok(t, p.Restart()) @@ -199,7 +196,7 @@ func TestShipper_SyncBlocksWithMigrating_e2e(t *testing.T) { defer upcancel2() testutil.Ok(t, p.WaitPrometheusUp(upctx2)) - shipper = NewWithCompacted(log.NewLogfmtLogger(os.Stderr), nil, dir, bkt, func() labels.Labels { return extLset }, metadata.TestSource) + shipper := NewWithCompacted(log.NewLogfmtLogger(os.Stderr), nil, dir, bkt, func() labels.Labels { return extLset }, metadata.TestSource) // Create 10 new blocks. 9 of them (non compacted) should be actually uploaded. var ( @@ -218,7 +215,6 @@ func TestShipper_SyncBlocksWithMigrating_e2e(t *testing.T) { testutil.Ok(t, os.Mkdir(tmp, 0777)) meta := metadata.Meta{ - Version: 1, BlockMeta: tsdb.BlockMeta{ Version: 1, ULID: id, diff --git a/pkg/shipper/shipper_test.go b/pkg/shipper/shipper_test.go index 150b5264db..81053640fd 100644 --- a/pkg/shipper/shipper_test.go +++ b/pkg/shipper/shipper_test.go @@ -40,11 +40,11 @@ func TestShipperTimestamps(t *testing.T) { id1 := ulid.MustNew(1, nil) testutil.Ok(t, os.Mkdir(path.Join(dir, id1.String()), os.ModePerm)) testutil.Ok(t, metadata.Write(log.NewNopLogger(), path.Join(dir, id1.String()), &metadata.Meta{ - Version: 1, BlockMeta: tsdb.BlockMeta{ ULID: id1, MaxTime: 2000, MinTime: 1000, + Version: 1, }, })) mint, maxt, err = s.Timestamps() @@ -55,11 +55,11 @@ func TestShipperTimestamps(t *testing.T) { id2 := ulid.MustNew(2, nil) testutil.Ok(t, os.Mkdir(path.Join(dir, id2.String()), os.ModePerm)) testutil.Ok(t, metadata.Write(log.NewNopLogger(), path.Join(dir, id2.String()), &metadata.Meta{ - Version: 1, BlockMeta: tsdb.BlockMeta{ ULID: id2, MaxTime: 4000, MinTime: 2000, + Version: 1, }, })) mint, maxt, err = s.Timestamps() diff --git a/pkg/store/bucket.go b/pkg/store/bucket.go index a4e1ef734e..c6db3ec81c 100644 --- a/pkg/store/bucket.go +++ b/pkg/store/bucket.go @@ -963,10 +963,7 @@ func (s *bucketBlockSet) add(b *bucketBlock) error { s.blocks[i] = bs sort.Slice(bs, func(j, k int) bool { - if bs[j].meta.MinTime < bs[k].meta.MinTime { - return true - } - return false + return bs[j].meta.MinTime < bs[k].meta.MinTime }) return nil } @@ -1064,7 +1061,6 @@ type bucketBlock struct { indexVersion int symbols map[uint32]string - symbolsV2 map[string]struct{} lvals map[string][]string postings map[labels.Label]index.Range @@ -1199,12 +1195,12 @@ func (b *bucketBlock) readIndexRange(ctx context.Context, off, length int64) ([] return c, nil } -func (b *bucketBlock) readChunkRange(ctx context.Context, seq int, off, length int64) ([]byte, error) { +func (b *bucketBlock) readChunkRange(ctx context.Context, seq int, off, length int64) (*[]byte, error) { c, err := b.chunkPool.Get(int(length)) if err != nil { return nil, errors.Wrap(err, "allocate chunk bytes") } - buf := bytes.NewBuffer(c) + buf := bytes.NewBuffer(*c) r, err := b.bucket.GetRange(ctx, b.chunkObjs[seq], off, length) if err != nil { @@ -1215,7 +1211,8 @@ func (b *bucketBlock) readChunkRange(ctx context.Context, seq int, off, length i if _, err = io.Copy(buf, r); err != nil { return nil, errors.Wrap(err, "read range") } - return buf.Bytes(), nil + internalBuf := buf.Bytes() + return &internalBuf, nil } func (b *bucketBlock) indexReader(ctx context.Context) *bucketIndexReader { @@ -1641,16 +1638,13 @@ func (r *bucketIndexReader) LoadedSeries(ref uint64, lset *labels.Labels, chks * // LabelValues returns label values for single name. 
func (r *bucketIndexReader) LabelValues(name string) []string { res := make([]string, 0, len(r.block.lvals[name])) - for _, v := range r.block.lvals[name] { - res = append(res, v) - } - return res + return append(res, r.block.lvals[name]...) } // LabelNames returns a list of label names. func (r *bucketIndexReader) LabelNames() []string { res := make([]string, 0, len(r.block.lvals)) - for ln, _ := range r.block.lvals { + for ln := range r.block.lvals { res = append(res, ln) } return res @@ -1672,7 +1666,7 @@ type bucketChunkReader struct { chunks map[uint64]chunkenc.Chunk // Byte slice to return to the chunk pool on close. - chunkBytes [][]byte + chunkBytes []*[]byte } func newBucketChunkReader(ctx context.Context, block *bucketBlock) *bucketChunkReader { @@ -1760,7 +1754,7 @@ func (r *bucketChunkReader) loadChunks(ctx context.Context, offs []uint32, seq i r.stats.chunksFetchedSizeSum += int(end - start) for _, o := range offs { - cb := b[o-start:] + cb := (*b)[o-start:] l, n := binary.Uvarint(cb) if n < 1 { diff --git a/pkg/store/bucket_test.go b/pkg/store/bucket_test.go index d5f436a65b..8a08bbd33f 100644 --- a/pkg/store/bucket_test.go +++ b/pkg/store/bucket_test.go @@ -77,7 +77,6 @@ func TestBucketBlock_Property(t *testing.T) { return false } else if len(res) > 1 { mint := int64(21001) - maxt := int64(0) for i := 0; i < len(res)-1; i++ { if res[i].meta.Thanos.Downsample.Resolution > maxResolution { return false @@ -88,16 +87,10 @@ func TestBucketBlock_Property(t *testing.T) { if res[i].meta.MinTime < mint { mint = res[i].meta.MinTime } - if res[i].meta.MaxTime > maxt { - maxt = res[i].meta.MaxTime - } } if res[len(res)-1].meta.MinTime < mint { mint = res[len(res)-1].meta.MinTime } - if res[len(res)-1].meta.MaxTime > maxt { - maxt = res[len(res)-1].meta.MaxTime - } if low < mint { return false } diff --git a/pkg/store/gate.go b/pkg/store/gate.go index 0b6d00de1a..cdb9ea3712 100644 --- a/pkg/store/gate.go +++ b/pkg/store/gate.go @@ -43,7 +43,7 @@ func NewGate(maxConcurrent int, reg prometheus.Registerer) *Gate { func (g *Gate) IsMyTurn(ctx context.Context) error { start := time.Now() defer func() { - g.gateTiming.Observe(float64(time.Now().Sub(start))) + g.gateTiming.Observe(float64(time.Since(start))) }() if err := g.g.Start(ctx); err != nil { diff --git a/pkg/store/prometheus.go b/pkg/store/prometheus.go index 12cff5d085..d23b9a5b31 100644 --- a/pkg/store/prometheus.go +++ b/pkg/store/prometheus.go @@ -110,16 +110,17 @@ func (p *PrometheusStore) Info(ctx context.Context, r *storepb.InfoRequest) (*st return res, nil } -func (p *PrometheusStore) getBuffer() []byte { +func (p *PrometheusStore) getBuffer() *[]byte { b := p.buffers.Get() if b == nil { - return make([]byte, 0, 32*1024) // 32KB seems like a good minimum starting size. + buf := make([]byte, 0, 32*1024) // 32KB seems like a good minimum starting size. + return &buf } - return b.([]byte) + return b.(*[]byte) } -func (p *PrometheusStore) putBuffer(b []byte) { - p.buffers.Put(b[:0]) +func (p *PrometheusStore) putBuffer(b *[]byte) { + p.buffers.Put(b) } // Series returns all series for a requested time range and label matcher. 
@@ -257,23 +258,24 @@ func (p *PrometheusStore) promSeries(ctx context.Context, q prompb.Query) (*prom return nil, errors.Errorf("request failed with code %s", presp.Status) } - buf := bytes.NewBuffer(p.getBuffer()) - defer func() { - p.putBuffer(buf.Bytes()) - }() + c := p.getBuffer() + buf := bytes.NewBuffer(*c) + defer p.putBuffer(c) if _, err := io.Copy(buf, presp.Body); err != nil { return nil, errors.Wrap(err, "copy response") } + spanSnappyDecode, ctx := tracing.StartSpan(ctx, "decompress_response") - decomp, err := snappy.Decode(p.getBuffer(), buf.Bytes()) + sc := p.getBuffer() + decomp, err := snappy.Decode(*sc, buf.Bytes()) spanSnappyDecode.Finish() - defer p.putBuffer(decomp) + defer p.putBuffer(sc) if err != nil { return nil, errors.Wrap(err, "decompress response") } var data prompb.ReadResponse - spanUnmarshal, ctx := tracing.StartSpan(ctx, "unmarshal_response") + spanUnmarshal, _ := tracing.StartSpan(ctx, "unmarshal_response") if err := proto.Unmarshal(decomp, &data); err != nil { return nil, errors.Wrap(err, "unmarshal response") } diff --git a/pkg/store/proxy.go b/pkg/store/proxy.go index 9f528c27a9..c99b21d867 100644 --- a/pkg/store/proxy.go +++ b/pkg/store/proxy.go @@ -539,7 +539,7 @@ func (s *ProxyStore) LabelValues(ctx context.Context, r *storepb.LabelValuesRequ store := st g.Go(func() error { resp, err := store.LabelValues(gctx, &storepb.LabelValuesRequest{ - Label: r.Label, + Label: r.Label, PartialResponseDisabled: r.PartialResponseDisabled, }) if err != nil { diff --git a/pkg/store/proxy_test.go b/pkg/store/proxy_test.go index 40322bf187..74915d7bb4 100644 --- a/pkg/store/proxy_test.go +++ b/pkg/store/proxy_test.go @@ -677,7 +677,7 @@ func TestProxyStore_LabelValues(t *testing.T) { ctx := context.Background() req := &storepb.LabelValuesRequest{ - Label: "a", + Label: "a", PartialResponseDisabled: true, } resp, err := q.LabelValues(ctx, req) diff --git a/pkg/store/storepb/custom.go b/pkg/store/storepb/custom.go index 1366377d2c..ce6ccb8659 100644 --- a/pkg/store/storepb/custom.go +++ b/pkg/store/storepb/custom.go @@ -162,7 +162,7 @@ func (s *mergedSeriesSet) Next() bool { } func LabelsToPromLabels(lset []Label) labels.Labels { - ret := make(labels.Labels, len(lset), len(lset)) + ret := make(labels.Labels, len(lset)) for i, l := range lset { ret[i] = labels.Label{Name: l.Name, Value: l.Value} } diff --git a/pkg/tracing/http.go b/pkg/tracing/http.go index 3169246387..9c17496d6d 100644 --- a/pkg/tracing/http.go +++ b/pkg/tracing/http.go @@ -42,7 +42,6 @@ func HTTPMiddleware(tracer opentracing.Tracer, name string, logger log.Logger, n next.ServeHTTP(w, r.WithContext(opentracing.ContextWithSpan(ContextWithTracer(r.Context(), tracer), span))) span.Finish() - return } } diff --git a/pkg/tracing/stackdriver/tracer.go b/pkg/tracing/stackdriver/tracer.go index 90d8589c59..6664af62a0 100644 --- a/pkg/tracing/stackdriver/tracer.go +++ b/pkg/tracing/stackdriver/tracer.go @@ -6,11 +6,10 @@ import ( "io" "os" - "github.com/improbable-eng/thanos/pkg/tracing" - trace "cloud.google.com/go/trace/apiv1" "github.com/go-kit/kit/log" "github.com/go-kit/kit/log/level" + "github.com/improbable-eng/thanos/pkg/tracing" gcloudtracer "github.com/lovoo/gcloud-opentracing" "github.com/opentracing/basictracer-go" "github.com/opentracing/opentracing-go" diff --git a/pkg/ui/query.go b/pkg/ui/query.go index 70ada91e10..3bc5d243ea 100644 --- a/pkg/ui/query.go +++ b/pkg/ui/query.go @@ -18,8 +18,6 @@ import ( "github.com/prometheus/common/version" ) -var localhostRepresentations = []string{"127.0.0.1", 
"localhost"} - type Query struct { *BaseUI storeSet *query.StoreSet diff --git a/test/e2e/receive_test.go b/test/e2e/receive_test.go index 463bdda12c..be3708de4c 100644 --- a/test/e2e/receive_test.go +++ b/test/e2e/receive_test.go @@ -2,11 +2,9 @@ package e2e_test import ( "context" - "os" "testing" "time" - "github.com/go-kit/kit/log" "github.com/improbable-eng/thanos/pkg/promclient" "github.com/improbable-eng/thanos/pkg/runutil" "github.com/improbable-eng/thanos/pkg/testutil" @@ -130,10 +128,6 @@ func testReceive(t *testing.T, conf receiveTestConfig) { var res model.Vector - w := log.NewSyncWriter(os.Stderr) - l := log.NewLogfmtLogger(w) - l = log.With(l, "conf-name", conf.name) - // Query without deduplication so we can check what replica the // time series ended up on. testutil.Ok(t, runutil.Retry(time.Second, ctx.Done(), func() error {