diff --git a/cmd/main.go b/cmd/main.go index cad5467..c21d421 100644 --- a/cmd/main.go +++ b/cmd/main.go @@ -2,16 +2,83 @@ package main import ( "context" + "crypto/tls" + "fmt" "net/http" + "os" "strconv" + "time" "github.com/sirupsen/logrus" "github.com/virtual-kubelet/virtual-kubelet/log" logruslogger "github.com/virtual-kubelet/virtual-kubelet/log/logrus" + "google.golang.org/grpc" + "google.golang.org/grpc/credentials" slurm "github.com/intertwin-eu/interlink-slurm-plugin/pkg/slurm" + + "github.com/virtual-kubelet/virtual-kubelet/trace" + "github.com/virtual-kubelet/virtual-kubelet/trace/opentelemetry" + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc" + "go.opentelemetry.io/otel/propagation" + "go.opentelemetry.io/otel/sdk/resource" + sdktrace "go.opentelemetry.io/otel/sdk/trace" + semconv "go.opentelemetry.io/otel/semconv/v1.21.0" ) +func initProvider(ctx context.Context) (func(context.Context) error, error) { + res, err := resource.New(ctx, + resource.WithAttributes( + // the service name used to display traces in backends + semconv.ServiceName("InterLink-SLURM-plugin"), + ), + ) + if err != nil { + return nil, fmt.Errorf("failed to create resource: %w", err) + } + + ctx, cancel := context.WithTimeout(ctx, time.Second) + defer cancel() + + otlpEndpoint := os.Getenv("TELEMETRY_ENDPOINT") + + if otlpEndpoint == "" { + otlpEndpoint = "localhost:4317" + } + + fmt.Println("TELEMETRY_ENDPOINT: ", otlpEndpoint) + + conn := &grpc.ClientConn{} + creds := credentials.NewTLS(&tls.Config{InsecureSkipVerify: true}) + conn, err = grpc.DialContext(ctx, otlpEndpoint, grpc.WithTransportCredentials(creds), grpc.WithBlock()) + + if err != nil { + return nil, fmt.Errorf("failed to create gRPC connection to collector: %w", err) + } + + // Set up a trace exporter + traceExporter, err := otlptracegrpc.New(ctx, otlptracegrpc.WithGRPCConn(conn)) + if err != nil { + return nil, fmt.Errorf("failed to create trace exporter: %w", err) + } + + // Register the trace exporter with a TracerProvider, using a batch + // span processor to aggregate spans before export. + bsp := sdktrace.NewBatchSpanProcessor(traceExporter) + tracerProvider := sdktrace.NewTracerProvider( + sdktrace.WithSampler(sdktrace.AlwaysSample()), + sdktrace.WithResource(res), + sdktrace.WithSpanProcessor(bsp), + ) + otel.SetTracerProvider(tracerProvider) + + // set global propagator to tracecontext (the default is no-op). 
+ otel.SetTextMapPropagator(propagation.TraceContext{}) + + return tracerProvider.Shutdown, nil +} + func main() { logger := logrus.StandardLogger() @@ -31,14 +98,32 @@ func main() { log.L = logruslogger.FromLogrus(logrus.NewEntry(logger)) JobIDs := make(map[string]*slurm.JidStruct) - Ctx, cancel := context.WithCancel(context.Background()) + ctx, cancel := context.WithCancel(context.Background()) defer cancel() - log.G(Ctx).Debug("Debug level: " + strconv.FormatBool(slurmConfig.VerboseLogging)) + + if os.Getenv("ENABLE_TRACING") == "1" { + shutdown, err := initProvider(ctx) + if err != nil { + log.G(ctx).Fatal(err) + } + defer func() { + if err = shutdown(ctx); err != nil { + log.G(ctx).Fatal("failed to shutdown TracerProvider: %w", err) + } + }() + + log.G(ctx).Info("Tracer setup succeeded") + + // TODO: disable this through options + trace.T = opentelemetry.Adapter{} + } + + log.G(ctx).Debug("Debug level: " + strconv.FormatBool(slurmConfig.VerboseLogging)) SidecarAPIs := slurm.SidecarHandler{ Config: slurmConfig, JIDs: &JobIDs, - Ctx: Ctx, + Ctx: ctx, } mutex := http.NewServeMux() @@ -47,11 +132,11 @@ func main() { mutex.HandleFunc("/delete", SidecarAPIs.StopHandler) mutex.HandleFunc("/getLogs", SidecarAPIs.GetLogsHandler) - slurm.CreateDirectories(slurmConfig) - slurm.LoadJIDs(Ctx, slurmConfig, &JobIDs) + SidecarAPIs.CreateDirectories() + SidecarAPIs.LoadJIDs() err = http.ListenAndServe(":"+slurmConfig.Sidecarport, mutex) if err != nil { - log.G(Ctx).Fatal(err) + log.G(ctx).Fatal(err) } } diff --git a/go.mod b/go.mod index 9d98be6..4268423 100644 --- a/go.mod +++ b/go.mod @@ -1,8 +1,8 @@ module github.com/intertwin-eu/interlink-slurm-plugin -go 1.21 +go 1.22 -toolchain go1.21.3 +toolchain go1.22.4 require ( github.com/alexellis/go-execute v0.6.0 @@ -10,6 +10,11 @@ require ( github.com/intertwin-eu/interlink v0.0.0-20240523154644-820ca4bd6fac github.com/sirupsen/logrus v1.9.3 github.com/virtual-kubelet/virtual-kubelet v1.11.0 + go.opentelemetry.io/otel v1.22.0 + go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.19.0 + go.opentelemetry.io/otel/sdk v1.22.0 + go.opentelemetry.io/otel/trace v1.22.0 + google.golang.org/grpc v1.59.0 gopkg.in/yaml.v2 v2.4.0 k8s.io/api v0.29.1 k8s.io/apimachinery v0.29.1 @@ -17,9 +22,11 @@ require ( ) require ( + github.com/cenkalti/backoff/v4 v4.2.1 // indirect github.com/davecgh/go-spew v1.1.1 // indirect github.com/emicklei/go-restful/v3 v3.11.0 // indirect github.com/go-logr/logr v1.4.1 // indirect + github.com/go-logr/stdr v1.2.2 // indirect github.com/go-openapi/jsonpointer v0.20.0 // indirect github.com/go-openapi/jsonreference v0.20.2 // indirect github.com/go-openapi/swag v0.22.4 // indirect @@ -28,12 +35,16 @@ require ( github.com/google/gnostic-models v0.6.8 // indirect github.com/google/gofuzz v1.2.0 // indirect github.com/google/uuid v1.3.1 // indirect + github.com/grpc-ecosystem/grpc-gateway/v2 v2.16.0 // indirect github.com/josharian/intern v1.0.0 // indirect github.com/json-iterator/go v1.1.12 // indirect github.com/mailru/easyjson v0.7.7 // indirect github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect github.com/modern-go/reflect2 v1.0.2 // indirect github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect + go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.19.0 // indirect + go.opentelemetry.io/otel/metric v1.22.0 // indirect + go.opentelemetry.io/proto/otlp v1.0.0 // indirect golang.org/x/net v0.20.0 // indirect golang.org/x/oauth2 v0.16.0 // indirect golang.org/x/sys v0.17.0 // 
indirect @@ -41,6 +52,8 @@ require ( golang.org/x/text v0.14.0 // indirect golang.org/x/time v0.3.0 // indirect google.golang.org/appengine v1.6.8 // indirect + google.golang.org/genproto/googleapis/api v0.0.0-20230822172742-b8732ec3820d // indirect + google.golang.org/genproto/googleapis/rpc v0.0.0-20230822172742-b8732ec3820d // indirect google.golang.org/protobuf v1.31.0 // indirect gopkg.in/inf.v0 v0.9.1 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect diff --git a/go.sum b/go.sum index cf83c56..e788e19 100644 --- a/go.sum +++ b/go.sum @@ -1,5 +1,7 @@ github.com/alexellis/go-execute v0.6.0 h1:FVGoudJnWSObwf9qmehbvVuvhK6g1UpKOCBjS+OUXEA= github.com/alexellis/go-execute v0.6.0/go.mod h1:nlg2F6XdYydUm1xXQMMiuibQCV1mveybBkNWfdNznjk= +github.com/cenkalti/backoff/v4 v4.2.1 h1:y4OZtCnogmCPw98Zjyt5a6+QwPLGkiQsYW5oUqylYbM= +github.com/cenkalti/backoff/v4 v4.2.1/go.mod h1:Y3VNntkOUPxTVeUxJ/G5vcM//AlwfmyYozVcomhLiZE= github.com/containerd/containerd v1.7.6 h1:oNAVsnhPoy4BTPQivLgTzI9Oleml9l/+eYIDYXRCYo8= github.com/containerd/containerd v1.7.6/go.mod h1:SY6lrkkuJT40BVNO37tlYTSnKJnP5AXBc0fhx0q+TJ4= github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E= @@ -8,9 +10,12 @@ github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/emicklei/go-restful/v3 v3.11.0 h1:rAQeMHw1c7zTmncogyy8VvRZwtkmkZ4FxERmMY4rD+g= github.com/emicklei/go-restful/v3 v3.11.0/go.mod h1:6n3XBCmQQb25CM2LCACGz8ukIrRry+4bhvbpWn3mrbc= +github.com/go-logr/logr v1.2.2/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A= github.com/go-logr/logr v1.3.0/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY= github.com/go-logr/logr v1.4.1 h1:pKouT5E8xu9zeFC39JXRDukb6JFQPXM5p5I91188VAQ= github.com/go-logr/logr v1.4.1/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY= +github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag= +github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre4VKE= github.com/go-openapi/jsonpointer v0.19.6/go.mod h1:osyAmYz/mB/C3I+WsTTSgw1ONzaLJoLCyoi6/zppojs= github.com/go-openapi/jsonpointer v0.20.0 h1:ESKJdU9ASRfaPNOPRx12IUyA1vn3R9GiE3KYD14BXdQ= github.com/go-openapi/jsonpointer v0.20.0/go.mod h1:6PGzBjjIIumbLYysB73Klnms1mwnU4G3YHOECG3CedA= @@ -23,6 +28,8 @@ github.com/go-task/slim-sprig v0.0.0-20230315185526-52ccab3ef572 h1:tfuBGBXKqDEe github.com/go-task/slim-sprig v0.0.0-20230315185526-52ccab3ef572/go.mod h1:9Pwr4B2jHnOSGXyyzV8ROjYa2ojvAY6HCGYYfMoC3Ls= github.com/gogo/protobuf v1.3.2 h1:Ov1cvc58UF3b5XjBnZv7+opcTcQFZebYjWzi34vdm4Q= github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69NZV8Q= +github.com/golang/glog v1.1.2 h1:DVjP2PbBOzHyzA+dn3WhHIq4NdVu3Q+pvivFICf/7fo= +github.com/golang/glog v1.1.2/go.mod h1:zR+okUeTbrL6EL3xHUDxZuEtGv04p5shwip1+mL/rLQ= github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk= github.com/golang/protobuf v1.5.2/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY= github.com/golang/protobuf v1.5.3 h1:KhyjKVUg7Usr/dYsdSqoFveMYd5ko72D+zANwlG1mmg= @@ -40,6 +47,8 @@ github.com/google/pprof v0.0.0-20210720184732-4bb14d4b1be1 h1:K6RDEckDVWvDI9JAJY github.com/google/pprof v0.0.0-20210720184732-4bb14d4b1be1/go.mod h1:kpwsk12EmLew5upagYY7GY0pfYCcupk39gWOCRROcvE= github.com/google/uuid v1.3.1 h1:KjJaJ9iWZ3jOFZIf1Lqf4laDRCasjl0BCmnEGxkdLb4= github.com/google/uuid v1.3.1/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= 
+github.com/grpc-ecosystem/grpc-gateway/v2 v2.16.0 h1:YBftPWNWd4WwGqtY2yeZL2ef8rHAxPBD8KFhJpmcqms= +github.com/grpc-ecosystem/grpc-gateway/v2 v2.16.0/go.mod h1:YN5jB8ie0yfIUg6VvR9Kz84aCaG7AsGZnLjhHbUqwPg= github.com/intertwin-eu/interlink v0.0.0-20240523154644-820ca4bd6fac h1:LdpDorMFbMkcIpO9F9LmvBaUteyWrqEKzDjuhcF4fAc= github.com/intertwin-eu/interlink v0.0.0-20240523154644-820ca4bd6fac/go.mod h1:BxkHXL7pr4PwdpGR685KgidM+r94N1W+S+cpc/o2MX8= github.com/josharian/intern v1.0.0 h1:vlS4z54oSdjm0bgjRigI+G1HpF+tI+9rE5LLzOg8HmY= @@ -68,6 +77,8 @@ github.com/onsi/ginkgo/v2 v2.13.0 h1:0jY9lJquiL8fcf3M4LAXN5aMlS/b2BV86HFFPCPMgE4 github.com/onsi/ginkgo/v2 v2.13.0/go.mod h1:TE309ZR8s5FsKKpuB1YAQYBzCaAfUgatB/xlT/ETL/o= github.com/onsi/gomega v1.29.0 h1:KIA/t2t5UBzoirT4H9tsML45GEbo3ouUnBHsCfD2tVg= github.com/onsi/gomega v1.29.0/go.mod h1:9sxs+SwGrKI0+PWe4Fxa9tFQQBG5xSsSbMXOI8PPpoQ= +github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= +github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/rogpeppe/go-internal v1.11.0 h1:cWPaGQEPrBb5/AsnsZesgZZ9yb1OQ+GOISoDNXVBh4M= @@ -91,6 +102,22 @@ github.com/virtual-kubelet/virtual-kubelet v1.11.0/go.mod h1:WQfPHbIlzfhMNYkh6hF github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= github.com/yuin/goldmark v1.4.13/go.mod h1:6yULJ656Px+3vBD8DxQVa3kxgyrAnzto9xy5taEt/CY= +go.opentelemetry.io/otel v1.22.0 h1:xS7Ku+7yTFvDfDraDIJVpw7XPyuHlB9MCiqqX5mcJ6Y= +go.opentelemetry.io/otel v1.22.0/go.mod h1:eoV4iAi3Ea8LkAEI9+GFT44O6T/D0GWAVFyZVCC6pMI= +go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.19.0 h1:Mne5On7VWdx7omSrSSZvM4Kw7cS7NQkOOmLcgscI51U= +go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.19.0/go.mod h1:IPtUMKL4O3tH5y+iXVyAXqpAwMuzC1IrxVS81rummfE= +go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.19.0 h1:3d+S281UTjM+AbF31XSOYn1qXn3BgIdWl8HNEpx08Jk= +go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.19.0/go.mod h1:0+KuTDyKL4gjKCF75pHOX4wuzYDUZYfAQdSu43o+Z2I= +go.opentelemetry.io/otel/metric v1.22.0 h1:lypMQnGyJYeuYPhOM/bgjbFM6WE44W1/T45er4d8Hhg= +go.opentelemetry.io/otel/metric v1.22.0/go.mod h1:evJGjVpZv0mQ5QBRJoBF64yMuOf4xCWdXjK8pzFvliY= +go.opentelemetry.io/otel/sdk v1.22.0 h1:6coWHw9xw7EfClIC/+O31R8IY3/+EiRFHevmHafB2Gw= +go.opentelemetry.io/otel/sdk v1.22.0/go.mod h1:iu7luyVGYovrRpe2fmj3CVKouQNdTOkxtLzPvPz1DOc= +go.opentelemetry.io/otel/trace v1.22.0 h1:Hg6pPujv0XG9QaVbGOBVHunyuLcCC3jN7WEhPx83XD0= +go.opentelemetry.io/otel/trace v1.22.0/go.mod h1:RbbHXVqKES9QhzZq/fE5UnOSILqRt40a21sPw2He1xo= +go.opentelemetry.io/proto/otlp v1.0.0 h1:T0TX0tmXU8a3CbNXzEKGeU5mIVOdf0oykP+u2lIVU/I= +go.opentelemetry.io/proto/otlp v1.0.0/go.mod h1:Sy6pihPLfYHkr3NkUbEhGHFhINUSI/v80hjKIs5JXpM= +go.uber.org/goleak v1.2.1 h1:NBol2c7O1ZokfZ0LEU9K6Whx/KnwvepVetCUhtKja4A= +go.uber.org/goleak v1.2.1/go.mod h1:qlT2yGI9QafXHhZZLxlSuNsMw3FFLxBr+tBRlmO1xH4= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= @@ -147,6 +174,14 @@ 
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8T golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= google.golang.org/appengine v1.6.8 h1:IhEN5q69dyKagZPYMSdIjS2HqprW324FRQZJcGqPAsM= google.golang.org/appengine v1.6.8/go.mod h1:1jJ3jBArFh5pcgW8gCtRJnepW8FzD1V44FJffLiz/Ds= +google.golang.org/genproto v0.0.0-20230822172742-b8732ec3820d h1:VBu5YqKPv6XiJ199exd8Br+Aetz+o08F+PLMnwJQHAY= +google.golang.org/genproto v0.0.0-20230822172742-b8732ec3820d/go.mod h1:yZTlhN0tQnXo3h00fuXNCxJdLdIdnVFVBaRJ5LWBbw4= +google.golang.org/genproto/googleapis/api v0.0.0-20230822172742-b8732ec3820d h1:DoPTO70H+bcDXcd39vOqb2viZxgqeBeSGtZ55yZU4/Q= +google.golang.org/genproto/googleapis/api v0.0.0-20230822172742-b8732ec3820d/go.mod h1:KjSP20unUpOx5kyQUFa7k4OJg0qeJ7DEZflGDu2p6Bk= +google.golang.org/genproto/googleapis/rpc v0.0.0-20230822172742-b8732ec3820d h1:uvYuEyMHKNt+lT4K3bN6fGswmK8qSvcreM3BwjDh+y4= +google.golang.org/genproto/googleapis/rpc v0.0.0-20230822172742-b8732ec3820d/go.mod h1:+Bk1OCOj40wS2hwAMA+aCW9ypzm63QTBBHp6lQ3p+9M= +google.golang.org/grpc v1.59.0 h1:Z5Iec2pjwb+LEOqzpB2MR12/eKFhDPhuqW91O+4bwUk= +google.golang.org/grpc v1.59.0/go.mod h1:aUPDwccQo6OTjy7Hct4AfBPD1GptF4fyUjIkQ9YtF98= google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw= google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc= google.golang.org/protobuf v1.31.0 h1:g0LDEJHgrBl9N9r17Ru3sqWhkIx2NB67okBHPwC7hs8= @@ -162,6 +197,8 @@ gopkg.in/yaml.v2 v2.4.0/go.mod h1:RDklbk79AGWmwhnvt/jBztapEOGDOx6ZbXqjP6csGnQ= gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= +gotest.tools v2.2.0+incompatible h1:VsBPFP1AI068pPrMxtb/S8Zkgf9xEmTLJjfM+P5UIEo= +gotest.tools v2.2.0+incompatible/go.mod h1:DsYFclhRJ6vuDpmuTbkuFWG+y2sxOXAzmJt81HFBacw= k8s.io/api v0.29.1 h1:DAjwWX/9YT7NQD4INu49ROJuZAAAP/Ijki48GUPzxqw= k8s.io/api v0.29.1/go.mod h1:7Kl10vBRUXhnQQI8YR/R327zXC8eJ7887/+Ybta+RoQ= k8s.io/apimachinery v0.29.1 h1:KY4/E6km/wLBguvCZv8cKTeOwwOBqFNjwJIdMkMbbRc= diff --git a/pkg/slurm/Create.go b/pkg/slurm/Create.go index 50279d7..a55881b 100644 --- a/pkg/slurm/Create.go +++ b/pkg/slurm/Create.go @@ -6,22 +6,36 @@ import ( "io" "net/http" "os" + "strconv" "strings" + "time" "github.com/containerd/containerd/log" commonIL "github.com/intertwin-eu/interlink/pkg/interlink" + + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/attribute" + trace "go.opentelemetry.io/otel/trace" ) // SubmitHandler generates and submits a SLURM batch script according to provided data. // 1 Pod = 1 Job. If a Pod has multiple containers, every container is a line with it's parameters in the SLURM script. 
func (h *SidecarHandler) SubmitHandler(w http.ResponseWriter, r *http.Request) { + start := time.Now().UnixMicro() + tracer := otel.Tracer("interlink-API") + spanCtx, span := tracer.Start(h.Ctx, "CreateSLURM", trace.WithAttributes( + attribute.Int64("start.timestamp", start), + )) + defer span.End() + defer commonIL.SetDurationSpan(start, span) + log.G(h.Ctx).Info("Slurm Sidecar: received Submit call") statusCode := http.StatusOK bodyBytes, err := io.ReadAll(r.Body) if err != nil { statusCode = http.StatusInternalServerError - h.handleError(w, statusCode, err) + h.handleError(spanCtx, w, statusCode, err) return } @@ -34,9 +48,7 @@ func (h *SidecarHandler) SubmitHandler(w http.ResponseWriter, r *http.Request) { err = json.Unmarshal(bodyBytes, &dataList) if err != nil { statusCode = http.StatusInternalServerError - w.WriteHeader(statusCode) - w.Write([]byte("Some errors occurred while creating container. Check Slurm Sidecar's logs")) - log.G(h.Ctx).Error(err) + h.handleError(spanCtx, w, http.StatusGatewayTimeout, err) return } @@ -69,7 +81,7 @@ func (h *SidecarHandler) SubmitHandler(w http.ResponseWriter, r *http.Request) { commstr1 := []string{"singularity", "exec", "--containall", "--nv", singularityMounts, singularityOptions} - envs := prepareEnvs(h.Ctx, container) + envs := prepareEnvs(spanCtx, container) image := "" CPULimit, _ := container.Resources.Limits.Cpu().AsInt64() @@ -87,13 +99,11 @@ func (h *SidecarHandler) SubmitHandler(w http.ResponseWriter, r *http.Request) { resourceLimits.Memory += MemoryLimit } - mounts, err := prepareMounts(h.Ctx, h.Config, data, container, filesPath) + mounts, err := prepareMounts(spanCtx, h.Config, data, container, filesPath) log.G(h.Ctx).Debug(mounts) if err != nil { statusCode = http.StatusInternalServerError - w.WriteHeader(statusCode) - w.Write([]byte("Error prepairing mounts. 
Check Slurm Sidecar's logs")) - log.G(h.Ctx).Error(err) + h.handleError(spanCtx, w, http.StatusGatewayTimeout, err) os.RemoveAll(filesPath) return } @@ -120,10 +130,24 @@ func (h *SidecarHandler) SubmitHandler(w http.ResponseWriter, r *http.Request) { isInit = true } + span.SetAttributes( + attribute.String("job.container"+strconv.Itoa(i)+".name", container.Name), + attribute.Bool("job.container"+strconv.Itoa(i)+".isinit", isInit), + attribute.StringSlice("job.container"+strconv.Itoa(i)+".envs", envs), + attribute.String("job.container"+strconv.Itoa(i)+".image", image), + attribute.StringSlice("job.container"+strconv.Itoa(i)+".command", container.Command), + attribute.StringSlice("job.container"+strconv.Itoa(i)+".args", container.Args), + ) + singularity_command_pod = append(singularity_command_pod, SingularityCommand{singularityCommand: singularity_command, containerName: container.Name, containerArgs: container.Args, containerCommand: container.Command, isInitContainer: isInit}) } - path, err := produceSLURMScript(h.Ctx, h.Config, string(data.Pod.UID), filesPath, metadata, singularity_command_pod, resourceLimits) + span.SetAttributes( + attribute.Int64("job.limits.cpu", resourceLimits.CPU), + attribute.Int64("job.limits.memory", resourceLimits.Memory), + ) + + path, err := produceSLURMScript(spanCtx, h.Config, string(data.Pod.UID), filesPath, metadata, singularity_command_pod, resourceLimits) if err != nil { log.G(h.Ctx).Error(err) os.RemoveAll(filesPath) @@ -131,10 +155,9 @@ func (h *SidecarHandler) SubmitHandler(w http.ResponseWriter, r *http.Request) { } out, err := SLURMBatchSubmit(h.Ctx, h.Config, path) if err != nil { + span.AddEvent("Failed to submit the SLURM Job") statusCode = http.StatusInternalServerError - w.WriteHeader(statusCode) - w.Write([]byte("Error submitting Slurm script. Check Slurm Sidecar's logs")) - log.G(h.Ctx).Error(err) + h.handleError(spanCtx, w, http.StatusGatewayTimeout, err) os.RemoveAll(filesPath) return } @@ -142,28 +165,29 @@ func (h *SidecarHandler) SubmitHandler(w http.ResponseWriter, r *http.Request) { jid, err := handleJidAndPodUid(h.Ctx, data.Pod, h.JIDs, out, filesPath) if err != nil { statusCode = http.StatusInternalServerError - w.WriteHeader(statusCode) - w.Write([]byte("Error handling JID. Check Slurm Sidecar's logs")) - log.G(h.Ctx).Error(err) + h.handleError(spanCtx, w, http.StatusGatewayTimeout, err) os.RemoveAll(filesPath) - err = deleteContainer(h.Ctx, h.Config, string(data.Pod.UID), h.JIDs, filesPath) + err = deleteContainer(spanCtx, h.Config, string(data.Pod.UID), h.JIDs, filesPath) if err != nil { log.G(h.Ctx).Error(err) } return } + span.AddEvent("SLURM Job successfully submitted with ID " + jid) returnedJID = CreateStruct{PodUID: string(data.Pod.UID), PodJID: jid} returnedJIDBytes, err = json.Marshal(returnedJID) if err != nil { statusCode = http.StatusInternalServerError - h.handleError(w, statusCode, err) + h.handleError(spanCtx, w, statusCode, err) return } w.WriteHeader(statusCode) + commonIL.SetDurationSpan(start, span, commonIL.WithHTTPReturnCode(statusCode)) + if statusCode != http.StatusOK { w.Write([]byte("Some errors occurred while creating containers. 
Check Slurm Sidecar's logs")) } else { diff --git a/pkg/slurm/Delete.go b/pkg/slurm/Delete.go index f0a2b3c..4be046c 100644 --- a/pkg/slurm/Delete.go +++ b/pkg/slurm/Delete.go @@ -5,20 +5,34 @@ import ( "io" "net/http" "os" + "time" "github.com/containerd/containerd/log" + commonIL "github.com/intertwin-eu/interlink/pkg/interlink" v1 "k8s.io/api/core/v1" + + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/attribute" + trace "go.opentelemetry.io/otel/trace" ) // StopHandler runs a scancel command, updating JIDs and cached statuses func (h *SidecarHandler) StopHandler(w http.ResponseWriter, r *http.Request) { + start := time.Now().UnixMicro() + tracer := otel.Tracer("interlink-API") + spanCtx, span := tracer.Start(h.Ctx, "DeleteSLURM", trace.WithAttributes( + attribute.Int64("start.timestamp", start), + )) + defer span.End() + defer commonIL.SetDurationSpan(start, span) + log.G(h.Ctx).Info("Slurm Sidecar: received Stop call") statusCode := http.StatusOK bodyBytes, err := io.ReadAll(r.Body) if err != nil { statusCode = http.StatusInternalServerError - h.handleError(w, statusCode, err) + h.handleError(spanCtx, w, statusCode, err) return } @@ -26,32 +40,34 @@ func (h *SidecarHandler) StopHandler(w http.ResponseWriter, r *http.Request) { err = json.Unmarshal(bodyBytes, &pod) if err != nil { statusCode = http.StatusInternalServerError - h.handleError(w, statusCode, err) + h.handleError(spanCtx, w, statusCode, err) return } filesPath := h.Config.DataRootFolder + pod.Namespace + "-" + string(pod.UID) - err = deleteContainer(h.Ctx, h.Config, string(pod.UID), h.JIDs, filesPath) + err = deleteContainer(spanCtx, h.Config, string(pod.UID), h.JIDs, filesPath) + if err != nil { statusCode = http.StatusInternalServerError - h.handleError(w, statusCode, err) + h.handleError(spanCtx, w, statusCode, err) return } if os.Getenv("SHARED_FS") != "true" { err = os.RemoveAll(filesPath) if err != nil { statusCode = http.StatusInternalServerError - h.handleError(w, statusCode, err) + h.handleError(spanCtx, w, statusCode, err) return } } + commonIL.SetDurationSpan(start, span, commonIL.WithHTTPReturnCode(statusCode)) + w.WriteHeader(statusCode) if statusCode != http.StatusOK { w.Write([]byte("Some errors occurred deleting containers. Check Slurm Sidecar's logs")) } else { - w.Write([]byte("All containers for submitted Pods have been deleted")) } } diff --git a/pkg/slurm/GetLogs.go b/pkg/slurm/GetLogs.go index 7a65520..dc92a50 100644 --- a/pkg/slurm/GetLogs.go +++ b/pkg/slurm/GetLogs.go @@ -11,11 +11,23 @@ import ( "github.com/containerd/containerd/log" commonIL "github.com/intertwin-eu/interlink/pkg/interlink" + + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/attribute" + trace "go.opentelemetry.io/otel/trace" ) // GetLogsHandler reads Jobs' output file to return what's logged inside. 
// What's returned is based on the provided parameters (Tail/LimitBytes/Timestamps/etc) func (h *SidecarHandler) GetLogsHandler(w http.ResponseWriter, r *http.Request) { + start := time.Now().UnixMicro() + tracer := otel.Tracer("interlink-API") + spanCtx, span := tracer.Start(h.Ctx, "GetLogsSLURM", trace.WithAttributes( + attribute.Int64("start.timestamp", start), + )) + defer span.End() + defer commonIL.SetDurationSpan(start, span) + log.G(h.Ctx).Info("Docker Sidecar: received GetLogs call") var req commonIL.LogStruct statusCode := http.StatusOK @@ -24,21 +36,33 @@ func (h *SidecarHandler) GetLogsHandler(w http.ResponseWriter, r *http.Request) bodyBytes, err := io.ReadAll(r.Body) if err != nil { statusCode = http.StatusInternalServerError - h.handleError(w, statusCode, err) + h.handleError(spanCtx, w, statusCode, err) return } err = json.Unmarshal(bodyBytes, &req) if err != nil { statusCode = http.StatusInternalServerError - h.handleError(w, statusCode, err) + h.handleError(spanCtx, w, statusCode, err) return } + span.SetAttributes( + attribute.String("pod.name", req.PodName), + attribute.String("pod.namespace", req.Namespace), + attribute.Int("opts.limitbytes", req.Opts.LimitBytes), + attribute.Int("opts.since", req.Opts.SinceSeconds), + attribute.Int64("opts.sincetime", req.Opts.SinceTime.UnixMicro()), + attribute.Int("opts.tail", req.Opts.Tail), + attribute.Bool("opts.follow", req.Opts.Follow), + attribute.Bool("opts.previous", req.Opts.Previous), + attribute.Bool("opts.timestamps", req.Opts.Timestamps), + ) + path := h.Config.DataRootFolder + req.Namespace + "-" + req.PodUID var output []byte if req.Opts.Timestamps { - h.handleError(w, statusCode, err) + h.handleError(spanCtx, w, statusCode, err) return } else { log.G(h.Ctx).Info("Reading " + path + "/" + req.ContainerName + ".out") @@ -52,7 +76,8 @@ func (h *SidecarHandler) GetLogsHandler(w http.ResponseWriter, r *http.Request) } if err1 != nil && err2 != nil { - h.handleError(w, statusCode, err) + span.AddEvent("Error retrieving logs") + h.handleError(spanCtx, w, statusCode, err) return } @@ -115,6 +140,8 @@ func (h *SidecarHandler) GetLogsHandler(w http.ResponseWriter, r *http.Request) } } + commonIL.SetDurationSpan(start, span, commonIL.WithHTTPReturnCode(statusCode)) + if statusCode != http.StatusOK { w.Write([]byte("Some errors occurred while checking container status. 
Check Docker Sidecar's logs")) } else { diff --git a/pkg/slurm/Status.go b/pkg/slurm/Status.go index f29dea1..f8be0d9 100644 --- a/pkg/slurm/Status.go +++ b/pkg/slurm/Status.go @@ -18,10 +18,22 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" commonIL "github.com/intertwin-eu/interlink/pkg/interlink" + + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/attribute" + trace "go.opentelemetry.io/otel/trace" ) // StatusHandler performs a squeue --me and uses regular expressions to get the running Jobs' status func (h *SidecarHandler) StatusHandler(w http.ResponseWriter, r *http.Request) { + start := time.Now().UnixMicro() + tracer := otel.Tracer("interlink-API") + spanCtx, span := tracer.Start(h.Ctx, "GetLogsSLURM", trace.WithAttributes( + attribute.Int64("start.timestamp", start), + )) + defer span.End() + defer commonIL.SetDurationSpan(start, span) + var req []*v1.Pod var resp []commonIL.PodStatus statusCode := http.StatusOK @@ -31,14 +43,14 @@ func (h *SidecarHandler) StatusHandler(w http.ResponseWriter, r *http.Request) { bodyBytes, err := io.ReadAll(r.Body) if err != nil { statusCode = http.StatusInternalServerError - h.handleError(w, statusCode, err) + h.handleError(spanCtx, w, statusCode, err) return } err = json.Unmarshal(bodyBytes, &req) if err != nil { statusCode = http.StatusInternalServerError - h.handleError(w, statusCode, err) + h.handleError(spanCtx, w, statusCode, err) return } @@ -54,7 +66,7 @@ func (h *SidecarHandler) StatusHandler(w http.ResponseWriter, r *http.Request) { if execReturn.Stderr != "" { statusCode = http.StatusInternalServerError - h.handleError(w, statusCode, errors.New("unable to retrieve job status: "+execReturn.Stderr)) + h.handleError(spanCtx, w, statusCode, errors.New("unable to retrieve job status: "+execReturn.Stderr)) return } @@ -63,7 +75,7 @@ func (h *SidecarHandler) StatusHandler(w http.ResponseWriter, r *http.Request) { uid := string(pod.UID) path := h.Config.DataRootFolder + pod.Namespace + "-" + string(pod.UID) - if checkIfJidExists((h.JIDs), uid) { + if checkIfJidExists(spanCtx, (h.JIDs), uid) { cmd := []string{"--noheader", "-a", "-j " + (*h.JIDs)[uid].JID} shell := exec.ExecTask{ Command: h.Config.Squeuepath, @@ -76,13 +88,14 @@ func (h *SidecarHandler) StatusHandler(w http.ResponseWriter, r *http.Request) { //log.G(h.Ctx).Info("Pod: " + jid.PodUID + " | JID: " + jid.JID) if execReturn.Stderr != "" { + span.AddEvent("squeue returned error " + execReturn.Stderr + " for Job " + (*h.JIDs)[uid].JID + ".\nGetting status from files") log.G(h.Ctx).Error("ERR: ", execReturn.Stderr) for _, ct := range pod.Spec.Containers { log.G(h.Ctx).Info("Getting exit status from " + path + "/" + ct.Name + ".status") file, err := os.Open(path + "/" + ct.Name + ".status") if err != nil { statusCode = http.StatusInternalServerError - h.handleError(w, statusCode, fmt.Errorf("unable to retrieve container status: %s", err)) + h.handleError(spanCtx, w, statusCode, fmt.Errorf("unable to retrieve container status: %s", err)) log.G(h.Ctx).Error() return } @@ -90,7 +103,7 @@ func (h *SidecarHandler) StatusHandler(w http.ResponseWriter, r *http.Request) { statusb, err := io.ReadAll(file) if err != nil { statusCode = http.StatusInternalServerError - h.handleError(w, statusCode, fmt.Errorf("unable to read container status: %s", err)) + h.handleError(spanCtx, w, statusCode, fmt.Errorf("unable to read container status: %s", err)) log.G(h.Ctx).Error() return } @@ -98,7 +111,7 @@ func (h *SidecarHandler) StatusHandler(w http.ResponseWriter, r *http.Request) { status, 
err := strconv.Atoi(strings.Replace(string(statusb), "\n", "", -1)) if err != nil { statusCode = http.StatusInternalServerError - h.handleError(w, statusCode, fmt.Errorf("unable to convert container status: %s", err)) + h.handleError(spanCtx, w, statusCode, fmt.Errorf("unable to convert container status: %s", err)) log.G(h.Ctx).Error() status = 500 } @@ -133,7 +146,7 @@ func (h *SidecarHandler) StatusHandler(w http.ResponseWriter, r *http.Request) { f, err := os.Create(path + "/FinishedAt.time") if err != nil { statusCode = http.StatusInternalServerError - h.handleError(w, statusCode, err) + h.handleError(spanCtx, w, statusCode, err) return } f.WriteString((*h.JIDs)[uid].EndTime.Format("2006-01-02 15:04:05.999999999 -0700 MST")) @@ -154,7 +167,7 @@ func (h *SidecarHandler) StatusHandler(w http.ResponseWriter, r *http.Request) { f, err := os.Create(path + "/StartedAt.time") if err != nil { statusCode = http.StatusInternalServerError - h.handleError(w, statusCode, err) + h.handleError(spanCtx, w, statusCode, err) return } f.WriteString((*h.JIDs)[uid].StartTime.Format("2006-01-02 15:04:05.999999999 -0700 MST")) @@ -170,7 +183,7 @@ func (h *SidecarHandler) StatusHandler(w http.ResponseWriter, r *http.Request) { f, err := os.Create(path + "/FinishedAt.time") if err != nil { statusCode = http.StatusInternalServerError - h.handleError(w, statusCode, err) + h.handleError(spanCtx, w, statusCode, err) return } f.WriteString((*h.JIDs)[uid].EndTime.Format("2006-01-02 15:04:05.999999999 -0700 MST")) @@ -197,7 +210,7 @@ func (h *SidecarHandler) StatusHandler(w http.ResponseWriter, r *http.Request) { f, err := os.Create(path + "/FinishedAt.time") if err != nil { statusCode = http.StatusInternalServerError - h.handleError(w, statusCode, err) + h.handleError(spanCtx, w, statusCode, err) return } f.WriteString((*h.JIDs)[uid].EndTime.Format("2006-01-02 15:04:05.999999999 -0700 MST")) @@ -218,7 +231,7 @@ func (h *SidecarHandler) StatusHandler(w http.ResponseWriter, r *http.Request) { f, err := os.Create(path + "/StartedAt.time") if err != nil { statusCode = http.StatusInternalServerError - h.handleError(w, statusCode, err) + h.handleError(spanCtx, w, statusCode, err) return } f.WriteString((*h.JIDs)[uid].StartTime.Format("2006-01-02 15:04:05.999999999 -0700 MST")) @@ -240,7 +253,7 @@ func (h *SidecarHandler) StatusHandler(w http.ResponseWriter, r *http.Request) { f, err := os.Create(path + "/FinishedAt.time") if err != nil { statusCode = http.StatusInternalServerError - h.handleError(w, statusCode, err) + h.handleError(spanCtx, w, statusCode, err) return } f.WriteString((*h.JIDs)[uid].EndTime.Format("2006-01-02 15:04:05.999999999 -0700 MST")) @@ -261,7 +274,7 @@ func (h *SidecarHandler) StatusHandler(w http.ResponseWriter, r *http.Request) { f, err := os.Create(path + "/FinishedAt.time") if err != nil { statusCode = http.StatusInternalServerError - h.handleError(w, statusCode, err) + h.handleError(spanCtx, w, statusCode, err) return } f.WriteString((*h.JIDs)[uid].EndTime.Format("2006-01-02 15:04:05.999999999 -0700 MST")) @@ -302,7 +315,7 @@ func (h *SidecarHandler) StatusHandler(w http.ResponseWriter, r *http.Request) { } else { bodyBytes, err := json.Marshal(resp) if err != nil { - h.handleError(w, statusCode, err) + h.handleError(spanCtx, w, statusCode, err) return } w.Write(bodyBytes) diff --git a/pkg/slurm/func.go b/pkg/slurm/func.go index e4af7f0..34097c6 100644 --- a/pkg/slurm/func.go +++ b/pkg/slurm/func.go @@ -7,6 +7,7 @@ import ( "net/http" "os" + "go.opentelemetry.io/otel/trace" 
"k8s.io/client-go/kubernetes" "github.com/containerd/containerd/log" @@ -44,7 +45,7 @@ func NewSlurmConfig() (SlurmConfig, error) { } if _, err := os.Stat(path); err != nil { - log.G(context.Background()).Error("File " + path + " doesn't exist. You can set a custom path by exporting INTERLINKCONFIGPATH. Exiting...") + log.G(context.Background()).Error("File " + path + " doesn't exist. You can set a custom path by exporting SLURMCONFIGPATH. Exiting...") return SlurmConfig{}, err } @@ -95,7 +96,9 @@ func NewSlurmConfig() (SlurmConfig, error) { return SlurmConfigInst, nil } -func (h *SidecarHandler) handleError(w http.ResponseWriter, statusCode int, err error) { +func (h *SidecarHandler) handleError(ctx context.Context, w http.ResponseWriter, statusCode int, err error) { + span := trace.SpanFromContext(ctx) + span.AddEvent("An error occurred:" + err.Error()) w.WriteHeader(statusCode) w.Write([]byte("Some errors occurred while creating container. Check Slurm Sidecar's logs")) log.G(h.Ctx).Error(err) diff --git a/pkg/slurm/prepare.go b/pkg/slurm/prepare.go index 05c6989..aa165f0 100644 --- a/pkg/slurm/prepare.go +++ b/pkg/slurm/prepare.go @@ -21,6 +21,9 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" commonIL "github.com/intertwin-eu/interlink/pkg/interlink" + + "go.opentelemetry.io/otel/attribute" + trace "go.opentelemetry.io/otel/trace" ) type SidecarHandler struct { @@ -97,8 +100,8 @@ func parsingTimeFromString(Ctx context.Context, stringTime string, timestampForm } // CreateDirectories is just a function to be sure directories exists at runtime -func CreateDirectories(config SlurmConfig) error { - path := config.DataRootFolder +func (h *SidecarHandler) CreateDirectories() error { + path := h.Config.DataRootFolder if _, err := os.Stat(path); err != nil { if os.IsNotExist(err) { err = os.MkdirAll(path, os.ModePerm) @@ -113,19 +116,19 @@ func CreateDirectories(config SlurmConfig) error { // LoadJIDs loads Job IDs into the main JIDs struct from files in the root folder. // It's useful went down and needed to be restarded, but there were jobs running, for example. 
// Return only error in case of failure -func LoadJIDs(Ctx context.Context, config SlurmConfig, JIDs *map[string]*JidStruct) error { - path := config.DataRootFolder +func (h *SidecarHandler) LoadJIDs() error { + path := h.Config.DataRootFolder dir, err := os.Open(path) if err != nil { - log.G(Ctx).Error(err) + log.G(h.Ctx).Error(err) return err } defer dir.Close() entries, err := dir.ReadDir(0) if err != nil { - log.G(Ctx).Error(err) + log.G(h.Ctx).Error(err) return err } @@ -138,43 +141,43 @@ func LoadJIDs(Ctx context.Context, config SlurmConfig, JIDs *map[string]*JidStru JID, err := os.ReadFile(path + entry.Name() + "/" + "JobID.jid") if err != nil { - log.G(Ctx).Debug(err) + log.G(h.Ctx).Debug(err) continue } else { podUID, err = os.ReadFile(path + entry.Name() + "/" + "PodUID.uid") if err != nil { - log.G(Ctx).Debug(err) + log.G(h.Ctx).Debug(err) continue } else { podNamespace, err = os.ReadFile(path + entry.Name() + "/" + "PodNamespace.ns") if err != nil { - log.G(Ctx).Debug(err) + log.G(h.Ctx).Debug(err) continue } } StartedAtString, err := os.ReadFile(path + entry.Name() + "/" + "StartedAt.time") if err != nil { - log.G(Ctx).Debug(err) + log.G(h.Ctx).Debug(err) } else { - StartedAt, err = parsingTimeFromString(Ctx, string(StartedAtString), "2006-01-02 15:04:05.999999999 -0700 MST") + StartedAt, err = parsingTimeFromString(h.Ctx, string(StartedAtString), "2006-01-02 15:04:05.999999999 -0700 MST") if err != nil { - log.G(Ctx).Debug(err) + log.G(h.Ctx).Debug(err) } } } FinishedAtString, err := os.ReadFile(path + entry.Name() + "/" + "FinishedAt.time") if err != nil { - log.G(Ctx).Debug(err) + log.G(h.Ctx).Debug(err) } else { - FinishedAt, err = parsingTimeFromString(Ctx, string(FinishedAtString), "2006-01-02 15:04:05.999999999 -0700 MST") + FinishedAt, err = parsingTimeFromString(h.Ctx, string(FinishedAtString), "2006-01-02 15:04:05.999999999 -0700 MST") if err != nil { - log.G(Ctx).Debug(err) + log.G(h.Ctx).Debug(err) } } JIDEntry := JidStruct{PodUID: string(podUID), PodNamespace: string(podNamespace), JID: string(JID), StartTime: StartedAt, EndTime: FinishedAt} - (*JIDs)[string(podUID)] = &JIDEntry + (*h.JIDs)[string(podUID)] = &JIDEntry } } @@ -184,10 +187,14 @@ func LoadJIDs(Ctx context.Context, config SlurmConfig, JIDs *map[string]*JidStru // prepareEnvs reads all Environment variables from a container and append them to a slice of strings. // It returns the slice containing all envs in the form of key=value. 
func prepareEnvs(Ctx context.Context, container v1.Container) []string { + start := time.Now().UnixMicro() + span := trace.SpanFromContext(Ctx) + span.AddEvent("Preparing ENVs for container " + container.Name) + var envs []string + if len(container.Env) > 0 { log.G(Ctx).Info("-- Appending envs") - env := make([]string, 1) - env = append(env, "--env") + envs = append(envs, "--env") env_data := "" for _, envVar := range container.Env { tmp := (envVar.Name + "=" + envVar.Value + ",") @@ -197,14 +204,17 @@ func prepareEnvs(Ctx context.Context, container v1.Container) []string { env_data = env_data[:last] } if env_data == "" { - env = []string{} + envs = []string{} } - env = append(env, env_data) - - return env - } else { - return []string{} + envs = append(envs, env_data) } + + duration := time.Now().UnixMicro() - start + span.AddEvent("Prepared ENVs for container "+container.Name, trace.WithAttributes( + attribute.String("prepareenvs.container.name", container.Name), + attribute.Int64("prepareenvs.duration", duration), + attribute.StringSlice("prepareenvs.container.envs", envs))) + return envs } // prepareMounts iterates along the struct provided in the data parameter and checks for ConfigMaps, Secrets and EmptyDirs to be mounted. @@ -220,6 +230,11 @@ func prepareMounts( container v1.Container, workingPath string, ) (string, error) { + span := trace.SpanFromContext(Ctx) + start := time.Now().UnixMicro() + log.G(Ctx).Info(span) + span.AddEvent("Preparing Mounts for container " + container.Name) + log.G(Ctx).Info("-- Preparing mountpoints for " + container.Name) mountedData := "" @@ -307,6 +322,12 @@ func prepareMounts( } log.G(Ctx).Debug(mountedData) + duration := time.Now().UnixMicro() - start + span.AddEvent("Prepared mounts for container "+container.Name, trace.WithAttributes( + attribute.String("peparemounts.container.name", container.Name), + attribute.Int64("preparemounts.duration", duration), + attribute.String("preparemounts.container.mounts", mountedData))) + return mountedData, nil } @@ -323,6 +344,10 @@ func produceSLURMScript( commands []SingularityCommand, resourceLimits ResourceLimits, ) (string, error) { + start := time.Now().UnixMicro() + span := trace.SpanFromContext(Ctx) + span.AddEvent("Producing SLURM script") + log.G(Ctx).Info("-- Creating file for the Slurm script") prefix = "" err := os.MkdirAll(path, os.ModePerm) @@ -489,6 +514,12 @@ func produceSLURMScript( log.G(Ctx).Debug("---- Written file") } + duration := time.Now().UnixMicro() - start + span.AddEvent("Produced SLURM script", trace.WithAttributes( + attribute.String("produceslurmscript.path", f.Name()), + attribute.Int64("preparemounts.duration", duration), + )) + return f.Name(), nil } @@ -584,7 +615,8 @@ func removeJID(podUID string, JIDs *map[string]*JidStruct) { // Returns the first encountered error. 
func deleteContainer(Ctx context.Context, config SlurmConfig, podUID string, JIDs *map[string]*JidStruct, path string) error { log.G(Ctx).Info("- Deleting Job for pod " + podUID) - if checkIfJidExists(JIDs, podUID) { + span := trace.SpanFromContext(Ctx) + if checkIfJidExists(Ctx, JIDs, podUID) { _, err := exec.Command(config.Scancelpath, (*JIDs)[podUID].JID).Output() if err != nil { log.G(Ctx).Error(err) @@ -594,11 +626,21 @@ func deleteContainer(Ctx context.Context, config SlurmConfig, podUID string, JID } } err := os.RemoveAll(path) + jid := (*JIDs)[podUID].JID removeJID(podUID, JIDs) + + span.SetAttributes( + attribute.String("delete.pod.uid", podUID), + attribute.String("delete.jid", jid), + ) + if err != nil { log.G(Ctx).Error(err) - return err + span.AddEvent("Failed to delete SLURM Job " + (*JIDs)[podUID].JID + " for Pod " + podUID) + } else { + span.AddEvent("SLURM Job " + jid + " for Pod " + podUID + " successfully deleted") } + return err } @@ -608,10 +650,13 @@ func deleteContainer(Ctx context.Context, config SlurmConfig, podUID string, JID // to create the files inside the container. // It also returns the first encountered error. func mountData(Ctx context.Context, config SlurmConfig, pod v1.Pod, container v1.Container, data interface{}, path string) ([]string, string, error) { + span := trace.SpanFromContext(Ctx) + start := time.Now().UnixMicro() if config.ExportPodData { for _, mountSpec := range container.VolumeMounts { switch mount := data.(type) { case v1.ConfigMap: + span.AddEvent("Preparing ConfigMap mount") for _, vol := range pod.Spec.Volumes { if vol.ConfigMap != nil && vol.Name == mountSpec.Name && mount.Name == vol.ConfigMap.Name { configMaps := make(map[string]string) @@ -694,12 +739,18 @@ func mountData(Ctx context.Context, config SlurmConfig, pod v1.Pod, container v1 } } } + duration := time.Now().UnixMicro() - start + span.AddEvent("Prepared ConfigMap mounts", trace.WithAttributes( + attribute.String("mountdata.container.name", container.Name), + attribute.Int64("mountdata.duration", duration), + attribute.StringSlice("mountdata.container.configmaps", configMapNamePath))) return configMapNamePath, env, nil } } //} case v1.Secret: + span.AddEvent("Preparing ConfigMap mount") for _, vol := range pod.Spec.Volumes { if vol.Secret != nil && vol.Name == mountSpec.Name && mount.Name == vol.Secret.SecretName { secrets := make(map[string][]byte) @@ -784,12 +835,18 @@ func mountData(Ctx context.Context, config SlurmConfig, pod v1.Pod, container v1 } } } + duration := time.Now().UnixMicro() - start + span.AddEvent("Prepared Secrets mounts", trace.WithAttributes( + attribute.String("mountdata.container.name", container.Name), + attribute.Int64("mountdata.duration", duration), + attribute.StringSlice("mountdata.container.secrets", secretNamePath))) return secretNamePath, env, nil } } //} case string: + span.AddEvent("Preparing EmptyDirs mount") var edPaths []string for _, vol := range pod.Spec.Volumes { for _, mountSpec := range container.VolumeMounts { @@ -823,6 +880,11 @@ func mountData(Ctx context.Context, config SlurmConfig, pod v1.Pod, container v1 } } } + duration := time.Now().UnixMicro() - start + span.AddEvent("Prepared Secrets mounts", trace.WithAttributes( + attribute.String("mountdata.container.name", container.Name), + attribute.Int64("mountdata.duration", duration), + attribute.StringSlice("mountdata.container.emptydirs", edPaths))) return edPaths, "", nil } } @@ -831,12 +893,14 @@ func mountData(Ctx context.Context, config SlurmConfig, pod v1.Pod, container 
v1 } // checkIfJidExists checks if a JID is in the main JIDs struct -func checkIfJidExists(JIDs *map[string]*JidStruct, uid string) bool { +func checkIfJidExists(ctx context.Context, JIDs *map[string]*JidStruct, uid string) bool { + span := trace.SpanFromContext(ctx) _, ok := (*JIDs)[uid] if ok { return true } else { + span.AddEvent("JID for PodUID " + uid + " doesn't exist") return false } }
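
Note on usage (illustrative, not part of the patch): tracing is opt-in. The plugin installs the OpenTelemetry provider only when ENABLE_TRACING=1 is set, and spans are exported over gRPC to TELEMETRY_ENDPOINT (defaulting to localhost:4317). The standalone sketch below shows the handler instrumentation pattern this change applies everywhere: one span per request with a start.timestamp attribute, a deferred End, and events/attributes added along the way. The package, handler name, and port are hypothetical; only the otel/attribute/trace calls mirror the code above (the real handlers start their spans from the shared h.Ctx rather than the request context).

package main

import (
	"net/http"
	"time"

	"go.opentelemetry.io/otel"
	"go.opentelemetry.io/otel/attribute"
	trace "go.opentelemetry.io/otel/trace"
)

// exampleHandler is a hypothetical handler demonstrating the span-per-request
// pattern used by SubmitHandler, StopHandler, StatusHandler and GetLogsHandler.
func exampleHandler(w http.ResponseWriter, r *http.Request) {
	start := time.Now().UnixMicro()
	tracer := otel.Tracer("interlink-API")
	ctx, span := tracer.Start(r.Context(), "ExampleCall", trace.WithAttributes(
		attribute.Int64("start.timestamp", start),
	))
	defer span.End()

	// Pass ctx down so helpers can attach events via trace.SpanFromContext(ctx),
	// as handleError, prepareEnvs and prepareMounts do in the patch above.
	_ = ctx

	span.AddEvent("request handled", trace.WithAttributes(
		attribute.Int64("duration.microseconds", time.Now().UnixMicro()-start),
	))
	w.WriteHeader(http.StatusOK)
}

func main() {
	// In the real plugin, ENABLE_TRACING=1 and TELEMETRY_ENDPOINT=<collector:4317>
	// would be set in the environment before initProvider installs the tracer provider.
	mux := http.NewServeMux()
	mux.HandleFunc("/example", exampleHandler)
	_ = http.ListenAndServe(":8080", mux)
}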