From 8599f1a088fb5b87e64494f9d2bdcd98b93b8689 Mon Sep 17 00:00:00 2001 From: Ron Federman <73110295+RonFed@users.noreply.github.com> Date: Tue, 8 Oct 2024 17:25:09 +0300 Subject: [PATCH] Fix nil check for eBPF instrumentation (#1576) Co-authored-by: Tamir David --- odiglet/pkg/ebpf/director.go | 13 +- odiglet/pkg/ebpf/{test => }/director_test.go | 126 +++++++++++++++---- 2 files changed, 111 insertions(+), 28 deletions(-) rename odiglet/pkg/ebpf/{test => }/director_test.go (70%) diff --git a/odiglet/pkg/ebpf/director.go b/odiglet/pkg/ebpf/director.go index 4855dd582..0d508056c 100644 --- a/odiglet/pkg/ebpf/director.go +++ b/odiglet/pkg/ebpf/director.go @@ -5,6 +5,7 @@ import ( "errors" "fmt" "os" + "reflect" "sync" "sync/atomic" "syscall" @@ -175,6 +176,11 @@ var IsProcessExists = func(pid int) bool { return false } +// Since OtelEbpfSdk is a generic type, we can't simply check it is nil with inst == nil +func isNil[T OtelEbpfSdk](inst T) bool { + return reflect.ValueOf(&inst).Elem().IsZero() +} + func (d *EbpfDirector[T]) periodicCleanup(ctx context.Context) { ticker := time.NewTicker(CleanupInterval) defer ticker.Stop() @@ -189,7 +195,10 @@ func (d *EbpfDirector[T]) periodicCleanup(ctx context.Context) { newInstrumentedProcesses := make([]*InstrumentedProcess[T], 0, len(details.InstrumentedProcesses)) for i := range details.InstrumentedProcesses { ip := details.InstrumentedProcesses[i] - if !IsProcessExists(ip.PID) && any(ip.inst) != nil { + // if the process does not exist, we should make sure we clean the instrumentation resources. + // Also making sure the instrumentation itself is not nil to avoid closing it here. + // This can happen if the process exits while the instrumentation is initializing. + if !IsProcessExists(ip.PID) && !isNil(ip.inst) { log.Logger.V(0).Info("Instrumented process does not exist, cleaning up", "pid", ip.PID) d.cleanProcess(ctx, pod, ip) } else { @@ -428,7 +437,7 @@ func (d *EbpfDirector[T]) GetWorkloadInstrumentations(workload *workload.PodWork } for _, ip := range details.InstrumentedProcesses { - if any(ip.inst) != nil { + if !isNil(ip.inst) { insts = append(insts, ip.inst) } } diff --git a/odiglet/pkg/ebpf/test/director_test.go b/odiglet/pkg/ebpf/director_test.go similarity index 70% rename from odiglet/pkg/ebpf/test/director_test.go rename to odiglet/pkg/ebpf/director_test.go index d8437f815..18baed701 100644 --- a/odiglet/pkg/ebpf/test/director_test.go +++ b/odiglet/pkg/ebpf/director_test.go @@ -1,4 +1,4 @@ -package test +package ebpf import ( "context" @@ -11,7 +11,6 @@ import ( "github.com/odigos-io/odigos/common" "github.com/odigos-io/odigos/k8sutils/pkg/instrumentation_instance" "github.com/odigos-io/odigos/k8sutils/pkg/workload" - "github.com/odigos-io/odigos/odiglet/pkg/ebpf" "github.com/stretchr/testify/assert" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -32,7 +31,7 @@ type FakeEbpfSdk struct { } // compile-time check that FakeEbpfSdk implements ConfigurableOtelEbpfSdk -var _ ebpf.ConfigurableOtelEbpfSdk = (*FakeEbpfSdk)(nil) +var _ ConfigurableOtelEbpfSdk = (*FakeEbpfSdk)(nil) func (f *FakeEbpfSdk) ApplyConfig(ctx context.Context, config *odigosv1.InstrumentationConfig) error { return nil @@ -63,24 +62,27 @@ func (f *FakeEbpfSdk) Run(ctx context.Context) error { } type FakeInstrumentationFactory struct { + timeToSetup time.Duration kubeclient client.Client } -func NewFakeInstrumentationFactory(kubeclient client.Client) ebpf.InstrumentationFactory[*FakeEbpfSdk] { +func NewFakeInstrumentationFactory(kubeclient client.Client, setupDuration time.Duration) InstrumentationFactory[*FakeEbpfSdk] { return &FakeInstrumentationFactory{ kubeclient: kubeclient, + timeToSetup: setupDuration, } } func (f *FakeInstrumentationFactory) CreateEbpfInstrumentation(ctx context.Context, pid int, serviceName string, podWorkload *workload.PodWorkload, containerName string, podName string, loadedIndicator chan struct{}) (*FakeEbpfSdk, error) { + <-time.After(f.timeToSetup) return &FakeEbpfSdk{ loadedIndicator: loadedIndicator, pid: pid, }, nil } -func newFakeDirector(ctx context.Context, client client.Client) ebpf.Director { - dir := ebpf.NewEbpfDirector(ctx, client, client.Scheme(), common.GoProgrammingLanguage, NewFakeInstrumentationFactory(client)) +func newFakeDirector(ctx context.Context, client client.Client, setupDuration time.Duration) Director { + dir := NewEbpfDirector(ctx, client, client.Scheme(), common.GoProgrammingLanguage, NewFakeInstrumentationFactory(client, setupDuration)) return dir } @@ -113,7 +115,7 @@ func assertHealthyInstrumentationInstance(t *testing.T, client client.Client, po return assert.False(t, *instance.Status.Healthy) } -func assertInstrumentationInstanceDeleted(t *testing.T, client client.Client, pod types.NamespacedName, pid int) bool { +func assertInstrumentationInstanceNotExisting(t *testing.T, client client.Client, pod types.NamespacedName, pid int) bool { // instrumentation instance is deleted return assert.Eventually(t, func() bool { return getInstrumentationInstance(client, pod, pid) == nil @@ -145,15 +147,15 @@ func TestSingleInstrumentation(t *testing.T) { WithRuntimeObjects(&pod). Build() - origIsProcessExists := ebpf.IsProcessExists - ebpf.IsProcessExists = func(pid int) bool { + origIsProcessExists := IsProcessExists + IsProcessExists = func(pid int) bool { return true } - t.Cleanup(func() { ebpf.IsProcessExists = origIsProcessExists }) + t.Cleanup(func() { IsProcessExists = origIsProcessExists }) ctx, cancel := context.WithCancel(context.Background()) defer cancel() - dir := newFakeDirector(ctx, client).(*ebpf.EbpfDirector[*FakeEbpfSdk]) + dir := newFakeDirector(ctx, client, time.Millisecond).(*EbpfDirector[*FakeEbpfSdk]) err := dir.Instrument(ctx, 1, pod_id, workload, "test-app", "test-container") assert.NoError(t, err) @@ -171,7 +173,7 @@ func TestSingleInstrumentation(t *testing.T) { // cleanup dir.Cleanup(pod_id) // the instrumentation instance is deleted - if !assertInstrumentationInstanceDeleted(t, client, pod_id, 1) { + if !assertInstrumentationInstanceNotExisting(t, client, pod_id, 1) { t.FailNow() } @@ -210,18 +212,18 @@ func TestInstrumentNotExistingProcess(t *testing.T) { WithRuntimeObjects(&pod). Build() - origIsProcessExists := ebpf.IsProcessExists - ebpf.IsProcessExists = func(pid int) bool { + origIsProcessExists := IsProcessExists + IsProcessExists = func(pid int) bool { return true } - t.Cleanup(func() { ebpf.IsProcessExists = origIsProcessExists }) + t.Cleanup(func() { IsProcessExists = origIsProcessExists }) // setup the cleanup interval to be very short for the test to be responsive - origCleanupInterval := ebpf.CleanupInterval - ebpf.CleanupInterval = 10 * time.Millisecond - t.Cleanup(func() { ebpf.CleanupInterval = origCleanupInterval }) + origCleanupInterval := CleanupInterval + CleanupInterval = 10 * time.Millisecond + t.Cleanup(func() { CleanupInterval = origCleanupInterval }) - dir := newFakeDirector(ctx, client).(*ebpf.EbpfDirector[*FakeEbpfSdk]) + dir := newFakeDirector(ctx, client, time.Millisecond).(*EbpfDirector[*FakeEbpfSdk]) err := dir.Instrument(ctx, 1, pod_id, workload, "test-app", "test-container") assert.NoError(t, err) @@ -238,11 +240,11 @@ func TestInstrumentNotExistingProcess(t *testing.T) { assert.False(t, inst.closed) // "kill" the process - ebpf.IsProcessExists = func(pid int) bool { + IsProcessExists = func(pid int) bool { return false } // the instrumentation instance is deleted - if !assertInstrumentationInstanceDeleted(t, client, pod_id, 1) { + if !assertInstrumentationInstanceNotExisting(t, client, pod_id, 1) { t.FailNow() } // the director stopped tracking the instrumentation @@ -254,6 +256,67 @@ func TestInstrumentNotExistingProcess(t *testing.T) { assert.True(t, inst.closed) } +func TestInstrumentNotExistingProcessWithSlowInstrumentation(t *testing.T) { + ctx := context.Background() + scheme := runtime.NewScheme() + corev1.AddToScheme(scheme) + odigosv1.AddToScheme(scheme) + + workload := &workload.PodWorkload{ + Name: "test-workload", + Namespace: "default", + Kind: "Deployment", + } + pod_id := types.NamespacedName{Name: "test", Namespace: "default"} + pod := corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: pod_id.Name, + Namespace: pod_id.Namespace, + }, + } + + client := fake. + NewClientBuilder(). + WithScheme(scheme). + WithStatusSubresource(&odigosv1.InstrumentationInstance{}). + WithRuntimeObjects(&pod). + Build() + + origIsProcessExists := IsProcessExists + IsProcessExists = func(pid int) bool { + return true + } + t.Cleanup(func() { IsProcessExists = origIsProcessExists }) + + // setup the cleanup interval to be very short for the test to be responsive + origCleanupInterval := CleanupInterval + CleanupInterval = 10 * time.Millisecond + t.Cleanup(func() { CleanupInterval = origCleanupInterval }) + + dir := newFakeDirector(ctx, client, time.Second).(*EbpfDirector[*FakeEbpfSdk]) + err := dir.Instrument(ctx, 1, pod_id, workload, "test-app", "test-container") + assert.NoError(t, err) + + <-time.After(100 * time.Millisecond) + // "kill" the process while the instrumentation is still setting up + IsProcessExists = func(pid int) bool { + return false + } + + // wait for the instrumentation to initialize + <-time.After(1 * time.Second) + + // the instrumentation instance is not existing + if !assertInstrumentationInstanceNotExisting(t, client, pod_id, 1) { + t.FailNow() + } + // the director stopped tracking the instrumentation + insts := dir.GetWorkloadInstrumentations(workload) + if !assert.Len(t, insts, 0) { + t.FailNow() + } +} + func TestMultiplePodsInstrumentation(t *testing.T) { ctx := context.Background() scheme := runtime.NewScheme() @@ -287,13 +350,13 @@ func TestMultiplePodsInstrumentation(t *testing.T) { WithLists(&podList). Build() - origIsProcessExists := ebpf.IsProcessExists - ebpf.IsProcessExists = func(pid int) bool { + origIsProcessExists := IsProcessExists + IsProcessExists = func(pid int) bool { return true } - t.Cleanup(func() { ebpf.IsProcessExists = origIsProcessExists }) + t.Cleanup(func() { IsProcessExists = origIsProcessExists }) - dir := newFakeDirector(ctx, client).(*ebpf.EbpfDirector[*FakeEbpfSdk]) + dir := newFakeDirector(ctx, client, time.Millisecond).(*EbpfDirector[*FakeEbpfSdk]) for i := 0; i < numOfPods; i++ { err := dir.Instrument(ctx, i+1, pod_ids[i], workload, "test-app", "test-container") assert.NoError(t, err) @@ -327,7 +390,7 @@ func TestMultiplePodsInstrumentation(t *testing.T) { // the instrumentation instances are deleted for i := 0; i < numOfPods - 1; i++ { - if !assertInstrumentationInstanceDeleted(t, client, pod_ids[i], i+1) { + if !assertInstrumentationInstanceNotExisting(t, client, pod_ids[i], i+1) { t.FailNow() } } @@ -354,3 +417,14 @@ func TestMultiplePodsInstrumentation(t *testing.T) { // The last instrumentation is the one returned assert.Equal(t, insts[0].pid, numOfPods) } + +func TestIsNil(t *testing.T) { + var e OtelEbpfSdk + assert.True(t, isNil(e)) + + e = &FakeEbpfSdk{} + assert.False(t, isNil(e)) + + var e2 *FakeEbpfSdk + assert.True(t, isNil(e2)) +}