Parallelize vertical compactions inside a single group #5936
Conversation
pkg/compact/planner.go
	return p.plan(p.noCompBlocksFunc(), metasByMinTime)
}

func (p *tsdbBasedPlanner) plan(noCompactMarked map[ulid.ULID]*metadata.NoCompactMark, metasByMinTime []*metadata.Meta) ([]*metadata.Meta, error) {
type compactionTask []*metadata.Meta
We need to expose this so that downstream projects like Cortex can access it.
Makes sense. Should be public now.
pkg/compact/compact.go
@@ -727,7 +729,7 @@ func (rs *RetentionProgressCalculator) ProgressCalculate(ctx context.Context, gr
 type Planner interface {
 	// Plan returns a list of blocks that should be compacted into single one.
 	// The blocks can be overlapping. The provided metadata has to be ordered by minTime.
-	Plan(ctx context.Context, metasByMinTime []*metadata.Meta) ([]*metadata.Meta, error)
+	Plan(ctx context.Context, metasByMinTime []*metadata.Meta) ([]compactionTask, error)
I am trying to understand the interface. Does the return value of compaction tasks mean those are tasks that we can safely run in parallel?
Yes, that would be the idea. I've added documentation on the type.
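For illustration, the exported and documented type could look roughly like this (the exact name and comment wording in the PR may differ):

```go
// CompactionTask is a single unit of compaction work: a set of blocks
// that should be compacted into one block. Tasks returned together by
// a single Plan call do not share blocks, so they can safely be
// executed in parallel.
type CompactionTask []*metadata.Meta
```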
@@ -768,19 +771,19 @@ func (cg *Group) Compact(ctx context.Context, dir string, planner Planner, comp
 }()

 if err := os.MkdirAll(subDir, 0750); err != nil {
 	return false, ulid.ULID{}, errors.Wrap(err, "create compaction group dir")
The ULID return value was not used anywhere, which is why I've removed it here.
This is to fit the Prometheus interface. In Prometheus especially, an empty ULID meant special behaviour when compacting blocks - we rely on it to check whether compaction really compacted anything.
I think it's fine to change the interface if you want - it's not super important to allow Prometheus to use Thanos compaction or anything like that. Just be careful with Cortex @yeya24 and Mimir @pracucci, who might use this interface/our structs here.
I checked on the Cortex side and we are not using it either. I guess the same goes for Mimir, as I heard they are moving away from importing the Thanos main repo directly.
I don't have a strong preference here. OK to clean it up or keep it.
> I guess the same goes for Mimir, as I heard they are moving away from importing the Thanos main repo directly.
Thanks for asking! I confirm we're not running Thanos compactor or store-gateway anymore in Mimir.
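For context on the convention mentioned above: Prometheus-style compactors signal "nothing was compacted" by returning a zero-value ULID, which callers can test for. A minimal illustrative helper (not code from this PR):

```go
import "github.com/oklog/ulid"

// didCompact reports whether a Prometheus-style compaction actually
// produced a block: a zero-value ULID conventionally means the
// compacted result would be empty and no block was written.
func didCompact(compID ulid.ULID) bool {
	return compID != (ulid.ULID{})
}
```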
pkg/compact/compact.go
	groupErr   multierror.MultiError
	rerunGroup bool
)
for _, task := range tasks {
I am not sure how best to handle this, since we will have unbounded concurrency. We can have per-group concurrency in the short term, but that can still lead to one group slowing down everything else. Long term, maybe we want a single queue for tasks so that we can have global concurrency across all tasks.
I think we might need that sooner rather than later - it's common for users to crash their compactor with even TWO compactions running at the same time. Why not the usual workers approach (e.g. at most 5 compactions at one time)?
Also, we have concurrency at a bigger layer (the caller of this method, I believe) - can we have one concurrency loop so we don't get too complex in terms of unpredictable concurrency?
Do you mean max 5 parallel compactions inside a single group? Or 5 parallel vertical compactions across all groups? The former is easier to implement, the latter is better but will make this PR bigger :)
We should have concurrency on a task level now, so users can also run one task at a time if they want to.
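For illustration, the bounded "workers" approach discussed here could look roughly like this in Go using errgroup (a sketch with assumed names, not the PR's actual code):

```go
package main

import (
	"context"

	"golang.org/x/sync/errgroup"
)

// compactTask stands in for a unit of work returned by the planner.
type compactTask func(ctx context.Context) error

// runTasks executes tasks with at most maxConcurrency running at once.
// The first error cancels the context shared by the remaining tasks.
func runTasks(ctx context.Context, tasks []compactTask, maxConcurrency int) error {
	g, ctx := errgroup.WithContext(ctx)
	g.SetLimit(maxConcurrency) // bounded worker pool instead of one goroutine per task
	for _, t := range tasks {
		t := t // capture loop variable (needed before Go 1.22)
		g.Go(func() error {
			return t(ctx)
		})
	}
	return g.Wait()
}
```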
Looks good, just some suggestions. Thanks!
// overlaps when the next block is contained in the current block.
// See test case https://github.com/thanos-io/thanos/blob/04106d7a7add7f47025c00422c80f746650c1b97/pkg/compact/planner_test.go#L310-L321.
loopMetas:
for i := range metasByMinTime {
Critical code: reviewers, please be careful here @thanos-io/thanos-maintainers - a bug here can cause irreversible data malformation.
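For reviewers following along, here is a sketch of the kind of independent-group detection being discussed, assuming half-open [minTime, maxTime) block ranges. The PR's actual code also handles no-compaction marks and only treats runs of two or more blocks as compactable:

```go
import (
	"math"

	"github.com/thanos-io/thanos/pkg/block/metadata"
)

// groupOverlapping splits blocks sorted by minTime into runs of
// transitively overlapping blocks. Blocks in different runs never
// overlap, so each run can be compacted independently. Tracking the
// furthest maxTime seen so far also catches blocks fully contained
// in an earlier, longer block.
func groupOverlapping(metasByMinTime []*metadata.Meta) [][]*metadata.Meta {
	var (
		groups  [][]*metadata.Meta
		current []*metadata.Meta
		maxTime = int64(math.MinInt64)
	)
	for _, m := range metasByMinTime {
		// A block starts a new group when it begins at or after every
		// end time seen so far, i.e. it overlaps nothing before it.
		if len(current) > 0 && m.MinTime >= maxTime {
			groups = append(groups, current)
			current = nil
		}
		current = append(current, m)
		if m.MaxTime > maxTime {
			maxTime = m.MaxTime
		}
	}
	if len(current) > 0 {
		groups = append(groups, current)
	}
	return groups
}
```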
pkg/compact/planner_test.go
 {
-	name: "Overlapping blocks 5",
+	name: "Multiple independent groups of overlapping blocks",
I don't think these are independent groups, are they?
cmd/thanos/compact.go
@@ -746,6 +747,10 @@ func (cc *compactConfig) registerFlag(cmd extkingpin.FlagClause) {
 	"NOTE: This flag is ignored and (enabled) when --deduplication.replica-label flag is set.").
 	Hidden().Default("false").BoolVar(&cc.enableVerticalCompaction)

+cmd.Flag("compact.group-concurrency", "The number of concurrent compactions from a single compaction group inside one compaction iteration. "+
- Can we add this flag next to compact.concurrency for readability?
- Are we sure we need this granularity of config? Is there a way to automatically derive this from compact.concurrency?
Love the idea of deriving it based on compact.concurrency.
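One possible shape for that idea - a hypothetical helper (the name groupConcurrency and its parameters are illustrative, not part of the PR):

```go
// Hypothetical: derive a per-group budget from the global
// compact.concurrency value instead of adding a second flag, so that
// the total number of compaction goroutines stays near the limit.
func groupConcurrency(compactConcurrency, activeGroups int) int {
	if activeGroups <= 0 {
		return compactConcurrency
	}
	perGroup := compactConcurrency / activeGroups
	if perGroup < 1 {
		perGroup = 1 // always make progress in every group
	}
	return perGroup
}
```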
Any interest in moving forward with this?
Let me try to review this next week.
LGTM at first glance, need to double-check after the weekend 😄
Hello 👋 Looks like there was no activity on this amazing PR for the last 30 days.
Closing for now as promised, let us know if you need this to be reopened! 🤗
The current compaction implementation allocates one goroutine per compaction stream. This means that compaction can only run as fast as the slowest stream. As a result, as soon as one stream starts to fall behind, all other streams can become affected. In addition, despite setting a high compact-concurrency, CPU utilization can still be low because of the one-goroutine-per-stream limit.
The compaction algorithm also prioritizes vertical compactions over horizontal ones. As soon as it detects any overlapping blocks, it will compact those blocks and reevaluate the plan in a subsequent iteration.
This commit enables parallel execution of vertical compactions within a single compaction stream. It does that by first changing the Planner interface to allow it to return multiple compaction tasks per group instead of a single one. It also adapts the algorithm for detecting overlapping blocks to be able to detect multiple independent groups. These groups are then returned as distinct compaction tasks and the compactor can execute them in separate goroutines.
By modifying the planner interface, this commit also enables parallelizing horizontal compactions in the future.
Changes
Verification
This is only tested through unit tests. I plan to run this in a staging environment soon.