Skip to content

Commit

Permalink
discovery: report CPU usage (#29003)
Browse files Browse the repository at this point in the history
  • Loading branch information
Yumasi committed Sep 5, 2024
1 parent 7786b70 commit 9185aee
Show file tree
Hide file tree
Showing 10 changed files with 151 additions and 20 deletions.
2 changes: 2 additions & 0 deletions pkg/collector/corechecks/servicediscovery/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ type eventPayload struct {
PID int `json:"pid"`
CommandLine []string `json:"command_line"`
RSSMemory uint64 `json:"rss_memory"`
CPUCores float64 `json:"cpu_cores"`
}

type event struct {
Expand Down Expand Up @@ -87,6 +88,7 @@ func (ts *telemetrySender) newEvent(t eventType, svc serviceInfo) *event {
PID: svc.service.PID,
CommandLine: svc.service.CommandLine,
RSSMemory: svc.service.RSS,
CPUCores: svc.service.CPUCores,
},
}
}
Expand Down
4 changes: 4 additions & 0 deletions pkg/collector/corechecks/servicediscovery/events_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ func Test_telemetrySender(t *testing.T) {
GeneratedName: "generated-name",
DDService: "dd-service",
DDServiceInjected: true,
CPUCores: 1.5,
},
meta: ServiceMetadata{
Name: "test-service",
Expand Down Expand Up @@ -99,6 +100,7 @@ func Test_telemetrySender(t *testing.T) {
PID: 99,
CommandLine: []string{"test-service", "--args"},
RSSMemory: 500 * 1024 * 1024,
CPUCores: 1.5,
},
},
{
Expand All @@ -121,6 +123,7 @@ func Test_telemetrySender(t *testing.T) {
PID: 99,
CommandLine: []string{"test-service", "--args"},
RSSMemory: 500 * 1024 * 1024,
CPUCores: 1.5,
},
},
{
Expand All @@ -143,6 +146,7 @@ func Test_telemetrySender(t *testing.T) {
PID: 99,
CommandLine: []string{"test-service", "--args"},
RSSMemory: 500 * 1024 * 1024,
CPUCores: 1.5,
},
},
}
Expand Down
2 changes: 2 additions & 0 deletions pkg/collector/corechecks/servicediscovery/impl_linux.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ func (li *linuxImpl) DiscoverServices() (*discoveredServices, error) {
if service, ok := serviceMap[pid]; ok {
svc.LastHeartbeat = now
svc.service.RSS = service.RSS
svc.service.CPUCores = service.CPUCores
li.aliveServices[pid] = svc
events.start = append(events.start, *svc)
}
Expand Down Expand Up @@ -112,6 +113,7 @@ func (li *linuxImpl) DiscoverServices() (*discoveredServices, error) {
} else if now.Sub(svc.LastHeartbeat).Truncate(time.Minute) >= heartbeatTime {
svc.LastHeartbeat = now
svc.service.RSS = service.RSS
svc.service.CPUCores = service.CPUCores
events.heartbeat = append(events.heartbeat, *svc)
}
}
Expand Down
8 changes: 8 additions & 0 deletions pkg/collector/corechecks/servicediscovery/impl_linux_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ var (
Ports: []uint16{8080},
APMInstrumentation: string(apm.None),
RSS: 100 * 1024 * 1024,
CPUCores: 1.5,
CommandLine: []string{"test-service-1"},
StartTimeSecs: procLaunchedSeconds,
}
Expand All @@ -87,6 +88,7 @@ var (
Ports: []uint16{8080},
APMInstrumentation: string(apm.None),
RSS: 200 * 1024 * 1024,
CPUCores: 1.5,
CommandLine: []string{"test-service-1"},
StartTimeSecs: procLaunchedSeconds,
}
Expand Down Expand Up @@ -229,6 +231,7 @@ func Test_linuxImpl(t *testing.T) {
CommandLine: []string{"test-service-1"},
APMInstrumentation: "none",
RSSMemory: 100 * 1024 * 1024,
CPUCores: 1.5,
},
},
{
Expand All @@ -250,6 +253,7 @@ func Test_linuxImpl(t *testing.T) {
CommandLine: []string{"test-service-1"},
APMInstrumentation: "none",
RSSMemory: 200 * 1024 * 1024,
CPUCores: 1.5,
},
},
{
Expand All @@ -271,6 +275,7 @@ func Test_linuxImpl(t *testing.T) {
CommandLine: []string{"test-service-1"},
APMInstrumentation: "none",
RSSMemory: 200 * 1024 * 1024,
CPUCores: 1.5,
},
},
{
Expand Down Expand Up @@ -382,6 +387,7 @@ func Test_linuxImpl(t *testing.T) {
CommandLine: []string{"test-service-1"},
APMInstrumentation: "none",
RSSMemory: 100 * 1024 * 1024,
CPUCores: 1.5,
},
},
{
Expand Down Expand Up @@ -437,6 +443,7 @@ func Test_linuxImpl(t *testing.T) {
CommandLine: []string{"test-service-1"},
APMInstrumentation: "none",
RSSMemory: 100 * 1024 * 1024,
CPUCores: 1.5,
},
},
},
Expand Down Expand Up @@ -493,6 +500,7 @@ func Test_linuxImpl(t *testing.T) {
CommandLine: []string{"test-service-1"},
APMInstrumentation: "none",
RSSMemory: 100 * 1024 * 1024,
CPUCores: 1.5,
},
},
{
Expand Down
1 change: 1 addition & 0 deletions pkg/collector/corechecks/servicediscovery/model/model.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ type Service struct {
RSS uint64 `json:"rss"`
CommandLine []string `json:"cmdline"`
StartTimeSecs uint64 `json:"start_time"`
CPUCores float64 `json:"cpu_cores"`
}

// ServicesResponse is the response for the system-probe /discovery/services endpoint.
Expand Down
27 changes: 23 additions & 4 deletions pkg/collector/corechecks/servicediscovery/module/impl_linux.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ type serviceInfo struct {
apmInstrumentation apm.Instrumentation
cmdLine []string
startTimeSecs uint64
cpuTime uint64
}

// discovery is an implementation of the Module interface for the discovery module.
Expand All @@ -63,6 +64,10 @@ type discovery struct {

// scrubber is used to remove potentially sensitive data from the command line
scrubber *procutil.DataScrubber

// lastGlobalCPUTime stores the total cpu time of the system from the last time
// the endpoint was called.
lastGlobalCPUTime uint64
}

// NewDiscoveryModule creates a new discovery system probe module.
Expand Down Expand Up @@ -288,8 +293,9 @@ func getNsInfo(pid int) (*namespaceInfo, error) {
// parsingContext holds temporary context not preserved between invocations of
// the endpoint.
type parsingContext struct {
procRoot string
netNsInfo map[uint32]*namespaceInfo
procRoot string
netNsInfo map[uint32]*namespaceInfo
globalCPUTime uint64
}

// getServiceInfo gets the service information for a process using the
Expand Down Expand Up @@ -454,6 +460,11 @@ func (s *discovery) getService(context parsingContext, pid int32) *model.Service
name = info.generatedName
}

cpu, err := updateCPUCoresStats(proc, info, s.lastGlobalCPUTime, context.globalCPUTime)
if err != nil {
return nil
}

return &model.Service{
PID: int(pid),
Name: name,
Expand All @@ -466,6 +477,7 @@ func (s *discovery) getService(context parsingContext, pid int32) *model.Service
RSS: rss,
CommandLine: info.cmdLine,
StartTimeSecs: info.startTimeSecs,
CPUCores: cpu,
}
}

Expand All @@ -492,9 +504,15 @@ func (s *discovery) getServices() (*[]model.Service, error) {
return nil, err
}

globalCPUTime, err := getGlobalCPUTime()
if err != nil {
return nil, err
}

context := parsingContext{
procRoot: procRoot,
netNsInfo: make(map[uint32]*namespaceInfo),
procRoot: procRoot,
netNsInfo: make(map[uint32]*namespaceInfo),
globalCPUTime: globalCPUTime,
}

var services []model.Service
Expand All @@ -512,6 +530,7 @@ func (s *discovery) getServices() (*[]model.Service, error) {
}

s.cleanCache(alivePids)
s.lastGlobalCPUTime = context.globalCPUTime

return &services, nil
}
Original file line number Diff line number Diff line change
Expand Up @@ -484,6 +484,7 @@ func TestAPMInstrumentationProvided(t *testing.T) {
assert.Equal(collect, string(test.language), portMap[pid].Language)
assert.Equal(collect, string(apm.Provided), portMap[pid].APMInstrumentation)
assertStat(t, portMap[pid])
assertCPU(t, url, pid)
}, 30*time.Second, 100*time.Millisecond)
})
}
Expand Down Expand Up @@ -512,6 +513,21 @@ func assertStat(t assert.TestingT, svc model.Service) {
assert.Equal(t, uint64(createTimeMs/1000), svc.StartTimeSecs)
}

// assertCPU checks that the CPU usage reported for pid by the discovery
// endpoint at url is close to the usage measured by gopsutil over roughly
// the same one-second interval.
func assertCPU(t *testing.T, url string, pid int) {
	proc, err := process.NewProcess(int32(pid))
	require.NoError(t, err, "could not create gopsutil process handle")

	// Compare CPU usage measurement over an interval.
	_ = getServicesMap(t, url)
	referenceValue, err := proc.Percent(1 * time.Second)
	require.NoError(t, err, "could not get gopsutil cpu usage value")

	// Calling getServicesMap a second time gives us the CPU usage since the
	// last call, which should be close to the gopsutil value.
	portMap := getServicesMap(t, url)
	assert.Contains(t, portMap, pid)

	// gopsutil reports a percentage (100 means one fully used core), while
	// CPUCores is expressed as a number of cores. Convert CPUCores to a
	// percentage so both values share the same unit; a tolerance of 10
	// percentage points corresponds to 0.10 cores.
	assert.InDelta(t, referenceValue, portMap[pid].CPUCores*100, 10)
}

func TestCommandLineSanitization(t *testing.T) {
serverDir := buildFakeServer(t)
url := setupDiscoveryModule(t)
Expand Down Expand Up @@ -590,6 +606,7 @@ func TestNodeDocker(t *testing.T) {
assert.Equal(collect, svcMap[pid].GeneratedName, svcMap[pid].Name)
assert.Equal(collect, "provided", svcMap[pid].APMInstrumentation)
assertStat(collect, svcMap[pid])
assertCPU(t, url, pid)
}, 30*time.Second, 100*time.Millisecond)
}

Expand Down
79 changes: 79 additions & 0 deletions pkg/collector/corechecks/servicediscovery/module/stat.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,11 @@
package module

import (
"bufio"
"bytes"
"errors"
"os"
"runtime"
"strconv"
"strings"

Expand Down Expand Up @@ -47,3 +50,79 @@ func getRSS(proc *process.Process) (uint64, error) {

return rssPages * pageSize, nil
}

// getGlobalCPUTime returns the sum of all the counters found on the first
// line of the host's /proc/stat file, which must be labelled "cpu".
//
// An error is returned if the file cannot be opened or read, if it is empty,
// if its first line does not start with the "cpu" label, or if one of the
// counters on that line cannot be parsed as an unsigned integer.
func getGlobalCPUTime() (uint64, error) {
	globalStatPath := kernel.HostProc("stat")

	file, err := os.Open(globalStatPath)
	if err != nil {
		return 0, err
	}
	defer file.Close()

	scanner := bufio.NewScanner(file)
	// Only the first line is read; it contains all the info we need.
	if !scanner.Scan() {
		if err := scanner.Err(); err != nil {
			return 0, err
		}
		// Scan returns false with a nil error on an empty file. Report that
		// explicitly instead of returning a zero total with no error.
		return 0, errors.New("invalid /proc/stat file")
	}

	// See proc(5) for a description of the format of stat and the fields.
	fields := strings.Fields(scanner.Text())
	// A blank first line yields no fields at all, so check the length before
	// indexing to avoid an out-of-range panic.
	if len(fields) == 0 || fields[0] != "cpu" {
		return 0, errors.New("invalid /proc/stat file")
	}

	var totalTime uint64
	for _, field := range fields[1:] {
		val, err := strconv.ParseUint(field, 10, 64)
		if err != nil {
			return 0, err
		}
		totalTime += val
	}

	return totalTime, nil
}

// updateCPUCoresStats computes the CPU usage of proc, expressed as a number
// of cores, over the interval delimited by lastGlobalCPUTime and
// currentGlobalCPUTime.
//
// The usage is the ratio between the CPU time consumed by the process since
// the value stored in info.cpuTime and the global CPU time elapsed over the
// same interval, multiplied by runtime.NumCPU(). info.cpuTime is updated with
// the process CPU time that was just read so that the next call measures the
// following interval.
//
// When no usable interval is available (the global CPU time did not advance,
// or one of the counters went backwards), 0 is returned with a nil error.
// An error is returned if the process stat file cannot be read or parsed; in
// that case info is left untouched.
func updateCPUCoresStats(proc *process.Process, info *serviceInfo, lastGlobalCPUTime, currentGlobalCPUTime uint64) (float64, error) {
	statPath := kernel.HostProc(strconv.Itoa(int(proc.Pid)), "stat")

	// This file is very small so just read it fully.
	content, err := os.ReadFile(statPath)
	if err != nil {
		return 0, err
	}

	// The second field (the command name) is wrapped in parentheses and may
	// itself contain spaces or parentheses, so resume parsing after the last
	// closing parenthesis.
	startIndex := bytes.LastIndexByte(content, byte(')'))
	if startIndex == -1 || startIndex+1 >= len(content) {
		return 0, errors.New("invalid stat format")
	}

	// See proc(5) for a description of the format of stat and the fields.
	fields := strings.Fields(string(content[startIndex+1:]))
	if len(fields) < 50 {
		return 0, errors.New("invalid stat format")
	}

	// Parse fields number 14 and 15, resp. User and System CPU time.
	// See proc_pid_stat(5), for details.
	// Here we address 11 & 12 since we skipped the first two fields.
	usrTime, err := strconv.ParseUint(fields[11], 10, 64)
	if err != nil {
		return 0, err
	}

	sysTime, err := strconv.ParseUint(fields[12], 10, 64)
	if err != nil {
		return 0, err
	}

	processTime := usrTime + sysTime
	previousProcessTime := info.cpuTime
	// Always record the latest reading, even if no usage can be computed for
	// this interval, so that the next call starts from a fresh baseline.
	info.cpuTime = processTime

	// Without a strictly positive global delta the division below would
	// produce NaN or +Inf, which cannot be encoded as JSON. A counter going
	// backwards would wrap around the unsigned subtraction and produce an
	// absurdly large value. Report no usage in both cases.
	if currentGlobalCPUTime <= lastGlobalCPUTime || processTime < previousProcessTime {
		return 0, nil
	}

	processTimeDelta := float64(processTime - previousProcessTime)
	globalTimeDelta := float64(currentGlobalCPUTime - lastGlobalCPUTime)
	cpuUsage := processTimeDelta / globalTimeDelta * float64(runtime.NumCPU())

	return cpuUsage, nil
}
Original file line number Diff line number Diff line change
Expand Up @@ -58,9 +58,7 @@ type osImpl interface {
DiscoverServices() (*discoveredServices, error)
}

var (
newOSImpl func(ignoreCfg map[string]bool) (osImpl, error)
)
var newOSImpl func(ignoreCfg map[string]bool) (osImpl, error)

type config struct {
IgnoreProcesses []string `yaml:"ignore_processes"`
Expand Down
27 changes: 14 additions & 13 deletions test/fakeintake/aggregator/servicediscoveryAggregator.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,19 +20,20 @@ type ServiceDiscoveryPayload struct {
RequestType string `json:"request_type"`
APIVersion string `json:"api_version"`
Payload struct {
NamingSchemaVersion string `json:"naming_schema_version"`
ServiceName string `json:"service_name"`
GeneratedServiceName string `json:"generated_service_name"`
DDService string `json:"dd_service,omitempty"`
HostName string `json:"host_name"`
Env string `json:"env"`
ServiceLanguage string `json:"service_language"`
ServiceType string `json:"service_type"`
StartTime int64 `json:"start_time"`
LastSeen int64 `json:"last_seen"`
APMInstrumentation string `json:"apm_instrumentation"`
ServiceNameSource string `json:"service_name_source,omitempty"`
RSSMemory uint64 `json:"rss_memory"`
NamingSchemaVersion string `json:"naming_schema_version"`
ServiceName string `json:"service_name"`
GeneratedServiceName string `json:"generated_service_name"`
DDService string `json:"dd_service,omitempty"`
HostName string `json:"host_name"`
Env string `json:"env"`
ServiceLanguage string `json:"service_language"`
ServiceType string `json:"service_type"`
StartTime int64 `json:"start_time"`
LastSeen int64 `json:"last_seen"`
APMInstrumentation string `json:"apm_instrumentation"`
ServiceNameSource string `json:"service_name_source,omitempty"`
RSSMemory uint64 `json:"rss_memory"`
CPUCores float64 `json:"cpu_cores"`
} `json:"payload"`
}

Expand Down

0 comments on commit 9185aee

Please sign in to comment.