From 275fcb17d7b19e0f5dce15a42ab2cef00877bac7 Mon Sep 17 00:00:00 2001 From: Sebastien cante Date: Mon, 14 Sep 2020 17:40:44 +1000 Subject: [PATCH 1/5] initial draft for uds support --- datadog/client.go | 96 ++++++++++++++++++++++++++++-------------- datadog/client_test.go | 62 +++++++++++++++++++++++++-- datadog/server_test.go | 70 +++++++++++++++++++++++++++++- 3 files changed, 191 insertions(+), 37 deletions(-) diff --git a/datadog/client.go b/datadog/client.go index 3ac7afc..8bf0753 100644 --- a/datadog/client.go +++ b/datadog/client.go @@ -4,8 +4,9 @@ import ( "bytes" "io" "log" - "net" + "net/url" "os" + "strings" "syscall" "time" @@ -14,7 +15,7 @@ import ( const ( // DefaultAddress is the default address to which the datadog client tries - // to connect to. + // to connect to. By default is connects to UDP DefaultAddress = "localhost:8125" // DefaultBufferSize is the default size for batches of metrics sent to @@ -37,6 +38,8 @@ var ( // The ClientConfig type is used to configure datadog clients. type ClientConfig struct { // Address of the datadog database to send metrics to. + // UDP: host:port + // UDS: unixgram://dir/file.ext Address string // Maximum size of batch of events sent to datadog. @@ -89,15 +92,24 @@ func NewClientWith(config ClientConfig) *Client { }, } - conn, bufferSize, err := dial(config.Address, config.BufferSize) + w, err := newWriter(config.Address) if err != nil { - log.Printf("stats/datadog: %s", err) + log.Printf("stats/datadog: unable to create writer %s", err) + c.err = err + w = &noopWriter{} } - c.conn, c.err, c.bufferSize = conn, err, bufferSize - c.buffer.BufferSize = bufferSize + newBufSize, err := w.CalcBufferSize(config.BufferSize) + if err != nil { + log.Printf("stats/datadog: unable to calc buffer size from connn. Defaulting to a buffer of size %d - %v\n", DefaultBufferSize, err) + newBufSize = DefaultBufferSize + } + c.bufferSize = newBufSize + c.buffer.BufferSize = newBufSize + + c.serializer.w = w c.buffer.Serializer = &c.serializer - log.Printf("stats/datadog: sending metrics with a buffer of size %d B", bufferSize) + log.Printf("stats/datadog: sending metrics with a buffer of size %d B", c.serializer.bufferSize) return c } @@ -124,7 +136,7 @@ func (c *Client) Close() error { } type serializer struct { - conn net.Conn + w io.WriteCloser bufferSize int filters map[string]struct{} } @@ -137,12 +149,9 @@ func (s *serializer) AppendMeasures(b []byte, _ time.Time, measures ...stats.Mea } func (s *serializer) Write(b []byte) (int, error) { - if s.conn == nil { - return 0, io.ErrClosedPipe - } if len(b) <= s.bufferSize { - return s.conn.Write(b) + return s.w.Write(b) } // When the serialized metrics are larger than the configured socket buffer @@ -167,8 +176,7 @@ func (s *serializer) Write(b []byte) (int, error) { } splitIndex += i + 1 } - - c, err := s.conn.Write(b[:splitIndex]) + c, err := s.w.Write(b[:splitIndex]) if err != nil { return n + c, err } @@ -181,33 +189,18 @@ func (s *serializer) Write(b []byte) (int, error) { } func (s *serializer) close() { - if s.conn != nil { - s.conn.Close() - } + s.w.Close() } -func dial(address string, sizehint int) (conn net.Conn, bufsize int, err error) { - var f *os.File - - if conn, err = net.Dial("udp", address); err != nil { - return - } - - if f, err = conn.(*net.UDPConn).File(); err != nil { - conn.Close() - return - } - defer f.Close() +func bufSizeFromFD(f *os.File, sizehint int) (bufsize int, err error) { fd := int(f.Fd()) - // The kernel refuses to send UDP datagrams that are larger than the size of // the size of the socket send buffer. To maximize the number of metrics // sent in one batch we attempt to attempt to adjust the kernel buffer size // to accept larger datagrams, or fallback to the default socket buffer size // if it failed. if bufsize, err = syscall.GetsockoptInt(fd, syscall.SOL_SOCKET, syscall.SO_SNDBUF); err != nil { - conn.Close() - return + return 0, err } // The kernel applies a 2x factor on the socket buffer size, only half of it @@ -244,3 +237,42 @@ func dial(address string, sizehint int) (conn net.Conn, bufsize int, err error) syscall.SetNonblock(fd, true) return } + +type ddWriter interface { + io.WriteCloser + CalcBufferSize(desiredBufSize int) (int, error) +} + +func newWriter(addr string) (ddWriter, error) { + if strings.HasPrefix(addr, "unixgram://") || + strings.HasPrefix(addr, "udp://") { + u, err := url.Parse(addr) + if err != nil { + return nil, err + } + switch u.Scheme { + case "unixgram": + return newUDSWriter(u.Path) + case "udp": + return newUDPWriter(u.Path) + } + } + // default assume addr host:port to use UDP + return newUDPWriter(addr) +} + +// noopWriter is a writer that does not do anything +type noopWriter struct{} + +// Write writes nothing +func (w *noopWriter) Write(data []byte) (int, error) { + return 0, nil +} + +func (w *noopWriter) Close() error { + return nil +} + +func (w *noopWriter) CalcBufferSize(sizehint int) (int, error) { + return sizehint, nil +} diff --git a/datadog/client_test.go b/datadog/client_test.go index 3238140..29363c0 100644 --- a/datadog/client_test.go +++ b/datadog/client_test.go @@ -13,7 +13,7 @@ import ( "github.com/segmentio/stats/v4" ) -func TestClient(t *testing.T) { +func TestClient_UDP(t *testing.T) { client := NewClient(DefaultAddress) for i := 0; i != 1000; i++ { @@ -35,7 +35,29 @@ func TestClient(t *testing.T) { } } -func TestClientWriteLargeMetrics(t *testing.T) { +func TestClient_UDS(t *testing.T) { + client := NewClient("unixgram://do-not-exist") + + for i := 0; i != 1000; i++ { + client.HandleMeasures(time.Time{}, stats.Measure{ + Name: "request", + Fields: []stats.Field{ + {Name: "count", Value: stats.ValueOf(5)}, + {Name: "rtt", Value: stats.ValueOf(100 * time.Millisecond)}, + }, + Tags: []stats.Tag{ + stats.T("answer", "42"), + stats.T("hello", "world"), + }, + }) + } + + if err := client.Close(); err != nil { + t.Error(err) + } +} + +func TestClientWriteLargeMetrics_UDP(t *testing.T) { const data = `main.http.error.count:0|c|#http_req_content_charset:,http_req_content_endoing:,http_req_content_type:,http_req_host:localhost:3011,http_req_method:GET,http_req_protocol:HTTP/1.1,http_req_transfer_encoding:identity main.http.message.count:1|c|#http_req_content_charset:,http_req_content_endoing:,http_req_content_type:,http_req_host:localhost:3011,http_req_method:GET,http_req_protocol:HTTP/1.1,http_req_transfer_encoding:identity,operation:read,type:request main.http.message.header.size:2|h|#http_req_content_charset:,http_req_content_endoing:,http_req_content_type:,http_req_host:localhost:3011,http_req_method:GET,http_req_protocol:HTTP/1.1,http_req_transfer_encoding:identity,operation:read,type:request @@ -51,7 +73,7 @@ main.http.rtt.seconds:0.001215296|h|#http_req_content_charset:,http_req_content_ count := int32(0) expect := int32(strings.Count(data, "\n")) - addr, closer := startTestServer(t, HandlerFunc(func(m Metric, _ net.Addr) { + addr, closer := startUDPTestServer(t, HandlerFunc(func(m Metric, _ net.Addr) { atomic.AddInt32(&count, 1) })) defer closer.Close() @@ -69,6 +91,40 @@ main.http.rtt.seconds:0.001215296|h|#http_req_content_charset:,http_req_content_ } } +func TestClientWriteLargeMetrics_UDS(t *testing.T) { + const data = `main.http.error.count:0|c|#http_req_content_charset:,http_req_content_endoing:,http_req_content_type:,http_req_host:localhost:3011,http_req_method:GET,http_req_protocol:HTTP/1.1,http_req_transfer_encoding:identity +main.http.message.count:1|c|#http_req_content_charset:,http_req_content_endoing:,http_req_content_type:,http_req_host:localhost:3011,http_req_method:GET,http_req_protocol:HTTP/1.1,http_req_transfer_encoding:identity,operation:read,type:request +main.http.message.header.size:2|h|#http_req_content_charset:,http_req_content_endoing:,http_req_content_type:,http_req_host:localhost:3011,http_req_method:GET,http_req_protocol:HTTP/1.1,http_req_transfer_encoding:identity,operation:read,type:request +main.http.message.header.bytes:240|h|#http_req_content_charset:,http_req_content_endoing:,http_req_content_type:,http_req_host:localhost:3011,http_req_method:GET,http_req_protocol:HTTP/1.1,http_req_transfer_encoding:identity,operation:read,type:request +main.http.message.body.bytes:0|h|#http_req_content_charset:,http_req_content_endoing:,http_req_content_type:,http_req_host:localhost:3011,http_req_method:GET,http_req_protocol:HTTP/1.1,http_req_transfer_encoding:identity,operation:read,type:request +main.http.message.count:1|c|#http_req_content_charset:,http_req_content_endoing:,http_req_content_type:,http_req_host:localhost:3011,http_req_method:GET,http_req_protocol:HTTP/1.1,http_req_transfer_encoding:identity,http_res_content_charset:,http_res_content_endoing:,http_res_content_type:application/json,http_res_protocol:HTTP/1.1,http_res_server:,http_res_transfer_encoding:identity,http_res_upgrade:,http_status:200,http_status_bucket:2xx,operation:write,type:response +main.http.message.header.size:1|h|#http_req_content_charset:,http_req_content_endoing:,http_req_content_type:,http_req_host:localhost:3011,http_req_method:GET,http_req_protocol:HTTP/1.1,http_req_transfer_encoding:identity,http_res_content_charset:,http_res_content_endoing:,http_res_content_type:application/json,http_res_protocol:HTTP/1.1,http_res_server:,http_res_transfer_encoding:identity,http_res_upgrade:,http_status:200,http_status_bucket:2xx,operation:write,type:response +main.http.message.header.bytes:70|h|#http_req_content_charset:,http_req_content_endoing:,http_req_content_type:,http_req_host:localhost:3011,http_req_method:GET,http_req_protocol:HTTP/1.1,http_req_transfer_encoding:identity,http_res_content_charset:,http_res_content_endoing:,http_res_content_type:application/json,http_res_protocol:HTTP/1.1,http_res_server:,http_res_transfer_encoding:identity,http_res_upgrade:,http_status:200,http_status_bucket:2xx,operation:write,type:response +main.http.message.body.bytes:839|h|#http_req_content_charset:,http_req_content_endoing:,http_req_content_type:,http_req_host:localhost:3011,http_req_method:GET,http_req_protocol:HTTP/1.1,http_req_transfer_encoding:identity,http_res_content_charset:,http_res_content_endoing:,http_res_content_type:application/json,http_res_protocol:HTTP/1.1,http_res_server:,http_res_transfer_encoding:identity,http_res_upgrade:,http_status:200,http_status_bucket:2xx,operation:write,type:response +main.http.rtt.seconds:0.001215296|h|#http_req_content_charset:,http_req_content_endoing:,http_req_content_type:,http_req_host:localhost:3011,http_req_method:GET,http_req_protocol:HTTP/1.1,http_req_transfer_encoding:identity,http_res_content_charset:,http_res_content_endoing:,http_res_content_type:application/json,http_res_protocol:HTTP/1.1,http_res_server:,http_res_transfer_encoding:identity,http_res_upgrade:,http_status:200,http_status_bucket:2xx,operation:write,type:response +` + + count := int32(0) + expect := int32(strings.Count(data, "\n")) + + addr, closer := startUDSTestServer(t, HandlerFunc(func(m Metric, _ net.Addr) { + atomic.AddInt32(&count, 1) + })) + defer closer.Close() + + client := NewClient("unixgram://" + addr) + + if _, err := client.Write([]byte(data)); err != nil { + t.Error(err) + } + + time.Sleep(100 * time.Millisecond) + + if n := atomic.LoadInt32(&count); n != expect { + t.Error("bad metric count:", n) + } +} + func BenchmarkClient(b *testing.B) { log.SetOutput(ioutil.Discard) diff --git a/datadog/server_test.go b/datadog/server_test.go index 0030a6b..11d58df 100644 --- a/datadog/server_test.go +++ b/datadog/server_test.go @@ -2,7 +2,10 @@ package datadog import ( "io" + "io/ioutil" "net" + "os" + "path/filepath" "sync/atomic" "testing" "time" @@ -17,7 +20,7 @@ func TestServer(t *testing.T) { b := uint32(0) c := uint32(0) - addr, closer := startTestServer(t, HandlerFunc(func(m Metric, _ net.Addr) { + addr, closer := startUDPTestServer(t, HandlerFunc(func(m Metric, _ net.Addr) { switch m.Name { case "datadog.test.A": atomic.AddUint32(&a, uint32(m.Value)) @@ -68,7 +71,7 @@ func TestServer(t *testing.T) { } } -func startTestServer(t *testing.T, handler Handler) (addr string, closer io.Closer) { +func startUDPTestServer(t *testing.T, handler Handler) (addr string, closer io.Closer) { conn, err := net.ListenPacket("udp", "127.0.0.1:0") if err != nil { @@ -80,3 +83,66 @@ func startTestServer(t *testing.T, handler Handler) (addr string, closer io.Clos return conn.LocalAddr().String(), conn } + +// startUDSTestServerWithSocketFile starts a UDS server with a given socket file +func startUDSTestServerWithSocketFile(t *testing.T, socketPath string, handler Handler) (closer io.Closer) { + udsAddr, err := net.ResolveUnixAddr("unixgram", socketPath) + if err != nil { + t.Error(err) + t.FailNow() + } + + conn, err := net.ListenUnixgram("unixgram", udsAddr) + if err != nil { + t.Error(err) + t.FailNow() + } + + go Serve(conn, handler) + + return &testUnixgramServer{ + UnixConn: conn, + pathToDelete: socketPath, + } +} + +// startUDSTestServer starts a Unix domain socket server with a random tmp file for the socket file +func startUDSTestServer(t *testing.T, handler Handler) (socketPath string, closer io.Closer) { + dir, err := ioutil.TempDir("", "socket") + if err != nil { + t.Error(err) + t.FailNow() + } + + socketPath = filepath.Join(dir, "dsd.socket") + + udsAddr, err := net.ResolveUnixAddr("unixgram", socketPath) + if err != nil { + t.Error(err) + t.FailNow() + } + + conn, err := net.ListenUnixgram("unixgram", udsAddr) + if err != nil { + t.Error(err) + t.FailNow() + } + + ts := testUnixgramServer{ + UnixConn: conn, + pathToDelete: dir, // so we delete any tmp dir we created + } + + go Serve(conn, handler) + return socketPath, &ts +} + +type testUnixgramServer struct { + *net.UnixConn + pathToDelete string +} + +func (ts *testUnixgramServer) Close() error { + os.RemoveAll(ts.pathToDelete) // clean up + return ts.UnixConn.Close() +} From 54c9f6ed35c9e57d10dde149de55188a2eb924c3 Mon Sep 17 00:00:00 2001 From: Sebastien cante Date: Mon, 14 Sep 2020 17:40:51 +1000 Subject: [PATCH 2/5] initial draft for uds support --- datadog/udp.go | 42 +++++++++++++++++ datadog/uds.go | 108 ++++++++++++++++++++++++++++++++++++++++++++ datadog/uds_test.go | 55 ++++++++++++++++++++++ 3 files changed, 205 insertions(+) create mode 100644 datadog/udp.go create mode 100644 datadog/uds.go create mode 100644 datadog/uds_test.go diff --git a/datadog/udp.go b/datadog/udp.go new file mode 100644 index 0000000..33d88aa --- /dev/null +++ b/datadog/udp.go @@ -0,0 +1,42 @@ +package datadog + +import "net" + +// udsWriter is an internal class wrapping around management of UDS connection +type udpWriter struct { + conn net.Conn +} + +// newUDSWriter returns a pointer to a new udpWriter given a socket file path as addr. +func newUDPWriter(addr string) (*udpWriter, error) { + udpAddr, err := net.ResolveUDPAddr("udp", addr) + if err != nil { + return nil, err + } + conn, err := net.DialUDP("udp", nil, udpAddr) + if err != nil { + return nil, err + } + return &udpWriter{conn: conn}, nil + +} + +// Write data to the UDS connection with write timeout and minimal error handling: +// create the connection if nil, and destroy it if the statsd server has disconnected +func (w *udpWriter) Write(data []byte) (int, error) { + return w.conn.Write(data) +} + +func (w *udpWriter) Close() error { + return w.conn.Close() +} + +func (w *udpWriter) CalcBufferSize(sizehint int) (int, error) { + f, err := w.conn.(*net.UDPConn).File() + if err != nil { + return 0, err + } + defer f.Close() + + return bufSizeFromFD(f, sizehint) +} diff --git a/datadog/uds.go b/datadog/uds.go new file mode 100644 index 0000000..29b71b1 --- /dev/null +++ b/datadog/uds.go @@ -0,0 +1,108 @@ +package datadog + +import ( + "net" + "sync" + "time" +) + +// UDSTimeout holds the default timeout for UDS socket writes, as they can get +// blocking when the receiving buffer is full. +// same value as in official datadog client: https://github.com/DataDog/datadog-go/blob/master/statsd/uds.go#L13 +const defaultUDSTimeout = 1 * time.Millisecond + +// udsWriter is an internal class wrapping around management of UDS connection +// credits to Datadog team: https://github.com/DataDog/datadog-go/blob/master/statsd/uds.go +type udsWriter struct { + // Address to send metrics to, needed to allow reconnection on error + addr net.Addr + + // Established connection object, or nil if not connected yet + conn net.Conn + connMu sync.RWMutex // so that we can replace the failing conn on error + + // write timeout + writeTimeout time.Duration +} + +// newUDSWriter returns a pointer to a new udsWriter given a socket file path as addr. +func newUDSWriter(addr string) (*udsWriter, error) { + udsAddr, err := net.ResolveUnixAddr("unixgram", addr) + if err != nil { + return nil, err + } + // Defer connection to first Write + writer := &udsWriter{addr: udsAddr, conn: nil, writeTimeout: defaultUDSTimeout} + return writer, nil +} + +// Write data to the UDS connection with write timeout and minimal error handling: +// create the connection if nil, and destroy it if the statsd server has disconnected +func (w *udsWriter) Write(data []byte) (int, error) { + conn, err := w.ensureConnection() + if err != nil { + return 0, err + } + + conn.SetWriteDeadline(time.Now().Add(w.writeTimeout)) + n, e := conn.Write(data) + if err, isNetworkErr := e.(net.Error); err != nil && (!isNetworkErr || !err.Temporary()) { + // Statsd server disconnected, retry connecting at next packet + w.unsetConnection() + return 0, e + } + return n, e +} + +func (w *udsWriter) Close() error { + if w.conn != nil { + return w.conn.Close() + } + return nil +} + +func (w *udsWriter) CalcBufferSize(sizehint int) (int, error) { + conn, err := w.ensureConnection() + if err != nil { + return 0, err + } + f, err := conn.(*net.UnixConn).File() + if err != nil { + w.unsetConnection() + return 0, err + } + defer f.Close() + + return bufSizeFromFD(f, sizehint) +} + +func (w *udsWriter) ensureConnection() (net.Conn, error) { + // Check if we've already got a socket we can use + w.connMu.RLock() + currentConn := w.conn + w.connMu.RUnlock() + + if currentConn != nil { + return currentConn, nil + } + + // Looks like we might need to connect - try again with write locking. + w.connMu.Lock() + defer w.connMu.Unlock() + if w.conn != nil { + return w.conn, nil + } + + newConn, err := net.Dial(w.addr.Network(), w.addr.String()) + if err != nil { + return nil, err + } + w.conn = newConn + return newConn, nil +} + +func (w *udsWriter) unsetConnection() { + w.connMu.Lock() + defer w.connMu.Unlock() + w.conn = nil +} diff --git a/datadog/uds_test.go b/datadog/uds_test.go new file mode 100644 index 0000000..1030edb --- /dev/null +++ b/datadog/uds_test.go @@ -0,0 +1,55 @@ +package datadog + +import ( + "io/ioutil" + "net" + "path/filepath" + "sync/atomic" + "testing" +) + +func TestUDSReconnectWhenConnRefused(t *testing.T) { + dir, err := ioutil.TempDir("", "socket") + if err != nil { + t.Error(err) + t.FailNow() + } + socketPath := filepath.Join(dir, "dsd.socket") + count := int32(0) + closerServer1 := startUDSTestServerWithSocketFile(t, socketPath, HandlerFunc(func(m Metric, _ net.Addr) { + atomic.AddInt32(&count, 1) + })) + defer closerServer1.Close() + + client := NewClientWith(ClientConfig{ + Address: "unixgram://" + socketPath, + BufferSize: 1, // small buffer to force write to unix socket for each measure + }) + + measure := `main.http.error.count:0|c|#http_req_content_charset:,http_req_content_endoing:,http_req_content_type:,http_req_host:localhost:3011,http_req_method:GET,http_req_protocol:HTTP/1.1,http_req_transfer_encoding:identity +` + + _, err = client.Write([]byte(measure)) + if err != nil { + t.Errorf("unable to write data %v", err) + } + + closerServer1.Close() + + _, err = client.Write([]byte(measure)) + if err == nil { + t.Errorf("invalid error expected none, got %v", err) + } + // restart UDS server with same socket file + closerServer2 := startUDSTestServerWithSocketFile(t, socketPath, HandlerFunc(func(m Metric, _ net.Addr) { + atomic.AddInt32(&count, 1) + })) + + defer closerServer2.Close() + + _, err = client.Write([]byte(measure)) + if err != nil { + t.Errorf("unable to write data but should be able to as the client should reconnect %v", err) + } + +} From 91731bf3e8e9c034066a67f018b1719737f165c3 Mon Sep 17 00:00:00 2001 From: Sebastien cante Date: Mon, 14 Sep 2020 17:50:03 +1000 Subject: [PATCH 3/5] review pr --- datadog/client.go | 8 ++++---- datadog/server_test.go | 2 +- datadog/udp.go | 6 ++---- datadog/uds_test.go | 2 +- 4 files changed, 8 insertions(+), 10 deletions(-) diff --git a/datadog/client.go b/datadog/client.go index 8bf0753..a007092 100644 --- a/datadog/client.go +++ b/datadog/client.go @@ -15,7 +15,7 @@ import ( const ( // DefaultAddress is the default address to which the datadog client tries - // to connect to. By default is connects to UDP + // to connect to. By default it connects to UDP DefaultAddress = "localhost:8125" // DefaultBufferSize is the default size for batches of metrics sent to @@ -101,7 +101,7 @@ func NewClientWith(config ClientConfig) *Client { newBufSize, err := w.CalcBufferSize(config.BufferSize) if err != nil { - log.Printf("stats/datadog: unable to calc buffer size from connn. Defaulting to a buffer of size %d - %v\n", DefaultBufferSize, err) + log.Printf("stats/datadog: unable to calc writer's buffer size. Defaulting to a buffer of size %d - %v\n", DefaultBufferSize, err) newBufSize = DefaultBufferSize } c.bufferSize = newBufSize @@ -109,7 +109,7 @@ func NewClientWith(config ClientConfig) *Client { c.serializer.w = w c.buffer.Serializer = &c.serializer - log.Printf("stats/datadog: sending metrics with a buffer of size %d B", c.serializer.bufferSize) + log.Printf("stats/datadog: sending metrics with a buffer of size %d B", newBufSize) return c } @@ -200,7 +200,7 @@ func bufSizeFromFD(f *os.File, sizehint int) (bufsize int, err error) { // to accept larger datagrams, or fallback to the default socket buffer size // if it failed. if bufsize, err = syscall.GetsockoptInt(fd, syscall.SOL_SOCKET, syscall.SO_SNDBUF); err != nil { - return 0, err + return } // The kernel applies a 2x factor on the socket buffer size, only half of it diff --git a/datadog/server_test.go b/datadog/server_test.go index 11d58df..0f494bf 100644 --- a/datadog/server_test.go +++ b/datadog/server_test.go @@ -106,7 +106,7 @@ func startUDSTestServerWithSocketFile(t *testing.T, socketPath string, handler H } } -// startUDSTestServer starts a Unix domain socket server with a random tmp file for the socket file +// startUDSTestServer starts a Unix domain socket server with a random socket file func startUDSTestServer(t *testing.T, handler Handler) (socketPath string, closer io.Closer) { dir, err := ioutil.TempDir("", "socket") if err != nil { diff --git a/datadog/udp.go b/datadog/udp.go index 33d88aa..ff2d4cc 100644 --- a/datadog/udp.go +++ b/datadog/udp.go @@ -2,12 +2,11 @@ package datadog import "net" -// udsWriter is an internal class wrapping around management of UDS connection type udpWriter struct { conn net.Conn } -// newUDSWriter returns a pointer to a new udpWriter given a socket file path as addr. +// newUDPWriter returns a pointer to a new newUDPWriter given a socket file path as addr. func newUDPWriter(addr string) (*udpWriter, error) { udpAddr, err := net.ResolveUDPAddr("udp", addr) if err != nil { @@ -21,8 +20,7 @@ func newUDPWriter(addr string) (*udpWriter, error) { } -// Write data to the UDS connection with write timeout and minimal error handling: -// create the connection if nil, and destroy it if the statsd server has disconnected +// Write data to the UDP connection func (w *udpWriter) Write(data []byte) (int, error) { return w.conn.Write(data) } diff --git a/datadog/uds_test.go b/datadog/uds_test.go index 1030edb..4adf3cf 100644 --- a/datadog/uds_test.go +++ b/datadog/uds_test.go @@ -8,7 +8,7 @@ import ( "testing" ) -func TestUDSReconnectWhenConnRefused(t *testing.T) { +func TestUDSReconnectsWhenConnRefused(t *testing.T) { dir, err := ioutil.TempDir("", "socket") if err != nil { t.Error(err) From 8a1ca58d26abf25f655ac513ff62a9494e8f0a10 Mon Sep 17 00:00:00 2001 From: Sebastien cante Date: Mon, 14 Sep 2020 18:07:06 +1000 Subject: [PATCH 4/5] review pr --- datadog/client.go | 2 +- datadog/server_test.go | 2 +- datadog/uds_test.go | 16 +++++----------- 3 files changed, 7 insertions(+), 13 deletions(-) diff --git a/datadog/client.go b/datadog/client.go index a007092..f8608a8 100644 --- a/datadog/client.go +++ b/datadog/client.go @@ -261,7 +261,7 @@ func newWriter(addr string) (ddWriter, error) { return newUDPWriter(addr) } -// noopWriter is a writer that does not do anything +// noopWriter is a writer that does nothing type noopWriter struct{} // Write writes nothing diff --git a/datadog/server_test.go b/datadog/server_test.go index 0f494bf..6133c3b 100644 --- a/datadog/server_test.go +++ b/datadog/server_test.go @@ -106,7 +106,7 @@ func startUDSTestServerWithSocketFile(t *testing.T, socketPath string, handler H } } -// startUDSTestServer starts a Unix domain socket server with a random socket file +// startUDSTestServer starts a UDS server with server with a random socket file internally generated func startUDSTestServer(t *testing.T, handler Handler) (socketPath string, closer io.Closer) { dir, err := ioutil.TempDir("", "socket") if err != nil { diff --git a/datadog/uds_test.go b/datadog/uds_test.go index 4adf3cf..44709c6 100644 --- a/datadog/uds_test.go +++ b/datadog/uds_test.go @@ -4,7 +4,6 @@ import ( "io/ioutil" "net" "path/filepath" - "sync/atomic" "testing" ) @@ -15,15 +14,13 @@ func TestUDSReconnectsWhenConnRefused(t *testing.T) { t.FailNow() } socketPath := filepath.Join(dir, "dsd.socket") - count := int32(0) - closerServer1 := startUDSTestServerWithSocketFile(t, socketPath, HandlerFunc(func(m Metric, _ net.Addr) { - atomic.AddInt32(&count, 1) - })) + + closerServer1 := startUDSTestServerWithSocketFile(t, socketPath, HandlerFunc(func(m Metric, _ net.Addr) {})) defer closerServer1.Close() client := NewClientWith(ClientConfig{ Address: "unixgram://" + socketPath, - BufferSize: 1, // small buffer to force write to unix socket for each measure + BufferSize: 1, // small buffer to force write to unix socket for each measure written }) measure := `main.http.error.count:0|c|#http_req_content_charset:,http_req_content_endoing:,http_req_content_type:,http_req_host:localhost:3011,http_req_method:GET,http_req_protocol:HTTP/1.1,http_req_transfer_encoding:identity @@ -38,13 +35,10 @@ func TestUDSReconnectsWhenConnRefused(t *testing.T) { _, err = client.Write([]byte(measure)) if err == nil { - t.Errorf("invalid error expected none, got %v", err) + t.Errorf("got no error but expected one as the connection should be refused as we closed the server") } // restart UDS server with same socket file - closerServer2 := startUDSTestServerWithSocketFile(t, socketPath, HandlerFunc(func(m Metric, _ net.Addr) { - atomic.AddInt32(&count, 1) - })) - + closerServer2 := startUDSTestServerWithSocketFile(t, socketPath, HandlerFunc(func(m Metric, _ net.Addr) {})) defer closerServer2.Close() _, err = client.Write([]byte(measure)) From 969fe44d97dbf97bf959bbdcf05fd305f6cfa033 Mon Sep 17 00:00:00 2001 From: Sebastien cante Date: Mon, 14 Sep 2020 20:08:03 +1000 Subject: [PATCH 5/5] more docs --- datadog/client.go | 6 ++++-- datadog/server_test.go | 3 ++- datadog/uds.go | 2 +- 3 files changed, 7 insertions(+), 4 deletions(-) diff --git a/datadog/client.go b/datadog/client.go index f8608a8..df64a2d 100644 --- a/datadog/client.go +++ b/datadog/client.go @@ -15,7 +15,7 @@ import ( const ( // DefaultAddress is the default address to which the datadog client tries - // to connect to. By default it connects to UDP + // to connect to. DefaultAddress = "localhost:8125" // DefaultBufferSize is the default size for batches of metrics sent to @@ -38,7 +38,7 @@ var ( // The ClientConfig type is used to configure datadog clients. type ClientConfig struct { // Address of the datadog database to send metrics to. - // UDP: host:port + // UDP: host:port (default) // UDS: unixgram://dir/file.ext Address string @@ -269,10 +269,12 @@ func (w *noopWriter) Write(data []byte) (int, error) { return 0, nil } +// Close is a noop close func (w *noopWriter) Close() error { return nil } +// CalcBufferSize returns the sizehint func (w *noopWriter) CalcBufferSize(sizehint int) (int, error) { return sizehint, nil } diff --git a/datadog/server_test.go b/datadog/server_test.go index 6133c3b..27b483b 100644 --- a/datadog/server_test.go +++ b/datadog/server_test.go @@ -106,8 +106,9 @@ func startUDSTestServerWithSocketFile(t *testing.T, socketPath string, handler H } } -// startUDSTestServer starts a UDS server with server with a random socket file internally generated +// startUDSTestServer starts a UDS server with a random socket file internally generated func startUDSTestServer(t *testing.T, handler Handler) (socketPath string, closer io.Closer) { + // generate a random dir dir, err := ioutil.TempDir("", "socket") if err != nil { t.Error(err) diff --git a/datadog/uds.go b/datadog/uds.go index 29b71b1..a2b7e08 100644 --- a/datadog/uds.go +++ b/datadog/uds.go @@ -31,7 +31,7 @@ func newUDSWriter(addr string) (*udsWriter, error) { if err != nil { return nil, err } - // Defer connection to first Write + // Defer connection to first read/write writer := &udsWriter{addr: udsAddr, conn: nil, writeTimeout: defaultUDSTimeout} return writer, nil }