Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Support container ID field (dogstatsd 1.2) #250

Merged
merged 6 commits into from
Feb 4, 2022
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 18 additions & 17 deletions statsd/buffer.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,38 +35,38 @@ func newStatsdBuffer(maxSize, maxElements int) *statsdBuffer {
}
}

func (b *statsdBuffer) writeGauge(namespace string, globalTags []string, name string, value float64, tags []string, rate float64) error {
func (b *statsdBuffer) writeGauge(namespace, name, containerID string, globalTags []string, value float64, tags []string, rate float64) error {
if b.elementCount >= b.maxElements {
return errBufferFull
}
originalBuffer := b.buffer
b.buffer = appendGauge(b.buffer, namespace, globalTags, name, value, tags, rate)
b.buffer = appendGauge(b.buffer, namespace, name, containerID, globalTags, value, tags, rate)
b.writeSeparator()
return b.validateNewElement(originalBuffer)
}

func (b *statsdBuffer) writeCount(namespace string, globalTags []string, name string, value int64, tags []string, rate float64) error {
func (b *statsdBuffer) writeCount(namespace, name, containerID string, globalTags []string, value int64, tags []string, rate float64) error {
if b.elementCount >= b.maxElements {
return errBufferFull
}
originalBuffer := b.buffer
b.buffer = appendCount(b.buffer, namespace, globalTags, name, value, tags, rate)
b.buffer = appendCount(b.buffer, namespace, name, containerID, globalTags, value, tags, rate)
b.writeSeparator()
return b.validateNewElement(originalBuffer)
}

func (b *statsdBuffer) writeHistogram(namespace string, globalTags []string, name string, value float64, tags []string, rate float64) error {
func (b *statsdBuffer) writeHistogram(namespace, name, containerID string, globalTags []string, value float64, tags []string, rate float64) error {
if b.elementCount >= b.maxElements {
return errBufferFull
}
originalBuffer := b.buffer
b.buffer = appendHistogram(b.buffer, namespace, globalTags, name, value, tags, rate)
b.buffer = appendHistogram(b.buffer, namespace, name, containerID, globalTags, value, tags, rate)
b.writeSeparator()
return b.validateNewElement(originalBuffer)
}

// writeAggregated serialized as many values as possible in the current buffer and return the position in values where it stopped.
func (b *statsdBuffer) writeAggregated(metricSymbol []byte, namespace string, globalTags []string, name string, values []float64, tags string, tagSize int, precision int) (int, error) {
func (b *statsdBuffer) writeAggregated(metricSymbol []byte, namespace, name, containerID string, globalTags []string, values []float64, tags string, tagSize int, precision int) (int, error) {
if b.elementCount >= b.maxElements {
return 0, errBufferFull
}
Expand Down Expand Up @@ -107,6 +107,7 @@ func (b *statsdBuffer) writeAggregated(metricSymbol []byte, namespace string, gl
b.buffer = append(b.buffer, '|')
b.buffer = append(b.buffer, metricSymbol...)
b.buffer = appendTagsAggregated(b.buffer, globalTags, tags)
b.buffer = appendContainerID(b.buffer, containerID)
b.writeSeparator()
b.elementCount++

Expand All @@ -117,52 +118,52 @@ func (b *statsdBuffer) writeAggregated(metricSymbol []byte, namespace string, gl

}

func (b *statsdBuffer) writeDistribution(namespace string, globalTags []string, name string, value float64, tags []string, rate float64) error {
func (b *statsdBuffer) writeDistribution(namespace, name, containerID string, globalTags []string, value float64, tags []string, rate float64) error {
if b.elementCount >= b.maxElements {
return errBufferFull
}
originalBuffer := b.buffer
b.buffer = appendDistribution(b.buffer, namespace, globalTags, name, value, tags, rate)
b.buffer = appendDistribution(b.buffer, namespace, name, containerID, globalTags, value, tags, rate)
b.writeSeparator()
return b.validateNewElement(originalBuffer)
}

func (b *statsdBuffer) writeSet(namespace string, globalTags []string, name string, value string, tags []string, rate float64) error {
func (b *statsdBuffer) writeSet(namespace, name, containerID string, globalTags []string, value string, tags []string, rate float64) error {
if b.elementCount >= b.maxElements {
return errBufferFull
}
originalBuffer := b.buffer
b.buffer = appendSet(b.buffer, namespace, globalTags, name, value, tags, rate)
b.buffer = appendSet(b.buffer, namespace, name, containerID, globalTags, value, tags, rate)
b.writeSeparator()
return b.validateNewElement(originalBuffer)
}

func (b *statsdBuffer) writeTiming(namespace string, globalTags []string, name string, value float64, tags []string, rate float64) error {
func (b *statsdBuffer) writeTiming(namespace, name, containerID string, globalTags []string, value float64, tags []string, rate float64) error {
if b.elementCount >= b.maxElements {
return errBufferFull
}
originalBuffer := b.buffer
b.buffer = appendTiming(b.buffer, namespace, globalTags, name, value, tags, rate)
b.buffer = appendTiming(b.buffer, namespace, name, containerID, globalTags, value, tags, rate)
b.writeSeparator()
return b.validateNewElement(originalBuffer)
}

func (b *statsdBuffer) writeEvent(event *Event, globalTags []string) error {
func (b *statsdBuffer) writeEvent(event *Event, globalTags []string, containerID string) error {
if b.elementCount >= b.maxElements {
return errBufferFull
}
originalBuffer := b.buffer
b.buffer = appendEvent(b.buffer, event, globalTags)
b.buffer = appendEvent(b.buffer, event, globalTags, containerID)
b.writeSeparator()
return b.validateNewElement(originalBuffer)
}

func (b *statsdBuffer) writeServiceCheck(serviceCheck *ServiceCheck, globalTags []string) error {
func (b *statsdBuffer) writeServiceCheck(serviceCheck *ServiceCheck, globalTags []string, containerID string) error {
if b.elementCount >= b.maxElements {
return errBufferFull
}
originalBuffer := b.buffer
b.buffer = appendServiceCheck(b.buffer, serviceCheck, globalTags)
b.buffer = appendServiceCheck(b.buffer, serviceCheck, globalTags, containerID)
b.writeSeparator()
return b.validateNewElement(originalBuffer)
}
Expand Down
2 changes: 1 addition & 1 deletion statsd/buffer_pool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ func TestBufferPoolEmpty(t *testing.T) {
func TestBufferReturn(t *testing.T) {
bufferPool := newBufferPool(1, 1024, 20)
buffer := bufferPool.borrowBuffer()
buffer.writeCount("", nil, "", 1, nil, 1)
buffer.writeCount("", "", "", nil, 1, nil, 1)

assert.Equal(t, 0, len(bufferPool.pool))
bufferPool.returnBuffer(buffer)
Expand Down
115 changes: 85 additions & 30 deletions statsd/buffer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,163 +8,218 @@ import (

func TestBufferGauge(t *testing.T) {
buffer := newStatsdBuffer(1024, 1)
err := buffer.writeGauge("namespace.", []string{"tag:tag"}, "metric", 1, []string{}, 1)
err := buffer.writeGauge("namespace.", "metric", "", []string{"tag:tag"}, 1, []string{}, 1)
assert.Nil(t, err)
assert.Equal(t, "namespace.metric:1|g|#tag:tag\n", string(buffer.bytes()))

// with a container ID field
buffer = newStatsdBuffer(1024, 1)
err = buffer.writeGauge("namespace.", "metric", "container-id", []string{"tag:tag"}, 1, []string{}, 1)
assert.Nil(t, err)
assert.Equal(t, "namespace.metric:1|g|#tag:tag|c:container-id\n", string(buffer.bytes()))
}

func TestBufferCount(t *testing.T) {
buffer := newStatsdBuffer(1024, 1)
err := buffer.writeCount("namespace.", []string{"tag:tag"}, "metric", 1, []string{}, 1)
err := buffer.writeCount("namespace.", "metric", "", []string{"tag:tag"}, 1, []string{}, 1)
assert.Nil(t, err)
assert.Equal(t, "namespace.metric:1|c|#tag:tag\n", string(buffer.bytes()))

// with a container ID field
buffer = newStatsdBuffer(1024, 1)
err = buffer.writeCount("namespace.", "metric", "container-id", []string{"tag:tag"}, 1, []string{}, 1)
assert.Nil(t, err)
assert.Equal(t, "namespace.metric:1|c|#tag:tag|c:container-id\n", string(buffer.bytes()))
}

func TestBufferHistogram(t *testing.T) {
buffer := newStatsdBuffer(1024, 1)
err := buffer.writeHistogram("namespace.", []string{"tag:tag"}, "metric", 1, []string{}, 1)
err := buffer.writeHistogram("namespace.", "metric", "", []string{"tag:tag"}, 1, []string{}, 1)
assert.Nil(t, err)
assert.Equal(t, "namespace.metric:1|h|#tag:tag\n", string(buffer.bytes()))

// with a container ID field
buffer = newStatsdBuffer(1024, 1)
err = buffer.writeHistogram("namespace.", "metric", "container-id", []string{"tag:tag"}, 1, []string{}, 1)
assert.Nil(t, err)
assert.Equal(t, "namespace.metric:1|h|#tag:tag|c:container-id\n", string(buffer.bytes()))
}

func TestBufferDistribution(t *testing.T) {
buffer := newStatsdBuffer(1024, 1)
err := buffer.writeDistribution("namespace.", []string{"tag:tag"}, "metric", 1, []string{}, 1)
err := buffer.writeDistribution("namespace.", "metric", "", []string{"tag:tag"}, 1, []string{}, 1)
assert.Nil(t, err)
assert.Equal(t, "namespace.metric:1|d|#tag:tag\n", string(buffer.bytes()))

// with a container ID field
buffer = newStatsdBuffer(1024, 1)
err = buffer.writeDistribution("namespace.", "metric", "container-id", []string{"tag:tag"}, 1, []string{}, 1)
assert.Nil(t, err)
assert.Equal(t, "namespace.metric:1|d|#tag:tag|c:container-id\n", string(buffer.bytes()))
}
func TestBufferSet(t *testing.T) {
buffer := newStatsdBuffer(1024, 1)
err := buffer.writeSet("namespace.", []string{"tag:tag"}, "metric", "value", []string{}, 1)
err := buffer.writeSet("namespace.", "metric", "", []string{"tag:tag"}, "value", []string{}, 1)
assert.Nil(t, err)
assert.Equal(t, "namespace.metric:value|s|#tag:tag\n", string(buffer.bytes()))

// with a container ID field
buffer = newStatsdBuffer(1024, 1)
err = buffer.writeSet("namespace.", "metric", "container-id", []string{"tag:tag"}, "value", []string{}, 1)
assert.Nil(t, err)
assert.Equal(t, "namespace.metric:value|s|#tag:tag|c:container-id\n", string(buffer.bytes()))
}

func TestBufferTiming(t *testing.T) {
buffer := newStatsdBuffer(1024, 1)
err := buffer.writeTiming("namespace.", []string{"tag:tag"}, "metric", 1, []string{}, 1)
err := buffer.writeTiming("namespace.", "metric", "", []string{"tag:tag"}, 1, []string{}, 1)
assert.Nil(t, err)
assert.Equal(t, "namespace.metric:1.000000|ms|#tag:tag\n", string(buffer.bytes()))

// with a container ID field
buffer = newStatsdBuffer(1024, 1)
err = buffer.writeTiming("namespace.", "metric", "container-id", []string{"tag:tag"}, 1, []string{}, 1)
assert.Nil(t, err)
assert.Equal(t, "namespace.metric:1.000000|ms|#tag:tag|c:container-id\n", string(buffer.bytes()))
}

func TestBufferEvent(t *testing.T) {
buffer := newStatsdBuffer(1024, 1)
err := buffer.writeEvent(&Event{Title: "title", Text: "text"}, []string{"tag:tag"})
err := buffer.writeEvent(&Event{Title: "title", Text: "text"}, []string{"tag:tag"}, "")
assert.Nil(t, err)
assert.Equal(t, "_e{5,4}:title|text|#tag:tag\n", string(buffer.bytes()))

// with a container ID field
buffer = newStatsdBuffer(1024, 1)
err = buffer.writeEvent(&Event{Title: "title", Text: "text"}, []string{"tag:tag"}, "container-id")
assert.Nil(t, err)
assert.Equal(t, "_e{5,4}:title|text|#tag:tag|c:container-id\n", string(buffer.bytes()))
}

func TestBufferServiceCheck(t *testing.T) {
buffer := newStatsdBuffer(1024, 1)
err := buffer.writeServiceCheck(&ServiceCheck{Name: "name", Status: Ok}, []string{"tag:tag"})
err := buffer.writeServiceCheck(&ServiceCheck{Name: "name", Status: Ok}, []string{"tag:tag"}, "")
assert.Nil(t, err)
assert.Equal(t, "_sc|name|0|#tag:tag\n", string(buffer.bytes()))

// with a container ID field
buffer = newStatsdBuffer(1024, 1)
err = buffer.writeServiceCheck(&ServiceCheck{Name: "name", Status: Ok}, []string{"tag:tag"}, "container-id")
assert.Nil(t, err)
assert.Equal(t, "_sc|name|0|#tag:tag|c:container-id\n", string(buffer.bytes()))
}

func TestBufferFullSize(t *testing.T) {
buffer := newStatsdBuffer(30, 10)
err := buffer.writeGauge("namespace.", []string{"tag:tag"}, "metric", 1, []string{}, 1)
err := buffer.writeGauge("namespace.", "metric", "", []string{"tag:tag"}, 1, []string{}, 1)
assert.Nil(t, err)
assert.Len(t, buffer.bytes(), 30)
err = buffer.writeGauge("namespace.", []string{"tag:tag"}, "metric", 1, []string{}, 1)
err = buffer.writeGauge("namespace.", "metric", "", []string{"tag:tag"}, 1, []string{}, 1)
assert.Equal(t, errBufferFull, err)
}

func TestBufferSeparator(t *testing.T) {
buffer := newStatsdBuffer(1024, 10)
err := buffer.writeGauge("namespace.", []string{"tag:tag"}, "metric", 1, []string{}, 1)
err := buffer.writeGauge("namespace.", "metric", "", []string{"tag:tag"}, 1, []string{}, 1)
assert.Nil(t, err)
err = buffer.writeGauge("namespace.", []string{"tag:tag"}, "metric", 1, []string{}, 1)
err = buffer.writeGauge("namespace.", "metric", "", []string{"tag:tag"}, 1, []string{}, 1)
assert.Nil(t, err)
assert.Equal(t, "namespace.metric:1|g|#tag:tag\nnamespace.metric:1|g|#tag:tag\n", string(buffer.bytes()))
}

func TestBufferAggregated(t *testing.T) {
buffer := newStatsdBuffer(1024, 1)
pos, err := buffer.writeAggregated([]byte("h"), "namespace.", []string{"tag:tag"}, "metric", []float64{1}, "", 12, -1)
pos, err := buffer.writeAggregated([]byte("h"), "namespace.", "metric", "", []string{"tag:tag"}, []float64{1}, "", 12, -1)
assert.Nil(t, err)
assert.Equal(t, 1, pos)
assert.Equal(t, "namespace.metric:1|h|#tag:tag\n", string(buffer.bytes()))

buffer = newStatsdBuffer(1024, 1)
pos, err = buffer.writeAggregated([]byte("h"), "namespace.", []string{"tag:tag"}, "metric", []float64{1, 2, 3, 4}, "", 12, -1)
pos, err = buffer.writeAggregated([]byte("h"), "namespace.", "metric", "", []string{"tag:tag"}, []float64{1, 2, 3, 4}, "", 12, -1)
assert.Nil(t, err)
assert.Equal(t, 4, pos)
assert.Equal(t, "namespace.metric:1:2:3:4|h|#tag:tag\n", string(buffer.bytes()))

// max element already used
buffer = newStatsdBuffer(1024, 1)
buffer.elementCount = 1
pos, err = buffer.writeAggregated([]byte("h"), "namespace.", []string{"tag:tag"}, "metric", []float64{1, 2, 3, 4}, "", 12, -1)
pos, err = buffer.writeAggregated([]byte("h"), "namespace.", "metric", "", []string{"tag:tag"}, []float64{1, 2, 3, 4}, "", 12, -1)
assert.Equal(t, errBufferFull, err)

// not enought size to start serializing (tags and header too big)
buffer = newStatsdBuffer(4, 1)
pos, err = buffer.writeAggregated([]byte("h"), "namespace.", []string{"tag:tag"}, "metric", []float64{1, 2, 3, 4}, "", 12, -1)
pos, err = buffer.writeAggregated([]byte("h"), "namespace.", "metric", "", []string{"tag:tag"}, []float64{1, 2, 3, 4}, "", 12, -1)
assert.Equal(t, errBufferFull, err)

// not enought size to serializing one message
buffer = newStatsdBuffer(29, 1)
pos, err = buffer.writeAggregated([]byte("h"), "namespace.", []string{"tag:tag"}, "metric", []float64{1, 2, 3, 4}, "", 12, -1)
pos, err = buffer.writeAggregated([]byte("h"), "namespace.", "metric", "", []string{"tag:tag"}, []float64{1, 2, 3, 4}, "", 12, -1)
assert.Equal(t, errBufferFull, err)

// space for only 1 number
buffer = newStatsdBuffer(30, 1)
pos, err = buffer.writeAggregated([]byte("h"), "namespace.", []string{"tag:tag"}, "metric", []float64{1, 2, 3, 4}, "", 12, -1)
pos, err = buffer.writeAggregated([]byte("h"), "namespace.", "metric", "", []string{"tag:tag"}, []float64{1, 2, 3, 4}, "", 12, -1)
assert.Equal(t, errPartialWrite, err)
assert.Equal(t, 1, pos)
assert.Equal(t, "namespace.metric:1|h|#tag:tag\n", string(buffer.bytes()))

// first value too big
buffer = newStatsdBuffer(30, 1)
pos, err = buffer.writeAggregated([]byte("h"), "namespace.", []string{"tag:tag"}, "metric", []float64{12, 2, 3, 4}, "", 12, -1)
pos, err = buffer.writeAggregated([]byte("h"), "namespace.", "metric", "", []string{"tag:tag"}, []float64{12, 2, 3, 4}, "", 12, -1)
assert.Equal(t, errBufferFull, err)
assert.Equal(t, 0, pos)
assert.Equal(t, "", string(buffer.bytes())) // checking that the buffer was reset

// not enough space left
buffer = newStatsdBuffer(40, 1)
buffer.buffer = append(buffer.buffer, []byte("abcdefghij")...)
pos, err = buffer.writeAggregated([]byte("h"), "namespace.", []string{"tag:tag"}, "metric", []float64{12, 2, 3, 4}, "", 12, -1)
pos, err = buffer.writeAggregated([]byte("h"), "namespace.", "metric", "", []string{"tag:tag"}, []float64{12, 2, 3, 4}, "", 12, -1)
assert.Equal(t, errBufferFull, err)
assert.Equal(t, 0, pos)
assert.Equal(t, "abcdefghij", string(buffer.bytes())) // checking that the buffer was reset

// space for only 2 number
buffer = newStatsdBuffer(32, 1)
pos, err = buffer.writeAggregated([]byte("h"), "namespace.", []string{"tag:tag"}, "metric", []float64{1, 2, 3, 4}, "", 12, -1)
pos, err = buffer.writeAggregated([]byte("h"), "namespace.", "metric", "", []string{"tag:tag"}, []float64{1, 2, 3, 4}, "", 12, -1)
assert.Equal(t, errPartialWrite, err)
assert.Equal(t, 2, pos)
assert.Equal(t, "namespace.metric:1:2|h|#tag:tag\n", string(buffer.bytes()))

// with a container ID field
buffer = newStatsdBuffer(1024, 1)
pos, err = buffer.writeAggregated([]byte("h"), "namespace.", "metric", "container-id", []string{"tag:tag"}, []float64{1}, "", 12, -1)
assert.Nil(t, err)
assert.Equal(t, 1, pos)
assert.Equal(t, "namespace.metric:1|h|#tag:tag|c:container-id\n", string(buffer.bytes()))
}

func TestBufferMaxElement(t *testing.T) {
buffer := newStatsdBuffer(1024, 1)

err := buffer.writeGauge("namespace.", []string{"tag:tag"}, "metric", 1, []string{}, 1)
err := buffer.writeGauge("namespace.", "metric", "", []string{"tag:tag"}, 1, []string{}, 1)
assert.Nil(t, err)

err = buffer.writeGauge("namespace.", []string{"tag:tag"}, "metric", 1, []string{}, 1)
err = buffer.writeGauge("namespace.", "metric", "", []string{"tag:tag"}, 1, []string{}, 1)
assert.Equal(t, errBufferFull, err)

err = buffer.writeCount("namespace.", []string{"tag:tag"}, "metric", 1, []string{}, 1)
err = buffer.writeCount("namespace.", "metric", "", []string{"tag:tag"}, 1, []string{}, 1)
assert.Equal(t, errBufferFull, err)

err = buffer.writeHistogram("namespace.", []string{"tag:tag"}, "metric", 1, []string{}, 1)
err = buffer.writeHistogram("namespace.", "metric", "", []string{"tag:tag"}, 1, []string{}, 1)
assert.Equal(t, errBufferFull, err)

err = buffer.writeDistribution("namespace.", []string{"tag:tag"}, "metric", 1, []string{}, 1)
err = buffer.writeDistribution("namespace.", "metric", "", []string{"tag:tag"}, 1, []string{}, 1)
assert.Equal(t, errBufferFull, err)

err = buffer.writeSet("namespace.", []string{"tag:tag"}, "metric", "value", []string{}, 1)
err = buffer.writeSet("namespace.", "metric", "", []string{"tag:tag"}, "value", []string{}, 1)
assert.Equal(t, errBufferFull, err)

err = buffer.writeTiming("namespace.", []string{"tag:tag"}, "metric", 1, []string{}, 1)
err = buffer.writeTiming("namespace.", "metric", "", []string{"tag:tag"}, 1, []string{}, 1)
assert.Equal(t, errBufferFull, err)

err = buffer.writeEvent(&Event{Title: "title", Text: "text"}, []string{"tag:tag"})
err = buffer.writeEvent(&Event{Title: "title", Text: "text"}, []string{"tag:tag"}, "")
assert.Equal(t, errBufferFull, err)

err = buffer.writeServiceCheck(&ServiceCheck{Name: "name", Status: Ok}, []string{"tag:tag"})
err = buffer.writeServiceCheck(&ServiceCheck{Name: "name", Status: Ok}, []string{"tag:tag"}, "")
assert.Equal(t, errBufferFull, err)
}
Loading