From 627c41c051258783ffff96b6f5d8dabea3c81d19 Mon Sep 17 00:00:00 2001 From: Surax98 Date: Tue, 23 Jul 2024 09:04:24 +0000 Subject: [PATCH] first otel implementation on VK only Signed-off-by: Surax98 --- cmd/virtual-kubelet/main.go | 51 +++++--- pkg/virtualkubelet/execute.go | 181 ++++++++++++++++++++------- pkg/virtualkubelet/virtualkubelet.go | 140 +++++++++++++-------- 3 files changed, 258 insertions(+), 114 deletions(-) diff --git a/cmd/virtual-kubelet/main.go b/cmd/virtual-kubelet/main.go index fa1c2b14..79d4496e 100644 --- a/cmd/virtual-kubelet/main.go +++ b/cmd/virtual-kubelet/main.go @@ -18,8 +18,10 @@ package main import ( "context" "crypto/tls" + "crypto/x509" "flag" "fmt" + "io/ioutil" "net" "os" "path" @@ -28,7 +30,7 @@ import ( // "k8s.io/client-go/rest" "google.golang.org/grpc" - "google.golang.org/grpc/credentials/insecure" + "google.golang.org/grpc/credentials" v1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/fields" "k8s.io/client-go/kubernetes/scheme" @@ -106,13 +108,6 @@ func initProvider() (func(context.Context) error, error) { return nil, fmt.Errorf("failed to create resource: %w", err) } - // TODO: disable is telemetry is disabled - - // If the OpenTelemetry Collector is running on a local cluster (minikube or - // microk8s), it should be accessible through the NodePort service at the - // `localhost:30080` endpoint. Otherwise, replace `localhost` with the - // endpoint of your cluster. If you run the app inside k8s, then you can - // probably connect directly to the service through dns. ctx, cancel := context.WithTimeout(ctx, time.Second) defer cancel() @@ -122,11 +117,35 @@ func initProvider() (func(context.Context) error, error) { otlpEndpoint = "localhost:4317" } - conn, err := grpc.DialContext(ctx, otlpEndpoint, - // Note the use of insecure transport here. TLS is recommended in production. - grpc.WithTransportCredentials(insecure.NewCredentials()), - grpc.WithBlock(), - ) + fmt.Println("TELEMETRY_ENDPOINT: ", otlpEndpoint) + + crtFilePath := os.Getenv("TELEMETRY_CRTFILEPATH") + + conn := &grpc.ClientConn{} + + if crtFilePath != "" { + cert, err := ioutil.ReadFile(crtFilePath) + if err != nil { + return nil, fmt.Errorf("failed to create resource: %w", err) + } + + roots := x509.NewCertPool() + if !roots.AppendCertsFromPEM(cert) { + return nil, fmt.Errorf("failed to create resource: %w", err) + } + + creds := credentials.NewTLS(&tls.Config{ + RootCAs: roots, + }) + + conn, err = grpc.DialContext(ctx, otlpEndpoint, + grpc.WithTransportCredentials(creds), + grpc.WithBlock(), + ) + } else { + conn, err = grpc.DialContext(ctx, otlpEndpoint, grpc.WithInsecure(), grpc.WithBlock()) + } + if err != nil { return nil, fmt.Errorf("failed to create gRPC connection to collector: %w", err) } @@ -150,15 +169,9 @@ func initProvider() (func(context.Context) error, error) { // set global propagator to tracecontext (the default is no-op). otel.SetTextMapPropagator(propagation.TraceContext{}) - // Shutdown will flush any remaining spans and shut down the exporter. return tracerProvider.Shutdown, nil } -func tlsConfig(tls *tls.Config) error { - tls.InsecureSkipVerify = true - return nil -} - func main() { ctx, cancel := context.WithCancel(context.Background()) defer cancel() diff --git a/pkg/virtualkubelet/execute.go b/pkg/virtualkubelet/execute.go index b07c90cb..75f2b5ed 100644 --- a/pkg/virtualkubelet/execute.go +++ b/pkg/virtualkubelet/execute.go @@ -14,6 +14,9 @@ import ( "time" "github.com/containerd/containerd/log" + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/attribute" + trace "go.opentelemetry.io/otel/trace" v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -44,6 +47,7 @@ func getSidecarEndpoint(ctx context.Context, interLinkURL string, interLinkPort // PingInterLink pings the InterLink API and returns true if there's an answer. The second return value is given by the answer provided by the API. func PingInterLink(ctx context.Context, config VirtualKubeletConfig) (bool, int, error) { + tracer := otel.Tracer("interlink-service") interLinkEndpoint := getSidecarEndpoint(ctx, config.InterlinkURL, config.Interlinkport) log.G(ctx).Info("Pinging: " + interLinkEndpoint + "/pinglink") retVal := -1 @@ -59,12 +63,22 @@ func PingInterLink(ctx context.Context, config VirtualKubeletConfig) (bool, int, return false, retVal, err } req.Header.Add("Authorization", "Bearer "+string(token)) + + startHttpCall := time.Now().UnixMicro() + _, spanHttp := tracer.Start(ctx, "PingHttpCall", trace.WithAttributes( + attribute.Int64("start.timestamp", startHttpCall), + )) + defer spanHttp.End() resp, err := http.DefaultClient.Do(req) + setDurationSpan(startHttpCall, spanHttp) + if err != nil { + spanHttp.SetAttributes(attribute.Int("exit.code", http.StatusInternalServerError)) return false, retVal, err } - if resp.StatusCode == http.StatusOK { + if resp != nil { + spanHttp.SetAttributes(attribute.Int("exit.code", resp.StatusCode)) retBytes, err := io.ReadAll(resp.Body) if err != nil { log.G(ctx).Error(err) @@ -75,15 +89,18 @@ func PingInterLink(ctx context.Context, config VirtualKubeletConfig) (bool, int, log.G(ctx).Error(err) return false, retVal, err } - return true, retVal, nil - } else { - log.G(ctx).Error("server error: " + fmt.Sprint(resp.StatusCode)) - return false, retVal, nil + + if resp.StatusCode != http.StatusOK { + log.G(ctx).Error("server error: " + fmt.Sprint(resp.StatusCode)) + return false, retVal, nil + } } + return true, retVal, nil } // updateCacheRequest is called when the VK receives the status of a pod already deleted. It performs a REST call InterLink API to update the cache deleting that pod from the cached structure func updateCacheRequest(ctx context.Context, config VirtualKubeletConfig, pod v1.Pod, token string) error { + tracer := otel.Tracer("interlink-service") bodyBytes, err := json.Marshal(pod) if err != nil { log.L.Error(err) @@ -100,15 +117,28 @@ func updateCacheRequest(ctx context.Context, config VirtualKubeletConfig, pod v1 req.Header.Add("Authorization", "Bearer "+token) req.Header.Set("Content-Type", "application/json") + + startHttpCall := time.Now().UnixMicro() + _, spanHttp := tracer.Start(ctx, "UpdateCacheHttpCall", trace.WithAttributes( + attribute.String("pod.name", pod.Name), + attribute.String("pod.namespace", pod.Namespace), + attribute.String("pod.uid", string(pod.UID)), + attribute.Int64("start.timestamp", startHttpCall), + )) + defer spanHttp.End() resp, err := http.DefaultClient.Do(req) + setDurationSpan(startHttpCall, spanHttp) if err != nil { log.L.Error(err) return err } - statusCode := resp.StatusCode + if resp != nil { + statusCode := resp.StatusCode + spanHttp.SetAttributes(attribute.Int("exit.code", resp.StatusCode)) - if statusCode != http.StatusOK { - return errors.New("Unexpected error occured while updating InterLink cache. Status code: " + strconv.Itoa(resp.StatusCode) + ". Check InterLink's logs for further informations") + if statusCode != http.StatusOK { + return errors.New("Unexpected error occured while updating InterLink cache. Status code: " + strconv.Itoa(resp.StatusCode) + ". Check InterLink's logs for further informations") + } } return err @@ -117,8 +147,9 @@ func updateCacheRequest(ctx context.Context, config VirtualKubeletConfig, pod v1 // createRequest performs a REST call to the InterLink API when a Pod is registered to the VK. It Marshals the pod with already retrieved ConfigMaps and Secrets and sends it to InterLink. // Returns the call response expressed in bytes and/or the first encountered error func createRequest(ctx context.Context, config VirtualKubeletConfig, pod types.PodCreateRequests, token string) ([]byte, error) { + tracer := otel.Tracer("interlink-service") interLinkEndpoint := getSidecarEndpoint(ctx, config.InterlinkURL, config.Interlinkport) - var returnValue, _ = json.Marshal(types.PodStatus{}) + var returnValue, _ = json.Marshal(types.CreateStruct{}) bodyBytes, err := json.Marshal(pod) if err != nil { @@ -132,20 +163,33 @@ func createRequest(ctx context.Context, config VirtualKubeletConfig, pod types.P return nil, err } + startHttpCall := time.Now().UnixMicro() + _, spanHttp := tracer.Start(ctx, "CreateHttpCall", trace.WithAttributes( + attribute.String("pod.name", pod.Pod.Name), + attribute.String("pod.namespace", pod.Pod.Namespace), + attribute.String("pod.uid", string(pod.Pod.UID)), + attribute.Int64("start.timestamp", startHttpCall), + )) + defer spanHttp.End() resp, err := doRequest(req, token) + setDurationSpan(startHttpCall, spanHttp) if err != nil { log.L.Error(err) return nil, err } - statusCode := resp.StatusCode - if statusCode != http.StatusOK { - return nil, errors.New("Unexpected error occured while creating Pods. Status code: " + strconv.Itoa(resp.StatusCode) + ". Check InterLink's logs for further informations") - } else { - returnValue, err = io.ReadAll(resp.Body) - if err != nil { - log.L.Error(err) - return nil, err + if resp != nil { + statusCode := resp.StatusCode + spanHttp.SetAttributes(attribute.Int("exit.code", resp.StatusCode)) + + if statusCode != http.StatusOK { + return nil, errors.New("Unexpected error occured while creating Pods. Status code: " + strconv.Itoa(resp.StatusCode) + ". Check InterLink's logs for further informations") + } else { + returnValue, err = io.ReadAll(resp.Body) + if err != nil { + log.L.Error(err) + return nil, err + } } } @@ -155,7 +199,9 @@ func createRequest(ctx context.Context, config VirtualKubeletConfig, pod types.P // deleteRequest performs a REST call to the InterLink API when a Pod is deleted from the VK. It Marshals the standard v1.Pod struct and sends it to InterLink. // Returns the call response expressed in bytes and/or the first encountered error func deleteRequest(ctx context.Context, config VirtualKubeletConfig, pod *v1.Pod, token string) ([]byte, error) { + tracer := otel.Tracer("interlink-service") interLinkEndpoint := getSidecarEndpoint(ctx, config.InterlinkURL, config.Interlinkport) + var returnValue []byte bodyBytes, err := json.Marshal(pod) if err != nil { log.G(context.Background()).Error(err) @@ -168,38 +214,51 @@ func deleteRequest(ctx context.Context, config VirtualKubeletConfig, pod *v1.Pod return nil, err } + startHttpCall := time.Now().UnixMicro() + _, spanHttp := tracer.Start(ctx, "DeleteHttpCall", trace.WithAttributes( + attribute.String("pod.name", pod.Name), + attribute.String("pod.namespace", pod.Namespace), + attribute.String("pod.uid", string(pod.UID)), + attribute.Int64("start.timestamp", startHttpCall), + )) + defer spanHttp.End() resp, err := doRequest(req, token) + setDurationSpan(startHttpCall, spanHttp) if err != nil { log.G(context.Background()).Error(err) return nil, err } - statusCode := resp.StatusCode + if resp != nil { + statusCode := resp.StatusCode + spanHttp.SetAttributes(attribute.Int("exit.code", resp.StatusCode)) - if statusCode != http.StatusOK { - return nil, errors.New("Unexpected error occured while deleting Pods. Status code: " + strconv.Itoa(resp.StatusCode) + ". Check InterLink's logs for further informations") - } else { - returnValue, err := io.ReadAll(resp.Body) - if err != nil { - log.G(context.Background()).Error(err) - return nil, err - } - log.G(context.Background()).Info(string(returnValue)) - var response []types.PodStatus - err = json.Unmarshal(returnValue, &response) - if err != nil { - log.G(context.Background()).Error(err) - return nil, err + if statusCode != http.StatusOK { + return nil, errors.New("Unexpected error occured while deleting Pods. Status code: " + strconv.Itoa(resp.StatusCode) + ". Check InterLink's logs for further informations") + } else { + returnValue, err := io.ReadAll(resp.Body) + if err != nil { + log.G(context.Background()).Error(err) + return nil, err + } + log.G(context.Background()).Info(string(returnValue)) + var response []types.PodStatus + err = json.Unmarshal(returnValue, &response) + if err != nil { + log.G(context.Background()).Error(err) + return nil, err + } } - return returnValue, nil } + return returnValue, nil } // statusRequest performs a REST call to the InterLink API when the VK needs an update on its Pods' status. A Marshalled slice of v1.Pod is sent to the InterLink API, // to query the below plugin for their status. // Returns the call response expressed in bytes and/or the first encountered error func statusRequest(ctx context.Context, config VirtualKubeletConfig, podsList []*v1.Pod, token string) ([]byte, error) { - var returnValue []byte + tracer := otel.Tracer("interlink-service") + returnValue, _ := json.Marshal(types.PodStatus{}) interLinkEndpoint := getSidecarEndpoint(ctx, config.InterlinkURL, config.Interlinkport) bodyBytes, err := json.Marshal(podsList) @@ -216,18 +275,27 @@ func statusRequest(ctx context.Context, config VirtualKubeletConfig, podsList [] //log.L.Println(string(bodyBytes)) + startHttpCall := time.Now().UnixMicro() + _, spanHttp := tracer.Start(ctx, "StatusHttpCall", trace.WithAttributes( + attribute.Int64("start.timestamp", startHttpCall), + )) + defer spanHttp.End() resp, err := doRequest(req, token) + setDurationSpan(startHttpCall, spanHttp) if err != nil { return nil, err } - if resp.StatusCode != http.StatusOK { - return nil, errors.New("Unexpected error occured while getting status. Status code: " + strconv.Itoa(resp.StatusCode) + ". Check InterLink's logs for further informations") - } else { - returnValue, err = io.ReadAll(resp.Body) - if err != nil { - log.L.Error(err) - return nil, err + if resp != nil { + spanHttp.SetAttributes(attribute.Int("exit.code", resp.StatusCode)) + if resp.StatusCode != http.StatusOK { + return nil, errors.New("Unexpected error occured while getting status. Status code: " + strconv.Itoa(resp.StatusCode) + ". Check InterLink's logs for further informations") + } else { + returnValue, err = io.ReadAll(resp.Body) + if err != nil { + log.L.Error(err) + return nil, err + } } } @@ -238,6 +306,8 @@ func statusRequest(ctx context.Context, config VirtualKubeletConfig, podsList [] // This struct only includes a minimum data set needed to identify the job/container to get the logs from. // Returns the call response and/or the first encountered error func LogRetrieval(ctx context.Context, config VirtualKubeletConfig, logsRequest types.LogStruct) (io.ReadCloser, error) { + tracer := otel.Tracer("interlink-service") + var returnValue io.ReadCloser interLinkEndpoint := getSidecarEndpoint(ctx, config.InterlinkURL, config.Interlinkport) b, err := os.ReadFile(config.VKTokenFile) // just pass the file name if err != nil { @@ -259,18 +329,30 @@ func LogRetrieval(ctx context.Context, config VirtualKubeletConfig, logsRequest //log.G(ctx).Println(string(bodyBytes)) + startHttpCall := time.Now().UnixMicro() + _, spanHttp := tracer.Start(ctx, "LogHttpCall", trace.WithAttributes( + attribute.String("pod.name", logsRequest.PodName), + attribute.String("pod.namespace", logsRequest.Namespace), + attribute.String("pod.uid", logsRequest.PodUID), + attribute.Int64("start.timestamp", startHttpCall), + )) + defer spanHttp.End() resp, err := doRequest(req, token) + setDurationSpan(startHttpCall, spanHttp) if err != nil { log.G(ctx).Error(err) return nil, err } - if resp.StatusCode != http.StatusOK { - log.G(ctx).Info(resp.Body) - return nil, errors.New("Unexpected error occured while getting logs. Status code: " + strconv.Itoa(resp.StatusCode) + ". Check InterLink's logs for further informations") - } else { - return resp.Body, nil + if resp != nil { + spanHttp.SetAttributes(attribute.Int("exit.code", resp.StatusCode)) + if resp.StatusCode != http.StatusOK { + err = errors.New("Unexpected error occured while getting logs. Status code: " + strconv.Itoa(resp.StatusCode) + ". Check InterLink's logs for further informations") + } else { + returnValue = resp.Body + } } + return returnValue, err } // RemoteExecution is called by the VK everytime a Pod is being registered or deleted to/from the VK. @@ -525,3 +607,10 @@ func checkPodsStatus(ctx context.Context, p *VirtualKubeletProvider, podsList [] return nil, err } + +func setDurationSpan(startTime int64, span trace.Span) { + endTime := time.Now().UnixMicro() + duration := endTime - startTime + span.SetAttributes(attribute.Int64("end.timestamp", endTime)) + span.SetAttributes(attribute.Int64("duration", duration)) +} diff --git a/pkg/virtualkubelet/virtualkubelet.go b/pkg/virtualkubelet/virtualkubelet.go index 631a6ad9..1285f70a 100644 --- a/pkg/virtualkubelet/virtualkubelet.go +++ b/pkg/virtualkubelet/virtualkubelet.go @@ -15,7 +15,9 @@ import ( "github.com/virtual-kubelet/virtual-kubelet/errdefs" "github.com/virtual-kubelet/virtual-kubelet/node/api" stats "github.com/virtual-kubelet/virtual-kubelet/node/api/statsv1alpha1" - "github.com/virtual-kubelet/virtual-kubelet/trace" + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/attribute" + trace "go.opentelemetry.io/otel/trace" v1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -346,15 +348,22 @@ func (p *VirtualKubeletProvider) Ping(ctx context.Context) error { // CreatePod accepts a Pod definition and stores it in memory in p.pods func (p *VirtualKubeletProvider) CreatePod(ctx context.Context, pod *v1.Pod) error { - ctx, span := trace.StartSpan(ctx, "CreatePod") + start := time.Now().Unix() + tracer := otel.Tracer("interlink-service") + ctx, span := tracer.Start(ctx, "CreatePodVK", trace.WithAttributes( + attribute.String("pod.name", pod.Name), + attribute.String("pod.namespace", pod.Namespace), + attribute.Int64("start.timestamp", start), + )) + defer span.End() + var hasInitContainers = false var state v1.ContainerState defer span.End() - // Add the pod's coordinates to the current span. - ctx = addAttributes(ctx, span, NamespaceKey, pod.Namespace, NameKey, pod.Name) key, err := buildKey(pod) if err != nil { + setDurationSpan(start, span) return err } now := metav1.NewTime(time.Now()) @@ -427,6 +436,7 @@ func (p *VirtualKubeletProvider) CreatePod(ctx context.Context, pod *v1.Pod) err err := RemoteExecution(ctx, p.config, p, pod, CREATE) if err != nil { if err.Error() == "Deleted pod before actual creation" { + setDurationSpan(start, span) log.G(ctx).Warn(err) } else { // TODO if node in NotReady put it to Unknown/pending? @@ -455,6 +465,7 @@ func (p *VirtualKubeletProvider) CreatePod(ctx context.Context, pod *v1.Pod) err p.UpdatePod(ctx, pod) } + setDurationSpan(start, span) return } }() @@ -474,40 +485,52 @@ func (p *VirtualKubeletProvider) CreatePod(ctx context.Context, pod *v1.Pod) err p.pods[key] = pod + setDurationSpan(start, span) + return nil } // UpdatePod accepts a Pod definition and updates its reference. func (p *VirtualKubeletProvider) UpdatePod(ctx context.Context, pod *v1.Pod) error { - ctx, span := trace.StartSpan(ctx, "UpdatePod") + start := time.Now().Unix() + tracer := otel.Tracer("interlink-service") + ctx, span := tracer.Start(ctx, "UpdatePodVK", trace.WithAttributes( + attribute.String("pod.name", pod.Name), + attribute.String("pod.namespace", pod.Namespace), + attribute.Int64("start.timestamp", start), + )) defer span.End() - // Add the pod's coordinates to the current span. - ctx = addAttributes(ctx, span, NamespaceKey, pod.Namespace, NameKey, pod.Name) - log.G(ctx).Infof("receive UpdatePod %q", pod.Name) p.notifier(pod) + setDurationSpan(start, span) + return nil } // DeletePod deletes the specified pod and drops it out of p.pods func (p *VirtualKubeletProvider) DeletePod(ctx context.Context, pod *v1.Pod) (err error) { - ctx, span := trace.StartSpan(ctx, "DeletePod") + start := time.Now().Unix() + tracer := otel.Tracer("interlink-service") + ctx, span := tracer.Start(ctx, "DeletePodVK", trace.WithAttributes( + attribute.String("pod.name", pod.Name), + attribute.String("pod.namespace", pod.Namespace), + attribute.Int64("start.timestamp", start), + )) defer span.End() - // Add the pod's coordinates to the current span. - ctx = addAttributes(ctx, span, NamespaceKey, pod.Namespace, NameKey, pod.Name) - log.G(ctx).Infof("receive DeletePod %q", pod.Name) key, err := buildKey(pod) if err != nil { + setDurationSpan(start, span) return err } if _, exists := p.pods[key]; !exists { + setDurationSpan(start, span) return errdefs.NotFound("pod not found") } @@ -518,6 +541,7 @@ func (p *VirtualKubeletProvider) DeletePod(ctx context.Context, pod *v1.Pod) (er err = RemoteExecution(ctx, p.config, p, pod, DELETE) if err != nil { log.G(ctx).Error(err) + setDurationSpan(start, span) return } }() @@ -549,56 +573,72 @@ func (p *VirtualKubeletProvider) DeletePod(ctx context.Context, pod *v1.Pod) (er // delete from p.pods delete(p.pods, key) + setDurationSpan(start, span) + return nil } // GetPod returns a pod by name that is stored in memory. func (p *VirtualKubeletProvider) GetPod(ctx context.Context, namespace, name string) (pod *v1.Pod, err error) { - - ctx, span := trace.StartSpan(ctx, "GetPod") - defer func() { - span.SetStatus(err) - span.End() - }() - - // Add the pod's coordinates to the current span. - ctx = addAttributes(ctx, span, NamespaceKey, namespace, NameKey, name) + start := time.Now().Unix() + tracer := otel.Tracer("interlink-service") + ctx, span := tracer.Start(ctx, "GetPodVK", trace.WithAttributes( + attribute.String("pod.name", name), + attribute.String("pod.namespace", namespace), + attribute.Int64("start.timestamp", start), + )) + defer span.End() log.G(ctx).Infof("receive GetPod %q", name) key, err := buildKeyFromNames(namespace, name) if err != nil { + setDurationSpan(start, span) return nil, err } if pod, ok := p.pods[key]; ok { + setDurationSpan(start, span) return pod, nil } + + setDurationSpan(start, span) + return nil, errdefs.NotFoundf("pod \"%s/%s\" is not known to the provider", namespace, name) } // GetPodStatus returns the status of a pod by name that is "running". // returns nil if a pod by that name is not found. func (p *VirtualKubeletProvider) GetPodStatus(ctx context.Context, namespace, name string) (*v1.PodStatus, error) { - ctx, span := trace.StartSpan(ctx, "GetPodStatus") + start := time.Now().Unix() + tracer := otel.Tracer("interlink-service") + ctx, span := tracer.Start(ctx, "GetPodStatusVK", trace.WithAttributes( + attribute.String("pod.name", name), + attribute.String("pod.namespace", namespace), + attribute.Int64("start.timestamp", start), + )) defer span.End() - // Add namespace and name as attributes to the current span. - ctx = addAttributes(ctx, span, NamespaceKey, namespace, NameKey, name) - log.G(ctx).Infof("receive GetPodStatus %q", name) pod, err := p.GetPod(ctx, namespace, name) if err != nil { + setDurationSpan(start, span) return nil, err } + setDurationSpan(start, span) + return &pod.Status, nil } // GetPods returns a list of all pods known to be "running". func (p *VirtualKubeletProvider) GetPods(ctx context.Context) ([]*v1.Pod, error) { - ctx, span := trace.StartSpan(ctx, "GetPods") + start := time.Now().Unix() + tracer := otel.Tracer("interlink-service") + ctx, span := tracer.Start(ctx, "GetPodsVK", trace.WithAttributes( + attribute.Int64("start.timestamp", start), + )) defer span.End() log.G(ctx).Info("receive GetPods") @@ -612,6 +652,7 @@ func (p *VirtualKubeletProvider) GetPods(ctx context.Context) ([]*v1.Pod, error) pods = append(pods, pod) } + setDurationSpan(start, span) return pods, nil } @@ -727,29 +768,15 @@ func (p *VirtualKubeletProvider) statusLoop(ctx context.Context) { } } -// addAttributes adds the specified attributes to the provided span. -// attrs must be an even-sized list of string arguments. -// Otherwise, the span won't be modified. -// TODO: Refactor and move to a "tracing utilities" package. -func addAttributes(ctx context.Context, span trace.Span, attrs ...string) context.Context { - if len(attrs)%2 == 1 { - return ctx - } - for i := 0; i < len(attrs); i += 2 { - ctx = span.WithField(ctx, attrs[i], attrs[i+1]) - } - return ctx -} - // GetLogs implements the logic for interLink pod logs retrieval. func (p *VirtualKubeletProvider) GetLogs(ctx context.Context, namespace, podName, containerName string, opts api.ContainerLogOpts) (io.ReadCloser, error) { - var span trace.Span - ctx, span = trace.StartSpan(ctx, "GetLogs") //nolint: ineffassign,staticcheck + start := time.Now().Unix() + tracer := otel.Tracer("interlink-service") + ctx, span := tracer.Start(ctx, "GetLogsVK", trace.WithAttributes( + attribute.Int64("start.timestamp", start), + )) defer span.End() - // Add namespace and name as attributes to the current span. - ctx = addAttributes(ctx, span, NamespaceKey, namespace, NameKey, podName) - log.G(ctx).Infof("receive GetPodLogs %q", podName) key, err := buildKeyFromNames(namespace, podName) @@ -765,13 +792,17 @@ func (p *VirtualKubeletProvider) GetLogs(ctx context.Context, namespace, podName Opts: types.ContainerLogOpts(opts), } + setDurationSpan(start, span) return LogRetrieval(ctx, p.config, logsRequest) } // GetStatsSummary returns dummy stats for all pods known by this provider. func (p *VirtualKubeletProvider) GetStatsSummary(ctx context.Context) (*stats.Summary, error) { - var span trace.Span - _, span = trace.StartSpan(ctx, "GetStatsSummary") //nolint: ineffassign,staticcheck + start := time.Now().Unix() + tracer := otel.Tracer("interlink-service") + _, span := tracer.Start(ctx, "GetStatsSummaryVK", trace.WithAttributes( + attribute.Int64("start.timestamp", start), + )) defer span.End() // Grab the current timestamp so we can report it as the time the stats were generated. @@ -843,13 +874,18 @@ func (p *VirtualKubeletProvider) GetStatsSummary(ctx context.Context) (*stats.Su } // Return the dummy stats. + setDurationSpan(start, span) return res, nil } // RetrievePodsFromCluster scans all pods registered to the K8S cluster and re-assigns the ones with a valid JobID to the Virtual Kubelet. // This will run at the initiation time only func (p *VirtualKubeletProvider) RetrievePodsFromCluster(ctx context.Context) error { - ctx, span := trace.StartSpan(ctx, "RetrievePodsFromCluster") + start := time.Now().Unix() + tracer := otel.Tracer("interlink-service") + ctx, span := tracer.Start(ctx, "RetrievePodsFromCluster", trace.WithAttributes( + attribute.Int64("start.timestamp", start), + )) defer span.End() log.G(ctx).Info("Retrieving ALL Pods registered to the cluster and owned by VK") @@ -879,6 +915,7 @@ func (p *VirtualKubeletProvider) RetrievePodsFromCluster(ctx context.Context) er } + setDurationSpan(start, span) return err } @@ -894,7 +931,11 @@ func CheckIfAnnotationExists(pod *v1.Pod, key string) bool { } func (p *VirtualKubeletProvider) initClientSet(ctx context.Context) error { - ctx, span := trace.StartSpan(ctx, "InitClientSet") + start := time.Now().Unix() + tracer := otel.Tracer("interlink-service") + ctx, span := tracer.Start(ctx, "InitClientSet", trace.WithAttributes( + attribute.Int64("start.timestamp", start), + )) defer span.End() if p.clientSet == nil { @@ -913,5 +954,6 @@ func (p *VirtualKubeletProvider) initClientSet(ctx context.Context) error { } } + setDurationSpan(start, span) return nil }