feat: terminate reduce vertex pods when pausing pipeline (#1481)
Signed-off-by: Dillen Padhiar <[email protected]>
dpadhiar authored Jan 27, 2024
1 parent 36f7002 commit 9954f84
Showing 4 changed files with 86 additions and 37 deletions.
4 changes: 4 additions & 0 deletions pkg/apis/numaflow/v1alpha1/vertex_types.go
@@ -395,6 +395,10 @@ func (v Vertex) GetToBuffers() []string {

 func (v Vertex) GetReplicas() int {
 	if v.IsReduceUDF() {
+		// Replicas will be 0 only when pausing a pipeline
+		if v.Spec.Replicas != nil && int(*v.Spec.Replicas) == 0 {
+			return 0
+		}
 		// Replica of a reduce vertex is determined by the partitions.
 		return v.GetPartitionCount()
 	}
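To illustrate the behavior the new GetReplicas branch encodes, here is a minimal, self-contained Go sketch using simplified, hypothetical types (not the real numaflow Vertex API): a reduce vertex always runs one pod per partition, except when its spec replicas are explicitly set to 0, which only happens while the pipeline is being paused.

package main

import "fmt"

// vertex is a hypothetical stand-in for the real Vertex type, reduced to the
// fields this logic needs.
type vertex struct {
	isReduceUDF bool
	replicas    *int32 // desired replicas from the spec; may be nil
	partitions  int    // partition count of the reduce vertex
}

// getReplicas mirrors the reduce-vertex branch added in this commit.
func (v vertex) getReplicas() int {
	if v.isReduceUDF {
		// Replicas are 0 only when the pipeline is being paused.
		if v.replicas != nil && *v.replicas == 0 {
			return 0
		}
		// Otherwise a reduce vertex runs one pod per partition.
		return v.partitions
	}
	// Non-reduce vertices are handled elsewhere and omitted from this sketch.
	return 1
}

func main() {
	zero, five := int32(0), int32(5)
	v := vertex{isReduceUDF: true, partitions: 4}
	fmt.Println(v.getReplicas()) // 4: defaults to the partition count
	v.replicas = &five
	fmt.Println(v.getReplicas()) // 4: explicit replicas are ignored for a running reduce vertex
	v.replicas = &zero
	fmt.Println(v.getReplicas()) // 0: pausing the pipeline terminates the reduce pods
}

Before this change, GetReplicas always returned the partition count for a reduce vertex, so its pods kept running even while the pipeline was paused.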
1 change: 1 addition & 0 deletions pkg/apis/numaflow/v1alpha1/vertex_types_test.go
@@ -143,6 +143,7 @@ func TestGetVertexReplicas(t *testing.T) {
 	v.Spec.FromEdges = []CombinedEdge{
 		{Edge: Edge{From: "a", To: "b"}},
 	}
+	v.Spec.Replicas = pointer.Int32(5)
 	assert.Equal(t, 1, v.GetReplicas())
 	v.Spec.Replicas = pointer.Int32(1000)
 	assert.Equal(t, 1, v.GetReplicas())
13 changes: 10 additions & 3 deletions pkg/reconciler/pipeline/controller.go
@@ -842,10 +842,17 @@ func (r *pipelineReconciler) scaleVertex(ctx context.Context, pl *dfv1.Pipeline,
 	for _, vertex := range existingVertices {
 		if origin := *vertex.Spec.Replicas; origin != replicas && filter(vertex) {
 			scaleTo := replicas
-			// if vtx does not support autoscaling and min is set, scale up to min
+			// if replicas equals to 1, it means we are resuming a paused pipeline
+			// in this case, if a vertex doesn't support auto-scaling, we scale up based on the vertex's configuration:
+			// for a reducer, we scale up to the partition count
+			// for a non-reducer, if min is set, we scale up to min
 			if replicas == 1 {
-				if !vertex.Scalable() && vertex.Spec.Scale.Min != nil && *vertex.Spec.Scale.Min > 1 {
-					scaleTo = *vertex.Spec.Scale.Min
+				if !vertex.Scalable() {
+					if vertex.IsReduceUDF() {
+						scaleTo = int32(vertex.GetPartitionCount())
+					} else if vertex.Spec.Scale.Min != nil && *vertex.Spec.Scale.Min > 1 {
+						scaleTo = *vertex.Spec.Scale.Min
+					}
 				}
 			}
 			vertex.Spec.Replicas = pointer.Int32(scaleTo)
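To make the resume-time decision concrete, here is a minimal, self-contained Go sketch with simplified, hypothetical types (not the real controller code): when vertices are scaled back up to 1 replica after a pause, a non-autoscaling reduce vertex goes straight back to its partition count, a non-autoscaling non-reduce vertex goes to its configured minimum if one is set, and an autoscaling vertex stays at 1 so the autoscaler can take over.

package main

import "fmt"

// vtx is a hypothetical stand-in for the vertex fields consulted while resuming.
type vtx struct {
	scalable    bool   // whether the vertex supports autoscaling
	isReduceUDF bool
	partitions  int32  // partition count of a reduce vertex
	scaleMin    *int32 // configured scale minimum; may be nil
}

// resumeScaleTo mirrors the scale-up decision in the diff above: the controller
// asks for 1 replica when resuming and adjusts it per vertex configuration.
func resumeScaleTo(v vtx, replicas int32) int32 {
	scaleTo := replicas
	// replicas == 1 means we are resuming a paused pipeline.
	if replicas == 1 && !v.scalable {
		if v.isReduceUDF {
			scaleTo = v.partitions // reducers go back to one pod per partition
		} else if v.scaleMin != nil && *v.scaleMin > 1 {
			scaleTo = *v.scaleMin // non-reducers honor a configured minimum
		}
	}
	return scaleTo
}

func main() {
	min := int32(3)
	fmt.Println(resumeScaleTo(vtx{isReduceUDF: true, partitions: 2}, 1)) // 2
	fmt.Println(resumeScaleTo(vtx{scaleMin: &min}, 1))                   // 3
	fmt.Println(resumeScaleTo(vtx{scalable: true}, 1))                   // 1: the autoscaler takes over
	fmt.Println(resumeScaleTo(vtx{isReduceUDF: true, partitions: 2}, 0)) // 0: pausing
}

Pausing goes the other way: replicas are set to 0, and the GetReplicas change above lets that take effect for reduce vertices too, so their pods are terminated.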
105 changes: 71 additions & 34 deletions pkg/reconciler/pipeline/controller_test.go
@@ -223,40 +223,77 @@ func Test_buildReducesVertices(t *testing.T) {
 }
 
 func Test_pauseAndResumePipeline(t *testing.T) {
-	cl := fake.NewClientBuilder().Build()
-	ctx := context.TODO()
-	testIsbSvc := testNativeRedisIsbSvc.DeepCopy()
-	testIsbSvc.Status.MarkConfigured()
-	testIsbSvc.Status.MarkDeployed()
-	err := cl.Create(ctx, testIsbSvc)
-	assert.Nil(t, err)
-	r := &pipelineReconciler{
-		client: cl,
-		scheme: scheme.Scheme,
-		config: fakeConfig,
-		image: testFlowImage,
-		logger: zaptest.NewLogger(t).Sugar(),
-		recorder: record.NewFakeRecorder(64),
-	}
-	testObj := testPipeline.DeepCopy()
-	testObj.Spec.Vertices[0].Scale.Min = pointer.Int32(3)
-	_, err = r.reconcile(ctx, testObj)
-	assert.NoError(t, err)
-	_, err = r.pausePipeline(ctx, testObj)
-	assert.NoError(t, err)
-	v, err := r.findExistingVertices(ctx, testObj)
-	assert.NoError(t, err)
-	assert.Equal(t, int32(0), *v[testObj.Name+"-"+testObj.Spec.Vertices[0].Name].Spec.Replicas)
-	assert.NotNil(t, testObj.Annotations[dfv1.KeyPauseTimestamp])
-	testObj.Annotations[dfv1.KeyPauseTimestamp] = ""
-	_, err = r.resumePipeline(ctx, testObj)
-	assert.NoError(t, err)
-	v, err = r.findExistingVertices(ctx, testObj)
-	assert.NoError(t, err)
-	// when auto-scaling is enabled, while resuming the pipeline, instead of setting the replicas to Scale.Min,
-	// we set it to one and let auto-scaling to scale up
-	assert.Equal(t, int32(1), *v[testObj.Name+"-"+testObj.Spec.Vertices[0].Name].Spec.Replicas)
-	assert.NoError(t, err)
+
+	t.Run("test normal pipeline", func(t *testing.T) {
+		cl := fake.NewClientBuilder().Build()
+		ctx := context.TODO()
+		testIsbSvc := testNativeRedisIsbSvc.DeepCopy()
+		testIsbSvc.Status.MarkConfigured()
+		testIsbSvc.Status.MarkDeployed()
+		err := cl.Create(ctx, testIsbSvc)
+		assert.Nil(t, err)
+		r := &pipelineReconciler{
+			client: cl,
+			scheme: scheme.Scheme,
+			config: fakeConfig,
+			image: testFlowImage,
+			logger: zaptest.NewLogger(t).Sugar(),
+			recorder: record.NewFakeRecorder(64),
+		}
+		testObj := testPipeline.DeepCopy()
+		testObj.Spec.Vertices[0].Scale.Min = pointer.Int32(3)
+		_, err = r.reconcile(ctx, testObj)
+		assert.NoError(t, err)
+		_, err = r.pausePipeline(ctx, testObj)
+		assert.NoError(t, err)
+		v, err := r.findExistingVertices(ctx, testObj)
+		assert.NoError(t, err)
+		assert.Equal(t, int32(0), *v[testObj.Name+"-"+testObj.Spec.Vertices[0].Name].Spec.Replicas)
+		assert.NotNil(t, testObj.Annotations[dfv1.KeyPauseTimestamp])
+		testObj.Annotations[dfv1.KeyPauseTimestamp] = ""
+		_, err = r.resumePipeline(ctx, testObj)
+		assert.NoError(t, err)
+		v, err = r.findExistingVertices(ctx, testObj)
+		assert.NoError(t, err)
+		// when auto-scaling is enabled, while resuming the pipeline, instead of setting the replicas to Scale.Min,
+		// we set it to one and let auto-scaling to scale up
+		assert.Equal(t, int32(1), *v[testObj.Name+"-"+testObj.Spec.Vertices[0].Name].Spec.Replicas)
+		assert.NoError(t, err)
+	})
+
+	t.Run("test reduce pipeline", func(t *testing.T) {
+		cl := fake.NewClientBuilder().Build()
+		ctx := context.TODO()
+		testIsbSvc := testNativeRedisIsbSvc.DeepCopy()
+		testIsbSvc.Status.MarkConfigured()
+		testIsbSvc.Status.MarkDeployed()
+		err := cl.Create(ctx, testIsbSvc)
+		assert.Nil(t, err)
+		r := &pipelineReconciler{
+			client: cl,
+			scheme: scheme.Scheme,
+			config: fakeConfig,
+			image: testFlowImage,
+			logger: zaptest.NewLogger(t).Sugar(),
+			recorder: record.NewFakeRecorder(64),
+		}
+		testObj := testReducePipeline.DeepCopy()
+		_, err = r.reconcile(ctx, testObj)
+		assert.NoError(t, err)
+		_, err = r.pausePipeline(ctx, testObj)
+		assert.NoError(t, err)
+		_, err = r.findExistingVertices(ctx, testObj)
+		assert.NoError(t, err)
+		assert.NotNil(t, testObj.Annotations[dfv1.KeyPauseTimestamp])
+		testObj.Annotations[dfv1.KeyPauseTimestamp] = ""
+		_, err = r.resumePipeline(ctx, testObj)
+		assert.NoError(t, err)
+		v, err := r.findExistingVertices(ctx, testObj)
+		assert.NoError(t, err)
+		// reduce UDFs are not autoscalable thus they are scaled manually back to their partition count
+		assert.Equal(t, int32(2), *v[testObj.Name+"-"+testObj.Spec.Vertices[2].Name].Spec.Replicas)
+		assert.NoError(t, err)
+	})
 }
 
 func Test_copyVertexLimits(t *testing.T) {