Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: terminate reduce vertex pods when pausing pipeline #1481

Merged
merged 4 commits into from
Jan 27, 2024

Conversation

dpadhiar
Copy link
Contributor

@dpadhiar dpadhiar commented Jan 24, 2024

Explain what this PR does.

Fixes #990.

When pausing a pipeline currently, all vertex pods will be determined as replicas is set to 0 for each. However, this isn't the case for any reduce vertices as they use partitions to determine replicas for the vertex.

To remedy this, we change the logic for checking replicas of a reduce vertex to respect replicas only if the value is 0. The only case it will be 0 is when the pipeline is paused, otherwise it will respect the partitions value which defaults to 1.

if v.IsReduceUDF() {
	// Replicas will be 0 only when pausing a pipeline
	if v.Spec.Replicas != nil && int(*v.Spec.Replicas) == 0 {
		return 0
	}
	// Replica of a reduce vertex is determined by the partitions.
	return v.GetPartitionCount()
}

@dpadhiar dpadhiar linked an issue Jan 25, 2024 that may be closed by this pull request
@dpadhiar dpadhiar marked this pull request as ready for review January 25, 2024 17:20
@@ -844,8 +844,13 @@ func (r *pipelineReconciler) scaleVertex(ctx context.Context, pl *dfv1.Pipeline,
scaleTo := replicas
// if vtx does not support autoscaling and min is set, scale up to min
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
// if vtx does not support autoscaling and min is set, scale up to min
// if replicas equals to 1, it means we are resuming a paused pipeline.
// in this case, if a vertex doesn't support auto-scaling, we scale up based on the vertex's configuration:
// for a reducer, we scale up to the partition count.
// for a non-reducer, if min is set, we scale up to min.

if !vertex.Scalable() && vertex.Spec.Scale.Min != nil && *vertex.Spec.Scale.Min > 1 {
scaleTo = *vertex.Spec.Scale.Min
if !vertex.Scalable() {
// reduce UDF uses partition count to determine replica count
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
// reduce UDF uses partition count to determine replica count

Signed-off-by: Dillen Padhiar <[email protected]>
@whynowy whynowy merged commit 9954f84 into numaproj:main Jan 27, 2024
20 checks passed
whynowy pushed a commit that referenced this pull request Feb 11, 2024
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Pause a pipeline does not terminate reduce vertex pods
3 participants