Skip to content

Commit

Permalink
solver: move scheduler debug statements to their own functions
Browse files Browse the repository at this point in the history
Move scheduler debug statements to their own functions and in their own
file so that scheduler debug statements don't make the scheduler logic
more complex to follow. Some of the scheduler logs can be quite long and
can make it difficult to follow the code logic. This changes these log
statements to `debugSchedulerXXX` where `XXX` is the message that would
be printed.

Signed-off-by: Jonathan A. Sternberg <[email protected]>
  • Loading branch information
jsternberg committed Sep 18, 2024
1 parent 1e0f685 commit 2f18b83
Show file tree
Hide file tree
Showing 3 changed files with 331 additions and 245 deletions.
312 changes: 312 additions & 0 deletions solver/debug.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,312 @@
package solver

import (
"context"
"os"
"strings"
"sync"

"github.com/moby/buildkit/solver/internal/pipe"
"github.com/moby/buildkit/util/bklog"
"github.com/tonistiigi/go-csvvalue"
)

var (
debugScheduler = false // TODO: replace with logs in build trace
debugSchedulerSteps = sync.OnceValue(parseSchedulerDebugSteps)
)

func init() {
if os.Getenv("BUILDKIT_SCHEDULER_DEBUG") == "1" {
debugScheduler = true
}
}

func parseSchedulerDebugSteps() []string {
if s := os.Getenv("BUILDKIT_SCHEDULER_DEBUG_STEPS"); s != "" {
fields, err := csvvalue.Fields(s, nil)
if err != nil {
return nil
}
return fields
}
return nil
}

// debugSchedulerCheckEdge determines if this edge should be debugged
// depending on the set environment variables.
func debugSchedulerCheckEdge(e *edge) bool {
if debugScheduler {
return true
}

if steps := debugSchedulerSteps(); len(steps) > 0 {
withParents := strings.HasSuffix(steps[0], "^")
name := strings.TrimSuffix(steps[0], "^")
for _, v := range steps {
if strings.Contains(name, v) {
return true
}
}

if withParents {
for _, vtx := range e.edge.Vertex.Inputs() {
name := strings.TrimSuffix(vtx.Vertex.Name(), "^")
for _, v := range steps {
if strings.Contains(name, v) {
return true
}
}
}
}
}
return false
}

func debugSchedulerSkipMergeDueToDependency(e, origEdge *edge) {
bklog.G(context.TODO()).
WithField("edge_vertex_name", e.edge.Vertex.Name()).
WithField("edge_vertex_digest", e.edge.Vertex.Digest()).
WithField("edge_index", e.edge.Index).
WithField("origEdge_vertex_name", origEdge.edge.Vertex.Name()).
WithField("origEdge_vertex_digest", origEdge.edge.Vertex.Digest()).
WithField("origEdge_index", origEdge.edge.Index).
Debug("skip merge due to dependency")
}

func debugSchedulerSwapMergeDueToOwner(e, origEdge *edge) {
bklog.G(context.TODO()).
WithField("edge_vertex_name", e.edge.Vertex.Name()).
WithField("edge_vertex_digest", e.edge.Vertex.Digest()).
WithField("edge_index", e.edge.Index).
WithField("origEdge_vertex_name", origEdge.edge.Vertex.Name()).
WithField("origEdge_vertex_digest", origEdge.edge.Vertex.Digest()).
WithField("origEdge_index", origEdge.edge.Index).
Debug("swap merge due to owner")
}

func debugSchedulerMergingEdges(src, dest *edge) {
bklog.G(context.TODO()).
WithField("source_edge_vertex_name", src.edge.Vertex.Name()).
WithField("source_edge_vertex_digest", src.edge.Vertex.Digest()).
WithField("source_edge_index", src.edge.Index).
WithField("dest_vertex_name", dest.edge.Vertex.Name()).
WithField("dest_vertex_digest", dest.edge.Vertex.Digest()).
WithField("dest_index", dest.edge.Index).
Debug("merging edges")
}

func debugSchedulerMergingEdgesSkipped(src, dest *edge) {
bklog.G(context.TODO()).
WithField("source_edge_vertex_name", src.edge.Vertex.Name()).
WithField("source_edge_vertex_digest", src.edge.Vertex.Digest()).
WithField("source_edge_index", src.edge.Index).
WithField("dest_vertex_name", dest.edge.Vertex.Name()).
WithField("dest_vertex_digest", dest.edge.Vertex.Digest()).
WithField("dest_index", dest.edge.Index).
Debug("merging edges skipped")
}

func debugSchedulerPreUnpark(e *edge, inc []pipeSender, updates, allPipes []pipeReceiver) {
if e.debug {
debugSchedulerPreUnparkSlow(e, inc, updates, allPipes)
}
}

func debugSchedulerPreUnparkSlow(e *edge, inc []pipeSender, updates, allPipes []pipeReceiver) {
log := bklog.G(context.TODO()).
WithField("edge_vertex_name", e.edge.Vertex.Name()).
WithField("edge_vertex_digest", e.edge.Vertex.Digest()).
WithField("edge_index", e.edge.Index)

log.
WithField("edge_state", e.state).
WithField("req", len(inc)).
WithField("upt", len(updates)).
WithField("out", len(allPipes)).
Debug(">> unpark")

for i, dep := range e.deps {
des := edgeStatusInitial
if dep.req != nil {
des = dep.req.Request().desiredState
}
log.
WithField("dep_index", i).
WithField("dep_vertex_name", e.edge.Vertex.Inputs()[i].Vertex.Name()).
WithField("dep_vertex_digest", e.edge.Vertex.Inputs()[i].Vertex.Digest()).
WithField("dep_state", dep.state).
WithField("dep_desired_state", des).
WithField("dep_keys", len(dep.keys)).
WithField("dep_has_slow_cache", e.slowCacheFunc(dep) != nil).
WithField("dep_preprocess_func", e.preprocessFunc(dep) != nil).
Debug(":: dep")
}

for i, in := range inc {
req := in.Request()
log.
WithField("incoming_index", i).
WithField("incoming_pointer", in).
WithField("incoming_desired_state", req.Payload.desiredState).
WithField("incoming_canceled", req.Canceled).
Debug("> incoming")
}

for i, up := range updates {
if up == e.cacheMapReq {
log.
WithField("update_index", i).
WithField("update_pointer", up).
WithField("update_complete", up.Status().Completed).
Debug("> update cacheMapReq")
} else if up == e.execReq {
log.
WithField("update_index", i).
WithField("update_pointer", up).
WithField("update_complete", up.Status().Completed).
Debug("> update execReq")
} else {
st, ok := up.Status().Value.(*edgeState)
if ok {
index := -1
if dep, ok := e.depRequests[up]; ok {
index = int(dep.index)
}
log.
WithField("update_index", i).
WithField("update_pointer", up).
WithField("update_complete", up.Status().Completed).
WithField("update_input_index", index).
WithField("update_keys", len(st.keys)).
WithField("update_state", st.state).
Debugf("> update edgeState")
} else {
log.
WithField("update_index", i).
Debug("> update unknown")
}
}
}
}

func debugSchedulerPostUnpark(e *edge, inc []pipeSender) {
if e.debug {
debugSchedulerPostUnparkSlow(e, inc)
}
}

func debugSchedulerPostUnparkSlow(e *edge, inc []pipeSender) {
log := bklog.G(context.TODO())
for i, in := range inc {
log.
WithField("incoming_index", i).
WithField("incoming_pointer", in).
WithField("incoming_complete", in.Status().Completed).
Debug("< incoming")
}
log.
WithField("edge_vertex_name", e.edge.Vertex.Name()).
WithField("edge_vertex_digest", e.edge.Vertex.Digest()).
WithField("edge_index", e.edge.Index).
WithField("edge_state", e.state).
Debug("<< unpark")
}

func debugSchedulerNewPipe(e *edge, p *pipe.Pipe[*edgeRequest, any], req *edgeRequest) {
if e.debug {
bklog.G(context.TODO()).Debugf("> newPipe %s %p desiredState=%s", e.edge.Vertex.Name(), p, req.desiredState)
}
}

func debugSchedulerNewFunc(e *edge, p pipeReceiver) {
if e.debug {
bklog.G(context.TODO()).Debugf("> newFunc %p", p)
}
}

func debugSchedulerInconsistentGraphState(ee Edge) {
bklog.G(context.TODO()).
WithField("edge_vertex_name", ee.Vertex.Name()).
WithField("edge_vertex_digest", ee.Vertex.Digest()).
WithField("edge_index", ee.Index).
Error("failed to get edge: inconsistent graph state")
}

func debugSchedulerFinishIncoming(e *edge, err error, req pipeSender) {
if e.debug {
bklog.G(context.TODO()).Debugf("finishIncoming %s %v %#v desired=%s", e.edge.Vertex.Name(), err, e.edgeState, req.Request().Payload.desiredState)
}
}

func debugSchedulerUpdateIncoming(e *edge, req pipeSender) {
if e.debug {
bklog.G(context.TODO()).Debugf("updateIncoming %s %#v desired=%s", e.edge.Vertex.Name(), e.edgeState, req.Request().Payload.desiredState)
}
}

func debugSchedulerUpgradeCacheSlow(e *edge) {
if e.debug {
bklog.G(context.TODO()).Debugf("upgrade to cache-slow because no open keys")
}
}

func debugSchedulerRespondToIncomingStatus(e *edge, allIncomingCanComplete bool) {
if e.debug {
bklog.G(context.TODO()).Debugf("status state=%s cancomplete=%v hasouts=%v noPossibleCache=%v depsCacheFast=%v keys=%d cacheRecords=%d", e.state, allIncomingCanComplete, e.hasActiveOutgoing, e.noCacheMatchPossible, e.allDepsCompletedCacheFast, len(e.keys), len(e.cacheRecords))
}
}

func debugSchedulerCancelInputRequest(e *edge, dep *dep, desiredStateDep edgeStatusType) {
if e.debug {
bklog.G(context.TODO()).
WithField("edge_vertex_name", e.edge.Vertex.Name()).
WithField("edge_vertex_digest", e.edge.Vertex.Digest()).
WithField("dep_index", dep.index).
WithField("dep_req_desired_state", dep.req.Request().desiredState).
WithField("dep_desired_state", desiredStateDep).
WithField("dep_state", dep.state).
Debug("cancel input request")
}
}

func debugSchedulerSkipInputRequestBasedOnExistingRequest(e *edge, dep *dep, desiredStateDep edgeStatusType) {
if e.debug {
bklog.G(context.TODO()).
WithField("edge_vertex_name", e.edge.Vertex.Name()).
WithField("edge_vertex_digest", e.edge.Vertex.Digest()).
WithField("dep_index", dep.index).
WithField("dep_req_desired_state", dep.req.Request().desiredState).
WithField("dep_desired_state", desiredStateDep).
WithField("dep_state", dep.state).
Debug("skip input request based on existing request")
}
}

func debugSchedulerAddInputRequest(e *edge, dep *dep, desiredStateDep edgeStatusType) {
if e.debug {
bklog.G(context.TODO()).
WithField("edge_vertex_name", e.edge.Vertex.Name()).
WithField("edge_vertex_digest", e.edge.Vertex.Digest()).
WithField("dep_index", dep.index).
WithField("dep_desired_state", desiredStateDep).
WithField("dep_state", dep.state).
WithField("dep_vertex_name", e.edge.Vertex.Inputs()[dep.index].Vertex.Name()).
WithField("dep_vertex_digest", e.edge.Vertex.Inputs()[dep.index].Vertex.Digest()).
Debug("add input request")
}
}

func debugSchedulerSkipInputRequestBasedOnDepState(e *edge, dep *dep, desiredStateDep edgeStatusType) {
if e.debug {
bklog.G(context.TODO()).
WithField("edge_vertex_name", e.edge.Vertex.Name()).
WithField("edge_vertex_digest", e.edge.Vertex.Digest()).
WithField("dep_index", dep.index).
WithField("dep_desired_state", desiredStateDep).
WithField("dep_state", dep.state).
WithField("dep_vertex_name", e.edge.Vertex.Inputs()[dep.index].Vertex.Name()).
WithField("dep_vertex_digest", e.edge.Vertex.Inputs()[dep.index].Vertex.Digest()).
Debug("skip input request based on dep state")
}
}
Loading

0 comments on commit 2f18b83

Please sign in to comment.