-
Notifications
You must be signed in to change notification settings - Fork 32
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add Datadog UDS support #123
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -4,8 +4,9 @@ import ( | |
"bytes" | ||
"io" | ||
"log" | ||
"net" | ||
"net/url" | ||
"os" | ||
"strings" | ||
"syscall" | ||
"time" | ||
|
||
|
@@ -37,6 +38,8 @@ var ( | |
// The ClientConfig type is used to configure datadog clients. | ||
type ClientConfig struct { | ||
// Address of the datadog database to send metrics to. | ||
// UDP: host:port (default) | ||
// UDS: unixgram://dir/file.ext | ||
Address string | ||
|
||
// Maximum size of batch of events sent to datadog. | ||
|
@@ -89,15 +92,24 @@ func NewClientWith(config ClientConfig) *Client { | |
}, | ||
} | ||
|
||
conn, bufferSize, err := dial(config.Address, config.BufferSize) | ||
w, err := newWriter(config.Address) | ||
if err != nil { | ||
log.Printf("stats/datadog: %s", err) | ||
log.Printf("stats/datadog: unable to create writer %s", err) | ||
c.err = err | ||
w = &noopWriter{} | ||
} | ||
|
||
c.conn, c.err, c.bufferSize = conn, err, bufferSize | ||
c.buffer.BufferSize = bufferSize | ||
newBufSize, err := w.CalcBufferSize(config.BufferSize) | ||
if err != nil { | ||
log.Printf("stats/datadog: unable to calc writer's buffer size. Defaulting to a buffer of size %d - %v\n", DefaultBufferSize, err) | ||
newBufSize = DefaultBufferSize | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Not sure if we prefer seting newBufSize to 0 (existing behaviour)? i am open for suggestion |
||
} | ||
c.bufferSize = newBufSize | ||
c.buffer.BufferSize = newBufSize | ||
|
||
c.serializer.w = w | ||
c.buffer.Serializer = &c.serializer | ||
log.Printf("stats/datadog: sending metrics with a buffer of size %d B", bufferSize) | ||
log.Printf("stats/datadog: sending metrics with a buffer of size %d B", newBufSize) | ||
return c | ||
} | ||
|
||
|
@@ -124,7 +136,7 @@ func (c *Client) Close() error { | |
} | ||
|
||
type serializer struct { | ||
conn net.Conn | ||
w io.WriteCloser | ||
bufferSize int | ||
filters map[string]struct{} | ||
} | ||
|
@@ -137,12 +149,9 @@ func (s *serializer) AppendMeasures(b []byte, _ time.Time, measures ...stats.Mea | |
} | ||
|
||
func (s *serializer) Write(b []byte) (int, error) { | ||
if s.conn == nil { | ||
return 0, io.ErrClosedPipe | ||
} | ||
|
||
if len(b) <= s.bufferSize { | ||
return s.conn.Write(b) | ||
return s.w.Write(b) | ||
} | ||
|
||
// When the serialized metrics are larger than the configured socket buffer | ||
|
@@ -167,8 +176,7 @@ func (s *serializer) Write(b []byte) (int, error) { | |
} | ||
splitIndex += i + 1 | ||
} | ||
|
||
c, err := s.conn.Write(b[:splitIndex]) | ||
c, err := s.w.Write(b[:splitIndex]) | ||
if err != nil { | ||
return n + c, err | ||
} | ||
|
@@ -181,32 +189,17 @@ func (s *serializer) Write(b []byte) (int, error) { | |
} | ||
|
||
func (s *serializer) close() { | ||
if s.conn != nil { | ||
s.conn.Close() | ||
} | ||
s.w.Close() | ||
} | ||
|
||
func dial(address string, sizehint int) (conn net.Conn, bufsize int, err error) { | ||
var f *os.File | ||
|
||
if conn, err = net.Dial("udp", address); err != nil { | ||
return | ||
} | ||
|
||
if f, err = conn.(*net.UDPConn).File(); err != nil { | ||
conn.Close() | ||
return | ||
} | ||
defer f.Close() | ||
func bufSizeFromFD(f *os.File, sizehint int) (bufsize int, err error) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. i would need an expert eye on this, i used the same existing logic against UDP for Unix socket too. I think that is right but i am not 100% sure. if not i can make this only for UDP? |
||
fd := int(f.Fd()) | ||
|
||
// The kernel refuses to send UDP datagrams that are larger than the size of | ||
// the size of the socket send buffer. To maximize the number of metrics | ||
// sent in one batch we attempt to attempt to adjust the kernel buffer size | ||
// to accept larger datagrams, or fallback to the default socket buffer size | ||
// if it failed. | ||
if bufsize, err = syscall.GetsockoptInt(fd, syscall.SOL_SOCKET, syscall.SO_SNDBUF); err != nil { | ||
conn.Close() | ||
return | ||
} | ||
|
||
|
@@ -244,3 +237,44 @@ func dial(address string, sizehint int) (conn net.Conn, bufsize int, err error) | |
syscall.SetNonblock(fd, true) | ||
return | ||
} | ||
|
||
type ddWriter interface { | ||
io.WriteCloser | ||
CalcBufferSize(desiredBufSize int) (int, error) | ||
} | ||
|
||
func newWriter(addr string) (ddWriter, error) { | ||
if strings.HasPrefix(addr, "unixgram://") || | ||
strings.HasPrefix(addr, "udp://") { | ||
u, err := url.Parse(addr) | ||
if err != nil { | ||
return nil, err | ||
} | ||
switch u.Scheme { | ||
case "unixgram": | ||
return newUDSWriter(u.Path) | ||
case "udp": | ||
return newUDPWriter(u.Path) | ||
} | ||
} | ||
// default assume addr host:port to use UDP | ||
return newUDPWriter(addr) | ||
} | ||
|
||
// noopWriter is a writer that does nothing | ||
type noopWriter struct{} | ||
|
||
// Write writes nothing | ||
func (w *noopWriter) Write(data []byte) (int, error) { | ||
return 0, nil | ||
} | ||
|
||
// Close is a noop close | ||
func (w *noopWriter) Close() error { | ||
return nil | ||
} | ||
|
||
// CalcBufferSize returns the sizehint | ||
func (w *noopWriter) CalcBufferSize(sizehint int) (int, error) { | ||
return sizehint, nil | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,40 @@ | ||
package datadog | ||
|
||
import "net" | ||
|
||
type udpWriter struct { | ||
conn net.Conn | ||
} | ||
|
||
// newUDPWriter returns a pointer to a new newUDPWriter given a socket file path as addr. | ||
func newUDPWriter(addr string) (*udpWriter, error) { | ||
udpAddr, err := net.ResolveUDPAddr("udp", addr) | ||
if err != nil { | ||
return nil, err | ||
} | ||
conn, err := net.DialUDP("udp", nil, udpAddr) | ||
if err != nil { | ||
return nil, err | ||
} | ||
return &udpWriter{conn: conn}, nil | ||
|
||
} | ||
|
||
// Write data to the UDP connection | ||
func (w *udpWriter) Write(data []byte) (int, error) { | ||
return w.conn.Write(data) | ||
} | ||
|
||
func (w *udpWriter) Close() error { | ||
return w.conn.Close() | ||
} | ||
|
||
func (w *udpWriter) CalcBufferSize(sizehint int) (int, error) { | ||
f, err := w.conn.(*net.UDPConn).File() | ||
if err != nil { | ||
return 0, err | ||
} | ||
defer f.Close() | ||
|
||
return bufSizeFromFD(f, sizehint) | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ideally i would return an error to the caller of NewClientWith if we can not create a writer, but decided to not change the API