Skip to content

Commit

Permalink
feat: container origin detection on UDP
Browse files Browse the repository at this point in the history
  • Loading branch information
ahmed-mez committed Jan 21, 2022
1 parent d2b1d82 commit d8b32e7
Show file tree
Hide file tree
Showing 4 changed files with 235 additions and 4 deletions.
59 changes: 59 additions & 0 deletions statsd/container.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
package statsd

import (
"bufio"
"fmt"
"io"
"os"
"regexp"
)

const (
// cgroupPath is the path to the cgroup file where we can find the container id if one exists.
cgroupPath = "/proc/self/cgroup"
)

const (
uuidSource = "[0-9a-f]{8}[-_][0-9a-f]{4}[-_][0-9a-f]{4}[-_][0-9a-f]{4}[-_][0-9a-f]{12}"
containerSource = "[0-9a-f]{64}"
taskSource = "[0-9a-f]{32}-\\d+"
)

var (
// expLine matches a line in the /proc/self/cgroup file. It has a submatch for the last element (path), which contains the container ID.
expLine = regexp.MustCompile(`^\d+:[^:]*:(.+)$`)

// expContainerID matches contained IDs and sources. Source: https://github.com/Qard/container-info/blob/master/index.js
expContainerID = regexp.MustCompile(fmt.Sprintf(`(%s|%s|%s)(?:.scope)?$`, uuidSource, containerSource, taskSource))
)

// parseContainerID finds the first container ID reading from r and returns it.
func parseContainerID(r io.Reader) string {
scn := bufio.NewScanner(r)
for scn.Scan() {
path := expLine.FindStringSubmatch(scn.Text())
if len(path) != 2 {
// invalid entry, continue
continue
}
if parts := expContainerID.FindStringSubmatch(path[1]); len(parts) == 2 {
return parts[1]
}
}
return ""
}

// readContainerID attempts to return the container ID from the provided file path or empty on failure.
func readContainerID(fpath string) string {
f, err := os.Open(fpath)
if err != nil {
return ""
}
defer f.Close()
return parseContainerID(f)
}

// ContainerID attempts to return the container ID from /proc/self/cgroup or empty on failure.
var containerID func() string = func() string {
return readContainerID(cgroupPath)
}
65 changes: 65 additions & 0 deletions statsd/container_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
package statsd

import (
"io"
"io/ioutil"
"os"
"strings"
"testing"

"github.com/stretchr/testify/assert"
)

func TestParseContainerID(t *testing.T) {
for in, out := range map[string]string{
`other_line
10:hugetlb:/kubepods/burstable/podfd52ef25-a87d-11e9-9423-0800271a638e/8c046cb0b72cd4c99f51b5591cd5b095967f58ee003710a45280c28ee1a9c7fa
9:cpuset:/kubepods/burstable/podfd52ef25-a87d-11e9-9423-0800271a638e/8c046cb0b72cd4c99f51b5591cd5b095967f58ee003710a45280c28ee1a9c7fa
8:pids:/kubepods/burstable/podfd52ef25-a87d-11e9-9423-0800271a638e/8c046cb0b72cd4c99f51b5591cd5b095967f58ee003710a45280c28ee1a9c7fa
7:freezer:/kubepods/burstable/podfd52ef25-a87d-11e9-9423-0800271a638e/8c046cb0b72cd4c99f51b5591cd5b095967f58ee003710a45280c28ee1a9c7fa
6:cpu,cpuacct:/kubepods/burstable/podfd52ef25-a87d-11e9-9423-0800271a638e/8c046cb0b72cd4c99f51b5591cd5b095967f58ee003710a45280c28ee1a9c7fa
5:perf_event:/kubepods/burstable/podfd52ef25-a87d-11e9-9423-0800271a638e/8c046cb0b72cd4c99f51b5591cd5b095967f58ee003710a45280c28ee1a9c7fa
4:blkio:/kubepods/burstable/podfd52ef25-a87d-11e9-9423-0800271a638e/8c046cb0b72cd4c99f51b5591cd5b095967f58ee003710a45280c28ee1a9c7fa
3:devices:/kubepods/burstable/podfd52ef25-a87d-11e9-9423-0800271a638e/8c046cb0b72cd4c99f51b5591cd5b095967f58ee003710a45280c28ee1a9c7fa
2:net_cls,net_prio:/kubepods/burstable/podfd52ef25-a87d-11e9-9423-0800271a638e/8c046cb0b72cd4c99f51b5591cd5b095967f58ee003710a45280c28ee1a9c7fa`: "8c046cb0b72cd4c99f51b5591cd5b095967f58ee003710a45280c28ee1a9c7fa",
"10:hugetlb:/kubepods/burstable/podfd52ef25-a87d-11e9-9423-0800271a638e/8c046cb0b72cd4c99f51b5591cd5b095967f58ee003710a45280c28ee1a9c7fa": "8c046cb0b72cd4c99f51b5591cd5b095967f58ee003710a45280c28ee1a9c7fa",
"10:hugetlb:/kubepods": "",
"11:hugetlb:/ecs/55091c13-b8cf-4801-b527-f4601742204d/432624d2150b349fe35ba397284dea788c2bf66b885d14dfc1569b01890ca7da": "432624d2150b349fe35ba397284dea788c2bf66b885d14dfc1569b01890ca7da",
"1:name=systemd:/docker/34dc0b5e626f2c5c4c5170e34b10e7654ce36f0fcd532739f4445baabea03376": "34dc0b5e626f2c5c4c5170e34b10e7654ce36f0fcd532739f4445baabea03376",
"1:name=systemd:/uuid/34dc0b5e-626f-2c5c-4c51-70e34b10e765": "34dc0b5e-626f-2c5c-4c51-70e34b10e765",
"1:name=systemd:/ecs/34dc0b5e626f2c5c4c5170e34b10e765-1234567890": "34dc0b5e626f2c5c4c5170e34b10e765-1234567890",
"1:name=systemd:/docker/34dc0b5e626f2c5c4c5170e34b10e7654ce36f0fcd532739f4445baabea03376.scope": "34dc0b5e626f2c5c4c5170e34b10e7654ce36f0fcd532739f4445baabea03376",
`1:name=systemd:/nope
2:pids:/docker/34dc0b5e626f2c5c4c5170e34b10e7654ce36f0fcd532739f4445baabea03376
3:cpu:/invalid`: "34dc0b5e626f2c5c4c5170e34b10e7654ce36f0fcd532739f4445baabea03376",
} {
id := parseContainerID(strings.NewReader(in))
if id != out {
t.Fatalf("%q -> %q: %q", in, out, id)
}
}
}

func TestReadContainerID(t *testing.T) {
cid := "8c046cb0b72cd4c99f51b5591cd5b095967f58ee003710a45280c28ee1a9c7fa"
cgroupContents := "10:hugetlb:/kubepods/burstable/podfd52ef25-a87d-11e9-9423-0800271a638e/" + cid

tmpFile, err := ioutil.TempFile(os.TempDir(), "fake-cgroup-")
if err != nil {
t.Fatalf("failed to create fake cgroup file: %v", err)
}
defer os.Remove(tmpFile.Name())

_, err = io.WriteString(tmpFile, cgroupContents)
if err != nil {
t.Fatalf("failed writing to fake cgroup file: %v", err)
}

err = tmpFile.Close()
if err != nil {
t.Fatalf("failed closing fake cgroup file: %v", err)
}

actualCID := readContainerID(tmpFile.Name())
assert.Equal(t, cid, actualCID)
}
70 changes: 70 additions & 0 deletions statsd/end_to_end_udp_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,12 @@ import (
"github.com/stretchr/testify/assert"
)

func dummyContainerID() string { return "" }

func init() {
containerID = dummyContainerID
}

func TestPipelineWithGlobalTags(t *testing.T) {
ts, client := newClientAndTestServer(t,
"udp",
Expand Down Expand Up @@ -106,6 +112,70 @@ func TestKnownEnvTagsEmptyString(t *testing.T) {
ts.sendAllAndAssert(t, client)
}

func TestPrefixedOriginWithEntityID(t *testing.T) {
entityIDEnvName := "DD_ENTITY_ID"

defer func() { os.Unsetenv(entityIDEnvName) }()

os.Setenv(entityIDEnvName, "pod-uid")

containerID = func() string { return "fake-container-id" }
defer func() { containerID = dummyContainerID }()

expectedTags := []string{"dd.internal.entity_id:pod-uid"}
ts, client := newClientAndTestServer(t,
"udp",
"localhost:8765",
expectedTags,
)

sort.Strings(client.tags)
assert.Equal(t, expectedTags, client.tags)
ts.sendAllAndAssert(t, client)
}

func TestPrefixedOriginWithoutEntityID(t *testing.T) {
entityIDEnvName := "DD_ENTITY_ID"
os.Unsetenv(entityIDEnvName)

containerID = func() string { return "fake-container-id" }
defer func() { containerID = dummyContainerID }()

expectedTags := []string{"dd.internal.prefixed_origin:container_id://fake-container-id"}
ts, client := newClientAndTestServer(t,
"udp",
"localhost:8765",
expectedTags,
)

sort.Strings(client.tags)
assert.Equal(t, expectedTags, client.tags)
ts.sendAllAndAssert(t, client)
}

func TestPrefixedOriginDisabled(t *testing.T) {
entityIDEnvName := "DD_ENTITY_ID"
os.Unsetenv(entityIDEnvName)

originDisabledEnvName := "DD_UDP_ORIGIN_DISABLED"
defer func() { os.Unsetenv(originDisabledEnvName) }()
os.Setenv(originDisabledEnvName, "true")

containerID = func() string { return "fake-container-id" }
defer func() { containerID = dummyContainerID }()

expectedTags := []string{}
ts, client := newClientAndTestServer(t,
"udp",
"localhost:8765",
expectedTags,
)

sort.Strings(client.tags)
assert.Equal(t, expectedTags, client.tags)
ts.sendAllAndAssert(t, client)
}

func TestPipelineWithGlobalTagsAndEnv(t *testing.T) {
orig := os.Getenv("DD_ENV")
os.Setenv("DD_ENV", "test")
Expand Down
45 changes: 41 additions & 4 deletions statsd/statsd.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,15 +69,36 @@ const (
defaultUDPPort = "8125"
)

const (
// ddEntityID specifies client-side user-specified entity ID injection.
// This env var can be set to the Pod UID on Kubernetes via the downward API.
// Docs: https://docs.datadoghq.com/developers/dogstatsd/?tab=kubernetes#origin-detection-over-udp
ddEntityID = "DD_ENTITY_ID"

// ddEntityIDTag specifies the tag name for the client-side entity ID injection
// The Agent expects this tag to contain a non-prefixed Kubernetes Pod UID.
ddEntityIDTag = "dd.internal.entity_id"

// ddPrefixedOrigin specifies the tag name for the client-side prefixed entity ID injection.
// Currently the only supported format is dd.internal.prefixed_origin:container_id://<id>
// dd.internal.entity_id is prioritized over dd.internal.prefixed_origin for backward compatibility.
// If DD_ENTITY_ID is not set, the client tries to fill dd.internal.prefixed_origin automatically unless
// DD_UDP_ORIGIN_DISABLED is explicitly set to "true".
ddPrefixedOrigin = "dd.internal.prefixed_origin"

// udpOriginDisabled specifies the env var to disable injecting the dd.internal.prefixed_origin tag.
udpOriginDisabled = "DD_UDP_ORIGIN_DISABLED"
)

/*
ddEnvTagsMapping is a mapping of each "DD_" prefixed environment variable
to a specific tag name. We use a slice to keep the order and simplify tests.
*/
var ddEnvTagsMapping = []struct{ envName, tagName string }{
{"DD_ENTITY_ID", "dd.internal.entity_id"}, // Client-side entity ID injection for container tagging.
{"DD_ENV", "env"}, // The name of the env in which the service runs.
{"DD_SERVICE", "service"}, // The name of the running service.
{"DD_VERSION", "version"}, // The current version of the running service.
{ddEntityID, ddEntityIDTag}, // Client-side entity ID injection for container tagging.
{"DD_ENV", "env"}, // The name of the env in which the service runs.
{"DD_SERVICE", "service"}, // The name of the running service.
{"DD_VERSION", "version"}, // The current version of the running service.
}

type metricType int
Expand Down Expand Up @@ -319,13 +340,29 @@ func newWithWriter(w io.WriteCloser, o *Options, writerName string) (*Client, er
tags: o.tags,
telemetry: &statsdTelemetry{},
}

hasEntityID := false
// Inject values of DD_* environment variables as global tags.
for _, mapping := range ddEnvTagsMapping {
if value := os.Getenv(mapping.envName); value != "" {
if mapping.envName == ddEntityID {
hasEntityID = true
}
c.tags = append(c.tags, fmt.Sprintf("%s:%s", mapping.tagName, value))
}
}

// If DD_ENTITY_ID is set, we don't send a prefixed origin ID
// as dd.internal.entity_id is prioritized over dd.internal.prefixed_origin
if !hasEntityID {
// Sending a prefixed origin can be disabled by configuring DD_UDP_ORIGIN_DISABLED=true
if isDisabled := strings.ToLower(os.Getenv(udpOriginDisabled)); isDisabled != "true" {
if cID := containerID(); cID != "" {
c.tags = append(c.tags, ddPrefixedOrigin+":container_id://"+cID)
}
}
}

if o.maxBytesPerPayload == 0 {
if writerName == writerNameUDS {
o.maxBytesPerPayload = DefaultMaxAgentPayloadSize
Expand Down

0 comments on commit d8b32e7

Please sign in to comment.