Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

monitoring setup configure mtls and assign unique identifiers to otlp service names for trace display #289

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
71 changes: 67 additions & 4 deletions cmd/interlink/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,10 @@ package main
import (
"context"
"crypto/tls"
"crypto/x509"
"flag"
"fmt"
"io/ioutil"
"net"
"net/http"
"os"
Expand All @@ -13,13 +15,16 @@ import (
"syscall"
"time"

"github.com/google/uuid"
"github.com/sirupsen/logrus"
"github.com/virtual-kubelet/virtual-kubelet/log"
logruslogger "github.com/virtual-kubelet/virtual-kubelet/log/logrus"
"github.com/virtual-kubelet/virtual-kubelet/trace"
"github.com/virtual-kubelet/virtual-kubelet/trace/opentelemetry"
"google.golang.org/grpc"
"google.golang.org/grpc/connectivity"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/credentials/insecure"

types "github.com/intertwin-eu/interlink/pkg/interlink"
"github.com/intertwin-eu/interlink/pkg/interlink/api"
Expand All @@ -34,10 +39,23 @@ import (
)

func initProvider(ctx context.Context) (func(context.Context) error, error) {
log.G(ctx).Info("Tracing is enabled, setting up the TracerProvider")

// Get the TELEMETRY_UNIQUE_ID from the environment, if it is not set, use the hostname
uniqueID := os.Getenv("TELEMETRY_UNIQUE_ID")
if uniqueID == "" {
log.G(ctx).Info("No TELEMETRY_UNIQUE_ID set, generating a new one")
newUUID := uuid.New()
uniqueID = newUUID.String()
log.G(ctx).Info("Generated unique ID: ", uniqueID, " use InterLink-Plugin-"+uniqueID+" as service name from Grafana")
}

serviceName := "InterLink-Plugin-" + uniqueID

res, err := resource.New(ctx,
resource.WithAttributes(
// the service name used to display traces in backends
semconv.ServiceName("InterLink-API"),
semconv.ServiceName(serviceName),
),
)
if err != nil {
Expand All @@ -53,11 +71,56 @@ func initProvider(ctx context.Context) (func(context.Context) error, error) {
otlpEndpoint = "localhost:4317"
}

fmt.Println("TELEMETRY_ENDPOINT: ", otlpEndpoint)
log.G(ctx).Info("TELEMETRY_ENDPOINT: ", otlpEndpoint)

caCrtFilePath := os.Getenv("TELEMETRY_CA_CRT_FILEPATH")

conn := &grpc.ClientConn{}
creds := credentials.NewTLS(&tls.Config{InsecureSkipVerify: true})
conn, err = grpc.DialContext(ctx, otlpEndpoint, grpc.WithTransportCredentials(creds), grpc.WithBlock())
if caCrtFilePath != "" {

// if the CA certificate is provided, set up mutual TLS

log.G(ctx).Info("CA certificate provided, setting up mutual TLS")

caCert, err := ioutil.ReadFile(caCrtFilePath)
if err != nil {
return nil, fmt.Errorf("failed to load CA certificate: %w", err)
}

clientKeyFilePath := os.Getenv("TELEMETRY_CLIENT_KEY_FILEPATH")
if clientKeyFilePath == "" {
return nil, fmt.Errorf("client key file path not provided. Since a CA certificate is provided, a client key is required for mutual TLS")
}

clientCrtFilePath := os.Getenv("TELEMETRY_CLIENT_CRT_FILEPATH")
if clientCrtFilePath == "" {
return nil, fmt.Errorf("client certificate file path not provided. Since a CA certificate is provided, a client certificate is required for mutual TLS")
}

certPool := x509.NewCertPool()
if !certPool.AppendCertsFromPEM(caCert) {
return nil, fmt.Errorf("failed to append CA certificate")
}

cert, err := tls.LoadX509KeyPair(clientCrtFilePath, clientKeyFilePath)
if err != nil {
return nil, fmt.Errorf("failed to load client certificate: %w", err)
}

tlsConfig := &tls.Config{
Certificates: []tls.Certificate{cert},
RootCAs: certPool,
MinVersion: tls.VersionTLS12,
InsecureSkipVerify: true,
}
creds := credentials.NewTLS(tlsConfig)
conn, err = grpc.NewClient(otlpEndpoint, grpc.WithTransportCredentials(creds), grpc.WithBlock())

} else {
conn, err = grpc.NewClient(otlpEndpoint, grpc.WithTransportCredentials(insecure.NewCredentials()))
}

conn.WaitForStateChange(ctx, connectivity.Ready)

if err != nil {
return nil, fmt.Errorf("failed to create gRPC connection to collector: %w", err)
Expand Down
80 changes: 76 additions & 4 deletions cmd/virtual-kubelet/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,10 @@ package main
import (
"context"
"crypto/tls"
"crypto/x509"
"flag"
"fmt"
"io/ioutil"
"net"
"os"
"path"
Expand All @@ -29,7 +31,9 @@ import (

// "k8s.io/client-go/rest"
"google.golang.org/grpc"
"google.golang.org/grpc/connectivity"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/credentials/insecure"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/client-go/kubernetes/scheme"
Expand Down Expand Up @@ -65,6 +69,8 @@ import (
"go.opentelemetry.io/otel/sdk/resource"
sdktrace "go.opentelemetry.io/otel/sdk/trace"
semconv "go.opentelemetry.io/otel/semconv/v1.21.0"

"github.com/google/uuid"
)

func PodInformerFilter(node string) informers.SharedInformerOption {
Expand Down Expand Up @@ -95,10 +101,29 @@ type Opts struct {
}

func initProvider(ctx context.Context) (func(context.Context) error, error) {

log.G(ctx).Info("Tracing is enabled, setting up the TracerProvider")

// Get the TELEMETRY_UNIQUE_ID from the environment, if it is not set, use the hostname
uniqueID := os.Getenv("TELEMETRY_UNIQUE_ID")
if uniqueID == "" {
log.G(ctx).Info("No TELEMETRY_UNIQUE_ID set, generating a new one")
newUUID := uuid.New()
uniqueID = newUUID.String()
log.G(ctx).Info("Generated unique ID: ", uniqueID, " use VK-InterLink-"+uniqueID+" as service name from Grafana")
}

// Create a new resource with the service name set to the TELEMETRY_UNIQUE_ID
// The nomenclature VK-InterLink-<TELEMETRY_UNIQUE_ID> is used to identify the service in Grafana.
// VK-InterLink-<TELEMETRY_UNIQUE_ID> means that the traces are coming from Virtual Kubelet
// and are related to the call that are made for the InterLink API service

serviceName := "VK-InterLink-" + uniqueID

res, err := resource.New(ctx,
resource.WithAttributes(
// the service name used to display traces in backends
semconv.ServiceName("InterLink-VK"),
semconv.ServiceName(serviceName),
),
)
if err != nil {
Expand All @@ -114,11 +139,58 @@ func initProvider(ctx context.Context) (func(context.Context) error, error) {
otlpEndpoint = "localhost:4317"
}

fmt.Println("TELEMETRY_ENDPOINT: ", otlpEndpoint)
log.G(ctx).Info("TELEMETRY_ENDPOINT: ", otlpEndpoint)

caCrtFilePath := os.Getenv("TELEMETRY_CA_CRT_FILEPATH")

conn := &grpc.ClientConn{}
creds := credentials.NewTLS(&tls.Config{InsecureSkipVerify: true})
conn, err = grpc.DialContext(ctx, otlpEndpoint, grpc.WithTransportCredentials(creds), grpc.WithBlock())
if caCrtFilePath != "" {

// if the CA certificate is provided, set up mutual TLS

log.G(ctx).Info("CA certificate provided, setting up mutual TLS")

caCert, err := ioutil.ReadFile(caCrtFilePath)
if err != nil {
return nil, fmt.Errorf("failed to load CA certificate: %w", err)
}

clientKeyFilePath := os.Getenv("TELEMETRY_CLIENT_KEY_FILEPATH")
if clientKeyFilePath == "" {
return nil, fmt.Errorf("client key file path not provided. Since a CA certificate is provided, a client key is required for mutual TLS")
}

clientCrtFilePath := os.Getenv("TELEMETRY_CLIENT_CRT_FILEPATH")
if clientCrtFilePath == "" {
return nil, fmt.Errorf("client certificate file path not provided. Since a CA certificate is provided, a client certificate is required for mutual TLS")
}

certPool := x509.NewCertPool()
if !certPool.AppendCertsFromPEM(caCert) {
return nil, fmt.Errorf("failed to append CA certificate")
}

cert, err := tls.LoadX509KeyPair(clientCrtFilePath, clientKeyFilePath)
if err != nil {
return nil, fmt.Errorf("failed to load client certificate: %w", err)
}

tlsConfig := &tls.Config{
Certificates: []tls.Certificate{cert},
RootCAs: certPool,
MinVersion: tls.VersionTLS12,
InsecureSkipVerify: true,
}
creds := credentials.NewTLS(tlsConfig)
conn, err = grpc.NewClient(otlpEndpoint, grpc.WithTransportCredentials(creds), grpc.WithBlock())

} else {
// if the CA certificate is not provided, use an insecure connection
// this means that the telemetry collector is not using a certificate, i.e. is inside the k8s cluster
conn, err = grpc.NewClient(otlpEndpoint, grpc.WithTransportCredentials(insecure.NewCredentials()))
}

conn.WaitForStateChange(ctx, connectivity.Ready)

if err != nil {
return nil, fmt.Errorf("failed to create gRPC connection to collector: %w", err)
Expand Down
Loading