diff --git a/examples/10-cycle-to-prev.yaml b/examples/10-cycle-to-prev.yaml new file mode 100644 index 0000000000..8178e412c1 --- /dev/null +++ b/examples/10-cycle-to-prev.yaml @@ -0,0 +1,45 @@ +apiVersion: numaflow.numaproj.io/v1alpha1 +kind: Pipeline +metadata: + name: cycle-to-prev +spec: + vertices: + - name: in + source: + http: {} + - name: cat + udf: + builtin: + name: cat + - name: retry + scale: + min: 1 + disabled: true # don't scale this beyond one Pod since it doesn't make sense for this particular container, which uses in-memory storage + udf: + container: + # This will try each message up to 3 times before continuing, see https://github.com/numaproj/numaflow-go/tree/main/pkg/function/examples/retry + # (a more realistic example might retry only on failure) + image: quay.io/numaio/numaflow-go/map-retry:latest + - name: out + scale: + min: 1 + sink: + log: {} + edges: + - from: in + to: cat + - from: cat + to: retry + - from: retry + to: cat + conditions: + tags: + values: + - retry + - from: retry + to: out + conditions: + tags: + operator: not + values: + - retry \ No newline at end of file diff --git a/examples/10-cycle-to-self.yaml b/examples/10-cycle-to-self.yaml new file mode 100644 index 0000000000..7b75635c7e --- /dev/null +++ b/examples/10-cycle-to-self.yaml @@ -0,0 +1,39 @@ +apiVersion: numaflow.numaproj.io/v1alpha1 +kind: Pipeline +metadata: + name: cycle-to-self +spec: + vertices: + - name: in + source: + http: {} + - name: retry + scale: + min: 1 + disabled: true # don't scale this beyond one Pod since it doesn't make sense for this particular container, which uses in-memory storage + udf: + container: + # This will try each message up to 3 times before continuing, see https://github.com/numaproj/numaflow-go/tree/main/pkg/function/examples/retry + # (a more realistic example might retry only on failure) + image: quay.io/numaio/numaflow-go/map-retry:latest + - name: out + scale: + min: 1 + sink: + log: {} + edges: + - from: in + to: retry + - from: retry + to: retry + conditions: + tags: + values: + - retry + - from: retry + to: out + conditions: + tags: + operator: not + values: + - retry \ No newline at end of file diff --git a/pkg/apis/numaflow/v1alpha1/pipeline_types.go b/pkg/apis/numaflow/v1alpha1/pipeline_types.go index f59c4ff72a..3160441c92 100644 --- a/pkg/apis/numaflow/v1alpha1/pipeline_types.go +++ b/pkg/apis/numaflow/v1alpha1/pipeline_types.go @@ -448,6 +448,35 @@ type PipelineSpec struct { SideInputs []SideInput `json:"sideInputs,omitempty" protobuf:"bytes,8,rep,name=sideInputs"` } +func (pipeline PipelineSpec) GetMatchingVertices(f func(AbstractVertex) bool) map[string]*AbstractVertex { + mappedVertices := make(map[string]*AbstractVertex) + for i := range pipeline.Vertices { + v := &pipeline.Vertices[i] + if f(*v) { + mappedVertices[v.Name] = v + } + } + return mappedVertices +} + +func (pipeline PipelineSpec) GetVerticesByName() map[string]*AbstractVertex { + return pipeline.GetMatchingVertices(func(v AbstractVertex) bool { + return true + }) +} + +func (pipeline PipelineSpec) GetSourcesByName() map[string]*AbstractVertex { + return pipeline.GetMatchingVertices(func(v AbstractVertex) bool { + return v.IsASource() + }) +} + +func (pipeline PipelineSpec) GetSinksByName() map[string]*AbstractVertex { + return pipeline.GetMatchingVertices(func(v AbstractVertex) bool { + return v.IsASink() + }) +} + type Watermark struct { // Disabled toggles the watermark propagation, defaults to false. // +kubebuilder:default=false diff --git a/pkg/reconciler/pipeline/validate.go b/pkg/reconciler/pipeline/validate.go index 8b3592ef2d..a70108ee55 100644 --- a/pkg/reconciler/pipeline/validate.go +++ b/pkg/reconciler/pipeline/validate.go @@ -133,9 +133,6 @@ func ValidatePipeline(pl *dfv1.Pipeline) error { if e.From == "" || e.To == "" { return fmt.Errorf("invalid edge: both from and to need to be specified") } - if e.From == e.To { - return fmt.Errorf("invalid edge: same from and to") - } if !names[e.From] { return fmt.Errorf("invalid edge: no vertex named %q", e.From) } @@ -155,7 +152,12 @@ func ValidatePipeline(pl *dfv1.Pipeline) error { return fmt.Errorf("not all the vertex names are defined in edges") } - // TODO(Join): prevent pipelines with Cycles in the case that there is a Reduce Vertex at the point of the cycle or to the right of it + // Prevent pipelines with Cycles in the case that there is a Reduce Vertex at the point of the cycle or to the right of it. + // Whenever there's a cycle, there will inherently be "late data", and we don't want late data for a Reduce Vertex, which may + // have already "closed the book" on the data's time window. + if err := validateCycles(&pl.Spec); err != nil { + return err + } for _, v := range pl.Spec.Vertices { if err := validateVertex(v); err != nil { @@ -308,3 +310,145 @@ func isReservedContainerName(name string) bool { name == dfv1.CtrInitSideInputs || name == dfv1.CtrSideInputsWatcher } + +// validateCycles verifies that there are no invalid cycles in the pipeline. +// An invalid cycle has a Reduce Vertex at or to the right of the cycle. Whenever there's a cycle, +// there will inherently be "late data", and we don't want late data for a Reduce Vertex, which may +// have already "closed the book" on the data's time window. +func validateCycles(pipelineSpec *dfv1.PipelineSpec) error { + verticesByName := pipelineSpec.GetVerticesByName() + edges, err := toVerticesMappedByFrom(pipelineSpec.Edges, verticesByName) + if err != nil { + return err + } + + // first find the cycles, if any + cycles, err := getCycles(pipelineSpec) + if err != nil { + return err + } + // need to make sure none of the cycles have a Reduce Vertex at or to the right of the cycle + for cycleVertexName := range cycles { + cycleVertex, found := verticesByName[cycleVertexName] + if !found { + return fmt.Errorf("something went wrong: no Vertex found with name %q", cycleVertexName) + } + invalidReduce := edges.findVertex(cycleVertex, map[string]struct{}{}, func(v *dfv1.AbstractVertex) bool { + return v.IsReduceUDF() + }) + if invalidReduce { + return fmt.Errorf("there's a Reduce Vertex at or to the right of a Cycle occurring at Vertex %q", cycleVertexName) + } + } + + return nil +} + +// getCycles locates the vertices where there's a Cycle, if any +// eg. if A->B->A, then return A +// Since there are multiple Sources, and since each Source produces a Tree, then we can return multiple Cycles +func getCycles(pipelineSpec *dfv1.PipelineSpec) (map[string]struct{}, error) { + edges, err := toVerticesMappedByFrom(pipelineSpec.Edges, pipelineSpec.GetVerticesByName()) + if err != nil { + return nil, err + } + + sources := pipelineSpec.GetSourcesByName() + cycles := map[string]struct{}{} // essentially a Set of cycle Vertex names + + // consolidate the Cycles from all Sources + for _, sourceVertex := range sources { + cyclesFromSource := edges.getCyclesFromVertex(sourceVertex, map[string]struct{}{}) + for cycleVertex := range cyclesFromSource { + cycles[cycleVertex] = struct{}{} + } + } + + return cycles, nil +} + +// getCyclesFromVertex returns the cycles detected if any, starting from startVertex +// This is a recursive function. Each iteration we keep track of the visited Vertices in order to detect a cycle. +func (edges verticesByFrom) getCyclesFromVertex(startVertex *dfv1.AbstractVertex, visited map[string]struct{}) map[string]struct{} { + + toVertices, found := edges[startVertex.Name] + // base case: no Edges stem from this Vertex + if !found { + return map[string]struct{}{} + } + + // check for cycle + _, alreadyVisited := visited[startVertex.Name] + if alreadyVisited { + return map[string]struct{}{startVertex.Name: {}} + } + // add this Vertex to our Set + visited[startVertex.Name] = struct{}{} + + // recurse the Edges of this Vertex, looking for cycles + cyclesFound := make(map[string]struct{}) + for _, toVertex := range toVertices { + newCycles := edges.getCyclesFromVertex(toVertex, visited) + for cycleVertex := range newCycles { + cyclesFound[cycleVertex] = struct{}{} + } + } + + delete(visited, startVertex.Name) // pop + + return cyclesFound +} + +// findVertex determines if any Vertex starting from this one meets some condition +// This is a recursive function. Each iteration we keep track of the visited Vertices in order not to get in an infinite loop +func (edges verticesByFrom) findVertex(startVertex *dfv1.AbstractVertex, visited map[string]struct{}, f func(*dfv1.AbstractVertex) bool) bool { + + // first try the condition on this vertex + if f(startVertex) { + return true + } + + toVertices, found := edges[startVertex.Name] + // base case: no Edges stem from this Vertex + if !found { + return false + } + + // if we've arrived at a cycle, then stop + _, alreadyVisited := visited[startVertex.Name] + if alreadyVisited { + return false + } + // keep track of visited vertices so we don't get into an infinite loop + visited[startVertex.Name] = struct{}{} + + // recurse + for _, toVertex := range toVertices { + if edges.findVertex(toVertex, visited, f) { + return true + } + } + + delete(visited, startVertex.Name) // pop + + return false +} + +type verticesByFrom map[string][]*dfv1.AbstractVertex + +// toVerticesMappedByFrom is a helper function to create a map of "To Vertices" from their "From Vertex" +func toVerticesMappedByFrom(edges []dfv1.Edge, verticesByName map[string]*dfv1.AbstractVertex) (verticesByFrom, error) { + mappedEdges := make(verticesByFrom) + for _, edge := range edges { + _, found := mappedEdges[edge.From] + if !found { + mappedEdges[edge.From] = make([]*dfv1.AbstractVertex, 0) + } + toVertex, found := verticesByName[edge.To] + if !found { + return nil, fmt.Errorf("no vertex found of name %q", edge.To) + } + mappedEdges[edge.From] = append(mappedEdges[edge.From], toVertex) + } + return mappedEdges, nil +} diff --git a/pkg/reconciler/pipeline/validate_test.go b/pkg/reconciler/pipeline/validate_test.go index 198525d1ae..00f5e20b2d 100644 --- a/pkg/reconciler/pipeline/validate_test.go +++ b/pkg/reconciler/pipeline/validate_test.go @@ -306,16 +306,6 @@ func TestValidatePipeline(t *testing.T) { assert.Contains(t, err.Error(), "pipeline has no sink") }) - t.Run("same from and to", func(t *testing.T) { - testObj := testPipeline.DeepCopy() - testObj.Spec.Edges = append(testObj.Spec.Edges, dfv1.Edge{From: "p1", To: "p1"}) - err := ValidatePipeline(testObj) - assert.Error(t, err) - assert.Contains(t, err.Error(), "same from and to") - }) - - // TODO(Join): we can test for certain types of invalid cycles here instead - t.Run("or conditional forwarding", func(t *testing.T) { testObj := testPipeline.DeepCopy() operatorOr := dfv1.LogicOperatorOr @@ -607,3 +597,225 @@ func Test_validateSideInputs(t *testing.T) { err = validateSideInputs(*testObj) assert.NoError(t, err) } + +func Test_getCyclesFromVertex(t *testing.T) { + tests := []struct { + name string + edges []dfv1.Edge + startVertex string + expectedCycleVertices map[string]struct{} + }{ + { + name: "NoCycle", + edges: []dfv1.Edge{ + {From: "A", To: "B"}, + {From: "B", To: "C"}, + {From: "B", To: "D"}, + }, + startVertex: "A", + expectedCycleVertices: map[string]struct{}{}, + }, + { + name: "CycleToSelf", + edges: []dfv1.Edge{ + {From: "A", To: "B"}, + {From: "B", To: "B"}, + {From: "B", To: "C"}, + }, + startVertex: "A", + expectedCycleVertices: map[string]struct{}{"B": {}}, + }, + { + name: "CycleBackward", + edges: []dfv1.Edge{ + {From: "A", To: "B"}, + {From: "B", To: "A"}, + {From: "B", To: "C"}, + }, + startVertex: "A", + expectedCycleVertices: map[string]struct{}{"A": {}}, + }, + { + name: "Complicated", + edges: []dfv1.Edge{ + {From: "A", To: "B"}, + {From: "B", To: "C"}, + {From: "B", To: "E"}, + {From: "A", To: "D"}, + {From: "D", To: "E"}, + {From: "E", To: "A"}, // this cycles + {From: "E", To: "F"}, + }, + startVertex: "A", + expectedCycleVertices: map[string]struct{}{"A": {}}, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + t.Logf("running test: %q\n", tt.name) + mappedEdges, err := toVerticesMappedByFrom(tt.edges, constructVerticesByName(tt.edges)) + assert.NoError(t, err) + + cyclesFound := mappedEdges.getCyclesFromVertex(&dfv1.AbstractVertex{Name: tt.startVertex}, make(map[string]struct{})) + + assert.Equal(t, len(tt.expectedCycleVertices), len(cyclesFound)) + for cycleFound := range cyclesFound { + assert.Contains(t, tt.expectedCycleVertices, cycleFound) + } + }) + } + +} + +func constructVerticesByName(edges []dfv1.Edge) map[string]*dfv1.AbstractVertex { + mappedVertices := make(map[string]*dfv1.AbstractVertex) + for _, edge := range edges { + mappedVertices[edge.From] = &dfv1.AbstractVertex{Name: edge.From} // fine if we see the same one twice and overwrite + mappedVertices[edge.To] = &dfv1.AbstractVertex{Name: edge.To} + } + return mappedVertices +} + +func Test_validateCycles(t *testing.T) { + tests := []struct { + name string + pipelineSpec *dfv1.PipelineSpec + success bool + }{ + { + name: "NoCycle", + pipelineSpec: &dfv1.PipelineSpec{ + Vertices: []dfv1.AbstractVertex{ + {Name: "A", Source: &dfv1.Source{}}, + {Name: "B", UDF: &dfv1.UDF{}}, + {Name: "C", UDF: &dfv1.UDF{}}, + {Name: "D", UDF: &dfv1.UDF{}}, + {Name: "E", UDF: &dfv1.UDF{}}, + {Name: "F", Source: &dfv1.Source{}}, + {Name: "G", UDF: &dfv1.UDF{}}, + }, + Edges: []dfv1.Edge{ + {From: "A", To: "B"}, + {From: "B", To: "C"}, + {From: "A", To: "D"}, + {From: "D", To: "E"}, + {From: "E", To: "B"}, + {From: "F", To: "G"}, + {From: "G", To: "D"}, + }, + }, + success: true, + }, + { + name: "CycleToSelf-NoReduce", + pipelineSpec: &dfv1.PipelineSpec{ + Vertices: []dfv1.AbstractVertex{ + {Name: "A", Source: &dfv1.Source{}}, + {Name: "B", UDF: &dfv1.UDF{GroupBy: &dfv1.GroupBy{}}}, //Reduce vertex + {Name: "C", UDF: &dfv1.UDF{}}, + {Name: "D", UDF: &dfv1.UDF{}}, + {Name: "E", UDF: &dfv1.UDF{}}, + {Name: "F", Source: &dfv1.Source{}}, + {Name: "G", UDF: &dfv1.UDF{}}, + }, + Edges: []dfv1.Edge{ + {From: "A", To: "B"}, + {From: "B", To: "C"}, + {From: "C", To: "C"}, // cycle to self + {From: "C", To: "E"}, + {From: "A", To: "D"}, + {From: "D", To: "E"}, + {From: "F", To: "G"}, + {From: "G", To: "D"}, + }, + }, + success: true, + }, + { + name: "CycleToSelf-CycleIsReduce", + pipelineSpec: &dfv1.PipelineSpec{ + Vertices: []dfv1.AbstractVertex{ + {Name: "A", Source: &dfv1.Source{}}, + {Name: "B", UDF: &dfv1.UDF{GroupBy: &dfv1.GroupBy{}}}, //Reduce vertex + {Name: "C", UDF: &dfv1.UDF{}}, + {Name: "D", UDF: &dfv1.UDF{}}, + {Name: "E", UDF: &dfv1.UDF{}}, + {Name: "F", Source: &dfv1.Source{}}, + {Name: "G", UDF: &dfv1.UDF{}}, + }, + Edges: []dfv1.Edge{ + {From: "A", To: "B"}, + {From: "B", To: "B"}, // cycle to self + {From: "B", To: "C"}, + {From: "A", To: "D"}, + {From: "D", To: "E"}, + {From: "F", To: "G"}, + {From: "G", To: "D"}, + }, + }, + success: false, + }, + { + name: "CycleToSelf-ReduceAhead", + pipelineSpec: &dfv1.PipelineSpec{ + Vertices: []dfv1.AbstractVertex{ + {Name: "A", Source: &dfv1.Source{}}, + {Name: "B", UDF: &dfv1.UDF{}}, + {Name: "C", UDF: &dfv1.UDF{GroupBy: &dfv1.GroupBy{}}}, //Reduce vertex + {Name: "D", UDF: &dfv1.UDF{}}, + {Name: "E", UDF: &dfv1.UDF{}}, + {Name: "F", Source: &dfv1.Source{}}, + {Name: "G", UDF: &dfv1.UDF{}}, + }, + Edges: []dfv1.Edge{ + {From: "A", To: "B"}, + {From: "B", To: "B"}, // cycle to self + {From: "B", To: "C"}, + {From: "A", To: "D"}, + {From: "D", To: "E"}, + {From: "F", To: "G"}, + {From: "G", To: "D"}, + }, + }, + success: false, + }, + { + name: "CycleBackward-ReduceAhead", + pipelineSpec: &dfv1.PipelineSpec{ + Vertices: []dfv1.AbstractVertex{ + {Name: "A", Source: &dfv1.Source{}}, + {Name: "B", UDF: &dfv1.UDF{}}, //Reduce vertex + {Name: "C", UDF: &dfv1.UDF{}}, + {Name: "D", UDF: &dfv1.UDF{}}, + {Name: "E", UDF: &dfv1.UDF{GroupBy: &dfv1.GroupBy{}}}, //Reduce vertex + {Name: "F", Source: &dfv1.Source{}}, + {Name: "G", UDF: &dfv1.UDF{}}, + {Name: "H", UDF: &dfv1.UDF{}}, + }, + Edges: []dfv1.Edge{ + {From: "A", To: "B"}, + {From: "B", To: "C"}, + {From: "A", To: "D"}, + {From: "D", To: "E"}, + {From: "F", To: "G"}, + {From: "G", To: "D"}, + {From: "D", To: "G"}, // cycle backward + {From: "E", To: "H"}, + }, + }, + success: false, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + t.Logf("running test: %q\n", tt.name) + err := validateCycles(tt.pipelineSpec) + if tt.success { + assert.NoError(t, err) + } else { + assert.Error(t, err) + } + }) + } +} diff --git a/test/e2e-suite-2/functional_test.go b/test/e2e-suite-2/functional_test.go index 81652a36ab..ecc349eb0e 100644 --- a/test/e2e-suite-2/functional_test.go +++ b/test/e2e-suite-2/functional_test.go @@ -19,6 +19,7 @@ limitations under the License. package e2e import ( + "fmt" "strings" "testing" "time" @@ -92,6 +93,58 @@ func (s *FunctionalSuite) TestJoinSinkVertex() { w.Expect().SinkContains("out", "888889") } +func (s *FunctionalSuite) TestCycleToSelf() { + w := s.Given().Pipeline("@testdata/cycle-to-self.yaml"). + When(). + CreatePipelineAndWait() + defer w.DeletePipelineAndWait() + pipelineName := "cycle-to-self" + + // wait for all the pods to come up + w.Expect().VertexPodsRunning() + + msgs := [10]string{} + for i := 0; i < 10; i++ { + msgs[i] = fmt.Sprintf("msg-%d", i) + w.SendMessageTo(pipelineName, "in", NewHttpPostRequest().WithBody([]byte(msgs[i]))) + } + for iteration := 1; iteration <= 3; iteration++ { + for i := 0; i < 10; i++ { + expectedString := fmt.Sprintf("count for \"msg-%d\"=%d", i, iteration) + w.Expect().VertexPodLogContains("retry", expectedString, PodLogCheckOptionWithContainer("udf")) + } + } + for i := 0; i < 10; i++ { + w.Expect().SinkContains("out", msgs[i]) + } + +} +func (s *FunctionalSuite) TestCycleBackward() { + w := s.Given().Pipeline("@testdata/cycle-backward.yaml"). + When(). + CreatePipelineAndWait() + defer w.DeletePipelineAndWait() + pipelineName := "cycle-backward" + + // wait for all the pods to come up + w.Expect().VertexPodsRunning() + + msgs := [10]string{} + for i := 0; i < 10; i++ { + msgs[i] = fmt.Sprintf("msg-%d", i) + w.SendMessageTo(pipelineName, "in", NewHttpPostRequest().WithBody([]byte(msgs[i]))) + } + for iteration := 1; iteration <= 3; iteration++ { + for i := 0; i < 10; i++ { + expectedString := fmt.Sprintf("count for \"msg-%d\"=%d", i, iteration) + w.Expect().VertexPodLogContains("retry", expectedString, PodLogCheckOptionWithContainer("udf")) + } + } + for i := 0; i < 10; i++ { + w.Expect().SinkContains("out", msgs[i]) + } +} + func TestFunctionalSuite(t *testing.T) { suite.Run(t, new(FunctionalSuite)) } diff --git a/test/e2e-suite-2/testdata/cycle-backward.yaml b/test/e2e-suite-2/testdata/cycle-backward.yaml new file mode 100644 index 0000000000..79fa4c5e1a --- /dev/null +++ b/test/e2e-suite-2/testdata/cycle-backward.yaml @@ -0,0 +1,44 @@ +apiVersion: numaflow.numaproj.io/v1alpha1 +kind: Pipeline +metadata: + name: cycle-backward +spec: + vertices: + - name: in + source: + http: {} + - name: cat + udf: + builtin: + name: cat + - name: retry + scale: + disabled: true # don't scale this beyond one Pod since it doesn't make sense for this container + udf: + container: + # This will try each message up to 3 times before continuing, see https://github.com/numaproj/numaflow-go/tree/main/pkg/function/examples/retry + image: quay.io/numaio/numaflow-go/map-retry:latest + - name: out + sink: + udsink: + container: + # A redis sink for e2e testing, see https://github.com/numaproj/numaflow-sinks/tree/main/redis-e2e-test-sink + image: quay.io/numaio/numaflow-sink/redis-e2e-test-sink:latest + edges: + - from: in + to: cat + - from: cat + to: retry + - from: retry + to: cat + conditions: + tags: + values: + - retry + - from: retry + to: out + conditions: + tags: + operator: not + values: + - retry \ No newline at end of file diff --git a/test/e2e-suite-2/testdata/cycle-to-self.yaml b/test/e2e-suite-2/testdata/cycle-to-self.yaml new file mode 100644 index 0000000000..8248027291 --- /dev/null +++ b/test/e2e-suite-2/testdata/cycle-to-self.yaml @@ -0,0 +1,38 @@ +apiVersion: numaflow.numaproj.io/v1alpha1 +kind: Pipeline +metadata: + name: cycle-to-self +spec: + vertices: + - name: in + source: + http: {} + - name: retry + scale: + disabled: true # don't scale this beyond one Pod since it doesn't make sense for this container + udf: + container: + # This will try each message up to 3 times before continuing, see https://github.com/numaproj/numaflow-go/tree/main/pkg/function/examples/retry + image: quay.io/numaio/numaflow-go/map-retry:latest + - name: out + sink: + udsink: + container: + # A redis sink for e2e testing, see https://github.com/numaproj/numaflow-sinks/tree/main/redis-e2e-test-sink + image: quay.io/numaio/numaflow-sink/redis-e2e-test-sink:latest + edges: + - from: in + to: retry + - from: retry + to: retry + conditions: + tags: + values: + - retry + - from: retry + to: out + conditions: + tags: + operator: not + values: + - retry \ No newline at end of file