Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

querier: Moved query range API to middlewares reusing Cortex cache. #1039

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion cmd/thanos/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ func main() {
prometheus.NewProcessCollector(prometheus.ProcessCollectorOpts{}),
)

prometheus.DefaultRegisterer = metrics
prometheus.DefaultRegisterer = prometheus.WrapRegistererWithPrefix("thanos_", metrics)
// Memberlist uses go-metrics
sink, err := gprom.NewPrometheusSink()
if err != nil {
Expand Down
71 changes: 66 additions & 5 deletions cmd/thanos/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"path"
"time"

"github.com/cortexproject/cortex/pkg/querier/frontend/queryrange"
"github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"
grpc_middleware "github.com/grpc-ecosystem/go-grpc-middleware"
Expand All @@ -23,6 +24,7 @@ import (
"github.com/improbable-eng/thanos/pkg/extprom"
"github.com/improbable-eng/thanos/pkg/query"
v1 "github.com/improbable-eng/thanos/pkg/query/api"
"github.com/improbable-eng/thanos/pkg/query/cacheclient"
"github.com/improbable-eng/thanos/pkg/runutil"
"github.com/improbable-eng/thanos/pkg/store"
"github.com/improbable-eng/thanos/pkg/store/storepb"
Expand Down Expand Up @@ -67,6 +69,8 @@ func registerQuery(m map[string]setupFunc, app *kingpin.Application, name string
maxConcurrentQueries := cmd.Flag("query.max-concurrent", "Maximum number of queries processed concurrently by query node.").
Default("20").Int()

queryCacheResultConfig := regQuerierChunkFlags(cmd)

replicaLabel := cmd.Flag("query.replica-label", "Label to treat as a replica indicator along which data is deduplicated. Still you will be able to query without deduplication using 'dedup=false' parameter.").
String()

Expand Down Expand Up @@ -162,10 +166,25 @@ func registerQuery(m map[string]setupFunc, app *kingpin.Application, name string
time.Duration(*dnsSDInterval),
*dnsSDResolver,
time.Duration(*unhealthyStoreTimeout),
queryCacheResultConfig,
)
}
}

func regQuerierChunkFlags(cmd *kingpin.CmdClause) *pathOrContent {
fileFlag := cmd.Flag("query.cache-results-config-file", "Path to YAML file that contains cache results configuration. Leave empty to disable cache.").PlaceHolder("<cache.config-yaml-path>").String()
contentFlag := cmd.Flag("query.cache-results-config", "Path to YAML file that contains cache results configuration. Leave empty to disable cache.").PlaceHolder("<cache.config-yaml>").String()

return &pathOrContent{
fileFlagName: "query.cache-results-config-file",
contentFlagName: "query.cache-results-config",
required: false,

path: fileFlag,
content: contentFlag,
}
}

func storeClientGRPCOpts(logger log.Logger, reg *prometheus.Registry, tracer opentracing.Tracer, secure bool, cert, key, caCert string, serverName string) ([]grpc.DialOption, error) {
grpcMets := grpc_prometheus.NewClientMetrics()
grpcMets.EnableClientHandlingTimeHistogram(
Expand Down Expand Up @@ -280,6 +299,7 @@ func runQuery(
dnsSDInterval time.Duration,
dnsSDResolver string,
unhealthyStoreTimeout time.Duration,
queryCacheResultConfig *pathOrContent,
) error {
// TODO(bplotka in PR #513 review): Move arguments into struct.
duplicatedStores := prometheus.NewCounter(prometheus.CounterOpts{
Expand Down Expand Up @@ -327,9 +347,8 @@ func runQuery(
dialOpts,
unhealthyStoreTimeout,
)
proxy = store.NewProxyStore(logger, stores.Get, component.Query, selectorLset, storeResponseTimeout)
queryableCreator = query.NewQueryableCreator(logger, proxy, replicaLabel)
engine = promql.NewEngine(
proxy = store.NewProxyStore(logger, stores.Get, component.Query, selectorLset, storeResponseTimeout)
engine = promql.NewEngine(
promql.EngineOpts{
Logger: logger,
Reg: reg,
Expand Down Expand Up @@ -434,9 +453,51 @@ func runQuery(

ui.NewQueryUI(logger, stores, flagsMap).Register(router.WithPrefix(webRoutePrefix))

api := v1.NewAPI(logger, reg, engine, queryableCreator, enableAutodownsampling, enablePartialResponse)
partialResponseStrategy := storepb.PartialResponseStrategy_ABORT
if enablePartialResponse {
partialResponseStrategy = storepb.PartialResponseStrategy_WARN
}

// Setup query_range.
chunkCacheCfg, err := queryCacheResultConfig.Content()
if err != nil {
return errors.Wrap(err, "chunk cache cfg")
}

var extraMiddlewares []queryrange.Middleware
var limits queryrange.Limits // TODO: Implement.
if len(chunkCacheCfg) > 0 {
chunkCacheClient, err := cacheclient.NewChunk(logger, chunkCacheCfg, reg)
if err != nil {
return errors.Wrap(err, "create chunk cache client")
}

extraMiddlewares = append(extraMiddlewares, queryrange.StepAlignMiddleware)
extraMiddlewares = append(extraMiddlewares, queryrange.SplitByDayMiddleware(limits))
// 1 minute is for most recent allowed cacheable result, to prevent caching very recent results that might still be in flux.
m, err := queryrange.NewResultsCacheMiddleware(logger, chunkCacheClient, 1*time.Minute, limits)
if err != nil {
return errors.Wrap(err, "create result cache middleware")
}
extraMiddlewares = append(extraMiddlewares, m)
}

queryAPI := query.NewAPI(engine, replicaLabel, proxy)
api := v1.NewAPI(
logger,
reg,
queryAPI,
queryAPI,
proxy,
query.Options{
Deduplicate: true,
PartialResponseStrategy: partialResponseStrategy,
},
enableAutodownsampling,
extraMiddlewares...,
)

api.Register(router.WithPrefix(path.Join(webRoutePrefix, "/api/v1")), tracer, logger)
api.Register(router.WithPrefix(path.Join(webRoutePrefix, "/api/v1")), tracer)

router.Get("/-/healthy", func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusOK)
Expand Down
22 changes: 18 additions & 4 deletions go.mod
Original file line number Diff line number Diff line change
@@ -1,21 +1,26 @@
module github.com/improbable-eng/thanos

replace github.com/cortexproject/cortex => github.com/bwplotka/cortex v0.0.0-20190416102825-ae4098d14a9c
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.


require (
cloud.google.com/go v0.34.0
github.com/Azure/azure-storage-blob-go v0.0.0-20181022225951-5152f14ace1c
github.com/NYTimes/gziphandler v1.1.1
github.com/armon/go-metrics v0.0.0-20180917152333-f0300d1749da
github.com/blang/semver v3.5.1+incompatible // indirect
github.com/bradfitz/gomemcache v0.0.0-20190329173943-551aad21a668 // indirect
github.com/cortexproject/cortex v0.0.0
github.com/fatih/structtag v1.0.0
github.com/fortytw2/leaktest v1.2.0
github.com/fsnotify/fsnotify v1.4.7
github.com/go-kit/kit v0.8.0
github.com/gobuffalo/envy v1.6.15 // indirect
github.com/gogo/protobuf v1.2.0
github.com/gohugoio/hugo v0.54.0 // indirect
github.com/gogo/status v1.1.0 // indirect
github.com/golang/snappy v0.0.0-20180518054509-2e65f85255db
github.com/google/martian v2.1.0+incompatible // indirect
github.com/googleapis/gax-go v2.0.2+incompatible // indirect
github.com/gophercloud/gophercloud v0.0.0-20181206160319-9d88c34913a9
github.com/gorilla/mux v1.7.1 // indirect
github.com/grpc-ecosystem/go-grpc-middleware v1.0.0
github.com/grpc-ecosystem/go-grpc-prometheus v0.0.0-20181025070259-68e3a13e4117
github.com/hashicorp/go-sockaddr v0.0.0-20180320115054-6d291a969b86
Expand All @@ -30,18 +35,27 @@ require (
github.com/oklog/run v1.0.0
github.com/oklog/ulid v1.3.1
github.com/olekukonko/tablewriter v0.0.1
github.com/opentracing-contrib/go-grpc v0.0.0-20180928155321-4b5a12d3ff02 // indirect
github.com/opentracing-contrib/go-stdlib v0.0.0-20170113013457-1de4cc2120e7
github.com/opentracing/basictracer-go v1.0.0
github.com/opentracing/opentracing-go v1.0.2
github.com/opentracing/opentracing-go v1.1.0
github.com/pkg/errors v0.8.1
github.com/prometheus/client_golang v0.9.2
github.com/prometheus/common v0.0.0-20181218105931-67670fe90761
github.com/prometheus/prometheus v0.0.0-20190118110214-3bd41cc92c78
github.com/prometheus/tsdb v0.4.0
github.com/sercand/kuberesolver v2.1.0+incompatible // indirect
github.com/sirupsen/logrus v1.4.1 // indirect
github.com/uber-go/atomic v1.3.2 // indirect
github.com/uber/jaeger-client-go v2.16.0+incompatible // indirect
github.com/uber/jaeger-lib v2.0.0+incompatible // indirect
github.com/weaveworks/common v0.0.0-20190403142338-51eb0fb475a6 // indirect
github.com/weaveworks/promrus v1.2.0 // indirect
go.opencensus.io v0.19.0 // indirect
go.uber.org/atomic v1.3.2 // indirect
golang.org/x/net v0.0.0-20190213061140-3a22650c66bd
golang.org/x/oauth2 v0.0.0-20181203162652-d668ce993890
golang.org/x/sync v0.0.0-20181221193216-37e7f081c4d4
golang.org/x/sync v0.0.0-20190227155943-e225da77a7e6
golang.org/x/sys v0.0.0-20190222072716-a9d3bda3a223 // indirect
golang.org/x/text v0.3.1-0.20180807135948-17ff2d5776d2
google.golang.org/api v0.1.0
Expand Down
Loading