From 4294849605a6c0dc8dec7261811a80cd23b4c10a Mon Sep 17 00:00:00 2001 From: Michal Wasilewski Date: Wed, 13 Apr 2022 18:55:27 +0200 Subject: [PATCH 1/8] add support for concurrent bucket replication Signed-off-by: Michal Wasilewski --- cmd/thanos/tools_bucket.go | 11 +++++++---- pkg/replicate/replicator.go | 4 ++-- pkg/replicate/scheme.go | 21 ++++++++++++++++----- 3 files changed, 25 insertions(+), 11 deletions(-) diff --git a/cmd/thanos/tools_bucket.go b/cmd/thanos/tools_bucket.go index 1559ec5050..b6f9f061f2 100644 --- a/cmd/thanos/tools_bucket.go +++ b/cmd/thanos/tools_bucket.go @@ -122,10 +122,11 @@ type bucketWebConfig struct { } type bucketReplicateConfig struct { - resolutions []time.Duration - compactions []int - matcherStrs []string - singleRun bool + resolutions []time.Duration + compactions []int + matcherStrs []string + singleRun bool + concurrencyLvl int } type bucketDownsampleConfig struct { @@ -210,6 +211,7 @@ func (tbc *bucketReplicateConfig) registerBucketReplicateFlag(cmd extkingpin.Fla cmd.Flag("matcher", "Only blocks whose external labels exactly match this matcher will be replicated.").PlaceHolder("key=\"value\"").StringsVar(&tbc.matcherStrs) cmd.Flag("single-run", "Run replication only one time, then exit.").Default("false").BoolVar(&tbc.singleRun) + cmd.Flag("concurrency-level", "Max number of go-routines to use for replication.").Default("4").IntVar(&tbc.concurrencyLvl) return tbc } @@ -735,6 +737,7 @@ func registerBucketReplicate(app extkingpin.AppClause, objStoreConfig *extflag.P objStoreConfig, toObjStoreConfig, tbc.singleRun, + tbc.concurrencyLvl, minTime, maxTime, blockIDs, diff --git a/pkg/replicate/replicator.go b/pkg/replicate/replicator.go index db7992c5ea..724e3c2252 100644 --- a/pkg/replicate/replicator.go +++ b/pkg/replicate/replicator.go @@ -84,6 +84,7 @@ func RunReplicate( fromObjStoreConfig *extflag.PathOrContent, toObjStoreConfig *extflag.PathOrContent, singleRun bool, + concurrencyLvl int, minTime, maxTime *thanosmodel.TimeOrDurationValue, blockIDs []ulid.ULID, ignoreMarkedForDeletion bool, @@ -195,10 +196,9 @@ func RunReplicate( logger := log.With(logger, "replication-run-id", runID.String()) level.Info(logger).Log("msg", "running replication attempt") - if err := newReplicationScheme(logger, metrics, blockFilter, fetcher, fromBkt, toBkt, reg).execute(ctx); err != nil { + if err := newReplicationScheme(logger, metrics, blockFilter, fetcher, fromBkt, toBkt, reg).execute(ctx, concurrencyLvl); err != nil { return errors.Wrap(err, "replication execute") } - return nil } diff --git a/pkg/replicate/scheme.go b/pkg/replicate/scheme.go index f9b1067b8e..24f38dac4b 100644 --- a/pkg/replicate/scheme.go +++ b/pkg/replicate/scheme.go @@ -14,6 +14,7 @@ import ( "github.com/go-kit/log" "github.com/go-kit/log/level" + "github.com/hashicorp/go-multierror" "github.com/oklog/ulid" "github.com/pkg/errors" "github.com/prometheus/client_golang/prometheus" @@ -179,7 +180,7 @@ func newReplicationScheme( } } -func (rs *replicationScheme) execute(ctx context.Context) error { +func (rs *replicationScheme) execute(ctx context.Context, concurrencyLvl int) error { availableBlocks := []*metadata.Meta{} metas, partials, err := rs.fetcher.Fetch(ctx) @@ -204,13 +205,23 @@ func (rs *replicationScheme) execute(ctx context.Context) error { return availableBlocks[i].BlockMeta.MinTime < availableBlocks[j].BlockMeta.MinTime }) + meg := multierror.Group{} + sem := make(chan *metadata.Meta, concurrencyLvl) + defer close(sem) for _, b := range availableBlocks { - if err := rs.ensureBlockIsReplicated(ctx, b.BlockMeta.ULID); err != nil { - return errors.Wrapf(err, "ensure block %v is replicated", b.BlockMeta.ULID.String()) - } + sem <- b + // instead of using a pipeline with a set number of long-lived workers + // a goroutine is started per upload in order to simplify error handling + meg.Go(func() error { + bl := <-sem + if err := rs.ensureBlockIsReplicated(ctx, bl.BlockMeta.ULID); err != nil { + return errors.Wrapf(err, "ensure block %v is replicated", bl.BlockMeta.ULID.String()) + } + return nil + }) } - return nil + return meg.Wait() } // ensureBlockIsReplicated ensures that a block present in the origin bucket is From c872894ece31c1de14a62b5e38f41e40b9ea9bf4 Mon Sep 17 00:00:00 2001 From: Michal Wasilewski Date: Wed, 13 Apr 2022 19:07:25 +0200 Subject: [PATCH 2/8] CHANGELOG.md update Signed-off-by: Michal Wasilewski --- CHANGELOG.md | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index eec84630ad..c677889474 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -16,6 +16,7 @@ We use *breaking :warning:* to mark changes that are not backward compatible (re - [#5220](https://github.com/thanos-io/thanos/pull/5220) Query Frontend: Add `--query-frontend.forward-header` flag, forward headers to downstream querier. - [#5250](https://github.com/thanos-io/thanos/pull/5250/files) Querier: Expose Query and QueryRange APIs through GRPC. +- [#5280](https://github.com/thanos-io/thanos/pull/5280) Tools: Add support for concurrent bucket replication. ### Changed From c6e032dcb404b55792210881a0ef3102503bb22f Mon Sep 17 00:00:00 2001 From: Michal Wasilewski Date: Thu, 14 Apr 2022 18:25:17 +0200 Subject: [PATCH 3/8] use a pipeline approach for concurrency Signed-off-by: Michal Wasilewski --- pkg/replicate/scheme.go | 51 ++++++++++++++++++++++++++++++----------- 1 file changed, 37 insertions(+), 14 deletions(-) diff --git a/pkg/replicate/scheme.go b/pkg/replicate/scheme.go index 24f38dac4b..01358c4eef 100644 --- a/pkg/replicate/scheme.go +++ b/pkg/replicate/scheme.go @@ -11,6 +11,7 @@ import ( "io/ioutil" "path" "sort" + "sync" "github.com/go-kit/log" "github.com/go-kit/log/level" @@ -205,23 +206,45 @@ func (rs *replicationScheme) execute(ctx context.Context, concurrencyLvl int) er return availableBlocks[i].BlockMeta.MinTime < availableBlocks[j].BlockMeta.MinTime }) - meg := multierror.Group{} - sem := make(chan *metadata.Meta, concurrencyLvl) - defer close(sem) - for _, b := range availableBlocks { - sem <- b - // instead of using a pipeline with a set number of long-lived workers - // a goroutine is started per upload in order to simplify error handling - meg.Go(func() error { - bl := <-sem - if err := rs.ensureBlockIsReplicated(ctx, bl.BlockMeta.ULID); err != nil { - return errors.Wrapf(err, "ensure block %v is replicated", bl.BlockMeta.ULID.String()) + // iterate over blocks and send them to a channel sequentially + iterBlocks := func() <-chan *metadata.Meta { + out := make(chan *metadata.Meta) + go func() { + for _, b := range availableBlocks { + out <- b } - return nil - }) + close(out) + }() + return out + } + bc := iterBlocks() + + // fan-out for concurrent replication + wg := sync.WaitGroup{} + wg.Add(concurrencyLvl) + errs := make(chan error) + for i := 0; i < concurrencyLvl; i++ { + go func(bc <-chan *metadata.Meta, errs chan<- error) { + for b := range bc { + if err := rs.ensureBlockIsReplicated(ctx, b.BlockMeta.ULID); err != nil { //TODO tbd if this is thread safe + errs <- err + } + } + wg.Done() + }(bc, errs) + } + go func() { + wg.Wait() + close(errs) + }() + + // fan-in for errors + var me error + for e := range errs { + me = multierror.Append(me, e) } - return meg.Wait() + return me } // ensureBlockIsReplicated ensures that a block present in the origin bucket is From fc2e67def6fe6e025eb6141552294450f74fe2b6 Mon Sep 17 00:00:00 2001 From: Michal Wasilewski Date: Wed, 20 Apr 2022 17:10:58 +0200 Subject: [PATCH 4/8] simplify creation of the channel with objects to replicate Signed-off-by: Michal Wasilewski --- pkg/replicate/scheme.go | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/pkg/replicate/scheme.go b/pkg/replicate/scheme.go index 01358c4eef..f11493526a 100644 --- a/pkg/replicate/scheme.go +++ b/pkg/replicate/scheme.go @@ -207,7 +207,7 @@ func (rs *replicationScheme) execute(ctx context.Context, concurrencyLvl int) er }) // iterate over blocks and send them to a channel sequentially - iterBlocks := func() <-chan *metadata.Meta { + blocksChan := func() <-chan *metadata.Meta { out := make(chan *metadata.Meta) go func() { for _, b := range availableBlocks { @@ -216,8 +216,7 @@ func (rs *replicationScheme) execute(ctx context.Context, concurrencyLvl int) er close(out) }() return out - } - bc := iterBlocks() + }() // fan-out for concurrent replication wg := sync.WaitGroup{} @@ -231,7 +230,7 @@ func (rs *replicationScheme) execute(ctx context.Context, concurrencyLvl int) er } } wg.Done() - }(bc, errs) + }(blocksChan, errs) } go func() { wg.Wait() From 8a11e08297022529f189c4e768f16ef5ebef023f Mon Sep 17 00:00:00 2001 From: Michal Wasilewski Date: Wed, 20 Apr 2022 17:13:22 +0200 Subject: [PATCH 5/8] made a simple sanity check if some buckets are thread safe Signed-off-by: Michal Wasilewski --- pkg/replicate/scheme.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/replicate/scheme.go b/pkg/replicate/scheme.go index f11493526a..7e0d93c531 100644 --- a/pkg/replicate/scheme.go +++ b/pkg/replicate/scheme.go @@ -225,7 +225,7 @@ func (rs *replicationScheme) execute(ctx context.Context, concurrencyLvl int) er for i := 0; i < concurrencyLvl; i++ { go func(bc <-chan *metadata.Meta, errs chan<- error) { for b := range bc { - if err := rs.ensureBlockIsReplicated(ctx, b.BlockMeta.ULID); err != nil { //TODO tbd if this is thread safe + if err := rs.ensureBlockIsReplicated(ctx, b.BlockMeta.ULID); err != nil { errs <- err } } From aa143f34d4aaf62aacfadee42017fa86fce05873 Mon Sep 17 00:00:00 2001 From: Michal Wasilewski Date: Wed, 20 Apr 2022 17:23:41 +0200 Subject: [PATCH 6/8] make docs Signed-off-by: Michal Wasilewski --- docs/components/tools.md | 2 ++ 1 file changed, 2 insertions(+) diff --git a/docs/components/tools.md b/docs/components/tools.md index 80ceed9585..4ad42a7884 100644 --- a/docs/components/tools.md +++ b/docs/components/tools.md @@ -508,6 +508,8 @@ with Thanos blocks (meta.json has to have Thanos metadata). Flags: --compaction=1... ... Only blocks with these compaction levels will be replicated. Repeated flag. + --concurrency-level=4 Max number of go-routines to use for + replication. -h, --help Show context-sensitive help (also try --help-long and --help-man). --http-address="0.0.0.0:10902" From bf09904a5e229796b6616ddd8b620f1b044cf01a Mon Sep 17 00:00:00 2001 From: Michal Wasilewski Date: Wed, 20 Apr 2022 17:34:00 +0200 Subject: [PATCH 7/8] add explicitly hashicorp/go-multierror as a dependency Signed-off-by: Michal Wasilewski --- go.mod | 2 ++ 1 file changed, 2 insertions(+) diff --git a/go.mod b/go.mod index 63401edc0b..0a10e81c5c 100644 --- a/go.mod +++ b/go.mod @@ -45,6 +45,7 @@ require ( github.com/grpc-ecosystem/go-grpc-middleware/providers/kit/v2 v2.0.0-20201002093600-73cf2ae9d891 github.com/grpc-ecosystem/go-grpc-middleware/v2 v2.0.0-rc.2.0.20201207153454-9f6bf00c00a7 github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0 + github.com/hashicorp/go-multierror v1.1.1 github.com/hashicorp/golang-lru v0.5.4 github.com/jpillora/backoff v1.0.0 github.com/klauspost/compress v1.13.6 @@ -160,6 +161,7 @@ require ( github.com/google/uuid v1.2.0 // indirect github.com/googleapis/gax-go/v2 v2.1.1 // indirect github.com/gorilla/mux v1.8.0 // indirect + github.com/hashicorp/errwrap v1.1.0 // indirect github.com/jmespath/go-jmespath v0.4.0 // indirect github.com/joeshaw/multierror v0.0.0-20140124173710-69b34d4ec901 // indirect github.com/josharian/intern v1.0.0 // indirect From 743bca2624889890bd8c65781686ac8a23759348 Mon Sep 17 00:00:00 2001 From: Michal Wasilewski Date: Wed, 20 Apr 2022 18:11:43 +0200 Subject: [PATCH 8/8] fix scheme tests Signed-off-by: Michal Wasilewski --- pkg/replicate/scheme_test.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/pkg/replicate/scheme_test.go b/pkg/replicate/scheme_test.go index 4d6c38c5a1..cdf0f6b86d 100644 --- a/pkg/replicate/scheme_test.go +++ b/pkg/replicate/scheme_test.go @@ -394,7 +394,8 @@ func TestReplicationSchemeAll(t *testing.T) { r := newReplicationScheme(logger, newReplicationMetrics(nil), filter, fetcher, objstore.WithNoopInstr(originBucket), targetBucket, nil) - err = r.execute(ctx) + concurrencyLvl := 4 + err = r.execute(ctx, concurrencyLvl) testutil.Ok(t, err) c.assert(ctx, t, originBucket, targetBucket)