Skip to content

Commit

Permalink
fix: 修复 DAG 环路检测 (#427)
Browse files Browse the repository at this point in the history
  • Loading branch information
1005281342 authored Apr 1, 2023
1 parent b0a4fd8 commit f851bd0
Showing 1 changed file with 42 additions and 19 deletions.
61 changes: 42 additions & 19 deletions internal/scheduler/graph.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,37 +132,60 @@ func (g *ExecutionGraph) setupRetry() error {
func (g *ExecutionGraph) setup() error {
for _, node := range g.nodes {
for _, dep := range node.Depends {
dep_step, err := g.findStep(dep)
if err != nil {
return err
}
err = g.addEdge(dep_step, node)
depStep, err := g.findStep(dep)
if err != nil {
return err
}
g.addEdge(depStep, node)
}
}

if g.hasCycle() {
return fmt.Errorf("cycle detected")
}

return nil
}

func (g *ExecutionGraph) addEdge(from, to *Node) error {
g.from[from.id] = append(g.from[from.id], to.id)
g.to[to.id] = append(g.to[to.id], from.id)
return g.cycleDfs(to.id, make(map[int]bool))
}
func (g *ExecutionGraph) hasCycle() bool {
var inDegrees = make(map[int]int)
for node, depends := range g.to {
inDegrees[node] = len(depends)
}

func (g *ExecutionGraph) cycleDfs(t int, visited map[int]bool) error {
if visited[t] {
return fmt.Errorf("cycle detected")
var q []int
for _, node := range g.nodes {
if inDegrees[node.id] != 0 {
continue
}
q = append(q, node.id)
}
visited[t] = true
for _, next := range g.from[t] {
err := g.cycleDfs(next, visited)
if err != nil {
return err

for len(q) > 0 {
var f = q[0]
q = q[1:]

var tos = g.from[f]
for _, to := range tos {
inDegrees[to]--
if inDegrees[to] == 0 {
q = append(q, to)
}
}
}
return nil

for _, degree := range inDegrees {
if degree > 0 {
return true
}
}

return false
}

func (g *ExecutionGraph) addEdge(from, to *Node) {
g.from[from.id] = append(g.from[from.id], to.id)
g.to[to.id] = append(g.to[to.id], from.id)
}

func (g *ExecutionGraph) findStep(name string) (*Node, error) {
Expand Down

0 comments on commit f851bd0

Please sign in to comment.