diff --git a/statsd/buffer.go b/statsd/buffer.go index 40d191f5a..cc4bfe979 100644 --- a/statsd/buffer.go +++ b/statsd/buffer.go @@ -35,38 +35,38 @@ func newStatsdBuffer(maxSize, maxElements int) *statsdBuffer { } } -func (b *statsdBuffer) writeGauge(namespace, name, containerID string, globalTags []string, value float64, tags []string, rate float64) error { +func (b *statsdBuffer) writeGauge(namespace string, globalTags []string, name string, value float64, tags []string, rate float64) error { if b.elementCount >= b.maxElements { return errBufferFull } originalBuffer := b.buffer - b.buffer = appendGauge(b.buffer, namespace, name, containerID, globalTags, value, tags, rate) + b.buffer = appendGauge(b.buffer, namespace, globalTags, name, value, tags, rate) b.writeSeparator() return b.validateNewElement(originalBuffer) } -func (b *statsdBuffer) writeCount(namespace, name, containerID string, globalTags []string, value int64, tags []string, rate float64) error { +func (b *statsdBuffer) writeCount(namespace string, globalTags []string, name string, value int64, tags []string, rate float64) error { if b.elementCount >= b.maxElements { return errBufferFull } originalBuffer := b.buffer - b.buffer = appendCount(b.buffer, namespace, name, containerID, globalTags, value, tags, rate) + b.buffer = appendCount(b.buffer, namespace, globalTags, name, value, tags, rate) b.writeSeparator() return b.validateNewElement(originalBuffer) } -func (b *statsdBuffer) writeHistogram(namespace, name, containerID string, globalTags []string, value float64, tags []string, rate float64) error { +func (b *statsdBuffer) writeHistogram(namespace string, globalTags []string, name string, value float64, tags []string, rate float64) error { if b.elementCount >= b.maxElements { return errBufferFull } originalBuffer := b.buffer - b.buffer = appendHistogram(b.buffer, namespace, name, containerID, globalTags, value, tags, rate) + b.buffer = appendHistogram(b.buffer, namespace, globalTags, name, value, tags, rate) b.writeSeparator() return b.validateNewElement(originalBuffer) } // writeAggregated serialized as many values as possible in the current buffer and return the position in values where it stopped. -func (b *statsdBuffer) writeAggregated(metricSymbol []byte, namespace, name, containerID string, globalTags []string, values []float64, tags string, tagSize int, precision int) (int, error) { +func (b *statsdBuffer) writeAggregated(metricSymbol []byte, namespace string, globalTags []string, name string, values []float64, tags string, tagSize int, precision int) (int, error) { if b.elementCount >= b.maxElements { return 0, errBufferFull } @@ -107,7 +107,7 @@ func (b *statsdBuffer) writeAggregated(metricSymbol []byte, namespace, name, con b.buffer = append(b.buffer, '|') b.buffer = append(b.buffer, metricSymbol...) b.buffer = appendTagsAggregated(b.buffer, globalTags, tags) - b.buffer = appendContainerID(b.buffer, containerID) + b.buffer = appendContainerID(b.buffer) b.writeSeparator() b.elementCount++ @@ -118,52 +118,52 @@ func (b *statsdBuffer) writeAggregated(metricSymbol []byte, namespace, name, con } -func (b *statsdBuffer) writeDistribution(namespace, name, containerID string, globalTags []string, value float64, tags []string, rate float64) error { +func (b *statsdBuffer) writeDistribution(namespace string, globalTags []string, name string, value float64, tags []string, rate float64) error { if b.elementCount >= b.maxElements { return errBufferFull } originalBuffer := b.buffer - b.buffer = appendDistribution(b.buffer, namespace, name, containerID, globalTags, value, tags, rate) + b.buffer = appendDistribution(b.buffer, namespace, globalTags, name, value, tags, rate) b.writeSeparator() return b.validateNewElement(originalBuffer) } -func (b *statsdBuffer) writeSet(namespace, name, containerID string, globalTags []string, value string, tags []string, rate float64) error { +func (b *statsdBuffer) writeSet(namespace string, globalTags []string, name string, value string, tags []string, rate float64) error { if b.elementCount >= b.maxElements { return errBufferFull } originalBuffer := b.buffer - b.buffer = appendSet(b.buffer, namespace, name, containerID, globalTags, value, tags, rate) + b.buffer = appendSet(b.buffer, namespace, globalTags, name, value, tags, rate) b.writeSeparator() return b.validateNewElement(originalBuffer) } -func (b *statsdBuffer) writeTiming(namespace, name, containerID string, globalTags []string, value float64, tags []string, rate float64) error { +func (b *statsdBuffer) writeTiming(namespace string, globalTags []string, name string, value float64, tags []string, rate float64) error { if b.elementCount >= b.maxElements { return errBufferFull } originalBuffer := b.buffer - b.buffer = appendTiming(b.buffer, namespace, name, containerID, globalTags, value, tags, rate) + b.buffer = appendTiming(b.buffer, namespace, globalTags, name, value, tags, rate) b.writeSeparator() return b.validateNewElement(originalBuffer) } -func (b *statsdBuffer) writeEvent(event *Event, globalTags []string, containerID string) error { +func (b *statsdBuffer) writeEvent(event *Event, globalTags []string) error { if b.elementCount >= b.maxElements { return errBufferFull } originalBuffer := b.buffer - b.buffer = appendEvent(b.buffer, event, globalTags, containerID) + b.buffer = appendEvent(b.buffer, event, globalTags) b.writeSeparator() return b.validateNewElement(originalBuffer) } -func (b *statsdBuffer) writeServiceCheck(serviceCheck *ServiceCheck, globalTags []string, containerID string) error { +func (b *statsdBuffer) writeServiceCheck(serviceCheck *ServiceCheck, globalTags []string) error { if b.elementCount >= b.maxElements { return errBufferFull } originalBuffer := b.buffer - b.buffer = appendServiceCheck(b.buffer, serviceCheck, globalTags, containerID) + b.buffer = appendServiceCheck(b.buffer, serviceCheck, globalTags) b.writeSeparator() return b.validateNewElement(originalBuffer) } diff --git a/statsd/buffer_pool_test.go b/statsd/buffer_pool_test.go index 71ecb1777..f150af759 100644 --- a/statsd/buffer_pool_test.go +++ b/statsd/buffer_pool_test.go @@ -33,7 +33,7 @@ func TestBufferPoolEmpty(t *testing.T) { func TestBufferReturn(t *testing.T) { bufferPool := newBufferPool(1, 1024, 20) buffer := bufferPool.borrowBuffer() - buffer.writeCount("", "", "", nil, 1, nil, 1) + buffer.writeCount("", nil, "", 1, nil, 1) assert.Equal(t, 0, len(bufferPool.pool)) bufferPool.returnBuffer(buffer) diff --git a/statsd/buffer_test.go b/statsd/buffer_test.go index 758aba176..9096407a1 100644 --- a/statsd/buffer_test.go +++ b/statsd/buffer_test.go @@ -8,134 +8,158 @@ import ( func TestBufferGauge(t *testing.T) { buffer := newStatsdBuffer(1024, 1) - err := buffer.writeGauge("namespace.", "metric", "", []string{"tag:tag"}, 1, []string{}, 1) + err := buffer.writeGauge("namespace.", []string{"tag:tag"}, "metric", 1, []string{}, 1) assert.Nil(t, err) assert.Equal(t, "namespace.metric:1|g|#tag:tag\n", string(buffer.bytes())) // with a container ID field + patchContainerID("container-id") + defer resetContainerID() + buffer = newStatsdBuffer(1024, 1) - err = buffer.writeGauge("namespace.", "metric", "container-id", []string{"tag:tag"}, 1, []string{}, 1) + err = buffer.writeGauge("namespace.", []string{"tag:tag"}, "metric", 1, []string{}, 1) assert.Nil(t, err) assert.Equal(t, "namespace.metric:1|g|#tag:tag|c:container-id\n", string(buffer.bytes())) } func TestBufferCount(t *testing.T) { buffer := newStatsdBuffer(1024, 1) - err := buffer.writeCount("namespace.", "metric", "", []string{"tag:tag"}, 1, []string{}, 1) + err := buffer.writeCount("namespace.", []string{"tag:tag"}, "metric", 1, []string{}, 1) assert.Nil(t, err) assert.Equal(t, "namespace.metric:1|c|#tag:tag\n", string(buffer.bytes())) // with a container ID field + patchContainerID("container-id") + defer resetContainerID() + buffer = newStatsdBuffer(1024, 1) - err = buffer.writeCount("namespace.", "metric", "container-id", []string{"tag:tag"}, 1, []string{}, 1) + err = buffer.writeCount("namespace.", []string{"tag:tag"}, "metric", 1, []string{}, 1) assert.Nil(t, err) assert.Equal(t, "namespace.metric:1|c|#tag:tag|c:container-id\n", string(buffer.bytes())) } func TestBufferHistogram(t *testing.T) { buffer := newStatsdBuffer(1024, 1) - err := buffer.writeHistogram("namespace.", "metric", "", []string{"tag:tag"}, 1, []string{}, 1) + err := buffer.writeHistogram("namespace.", []string{"tag:tag"}, "metric", 1, []string{}, 1) assert.Nil(t, err) assert.Equal(t, "namespace.metric:1|h|#tag:tag\n", string(buffer.bytes())) // with a container ID field + patchContainerID("container-id") + defer resetContainerID() + buffer = newStatsdBuffer(1024, 1) - err = buffer.writeHistogram("namespace.", "metric", "container-id", []string{"tag:tag"}, 1, []string{}, 1) + err = buffer.writeHistogram("namespace.", []string{"tag:tag"}, "metric", 1, []string{}, 1) assert.Nil(t, err) assert.Equal(t, "namespace.metric:1|h|#tag:tag|c:container-id\n", string(buffer.bytes())) } func TestBufferDistribution(t *testing.T) { buffer := newStatsdBuffer(1024, 1) - err := buffer.writeDistribution("namespace.", "metric", "", []string{"tag:tag"}, 1, []string{}, 1) + err := buffer.writeDistribution("namespace.", []string{"tag:tag"}, "metric", 1, []string{}, 1) assert.Nil(t, err) assert.Equal(t, "namespace.metric:1|d|#tag:tag\n", string(buffer.bytes())) // with a container ID field + patchContainerID("container-id") + defer resetContainerID() + buffer = newStatsdBuffer(1024, 1) - err = buffer.writeDistribution("namespace.", "metric", "container-id", []string{"tag:tag"}, 1, []string{}, 1) + err = buffer.writeDistribution("namespace.", []string{"tag:tag"}, "metric", 1, []string{}, 1) assert.Nil(t, err) assert.Equal(t, "namespace.metric:1|d|#tag:tag|c:container-id\n", string(buffer.bytes())) } func TestBufferSet(t *testing.T) { buffer := newStatsdBuffer(1024, 1) - err := buffer.writeSet("namespace.", "metric", "", []string{"tag:tag"}, "value", []string{}, 1) + err := buffer.writeSet("namespace.", []string{"tag:tag"}, "metric", "value", []string{}, 1) assert.Nil(t, err) assert.Equal(t, "namespace.metric:value|s|#tag:tag\n", string(buffer.bytes())) // with a container ID field + patchContainerID("container-id") + defer resetContainerID() + buffer = newStatsdBuffer(1024, 1) - err = buffer.writeSet("namespace.", "metric", "container-id", []string{"tag:tag"}, "value", []string{}, 1) + err = buffer.writeSet("namespace.", []string{"tag:tag"}, "metric", "value", []string{}, 1) assert.Nil(t, err) assert.Equal(t, "namespace.metric:value|s|#tag:tag|c:container-id\n", string(buffer.bytes())) } func TestBufferTiming(t *testing.T) { buffer := newStatsdBuffer(1024, 1) - err := buffer.writeTiming("namespace.", "metric", "", []string{"tag:tag"}, 1, []string{}, 1) + err := buffer.writeTiming("namespace.", []string{"tag:tag"}, "metric", 1, []string{}, 1) assert.Nil(t, err) assert.Equal(t, "namespace.metric:1.000000|ms|#tag:tag\n", string(buffer.bytes())) // with a container ID field + patchContainerID("container-id") + defer resetContainerID() + buffer = newStatsdBuffer(1024, 1) - err = buffer.writeTiming("namespace.", "metric", "container-id", []string{"tag:tag"}, 1, []string{}, 1) + err = buffer.writeTiming("namespace.", []string{"tag:tag"}, "metric", 1, []string{}, 1) assert.Nil(t, err) assert.Equal(t, "namespace.metric:1.000000|ms|#tag:tag|c:container-id\n", string(buffer.bytes())) } func TestBufferEvent(t *testing.T) { buffer := newStatsdBuffer(1024, 1) - err := buffer.writeEvent(&Event{Title: "title", Text: "text"}, []string{"tag:tag"}, "") + err := buffer.writeEvent(&Event{Title: "title", Text: "text"}, []string{"tag:tag"}) assert.Nil(t, err) assert.Equal(t, "_e{5,4}:title|text|#tag:tag\n", string(buffer.bytes())) // with a container ID field + patchContainerID("container-id") + defer resetContainerID() + buffer = newStatsdBuffer(1024, 1) - err = buffer.writeEvent(&Event{Title: "title", Text: "text"}, []string{"tag:tag"}, "container-id") + err = buffer.writeEvent(&Event{Title: "title", Text: "text"}, []string{"tag:tag"}) assert.Nil(t, err) assert.Equal(t, "_e{5,4}:title|text|#tag:tag|c:container-id\n", string(buffer.bytes())) } func TestBufferServiceCheck(t *testing.T) { buffer := newStatsdBuffer(1024, 1) - err := buffer.writeServiceCheck(&ServiceCheck{Name: "name", Status: Ok}, []string{"tag:tag"}, "") + err := buffer.writeServiceCheck(&ServiceCheck{Name: "name", Status: Ok}, []string{"tag:tag"}) assert.Nil(t, err) assert.Equal(t, "_sc|name|0|#tag:tag\n", string(buffer.bytes())) // with a container ID field + patchContainerID("container-id") + defer resetContainerID() + buffer = newStatsdBuffer(1024, 1) - err = buffer.writeServiceCheck(&ServiceCheck{Name: "name", Status: Ok}, []string{"tag:tag"}, "container-id") + err = buffer.writeServiceCheck(&ServiceCheck{Name: "name", Status: Ok}, []string{"tag:tag"}) assert.Nil(t, err) assert.Equal(t, "_sc|name|0|#tag:tag|c:container-id\n", string(buffer.bytes())) } func TestBufferFullSize(t *testing.T) { buffer := newStatsdBuffer(30, 10) - err := buffer.writeGauge("namespace.", "metric", "", []string{"tag:tag"}, 1, []string{}, 1) + err := buffer.writeGauge("namespace.", []string{"tag:tag"}, "metric", 1, []string{}, 1) assert.Nil(t, err) assert.Len(t, buffer.bytes(), 30) - err = buffer.writeGauge("namespace.", "metric", "", []string{"tag:tag"}, 1, []string{}, 1) + err = buffer.writeGauge("namespace.", []string{"tag:tag"}, "metric", 1, []string{}, 1) assert.Equal(t, errBufferFull, err) } func TestBufferSeparator(t *testing.T) { buffer := newStatsdBuffer(1024, 10) - err := buffer.writeGauge("namespace.", "metric", "", []string{"tag:tag"}, 1, []string{}, 1) + err := buffer.writeGauge("namespace.", []string{"tag:tag"}, "metric", 1, []string{}, 1) assert.Nil(t, err) - err = buffer.writeGauge("namespace.", "metric", "", []string{"tag:tag"}, 1, []string{}, 1) + err = buffer.writeGauge("namespace.", []string{"tag:tag"}, "metric", 1, []string{}, 1) assert.Nil(t, err) assert.Equal(t, "namespace.metric:1|g|#tag:tag\nnamespace.metric:1|g|#tag:tag\n", string(buffer.bytes())) } func TestBufferAggregated(t *testing.T) { buffer := newStatsdBuffer(1024, 1) - pos, err := buffer.writeAggregated([]byte("h"), "namespace.", "metric", "", []string{"tag:tag"}, []float64{1}, "", 12, -1) + pos, err := buffer.writeAggregated([]byte("h"), "namespace.", []string{"tag:tag"}, "metric", []float64{1}, "", 12, -1) assert.Nil(t, err) assert.Equal(t, 1, pos) assert.Equal(t, "namespace.metric:1|h|#tag:tag\n", string(buffer.bytes())) buffer = newStatsdBuffer(1024, 1) - pos, err = buffer.writeAggregated([]byte("h"), "namespace.", "metric", "", []string{"tag:tag"}, []float64{1, 2, 3, 4}, "", 12, -1) + pos, err = buffer.writeAggregated([]byte("h"), "namespace.", []string{"tag:tag"}, "metric", []float64{1, 2, 3, 4}, "", 12, -1) assert.Nil(t, err) assert.Equal(t, 4, pos) assert.Equal(t, "namespace.metric:1:2:3:4|h|#tag:tag\n", string(buffer.bytes())) @@ -143,29 +167,29 @@ func TestBufferAggregated(t *testing.T) { // max element already used buffer = newStatsdBuffer(1024, 1) buffer.elementCount = 1 - pos, err = buffer.writeAggregated([]byte("h"), "namespace.", "metric", "", []string{"tag:tag"}, []float64{1, 2, 3, 4}, "", 12, -1) + pos, err = buffer.writeAggregated([]byte("h"), "namespace.", []string{"tag:tag"}, "metric", []float64{1, 2, 3, 4}, "", 12, -1) assert.Equal(t, errBufferFull, err) // not enought size to start serializing (tags and header too big) buffer = newStatsdBuffer(4, 1) - pos, err = buffer.writeAggregated([]byte("h"), "namespace.", "metric", "", []string{"tag:tag"}, []float64{1, 2, 3, 4}, "", 12, -1) + pos, err = buffer.writeAggregated([]byte("h"), "namespace.", []string{"tag:tag"}, "metric", []float64{1, 2, 3, 4}, "", 12, -1) assert.Equal(t, errBufferFull, err) // not enought size to serializing one message buffer = newStatsdBuffer(29, 1) - pos, err = buffer.writeAggregated([]byte("h"), "namespace.", "metric", "", []string{"tag:tag"}, []float64{1, 2, 3, 4}, "", 12, -1) + pos, err = buffer.writeAggregated([]byte("h"), "namespace.", []string{"tag:tag"}, "metric", []float64{1, 2, 3, 4}, "", 12, -1) assert.Equal(t, errBufferFull, err) // space for only 1 number buffer = newStatsdBuffer(30, 1) - pos, err = buffer.writeAggregated([]byte("h"), "namespace.", "metric", "", []string{"tag:tag"}, []float64{1, 2, 3, 4}, "", 12, -1) + pos, err = buffer.writeAggregated([]byte("h"), "namespace.", []string{"tag:tag"}, "metric", []float64{1, 2, 3, 4}, "", 12, -1) assert.Equal(t, errPartialWrite, err) assert.Equal(t, 1, pos) assert.Equal(t, "namespace.metric:1|h|#tag:tag\n", string(buffer.bytes())) // first value too big buffer = newStatsdBuffer(30, 1) - pos, err = buffer.writeAggregated([]byte("h"), "namespace.", "metric", "", []string{"tag:tag"}, []float64{12, 2, 3, 4}, "", 12, -1) + pos, err = buffer.writeAggregated([]byte("h"), "namespace.", []string{"tag:tag"}, "metric", []float64{12, 2, 3, 4}, "", 12, -1) assert.Equal(t, errBufferFull, err) assert.Equal(t, 0, pos) assert.Equal(t, "", string(buffer.bytes())) // checking that the buffer was reset @@ -173,21 +197,24 @@ func TestBufferAggregated(t *testing.T) { // not enough space left buffer = newStatsdBuffer(40, 1) buffer.buffer = append(buffer.buffer, []byte("abcdefghij")...) - pos, err = buffer.writeAggregated([]byte("h"), "namespace.", "metric", "", []string{"tag:tag"}, []float64{12, 2, 3, 4}, "", 12, -1) + pos, err = buffer.writeAggregated([]byte("h"), "namespace.", []string{"tag:tag"}, "metric", []float64{12, 2, 3, 4}, "", 12, -1) assert.Equal(t, errBufferFull, err) assert.Equal(t, 0, pos) assert.Equal(t, "abcdefghij", string(buffer.bytes())) // checking that the buffer was reset // space for only 2 number buffer = newStatsdBuffer(32, 1) - pos, err = buffer.writeAggregated([]byte("h"), "namespace.", "metric", "", []string{"tag:tag"}, []float64{1, 2, 3, 4}, "", 12, -1) + pos, err = buffer.writeAggregated([]byte("h"), "namespace.", []string{"tag:tag"}, "metric", []float64{1, 2, 3, 4}, "", 12, -1) assert.Equal(t, errPartialWrite, err) assert.Equal(t, 2, pos) assert.Equal(t, "namespace.metric:1:2|h|#tag:tag\n", string(buffer.bytes())) // with a container ID field + patchContainerID("container-id") + defer resetContainerID() + buffer = newStatsdBuffer(1024, 1) - pos, err = buffer.writeAggregated([]byte("h"), "namespace.", "metric", "container-id", []string{"tag:tag"}, []float64{1}, "", 12, -1) + pos, err = buffer.writeAggregated([]byte("h"), "namespace.", []string{"tag:tag"}, "metric", []float64{1}, "", 12, -1) assert.Nil(t, err) assert.Equal(t, 1, pos) assert.Equal(t, "namespace.metric:1|h|#tag:tag|c:container-id\n", string(buffer.bytes())) @@ -196,30 +223,30 @@ func TestBufferAggregated(t *testing.T) { func TestBufferMaxElement(t *testing.T) { buffer := newStatsdBuffer(1024, 1) - err := buffer.writeGauge("namespace.", "metric", "", []string{"tag:tag"}, 1, []string{}, 1) + err := buffer.writeGauge("namespace.", []string{"tag:tag"}, "metric", 1, []string{}, 1) assert.Nil(t, err) - err = buffer.writeGauge("namespace.", "metric", "", []string{"tag:tag"}, 1, []string{}, 1) + err = buffer.writeGauge("namespace.", []string{"tag:tag"}, "metric", 1, []string{}, 1) assert.Equal(t, errBufferFull, err) - err = buffer.writeCount("namespace.", "metric", "", []string{"tag:tag"}, 1, []string{}, 1) + err = buffer.writeCount("namespace.", []string{"tag:tag"}, "metric", 1, []string{}, 1) assert.Equal(t, errBufferFull, err) - err = buffer.writeHistogram("namespace.", "metric", "", []string{"tag:tag"}, 1, []string{}, 1) + err = buffer.writeHistogram("namespace.", []string{"tag:tag"}, "metric", 1, []string{}, 1) assert.Equal(t, errBufferFull, err) - err = buffer.writeDistribution("namespace.", "metric", "", []string{"tag:tag"}, 1, []string{}, 1) + err = buffer.writeDistribution("namespace.", []string{"tag:tag"}, "metric", 1, []string{}, 1) assert.Equal(t, errBufferFull, err) - err = buffer.writeSet("namespace.", "metric", "", []string{"tag:tag"}, "value", []string{}, 1) + err = buffer.writeSet("namespace.", []string{"tag:tag"}, "metric", "value", []string{}, 1) assert.Equal(t, errBufferFull, err) - err = buffer.writeTiming("namespace.", "metric", "", []string{"tag:tag"}, 1, []string{}, 1) + err = buffer.writeTiming("namespace.", []string{"tag:tag"}, "metric", 1, []string{}, 1) assert.Equal(t, errBufferFull, err) - err = buffer.writeEvent(&Event{Title: "title", Text: "text"}, []string{"tag:tag"}, "") + err = buffer.writeEvent(&Event{Title: "title", Text: "text"}, []string{"tag:tag"}) assert.Equal(t, errBufferFull, err) - err = buffer.writeServiceCheck(&ServiceCheck{Name: "name", Status: Ok}, []string{"tag:tag"}, "") + err = buffer.writeServiceCheck(&ServiceCheck{Name: "name", Status: Ok}, []string{"tag:tag"}) assert.Equal(t, errBufferFull, err) } diff --git a/statsd/container.go b/statsd/container.go index 898784ba9..4584e158a 100644 --- a/statsd/container.go +++ b/statsd/container.go @@ -6,6 +6,7 @@ import ( "io" "os" "regexp" + "sync" ) const ( @@ -25,6 +26,9 @@ var ( // expContainerID matches contained IDs and sources. Source: https://github.com/Qard/container-info/blob/master/index.js expContainerID = regexp.MustCompile(fmt.Sprintf(`(%s|%s|%s)(?:.scope)?$`, uuidSource, containerSource, taskSource)) + + userProvidedContainerID = "" + cgroupContainerID = "" ) // parseContainerID finds the first container ID reading from r and returns it. @@ -53,7 +57,26 @@ func readContainerID(fpath string) string { return parseContainerID(f) } -// getContainerID attempts to return the container ID from /proc/self/cgroup or empty on failure. -var getContainerID func() string = func() string { - return readContainerID(cgroupPath) +// getContainerID returns the container ID configured at the client creation +// It can either be auto-discovered with origin detection or provided by the user. +// User-defined container ID is prioritized. +func getContainerID() string { + if userProvidedContainerID != "" { + return userProvidedContainerID + } + return cgroupContainerID +} + +var readOnce sync.Once + +// setCgroupContainerID attempts to read the container ID from /proc/self/cgroup. +func setCgroupContainerID() { + readOnce.Do(func() { + cgroupContainerID = readContainerID(cgroupPath) + }) +} + +// setUserProvidedContainerID stores the container ID provided by the user. +func setUserProvidedContainerID(id string) { + userProvidedContainerID = id } diff --git a/statsd/end_to_end_udp_test.go b/statsd/end_to_end_udp_test.go index 3bdcdc129..5c5ada8fd 100644 --- a/statsd/end_to_end_udp_test.go +++ b/statsd/end_to_end_udp_test.go @@ -9,11 +9,8 @@ import ( "github.com/stretchr/testify/assert" ) -func resetGetContainerID() { - getContainerID = func() string { - return readContainerID(cgroupPath) - } -} +func patchContainerID(id string) { userProvidedContainerID = id } +func resetContainerID() { userProvidedContainerID = "" } func TestPipelineWithGlobalTags(t *testing.T) { ts, client := newClientAndTestServer(t, @@ -113,63 +110,62 @@ func TestKnownEnvTagsEmptyString(t *testing.T) { } func TestContainerIDWithEntityID(t *testing.T) { + resetContainerID() + entityIDEnvName := "DD_ENTITY_ID" defer func() { os.Unsetenv(entityIDEnvName) }() os.Setenv(entityIDEnvName, "pod-uid") - getContainerID = func() string { return "fake-container-id" } - defer resetGetContainerID() - expectedTags := []string{"dd.internal.entity_id:pod-uid"} ts, client := newClientAndTestServer(t, "udp", "localhost:8765", expectedTags, + WithContainerID("fake-container-id"), ) sort.Strings(client.tags) assert.Equal(t, expectedTags, client.tags) - assert.Equal(t, "", client.containerID) + ts.assertContainerID(t, "") ts.sendAllAndAssert(t, client) } func TestContainerIDWithoutEntityID(t *testing.T) { + resetContainerID() os.Unsetenv("DD_ENTITY_ID") - getContainerID = func() string { return "fake-container-id" } - defer resetGetContainerID() - ts, client := newClientAndTestServer(t, "udp", "localhost:8765", []string{}, + WithContainerID("fake-container-id"), ) - assert.Equal(t, "fake-container-id", client.containerID) + ts.assertContainerID(t, "fake-container-id") ts.sendAllAndAssert(t, client) } func TestOriginDetectionDisabled(t *testing.T) { + resetContainerID() os.Unsetenv("DD_ENTITY_ID") originDetectionEnvName := "DD_ORIGIN_DETECTION_ENABLED" defer func() { os.Unsetenv(originDetectionEnvName) }() os.Setenv(originDetectionEnvName, "false") - getContainerID = func() string { return "fake-container-id" } - defer resetGetContainerID() - ts, client := newClientAndTestServer(t, "udp", "localhost:8765", []string{}, ) - assert.Equal(t, "", client.containerID) + ts.assertContainerID(t, "") ts.sendAllAndAssert(t, client) } func TestOriginDetectionEnabledWithEntityID(t *testing.T) { + resetContainerID() + entityIDEnvName := "DD_ENTITY_ID" defer func() { os.Unsetenv(entityIDEnvName) }() os.Setenv(entityIDEnvName, "pod-uid") @@ -178,19 +174,17 @@ func TestOriginDetectionEnabledWithEntityID(t *testing.T) { defer func() { os.Unsetenv(originDetectionEnvName) }() os.Setenv(originDetectionEnvName, "true") - getContainerID = func() string { return "fake-container-id" } - defer resetGetContainerID() - expectedTags := []string{"dd.internal.entity_id:pod-uid"} ts, client := newClientAndTestServer(t, "udp", "localhost:8765", expectedTags, + WithContainerID("fake-container-id"), ) sort.Strings(client.tags) assert.Equal(t, expectedTags, client.tags) - assert.Equal(t, "", client.containerID) + ts.assertContainerID(t, "") ts.sendAllAndAssert(t, client) } diff --git a/statsd/event_test.go b/statsd/event_test.go index fca1bd067..43bb71203 100644 --- a/statsd/event_test.go +++ b/statsd/event_test.go @@ -13,7 +13,7 @@ func encodeEvent(e *Event) (string, error) { return "", err } var buffer []byte - buffer = appendEvent(buffer, e, nil, "") + buffer = appendEvent(buffer, e, nil) return string(buffer), nil } diff --git a/statsd/format.go b/statsd/format.go index 8427c28b5..72b2075c0 100644 --- a/statsd/format.go +++ b/statsd/format.go @@ -94,61 +94,61 @@ func appendTagsAggregated(buffer []byte, globalTags []string, tags string) []byt return buffer } -func appendFloatMetric(buffer []byte, typeSymbol []byte, namespace, name, containerID string, globalTags []string, value float64, tags []string, rate float64, precision int) []byte { +func appendFloatMetric(buffer []byte, typeSymbol []byte, namespace string, globalTags []string, name string, value float64, tags []string, rate float64, precision int) []byte { buffer = appendHeader(buffer, namespace, name) buffer = strconv.AppendFloat(buffer, value, 'f', precision, 64) buffer = append(buffer, '|') buffer = append(buffer, typeSymbol...) buffer = appendRate(buffer, rate) buffer = appendTags(buffer, globalTags, tags) - buffer = appendContainerID(buffer, containerID) + buffer = appendContainerID(buffer) return buffer } -func appendIntegerMetric(buffer []byte, typeSymbol []byte, namespace, name, containerID string, globalTags []string, value int64, tags []string, rate float64) []byte { +func appendIntegerMetric(buffer []byte, typeSymbol []byte, namespace string, globalTags []string, name string, value int64, tags []string, rate float64) []byte { buffer = appendHeader(buffer, namespace, name) buffer = strconv.AppendInt(buffer, value, 10) buffer = append(buffer, '|') buffer = append(buffer, typeSymbol...) buffer = appendRate(buffer, rate) buffer = appendTags(buffer, globalTags, tags) - buffer = appendContainerID(buffer, containerID) + buffer = appendContainerID(buffer) return buffer } -func appendStringMetric(buffer []byte, typeSymbol []byte, namespace, name, containerID string, globalTags []string, value string, tags []string, rate float64) []byte { +func appendStringMetric(buffer []byte, typeSymbol []byte, namespace string, globalTags []string, name string, value string, tags []string, rate float64) []byte { buffer = appendHeader(buffer, namespace, name) buffer = append(buffer, value...) buffer = append(buffer, '|') buffer = append(buffer, typeSymbol...) buffer = appendRate(buffer, rate) buffer = appendTags(buffer, globalTags, tags) - buffer = appendContainerID(buffer, containerID) + buffer = appendContainerID(buffer) return buffer } -func appendGauge(buffer []byte, namespace, name, containerID string, globalTags []string, value float64, tags []string, rate float64) []byte { - return appendFloatMetric(buffer, gaugeSymbol, namespace, name, containerID, globalTags, value, tags, rate, -1) +func appendGauge(buffer []byte, namespace string, globalTags []string, name string, value float64, tags []string, rate float64) []byte { + return appendFloatMetric(buffer, gaugeSymbol, namespace, globalTags, name, value, tags, rate, -1) } -func appendCount(buffer []byte, namespace, name, containerID string, globalTags []string, value int64, tags []string, rate float64) []byte { - return appendIntegerMetric(buffer, countSymbol, namespace, name, containerID, globalTags, value, tags, rate) +func appendCount(buffer []byte, namespace string, globalTags []string, name string, value int64, tags []string, rate float64) []byte { + return appendIntegerMetric(buffer, countSymbol, namespace, globalTags, name, value, tags, rate) } -func appendHistogram(buffer []byte, namespace, name, containerID string, globalTags []string, value float64, tags []string, rate float64) []byte { - return appendFloatMetric(buffer, histogramSymbol, namespace, name, containerID, globalTags, value, tags, rate, -1) +func appendHistogram(buffer []byte, namespace string, globalTags []string, name string, value float64, tags []string, rate float64) []byte { + return appendFloatMetric(buffer, histogramSymbol, namespace, globalTags, name, value, tags, rate, -1) } -func appendDistribution(buffer []byte, namespace, name, containerID string, globalTags []string, value float64, tags []string, rate float64) []byte { - return appendFloatMetric(buffer, distributionSymbol, namespace, name, containerID, globalTags, value, tags, rate, -1) +func appendDistribution(buffer []byte, namespace string, globalTags []string, name string, value float64, tags []string, rate float64) []byte { + return appendFloatMetric(buffer, distributionSymbol, namespace, globalTags, name, value, tags, rate, -1) } -func appendSet(buffer []byte, namespace, name, containerID string, globalTags []string, value string, tags []string, rate float64) []byte { - return appendStringMetric(buffer, setSymbol, namespace, name, containerID, globalTags, value, tags, rate) +func appendSet(buffer []byte, namespace string, globalTags []string, name string, value string, tags []string, rate float64) []byte { + return appendStringMetric(buffer, setSymbol, namespace, globalTags, name, value, tags, rate) } -func appendTiming(buffer []byte, namespace, name, containerID string, globalTags []string, value float64, tags []string, rate float64) []byte { - return appendFloatMetric(buffer, timingSymbol, namespace, name, containerID, globalTags, value, tags, rate, 6) +func appendTiming(buffer []byte, namespace string, globalTags []string, name string, value float64, tags []string, rate float64) []byte { + return appendFloatMetric(buffer, timingSymbol, namespace, globalTags, name, value, tags, rate, 6) } func escapedEventTextLen(text string) int { @@ -166,7 +166,7 @@ func appendEscapedEventText(buffer []byte, text string) []byte { return buffer } -func appendEvent(buffer []byte, event *Event, globalTags []string, containerID string) []byte { +func appendEvent(buffer []byte, event *Event, globalTags []string) []byte { escapedTextLen := escapedEventTextLen(event.Text) buffer = append(buffer, "_e{"...) @@ -213,7 +213,7 @@ func appendEvent(buffer []byte, event *Event, globalTags []string, containerID s } buffer = appendTags(buffer, globalTags, event.Tags) - buffer = appendContainerID(buffer, containerID) + buffer = appendContainerID(buffer) return buffer } @@ -231,7 +231,7 @@ func appendEscapedServiceCheckText(buffer []byte, text string) []byte { return buffer } -func appendServiceCheck(buffer []byte, serviceCheck *ServiceCheck, globalTags []string, containerID string) []byte { +func appendServiceCheck(buffer []byte, serviceCheck *ServiceCheck, globalTags []string) []byte { buffer = append(buffer, "_sc|"...) buffer = append(buffer, serviceCheck.Name...) buffer = append(buffer, '|') @@ -254,7 +254,7 @@ func appendServiceCheck(buffer []byte, serviceCheck *ServiceCheck, globalTags [] buffer = appendEscapedServiceCheckText(buffer, serviceCheck.Message) } - buffer = appendContainerID(buffer, containerID) + buffer = appendContainerID(buffer) return buffer } @@ -262,8 +262,8 @@ func appendSeparator(buffer []byte) []byte { return append(buffer, '\n') } -func appendContainerID(buffer []byte, containerID string) []byte { - if len(containerID) > 0 { +func appendContainerID(buffer []byte) []byte { + if containerID := getContainerID(); len(containerID) > 0 { buffer = append(buffer, "|c:"...) buffer = append(buffer, containerID...) } diff --git a/statsd/format_benchmark_test.go b/statsd/format_benchmark_test.go index ef4966c7d..62dbbba6d 100644 --- a/statsd/format_benchmark_test.go +++ b/statsd/format_benchmark_test.go @@ -35,14 +35,14 @@ func benchmarkFormat(b *testing.B, tagsNumber int) { } b.ResetTimer() for n := 0; n < b.N; n++ { - payloadSink = appendGauge(payloadSink[:0], "namespace", "metric", "", []string{}, 1, tags, 0.1) - payloadSink = appendCount(payloadSink[:0], "namespace", "metric", "", []string{}, 1, tags, 0.1) - payloadSink = appendHistogram(payloadSink[:0], "namespace", "metric", "", []string{}, 1, tags, 0.1) - payloadSink = appendDistribution(payloadSink[:0], "namespace", "metric", "", []string{}, 1, tags, 0.1) - payloadSink = appendSet(payloadSink[:0], "namespace", "metric", "", []string{}, "setelement", tags, 0.1) - payloadSink = appendTiming(payloadSink[:0], "namespace", "metric", "", []string{}, 1, tags, 0.1) - payloadSink = appendEvent(payloadSink[:0], event, []string{}, "") - payloadSink = appendServiceCheck(payloadSink[:0], serviceCheck, []string{}, "") + payloadSink = appendGauge(payloadSink[:0], "namespace", []string{}, "metric", 1, tags, 0.1) + payloadSink = appendCount(payloadSink[:0], "namespace", []string{}, "metric", 1, tags, 0.1) + payloadSink = appendHistogram(payloadSink[:0], "namespace", []string{}, "metric", 1, tags, 0.1) + payloadSink = appendDistribution(payloadSink[:0], "namespace", []string{}, "metric", 1, tags, 0.1) + payloadSink = appendSet(payloadSink[:0], "namespace", []string{}, "metric", "setelement", tags, 0.1) + payloadSink = appendTiming(payloadSink[:0], "namespace", []string{}, "metric", 1, tags, 0.1) + payloadSink = appendEvent(payloadSink[:0], event, []string{}) + payloadSink = appendServiceCheck(payloadSink[:0], serviceCheck, []string{}) } } diff --git a/statsd/format_test.go b/statsd/format_test.go index ff09dd163..a65d7b525 100644 --- a/statsd/format_test.go +++ b/statsd/format_test.go @@ -45,79 +45,79 @@ func TestFormatAppendTagsAggregated(t *testing.T) { func TestFormatAppendGauge(t *testing.T) { var buffer []byte - buffer = appendGauge(buffer, "namespace.", "gauge", "", []string{"global:tag"}, 1., []string{"tag:tag"}, 1) + buffer = appendGauge(buffer, "namespace.", []string{"global:tag"}, "gauge", 1., []string{"tag:tag"}, 1) assert.Equal(t, `namespace.gauge:1|g|#global:tag,tag:tag`, string(buffer)) } func TestFormatAppendCount(t *testing.T) { var buffer []byte - buffer = appendCount(buffer, "namespace.", "count", "", []string{"global:tag"}, 2, []string{"tag:tag"}, 1) + buffer = appendCount(buffer, "namespace.", []string{"global:tag"}, "count", 2, []string{"tag:tag"}, 1) assert.Equal(t, `namespace.count:2|c|#global:tag,tag:tag`, string(buffer)) } func TestFormatAppendHistogram(t *testing.T) { var buffer []byte - buffer = appendHistogram(buffer, "namespace.", "histogram", "", []string{"global:tag"}, 3., []string{"tag:tag"}, 1) + buffer = appendHistogram(buffer, "namespace.", []string{"global:tag"}, "histogram", 3., []string{"tag:tag"}, 1) assert.Equal(t, `namespace.histogram:3|h|#global:tag,tag:tag`, string(buffer)) } func TestFormatAppendDistribution(t *testing.T) { var buffer []byte - buffer = appendDistribution(buffer, "namespace.", "distribution", "", []string{"global:tag"}, 4., []string{"tag:tag"}, 1) + buffer = appendDistribution(buffer, "namespace.", []string{"global:tag"}, "distribution", 4., []string{"tag:tag"}, 1) assert.Equal(t, `namespace.distribution:4|d|#global:tag,tag:tag`, string(buffer)) } func TestFormatAppendSet(t *testing.T) { var buffer []byte - buffer = appendSet(buffer, "namespace.", "set", "", []string{"global:tag"}, "five", []string{"tag:tag"}, 1) + buffer = appendSet(buffer, "namespace.", []string{"global:tag"}, "set", "five", []string{"tag:tag"}, 1) assert.Equal(t, `namespace.set:five|s|#global:tag,tag:tag`, string(buffer)) } func TestFormatAppendTiming(t *testing.T) { var buffer []byte - buffer = appendTiming(buffer, "namespace.", "timing", "", []string{"global:tag"}, 6., []string{"tag:tag"}, 1) + buffer = appendTiming(buffer, "namespace.", []string{"global:tag"}, "timing", 6., []string{"tag:tag"}, 1) assert.Equal(t, `namespace.timing:6.000000|ms|#global:tag,tag:tag`, string(buffer)) } func TestFormatNoTag(t *testing.T) { var buffer []byte - buffer = appendGauge(buffer, "", "gauge", "", []string{}, 1., []string{}, 1) + buffer = appendGauge(buffer, "", []string{}, "gauge", 1., []string{}, 1) assert.Equal(t, `gauge:1|g`, string(buffer)) } func TestFormatOneTag(t *testing.T) { var buffer []byte - buffer = appendGauge(buffer, "", "gauge", "", []string{}, 1., []string{"tag1:tag1"}, 1) + buffer = appendGauge(buffer, "", []string{}, "gauge", 1., []string{"tag1:tag1"}, 1) assert.Equal(t, `gauge:1|g|#tag1:tag1`, string(buffer)) } func TestFormatTwoTag(t *testing.T) { var buffer []byte - buffer = appendGauge(buffer, "", "metric", "", []string{}, 1., []string{"tag1:tag1", "tag2:tag2"}, 1) + buffer = appendGauge(buffer, "", []string{}, "metric", 1., []string{"tag1:tag1", "tag2:tag2"}, 1) assert.Equal(t, `metric:1|g|#tag1:tag1,tag2:tag2`, string(buffer)) } func TestFormatRate(t *testing.T) { var buffer []byte - buffer = appendGauge(buffer, "", "metric", "", []string{}, 1., []string{}, 0.1) + buffer = appendGauge(buffer, "", []string{}, "metric", 1., []string{}, 0.1) assert.Equal(t, `metric:1|g|@0.1`, string(buffer)) } func TestFormatRateAndTag(t *testing.T) { var buffer []byte - buffer = appendGauge(buffer, "", "metric", "", []string{}, 1., []string{"tag1:tag1"}, 0.1) + buffer = appendGauge(buffer, "", []string{}, "metric", 1., []string{"tag1:tag1"}, 0.1) assert.Equal(t, `metric:1|g|@0.1|#tag1:tag1`, string(buffer)) } func TestFormatNil(t *testing.T) { var buffer []byte - buffer = appendGauge(buffer, "", "metric", "", nil, 1., nil, 1) + buffer = appendGauge(buffer, "", nil, "metric", 1., nil, 1) assert.Equal(t, `metric:1|g`, string(buffer)) } func TestFormatTagRemoveNewLines(t *testing.T) { var buffer []byte - buffer = appendGauge(buffer, "", "metric", "", []string{"tag\n:d\nog\n"}, 1., []string{"\ntag\n:d\nog2\n"}, 0.1) + buffer = appendGauge(buffer, "", []string{"tag\n:d\nog\n"}, "metric", 1., []string{"\ntag\n:d\nog2\n"}, 0.1) assert.Equal(t, `metric:1|g|@0.1|#tag:dog,tag:dog2`, string(buffer)) } @@ -126,7 +126,7 @@ func TestFormatEvent(t *testing.T) { buffer = appendEvent(buffer, &Event{ Title: "EvenTitle", Text: "EventText", - }, []string{}, "") + }, []string{}) assert.Equal(t, `_e{9,9}:EvenTitle|EventText`, string(buffer)) } @@ -135,7 +135,7 @@ func TestFormatEventEscapeText(t *testing.T) { buffer = appendEvent(buffer, &Event{ Title: "EvenTitle", Text: "\nEventText\nLine2\n\nLine4\n", - }, []string{}, "") + }, []string{}) assert.Equal(t, `_e{9,29}:EvenTitle|\nEventText\nLine2\n\nLine4\n`, string(buffer)) } @@ -145,7 +145,7 @@ func TestFormatEventTimeStamp(t *testing.T) { Title: "EvenTitle", Text: "EventText", Timestamp: time.Date(2016, time.August, 15, 0, 0, 0, 0, time.UTC), - }, []string{}, "") + }, []string{}) assert.Equal(t, `_e{9,9}:EvenTitle|EventText|d:1471219200`, string(buffer)) } @@ -155,7 +155,7 @@ func TestFormatEventHostname(t *testing.T) { Title: "EvenTitle", Text: "EventText", Hostname: "hostname", - }, []string{}, "") + }, []string{}) assert.Equal(t, `_e{9,9}:EvenTitle|EventText|h:hostname`, string(buffer)) } @@ -165,7 +165,7 @@ func TestFormatEventAggregationKey(t *testing.T) { Title: "EvenTitle", Text: "EventText", AggregationKey: "aggregationKey", - }, []string{}, "") + }, []string{}) assert.Equal(t, `_e{9,9}:EvenTitle|EventText|k:aggregationKey`, string(buffer)) } @@ -175,7 +175,7 @@ func TestFormatEventPriority(t *testing.T) { Title: "EvenTitle", Text: "EventText", Priority: "priority", - }, []string{}, "") + }, []string{}) assert.Equal(t, `_e{9,9}:EvenTitle|EventText|p:priority`, string(buffer)) } @@ -185,7 +185,7 @@ func TestFormatEventSourceTypeName(t *testing.T) { Title: "EvenTitle", Text: "EventText", SourceTypeName: "sourceTypeName", - }, []string{}, "") + }, []string{}) assert.Equal(t, `_e{9,9}:EvenTitle|EventText|s:sourceTypeName`, string(buffer)) } @@ -195,7 +195,7 @@ func TestFormatEventAlertType(t *testing.T) { Title: "EvenTitle", Text: "EventText", AlertType: "alertType", - }, []string{}, "") + }, []string{}) assert.Equal(t, `_e{9,9}:EvenTitle|EventText|t:alertType`, string(buffer)) } @@ -204,7 +204,7 @@ func TestFormatEventOneTag(t *testing.T) { buffer = appendEvent(buffer, &Event{ Title: "EvenTitle", Text: "EventText", - }, []string{"tag:test"}, "") + }, []string{"tag:test"}) assert.Equal(t, `_e{9,9}:EvenTitle|EventText|#tag:test`, string(buffer)) } @@ -214,7 +214,7 @@ func TestFormatEventTwoTag(t *testing.T) { Title: "EvenTitle", Text: "EventText", Tags: []string{"tag1:test"}, - }, []string{"tag2:test"}, "") + }, []string{"tag2:test"}) assert.Equal(t, `_e{9,9}:EvenTitle|EventText|#tag2:test,tag1:test`, string(buffer)) } @@ -230,13 +230,13 @@ func TestFormatEventAllOptions(t *testing.T) { SourceTypeName: "SourceTypeName", AlertType: "alertType", Tags: []string{"tag:normal"}, - }, []string{"tag:global"}, "") + }, []string{"tag:global"}) assert.Equal(t, `_e{9,9}:EvenTitle|EventText|d:1471219200|h:hostname|k:aggregationKey|p:priority|s:SourceTypeName|t:alertType|#tag:global,tag:normal`, string(buffer)) } func TestFormatEventNil(t *testing.T) { var buffer []byte - buffer = appendEvent(buffer, &Event{}, []string{}, "") + buffer = appendEvent(buffer, &Event{}, []string{}) assert.Equal(t, `_e{0,0}:|`, string(buffer)) } @@ -245,7 +245,7 @@ func TestFormatServiceCheck(t *testing.T) { buffer = appendServiceCheck(buffer, &ServiceCheck{ Name: "service.check", Status: Ok, - }, []string{}, "") + }, []string{}) assert.Equal(t, `_sc|service.check|0`, string(buffer)) } @@ -255,7 +255,7 @@ func TestFormatServiceCheckEscape(t *testing.T) { Name: "service.check", Status: Ok, Message: "\n\nmessagem:hello...\n\nm:aa\nm:m", - }, []string{}, "") + }, []string{}) assert.Equal(t, `_sc|service.check|0|m:\n\nmessagem\:hello...\n\nm\:aa\nm\:m`, string(buffer)) } @@ -265,7 +265,7 @@ func TestFormatServiceCheckTimestamp(t *testing.T) { Name: "service.check", Status: Ok, Timestamp: time.Date(2016, time.August, 15, 0, 0, 0, 0, time.UTC), - }, []string{}, "") + }, []string{}) assert.Equal(t, `_sc|service.check|0|d:1471219200`, string(buffer)) } @@ -275,7 +275,7 @@ func TestFormatServiceCheckHostname(t *testing.T) { Name: "service.check", Status: Ok, Hostname: "hostname", - }, []string{}, "") + }, []string{}) assert.Equal(t, `_sc|service.check|0|h:hostname`, string(buffer)) } @@ -285,7 +285,7 @@ func TestFormatServiceCheckMessage(t *testing.T) { Name: "service.check", Status: Ok, Message: "message", - }, []string{}, "") + }, []string{}) assert.Equal(t, `_sc|service.check|0|m:message`, string(buffer)) } @@ -295,7 +295,7 @@ func TestFormatServiceCheckOneTag(t *testing.T) { Name: "service.check", Status: Ok, Tags: []string{"tag:tag"}, - }, []string{}, "") + }, []string{}) assert.Equal(t, `_sc|service.check|0|#tag:tag`, string(buffer)) } @@ -305,7 +305,7 @@ func TestFormatServiceCheckTwoTag(t *testing.T) { Name: "service.check", Status: Ok, Tags: []string{"tag1:tag1"}, - }, []string{"tag2:tag2"}, "") + }, []string{"tag2:tag2"}) assert.Equal(t, `_sc|service.check|0|#tag2:tag2,tag1:tag1`, string(buffer)) } @@ -318,13 +318,13 @@ func TestFormatServiceCheckAllOptions(t *testing.T) { Hostname: "hostname", Message: "message", Tags: []string{"tag1:tag1"}, - }, []string{"tag2:tag2"}, "") + }, []string{"tag2:tag2"}) assert.Equal(t, `_sc|service.check|0|d:1471219200|h:hostname|#tag2:tag2,tag1:tag1|m:message`, string(buffer)) } func TestFormatServiceCheckNil(t *testing.T) { var buffer []byte - buffer = appendServiceCheck(buffer, &ServiceCheck{}, nil, "") + buffer = appendServiceCheck(buffer, &ServiceCheck{}, nil) assert.Equal(t, `_sc||0`, string(buffer)) } diff --git a/statsd/options.go b/statsd/options.go index 326839f81..a6f360d0e 100644 --- a/statsd/options.go +++ b/statsd/options.go @@ -45,6 +45,7 @@ type Options struct { extendedAggregation bool telemetryAddr string originDetection bool + containerID string } func resolveOptions(options []Option) (*Options, error) { @@ -304,6 +305,9 @@ func WithTelemetryAddr(addr string) Option { // WithoutOriginDetection disables the client origin detection. // When enabled, the client tries to discover its container ID and sends it to the Agent // to enrich the metrics with container tags. +// Origin detection can also be disabled by configuring the environment variabe DD_ORIGIN_DETECTION_ENABLED=false +// The client tries to read the container ID by parsing the file /proc/self/cgroup, this is not supported on Windows. +// The client prioritizes the value passed via DD_ENTITY_ID (if set) over the container ID. // // More on this here: https://docs.datadoghq.com/developers/dogstatsd/?tab=kubernetes#origin-detection-over-udp func WithoutOriginDetection() Option { @@ -312,3 +316,28 @@ func WithoutOriginDetection() Option { return nil } } + +// WithOriginDetection enables the client origin detection. +// When enabled, the client tries to discover its container ID and sends it to the Agent +// to enrich the metrics with container tags. +// Origin detection can be disabled by configuring the environment variabe DD_ORIGIN_DETECTION_ENABLED=false +// The client tries to read the container ID by parsing the file /proc/self/cgroup, this is not supported on Windows. +// The client prioritizes the value passed via DD_ENTITY_ID (if set) over the container ID. +// +// More on this here: https://docs.datadoghq.com/developers/dogstatsd/?tab=kubernetes#origin-detection-over-udp +func WithOriginDetection() Option { + return func(o *Options) error { + o.originDetection = true + return nil + } +} + +// WithContainerID allows passing the container ID, this will be used by the Agent to enrich metrics with container tags. +// When configured, the provided container ID is prioritized over the container ID discovered via Origin Detection. +// The client prioritizes the value passed via DD_ENTITY_ID (if set) over the container ID. +func WithContainerID(id string) Option { + return func(o *Options) error { + o.containerID = id + return nil + } +} diff --git a/statsd/service_check_test.go b/statsd/service_check_test.go index 3380a51d1..b89bfd652 100644 --- a/statsd/service_check_test.go +++ b/statsd/service_check_test.go @@ -13,7 +13,7 @@ func encodeSC(sc *ServiceCheck) (string, error) { return "", err } var buffer []byte - buffer = appendServiceCheck(buffer, sc, nil, "") + buffer = appendServiceCheck(buffer, sc, nil) return string(buffer), nil } diff --git a/statsd/statsd.go b/statsd/statsd.go index 6a231117f..8b9e5ce55 100644 --- a/statsd/statsd.go +++ b/statsd/statsd.go @@ -125,20 +125,19 @@ const ( ) type metric struct { - metricType metricType - namespace string - globalTags []string - name string - fvalue float64 - fvalues []float64 - ivalue int64 - svalue string - evalue *Event - scvalue *ServiceCheck - tags []string - stags string - rate float64 - containerID string + metricType metricType + namespace string + globalTags []string + name string + fvalue float64 + fvalues []float64 + ivalue int64 + svalue string + evalue *Event + scvalue *ServiceCheck + tags []string + stags string + rate float64 } type noClientErr string @@ -224,8 +223,6 @@ type Client struct { aggExtended *aggregator options []Option addrOption string - // containerID is the container ID of the application sending the metric - containerID string } // statsdTelemetry contains telemetry metrics about the client @@ -349,10 +346,10 @@ func newWithWriter(w io.WriteCloser, o *Options, writerName string) (*Client, er } } - if isOriginDetectionEnabled(o, hasEntityID) { - if cID := getContainerID(); cID != "" { - c.containerID = cID - } + if !hasEntityID && o.containerID != "" { + setUserProvidedContainerID(o.containerID) + } else if isOriginDetectionEnabled(o, hasEntityID) { + setCgroupContainerID() } if o.maxBytesPerPayload == 0 { @@ -511,7 +508,6 @@ func (c *Client) send(m metric) error { func (c *Client) sendBlocking(m metric) error { m.globalTags = c.tags m.namespace = c.namespace - m.containerID = c.containerID h := hashString32(m.name) worker := c.workers[h%uint32(len(c.workers))] @@ -539,7 +535,7 @@ func (c *Client) Gauge(name string, value float64, tags []string, rate float64) if c.agg != nil { return c.agg.gauge(name, value, tags) } - return c.send(metric{metricType: gauge, name: name, fvalue: value, tags: tags, rate: rate, globalTags: c.tags, namespace: c.namespace, containerID: c.containerID}) + return c.send(metric{metricType: gauge, name: name, fvalue: value, tags: tags, rate: rate, globalTags: c.tags, namespace: c.namespace}) } // Count tracks how many times something happened per second. @@ -551,7 +547,7 @@ func (c *Client) Count(name string, value int64, tags []string, rate float64) er if c.agg != nil { return c.agg.count(name, value, tags) } - return c.send(metric{metricType: count, name: name, ivalue: value, tags: tags, rate: rate, globalTags: c.tags, namespace: c.namespace, containerID: c.containerID}) + return c.send(metric{metricType: count, name: name, ivalue: value, tags: tags, rate: rate, globalTags: c.tags, namespace: c.namespace}) } // Histogram tracks the statistical distribution of a set of values on each host. @@ -563,7 +559,7 @@ func (c *Client) Histogram(name string, value float64, tags []string, rate float if c.aggExtended != nil { return c.sendToAggregator(histogram, name, value, tags, rate, c.aggExtended.histogram) } - return c.send(metric{metricType: histogram, name: name, fvalue: value, tags: tags, rate: rate, globalTags: c.tags, namespace: c.namespace, containerID: c.containerID}) + return c.send(metric{metricType: histogram, name: name, fvalue: value, tags: tags, rate: rate, globalTags: c.tags, namespace: c.namespace}) } // Distribution tracks the statistical distribution of a set of values across your infrastructure. @@ -575,7 +571,7 @@ func (c *Client) Distribution(name string, value float64, tags []string, rate fl if c.aggExtended != nil { return c.sendToAggregator(distribution, name, value, tags, rate, c.aggExtended.distribution) } - return c.send(metric{metricType: distribution, name: name, fvalue: value, tags: tags, rate: rate, globalTags: c.tags, namespace: c.namespace, containerID: c.containerID}) + return c.send(metric{metricType: distribution, name: name, fvalue: value, tags: tags, rate: rate, globalTags: c.tags, namespace: c.namespace}) } // Decr is just Count of -1 @@ -597,7 +593,7 @@ func (c *Client) Set(name string, value string, tags []string, rate float64) err if c.agg != nil { return c.agg.set(name, value, tags) } - return c.send(metric{metricType: set, name: name, svalue: value, tags: tags, rate: rate, globalTags: c.tags, namespace: c.namespace, containerID: c.containerID}) + return c.send(metric{metricType: set, name: name, svalue: value, tags: tags, rate: rate, globalTags: c.tags, namespace: c.namespace}) } // Timing sends timing information, it is an alias for TimeInMilliseconds @@ -615,7 +611,7 @@ func (c *Client) TimeInMilliseconds(name string, value float64, tags []string, r if c.aggExtended != nil { return c.sendToAggregator(timing, name, value, tags, rate, c.aggExtended.timing) } - return c.send(metric{metricType: timing, name: name, fvalue: value, tags: tags, rate: rate, globalTags: c.tags, namespace: c.namespace, containerID: c.containerID}) + return c.send(metric{metricType: timing, name: name, fvalue: value, tags: tags, rate: rate, globalTags: c.tags, namespace: c.namespace}) } // Event sends the provided Event. @@ -624,7 +620,7 @@ func (c *Client) Event(e *Event) error { return ErrNoClient } atomic.AddUint64(&c.telemetry.totalEvents, 1) - return c.send(metric{metricType: event, evalue: e, rate: 1, globalTags: c.tags, namespace: c.namespace, containerID: c.containerID}) + return c.send(metric{metricType: event, evalue: e, rate: 1, globalTags: c.tags, namespace: c.namespace}) } // SimpleEvent sends an event with the provided title and text. @@ -639,7 +635,7 @@ func (c *Client) ServiceCheck(sc *ServiceCheck) error { return ErrNoClient } atomic.AddUint64(&c.telemetry.totalServiceChecks, 1) - return c.send(metric{metricType: serviceCheck, scvalue: sc, rate: 1, globalTags: c.tags, namespace: c.namespace, containerID: c.containerID}) + return c.send(metric{metricType: serviceCheck, scvalue: sc, rate: 1, globalTags: c.tags, namespace: c.namespace}) } // SimpleServiceCheck sends an serviceCheck with the provided name and status. @@ -690,12 +686,15 @@ func (c *Client) Close() error { // isOriginDetectionEnabled returns whether the clients should fill the container field. // // If DD_ENTITY_ID is set, we don't send the container ID +// If a user-defined container ID is provided, we don't ignore origin detection // as dd.internal.entity_id is prioritized over the container field for backward compatibility. // If DD_ENTITY_ID is not set, we try to fill the container field automatically unless // DD_ORIGIN_DETECTION_ENABLED is explicitly set to false. func isOriginDetectionEnabled(o *Options, hasEntityID bool) bool { - if !o.originDetection || hasEntityID { - // originDetection is explicitly disabled or DD_ENTITY_ID was found + if !o.originDetection || hasEntityID || o.containerID != "" { + // originDetection is explicitly disabled + // or DD_ENTITY_ID was found + // or a user-defined container ID was provided return false } diff --git a/statsd/test_helpers_test.go b/statsd/test_helpers_test.go index 8bd8329c9..ea3f6b810 100644 --- a/statsd/test_helpers_test.go +++ b/statsd/test_helpers_test.go @@ -111,7 +111,7 @@ func newClientAndTestServer(t *testing.T, proto string, addr string, tags []stri client, err := New(addr, options...) require.NoError(t, err) - ts.containerID = client.containerID + ts.containerID = getContainerID() go ts.start() return ts, client @@ -231,6 +231,10 @@ func (ts *testServer) assert(t *testing.T, client *Client, expectedMetrics []str assert.Empty(t, ts.errors) } +func (ts *testServer) assertContainerID(t *testing.T, expected string) { + assert.Equal(t, expected, ts.containerID) +} + // meta helper: most test send all types and then assert func (ts *testServer) sendAllAndAssert(t *testing.T, client *Client) { expectedMetrics := ts.sendAllType(client) @@ -259,36 +263,38 @@ func (ts *testServer) getTelemetry() []string { ts.telemetry.set + ts.telemetry.timing + containerID := ts.getContainerID() + metrics := []string{ - fmt.Sprintf("datadog.dogstatsd.client.metrics:%d|c%s", totalMetrics, tags), - fmt.Sprintf("datadog.dogstatsd.client.events:%d|c%s", ts.telemetry.event, tags), - fmt.Sprintf("datadog.dogstatsd.client.service_checks:%d|c%s", ts.telemetry.service_check, tags), - fmt.Sprintf("datadog.dogstatsd.client.metric_dropped_on_receive:%d|c%s", ts.telemetry.metric_dropped_on_receive, tags), - fmt.Sprintf("datadog.dogstatsd.client.packets_sent:%d|c%s", ts.telemetry.packets_sent, tags), - fmt.Sprintf("datadog.dogstatsd.client.packets_dropped:%d|c%s", ts.telemetry.packets_dropped, tags), - fmt.Sprintf("datadog.dogstatsd.client.packets_dropped_queue:%d|c%s", ts.telemetry.packets_dropped_queue, tags), - fmt.Sprintf("datadog.dogstatsd.client.packets_dropped_writer:%d|c%s", ts.telemetry.packets_dropped_writer, tags), - fmt.Sprintf("datadog.dogstatsd.client.bytes_sent:%d|c%s", ts.telemetry.bytes_sent, tags), - fmt.Sprintf("datadog.dogstatsd.client.bytes_dropped:%d|c%s", ts.telemetry.bytes_dropped, tags), - fmt.Sprintf("datadog.dogstatsd.client.bytes_dropped_queue:%d|c%s", ts.telemetry.bytes_dropped_queue, tags), - fmt.Sprintf("datadog.dogstatsd.client.bytes_dropped_writer:%d|c%s", ts.telemetry.bytes_dropped_writer, tags), - fmt.Sprintf("datadog.dogstatsd.client.metrics_by_type:%d|c%s,metrics_type:gauge", ts.telemetry.gauge, tags), - fmt.Sprintf("datadog.dogstatsd.client.metrics_by_type:%d|c%s,metrics_type:count", ts.telemetry.count, tags), - fmt.Sprintf("datadog.dogstatsd.client.metrics_by_type:%d|c%s,metrics_type:histogram", ts.telemetry.histogram, tags), - fmt.Sprintf("datadog.dogstatsd.client.metrics_by_type:%d|c%s,metrics_type:distribution", ts.telemetry.distribution, tags), - fmt.Sprintf("datadog.dogstatsd.client.metrics_by_type:%d|c%s,metrics_type:set", ts.telemetry.set, tags), - fmt.Sprintf("datadog.dogstatsd.client.metrics_by_type:%d|c%s,metrics_type:timing", ts.telemetry.timing, tags), + fmt.Sprintf("datadog.dogstatsd.client.metrics:%d|c%s", totalMetrics, tags) + containerID, + fmt.Sprintf("datadog.dogstatsd.client.events:%d|c%s", ts.telemetry.event, tags) + containerID, + fmt.Sprintf("datadog.dogstatsd.client.service_checks:%d|c%s", ts.telemetry.service_check, tags) + containerID, + fmt.Sprintf("datadog.dogstatsd.client.metric_dropped_on_receive:%d|c%s", ts.telemetry.metric_dropped_on_receive, tags) + containerID, + fmt.Sprintf("datadog.dogstatsd.client.packets_sent:%d|c%s", ts.telemetry.packets_sent, tags) + containerID, + fmt.Sprintf("datadog.dogstatsd.client.packets_dropped:%d|c%s", ts.telemetry.packets_dropped, tags) + containerID, + fmt.Sprintf("datadog.dogstatsd.client.packets_dropped_queue:%d|c%s", ts.telemetry.packets_dropped_queue, tags) + containerID, + fmt.Sprintf("datadog.dogstatsd.client.packets_dropped_writer:%d|c%s", ts.telemetry.packets_dropped_writer, tags) + containerID, + fmt.Sprintf("datadog.dogstatsd.client.bytes_sent:%d|c%s", ts.telemetry.bytes_sent, tags) + containerID, + fmt.Sprintf("datadog.dogstatsd.client.bytes_dropped:%d|c%s", ts.telemetry.bytes_dropped, tags) + containerID, + fmt.Sprintf("datadog.dogstatsd.client.bytes_dropped_queue:%d|c%s", ts.telemetry.bytes_dropped_queue, tags) + containerID, + fmt.Sprintf("datadog.dogstatsd.client.bytes_dropped_writer:%d|c%s", ts.telemetry.bytes_dropped_writer, tags) + containerID, + fmt.Sprintf("datadog.dogstatsd.client.metrics_by_type:%d|c%s,metrics_type:gauge", ts.telemetry.gauge, tags) + containerID, + fmt.Sprintf("datadog.dogstatsd.client.metrics_by_type:%d|c%s,metrics_type:count", ts.telemetry.count, tags) + containerID, + fmt.Sprintf("datadog.dogstatsd.client.metrics_by_type:%d|c%s,metrics_type:histogram", ts.telemetry.histogram, tags) + containerID, + fmt.Sprintf("datadog.dogstatsd.client.metrics_by_type:%d|c%s,metrics_type:distribution", ts.telemetry.distribution, tags) + containerID, + fmt.Sprintf("datadog.dogstatsd.client.metrics_by_type:%d|c%s,metrics_type:set", ts.telemetry.set, tags) + containerID, + fmt.Sprintf("datadog.dogstatsd.client.metrics_by_type:%d|c%s,metrics_type:timing", ts.telemetry.timing, tags) + containerID, } if ts.aggregation { metrics = append(metrics, []string{ - fmt.Sprintf("datadog.dogstatsd.client.aggregated_context:%d|c%s", ts.telemetry.aggregated_context, tags), - fmt.Sprintf("datadog.dogstatsd.client.aggregated_context_by_type:%d|c%s,metrics_type:gauge", ts.telemetry.aggregated_gauge, tags), - fmt.Sprintf("datadog.dogstatsd.client.aggregated_context_by_type:%d|c%s,metrics_type:count", ts.telemetry.aggregated_count, tags), - fmt.Sprintf("datadog.dogstatsd.client.aggregated_context_by_type:%d|c%s,metrics_type:set", ts.telemetry.aggregated_set, tags), - fmt.Sprintf("datadog.dogstatsd.client.aggregated_context_by_type:%d|c%s,metrics_type:distribution", ts.telemetry.aggregated_distribution, tags), - fmt.Sprintf("datadog.dogstatsd.client.aggregated_context_by_type:%d|c%s,metrics_type:histogram", ts.telemetry.aggregated_histogram, tags), - fmt.Sprintf("datadog.dogstatsd.client.aggregated_context_by_type:%d|c%s,metrics_type:timing", ts.telemetry.aggregated_timing, tags), + fmt.Sprintf("datadog.dogstatsd.client.aggregated_context:%d|c%s", ts.telemetry.aggregated_context, tags) + containerID, + fmt.Sprintf("datadog.dogstatsd.client.aggregated_context_by_type:%d|c%s,metrics_type:gauge", ts.telemetry.aggregated_gauge, tags) + containerID, + fmt.Sprintf("datadog.dogstatsd.client.aggregated_context_by_type:%d|c%s,metrics_type:count", ts.telemetry.aggregated_count, tags) + containerID, + fmt.Sprintf("datadog.dogstatsd.client.aggregated_context_by_type:%d|c%s,metrics_type:set", ts.telemetry.aggregated_set, tags) + containerID, + fmt.Sprintf("datadog.dogstatsd.client.aggregated_context_by_type:%d|c%s,metrics_type:distribution", ts.telemetry.aggregated_distribution, tags) + containerID, + fmt.Sprintf("datadog.dogstatsd.client.aggregated_context_by_type:%d|c%s,metrics_type:histogram", ts.telemetry.aggregated_histogram, tags) + containerID, + fmt.Sprintf("datadog.dogstatsd.client.aggregated_context_by_type:%d|c%s,metrics_type:timing", ts.telemetry.aggregated_timing, tags) + containerID, }...) } return metrics diff --git a/statsd/worker.go b/statsd/worker.go index edc00d919..5446d506a 100644 --- a/statsd/worker.go +++ b/statsd/worker.go @@ -84,7 +84,7 @@ func (w *worker) writeAggregatedMetricUnsafe(m metric, metricSymbol []byte, prec } for { - pos, err := w.buffer.writeAggregated(metricSymbol, m.namespace, m.name, m.containerID, m.globalTags, m.fvalues[globalPos:], m.stags, tagsSize, precision) + pos, err := w.buffer.writeAggregated(metricSymbol, m.namespace, m.globalTags, m.name, m.fvalues[globalPos:], m.stags, tagsSize, precision) if err == errPartialWrite { // We successfully wrote part of the histogram metrics. // We flush the current buffer and finish the histogram @@ -100,21 +100,21 @@ func (w *worker) writeAggregatedMetricUnsafe(m metric, metricSymbol []byte, prec func (w *worker) writeMetricUnsafe(m metric) error { switch m.metricType { case gauge: - return w.buffer.writeGauge(m.namespace, m.name, m.containerID, m.globalTags, m.fvalue, m.tags, m.rate) + return w.buffer.writeGauge(m.namespace, m.globalTags, m.name, m.fvalue, m.tags, m.rate) case count: - return w.buffer.writeCount(m.namespace, m.name, m.containerID, m.globalTags, m.ivalue, m.tags, m.rate) + return w.buffer.writeCount(m.namespace, m.globalTags, m.name, m.ivalue, m.tags, m.rate) case histogram: - return w.buffer.writeHistogram(m.namespace, m.name, m.containerID, m.globalTags, m.fvalue, m.tags, m.rate) + return w.buffer.writeHistogram(m.namespace, m.globalTags, m.name, m.fvalue, m.tags, m.rate) case distribution: - return w.buffer.writeDistribution(m.namespace, m.name, m.containerID, m.globalTags, m.fvalue, m.tags, m.rate) + return w.buffer.writeDistribution(m.namespace, m.globalTags, m.name, m.fvalue, m.tags, m.rate) case set: - return w.buffer.writeSet(m.namespace, m.name, m.containerID, m.globalTags, m.svalue, m.tags, m.rate) + return w.buffer.writeSet(m.namespace, m.globalTags, m.name, m.svalue, m.tags, m.rate) case timing: - return w.buffer.writeTiming(m.namespace, m.name, m.containerID, m.globalTags, m.fvalue, m.tags, m.rate) + return w.buffer.writeTiming(m.namespace, m.globalTags, m.name, m.fvalue, m.tags, m.rate) case event: - return w.buffer.writeEvent(m.evalue, m.globalTags, m.containerID) + return w.buffer.writeEvent(m.evalue, m.globalTags) case serviceCheck: - return w.buffer.writeServiceCheck(m.scvalue, m.globalTags, m.containerID) + return w.buffer.writeServiceCheck(m.scvalue, m.globalTags) case histogramAggregated: return w.writeAggregatedMetricUnsafe(m, histogramSymbol, -1) case distributionAggregated: