Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

statsd: set client_transport properly #290

Merged
merged 6 commits into from
Dec 5, 2023
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions statsd/end_to_end_uds_test.go
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
//go:build !windows
// +build !windows

package statsd
Expand All @@ -9,6 +10,7 @@ import (
"testing"
)

// TODO: implement the same test for uds-stream
func TestFullPipelineUDS(t *testing.T) {
for testName, c := range getTestMap() {
socketPath := fmt.Sprintf("/tmp/dsd_%d.socket", rand.Int())
Expand Down
4 changes: 2 additions & 2 deletions statsd/pipe.go
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
//go:build !windows
// +build !windows

package statsd

import (
"errors"
"io"
"time"
)

func newWindowsPipeWriter(pipepath string, writeTimeout time.Duration) (io.WriteCloser, error) {
func newWindowsPipeWriter(pipepath string, writeTimeout time.Duration) (Transport, error) {
return nil, errors.New("Windows Named Pipes are only supported on Windows")
}
6 changes: 6 additions & 0 deletions statsd/pipe_windows.go
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
//go:build windows
// +build windows

package statsd
Expand Down Expand Up @@ -65,6 +66,11 @@ func (p *pipeWriter) Close() error {
return p.conn.Close()
}

// GetTransportName returns the name of the transport. A pipeWriter always
// reports the writerWindowsPipe constant.
func (p *pipeWriter) GetTransportName() string {
return writerWindowsPipe
}

func newWindowsPipeWriter(pipepath string, writeTimeout time.Duration) (*pipeWriter, error) {
// Defer connection establishment to first write
return &pipeWriter{
Expand Down
15 changes: 13 additions & 2 deletions statsd/sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,15 @@ type senderTelemetry struct {
totalBytesDroppedWriter uint64
}

// Transport is the destination a sender writes its buffers to: an
// io.WriteCloser that can additionally report the name of the transport it
// uses.
type Transport interface {
io.WriteCloser

// GetTransportName returns the name of the transport
GetTransportName() string
}

type sender struct {
transport io.WriteCloser
transport Transport
pool *bufferPool
queue chan *statsdBuffer
telemetry *senderTelemetry
Expand All @@ -35,7 +42,7 @@ func (e *ErrorSenderChannelFull) Error() string {
return e.Msg
}

func newSender(transport io.WriteCloser, queueSize int, pool *bufferPool, errorHandler ErrorHandler) *sender {
func newSender(transport Transport, queueSize int, pool *bufferPool, errorHandler ErrorHandler) *sender {
sender := &sender{
transport: transport,
pool: pool,
Expand Down Expand Up @@ -132,3 +139,7 @@ func (s *sender) close() error {
s.flushInputQueue()
return s.transport.Close()
}

// getTransportName returns the name reported by the sender's underlying
// transport.
func (s *sender) getTransportName() string {
return s.transport.GetTransportName()
}
4 changes: 4 additions & 0 deletions statsd/sender_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,10 @@ func (w *mockedWriter) Close() error {
return args.Error(0)
}

// GetTransportName returns a fixed transport name for the mocked writer.
func (w *mockedWriter) GetTransportName() string {
return "mock"
}

func TestSender(t *testing.T) {
writer := new(mockedWriter)
writer.On("Write", mock.Anything).Return(1, nil)
Expand Down
47 changes: 34 additions & 13 deletions statsd/statsd.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,9 +139,12 @@ const (
)

const (
writerNameUDP string = "udp"
writerNameUDS string = "uds"
writerWindowsPipe string = "pipe"
writerNameUDP string = "udp"
writerNameUDS string = "uds"
writerNameUDSDatagram string = "uds-datagram"
writerNameUDSStream string = "uds-stream"
writerWindowsPipe string = "pipe"
writerNameCustom string = "custom"
)

// noTimestamp is used as a value for metric without a given timestamp.
Expand Down Expand Up @@ -363,7 +366,7 @@ func parseAgentURL(agentURL string) string {
return ""
}

func createWriter(addr string, writeTimeout time.Duration) (io.WriteCloser, string, error) {
func createWriter(addr string, writeTimeout time.Duration) (Transport, string, error) {
addr = resolveAddr(addr)
if addr == "" {
return nil, "", errors.New("No address passed and autodetection from environment failed")
Expand All @@ -378,10 +381,10 @@ func createWriter(addr string, writeTimeout time.Duration) (io.WriteCloser, stri
return w, writerNameUDS, err
case strings.HasPrefix(addr, UnixAddressDatagramPrefix):
w, err := newUDSWriter(addr[len(UnixAddressDatagramPrefix):], writeTimeout, "unixgram")
return w, writerNameUDS, err
return w, writerNameUDSDatagram, err
iksaif marked this conversation as resolved.
Show resolved Hide resolved
case strings.HasPrefix(addr, UnixAddressStreamPrefix):
w, err := newUDSWriter(addr[len(UnixAddressStreamPrefix):], writeTimeout, "unix")
return w, writerNameUDS, err
return w, writerNameUDSStream, err
iksaif marked this conversation as resolved.
Show resolved Hide resolved
default:
w, err := newUDPWriter(addr, writeTimeout)
return w, writerNameUDP, err
Expand Down Expand Up @@ -409,14 +412,22 @@ func New(addr string, options ...Option) (*Client, error) {
return client, err
}

// customWriter wraps a caller-supplied io.WriteCloser; together with its
// GetTransportName method it can be passed where a Transport is expected.
type customWriter struct {
io.WriteCloser
}

// GetTransportName returns the name of the transport. A customWriter always
// reports the writerNameCustom constant.
func (w *customWriter) GetTransportName() string {
return writerNameCustom
}

// NewWithWriter creates a new Client with given writer. Writer is a
// io.WriteCloser
func NewWithWriter(w io.WriteCloser, options ...Option) (*Client, error) {
o, err := resolveOptions(options)
if err != nil {
return nil, err
}
return newWithWriter(w, o, "custom")
return newWithWriter(&customWriter{w}, o, writerNameCustom)
}

// CloneWithExtraOptions create a new Client with extra options
Expand All @@ -432,7 +443,7 @@ func CloneWithExtraOptions(c *Client, options ...Option) (*Client, error) {
return New(c.addrOption, opt...)
}

func newWithWriter(w io.WriteCloser, o *Options, writerName string) (*Client, error) {
func newWithWriter(w Transport, o *Options, writerName string) (*Client, error) {
c := Client{
namespace: o.namespace,
tags: o.tags,
Expand All @@ -456,22 +467,24 @@ func newWithWriter(w io.WriteCloser, o *Options, writerName string) (*Client, er
initContainerID(o.containerID, isOriginDetectionEnabled(o, hasEntityID))
}

isUDS := writerName == writerNameUDS || writerName == writerNameUDSDatagram || writerName == writerNameUDSStream

if o.maxBytesPerPayload == 0 {
if writerName == writerNameUDS {
if isUDS {
iksaif marked this conversation as resolved.
Show resolved Hide resolved
o.maxBytesPerPayload = DefaultMaxAgentPayloadSize
} else {
o.maxBytesPerPayload = OptimalUDPPayloadSize
}
}
if o.bufferPoolSize == 0 {
if writerName == writerNameUDS {
if isUDS {
o.bufferPoolSize = DefaultUDSBufferPoolSize
} else {
o.bufferPoolSize = DefaultUDPBufferPoolSize
}
}
if o.senderQueueSize == 0 {
if writerName == writerNameUDS {
if isUDS {
o.senderQueueSize = DefaultUDSBufferPoolSize
} else {
o.senderQueueSize = DefaultUDPBufferPoolSize
Expand Down Expand Up @@ -524,10 +537,10 @@ func newWithWriter(w io.WriteCloser, o *Options, writerName string) (*Client, er

if o.telemetry {
if o.telemetryAddr == "" {
c.telemetryClient = newTelemetryClient(&c, writerName, c.agg != nil)
c.telemetryClient = newTelemetryClient(&c, c.agg != nil)
} else {
var err error
c.telemetryClient, err = newTelemetryClientWithCustomAddr(&c, writerName, o.telemetryAddr, c.agg != nil, bufferPool, o.writeTimeout)
c.telemetryClient, err = newTelemetryClientWithCustomAddr(&c, o.telemetryAddr, c.agg != nil, bufferPool, o.writeTimeout)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -600,6 +613,14 @@ func (c *Client) GetTelemetry() Telemetry {
return c.telemetryClient.getTelemetry()
}

// GetTransport returns the name of the transport used by the client's sender.
// It returns an empty string when the client has no sender.
func (c *Client) GetTransport() string {
	if c.sender != nil {
		return c.sender.getTransportName()
	}
	return ""
}

type ErrorInputChannelFull struct {
Metric metric
ChannelSize int
Expand Down
71 changes: 54 additions & 17 deletions statsd/telemetry.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,39 +113,45 @@ type Telemetry struct {
}

type telemetryClient struct {
c *Client
tags []string
aggEnabled bool // is aggregation enabled and should we sent aggregation telemetry.
tagsByType map[metricType][]string
sender *sender
worker *worker
lastSample Telemetry // The previous sample of telemetry sent
sync.RWMutex // used mostly to change the transport tag.

c *Client
aggEnabled bool // is aggregation enabled and should we sent aggregation telemetry.
transport string
tags []string
tagsByType map[metricType][]string
transportTagKnown bool
sender *sender
worker *worker
lastSample Telemetry // The previous sample of telemetry sent
}

func newTelemetryClient(c *Client, transport string, aggregationEnabled bool) *telemetryClient {
func clientTransportTag(transport string) string {
if transport == writerNameUDS || transport == writerNameUDSDatagram {
return writerNameUDS // For backward compatibility.
}
return transport
iksaif marked this conversation as resolved.
Show resolved Hide resolved
}

func newTelemetryClient(c *Client, aggregationEnabled bool) *telemetryClient {
t := &telemetryClient{
c: c,
tags: append(c.tags, clientTelemetryTag, clientVersionTelemetryTag, "client_transport:"+transport),
aggEnabled: aggregationEnabled,
tags: []string{},
tagsByType: map[metricType][]string{},
}

t.tagsByType[gauge] = append(append([]string{}, t.tags...), "metrics_type:gauge")
t.tagsByType[count] = append(append([]string{}, t.tags...), "metrics_type:count")
t.tagsByType[set] = append(append([]string{}, t.tags...), "metrics_type:set")
t.tagsByType[timing] = append(append([]string{}, t.tags...), "metrics_type:timing")
t.tagsByType[histogram] = append(append([]string{}, t.tags...), "metrics_type:histogram")
t.tagsByType[distribution] = append(append([]string{}, t.tags...), "metrics_type:distribution")
t.setTags()
return t
}

func newTelemetryClientWithCustomAddr(c *Client, transport string, telemetryAddr string, aggregationEnabled bool, pool *bufferPool, writeTimeout time.Duration) (*telemetryClient, error) {
func newTelemetryClientWithCustomAddr(c *Client, telemetryAddr string, aggregationEnabled bool, pool *bufferPool, writeTimeout time.Duration) (*telemetryClient, error) {
telemetryWriter, _, err := createWriter(telemetryAddr, writeTimeout)
if err != nil {
return nil, fmt.Errorf("Could not resolve telemetry address: %v", err)
}

t := newTelemetryClient(c, transport, aggregationEnabled)
t := newTelemetryClient(c, aggregationEnabled)

// Creating a custom sender/worker with 1 worker in mutex mode for the
// telemetry that share the same bufferPool.
Expand Down Expand Up @@ -222,6 +228,36 @@ func (t *telemetryClient) getTelemetry() Telemetry {
return tlm
}

// setTags (re)builds the telemetry tags and the per-metric-type tag sets.
// It does nothing unless the tags were never built or the transport name
// reported by the client differs from the one used for the previous build.
func (t *telemetryClient) setTags() {
	transport := t.c.GetTransport()
	t.RLock()
	// We need to refresh if we never set the tags or if the transport changed.
	// For example when `unix://` is used we might return `uds` until we actually connect and detect that
	// this is a UDS Stream socket and then return `uds-stream`.
	needsRefresh := len(t.tags) == len(t.c.tags) || t.transport != transport
	t.RUnlock()

	if !needsRefresh {
		return
	}

	t.Lock()
	defer t.Unlock()

	t.transport = transport

	// Build the tags in a freshly allocated slice. Appending directly to
	// t.c.tags could write into spare capacity of the client's backing array,
	// so the telemetry tags and the client tags would share (and overwrite)
	// each other's storage.
	tags := make([]string, 0, len(t.c.tags)+3)
	tags = append(tags, t.c.tags...)
	tags = append(tags, clientTelemetryTag, clientVersionTelemetryTag)
	if transport != "" {
		tags = append(tags, "client_transport:"+clientTransportTag(transport))
	}
	t.tags = tags

	// Each metric type gets its own copy of the tags plus a metrics_type tag.
	t.tagsByType[gauge] = append(append([]string{}, t.tags...), "metrics_type:gauge")
	t.tagsByType[count] = append(append([]string{}, t.tags...), "metrics_type:count")
	t.tagsByType[set] = append(append([]string{}, t.tags...), "metrics_type:set")
	t.tagsByType[timing] = append(append([]string{}, t.tags...), "metrics_type:timing")
	t.tagsByType[histogram] = append(append([]string{}, t.tags...), "metrics_type:histogram")
	t.tagsByType[distribution] = append(append([]string{}, t.tags...), "metrics_type:distribution")
}

// flushTelemetry returns Telemetry metrics to be flushed. It's its own function to ease testing.
func (t *telemetryClient) flush() []metric {
m := []metric{}
Expand All @@ -232,6 +268,7 @@ func (t *telemetryClient) flush() []metric {
}

tlm := t.getTelemetry()
t.setTags()

// We send the diff between now and the previous telemetry flush. This keep the same telemetry behavior from V4
// so users dashboard's aren't broken when upgrading to V5. It also allow to graph on the same dashboard a mix
Expand Down
6 changes: 4 additions & 2 deletions statsd/telemetry_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,13 +75,15 @@ func TestTelemetryCustomAddr(t *testing.T) {
}()

ts.sendAllType(client)
client.Flush()
err = client.Flush()
require.NoError(t, err)

client.telemetryClient.sendTelemetry()

select {
case <-readDone:
case <-time.After(2 * time.Second):
require.Fail(t, "No data was flush on Close")
require.Fail(t, "No data was flushed on Close")
}

result := []string{}
Expand Down
5 changes: 5 additions & 0 deletions statsd/udp.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,3 +32,8 @@ func (w *udpWriter) Write(data []byte) (int, error) {
func (w *udpWriter) Close() error {
return w.conn.Close()
}

// GetTransportName returns the name of the transport. A udpWriter always
// reports the writerNameUDP constant.
func (w *udpWriter) GetTransportName() string {
return writerNameUDP
}
12 changes: 12 additions & 0 deletions statsd/uds.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,18 @@ func newUDSWriter(addr string, writeTimeout time.Duration, transport string) (*u
return writer, nil
}

// GetTransportName returns the transport used by the writer
func (w *udsWriter) GetTransportName() string {
if w.transport == "unixgram" {
iksaif marked this conversation as resolved.
Show resolved Hide resolved
return writerNameUDS
} else if w.transport == "unix" {
return writerNameUDSStream
} else {
// For backward compatibility.
return writerNameUDS
}
iksaif marked this conversation as resolved.
Show resolved Hide resolved
}

// retryOnWriteErr returns true if we should retry writing after a write error
func (w *udsWriter) retryOnWriteErr(err error) bool {
// Never retry when using unixgram (to preserve the historical behavior)
Expand Down
3 changes: 1 addition & 2 deletions statsd/uds_windows.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,11 @@ package statsd

import (
"fmt"
"io"
"time"
)

// newUDSWriter is disabled on Windows, SOCK_DGRAM are still unavailable but
// SOCK_STREAM should work once implemented in the agent (https://devblogs.microsoft.com/commandline/af_unix-comes-to-windows/)
func newUDSWriter(_ string, _ time.Duration, _ string) (io.WriteCloser, error) {
func newUDSWriter(_ string, _ time.Duration, _ string) (Transport, error) {
return nil, fmt.Errorf("Unix socket is not available on Windows")
}