From 0f6df8bd0805d00bc72960ce2b7b3b7fceb8abb7 Mon Sep 17 00:00:00 2001 From: Xiang Dai <764524258@qq.com> Date: Thu, 27 Feb 2020 02:48:32 +0800 Subject: [PATCH] Added `thanos bucket replicate` (#2113) * bucket: Implement replicate Signed-off-by: Xiang Dai <764524258@qq.com> * Document replicate Signed-off-by: Xiang Dai <764524258@qq.com> * feedback Signed-off-by: Xiang Dai <764524258@qq.com> * feedback Signed-off-by: Xiang Dai <764524258@qq.com> * remove version check Signed-off-by: Xiang Dai <764524258@qq.com> * update CHANGELOG Signed-off-by: Xiang Dai <764524258@qq.com> * add chan for SIGHUP Signed-off-by: Xiang Dai <764524258@qq.com> * add existence check Signed-off-by: Xiang Dai <764524258@qq.com> * Add mixin Signed-off-by: Xiang Dai <764524258@qq.com> * rename as replicate Signed-off-by: Xiang Dai <764524258@qq.com> * feedback Signed-off-by: Xiang Dai <764524258@qq.com> * update mixin Signed-off-by: Xiang Dai <764524258@qq.com> * update CHANGELOG Signed-off-by: Xiang Dai <764524258@qq.com> * add bucket prefix Signed-off-by: Xiang Dai <764524258@qq.com> --- CHANGELOG.md | 1 + cmd/thanos/bucket.go | 47 ++ docs/components/bucket.md | 74 +++ examples/alerts/alerts.md | 42 ++ examples/alerts/alerts.yaml | 36 ++ examples/alerts/rules.yaml | 2 + examples/dashboards/bucket_replicate.json | 515 ++++++++++++++++++ examples/dashboards/dashboards.md | 1 + mixin/thanos/README.md | 5 + mixin/thanos/alerts/alerts.libsonnet | 3 +- .../thanos/alerts/bucket_replicate.libsonnet | 63 +++ .../dashboards/bucket_replicate.libsonnet | 57 ++ mixin/thanos/dashboards/dashboards.libsonnet | 1 + mixin/thanos/defaults.libsonnet | 5 + mixin/thanos/rules/bucket_replicate.libsonnet | 15 + mixin/thanos/rules/rules.libsonnet | 3 +- pkg/component/component.go | 1 + pkg/replicate/replicater.go | 214 ++++++++ pkg/replicate/scheme.go | 384 +++++++++++++ pkg/replicate/scheme_test.go | 332 +++++++++++ scripts/genflagdocs.sh | 2 +- 21 files changed, 1800 insertions(+), 3 deletions(-) create mode 100644 examples/dashboards/bucket_replicate.json create mode 100644 mixin/thanos/alerts/bucket_replicate.libsonnet create mode 100644 mixin/thanos/dashboards/bucket_replicate.libsonnet create mode 100644 mixin/thanos/rules/bucket_replicate.libsonnet create mode 100644 pkg/replicate/replicater.go create mode 100644 pkg/replicate/scheme.go create mode 100644 pkg/replicate/scheme_test.go diff --git a/CHANGELOG.md b/CHANGELOG.md index eccab526cf..26ccc79e8b 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -29,6 +29,7 @@ We use *breaking* word for marking changes that are not backward compatible (rel - [#2049](https://github.com/thanos-io/thanos/pull/2049) Tracing: Support sampling on Elastic APM with new sample_rate setting. - [#2008](https://github.com/thanos-io/thanos/pull/2008) Querier, Receiver, Sidecar, Store: Add gRPC [health check](https://github.com/grpc/grpc/blob/master/doc/health-checking.md) endpoints. - [#2145](https://github.com/thanos-io/thanos/pull/2145) Tracing: track query sent to prometheus via remote read api. +- [#2113](https://github.com/thanos-io/thanos/pull/2113) Bucket: Added `thanos bucket replicate`.
### Changed diff --git a/cmd/thanos/bucket.go b/cmd/thanos/bucket.go index 6967229b05..e922057e96 100644 --- a/cmd/thanos/bucket.go +++ b/cmd/thanos/bucket.go @@ -9,6 +9,7 @@ import ( "fmt" "os" "sort" + "strconv" "strings" "text/template" "time" @@ -26,6 +27,7 @@ import ( "github.com/thanos-io/thanos/pkg/block" "github.com/thanos-io/thanos/pkg/block/metadata" "github.com/thanos-io/thanos/pkg/compact" + "github.com/thanos-io/thanos/pkg/compact/downsample" "github.com/thanos-io/thanos/pkg/component" "github.com/thanos-io/thanos/pkg/extflag" "github.com/thanos-io/thanos/pkg/extprom" @@ -33,6 +35,7 @@ import ( "github.com/thanos-io/thanos/pkg/objstore" "github.com/thanos-io/thanos/pkg/objstore/client" "github.com/thanos-io/thanos/pkg/prober" + "github.com/thanos-io/thanos/pkg/replicate" "github.com/thanos-io/thanos/pkg/runutil" httpserver "github.com/thanos-io/thanos/pkg/server/http" "github.com/thanos-io/thanos/pkg/ui" @@ -69,6 +72,7 @@ func registerBucket(m map[string]setupFunc, app *kingpin.Application, name strin registerBucketLs(m, cmd, name, objStoreConfig) registerBucketInspect(m, cmd, name, objStoreConfig) registerBucketWeb(m, cmd, name, objStoreConfig) + registerBucketReplicate(m, cmd, name, objStoreConfig) } func registerBucketVerify(m map[string]setupFunc, root *kingpin.CmdClause, name string, objStoreConfig *extflag.PathOrContent) { @@ -377,6 +381,49 @@ func registerBucketWeb(m map[string]setupFunc, root *kingpin.CmdClause, name str } } +// Provide a list of resolutions; we cannot use Enum directly, since the flag is an int64, not a string. +func listResLevel() []string { + return []string{ + strconv.FormatInt(downsample.ResLevel0, 10), + strconv.FormatInt(downsample.ResLevel1, 10), + strconv.FormatInt(downsample.ResLevel2, 10)} +} + +func registerBucketReplicate(m map[string]setupFunc, root *kingpin.CmdClause, name string, objStoreConfig *extflag.PathOrContent) { + cmd := root.Command("replicate", fmt.Sprintf("Replicate data from one object storage to another. NOTE: Currently it works only with Thanos blocks (%v has to have Thanos metadata).", block.MetaFilename)) + httpBindAddr, httpGracePeriod := regHTTPFlags(cmd) + toObjStoreConfig := regCommonObjStoreFlags(cmd, "-to", false, "The object storage to replicate data to.") + // TODO(bwplotka): Allow to replicate many resolution levels. + resolution := cmd.Flag("resolution", "Only blocks with this resolution will be replicated.").Default(strconv.FormatInt(downsample.ResLevel0, 10)).HintAction(listResLevel).Int64() + // TODO(bwplotka): Allow to replicate many compaction levels.
+ compaction := cmd.Flag("compaction", "Only blocks with this compaction level will be replicated.").Default("1").Int() + matcherStrs := cmd.Flag("matcher", "Only blocks whose external labels exactly match this matcher will be replicated.").PlaceHolder("key=\"value\"").Strings() + singleRun := cmd.Flag("single-run", "Run replication only one time, then exit.").Default("false").Bool() + + m[name+" replicate"] = func(g *run.Group, logger log.Logger, reg *prometheus.Registry, tracer opentracing.Tracer, _ <-chan struct{}, _ bool) error { + matchers, err := replicate.ParseFlagMatchers(*matcherStrs) + if err != nil { + return errors.Wrap(err, "parse block label matchers") + } + + return replicate.RunReplicate( + g, + logger, + reg, + tracer, + *httpBindAddr, + time.Duration(*httpGracePeriod), + matchers, + compact.ResolutionLevel(*resolution), + *compaction, + objStoreConfig, + toObjStoreConfig, + *singleRun, + ) + } + +} + // refresh metadata from remote storage periodically and update UI. func refresh(ctx context.Context, logger log.Logger, bucketUI *ui.Bucket, duration time.Duration, timeout time.Duration, name string, reg *prometheus.Registry, objStoreConfig *extflag.PathOrContent) error { confContentYaml, err := objStoreConfig.Content() diff --git a/docs/components/bucket.md b/docs/components/bucket.md index 20efeeea47..3dc3c7aaf9 100644 --- a/docs/components/bucket.md +++ b/docs/components/bucket.md @@ -74,6 +74,10 @@ Subcommands: bucket web [] Web interface for remote storage bucket + bucket replicate [] + Replicate data from one object storage to another. NOTE: Currently it works + only with Thanos blocks (meta.json has to have Thanos metadata). + ``` @@ -315,3 +319,73 @@ Flags: --timeout=5m Timeout to download metadata from remote storage ``` + +### replicate + +`bucket replicate` is used to replicate buckets from one object storage to another. + +NOTE: Currently it works only with Thanos blocks (meta.json has to have Thanos metadata). + +Example: +``` +$ thanos bucket replicate --objstore.config-file="..." --objstore-to.config="..." +``` + +[embedmd]:# (flags/bucket_replicate.txt) +```txt +usage: thanos bucket replicate [] + +Replicate data from one object storage to another. NOTE: Currently it works only +with Thanos blocks (meta.json has to have Thanos metadata). + +Flags: + -h, --help Show context-sensitive help (also try + --help-long and --help-man). + --version Show application version. + --log.level=info Log filtering level. + --log.format=logfmt Log format to use. Possible options: logfmt or + json. + --tracing.config-file= + Path to YAML file with tracing configuration. + See format details: + https://thanos.io/tracing.md/#configuration + --tracing.config= + Alternative to 'tracing.config-file' flag + (lower priority). Content of YAML file with + tracing configuration. See format details: + https://thanos.io/tracing.md/#configuration + --objstore.config-file= + Path to YAML file that contains object store + configuration. See format details: + https://thanos.io/storage.md/#configuration + --objstore.config= + Alternative to 'objstore.config-file' flag + (lower priority). Content of YAML file that + contains object store configuration. See format + details: + https://thanos.io/storage.md/#configuration + --http-address="0.0.0.0:10902" + Listen host:port for HTTP endpoints. + --http-grace-period=2m Time to wait after an interrupt received for + HTTP Server. + --objstore-to.config-file= + Path to YAML file that contains object store-to + configuration. 
See format details: + https://thanos.io/storage.md/#configuration The + object storage to replicate data to. + --objstore-to.config= + Alternative to 'objstore-to.config-file' flag + (lower priority). Content of YAML file that + contains object store-to configuration. See + format details: + https://thanos.io/storage.md/#configuration The + object storage to replicate data to. + --resolution=0 Only blocks with this resolution will be + replicated. + --compaction=1 Only blocks with this compaction level will be + replicated. + --matcher=key="value" ... Only blocks whose external labels exactly match + this matcher will be replicated. + --single-run Run replication only one time, then exit. + +``` diff --git a/examples/alerts/alerts.md b/examples/alerts/alerts.md index 0465effb32..39b4c57bfa 100644 --- a/examples/alerts/alerts.md +++ b/examples/alerts/alerts.md @@ -428,6 +428,48 @@ rules: severity: warning ``` +## Replicate + +[embedmd]:# (../tmp/thanos-bucket-replicate.rules.yaml yaml) +```yaml +name: thanos-bucket-replicate.rules +rules: +- alert: ThanosBucketReplicateIsDown + annotations: + message: Thanos Replicate has disappeared from Prometheus target discovery. + expr: | + absent(up{job=~"thanos-bucket-replicate.*"}) + for: 5m + labels: + severity: critical +- alert: ThanosBucketReplicateErrorRate + annotations: + message: Thanos Replicate is failing to run, {{ $value | humanize }}% of attempts + failed. + expr: | + ( + sum(rate(thanos_replicate_replication_runs_total{result="error", job=~"thanos-bucket-replicate.*"}[5m])) + / on (namespace) group_left + sum(rate(thanos_replicate_replication_runs_total{job=~"thanos-bucket-replicate.*"}[5m])) + ) * 100 >= 10 + for: 5m + labels: + severity: critical +- alert: ThanosBucketReplicateRunLatency + annotations: + message: Thanos Replicate {{$labels.job}} has a 90th percentile latency of {{ + $value }} seconds for the replicate operations. + expr: | + ( + histogram_quantile(0.9, sum by (job, le) (thanos_replicate_replication_run_duration_seconds_bucket{job=~"thanos-bucket-replicate.*"})) > 120 + and + sum by (job) (rate(thanos_replicate_replication_run_duration_seconds_bucket{job=~"thanos-bucket-replicate.*"}[5m])) > 0 + ) + for: 5m + labels: + severity: critical +``` + ## Extras ### Absent Rules diff --git a/examples/alerts/alerts.yaml b/examples/alerts/alerts.yaml index a6a14bd8a1..400414da89 100644 --- a/examples/alerts/alerts.yaml +++ b/examples/alerts/alerts.yaml @@ -439,3 +439,39 @@ groups: for: 5m labels: severity: critical +- name: thanos-bucket-replicate.rules + rules: + - alert: ThanosBucketReplicateIsDown + annotations: + message: Thanos Replicate has disappeared from Prometheus target discovery. + expr: | + absent(up{job=~"thanos-bucket-replicate.*"}) + for: 5m + labels: + severity: critical + - alert: ThanosBucketReplicateErrorRate + annotations: + message: Thanos Replicate is failing to run, {{ $value | humanize }}% of attempts + failed. + expr: | + ( + sum(rate(thanos_replicate_replication_runs_total{result="error", job=~"thanos-bucket-replicate.*"}[5m])) + / on (namespace) group_left + sum(rate(thanos_replicate_replication_runs_total{job=~"thanos-bucket-replicate.*"}[5m])) + ) * 100 >= 10 + for: 5m + labels: + severity: critical + - alert: ThanosBucketReplicateRunLatency + annotations: + message: Thanos Replicate {{$labels.job}} has a 90th percentile latency of {{ + $value }} seconds for the replicate operations.
+ expr: | + ( + histogram_quantile(0.9, sum by (job, le) (thanos_replicate_replication_run_duration_seconds_bucket{job=~"thanos-bucket-replicate.*"})) > 120 + and + sum by (job) (rate(thanos_replicate_replication_run_duration_seconds_bucket{job=~"thanos-bucket-replicate.*"}[5m])) > 0 + ) + for: 5m + labels: + severity: critical diff --git a/examples/alerts/rules.yaml b/examples/alerts/rules.yaml index 9049262121..7e847fcbe4 100644 --- a/examples/alerts/rules.yaml +++ b/examples/alerts/rules.yaml @@ -121,3 +121,5 @@ groups: labels: quantile: "0.99" record: :thanos_objstore_bucket_operation_duration_seconds:histogram_quantile +- name: thanos-bucket-replicate.rules + rules: [] diff --git a/examples/dashboards/bucket_replicate.json b/examples/dashboards/bucket_replicate.json new file mode 100644 index 0000000000..daf3c83aa2 --- /dev/null +++ b/examples/dashboards/bucket_replicate.json @@ -0,0 +1,515 @@ +{ + "annotations": { + "list": [ ] + }, + "editable": true, + "gnetId": null, + "graphTooltip": 0, + "hideControls": false, + "links": [ ], + "refresh": "10s", + "rows": [ + { + "collapse": false, + "height": "250px", + "panels": [ + { + "aliasColors": { + "error": "#E24D42" + }, + "bars": false, + "dashLength": 10, + "dashes": false, + "datasource": "$datasource", + "fill": 10, + "id": 1, + "legend": { + "avg": false, + "current": false, + "max": false, + "min": false, + "show": true, + "total": false, + "values": false + }, + "lines": true, + "linewidth": 0, + "links": [ ], + "nullPointMode": "null as zero", + "percentage": false, + "pointradius": 5, + "points": false, + "renderer": "flot", + "seriesOverrides": [ ], + "spaceLength": 10, + "span": 4, + "stack": true, + "steppedLine": false, + "targets": [ + { + "expr": "sum(rate(thanos_replicate_replication_runs_total{result=\"error\", namespace=\"$namespace\",job=~\"thanos-bucket-replicate.*\"}[$interval])) / sum(rate(thanos_replicate_replication_runs_total{namespace=\"$namespace\",job=~\"thanos-bucket-replicate.*\"}[$interval]))", + "format": "time_series", + "intervalFactor": 2, + "legendFormat": "error", + "refId": "A", + "step": 10 + } + ], + "thresholds": [ ], + "timeFrom": null, + "timeShift": null, + "title": "Rate", + "tooltip": { + "shared": false, + "sort": 0, + "value_type": "individual" + }, + "type": "graph", + "xaxis": { + "buckets": null, + "mode": "time", + "name": null, + "show": true, + "values": [ ] + }, + "yaxes": [ + { + "format": "percentunit", + "label": null, + "logBase": 1, + "max": null, + "min": 0, + "show": true + }, + { + "format": "short", + "label": null, + "logBase": 1, + "max": null, + "min": null, + "show": false + } + ] + }, + { + "aliasColors": { }, + "bars": false, + "dashLength": 10, + "dashes": false, + "datasource": "$datasource", + "description": "Shows rate of errors.", + "fill": 10, + "id": 2, + "legend": { + "avg": false, + "current": false, + "max": false, + "min": false, + "show": true, + "total": false, + "values": false + }, + "lines": true, + "linewidth": 0, + "links": [ ], + "nullPointMode": "null as zero", + "percentage": false, + "pointradius": 5, + "points": false, + "renderer": "flot", + "seriesOverrides": [ ], + "spaceLength": 10, + "span": 4, + "stack": true, + "steppedLine": false, + "targets": [ + { + "expr": "sum(rate(thanos_replicate_replication_runs_total{result=\"error\", namespace=\"$namespace\",job=~\"thanos-bucket-replicate.*\"}[$interval])) by (result)", + "format": "time_series", + "intervalFactor": 2, + "legendFormat": "{{result}}", + "legendLink": null, + "step": 10 + } + ], 
+ "thresholds": [ ], + "timeFrom": null, + "timeShift": null, + "title": "Errors", + "tooltip": { + "shared": false, + "sort": 0, + "value_type": "individual" + }, + "type": "graph", + "xaxis": { + "buckets": null, + "mode": "time", + "name": null, + "show": true, + "values": [ ] + }, + "yaxes": [ + { + "format": "percentunit", + "label": null, + "logBase": 1, + "max": null, + "min": 0, + "show": true + }, + { + "format": "short", + "label": null, + "logBase": 1, + "max": null, + "min": null, + "show": false + } + ] + }, + { + "aliasColors": { }, + "bars": false, + "dashLength": 10, + "dashes": false, + "datasource": "$datasource", + "description": "Shows how long has it taken to run a replication cycle.", + "fill": 1, + "id": 3, + "legend": { + "avg": false, + "current": false, + "max": false, + "min": false, + "show": true, + "total": false, + "values": false + }, + "lines": true, + "linewidth": 1, + "links": [ ], + "nullPointMode": "null as zero", + "percentage": false, + "pointradius": 5, + "points": false, + "renderer": "flot", + "seriesOverrides": [ ], + "spaceLength": 10, + "span": 4, + "stack": false, + "steppedLine": false, + "targets": [ + { + "expr": "histogram_quantile(0.99, sum(rate(thanos_replicate_replication_run_duration_seconds_bucket{result=\"success\", namespace=\"$namespace\",job=~\"thanos-bucket-replicate.*\"}[$interval])) by (job, le)) * 1", + "format": "time_series", + "intervalFactor": 2, + "legendFormat": "P99 {{job}}", + "refId": "A", + "step": 10 + }, + { + "expr": "sum(rate(thanos_replicate_replication_run_duration_seconds_sum{result=\"success\", namespace=\"$namespace\",job=~\"thanos-bucket-replicate.*\"}[$interval])) by (job) * 1 / sum(rate(thanos_replicate_replication_run_duration_seconds_count{result=\"success\", namespace=\"$namespace\",job=~\"thanos-bucket-replicate.*\"}[$interval])) by (job)", + "format": "time_series", + "intervalFactor": 2, + "legendFormat": "mean {{job}}", + "refId": "B", + "step": 10 + }, + { + "expr": "histogram_quantile(0.50, sum(rate(thanos_replicate_replication_run_duration_seconds_bucket{result=\"success\", namespace=\"$namespace\",job=~\"thanos-bucket-replicate.*\"}[$interval])) by (job, le)) * 1", + "format": "time_series", + "intervalFactor": 2, + "legendFormat": "P50 {{job}}", + "refId": "C", + "step": 10 + } + ], + "thresholds": [ ], + "timeFrom": null, + "timeShift": null, + "title": "Duration", + "tooltip": { + "shared": false, + "sort": 0, + "value_type": "individual" + }, + "type": "graph", + "xaxis": { + "buckets": null, + "mode": "time", + "name": null, + "show": true, + "values": [ ] + }, + "yaxes": [ + { + "format": "s", + "label": null, + "logBase": 1, + "max": null, + "min": 0, + "show": true + }, + { + "format": "short", + "label": null, + "logBase": 1, + "max": null, + "min": null, + "show": false + } + ] + } + ], + "repeat": null, + "repeatIteration": null, + "repeatRowId": null, + "showTitle": true, + "title": "Bucket Replicate Runs", + "titleSize": "h6" + }, + { + "collapse": false, + "height": "250px", + "panels": [ + { + "aliasColors": { }, + "bars": false, + "dashLength": 10, + "dashes": false, + "datasource": "$datasource", + "fill": 1, + "id": 4, + "legend": { + "avg": false, + "current": false, + "max": false, + "min": false, + "show": true, + "total": false, + "values": false + }, + "lines": true, + "linewidth": 1, + "links": [ ], + "nullPointMode": "null as zero", + "percentage": false, + "pointradius": 5, + "points": false, + "renderer": "flot", + "seriesOverrides": [ ], + "spaceLength": 10, + "span": 
12, + "stack": false, + "steppedLine": false, + "targets": [ + { + "expr": "sum(rate(thanos_replicate_origin_iterations_total{namespace=\"$namespace\",job=~\"thanos-bucket-replicate.*\"}[$interval]))", + "format": "time_series", + "intervalFactor": 2, + "legendFormat": "iterations", + "legendLink": null, + "step": 10 + }, + { + "expr": "sum(rate(thanos_replicate_origin_meta_loads_total{namespace=\"$namespace\",job=~\"thanos-bucket-replicate.*\"}[$interval]))", + "format": "time_series", + "intervalFactor": 2, + "legendFormat": "meta loads", + "legendLink": null, + "step": 10 + }, + { + "expr": "sum(rate(thanos_replicate_origin_partial_meta_reads_total{namespace=\"$namespace\",job=~\"thanos-bucket-replicate.*\"}[$interval]))", + "format": "time_series", + "intervalFactor": 2, + "legendFormat": "partial meta reads", + "legendLink": null, + "step": 10 + }, + { + "expr": "sum(rate(thanos_replicate_blocks_already_replicated_total{namespace=\"$namespace\",job=~\"thanos-bucket-replicate.*\"}[$interval]))", + "format": "time_series", + "intervalFactor": 2, + "legendFormat": "already replicated blocks", + "legendLink": null, + "step": 10 + }, + { + "expr": "sum(rate(thanos_replicate_blocks_replicated_total{namespace=\"$namespace\",job=~\"thanos-bucket-replicate.*\"}[$interval]))", + "format": "time_series", + "intervalFactor": 2, + "legendFormat": "replicated blocks", + "legendLink": null, + "step": 10 + }, + { + "expr": "sum(rate(thanos_replicate_objects_replicated_total{namespace=\"$namespace\",job=~\"thanos-bucket-replicate.*\"}[$interval]))", + "format": "time_series", + "intervalFactor": 2, + "legendFormat": "replicated objects", + "legendLink": null, + "step": 10 + } + ], + "thresholds": [ ], + "timeFrom": null, + "timeShift": null, + "title": "Metrics", + "tooltip": { + "shared": false, + "sort": 0, + "value_type": "individual" + }, + "type": "graph", + "xaxis": { + "buckets": null, + "mode": "time", + "name": null, + "show": true, + "values": [ ] + }, + "yaxes": [ + { + "format": "short", + "label": null, + "logBase": 1, + "max": null, + "min": 0, + "show": true + }, + { + "format": "short", + "label": null, + "logBase": 1, + "max": null, + "min": null, + "show": false + } + ] + } + ], + "repeat": null, + "repeatIteration": null, + "repeatRowId": null, + "showTitle": true, + "title": "Bucket Replication", + "titleSize": "h6" + } + ], + "schemaVersion": 14, + "style": "dark", + "tags": [ + "thanos-mixin" + ], + "templating": { + "list": [ + { + "current": { + "text": "Prometheus", + "value": "Prometheus" + }, + "hide": 0, + "label": null, + "name": "datasource", + "options": [ ], + "query": "prometheus", + "refresh": 1, + "regex": "", + "type": "datasource" + }, + { + "allValue": null, + "current": { }, + "datasource": "$datasource", + "hide": 0, + "includeAll": false, + "label": "namespace", + "multi": false, + "name": "namespace", + "options": [ ], + "query": "label_values(kube_pod_info{}, namespace)", + "refresh": 1, + "regex": "", + "sort": 2, + "tagValuesQuery": "", + "tags": [ ], + "tagsQuery": "", + "type": "query", + "useTags": false + }, + { + "allValue": "thanos-bucket-replicate.*", + "current": { + "text": "all", + "value": "$__all" + }, + "datasource": "$datasource", + "hide": 0, + "includeAll": true, + "label": "job", + "multi": false, + "name": "job", + "options": [ ], + "query": "label_values(up{namespace=\"$namespace\",job=~\"thanos-bucket-replicate.*\"}, job)", + "refresh": 1, + "regex": "", + "sort": 2, + "tagValuesQuery": "", + "tags": [ ], + "tagsQuery": "", + "type": 
"query", + "useTags": false + }, + { + "auto": true, + "auto_count": 300, + "auto_min": "10s", + "current": { + "text": "5m", + "value": "5m" + }, + "hide": 0, + "label": "interval", + "name": "interval", + "query": "5m,10m,30m,1h,6h,12h", + "refresh": 2, + "type": "interval" + } + ] + }, + "time": { + "from": "now-1h", + "to": "now" + }, + "timepicker": { + "refresh_intervals": [ + "5s", + "10s", + "30s", + "1m", + "5m", + "15m", + "30m", + "1h", + "2h", + "1d" + ], + "time_options": [ + "5m", + "15m", + "1h", + "6h", + "12h", + "24h", + "2d", + "7d", + "30d" + ] + }, + "timezone": "", + "title": "Thanos / BucketReplicate", + "uid": "49f644ecf8e31dd1a5084ae2a5f10e80", + "version": 0 +} diff --git a/examples/dashboards/dashboards.md b/examples/dashboards/dashboards.md index df12f49bd5..cc6cf091a2 100644 --- a/examples/dashboards/dashboards.md +++ b/examples/dashboards/dashboards.md @@ -9,6 +9,7 @@ There exists Grafana dashboards for each component (not all of them complete) ta - [Thanos Receiver](thanos-receiver.json) - [Thanos Sidecar](thanos-sidecar.json) - [Thanos Ruler](thanos-ruler.json) +- [Thanos Replicate](thanos-bucket-replicate.json) You can import them via `Import -> Paste JSON` in Grafana. These dashboards require Grafana 5 or above, importing them in older versions are known not to work. diff --git a/mixin/thanos/README.md b/mixin/thanos/README.md index 946e7a916d..7f35050e50 100644 --- a/mixin/thanos/README.md +++ b/mixin/thanos/README.md @@ -89,6 +89,11 @@ This project is intended to be used as a library. You can extend and customize d selector: 'job=~"%s.*"' % self.jobPrefix, title: '%(prefix)sSidecar' % $.dashboard.prefix, }, + bucket_replicate+:: { + jobPrefix: 'thanos-bucket-replicate', + selector: 'job=~"%s.*"' % self.jobPrefix, + title: '%(prefix)sBucketReplicate' % $.dashboard.prefix, + }, overview+:: { title: '%(prefix)sOverview' % $.dashboard.prefix, }, diff --git a/mixin/thanos/alerts/alerts.libsonnet b/mixin/thanos/alerts/alerts.libsonnet index fa0a9d3d36..5067ec9332 100644 --- a/mixin/thanos/alerts/alerts.libsonnet +++ b/mixin/thanos/alerts/alerts.libsonnet @@ -4,4 +4,5 @@ (import 'sidecar.libsonnet') + (import 'store.libsonnet') + (import 'rule.libsonnet') + -(import 'absent.libsonnet') +(import 'absent.libsonnet') + +(import 'bucket_replicate.libsonnet') diff --git a/mixin/thanos/alerts/bucket_replicate.libsonnet b/mixin/thanos/alerts/bucket_replicate.libsonnet new file mode 100644 index 0000000000..fa6bdfbe27 --- /dev/null +++ b/mixin/thanos/alerts/bucket_replicate.libsonnet @@ -0,0 +1,63 @@ +{ + local thanos = self, + bucket_replicate+:: { + jobPrefix: error 'must provide job prefix for Thanos Bucket Replicate dashboard', + selector: error 'must provide selector for Thanos Bucket Replicate dashboard', + }, + prometheusAlerts+:: { + groups+: [ + { + name: 'thanos-bucket-replicate.rules', + rules: [ + { + alert: 'ThanosBucketReplicateIsDown', + expr: ||| + absent(up{%(selector)s}) + ||| % thanos.bucket_replicate, + 'for': '5m', + labels: { + severity: 'critical', + }, + annotations: { + message: 'Thanos Replicate has disappeared from Prometheus target discovery.', + }, + }, + { + alert: 'ThanosBucketReplicateErrorRate', + annotations: { + message: 'Thanos Replicate failing to run, {{ $value | humanize }}% of attempts failed.', + }, + expr: ||| + ( + sum(rate(thanos_replicate_replication_runs_total{result="error", %(selector)s}[5m])) + / on (namespace) group_left + sum(rate(thanos_replicate_replication_runs_total{%(selector)s}[5m])) + ) * 100 >= 10 + ||| % 
thanos.bucket_replicate, + 'for': '5m', + labels: { + severity: 'critical', + }, + }, + { + alert: 'ThanosBucketReplicateRunLatency', + annotations: { + message: 'Thanos Replicate {{$labels.job}} has a 90th percentile latency of {{ $value }} seconds for the replicate operations.', + }, + expr: ||| + ( + histogram_quantile(0.9, sum by (job, le) (thanos_replicate_replication_run_duration_seconds_bucket{%(selector)s})) > 120 + and + sum by (job) (rate(thanos_replicate_replication_run_duration_seconds_bucket{%(selector)s}[5m])) > 0 + ) + ||| % thanos.bucket_replicate, + 'for': '5m', + labels: { + severity: 'critical', + }, + }, + ], + }, + ], + }, +} diff --git a/mixin/thanos/dashboards/bucket_replicate.libsonnet b/mixin/thanos/dashboards/bucket_replicate.libsonnet new file mode 100644 index 0000000000..9e406f3833 --- /dev/null +++ b/mixin/thanos/dashboards/bucket_replicate.libsonnet @@ -0,0 +1,57 @@ +local g = import '../lib/thanos-grafana-builder/builder.libsonnet'; + +{ + local thanos = self, + bucket_replicate+:: { + jobPrefix: error 'must provide job prefix for Thanos Bucket Replicate dashboard', + selector: error 'must provide selector for Thanos Bucket Replicate dashboard', + title: error 'must provide title for Thanos Bucket Replicate dashboard', + }, + grafanaDashboards+:: { + 'bucket_replicate.json': + g.dashboard(thanos.bucket_replicate.title) + .addRow( + g.row('Bucket Replicate Runs') + .addPanel( + g.panel('Rate') + + g.qpsErrTotalPanel( + 'thanos_replicate_replication_runs_total{result="error", namespace="$namespace",%(selector)s}' % thanos.bucket_replicate, + 'thanos_replicate_replication_runs_total{namespace="$namespace",%(selector)s}' % thanos.bucket_replicate, + ) + ) + .addPanel( + g.panel('Errors', 'Shows rate of errors.') + + g.queryPanel( + 'sum(rate(thanos_replicate_replication_runs_total{result="error", namespace="$namespace",%(selector)s}[$interval])) by (result)' % thanos.bucket_replicate, + '{{result}}' + ) + + { yaxes: g.yaxes('percentunit') } + + g.stack + ) + .addPanel( + g.panel('Duration', 'Shows how long it takes to run a replication cycle.') + + g.latencyPanel('thanos_replicate_replication_run_duration_seconds', 'result="success", namespace="$namespace",%(selector)s' % thanos.bucket_replicate) + ) + ) + .addRow( + g.row('Bucket Replication') + .addPanel( + g.panel('Metrics') + + g.queryPanel( + [ + 'sum(rate(thanos_replicate_origin_iterations_total{namespace="$namespace",%(selector)s}[$interval]))' % thanos.bucket_replicate, + 'sum(rate(thanos_replicate_origin_meta_loads_total{namespace="$namespace",%(selector)s}[$interval]))' % thanos.bucket_replicate, + 'sum(rate(thanos_replicate_origin_partial_meta_reads_total{namespace="$namespace",%(selector)s}[$interval]))' % thanos.bucket_replicate, + 'sum(rate(thanos_replicate_blocks_already_replicated_total{namespace="$namespace",%(selector)s}[$interval]))' % thanos.bucket_replicate, + 'sum(rate(thanos_replicate_blocks_replicated_total{namespace="$namespace",%(selector)s}[$interval]))' % thanos.bucket_replicate, + 'sum(rate(thanos_replicate_objects_replicated_total{namespace="$namespace",%(selector)s}[$interval]))' % thanos.bucket_replicate, + ], + ['iterations', 'meta loads', 'partial meta reads', 'already replicated blocks', 'replicated blocks', 'replicated objects'] + ) + ) + ) + + + g.template('namespace', 'kube_pod_info') + + g.template('job', 'up', 'namespace="$namespace",%(selector)s' % thanos.bucket_replicate, true, '%(jobPrefix)s.*' % thanos.bucket_replicate), + }, +} diff --git 
a/mixin/thanos/dashboards/dashboards.libsonnet b/mixin/thanos/dashboards/dashboards.libsonnet index 67fccfcae7..5bd99093f5 100644 --- a/mixin/thanos/dashboards/dashboards.libsonnet +++ b/mixin/thanos/dashboards/dashboards.libsonnet @@ -4,5 +4,6 @@ (import 'receive.libsonnet') + (import 'rule.libsonnet') + (import 'compact.libsonnet') + +(import 'bucket_replicate.libsonnet') + (import 'overview.libsonnet') + (import 'defaults.libsonnet') diff --git a/mixin/thanos/defaults.libsonnet b/mixin/thanos/defaults.libsonnet index b2a0c9d76a..b2f99266dc 100644 --- a/mixin/thanos/defaults.libsonnet +++ b/mixin/thanos/defaults.libsonnet @@ -29,6 +29,11 @@ selector: 'job=~"%s.*"' % self.jobPrefix, title: '%(prefix)sSidecar' % $.dashboard.prefix, }, + bucket_replicate+:: { + jobPrefix: 'thanos-bucket-replicate', + selector: 'job=~"%s.*"' % self.jobPrefix, + title: '%(prefix)sBucketReplicate' % $.dashboard.prefix, + }, overview+:: { title: '%(prefix)sOverview' % $.dashboard.prefix, }, diff --git a/mixin/thanos/rules/bucket_replicate.libsonnet b/mixin/thanos/rules/bucket_replicate.libsonnet new file mode 100644 index 0000000000..14eb5c945d --- /dev/null +++ b/mixin/thanos/rules/bucket_replicate.libsonnet @@ -0,0 +1,15 @@ +{ + local thanos = self, + bucket_replicate+:: { + selector: error 'must provide selector for Thanos Bucket Replicate dashboard', + }, + prometheusRules+:: { + groups+: [ + { + name: 'thanos-bucket-replicate.rules', + rules: [ + ], + }, + ], + }, +} diff --git a/mixin/thanos/rules/rules.libsonnet b/mixin/thanos/rules/rules.libsonnet index c74492d4d4..a9b51ea839 100644 --- a/mixin/thanos/rules/rules.libsonnet +++ b/mixin/thanos/rules/rules.libsonnet @@ -1,3 +1,4 @@ (import 'query.libsonnet') + (import 'receive.libsonnet') + -(import 'store.libsonnet') +(import 'store.libsonnet') + +(import 'bucket_replicate.libsonnet') diff --git a/pkg/component/component.go b/pkg/component/component.go index 675caa6034..f86ae85503 100644 --- a/pkg/component/component.go +++ b/pkg/component/component.go @@ -95,4 +95,5 @@ var ( Sidecar = sourceStoreAPI{component: component{name: "sidecar"}} Store = sourceStoreAPI{component: component{name: "store"}} Receive = sourceStoreAPI{component: component{name: "receive"}} + Replicate = sourceStoreAPI{component: component{name: "replicate"}} ) diff --git a/pkg/replicate/replicater.go b/pkg/replicate/replicater.go new file mode 100644 index 0000000000..2a505d4f94 --- /dev/null +++ b/pkg/replicate/replicater.go @@ -0,0 +1,214 @@ +// Copyright (c) The Thanos Authors. +// Licensed under the Apache License 2.0. + +package replicate + +import ( + "context" + "fmt" + "math/rand" + "strconv" + "strings" + "time" + + "github.com/go-kit/kit/log" + "github.com/go-kit/kit/log/level" + "github.com/oklog/run" + "github.com/oklog/ulid" + opentracing "github.com/opentracing/opentracing-go" + "github.com/pkg/errors" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/common/model" + "github.com/prometheus/prometheus/pkg/labels" + "github.com/thanos-io/thanos/pkg/compact" + "github.com/thanos-io/thanos/pkg/component" + "github.com/thanos-io/thanos/pkg/extflag" + "github.com/thanos-io/thanos/pkg/objstore/client" + "github.com/thanos-io/thanos/pkg/prober" + "github.com/thanos-io/thanos/pkg/runutil" + "github.com/thanos-io/thanos/pkg/server/http" +) + +// ParseFlagMatchers parses flag values into matchers.
+func ParseFlagMatchers(s []string) ([]*labels.Matcher, error) { + matchers := make([]*labels.Matcher, 0, len(s)) + + for _, l := range s { + parts := strings.SplitN(l, "=", 2) + if len(parts) != 2 { + return nil, errors.Errorf("unrecognized label %q", l) + } + + labelName := parts[0] + if !model.LabelName.IsValid(model.LabelName(labelName)) { + return nil, errors.Errorf("unsupported format for label %s", l) + } + + labelValue, err := strconv.Unquote(parts[1]) + if err != nil { + return nil, errors.Wrap(err, "unquote label value") + } + newEqualMatcher, err := labels.NewMatcher(labels.MatchEqual, labelName, labelValue) + if err != nil { + return nil, errors.Wrap(err, "new equal matcher") + } + matchers = append(matchers, newEqualMatcher) + } + + return matchers, nil +} + +// RunReplicate replicates data based on config. +func RunReplicate( + g *run.Group, + logger log.Logger, + reg *prometheus.Registry, + _ opentracing.Tracer, + httpBindAddr string, + httpGracePeriod time.Duration, + labelSelector labels.Selector, + resolution compact.ResolutionLevel, + compaction int, + fromObjStoreConfig *extflag.PathOrContent, + toObjStoreConfig *extflag.PathOrContent, + singleRun bool, +) error { + logger = log.With(logger, "component", "replicate") + + level.Debug(logger).Log("msg", "setting up http listen-group") + + httpProbe := prober.NewHTTP() + statusProber := prober.Combine( + httpProbe, + prober.NewInstrumentation(component.Replicate, logger, prometheus.WrapRegistererWithPrefix("thanos_", reg)), + ) + + s := http.New(logger, reg, component.Replicate, httpProbe, + http.WithListen(httpBindAddr), + http.WithGracePeriod(httpGracePeriod), + ) + + g.Add(func() error { + level.Info(logger).Log("msg", "Listening for http service", "address", httpBindAddr) + + statusProber.Healthy() + + return s.ListenAndServe() + }, func(err error) { + statusProber.NotReady(err) + defer statusProber.NotHealthy(err) + + s.Shutdown(err) + }) + + fromConfContentYaml, err := fromObjStoreConfig.Content() + if err != nil { + return err + } + + if len(fromConfContentYaml) == 0 { + return errors.New("No supported bucket was configured to replicate from") + } + + fromBkt, err := client.NewBucket( + logger, + fromConfContentYaml, + prometheus.WrapRegistererWith(prometheus.Labels{"replicate": "from"}, reg), + component.Replicate.String(), + ) + if err != nil { + return err + } + + toConfContentYaml, err := toObjStoreConfig.Content() + if err != nil { + return err + } + + if len(toConfContentYaml) == 0 { + return errors.New("No supported bucket was configured to replicate to") + } + + toBkt, err := client.NewBucket( + logger, + toConfContentYaml, + prometheus.WrapRegistererWith(prometheus.Labels{"replicate": "to"}, reg), + component.Replicate.String(), + ) + if err != nil { + return err + } + + replicationRunCounter := prometheus.NewCounterVec(prometheus.CounterOpts{ + Name: "thanos_replicate_replication_runs_total", + Help: "The number of replication runs split by success and error.", + }, []string{"result"}) + + replicationRunDuration := prometheus.NewHistogramVec(prometheus.HistogramOpts{ + Name: "thanos_replicate_replication_run_duration_seconds", + Help: "The duration of replication runs split by success and error.", + }, []string{"result"}) + + reg.MustRegister(replicationRunCounter) + reg.MustRegister(replicationRunDuration) + + blockFilter := NewBlockFilter( + logger, + labelSelector, + resolution, + compaction, + ).Filter + metrics := newReplicationMetrics(reg) + ctx, cancel := context.WithCancel(context.Background()) + + 
replicateFn := func() error { + timestamp := time.Now() + entropy := ulid.Monotonic(rand.New(rand.NewSource(timestamp.UnixNano())), 0) + + ulid, err := ulid.New(ulid.Timestamp(timestamp), entropy) + if err != nil { + return errors.Wrap(err, "generate replication run-id") + } + + logger := log.With(logger, "replication-run-id", ulid.String()) + level.Info(logger).Log("msg", "running replication attempt") + + if err := newReplicationScheme(logger, metrics, blockFilter, fromBkt, toBkt, reg).execute(ctx); err != nil { + return fmt.Errorf("replication execute: %w", err) + } + + return nil + } + + g.Add(func() error { + defer runutil.CloseWithLogOnErr(logger, fromBkt, "from bucket client") + defer runutil.CloseWithLogOnErr(logger, toBkt, "to bucket client") + + if singleRun { + return replicateFn() + } + + return runutil.Repeat(time.Minute, ctx.Done(), func() error { + start := time.Now() + if err := replicateFn(); err != nil { + level.Error(logger).Log("msg", "running replication failed", "err", err) + replicationRunCounter.WithLabelValues("error").Inc() + replicationRunDuration.WithLabelValues("error").Observe(time.Since(start).Seconds()) + + // No matter the error we want to repeat indefinitely. + return nil + } + replicationRunCounter.WithLabelValues("success").Inc() + replicationRunDuration.WithLabelValues("success").Observe(time.Since(start).Seconds()) + level.Info(logger).Log("msg", "ran replication successfully") + + return nil + }) + }, func(error) { + cancel() + }) + + level.Info(logger).Log("msg", "starting replication") + + return nil +} diff --git a/pkg/replicate/scheme.go b/pkg/replicate/scheme.go new file mode 100644 index 0000000000..30be01f0f2 --- /dev/null +++ b/pkg/replicate/scheme.go @@ -0,0 +1,384 @@ +// Copyright (c) The Thanos Authors. +// Licensed under the Apache License 2.0. + +package replicate + +import ( + "bytes" + "context" + "fmt" + "io" + "io/ioutil" + "path" + "sort" + + "github.com/go-kit/kit/log" + "github.com/go-kit/kit/log/level" + "github.com/oklog/ulid" + "github.com/pkg/errors" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/prometheus/pkg/labels" + thanosblock "github.com/thanos-io/thanos/pkg/block" + "github.com/thanos-io/thanos/pkg/block/metadata" + "github.com/thanos-io/thanos/pkg/compact" + "github.com/thanos-io/thanos/pkg/objstore" + "github.com/thanos-io/thanos/pkg/runutil" +) + +// BlockFilter is a block filter that filters out compacted and unselected blocks. +type BlockFilter struct { + logger log.Logger + labelSelector labels.Selector + resolutionLevel compact.ResolutionLevel + compactionLevel int +} + +// NewBlockFilter returns a block filter. +func NewBlockFilter( + logger log.Logger, + labelSelector labels.Selector, + resolutionLevel compact.ResolutionLevel, + compactionLevel int, +) *BlockFilter { + return &BlockFilter{ + labelSelector: labelSelector, + logger: logger, + resolutionLevel: resolutionLevel, + compactionLevel: compactionLevel, + } +} + +// Filter returns true if a block is non-compacted and matches the selector. 
+func (bf *BlockFilter) Filter(b *metadata.Meta) bool { + if len(b.Thanos.Labels) == 0 { + level.Error(bf.logger).Log("msg", "filtering block", "reason", "labels should not be empty") + return false + } + + blockLabels := labels.FromMap(b.Thanos.Labels) + + labelMatch := bf.labelSelector.Matches(blockLabels) + if !labelMatch { + selStr := "{" + + for i, m := range bf.labelSelector { + if i != 0 { + selStr += "," + } + + selStr += m.String() + } + + selStr += "}" + + level.Debug(bf.logger).Log("msg", "filtering block", "reason", "labels don't match", "block_labels", blockLabels.String(), "selector", selStr) + + return false + } + + gotResolution := compact.ResolutionLevel(b.Thanos.Downsample.Resolution) + expectedResolution := bf.resolutionLevel + + resolutionMatch := gotResolution == expectedResolution + if !resolutionMatch { + level.Debug(bf.logger).Log("msg", "filtering block", "reason", "resolutions don't match", "got_resolution", gotResolution, "expected_resolution", expectedResolution) + return false + } + + gotCompactionLevel := b.BlockMeta.Compaction.Level + expectedCompactionLevel := bf.compactionLevel + + compactionMatch := gotCompactionLevel == expectedCompactionLevel + if !compactionMatch { + level.Debug(bf.logger).Log("msg", "filtering block", "reason", "compaction levels don't match", "got_compaction_level", gotCompactionLevel, "expected_compaction_level", expectedCompactionLevel) + return false + } + + return true +} + +type blockFilterFunc func(b *metadata.Meta) bool + +// TODO: Add filters field. +type replicationScheme struct { + fromBkt objstore.BucketReader + toBkt objstore.Bucket + + blockFilter blockFilterFunc + + logger log.Logger + metrics *replicationMetrics + + reg prometheus.Registerer +} + +type replicationMetrics struct { + originIterations prometheus.Counter + originMetaLoads prometheus.Counter + originPartialMeta prometheus.Counter + + blocksAlreadyReplicated prometheus.Counter + blocksReplicated prometheus.Counter + objectsReplicated prometheus.Counter +} + +func newReplicationMetrics(reg prometheus.Registerer) *replicationMetrics { + m := &replicationMetrics{ + originIterations: prometheus.NewCounter(prometheus.CounterOpts{ + Name: "thanos_replicate_origin_iterations_total", + Help: "Total number of objects iterated over in the origin bucket.", + }), + originMetaLoads: prometheus.NewCounter(prometheus.CounterOpts{ + Name: "thanos_replicate_origin_meta_loads_total", + Help: "Total number of meta.json reads in the origin bucket.", + }), + originPartialMeta: prometheus.NewCounter(prometheus.CounterOpts{ + Name: "thanos_replicate_origin_partial_meta_reads_total", + Help: "Total number of partial meta reads encountered.", + }), + blocksAlreadyReplicated: prometheus.NewCounter(prometheus.CounterOpts{ + Name: "thanos_replicate_blocks_already_replicated_total", + Help: "Total number of blocks skipped due to already being replicated.", + }), + blocksReplicated: prometheus.NewCounter(prometheus.CounterOpts{ + Name: "thanos_replicate_blocks_replicated_total", + Help: "Total number of blocks replicated.", + }), + objectsReplicated: prometheus.NewCounter(prometheus.CounterOpts{ + Name: "thanos_replicate_objects_replicated_total", + Help: "Total number of objects replicated.", + }), + } + + if reg != nil { + reg.MustRegister(m.originIterations) + reg.MustRegister(m.originMetaLoads) + reg.MustRegister(m.originPartialMeta) + reg.MustRegister(m.blocksAlreadyReplicated) + reg.MustRegister(m.blocksReplicated) + reg.MustRegister(m.objectsReplicated) + } + + return m +} + +func 
newReplicationScheme(logger log.Logger, metrics *replicationMetrics, blockFilter blockFilterFunc, from objstore.BucketReader, to objstore.Bucket, reg prometheus.Registerer) *replicationScheme { + if logger == nil { + logger = log.NewNopLogger() + } + + return &replicationScheme{ + logger: logger, + blockFilter: blockFilter, + fromBkt: from, + toBkt: to, + metrics: metrics, + reg: reg, + } +} + +func (rs *replicationScheme) execute(ctx context.Context) error { + availableBlocks := []*metadata.Meta{} + + level.Debug(rs.logger).Log("msg", "scanning available blocks for replication") + + if err := rs.fromBkt.Iter(ctx, "", func(name string) error { + rs.metrics.originIterations.Inc() + + id, ok := thanosblock.IsBlockDir(name) + if !ok { + return nil + } + + rs.metrics.originMetaLoads.Inc() + + meta, metaNonExistentOrPartial, err := loadMeta(ctx, rs, id) + if metaNonExistentOrPartial { + // meta.json is the last file uploaded by a Thanos shipper, + // therefore a block may be partially present, but no meta.json + // file yet. If this is the case we skip that block for now. + rs.metrics.originPartialMeta.Inc() + level.Info(rs.logger).Log("msg", "block meta not uploaded yet. Skipping.", "block_uuid", id.String()) + return nil + } + if err != nil { + return fmt.Errorf("load meta for block %v from origin bucket: %w", id.String(), err) + } + + if len(meta.Thanos.Labels) == 0 { + // TODO(bwplotka): Allow injecting custom labels as shipper does. + level.Info(rs.logger).Log("msg", "block meta without Thanos external labels set. This is not allowed. Skipping.", "block_uuid", id.String()) + return nil + } + + level.Debug(rs.logger).Log("msg", "adding block to available blocks", "block_uuid", id.String()) + + availableBlocks = append(availableBlocks, meta) + + return nil + }); err != nil { + return fmt.Errorf("iterate over origin bucket: %w", err) + } + + candidateBlocks := []*metadata.Meta{} + + for _, b := range availableBlocks { + if rs.blockFilter(b) { + level.Debug(rs.logger).Log("msg", "adding block to candidate blocks", "block_uuid", b.BlockMeta.ULID.String()) + candidateBlocks = append(candidateBlocks, b) + } + } + + // In order to prevent races in compactions by the target environment, we + // need to replicate oldest start timestamp first. + sort.Slice(candidateBlocks, func(i, j int) bool { + return candidateBlocks[i].BlockMeta.MinTime < candidateBlocks[j].BlockMeta.MinTime + }) + + for _, b := range candidateBlocks { + if err := rs.ensureBlockIsReplicated(ctx, b.BlockMeta.ULID); err != nil { + return fmt.Errorf("ensure block %v is replicated: %w", b.BlockMeta.ULID.String(), err) + } + } + + return nil +} + +// ensureBlockIsReplicated ensures that a block present in the origin bucket is +// present in the target bucket. 
+func (rs *replicationScheme) ensureBlockIsReplicated(ctx context.Context, id ulid.ULID) error { + blockID := id.String() + chunksDir := path.Join(blockID, thanosblock.ChunksDirname) + indexFile := path.Join(blockID, thanosblock.IndexFilename) + metaFile := path.Join(blockID, thanosblock.MetaFilename) + + level.Debug(rs.logger).Log("msg", "ensuring block is replicated", "block_uuid", blockID) + + originMetaFile, err := rs.fromBkt.Get(ctx, metaFile) + if err != nil { + return fmt.Errorf("get meta file from origin bucket: %w", err) + } + + defer runutil.CloseWithLogOnErr(rs.logger, originMetaFile, "close original meta file") + + targetMetaFile, err := rs.toBkt.Get(ctx, metaFile) + + if targetMetaFile != nil { + defer runutil.CloseWithLogOnErr(rs.logger, targetMetaFile, "close target meta file") + } + + if err != nil && !rs.toBkt.IsObjNotFoundErr(err) && err != io.EOF { + return fmt.Errorf("get meta file from target bucket: %w", err) + } + + originMetaFileContent, err := ioutil.ReadAll(originMetaFile) + if err != nil { + return fmt.Errorf("read origin meta file: %w", err) + } + + if targetMetaFile != nil && !rs.toBkt.IsObjNotFoundErr(err) { + targetMetaFileContent, err := ioutil.ReadAll(targetMetaFile) + if err != nil { + return fmt.Errorf("read target meta file: %w", err) + } + + if bytes.Equal(originMetaFileContent, targetMetaFileContent) { + // If the origin meta file content and target meta file content is + // equal, we know we have already successfully replicated + // previously. + level.Debug(rs.logger).Log("msg", "skipping block as already replicated", "block_uuid", id.String()) + rs.metrics.blocksAlreadyReplicated.Inc() + + return nil + } + } + + if err := rs.fromBkt.Iter(ctx, chunksDir, func(objectName string) error { + err := rs.ensureObjectReplicated(ctx, objectName) + if err != nil { + return fmt.Errorf("replicate object %v: %w", objectName, err) + } + + return nil + }); err != nil { + return err + } + + if err := rs.ensureObjectReplicated(ctx, indexFile); err != nil { + return fmt.Errorf("replicate index file: %w", err) + } + + level.Debug(rs.logger).Log("msg", "replicating meta file", "object", metaFile) + + if err := rs.toBkt.Upload(ctx, metaFile, bytes.NewReader(originMetaFileContent)); err != nil { + return fmt.Errorf("upload meta file: %w", err) + } + + rs.metrics.blocksReplicated.Inc() + + return nil +} + +// ensureObjectReplicated ensures that an object present in the origin bucket +// is present in the target bucket. +func (rs *replicationScheme) ensureObjectReplicated(ctx context.Context, objectName string) error { + level.Debug(rs.logger).Log("msg", "ensuring object is replicated", "object", objectName) + + exists, err := rs.toBkt.Exists(ctx, objectName) + if err != nil { + return fmt.Errorf("check if %v exists in target bucket: %w", objectName, err) + } + + // Skip if already exists. 
+ if exists { + level.Debug(rs.logger).Log("msg", "skipping object as already replicated", "object", objectName) + return nil + } + + level.Debug(rs.logger).Log("msg", "object not present in target bucket, replicating", "object", objectName) + + r, err := rs.fromBkt.Get(ctx, objectName) + if err != nil { + return fmt.Errorf("get %v from origin bucket: %w", objectName, err) + } + + defer r.Close() + + if err = rs.toBkt.Upload(ctx, objectName, r); err != nil { + return fmt.Errorf("upload %v to target bucket: %w", objectName, err) + } + + level.Info(rs.logger).Log("msg", "object replicated", "object", objectName) + rs.metrics.objectsReplicated.Inc() + + return nil +} + +// loadMeta loads the meta.json from the origin bucket and returns the meta +// struct as well as if failed, whether the failure was due to the meta.json +// not being present or partial. The distinction is important, as if missing or +// partial, this is just a temporary failure, as the block is still being +// uploaded to the origin bucket. +func loadMeta(ctx context.Context, rs *replicationScheme, id ulid.ULID) (*metadata.Meta, bool, error) { + fetcher, err := thanosblock.NewMetaFetcher(rs.logger, 32, rs.fromBkt, "", rs.reg) + if err != nil { + return nil, false, fmt.Errorf("create meta fetcher with bucket %v: %w", rs.fromBkt, err) + } + + metas, _, err := fetcher.Fetch(ctx) + if err != nil { + switch errors.Cause(err) { + default: + return nil, false, fmt.Errorf("fetch meta: %w", err) + case thanosblock.ErrorSyncMetaNotFound: + return nil, true, fmt.Errorf("fetch meta: %w", err) + } + } + + m, ok := metas[id] + if !ok { + return nil, true, fmt.Errorf("fetch meta: meta for block %v not found", id) + } + + return m, false, nil +} diff --git a/pkg/replicate/scheme_test.go b/pkg/replicate/scheme_test.go new file mode 100644 index 0000000000..c8ca4e0598 --- /dev/null +++ b/pkg/replicate/scheme_test.go @@ -0,0 +1,332 @@ +// Copyright (c) The Thanos Authors. +// Licensed under the Apache License 2.0. 
+ +package replicate + +import ( + "bytes" + "context" + "encoding/json" + "io" + "math/rand" + "os" + "path" + "testing" + "time" + + "github.com/go-kit/kit/log" + "github.com/go-kit/kit/log/level" + "github.com/oklog/ulid" + "github.com/prometheus/prometheus/pkg/labels" + "github.com/prometheus/prometheus/tsdb" + "github.com/thanos-io/thanos/pkg/block/metadata" + "github.com/thanos-io/thanos/pkg/compact" + "github.com/thanos-io/thanos/pkg/objstore" + "github.com/thanos-io/thanos/pkg/objstore/inmem" + "github.com/thanos-io/thanos/pkg/testutil" +) + +func testLogger(testName string) log.Logger { + return log.With( + level.NewFilter(log.NewLogfmtLogger(log.NewSyncWriter(os.Stderr)), level.AllowDebug()), + "test", testName, + ) +} + +func testULID(inc int64) ulid.ULID { + timestamp := time.Unix(1000000+inc, 0) + entropy := ulid.Monotonic(rand.New(rand.NewSource(timestamp.UnixNano())), 0) + ulid := ulid.MustNew(ulid.Timestamp(timestamp), entropy) + + return ulid +} + +func testMeta(ulid ulid.ULID) *metadata.Meta { + return &metadata.Meta{ + Thanos: metadata.Thanos{ + Labels: map[string]string{ + "test-labelname": "test-labelvalue", + }, + Downsample: metadata.ThanosDownsample{ + Resolution: int64(compact.ResolutionLevelRaw), + }, + }, + BlockMeta: tsdb.BlockMeta{ + ULID: ulid, + Compaction: tsdb.BlockMetaCompaction{ + Level: 1, + }, + Version: metadata.MetaVersion1, + }, + } +} + +func TestReplicationSchemeAll(t *testing.T) { + var cases = []struct { + name string + selector labels.Selector + prepare func(ctx context.Context, t *testing.T, originBucket, targetBucket objstore.Bucket) + assert func(ctx context.Context, t *testing.T, originBucket, targetBucket *inmem.Bucket) + }{ + { + name: "EmptyOrigin", + prepare: func(ctx context.Context, t *testing.T, originBucket, targetBucket objstore.Bucket) {}, + assert: func(ctx context.Context, t *testing.T, originBucket, targetBucket *inmem.Bucket) {}, + }, + { + name: "NoMeta", + prepare: func(ctx context.Context, t *testing.T, originBucket, targetBucket objstore.Bucket) { + _ = originBucket.Upload(ctx, path.Join(testULID(0).String(), "chunks", "000001"), bytes.NewReader(nil)) + }, + assert: func(ctx context.Context, t *testing.T, originBucket, targetBucket *inmem.Bucket) { + if len(targetBucket.Objects()) != 0 { + t.Fatal("TargetBucket should have been empty but is not.") + } + }, + }, + { + name: "PartialMeta", + prepare: func(ctx context.Context, t *testing.T, originBucket, targetBucket objstore.Bucket) { + _ = originBucket.Upload(ctx, path.Join(testULID(0).String(), "meta.json"), bytes.NewReader([]byte("{"))) + }, + assert: func(ctx context.Context, t *testing.T, originBucket, targetBucket *inmem.Bucket) { + if len(targetBucket.Objects()) != 0 { + t.Fatal("TargetBucket should have been empty but is not.") + } + }, + }, + { + name: "FullBlock", + prepare: func(ctx context.Context, t *testing.T, originBucket, targetBucket objstore.Bucket) { + ulid := testULID(0) + meta := testMeta(ulid) + + b, err := json.Marshal(meta) + testutil.Ok(t, err) + _ = originBucket.Upload(ctx, path.Join(ulid.String(), "meta.json"), bytes.NewReader(b)) + _ = originBucket.Upload(ctx, path.Join(ulid.String(), "chunks", "000001"), bytes.NewReader(nil)) + _ = originBucket.Upload(ctx, path.Join(ulid.String(), "index"), bytes.NewReader(nil)) + }, + assert: func(ctx context.Context, t *testing.T, originBucket, targetBucket *inmem.Bucket) { + if len(targetBucket.Objects()) != 3 { + t.Fatal("TargetBucket should have one block made up of three objects replicated.") + } + }, + }, + 
{ + name: "PreviousPartialUpload", + prepare: func(ctx context.Context, t *testing.T, originBucket, targetBucket objstore.Bucket) { + ulid := testULID(0) + meta := testMeta(ulid) + + b, err := json.Marshal(meta) + testutil.Ok(t, err) + _ = originBucket.Upload(ctx, path.Join(ulid.String(), "meta.json"), bytes.NewReader(b)) + _ = originBucket.Upload(ctx, path.Join(ulid.String(), "chunks", "000001"), bytes.NewReader(nil)) + _ = originBucket.Upload(ctx, path.Join(ulid.String(), "index"), bytes.NewReader(nil)) + + _ = targetBucket.Upload(ctx, path.Join(ulid.String(), "meta.json"), io.LimitReader(bytes.NewReader(b), int64(len(b)-10))) + _ = targetBucket.Upload(ctx, path.Join(ulid.String(), "chunks", "000001"), bytes.NewReader(nil)) + _ = targetBucket.Upload(ctx, path.Join(ulid.String(), "index"), bytes.NewReader(nil)) + }, + assert: func(ctx context.Context, t *testing.T, originBucket, targetBucket *inmem.Bucket) { + for k := range originBucket.Objects() { + if !bytes.Equal(originBucket.Objects()[k], targetBucket.Objects()[k]) { + t.Fatalf("Object %s not equal in origin and target bucket.", k) + } + } + }, + }, + { + name: "OnlyUploadsRaw", + prepare: func(ctx context.Context, t *testing.T, originBucket, targetBucket objstore.Bucket) { + ulid := testULID(0) + meta := testMeta(ulid) + + b, err := json.Marshal(meta) + testutil.Ok(t, err) + _ = originBucket.Upload(ctx, path.Join(ulid.String(), "meta.json"), bytes.NewReader(b)) + _ = originBucket.Upload(ctx, path.Join(ulid.String(), "chunks", "000001"), bytes.NewReader(nil)) + _ = originBucket.Upload(ctx, path.Join(ulid.String(), "index"), bytes.NewReader(nil)) + + ulid = testULID(1) + meta = testMeta(ulid) + meta.Thanos.Downsample.Resolution = int64(compact.ResolutionLevel5m) + + b, err = json.Marshal(meta) + testutil.Ok(t, err) + _ = originBucket.Upload(ctx, path.Join(ulid.String(), "meta.json"), bytes.NewReader(b)) + _ = originBucket.Upload(ctx, path.Join(ulid.String(), "chunks", "000001"), bytes.NewReader(nil)) + _ = originBucket.Upload(ctx, path.Join(ulid.String(), "index"), bytes.NewReader(nil)) + }, + assert: func(ctx context.Context, t *testing.T, originBucket, targetBucket *inmem.Bucket) { + expected := 3 + got := len(targetBucket.Objects()) + if got != expected { + t.Fatalf("TargetBucket should have one block made up of three objects replicated. 
Got %d but expected %d objects.", got, expected) + } + }, + }, + { + name: "UploadMultipleCandidatesWhenPresent", + prepare: func(ctx context.Context, t *testing.T, originBucket, targetBucket objstore.Bucket) { + ulid := testULID(0) + meta := testMeta(ulid) + + b, err := json.Marshal(meta) + testutil.Ok(t, err) + _ = originBucket.Upload(ctx, path.Join(ulid.String(), "meta.json"), bytes.NewReader(b)) + _ = originBucket.Upload(ctx, path.Join(ulid.String(), "chunks", "000001"), bytes.NewReader(nil)) + _ = originBucket.Upload(ctx, path.Join(ulid.String(), "index"), bytes.NewReader(nil)) + + ulid = testULID(1) + meta = testMeta(ulid) + + b, err = json.Marshal(meta) + testutil.Ok(t, err) + _ = originBucket.Upload(ctx, path.Join(ulid.String(), "meta.json"), bytes.NewReader(b)) + _ = originBucket.Upload(ctx, path.Join(ulid.String(), "chunks", "000001"), bytes.NewReader(nil)) + _ = originBucket.Upload(ctx, path.Join(ulid.String(), "index"), bytes.NewReader(nil)) + }, + assert: func(ctx context.Context, t *testing.T, originBucket, targetBucket *inmem.Bucket) { + expected := 6 + got := len(targetBucket.Objects()) + if got != expected { + t.Fatalf("TargetBucket should have two blocks made up of three objects replicated. Got %d but expected %d objects.", got, expected) + } + }, + }, + { + name: "LabelSelector", + prepare: func(ctx context.Context, t *testing.T, originBucket, targetBucket objstore.Bucket) { + ulid := testULID(0) + meta := testMeta(ulid) + + b, err := json.Marshal(meta) + testutil.Ok(t, err) + _ = originBucket.Upload(ctx, path.Join(ulid.String(), "meta.json"), bytes.NewReader(b)) + _ = originBucket.Upload(ctx, path.Join(ulid.String(), "chunks", "000001"), bytes.NewReader(nil)) + _ = originBucket.Upload(ctx, path.Join(ulid.String(), "index"), bytes.NewReader(nil)) + + ulid = testULID(1) + meta = testMeta(ulid) + meta.Thanos.Labels["test-labelname"] = "non-selected-value" + + b, err = json.Marshal(meta) + testutil.Ok(t, err) + _ = originBucket.Upload(ctx, path.Join(ulid.String(), "meta.json"), bytes.NewReader(b)) + _ = originBucket.Upload(ctx, path.Join(ulid.String(), "chunks", "000001"), bytes.NewReader(nil)) + _ = originBucket.Upload(ctx, path.Join(ulid.String(), "index"), bytes.NewReader(nil)) + }, + assert: func(ctx context.Context, t *testing.T, originBucket, targetBucket *inmem.Bucket) { + expected := 3 + got := len(targetBucket.Objects()) + if got != expected { + t.Fatalf("TargetBucket should have one block made up of three objects replicated. 
Got %d but expected %d objects.", got, expected) + } + }, + }, + { + name: "NonZeroCompaction", + prepare: func(ctx context.Context, t *testing.T, originBucket, targetBucket objstore.Bucket) { + ulid := testULID(0) + meta := testMeta(ulid) + meta.BlockMeta.Compaction.Level = 2 + + b, err := json.Marshal(meta) + testutil.Ok(t, err) + _ = originBucket.Upload(ctx, path.Join(ulid.String(), "meta.json"), bytes.NewReader(b)) + _ = originBucket.Upload(ctx, path.Join(ulid.String(), "chunks", "000001"), bytes.NewReader(nil)) + _ = originBucket.Upload(ctx, path.Join(ulid.String(), "index"), bytes.NewReader(nil)) + }, + assert: func(ctx context.Context, t *testing.T, originBucket, targetBucket *inmem.Bucket) { + if len(targetBucket.Objects()) != 0 { + t.Fatal("TargetBucket should have been empty but is not.") + } + }, + }, + { + name: "Regression", + selector: labels.Selector{}, + prepare: func(ctx context.Context, t *testing.T, originBucket, targetBucket objstore.Bucket) { + b := []byte(`{ + "ulid": "01DQYXMK8G108CEBQ79Y84DYVY", + "minTime": 1571911200000, + "maxTime": 1571918400000, + "stats": { + "numSamples": 90793, + "numSeries": 3703, + "numChunks": 3746 + }, + "compaction": { + "level": 1, + "sources": [ + "01DQYXMK8G108CEBQ79Y84DYVY" + ] + }, + "version": 1, + "thanos": { + "labels": { + "receive": "true", + "replica": "thanos-receive-default-0" + }, + "downsample": { + "resolution": 0 + }, + "source": "receive" + } +}`) + + _ = originBucket.Upload(ctx, path.Join("01DQYXMK8G108CEBQ79Y84DYVY", "meta.json"), bytes.NewReader(b)) + _ = originBucket.Upload(ctx, path.Join("01DQYXMK8G108CEBQ79Y84DYVY", "chunks", "000001"), bytes.NewReader(nil)) + _ = originBucket.Upload(ctx, path.Join("01DQYXMK8G108CEBQ79Y84DYVY", "index"), bytes.NewReader(nil)) + }, + assert: func(ctx context.Context, t *testing.T, originBucket, targetBucket *inmem.Bucket) { + if len(targetBucket.Objects()) != 3 { + t.Fatal("TargetBucket should have one block but does not.") + } + + expected := originBucket.Objects()["01DQYXMK8G108CEBQ79Y84DYVY/meta.json"] + got := targetBucket.Objects()["01DQYXMK8G108CEBQ79Y84DYVY/meta.json"] + testutil.Equals(t, expected, got) + }, + }, + } + + for _, c := range cases { + ctx := context.Background() + originBucket := inmem.NewBucket() + targetBucket := inmem.NewBucket() + logger := testLogger(t.Name() + "/" + c.name) + + c.prepare(ctx, t, originBucket, targetBucket) + + matcher, err := labels.NewMatcher(labels.MatchEqual, "test-labelname", "test-labelvalue") + if err != nil { + t.Fatal("Failed to create a matcher.") + } + + selector := labels.Selector{ + matcher, + } + if c.selector != nil { + selector = c.selector + } + + filter := NewBlockFilter(logger, selector, compact.ResolutionLevelRaw, 1).Filter + + r := newReplicationScheme( + logger, + newReplicationMetrics(nil), + filter, + originBucket, + targetBucket, + nil, + ) + + err = r.execute(ctx) + testutil.Ok(t, err) + + c.assert(ctx, t, originBucket, targetBucket) + } +} diff --git a/scripts/genflagdocs.sh b/scripts/genflagdocs.sh index 5393b311dc..9583934f42 100755 --- a/scripts/genflagdocs.sh +++ b/scripts/genflagdocs.sh @@ -40,7 +40,7 @@ for x in "${commands[@]}"; do ./thanos "${x}" --help &> "docs/components/flags/${x}.txt" done -bucketCommands=("verify" "ls" "inspect" "web") +bucketCommands=("verify" "ls" "inspect" "web" "replicate") for x in "${bucketCommands[@]}"; do ./thanos bucket "${x}" --help &> "docs/components/flags/bucket_${x}.txt" done
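A note for anyone trying the command out once this patch lands: the flags above compose into a single invocation. A minimal sketch follows; `origin.yaml` and `target.yaml` are hypothetical file names for the two objstore configurations, and the `environment` label is an illustrative external label, while the flags themselves are exactly the ones registered in `registerBucketReplicate`. Note the matcher value must keep its inner quotes, since `ParseFlagMatchers` passes the value through `strconv.Unquote`.

```
$ thanos bucket replicate \
    --objstore.config-file=origin.yaml \
    --objstore-to.config-file=target.yaml \
    --resolution=0 \
    --compaction=1 \
    --matcher='environment="production"' \
    --single-run
```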
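The same matcher and filter plumbing can also be exercised directly from Go, which is essentially what `scheme_test.go` does. A minimal sketch, assuming only the packages added or used by this patch; the `environment` label and the `main` wrapper are illustrative:

```go
package main

import (
	"fmt"
	"os"

	"github.com/go-kit/kit/log"
	"github.com/prometheus/prometheus/tsdb"

	"github.com/thanos-io/thanos/pkg/block/metadata"
	"github.com/thanos-io/thanos/pkg/compact"
	"github.com/thanos-io/thanos/pkg/replicate"
)

func main() {
	// Parse a --matcher flag value; the label value must be quoted,
	// as ParseFlagMatchers unquotes it with strconv.Unquote.
	matchers, err := replicate.ParseFlagMatchers([]string{`environment="production"`})
	if err != nil {
		fmt.Fprintln(os.Stderr, err)
		os.Exit(1)
	}

	// Build the same filter the replicate command wires up:
	// raw resolution and compaction level 1.
	filter := replicate.NewBlockFilter(
		log.NewLogfmtLogger(os.Stderr),
		matchers,
		compact.ResolutionLevelRaw,
		1,
	).Filter

	// A block whose external labels, resolution, and compaction level
	// all match passes the filter and would be replicated.
	meta := &metadata.Meta{
		BlockMeta: tsdb.BlockMeta{
			Compaction: tsdb.BlockMetaCompaction{Level: 1},
		},
		Thanos: metadata.Thanos{
			Labels:     map[string]string{"environment": "production"},
			Downsample: metadata.ThanosDownsample{Resolution: int64(compact.ResolutionLevelRaw)},
		},
	}
	fmt.Println(filter(meta)) // true
}
```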