From cc460a17c97c9bd6cba6f2c51bb340e2e0ce789a Mon Sep 17 00:00:00 2001 From: Dillen Padhiar Date: Wed, 24 Jan 2024 15:28:07 -0800 Subject: [PATCH 1/3] feat: terminate reduce UDF pods when pausing pipeline Signed-off-by: Dillen Padhiar --- pkg/apis/numaflow/v1alpha1/vertex_types.go | 4 + .../numaflow/v1alpha1/vertex_types_test.go | 1 + pkg/reconciler/pipeline/controller.go | 9 +- pkg/reconciler/pipeline/controller_test.go | 106 ++++++++++++------ 4 files changed, 84 insertions(+), 36 deletions(-) diff --git a/pkg/apis/numaflow/v1alpha1/vertex_types.go b/pkg/apis/numaflow/v1alpha1/vertex_types.go index 719b4f309b..a950456a28 100644 --- a/pkg/apis/numaflow/v1alpha1/vertex_types.go +++ b/pkg/apis/numaflow/v1alpha1/vertex_types.go @@ -395,6 +395,10 @@ func (v Vertex) GetToBuffers() []string { func (v Vertex) GetReplicas() int { if v.IsReduceUDF() { + // Replicas will be 0 only when pausing a pipeline + if v.Spec.Replicas != nil && int(*v.Spec.Replicas) == 0 { + return 0 + } // Replica of a reduce vertex is determined by the partitions. 
return v.GetPartitionCount() } diff --git a/pkg/apis/numaflow/v1alpha1/vertex_types_test.go b/pkg/apis/numaflow/v1alpha1/vertex_types_test.go index 96232dd307..34f3c267be 100644 --- a/pkg/apis/numaflow/v1alpha1/vertex_types_test.go +++ b/pkg/apis/numaflow/v1alpha1/vertex_types_test.go @@ -143,6 +143,7 @@ func TestGetVertexReplicas(t *testing.T) { v.Spec.FromEdges = []CombinedEdge{ {Edge: Edge{From: "a", To: "b"}}, } + v.Spec.Replicas = pointer.Int32(5) assert.Equal(t, 1, v.GetReplicas()) v.Spec.Replicas = pointer.Int32(1000) assert.Equal(t, 1, v.GetReplicas()) diff --git a/pkg/reconciler/pipeline/controller.go b/pkg/reconciler/pipeline/controller.go index 7444198d47..47a317502b 100644 --- a/pkg/reconciler/pipeline/controller.go +++ b/pkg/reconciler/pipeline/controller.go @@ -844,8 +844,13 @@ func (r *pipelineReconciler) scaleVertex(ctx context.Context, pl *dfv1.Pipeline, scaleTo := replicas // if vtx does not support autoscaling and min is set, scale up to min if replicas == 1 { - if !vertex.Scalable() && vertex.Spec.Scale.Min != nil && *vertex.Spec.Scale.Min > 1 { - scaleTo = *vertex.Spec.Scale.Min + if !vertex.Scalable() { + // reduce UDF uses partition count to determine replica count + if vertex.IsReduceUDF() { + scaleTo = int32(vertex.GetPartitionCount()) + } else if vertex.Spec.Scale.Min != nil && *vertex.Spec.Scale.Min > 1 { + scaleTo = *vertex.Spec.Scale.Min + } } } vertex.Spec.Replicas = pointer.Int32(scaleTo) diff --git a/pkg/reconciler/pipeline/controller_test.go b/pkg/reconciler/pipeline/controller_test.go index 46d8752eab..e9f8f1a91c 100644 --- a/pkg/reconciler/pipeline/controller_test.go +++ b/pkg/reconciler/pipeline/controller_test.go @@ -223,40 +223,78 @@ func Test_buildReducesVertices(t *testing.T) { } func Test_pauseAndResumePipeline(t *testing.T) { - cl := fake.NewClientBuilder().Build() - ctx := context.TODO() - testIsbSvc := testNativeRedisIsbSvc.DeepCopy() - testIsbSvc.Status.MarkConfigured() - testIsbSvc.Status.MarkDeployed() - err := 
cl.Create(ctx, testIsbSvc) - assert.Nil(t, err) - r := &pipelineReconciler{ - client: cl, - scheme: scheme.Scheme, - config: fakeConfig, - image: testFlowImage, - logger: zaptest.NewLogger(t).Sugar(), - recorder: record.NewFakeRecorder(64), - } - testObj := testPipeline.DeepCopy() - testObj.Spec.Vertices[0].Scale.Min = pointer.Int32(3) - _, err = r.reconcile(ctx, testObj) - assert.NoError(t, err) - _, err = r.pausePipeline(ctx, testObj) - assert.NoError(t, err) - v, err := r.findExistingVertices(ctx, testObj) - assert.NoError(t, err) - assert.Equal(t, int32(0), *v[testObj.Name+"-"+testObj.Spec.Vertices[0].Name].Spec.Replicas) - assert.NotNil(t, testObj.Annotations[dfv1.KeyPauseTimestamp]) - testObj.Annotations[dfv1.KeyPauseTimestamp] = "" - _, err = r.resumePipeline(ctx, testObj) - assert.NoError(t, err) - v, err = r.findExistingVertices(ctx, testObj) - assert.NoError(t, err) - // when auto-scaling is enabled, while resuming the pipeline, instead of setting the replicas to Scale.Min, - // we set it to one and let auto-scaling to scale up - assert.Equal(t, int32(1), *v[testObj.Name+"-"+testObj.Spec.Vertices[0].Name].Spec.Replicas) - assert.NoError(t, err) + + t.Run("test normal pipeline", func(t *testing.T) { + cl := fake.NewClientBuilder().Build() + ctx := context.TODO() + testIsbSvc := testNativeRedisIsbSvc.DeepCopy() + testIsbSvc.Status.MarkConfigured() + testIsbSvc.Status.MarkDeployed() + err := cl.Create(ctx, testIsbSvc) + assert.Nil(t, err) + r := &pipelineReconciler{ + client: cl, + scheme: scheme.Scheme, + config: fakeConfig, + image: testFlowImage, + logger: zaptest.NewLogger(t).Sugar(), + recorder: record.NewFakeRecorder(64), + } + testObj := testPipeline.DeepCopy() + testObj.Spec.Vertices[0].Scale.Min = pointer.Int32(3) + _, err = r.reconcile(ctx, testObj) + assert.NoError(t, err) + _, err = r.pausePipeline(ctx, testObj) + assert.NoError(t, err) + v, err := r.findExistingVertices(ctx, testObj) + assert.NoError(t, err) + assert.Equal(t, int32(0), 
*v[testObj.Name+"-"+testObj.Spec.Vertices[0].Name].Spec.Replicas) + assert.NotNil(t, testObj.Annotations[dfv1.KeyPauseTimestamp]) + testObj.Annotations[dfv1.KeyPauseTimestamp] = "" + _, err = r.resumePipeline(ctx, testObj) + assert.NoError(t, err) + v, err = r.findExistingVertices(ctx, testObj) + assert.NoError(t, err) + // when auto-scaling is enabled, while resuming the pipeline, instead of setting the replicas to Scale.Min, + // we set it to one and let auto-scaling to scale up + assert.Equal(t, int32(1), *v[testObj.Name+"-"+testObj.Spec.Vertices[0].Name].Spec.Replicas) + assert.NoError(t, err) + }) + + t.Run("test reduce pipeline", func(t *testing.T) { + cl := fake.NewClientBuilder().Build() + ctx := context.TODO() + testIsbSvc := testNativeRedisIsbSvc.DeepCopy() + testIsbSvc.Status.MarkConfigured() + testIsbSvc.Status.MarkDeployed() + err := cl.Create(ctx, testIsbSvc) + assert.Nil(t, err) + r := &pipelineReconciler{ + client: cl, + scheme: scheme.Scheme, + config: fakeConfig, + image: testFlowImage, + logger: zaptest.NewLogger(t).Sugar(), + recorder: record.NewFakeRecorder(64), + } + testObj := testReducePipeline.DeepCopy() + _, err = r.reconcile(ctx, testObj) + assert.NoError(t, err) + _, err = r.pausePipeline(ctx, testObj) + assert.NoError(t, err) + v, err := r.findExistingVertices(ctx, testObj) + assert.NoError(t, err) + assert.Equal(t, int32(0), *v[testObj.Name+"-"+testObj.Spec.Vertices[2].Name].Spec.Replicas) + assert.NotNil(t, testObj.Annotations[dfv1.KeyPauseTimestamp]) + testObj.Annotations[dfv1.KeyPauseTimestamp] = "" + _, err = r.resumePipeline(ctx, testObj) + assert.NoError(t, err) + v, err = r.findExistingVertices(ctx, testObj) + assert.NoError(t, err) + // reduce UDFs are not autoscalable thus they are scaled manually back to their partition count + assert.Equal(t, int32(2), *v[testObj.Name+"-"+testObj.Spec.Vertices[2].Name].Spec.Replicas) + assert.NoError(t, err) + }) } func Test_copyVertexLimits(t *testing.T) { From 
69b965183586393e3b0c92d24e6ab849fb1091dc Mon Sep 17 00:00:00 2001 From: Dillen Padhiar Date: Thu, 25 Jan 2024 08:51:27 -0800 Subject: [PATCH 2/3] test: reduce test too fast to see changes Signed-off-by: Dillen Padhiar --- pkg/reconciler/pipeline/controller_test.go | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/pkg/reconciler/pipeline/controller_test.go b/pkg/reconciler/pipeline/controller_test.go index e9f8f1a91c..d0745a0d3e 100644 --- a/pkg/reconciler/pipeline/controller_test.go +++ b/pkg/reconciler/pipeline/controller_test.go @@ -282,14 +282,13 @@ func Test_pauseAndResumePipeline(t *testing.T) { assert.NoError(t, err) _, err = r.pausePipeline(ctx, testObj) assert.NoError(t, err) - v, err := r.findExistingVertices(ctx, testObj) + _, err = r.findExistingVertices(ctx, testObj) assert.NoError(t, err) - assert.Equal(t, int32(0), *v[testObj.Name+"-"+testObj.Spec.Vertices[2].Name].Spec.Replicas) assert.NotNil(t, testObj.Annotations[dfv1.KeyPauseTimestamp]) testObj.Annotations[dfv1.KeyPauseTimestamp] = "" _, err = r.resumePipeline(ctx, testObj) assert.NoError(t, err) - v, err = r.findExistingVertices(ctx, testObj) + v, err := r.findExistingVertices(ctx, testObj) assert.NoError(t, err) // reduce UDFs are not autoscalable thus they are scaled manually back to their partition count assert.Equal(t, int32(2), *v[testObj.Name+"-"+testObj.Spec.Vertices[2].Name].Spec.Replicas) From 0ebfaf5f52799590750744019010d1d9c774ae80 Mon Sep 17 00:00:00 2001 From: Dillen Padhiar Date: Thu, 25 Jan 2024 13:01:04 -0800 Subject: [PATCH 3/3] style: rewrite comments Signed-off-by: Dillen Padhiar --- pkg/reconciler/pipeline/controller.go | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/pkg/reconciler/pipeline/controller.go b/pkg/reconciler/pipeline/controller.go index 47a317502b..e6c64ce83d 100644 --- a/pkg/reconciler/pipeline/controller.go +++ b/pkg/reconciler/pipeline/controller.go @@ -842,10 +842,12 @@ func (r *pipelineReconciler) scaleVertex(ctx 
context.Context, pl *dfv1.Pipeline, for _, vertex := range existingVertices { if origin := *vertex.Spec.Replicas; origin != replicas && filter(vertex) { scaleTo := replicas - // if vtx does not support autoscaling and min is set, scale up to min + // if replicas equals 1, it means we are resuming a paused pipeline + // in this case, if a vertex doesn't support auto-scaling, we scale up based on the vertex's configuration: + // for a reducer, we scale up to the partition count + // for a non-reducer, if min is set, we scale up to min if replicas == 1 { if !vertex.Scalable() { - // reduce UDF uses partition count to determine replica count if vertex.IsReduceUDF() { scaleTo = int32(vertex.GetPartitionCount()) } else if vertex.Spec.Scale.Min != nil && *vertex.Spec.Scale.Min > 1 {