diff --git a/internal/scheduler/graph.go b/internal/scheduler/graph.go index d38969e2..46173310 100644 --- a/internal/scheduler/graph.go +++ b/internal/scheduler/graph.go @@ -132,37 +132,60 @@ func (g *ExecutionGraph) setupRetry() error { func (g *ExecutionGraph) setup() error { for _, node := range g.nodes { for _, dep := range node.Depends { - dep_step, err := g.findStep(dep) - if err != nil { - return err - } - err = g.addEdge(dep_step, node) + depStep, err := g.findStep(dep) if err != nil { return err } + g.addEdge(depStep, node) } } + + if g.hasCycle() { + return fmt.Errorf("cycle detected") + } + return nil } -func (g *ExecutionGraph) addEdge(from, to *Node) error { - g.from[from.id] = append(g.from[from.id], to.id) - g.to[to.id] = append(g.to[to.id], from.id) - return g.cycleDfs(to.id, make(map[int]bool)) -} +func (g *ExecutionGraph) hasCycle() bool { + var inDegrees = make(map[int]int) + for node, depends := range g.to { + inDegrees[node] = len(depends) + } -func (g *ExecutionGraph) cycleDfs(t int, visited map[int]bool) error { - if visited[t] { - return fmt.Errorf("cycle detected") + var q []int + for _, node := range g.nodes { + if inDegrees[node.id] != 0 { + continue + } + q = append(q, node.id) } - visited[t] = true - for _, next := range g.from[t] { - err := g.cycleDfs(next, visited) - if err != nil { - return err + + for len(q) > 0 { + var f = q[0] + q = q[1:] + + var tos = g.from[f] + for _, to := range tos { + inDegrees[to]-- + if inDegrees[to] == 0 { + q = append(q, to) + } } } - return nil + + for _, degree := range inDegrees { + if degree > 0 { + return true + } + } + + return false +} + +func (g *ExecutionGraph) addEdge(from, to *Node) { + g.from[from.id] = append(g.from[from.id], to.id) + g.to[to.id] = append(g.to[to.id], from.id) } func (g *ExecutionGraph) findStep(name string) (*Node, error) {