Skip to content

Commit

Permalink
Fix nil check for eBPF instrumentation (#1576)
Browse files Browse the repository at this point in the history
Co-authored-by: Tamir David <[email protected]>
  • Loading branch information
RonFed and tamirdavid1 authored Oct 8, 2024
1 parent 8b16bd0 commit 8599f1a
Show file tree
Hide file tree
Showing 2 changed files with 111 additions and 28 deletions.
13 changes: 11 additions & 2 deletions odiglet/pkg/ebpf/director.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"errors"
"fmt"
"os"
"reflect"
"sync"
"sync/atomic"
"syscall"
Expand Down Expand Up @@ -175,6 +176,11 @@ var IsProcessExists = func(pid int) bool {
return false
}

// Since OtelEbpfSdk is a generic type, we can't simply check it is nil with inst == nil
func isNil[T OtelEbpfSdk](inst T) bool {
return reflect.ValueOf(&inst).Elem().IsZero()
}

func (d *EbpfDirector[T]) periodicCleanup(ctx context.Context) {
ticker := time.NewTicker(CleanupInterval)
defer ticker.Stop()
Expand All @@ -189,7 +195,10 @@ func (d *EbpfDirector[T]) periodicCleanup(ctx context.Context) {
newInstrumentedProcesses := make([]*InstrumentedProcess[T], 0, len(details.InstrumentedProcesses))
for i := range details.InstrumentedProcesses {
ip := details.InstrumentedProcesses[i]
if !IsProcessExists(ip.PID) && any(ip.inst) != nil {
// if the process does not exist, we should make sure we clean the instrumentation resources.
// Also making sure the instrumentation itself is not nil to avoid closing it here.
// This can happen if the process exits while the instrumentation is initializing.
if !IsProcessExists(ip.PID) && !isNil(ip.inst) {
log.Logger.V(0).Info("Instrumented process does not exist, cleaning up", "pid", ip.PID)
d.cleanProcess(ctx, pod, ip)
} else {
Expand Down Expand Up @@ -428,7 +437,7 @@ func (d *EbpfDirector[T]) GetWorkloadInstrumentations(workload *workload.PodWork
}

for _, ip := range details.InstrumentedProcesses {
if any(ip.inst) != nil {
if !isNil(ip.inst) {
insts = append(insts, ip.inst)
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package test
package ebpf

import (
"context"
Expand All @@ -11,7 +11,6 @@ import (
"github.com/odigos-io/odigos/common"
"github.com/odigos-io/odigos/k8sutils/pkg/instrumentation_instance"
"github.com/odigos-io/odigos/k8sutils/pkg/workload"
"github.com/odigos-io/odigos/odiglet/pkg/ebpf"
"github.com/stretchr/testify/assert"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
Expand All @@ -32,7 +31,7 @@ type FakeEbpfSdk struct {
}

// compile-time check that FakeEbpfSdk implements ConfigurableOtelEbpfSdk
var _ ebpf.ConfigurableOtelEbpfSdk = (*FakeEbpfSdk)(nil)
var _ ConfigurableOtelEbpfSdk = (*FakeEbpfSdk)(nil)

func (f *FakeEbpfSdk) ApplyConfig(ctx context.Context, config *odigosv1.InstrumentationConfig) error {
return nil
Expand Down Expand Up @@ -63,24 +62,27 @@ func (f *FakeEbpfSdk) Run(ctx context.Context) error {
}

type FakeInstrumentationFactory struct {
timeToSetup time.Duration
kubeclient client.Client
}

func NewFakeInstrumentationFactory(kubeclient client.Client) ebpf.InstrumentationFactory[*FakeEbpfSdk] {
func NewFakeInstrumentationFactory(kubeclient client.Client, setupDuration time.Duration) InstrumentationFactory[*FakeEbpfSdk] {
return &FakeInstrumentationFactory{
kubeclient: kubeclient,
timeToSetup: setupDuration,
}
}

func (f *FakeInstrumentationFactory) CreateEbpfInstrumentation(ctx context.Context, pid int, serviceName string, podWorkload *workload.PodWorkload, containerName string, podName string, loadedIndicator chan struct{}) (*FakeEbpfSdk, error) {
<-time.After(f.timeToSetup)
return &FakeEbpfSdk{
loadedIndicator: loadedIndicator,
pid: pid,
}, nil
}

func newFakeDirector(ctx context.Context, client client.Client) ebpf.Director {
dir := ebpf.NewEbpfDirector(ctx, client, client.Scheme(), common.GoProgrammingLanguage, NewFakeInstrumentationFactory(client))
func newFakeDirector(ctx context.Context, client client.Client, setupDuration time.Duration) Director {
dir := NewEbpfDirector(ctx, client, client.Scheme(), common.GoProgrammingLanguage, NewFakeInstrumentationFactory(client, setupDuration))
return dir
}

Expand Down Expand Up @@ -113,7 +115,7 @@ func assertHealthyInstrumentationInstance(t *testing.T, client client.Client, po
return assert.False(t, *instance.Status.Healthy)
}

func assertInstrumentationInstanceDeleted(t *testing.T, client client.Client, pod types.NamespacedName, pid int) bool {
func assertInstrumentationInstanceNotExisting(t *testing.T, client client.Client, pod types.NamespacedName, pid int) bool {
// instrumentation instance is deleted
return assert.Eventually(t, func() bool {
return getInstrumentationInstance(client, pod, pid) == nil
Expand Down Expand Up @@ -145,15 +147,15 @@ func TestSingleInstrumentation(t *testing.T) {
WithRuntimeObjects(&pod).
Build()

origIsProcessExists := ebpf.IsProcessExists
ebpf.IsProcessExists = func(pid int) bool {
origIsProcessExists := IsProcessExists
IsProcessExists = func(pid int) bool {
return true
}
t.Cleanup(func() { ebpf.IsProcessExists = origIsProcessExists })
t.Cleanup(func() { IsProcessExists = origIsProcessExists })

ctx, cancel := context.WithCancel(context.Background())
defer cancel()
dir := newFakeDirector(ctx, client).(*ebpf.EbpfDirector[*FakeEbpfSdk])
dir := newFakeDirector(ctx, client, time.Millisecond).(*EbpfDirector[*FakeEbpfSdk])
err := dir.Instrument(ctx, 1, pod_id, workload, "test-app", "test-container")
assert.NoError(t, err)

Expand All @@ -171,7 +173,7 @@ func TestSingleInstrumentation(t *testing.T) {
// cleanup
dir.Cleanup(pod_id)
// the instrumentation instance is deleted
if !assertInstrumentationInstanceDeleted(t, client, pod_id, 1) {
if !assertInstrumentationInstanceNotExisting(t, client, pod_id, 1) {
t.FailNow()
}

Expand Down Expand Up @@ -210,18 +212,18 @@ func TestInstrumentNotExistingProcess(t *testing.T) {
WithRuntimeObjects(&pod).
Build()

origIsProcessExists := ebpf.IsProcessExists
ebpf.IsProcessExists = func(pid int) bool {
origIsProcessExists := IsProcessExists
IsProcessExists = func(pid int) bool {
return true
}
t.Cleanup(func() { ebpf.IsProcessExists = origIsProcessExists })
t.Cleanup(func() { IsProcessExists = origIsProcessExists })

// setup the cleanup interval to be very short for the test to be responsive
origCleanupInterval := ebpf.CleanupInterval
ebpf.CleanupInterval = 10 * time.Millisecond
t.Cleanup(func() { ebpf.CleanupInterval = origCleanupInterval })
origCleanupInterval := CleanupInterval
CleanupInterval = 10 * time.Millisecond
t.Cleanup(func() { CleanupInterval = origCleanupInterval })

dir := newFakeDirector(ctx, client).(*ebpf.EbpfDirector[*FakeEbpfSdk])
dir := newFakeDirector(ctx, client, time.Millisecond).(*EbpfDirector[*FakeEbpfSdk])
err := dir.Instrument(ctx, 1, pod_id, workload, "test-app", "test-container")
assert.NoError(t, err)

Expand All @@ -238,11 +240,11 @@ func TestInstrumentNotExistingProcess(t *testing.T) {
assert.False(t, inst.closed)

// "kill" the process
ebpf.IsProcessExists = func(pid int) bool {
IsProcessExists = func(pid int) bool {
return false
}
// the instrumentation instance is deleted
if !assertInstrumentationInstanceDeleted(t, client, pod_id, 1) {
if !assertInstrumentationInstanceNotExisting(t, client, pod_id, 1) {
t.FailNow()
}
// the director stopped tracking the instrumentation
Expand All @@ -254,6 +256,67 @@ func TestInstrumentNotExistingProcess(t *testing.T) {
assert.True(t, inst.closed)
}

func TestInstrumentNotExistingProcessWithSlowInstrumentation(t *testing.T) {
ctx := context.Background()
scheme := runtime.NewScheme()
corev1.AddToScheme(scheme)
odigosv1.AddToScheme(scheme)

workload := &workload.PodWorkload{
Name: "test-workload",
Namespace: "default",
Kind: "Deployment",
}
pod_id := types.NamespacedName{Name: "test", Namespace: "default"}
pod := corev1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: pod_id.Name,
Namespace: pod_id.Namespace,
},
}

client := fake.
NewClientBuilder().
WithScheme(scheme).
WithStatusSubresource(&odigosv1.InstrumentationInstance{}).
WithRuntimeObjects(&pod).
Build()

origIsProcessExists := IsProcessExists
IsProcessExists = func(pid int) bool {
return true
}
t.Cleanup(func() { IsProcessExists = origIsProcessExists })

// setup the cleanup interval to be very short for the test to be responsive
origCleanupInterval := CleanupInterval
CleanupInterval = 10 * time.Millisecond
t.Cleanup(func() { CleanupInterval = origCleanupInterval })

dir := newFakeDirector(ctx, client, time.Second).(*EbpfDirector[*FakeEbpfSdk])
err := dir.Instrument(ctx, 1, pod_id, workload, "test-app", "test-container")
assert.NoError(t, err)

<-time.After(100 * time.Millisecond)
// "kill" the process while the instrumentation is still setting up
IsProcessExists = func(pid int) bool {
return false
}

// wait for the instrumentation to initialize
<-time.After(1 * time.Second)

// the instrumentation instance is not existing
if !assertInstrumentationInstanceNotExisting(t, client, pod_id, 1) {
t.FailNow()
}
// the director stopped tracking the instrumentation
insts := dir.GetWorkloadInstrumentations(workload)
if !assert.Len(t, insts, 0) {
t.FailNow()
}
}

func TestMultiplePodsInstrumentation(t *testing.T) {
ctx := context.Background()
scheme := runtime.NewScheme()
Expand Down Expand Up @@ -287,13 +350,13 @@ func TestMultiplePodsInstrumentation(t *testing.T) {
WithLists(&podList).
Build()

origIsProcessExists := ebpf.IsProcessExists
ebpf.IsProcessExists = func(pid int) bool {
origIsProcessExists := IsProcessExists
IsProcessExists = func(pid int) bool {
return true
}
t.Cleanup(func() { ebpf.IsProcessExists = origIsProcessExists })
t.Cleanup(func() { IsProcessExists = origIsProcessExists })

dir := newFakeDirector(ctx, client).(*ebpf.EbpfDirector[*FakeEbpfSdk])
dir := newFakeDirector(ctx, client, time.Millisecond).(*EbpfDirector[*FakeEbpfSdk])
for i := 0; i < numOfPods; i++ {
err := dir.Instrument(ctx, i+1, pod_ids[i], workload, "test-app", "test-container")
assert.NoError(t, err)
Expand Down Expand Up @@ -327,7 +390,7 @@ func TestMultiplePodsInstrumentation(t *testing.T) {

// the instrumentation instances are deleted
for i := 0; i < numOfPods - 1; i++ {
if !assertInstrumentationInstanceDeleted(t, client, pod_ids[i], i+1) {
if !assertInstrumentationInstanceNotExisting(t, client, pod_ids[i], i+1) {
t.FailNow()
}
}
Expand All @@ -354,3 +417,14 @@ func TestMultiplePodsInstrumentation(t *testing.T) {
// The last instrumentation is the one returned
assert.Equal(t, insts[0].pid, numOfPods)
}

func TestIsNil(t *testing.T) {
var e OtelEbpfSdk
assert.True(t, isNil(e))

e = &FakeEbpfSdk{}
assert.False(t, isNil(e))

var e2 *FakeEbpfSdk
assert.True(t, isNil(e2))
}

0 comments on commit 8599f1a

Please sign in to comment.