From 18538dece496c3f403c3048305c4c9ba8698c86c Mon Sep 17 00:00:00 2001 From: Bianco95 Date: Fri, 6 Sep 2024 10:52:02 +0200 Subject: [PATCH 1/3] updated initProvider function of the Virtual Kubelet to handle mTLS authentication; updated the name of the OTLP Service --- cmd/virtual-kubelet/main.go | 80 +++++++++++++++++++++++++++++++++++-- 1 file changed, 76 insertions(+), 4 deletions(-) diff --git a/cmd/virtual-kubelet/main.go b/cmd/virtual-kubelet/main.go index ab63153d..4fa3d16b 100644 --- a/cmd/virtual-kubelet/main.go +++ b/cmd/virtual-kubelet/main.go @@ -18,8 +18,10 @@ package main import ( "context" "crypto/tls" + "crypto/x509" "flag" "fmt" + "io/ioutil" "net" "os" "path" @@ -64,6 +66,8 @@ import ( "go.opentelemetry.io/otel/sdk/resource" sdktrace "go.opentelemetry.io/otel/sdk/trace" semconv "go.opentelemetry.io/otel/semconv/v1.21.0" + + "github.com/google/uuid" ) func PodInformerFilter(node string) informers.SharedInformerOption { @@ -94,10 +98,29 @@ type Opts struct { } func initProvider(ctx context.Context) (func(context.Context) error, error) { + + log.G(ctx).Info("Tracing is enabled, setting up the TracerProvider") + + // Get the TELEMETRY_UNIQUE_ID from the environment, if it is not set, use the hostname + uniqueID := os.Getenv("TELEMETRY_UNIQUE_ID") + if uniqueID == "" { + log.G(ctx).Info("No TELEMETRY_UNIQUE_ID set, generating a new one") + newUUID := uuid.New() + uniqueID = newUUID.String() + log.G(ctx).Info("Generated unique ID: ", uniqueID, " use VK-InterLink-"+uniqueID+" as service name from Grafana") + } + + // Create a new resource with the service name set to the TELEMETRY_UNIQUE_ID + // The nomenclature VK-InterLink- is used to identify the service in Grafana. + // VK-InterLink- means that the traces are coming from Virtual Kubelet + // and are related to the call that are made for the InterLink API service + + serviceName := "VK-InterLink-" + uniqueID + res, err := resource.New(ctx, resource.WithAttributes( // the service name used to display traces in backends - semconv.ServiceName("InterLink-VK"), + semconv.ServiceName(serviceName), ), ) if err != nil { @@ -113,11 +136,60 @@ func initProvider(ctx context.Context) (func(context.Context) error, error) { otlpEndpoint = "localhost:4317" } - fmt.Println("TELEMETRY_ENDPOINT: ", otlpEndpoint) + log.G(ctx).Info("TELEMETRY_ENDPOINT: ", otlpEndpoint) + + caCrtFilePath := os.Getenv("TELEMETRY_CA_CRT_FILEPATH") conn := &grpc.ClientConn{} - creds := credentials.NewTLS(&tls.Config{InsecureSkipVerify: true}) - conn, err = grpc.DialContext(ctx, otlpEndpoint, grpc.WithTransportCredentials(creds), grpc.WithBlock()) + if caCrtFilePath != "" { + + // if the CA certificate is provided, set up mutual TLS + + log.G(ctx).Info("CA certificate provided, setting up mutual TLS") + + caCert, err := ioutil.ReadFile(caCrtFilePath) + if err != nil { + return nil, fmt.Errorf("failed to load CA certificate: %w", err) + } + + clientKeyFilePath := os.Getenv("TELEMETRY_CLIENT_KEY_FILEPATH") + if clientKeyFilePath == "" { + return nil, fmt.Errorf("client key file path not provided. Since a CA certificate is provided, a client key is required for mutual TLS") + } + + clientCrtFilePath := os.Getenv("TELEMETRY_CLIENT_CRT_FILEPATH") + if clientCrtFilePath == "" { + return nil, fmt.Errorf("client certificate file path not provided. Since a CA certificate is provided, a client certificate is required for mutual TLS") + } + + certPool := x509.NewCertPool() + if !certPool.AppendCertsFromPEM(caCert) { + return nil, fmt.Errorf("failed to append CA certificate") + } + + cert, err := tls.LoadX509KeyPair(clientCrtFilePath, clientKeyFilePath) + if err != nil { + return nil, fmt.Errorf("failed to load client certificate: %w", err) + } + + tlsConfig := &tls.Config{ + Certificates: []tls.Certificate{cert}, + RootCAs: certPool, + MinVersion: tls.VersionTLS12, + InsecureSkipVerify: true, + } + creds := credentials.NewTLS(tlsConfig) + + conn, err = grpc.DialContext(ctx, + otlpEndpoint, + grpc.WithTransportCredentials(creds), + grpc.WithBlock()) + + } else { + // if the CA certificate is not provided, use an insecure connection + // this means that the telemetry collector is not using a certificate, i.e. is inside the k8s cluster + conn, err = grpc.DialContext(ctx, otlpEndpoint, grpc.WithInsecure(), grpc.WithBlock()) + } if err != nil { return nil, fmt.Errorf("failed to create gRPC connection to collector: %w", err) From dd0d093df6f084eeb052caaa7e8a174a414327be Mon Sep 17 00:00:00 2001 From: Bianco95 Date: Fri, 6 Sep 2024 14:09:30 +0200 Subject: [PATCH 2/3] updated initProvider function of InterLink. --- cmd/interlink/main.go | 68 ++++++++++++++++++++++++++++++++++--- cmd/virtual-kubelet/main.go | 12 +++---- 2 files changed, 70 insertions(+), 10 deletions(-) diff --git a/cmd/interlink/main.go b/cmd/interlink/main.go index 3d6aed56..f580d0b5 100644 --- a/cmd/interlink/main.go +++ b/cmd/interlink/main.go @@ -3,13 +3,16 @@ package main import ( "context" "crypto/tls" + "crypto/x509" "flag" "fmt" + "io/ioutil" "net/http" "os" "strings" "time" + "github.com/google/uuid" "github.com/sirupsen/logrus" "github.com/virtual-kubelet/virtual-kubelet/log" logruslogger "github.com/virtual-kubelet/virtual-kubelet/log/logrus" @@ -17,6 +20,7 @@ import ( "github.com/virtual-kubelet/virtual-kubelet/trace/opentelemetry" "google.golang.org/grpc" "google.golang.org/grpc/credentials" + "google.golang.org/grpc/credentials/insecure" types "github.com/intertwin-eu/interlink/pkg/interlink" "github.com/intertwin-eu/interlink/pkg/interlink/api" @@ -31,10 +35,23 @@ import ( ) func initProvider(ctx context.Context) (func(context.Context) error, error) { + log.G(ctx).Info("Tracing is enabled, setting up the TracerProvider") + + // Get the TELEMETRY_UNIQUE_ID from the environment, if it is not set, use the hostname + uniqueID := os.Getenv("TELEMETRY_UNIQUE_ID") + if uniqueID == "" { + log.G(ctx).Info("No TELEMETRY_UNIQUE_ID set, generating a new one") + newUUID := uuid.New() + uniqueID = newUUID.String() + log.G(ctx).Info("Generated unique ID: ", uniqueID, " use InterLink-Plugin-"+uniqueID+" as service name from Grafana") + } + + serviceName := "InterLink-Plugin-" + uniqueID + res, err := resource.New(ctx, resource.WithAttributes( // the service name used to display traces in backends - semconv.ServiceName("InterLink-API"), + semconv.ServiceName(serviceName), ), ) if err != nil { @@ -50,11 +67,54 @@ func initProvider(ctx context.Context) (func(context.Context) error, error) { otlpEndpoint = "localhost:4317" } - fmt.Println("TELEMETRY_ENDPOINT: ", otlpEndpoint) + log.G(ctx).Info("TELEMETRY_ENDPOINT: ", otlpEndpoint) + + caCrtFilePath := os.Getenv("TELEMETRY_CA_CRT_FILEPATH") conn := &grpc.ClientConn{} - creds := credentials.NewTLS(&tls.Config{InsecureSkipVerify: true}) - conn, err = grpc.DialContext(ctx, otlpEndpoint, grpc.WithTransportCredentials(creds), grpc.WithBlock()) + if caCrtFilePath != "" { + + // if the CA certificate is provided, set up mutual TLS + + log.G(ctx).Info("CA certificate provided, setting up mutual TLS") + + caCert, err := ioutil.ReadFile(caCrtFilePath) + if err != nil { + return nil, fmt.Errorf("failed to load CA certificate: %w", err) + } + + clientKeyFilePath := os.Getenv("TELEMETRY_CLIENT_KEY_FILEPATH") + if clientKeyFilePath == "" { + return nil, fmt.Errorf("client key file path not provided. Since a CA certificate is provided, a client key is required for mutual TLS") + } + + clientCrtFilePath := os.Getenv("TELEMETRY_CLIENT_CRT_FILEPATH") + if clientCrtFilePath == "" { + return nil, fmt.Errorf("client certificate file path not provided. Since a CA certificate is provided, a client certificate is required for mutual TLS") + } + + certPool := x509.NewCertPool() + if !certPool.AppendCertsFromPEM(caCert) { + return nil, fmt.Errorf("failed to append CA certificate") + } + + cert, err := tls.LoadX509KeyPair(clientCrtFilePath, clientKeyFilePath) + if err != nil { + return nil, fmt.Errorf("failed to load client certificate: %w", err) + } + + tlsConfig := &tls.Config{ + Certificates: []tls.Certificate{cert}, + RootCAs: certPool, + MinVersion: tls.VersionTLS12, + InsecureSkipVerify: true, + } + creds := credentials.NewTLS(tlsConfig) + conn, err = grpc.NewClient(otlpEndpoint, grpc.WithTransportCredentials(creds), grpc.WithBlock()) + + } else { + conn, err = grpc.NewClient(otlpEndpoint, grpc.WithTransportCredentials(insecure.NewCredentials())) + } if err != nil { return nil, fmt.Errorf("failed to create gRPC connection to collector: %w", err) diff --git a/cmd/virtual-kubelet/main.go b/cmd/virtual-kubelet/main.go index 4fa3d16b..853a29b9 100644 --- a/cmd/virtual-kubelet/main.go +++ b/cmd/virtual-kubelet/main.go @@ -30,7 +30,9 @@ import ( // "k8s.io/client-go/rest" "google.golang.org/grpc" + "google.golang.org/grpc/connectivity" "google.golang.org/grpc/credentials" + "google.golang.org/grpc/credentials/insecure" v1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/fields" "k8s.io/client-go/kubernetes/scheme" @@ -179,18 +181,16 @@ func initProvider(ctx context.Context) (func(context.Context) error, error) { InsecureSkipVerify: true, } creds := credentials.NewTLS(tlsConfig) - - conn, err = grpc.DialContext(ctx, - otlpEndpoint, - grpc.WithTransportCredentials(creds), - grpc.WithBlock()) + conn, err = grpc.NewClient(otlpEndpoint, grpc.WithTransportCredentials(creds), grpc.WithBlock()) } else { // if the CA certificate is not provided, use an insecure connection // this means that the telemetry collector is not using a certificate, i.e. is inside the k8s cluster - conn, err = grpc.DialContext(ctx, otlpEndpoint, grpc.WithInsecure(), grpc.WithBlock()) + conn, err = grpc.NewClient(otlpEndpoint, grpc.WithTransportCredentials(insecure.NewCredentials())) } + conn.WaitForStateChange(ctx, connectivity.Ready) + if err != nil { return nil, fmt.Errorf("failed to create gRPC connection to collector: %w", err) } From efbe7d9892e5441e10f10f2a1de60c908cbf5cce Mon Sep 17 00:00:00 2001 From: Bianco95 Date: Fri, 6 Sep 2024 16:20:06 +0200 Subject: [PATCH 3/3] updated initProvider of InterLink to wait the grpc connection to be ready to continue --- cmd/interlink/main.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/cmd/interlink/main.go b/cmd/interlink/main.go index f580d0b5..b6e3fcb3 100644 --- a/cmd/interlink/main.go +++ b/cmd/interlink/main.go @@ -19,6 +19,7 @@ import ( "github.com/virtual-kubelet/virtual-kubelet/trace" "github.com/virtual-kubelet/virtual-kubelet/trace/opentelemetry" "google.golang.org/grpc" + "google.golang.org/grpc/connectivity" "google.golang.org/grpc/credentials" "google.golang.org/grpc/credentials/insecure" @@ -116,6 +117,8 @@ func initProvider(ctx context.Context) (func(context.Context) error, error) { conn, err = grpc.NewClient(otlpEndpoint, grpc.WithTransportCredentials(insecure.NewCredentials())) } + conn.WaitForStateChange(ctx, connectivity.Ready) + if err != nil { return nil, fmt.Errorf("failed to create gRPC connection to collector: %w", err) }