tools bucket replicate: add support for concurrent bucket replication #5280

Closed
wants to merge 8 commits into from
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -16,6 +16,7 @@ We use *breaking :warning:* to mark changes that are not backward compatible (re

- [#5220](https://github.com/thanos-io/thanos/pull/5220) Query Frontend: Add `--query-frontend.forward-header` flag, forward headers to downstream querier.
- [#5250](https://github.com/thanos-io/thanos/pull/5250/files) Querier: Expose Query and QueryRange APIs through GRPC.
- [#5280](https://github.com/thanos-io/thanos/pull/5280) Tools: Add support for concurrent bucket replication.

### Changed


11 changes: 7 additions & 4 deletions cmd/thanos/tools_bucket.go
@@ -122,10 +122,11 @@ type bucketWebConfig struct {
}

type bucketReplicateConfig struct {
- resolutions []time.Duration
- compactions []int
- matcherStrs []string
- singleRun   bool
+ resolutions    []time.Duration
+ compactions    []int
+ matcherStrs    []string
+ singleRun      bool
+ concurrencyLvl int
}

type bucketDownsampleConfig struct {
@@ -210,6 +211,7 @@ func (tbc *bucketReplicateConfig) registerBucketReplicateFlag(cmd extkingpin.Fla
cmd.Flag("matcher", "Only blocks whose external labels exactly match this matcher will be replicated.").PlaceHolder("key=\"value\"").StringsVar(&tbc.matcherStrs)

cmd.Flag("single-run", "Run replication only one time, then exit.").Default("false").BoolVar(&tbc.singleRun)
cmd.Flag("concurrency-level", "Max number of go-routines to use for replication.").Default("4").IntVar(&tbc.concurrencyLvl)
Member

Unfortunately, this is not that simple. If you want to keep this implementation, we have to add extra context to the flag help and set the default to 1.

Suggested change
- cmd.Flag("concurrency-level", "Max number of go-routines to use for replication.").Default("4").IntVar(&tbc.concurrencyLvl)
+ cmd.Flag("concurrency-level", "Number of go-routines to use for replication. WARNING: A value greater than one enables concurrent, non-sequential block replication. This means that a block from 4w ago can be uploaded after a block from 2h ago. With the default Thanos compactor consistency delay of 30m (https://thanos.io/tip/components/compact.md/#consistency-delay), if blocks that fall between those two are still waiting to be uploaded 30 minutes later, the compactor might run compaction assuming a gap! This is solvable with vertical compaction, but it wastes extra computation and is not enabled by default. Use with care.").Default("1").IntVar(&tbc.concurrencyLvl)

Author

> This is solvable with vertical compaction, but it wastes extra computation and is not enabled by default.

I see what you mean here. Even if the problem can be solved by vertical compaction, the blocks should not be uploaded out of sequence for no valid reason.


return tbc
}
@@ -735,6 +737,7 @@ func registerBucketReplicate(app extkingpin.AppClause, objStoreConfig *extflag.P
objStoreConfig,
toObjStoreConfig,
tbc.singleRun,
tbc.concurrencyLvl,
minTime,
maxTime,
blockIDs,

2 changes: 2 additions & 0 deletions docs/components/tools.md
@@ -508,6 +508,8 @@ with Thanos blocks (meta.json has to have Thanos metadata).
Flags:
--compaction=1... ... Only blocks with these compaction levels will
be replicated. Repeated flag.
--concurrency-level=4 Max number of go-routines to use for
replication.
-h, --help Show context-sensitive help (also try
--help-long and --help-man).
--http-address="0.0.0.0:10902"

2 changes: 2 additions & 0 deletions go.mod
@@ -45,6 +45,7 @@ require (
github.com/grpc-ecosystem/go-grpc-middleware/providers/kit/v2 v2.0.0-20201002093600-73cf2ae9d891
github.com/grpc-ecosystem/go-grpc-middleware/v2 v2.0.0-rc.2.0.20201207153454-9f6bf00c00a7
github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0
github.com/hashicorp/go-multierror v1.1.1
github.com/hashicorp/golang-lru v0.5.4
github.com/jpillora/backoff v1.0.0
github.com/klauspost/compress v1.13.6
@@ -160,6 +161,7 @@ require (
github.com/google/uuid v1.2.0 // indirect
github.com/googleapis/gax-go/v2 v2.1.1 // indirect
github.com/gorilla/mux v1.8.0 // indirect
github.com/hashicorp/errwrap v1.1.0 // indirect
github.com/jmespath/go-jmespath v0.4.0 // indirect
github.com/joeshaw/multierror v0.0.0-20140124173710-69b34d4ec901 // indirect
github.com/josharian/intern v1.0.0 // indirect

4 changes: 2 additions & 2 deletions pkg/replicate/replicator.go
@@ -84,6 +84,7 @@ func RunReplicate(
fromObjStoreConfig *extflag.PathOrContent,
toObjStoreConfig *extflag.PathOrContent,
singleRun bool,
concurrencyLvl int,
minTime, maxTime *thanosmodel.TimeOrDurationValue,
blockIDs []ulid.ULID,
ignoreMarkedForDeletion bool,
@@ -195,10 +196,9 @@ func RunReplicate(
logger := log.With(logger, "replication-run-id", runID.String())
level.Info(logger).Log("msg", "running replication attempt")

- if err := newReplicationScheme(logger, metrics, blockFilter, fetcher, fromBkt, toBkt, reg).execute(ctx); err != nil {
+ if err := newReplicationScheme(logger, metrics, blockFilter, fetcher, fromBkt, toBkt, reg).execute(ctx, concurrencyLvl); err != nil {
return errors.Wrap(err, "replication execute")
}

return nil
}


45 changes: 39 additions & 6 deletions pkg/replicate/scheme.go
@@ -11,9 +11,11 @@ import (
"io/ioutil"
"path"
"sort"
"sync"

"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/hashicorp/go-multierror"
"github.com/oklog/ulid"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
@@ -179,7 +181,7 @@ func newReplicationScheme(
}
}

- func (rs *replicationScheme) execute(ctx context.Context) error {
+ func (rs *replicationScheme) execute(ctx context.Context, concurrencyLvl int) error {
availableBlocks := []*metadata.Meta{}

metas, partials, err := rs.fetcher.Fetch(ctx)
@@ -204,13 +206,44 @@ func (rs *replicationScheme) execute(ctx context.Context) error {
return availableBlocks[i].BlockMeta.MinTime < availableBlocks[j].BlockMeta.MinTime
})

- for _, b := range availableBlocks {
-     if err := rs.ensureBlockIsReplicated(ctx, b.BlockMeta.ULID); err != nil {
-         return errors.Wrapf(err, "ensure block %v is replicated", b.BlockMeta.ULID.String())
-     }
// iterate over blocks and send them to a channel sequentially
Member

We use full sentences in comments (nit).

Suggested change
- // iterate over blocks and send them to a channel sequentially
+ // Iterate over blocks and send them to a channel sequentially.

Author

Yup, my bad, I'll fix it.

I wonder why the linter did not pick this up: the golangci-lint config includes `godot`. Perhaps it's running with the defaults (where it checks only declaration comments, not all comments)? Would it make sense to add a `.godot.yaml` with a config that makes it check all comments (and probably fix lots and lots of comments across the code base)?

blocksChan := func() <-chan *metadata.Meta {
Member

Why does it have to be in a closure (anonymous function)?

Author

No reason, good point, I'll simplify this.

out := make(chan *metadata.Meta)
Member

Suggested change
- out := make(chan *metadata.Meta)
+ blocksChan := make(chan *metadata.Meta)

go func() {
for _, b := range availableBlocks {
out <- b
}
close(out)
}()
return out
}()

// fan-out for concurrent replication
Member

Wrong comment format (full sentence please).

wg := sync.WaitGroup{}
wg.Add(concurrencyLvl)
errs := make(chan error)
for i := 0; i < concurrencyLvl; i++ {
go func(bc <-chan *metadata.Meta, errs chan<- error) {
for b := range bc {
if err := rs.ensureBlockIsReplicated(ctx, b.BlockMeta.ULID); err != nil {
errs <- err
}
}
wg.Done()
}(blocksChan, errs)
}
go func() {
wg.Wait()
close(errs)
}()

// fan-in for errors
var me error
for e := range errs {
me = multierror.Append(me, e)
}

- return nil
+ return me
}

// ensureBlockIsReplicated ensures that a block present in the origin bucket is

3 changes: 2 additions & 1 deletion pkg/replicate/scheme_test.go
@@ -394,7 +394,8 @@ func TestReplicationSchemeAll(t *testing.T) {

r := newReplicationScheme(logger, newReplicationMetrics(nil), filter, fetcher, objstore.WithNoopInstr(originBucket), targetBucket, nil)

- err = r.execute(ctx)
+ concurrencyLvl := 4
+ err = r.execute(ctx, concurrencyLvl)
testutil.Ok(t, err)

c.assert(ctx, t, originBucket, targetBucket)