Skip to content

Commit

Permalink
close write side corrently if server fast reject
Browse files Browse the repository at this point in the history
  • Loading branch information
cfz committed Nov 9, 2019
1 parent 7c20132 commit 47993f5
Show file tree
Hide file tree
Showing 4 changed files with 69 additions and 16 deletions.
20 changes: 19 additions & 1 deletion internal/internal.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"runtime"
"strconv"
"strings"
"sync"
"syscall"
"time"
"unicode"
Expand Down Expand Up @@ -50,6 +51,11 @@ type Number struct {
Value float64
}

type ReadWaitCloser struct {
pipeReader *io.PipeReader
wg sync.WaitGroup
}

// SetVersion sets the telegraf agent version
func SetVersion(v string) error {
if version != "" {
Expand Down Expand Up @@ -281,21 +287,33 @@ func ExitStatus(err error) (int, bool) {
return 0, false
}

func (r *ReadWaitCloser) Close() error {
err := r.pipeReader.Close()
r.wg.Wait() // wait for the gzip goroutine finish
return err
}

// CompressWithGzip takes an io.Reader as input and pipes
// it through a gzip.Writer returning an io.Reader containing
// the gzipped data.
// An error is returned if passing data to the gzip.Writer fails
func CompressWithGzip(data io.Reader) (io.Reader, error) {
func CompressWithGzip(data io.Reader) (io.ReadCloser, error) {
pipeReader, pipeWriter := io.Pipe()
gzipWriter := gzip.NewWriter(pipeWriter)

rc := &ReadWaitCloser{
pipeReader: pipeReader,
}

rc.wg.Add(1)
var err error
go func() {
_, err = io.Copy(gzipWriter, data)
gzipWriter.Close()
// subsequent reads from the read half of the pipe will
// return no bytes and the error err, or EOF if err is nil.
pipeWriter.CloseWithError(err)
rc.wg.Done()
}()

return pipeReader, err
Expand Down
4 changes: 3 additions & 1 deletion plugins/outputs/http/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -176,10 +176,12 @@ func (h *HTTP) write(reqBody []byte) error {

var err error
if h.ContentEncoding == "gzip" {
reqBodyBuffer, err = internal.CompressWithGzip(reqBodyBuffer)
rc, err := internal.CompressWithGzip(reqBodyBuffer)
if err != nil {
return err
}
defer rc.Close()
reqBodyBuffer = rc
}

req, err := http.NewRequest(h.Method, h.URL, reqBodyBuffer)
Expand Down
31 changes: 24 additions & 7 deletions plugins/outputs/influxdb/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"encoding/json"
"fmt"
"io"
"io/ioutil"
"net"
"net/http"
"net/url"
Expand Down Expand Up @@ -288,7 +289,12 @@ func (c *httpClient) writeBatch(ctx context.Context, db string, metrics []telegr
return err
}

reader := influx.NewReader(metrics, c.config.Serializer)
reader, err := c.requestBodyReader(metrics)
if err != nil {
return err
}
defer reader.Close()

req, err := c.makeWriteRequest(url, reader)
if err != nil {
return err
Expand Down Expand Up @@ -386,12 +392,6 @@ func (c *httpClient) makeQueryRequest(query string) (*http.Request, error) {

func (c *httpClient) makeWriteRequest(url string, body io.Reader) (*http.Request, error) {
var err error
if c.config.ContentEncoding == "gzip" {
body, err = internal.CompressWithGzip(body)
if err != nil {
return nil, err
}
}

req, err := http.NewRequest("POST", url, body)
if err != nil {
Expand All @@ -408,6 +408,23 @@ func (c *httpClient) makeWriteRequest(url string, body io.Reader) (*http.Request
return req, nil
}

// requestBodyReader warp io.Reader from influx.NewReader to io.ReadCloser, which is usefully to fast close the write
// side of the connection in case of error
func (c *httpClient) requestBodyReader(metrics []telegraf.Metric) (io.ReadCloser, error) {
reader := influx.NewReader(metrics, c.config.Serializer)

if c.config.ContentEncoding == "gzip" {
rc, err := internal.CompressWithGzip(reader)
if err != nil {
return nil, err
}

return rc, nil
}

return ioutil.NopCloser(reader), nil
}

func (c *httpClient) addHeaders(req *http.Request) {
if c.config.Username != "" || c.config.Password != "" {
req.SetBasicAuth(c.config.Username, c.config.Password)
Expand Down
30 changes: 23 additions & 7 deletions plugins/outputs/influxdb_v2/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"errors"
"fmt"
"io"
"io/ioutil"
"log"
"net"
"net/http"
Expand Down Expand Up @@ -214,7 +215,11 @@ func (c *httpClient) writeBatch(ctx context.Context, bucket string, metrics []te
return err
}

reader := influx.NewReader(metrics, c.serializer)
reader, err := c.requestBodyReader(metrics)
if err != nil {
return err
}
defer reader.Close()
req, err := c.makeWriteRequest(url, reader)
if err != nil {
return err
Expand Down Expand Up @@ -282,12 +287,6 @@ func (c *httpClient) writeBatch(ctx context.Context, bucket string, metrics []te

func (c *httpClient) makeWriteRequest(url string, body io.Reader) (*http.Request, error) {
var err error
if c.ContentEncoding == "gzip" {
body, err = internal.CompressWithGzip(body)
if err != nil {
return nil, err
}
}

req, err := http.NewRequest("POST", url, body)
if err != nil {
Expand All @@ -304,6 +303,23 @@ func (c *httpClient) makeWriteRequest(url string, body io.Reader) (*http.Request
return req, nil
}

// requestBodyReader warp io.Reader from influx.NewReader to io.ReadCloser, which is usefully to fast close the write
// side of the connection in case of error
func (c *httpClient) requestBodyReader(metrics []telegraf.Metric) (io.ReadCloser, error) {
reader := influx.NewReader(metrics, c.serializer)

if c.ContentEncoding == "gzip" {
rc, err := internal.CompressWithGzip(reader)
if err != nil {
return nil, err
}

return rc, nil
}

return ioutil.NopCloser(reader), nil
}

func (c *httpClient) addHeaders(req *http.Request) {
for header, value := range c.Headers {
req.Header.Set(header, value)
Expand Down

0 comments on commit 47993f5

Please sign in to comment.