Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[EBPF-616] gpu: Use event consumer for processs monitoring #30755

Merged
merged 12 commits into from
Nov 12, 2024
6 changes: 4 additions & 2 deletions cmd/system-probe/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,7 @@ func load() (*types.Config, error) {
usmEnabled := cfg.GetBool(smNS("enabled"))
ccmEnabled := cfg.GetBool(ccmNS("enabled"))
csmEnabled := cfg.GetBool(secNS("enabled"))
gpuEnabled := cfg.GetBool(gpuNS("enabled"))

if npmEnabled || usmEnabled || ccmEnabled || (csmEnabled && cfg.GetBool(secNS("network_monitoring.enabled"))) {
c.EnabledModules[NetworkTracerModule] = struct{}{}
Expand All @@ -136,7 +137,8 @@ func load() (*types.Config, error) {
if cfg.GetBool(secNS("enabled")) ||
cfg.GetBool(secNS("fim_enabled")) ||
cfg.GetBool(evNS("process.enabled")) ||
(c.ModuleIsEnabled(NetworkTracerModule) && cfg.GetBool(evNS("network_process.enabled"))) {
(c.ModuleIsEnabled(NetworkTracerModule) && cfg.GetBool(evNS("network_process.enabled")) ||
gpuEnabled) {
c.EnabledModules[EventMonitorModule] = struct{}{}
}
if cfg.GetBool(secNS("enabled")) && cfg.GetBool(secNS("compliance_module.enabled")) {
Expand All @@ -163,7 +165,7 @@ func load() (*types.Config, error) {
if cfg.GetBool(discoveryNS("enabled")) {
c.EnabledModules[DiscoveryModule] = struct{}{}
}
if cfg.GetBool(gpuNS("enabled")) {
if gpuEnabled {
c.EnabledModules[GPUMonitoringModule] = struct{}{}
}

Expand Down
6 changes: 4 additions & 2 deletions cmd/system-probe/config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,8 @@ func TestEventMonitor(t *testing.T) {
mock.NewSystemProbe(t)

for i, tc := range []struct {
cws, fim, processEvents, networkEvents bool
enabled bool
cws, fim, processEvents, networkEvents, gpu bool
enabled bool
}{
{cws: false, fim: false, processEvents: false, networkEvents: false, enabled: false},
{cws: false, fim: false, processEvents: true, networkEvents: false, enabled: true},
Expand All @@ -42,6 +42,7 @@ func TestEventMonitor(t *testing.T) {
{cws: true, fim: false, processEvents: true, networkEvents: true, enabled: true},
{cws: true, fim: true, processEvents: false, networkEvents: true, enabled: true},
{cws: true, fim: true, processEvents: true, networkEvents: true, enabled: true},
{cws: false, fim: false, processEvents: false, networkEvents: false, gpu: true, enabled: true},
} {
t.Run(fmt.Sprintf("%d", i), func(t *testing.T) {
t.Logf("%+v\n", tc)
Expand All @@ -50,6 +51,7 @@ func TestEventMonitor(t *testing.T) {
t.Setenv("DD_SYSTEM_PROBE_EVENT_MONITORING_PROCESS_ENABLED", strconv.FormatBool(tc.processEvents))
t.Setenv("DD_SYSTEM_PROBE_EVENT_MONITORING_NETWORK_PROCESS_ENABLED", strconv.FormatBool(tc.networkEvents))
t.Setenv("DD_SYSTEM_PROBE_NETWORK_ENABLED", strconv.FormatBool(tc.networkEvents))
t.Setenv("DD_GPU_MONITORING_ENABLED", strconv.FormatBool(tc.gpu))

cfg, err := New("/doesnotexist", "")
t.Logf("%+v\n", cfg)
Expand Down
2 changes: 1 addition & 1 deletion cmd/system-probe/modules/all_linux.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ var All = []module.Factory{
Pinger,
Traceroute,
DiscoveryModule,
GPUMonitoring,
GPUMonitoring, // GPU monitoring needs to be initialized afer EventMonitor, so that we have the event consumer ready
}

func inactivityEventLog(_ time.Duration) {
Expand Down
2 changes: 1 addition & 1 deletion cmd/system-probe/modules/all_linux_arm64.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ var All = []module.Factory{
Pinger,
Traceroute,
DiscoveryModule,
GPUMonitoring,
GPUMonitoring, // GPU monitoring needs to be initialized afer EventMonitor, so that we have the event consumer ready
}

func inactivityEventLog(_ time.Duration) {
Expand Down
11 changes: 11 additions & 0 deletions cmd/system-probe/modules/eventmonitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,13 @@
package modules

import (
"fmt"

"github.com/DataDog/datadog-agent/cmd/system-probe/api/module"
sysconfigtypes "github.com/DataDog/datadog-agent/cmd/system-probe/config/types"
"github.com/DataDog/datadog-agent/pkg/eventmonitor"
emconfig "github.com/DataDog/datadog-agent/pkg/eventmonitor/config"
gpuconfig "github.com/DataDog/datadog-agent/pkg/gpu/config"
netconfig "github.com/DataDog/datadog-agent/pkg/network/config"
"github.com/DataDog/datadog-agent/pkg/network/events"
procconsumer "github.com/DataDog/datadog-agent/pkg/process/events/consumer"
Expand Down Expand Up @@ -91,5 +94,13 @@ func createEventMonitorModule(_ *sysconfigtypes.Config, deps module.FactoryDepen
}
}

gpucfg := gpuconfig.New()
if gpucfg.Enabled {
err := createGPUProcessEventConsumer(evm)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So I'm not 100% sure, and I still have a TODO list item to refactor this, but don't you need to call evm.RegisterEventConsumer( here, like other consumers ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's included in the createGPUProcessEventConsumer function, more specifically here in the generic method for process-data consumers.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also, if you could let me know how do you end up refactoring that part it'd be great, as we were thinking about refactoring the system-probe modules to allow passing dependencies such as the event consumer without resorting to globals.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Isn't the process consumer also calling it in

evm.RegisterEventConsumer(process)
? so it would be calling it twice

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the goal of the refactoring would be to just clean up the need to call register twice with subtly different names..

Copy link
Contributor Author

@gjulianm gjulianm Nov 11, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's a different consumer, for forwarding data to process-agent I think. This one is the one I introduced in this PR to have a generic consumer that can be used to replace pkg/process/monitor:ProcessMonitor, which turned out to be only for USM use.

if err != nil {
return nil, fmt.Errorf("cannot create event consumer for GPU: %w", err)
}
}

return evm, err
}
4 changes: 4 additions & 0 deletions cmd/system-probe/modules/eventmonitor_windows.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,3 +24,7 @@ var EventMonitor = module.Factory{
func createProcessMonitorConsumer(_ *eventmonitor.EventMonitor, _ *netconfig.Config) (eventmonitor.EventConsumer, error) {
return nil, nil
}

func createGPUProcessEventConsumer(_ *eventmonitor.EventMonitor) error {
return nil
}
29 changes: 28 additions & 1 deletion cmd/system-probe/modules/gpu.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ import (
"github.com/DataDog/datadog-agent/cmd/system-probe/config"
sysconfigtypes "github.com/DataDog/datadog-agent/cmd/system-probe/config/types"
"github.com/DataDog/datadog-agent/cmd/system-probe/utils"
"github.com/DataDog/datadog-agent/pkg/eventmonitor"
"github.com/DataDog/datadog-agent/pkg/eventmonitor/consumers"
"github.com/DataDog/datadog-agent/pkg/gpu"
gpuconfig "github.com/DataDog/datadog-agent/pkg/gpu/config"
"github.com/DataDog/datadog-agent/pkg/util/log"
Expand All @@ -27,18 +29,32 @@ import (
var _ module.Module = &GPUMonitoringModule{}
var gpuMonitoringConfigNamespaces = []string{gpuconfig.GPUNS}

// processEventConsumer is a global variable that holds the process event consumer, created in the eventmonitor module
// Note: In the future we should have a better way to handle dependencies between modules
var processEventConsumer *consumers.ProcessConsumer

const processConsumerID = "gpu"
const processConsumerChanSize = 100

var processConsumerEventTypes = []consumers.ProcessConsumerEventTypes{consumers.ExecEventType, consumers.ExitEventType}

// GPUMonitoring Factory
var GPUMonitoring = module.Factory{
Name: config.GPUMonitoringModule,
ConfigNamespaces: gpuMonitoringConfigNamespaces,
Fn: func(_ *sysconfigtypes.Config, deps module.FactoryDependencies) (module.Module, error) {

if processEventConsumer == nil {
return nil, fmt.Errorf("process event consumer not initialized")
}

c := gpuconfig.New()
probeDeps := gpu.ProbeDependencies{
Telemetry: deps.Telemetry,
//if the config parameter doesn't exist or is empty string, the default value is used as defined in go-nvml library
//(https://github.com/NVIDIA/go-nvml/blob/main/pkg/nvml/lib.go#L30)
NvmlLib: nvml.New(nvml.WithLibraryPath(c.NVMLLibraryPath)),
NvmlLib: nvml.New(nvml.WithLibraryPath(c.NVMLLibraryPath)),
ProcessMonitor: processEventConsumer,
}

ret := probeDeps.NvmlLib.Init()
Expand Down Expand Up @@ -95,3 +111,14 @@ func (t *GPUMonitoringModule) GetStats() map[string]interface{} {
func (t *GPUMonitoringModule) Close() {
t.Probe.Close()
}

// createGPUProcessEventConsumer creates the process event consumer for the GPU module. Should be called from the event monitor module
func createGPUProcessEventConsumer(evm *eventmonitor.EventMonitor) error {
var err error
processEventConsumer, err = consumers.NewProcessConsumer(processConsumerID, processConsumerChanSize, processConsumerEventTypes, evm)
if err != nil {
return err
}

return nil
}
29 changes: 3 additions & 26 deletions pkg/gpu/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,20 +3,16 @@
// This product includes software developed at Datadog (https://www.datadoghq.com/).
// Copyright 2024-present Datadog, Inc.

//go:build linux

// Package config provides the GPU monitoring config.
package config

import (
"errors"
"fmt"
"time"

sysconfig "github.com/DataDog/datadog-agent/cmd/system-probe/config"
pkgconfigsetup "github.com/DataDog/datadog-agent/pkg/config/setup"
"github.com/DataDog/datadog-agent/pkg/ebpf"
"github.com/DataDog/datadog-agent/pkg/util/kernel"
)

// GPUNS is the namespace for the GPU monitoring probe.
Expand All @@ -25,31 +21,11 @@ const GPUNS = "gpu_monitoring"
// ErrNotSupported is the error returned if GPU monitoring is not supported on this platform
var ErrNotSupported = errors.New("GPU Monitoring is not supported")

// MinimumKernelVersion indicates the minimum kernel version required for GPU monitoring
var MinimumKernelVersion kernel.Version

func init() {
// we rely on ring buffer support for GPU monitoring, hence the minimal kernel version is 5.8.0
MinimumKernelVersion = kernel.VersionCode(5, 8, 0)
}

// CheckGPUSupported checks if the host's kernel supports GPU monitoring
func CheckGPUSupported() error {
kversion, err := kernel.HostVersion()
if err != nil {
return fmt.Errorf("%w: could not determine the current kernel version: %w", ErrNotSupported, err)
}

if kversion < MinimumKernelVersion {
return fmt.Errorf("%w: a Linux kernel version of %s or higher is required; we detected %s", ErrNotSupported, MinimumKernelVersion, kversion)
}

return nil
}

// Config holds the configuration for the GPU monitoring probe.
type Config struct {
ebpf.Config
// Enabled indicates whether the GPU monitoring probe is enabled.
Enabled bool
// ScanTerminatedProcessesInterval is the interval at which the probe scans for terminated processes.
ScanTerminatedProcessesInterval time.Duration
// InitialProcessSync indicates whether the probe should sync the process list on startup.
Expand All @@ -66,5 +42,6 @@ func New() *Config {
ScanTerminatedProcessesInterval: time.Duration(spCfg.GetInt(sysconfig.FullKeyPath(GPUNS, "process_scan_interval_seconds"))) * time.Second,
InitialProcessSync: spCfg.GetBool(sysconfig.FullKeyPath(GPUNS, "initial_process_sync")),
NVMLLibraryPath: spCfg.GetString(sysconfig.FullKeyPath(GPUNS, "nvml_lib_path")),
Enabled: spCfg.GetBool(sysconfig.FullKeyPath(GPUNS, "enabled")),
}
}
36 changes: 36 additions & 0 deletions pkg/gpu/config/config_linux.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
// Unless explicitly stated otherwise all files in this repository are licensed
// under the Apache License Version 2.0.
// This product includes software developed at Datadog (https://www.datadoghq.com/).
// Copyright 2024-present Datadog, Inc.

//go:build linux

package config

import (
"fmt"

"github.com/DataDog/datadog-agent/pkg/util/kernel"
)

// MinimumKernelVersion indicates the minimum kernel version required for GPU monitoring
var MinimumKernelVersion kernel.Version

func init() {
// we rely on ring buffer support for GPU monitoring, hence the minimal kernel version is 5.8.0
MinimumKernelVersion = kernel.VersionCode(5, 8, 0)
}

// CheckGPUSupported checks if the host's kernel supports GPU monitoring
func CheckGPUSupported() error {
kversion, err := kernel.HostVersion()
if err != nil {
return fmt.Errorf("%w: could not determine the current kernel version: %w", ErrNotSupported, err)
}

if kversion < MinimumKernelVersion {
return fmt.Errorf("%w: a Linux kernel version of %s or higher is required; we detected %s", ErrNotSupported, MinimumKernelVersion, kversion)
}

return nil
}
15 changes: 15 additions & 0 deletions pkg/gpu/config/config_other.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
// Unless explicitly stated otherwise all files in this repository are licensed
// under the Apache License Version 2.0.
// This product includes software developed at Datadog (https://www.datadoghq.com/).
// Copyright 2024-present Datadog, Inc.

//go:build !linux

package config

import "errors"

// CheckGPUSupported checks if the host's kernel supports GPU monitoring
func CheckGPUSupported() error {
return errors.New("GPU monitoring is not supported on this platform")
}
26 changes: 10 additions & 16 deletions pkg/gpu/probe.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,14 @@ package gpu

import (
"fmt"
sysconfig "github.com/DataDog/datadog-agent/cmd/system-probe/config"
"github.com/DataDog/datadog-agent/pkg/ebpf/bytecode"
"io"
"math"
"os"
"regexp"

sysconfig "github.com/DataDog/datadog-agent/cmd/system-probe/config"
"github.com/DataDog/datadog-agent/pkg/ebpf/bytecode"

manager "github.com/DataDog/ebpf-manager"
"github.com/NVIDIA/go-nvml/pkg/nvml"
"github.com/cilium/ebpf"
Expand All @@ -25,7 +26,6 @@ import (
ddebpf "github.com/DataDog/datadog-agent/pkg/ebpf"
"github.com/DataDog/datadog-agent/pkg/ebpf/uprobes"
"github.com/DataDog/datadog-agent/pkg/gpu/config"
"github.com/DataDog/datadog-agent/pkg/process/monitor"
"github.com/DataDog/datadog-agent/pkg/util/log"
)

Expand Down Expand Up @@ -72,6 +72,9 @@ type ProbeDependencies struct {

// NvmlLib is the NVML library interface
NvmlLib nvml.Interface

// ProcessMonitor is the process monitor interface
ProcessMonitor uprobes.ProcessMonitor
}

// Probe represents the GPU monitoring probe
Expand All @@ -83,7 +86,6 @@ type Probe struct {
statsGenerator *statsGenerator
deps ProbeDependencies
sysCtx *systemContext
procMon *monitor.ProcessMonitor
eventHandler ddebpf.EventHandler
}

Expand All @@ -101,22 +103,15 @@ func NewProbe(cfg *config.Config, deps ProbeDependencies) (*Probe, error) {
}

attachCfg := getAttacherConfig(cfg)
// Note: this will later be replaced by a common way to enable the process monitor across system-probe
procMon := monitor.GetProcessMonitor()
if err := procMon.Initialize(false); err != nil {
return nil, fmt.Errorf("error initializing process monitor: %w", err)
}

sysCtx, err := getSystemContext(deps.NvmlLib, cfg.ProcRoot)
if err != nil {
return nil, fmt.Errorf("error getting system context: %w", err)
}

p := &Probe{
cfg: cfg,
deps: deps,
procMon: procMon,
sysCtx: sysCtx,
cfg: cfg,
deps: deps,
sysCtx: sysCtx,
}

allowRC := cfg.EnableRuntimeCompiler && cfg.AllowRuntimeCompiledFallback
Expand All @@ -143,7 +138,7 @@ func NewProbe(cfg *config.Config, deps ProbeDependencies) (*Probe, error) {
}
}

p.attacher, err = uprobes.NewUprobeAttacher(gpuAttacherName, attachCfg, p.m, nil, &uprobes.NativeBinaryInspector{}, procMon)
p.attacher, err = uprobes.NewUprobeAttacher(gpuAttacherName, attachCfg, p.m, nil, &uprobes.NativeBinaryInspector{}, deps.ProcessMonitor)
if err != nil {
return nil, fmt.Errorf("error creating uprobes attacher: %w", err)
}
Expand Down Expand Up @@ -176,7 +171,6 @@ func (p *Probe) start() error {

// Close stops the probe
func (p *Probe) Close() {
p.procMon.Stop()
p.attacher.Stop()
_ = p.m.Stop(manager.CleanAll)
p.consumer.Stop()
Expand Down
5 changes: 3 additions & 2 deletions pkg/gpu/probe_stub.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,9 @@ import (

// ProbeDependencies holds the dependencies for the probe
type ProbeDependencies struct {
Telemetry telemetry.Component
NvmlLib nvml.Interface
Telemetry telemetry.Component
NvmlLib nvml.Interface
ProcessMonitor any // uprobes.ProcessMonitor is only compiled with the linux_bpf build tag, so we need to use type any here
}

// Probe is not implemented on non-linux systems
Expand Down
Loading
Loading