diff --git a/CHANGELOG.md b/CHANGELOG.md index cd3b840cf9b5f..7bc5e7f8d70e1 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -30,7 +30,7 @@ continue sending logs to /var/log/telegraf/telegraf.log. - [#1542](https://github.com/influxdata/telegraf/pull/1542): Add filestack webhook plugin. - [#1599](https://github.com/influxdata/telegraf/pull/1599): Add server hostname for each docker measurements. - [#1697](https://github.com/influxdata/telegraf/pull/1697): Add NATS output plugin. -- [#1407](https://github.com/influxdata/telegraf/pull/1407): HTTP service listener input plugin. +- [#1407](https://github.com/influxdata/telegraf/pull/1407) & [#1915](https://github.com/influxdata/telegraf/pull/1915): HTTP service listener input plugin. - [#1699](https://github.com/influxdata/telegraf/pull/1699): Add database blacklist option for Postgresql - [#1791](https://github.com/influxdata/telegraf/pull/1791): Add Docker container state metrics to Docker input plugin output - [#1755](https://github.com/influxdata/telegraf/issues/1755): Add support to SNMP for IP & MAC address conversion. diff --git a/internal/buffer/buffer.go b/internal/buffer/buffer.go index b7a05bf0337af..58cd1c3764d7e 100644 --- a/internal/buffer/buffer.go +++ b/internal/buffer/buffer.go @@ -1,6 +1,8 @@ package buffer import ( + "sync" + "github.com/influxdata/telegraf" ) @@ -11,6 +13,8 @@ type Buffer struct { drops int // total metrics added total int + + mu sync.Mutex } // NewBuffer returns a Buffer @@ -61,11 +65,13 @@ func (b *Buffer) Add(metrics ...telegraf.Metric) { // the batch will be of maximum length batchSize. It can be less than batchSize, // if the length of Buffer is less than batchSize. func (b *Buffer) Batch(batchSize int) []telegraf.Metric { + b.mu.Lock() n := min(len(b.buf), batchSize) out := make([]telegraf.Metric, n) for i := 0; i < n; i++ { out[i] = <-b.buf } + b.mu.Unlock() return out } diff --git a/plugins/inputs/http_listener/bufferpool.go b/plugins/inputs/http_listener/bufferpool.go new file mode 100644 index 0000000000000..00a93652db2fb --- /dev/null +++ b/plugins/inputs/http_listener/bufferpool.go @@ -0,0 +1,43 @@ +package http_listener + +import ( + "sync/atomic" +) + +type pool struct { + buffers chan []byte + size int + + created int64 +} + +// NewPool returns a new pool object. +// n is the number of buffers +// bufSize is the size (in bytes) of each buffer +func NewPool(n, bufSize int) *pool { + return &pool{ + buffers: make(chan []byte, n), + size: bufSize, + } +} + +func (p *pool) get() []byte { + select { + case b := <-p.buffers: + return b + default: + atomic.AddInt64(&p.created, 1) + return make([]byte, p.size) + } +} + +func (p *pool) put(b []byte) { + select { + case p.buffers <- b: + default: + } +} + +func (p *pool) ncreated() int64 { + return atomic.LoadInt64(&p.created) +} diff --git a/plugins/inputs/http_listener/http_listener.go b/plugins/inputs/http_listener/http_listener.go index 2eeee8e75657d..ddc9ac7bf8cec 100644 --- a/plugins/inputs/http_listener/http_listener.go +++ b/plugins/inputs/http_listener/http_listener.go @@ -1,9 +1,9 @@ package http_listener import ( - "bufio" "bytes" - "fmt" + "compress/gzip" + "io" "log" "net" "net/http" @@ -13,135 +13,137 @@ import ( "github.com/influxdata/telegraf" "github.com/influxdata/telegraf/internal" "github.com/influxdata/telegraf/plugins/inputs" - "github.com/influxdata/telegraf/plugins/inputs/http_listener/stoppableListener" - "github.com/influxdata/telegraf/plugins/parsers" + "github.com/influxdata/telegraf/plugins/parsers/influx" ) -type HttpListener struct { +const ( + // DEFAULT_MAX_BODY_SIZE is the default maximum request body size, in bytes. + // if the request body is over this size, we will return an HTTP 413 error. + // 500 MB + DEFAULT_MAX_BODY_SIZE = 500 * 1024 * 1024 + + // MAX_LINE_SIZE is the maximum size, in bytes, that can be allocated for + // a single InfluxDB point. + // 64 KB + DEFAULT_MAX_LINE_SIZE = 64 * 1024 +) + +type HTTPListener struct { ServiceAddress string ReadTimeout internal.Duration WriteTimeout internal.Duration + MaxBodySize int64 + MaxLineSize int - sync.Mutex + mu sync.Mutex wg sync.WaitGroup - listener *stoppableListener.StoppableListener + listener net.Listener - parser parsers.Parser + parser influx.InfluxParser acc telegraf.Accumulator + pool *pool } const sampleConfig = ` ## Address and port to host HTTP listener on service_address = ":8186" - ## timeouts + ## maximum duration before timing out read of the request read_timeout = "10s" + ## maximum duration before timing out write of the response write_timeout = "10s" + + ## Maximum allowed http request body size in bytes. + ## 0 means to use the default of 536,870,912 bytes (500 mebibytes) + max_body_size = 0 + + ## Maximum line size allowed to be sent in bytes. + ## 0 means to use the default of 65536 bytes (64 kibibytes) + max_line_size = 0 ` -func (t *HttpListener) SampleConfig() string { +func (h *HTTPListener) SampleConfig() string { return sampleConfig } -func (t *HttpListener) Description() string { +func (h *HTTPListener) Description() string { return "Influx HTTP write listener" } -func (t *HttpListener) Gather(_ telegraf.Accumulator) error { +func (h *HTTPListener) Gather(_ telegraf.Accumulator) error { + log.Printf("D! The http_listener has created %d buffers", h.pool.ncreated()) return nil } -func (t *HttpListener) SetParser(parser parsers.Parser) { - t.parser = parser -} - // Start starts the http listener service. -func (t *HttpListener) Start(acc telegraf.Accumulator) error { - t.Lock() - defer t.Unlock() +func (h *HTTPListener) Start(acc telegraf.Accumulator) error { + h.mu.Lock() + defer h.mu.Unlock() - t.acc = acc - - var rawListener, err = net.Listen("tcp", t.ServiceAddress) - if err != nil { - return err + if h.MaxBodySize == 0 { + h.MaxBodySize = DEFAULT_MAX_BODY_SIZE + } + if h.MaxLineSize == 0 { + h.MaxLineSize = DEFAULT_MAX_LINE_SIZE } - t.listener, err = stoppableListener.New(rawListener) + + h.acc = acc + h.pool = NewPool(200, h.MaxLineSize) + + var listener, err = net.Listen("tcp", h.ServiceAddress) if err != nil { return err } + h.listener = listener - go t.httpListen() + h.wg.Add(1) + go func() { + defer h.wg.Done() + h.httpListen() + }() - log.Printf("I! Started HTTP listener service on %s\n", t.ServiceAddress) + log.Printf("I! Started HTTP listener service on %s\n", h.ServiceAddress) return nil } // Stop cleans up all resources -func (t *HttpListener) Stop() { - t.Lock() - defer t.Unlock() - - t.listener.Stop() - t.listener.Close() +func (h *HTTPListener) Stop() { + h.mu.Lock() + defer h.mu.Unlock() - t.wg.Wait() + h.listener.Close() + h.wg.Wait() - log.Println("I! Stopped HTTP listener service on ", t.ServiceAddress) + log.Println("I! Stopped HTTP listener service on ", h.ServiceAddress) } -// httpListen listens for HTTP requests. -func (t *HttpListener) httpListen() error { - if t.ReadTimeout.Duration < time.Second { - t.ReadTimeout.Duration = time.Second * 10 +// httpListen sets up an http.Server and calls server.Serve. +// like server.Serve, httpListen will always return a non-nil error, for this +// reason, the error returned should probably be ignored. +// see https://golang.org/pkg/net/http/#Server.Serve +func (h *HTTPListener) httpListen() error { + if h.ReadTimeout.Duration < time.Second { + h.ReadTimeout.Duration = time.Second * 10 } - if t.WriteTimeout.Duration < time.Second { - t.WriteTimeout.Duration = time.Second * 10 + if h.WriteTimeout.Duration < time.Second { + h.WriteTimeout.Duration = time.Second * 10 } var server = http.Server{ - Handler: t, - ReadTimeout: t.ReadTimeout.Duration, - WriteTimeout: t.WriteTimeout.Duration, + Handler: h, + ReadTimeout: h.ReadTimeout.Duration, + WriteTimeout: h.WriteTimeout.Duration, } - return server.Serve(t.listener) + return server.Serve(h.listener) } -func (t *HttpListener) ServeHTTP(res http.ResponseWriter, req *http.Request) { - t.wg.Add(1) - defer t.wg.Done() - +func (h *HTTPListener) ServeHTTP(res http.ResponseWriter, req *http.Request) { switch req.URL.Path { case "/write": - var http400msg bytes.Buffer - var partial string - scanner := bufio.NewScanner(req.Body) - scanner.Buffer([]byte(""), 128*1024) - for scanner.Scan() { - metrics, err := t.parser.Parse(scanner.Bytes()) - if err == nil { - for _, m := range metrics { - t.acc.AddFields(m.Name(), m.Fields(), m.Tags(), m.Time()) - } - partial = "partial write: " - } else { - http400msg.WriteString(err.Error() + " ") - } - } - - if err := scanner.Err(); err != nil { - http.Error(res, "Internal server error: "+err.Error(), http.StatusInternalServerError) - } else if http400msg.Len() > 0 { - res.Header().Set("Content-Type", "application/json") - res.Header().Set("X-Influxdb-Version", "1.0") - res.WriteHeader(http.StatusBadRequest) - res.Write([]byte(fmt.Sprintf(`{"error":"%s%s"}`, partial, http400msg.String()))) - } else { - res.WriteHeader(http.StatusNoContent) - } + h.serveWrite(res, req) case "/query": // Deliver a dummy response to the query endpoint, as some InfluxDB // clients test endpoint availability with a query @@ -158,8 +160,135 @@ func (t *HttpListener) ServeHTTP(res http.ResponseWriter, req *http.Request) { } } +func (h *HTTPListener) serveWrite(res http.ResponseWriter, req *http.Request) { + // Check that the content length is not too large for us to handle. + if req.ContentLength > h.MaxBodySize { + tooLarge(res) + return + } + now := time.Now() + + // Handle gzip request bodies + body := req.Body + var err error + if req.Header.Get("Content-Encoding") == "gzip" { + body, err = gzip.NewReader(req.Body) + defer body.Close() + if err != nil { + log.Println("E! " + err.Error()) + badRequest(res) + return + } + } + body = http.MaxBytesReader(res, body, h.MaxBodySize) + + var return400 bool + var hangingBytes bool + buf := h.pool.get() + defer h.pool.put(buf) + bufStart := 0 + for { + n, err := io.ReadFull(body, buf[bufStart:]) + if err != nil && err != io.ErrUnexpectedEOF && err != io.EOF { + log.Println("E! " + err.Error()) + // problem reading the request body + badRequest(res) + return + } + + if err == io.EOF { + if return400 { + badRequest(res) + } else { + res.WriteHeader(http.StatusNoContent) + } + return + } + + if hangingBytes { + i := bytes.IndexByte(buf, '\n') + if i == -1 { + // still didn't find a newline, keep scanning + continue + } + // rotate the bit remaining after the first newline to the front of the buffer + i++ // start copying after the newline + bufStart = len(buf) - i + if bufStart > 0 { + copy(buf, buf[i:]) + } + hangingBytes = false + continue + } + + if err == io.ErrUnexpectedEOF { + // finished reading the request body + if err := h.parse(buf[:n+bufStart], now); err != nil { + log.Println("E! " + err.Error()) + return400 = true + } + if return400 { + badRequest(res) + } else { + res.WriteHeader(http.StatusNoContent) + } + return + } + + // if we got down here it means that we filled our buffer, and there + // are still bytes remaining to be read. So we will parse up until the + // final newline, then push the rest of the bytes into the next buffer. + i := bytes.LastIndexByte(buf, '\n') + if i == -1 { + // drop any line longer than the max buffer size + log.Printf("E! http_listener received a single line longer than the maximum of %d bytes", + len(buf)) + hangingBytes = true + return400 = true + bufStart = 0 + continue + } + if err := h.parse(buf[:i], now); err != nil { + log.Println("E! " + err.Error()) + return400 = true + } + // rotate the bit remaining after the last newline to the front of the buffer + i++ // start copying after the newline + bufStart = len(buf) - i + if bufStart > 0 { + copy(buf, buf[i:]) + } + } +} + +func (h *HTTPListener) parse(b []byte, t time.Time) error { + metrics, err := h.parser.ParseWithDefaultTime(b, t) + + for _, m := range metrics { + h.acc.AddFields(m.Name(), m.Fields(), m.Tags(), m.Time()) + } + + return err +} + +func tooLarge(res http.ResponseWriter) { + res.Header().Set("Content-Type", "application/json") + res.Header().Set("X-Influxdb-Version", "1.0") + res.WriteHeader(http.StatusRequestEntityTooLarge) + res.Write([]byte(`{"error":"http: request body too large"}`)) +} + +func badRequest(res http.ResponseWriter) { + res.Header().Set("Content-Type", "application/json") + res.Header().Set("X-Influxdb-Version", "1.0") + res.WriteHeader(http.StatusBadRequest) + res.Write([]byte(`{"error":"http: bad request"}`)) +} + func init() { inputs.Add("http_listener", func() telegraf.Input { - return &HttpListener{} + return &HTTPListener{ + ServiceAddress: ":8186", + } }) } diff --git a/plugins/inputs/http_listener/http_listener_test.go b/plugins/inputs/http_listener/http_listener_test.go index 267ba56a106f6..84cf209ff5f2a 100644 --- a/plugins/inputs/http_listener/http_listener_test.go +++ b/plugins/inputs/http_listener/http_listener_test.go @@ -1,16 +1,16 @@ package http_listener import ( + "bytes" + "io/ioutil" + "net/http" "sync" "testing" "time" - "github.com/influxdata/telegraf/plugins/parsers" "github.com/influxdata/telegraf/testutil" - "bytes" "github.com/stretchr/testify/require" - "net/http" ) const ( @@ -27,17 +27,15 @@ cpu_load_short,host=server06 value=12.0 1422568543702900257 emptyMsg = "" ) -func newTestHttpListener() *HttpListener { - listener := &HttpListener{ +func newTestHTTPListener() *HTTPListener { + listener := &HTTPListener{ ServiceAddress: ":8186", } return listener } func TestWriteHTTP(t *testing.T) { - listener := newTestHttpListener() - parser, _ := parsers.NewInfluxParser() - listener.SetParser(parser) + listener := newTestHTTPListener() acc := &testutil.Accumulator{} require.NoError(t, listener.Start(acc)) @@ -71,10 +69,10 @@ func TestWriteHTTP(t *testing.T) { ) } - // Post a gigantic metric to the listener: + // Post a gigantic metric to the listener and verify that an error is returned: resp, err = http.Post("http://localhost:8186/write?db=mydb", "", bytes.NewBuffer([]byte(hugeMetric))) require.NoError(t, err) - require.EqualValues(t, 204, resp.StatusCode) + require.EqualValues(t, 400, resp.StatusCode) time.Sleep(time.Millisecond * 15) acc.AssertContainsTaggedFields(t, "cpu_load_short", @@ -83,11 +81,133 @@ func TestWriteHTTP(t *testing.T) { ) } +func TestWriteHTTPMaxLineSizeIncrease(t *testing.T) { + listener := &HTTPListener{ + ServiceAddress: ":8296", + MaxLineSize: 128 * 1000, + } + + acc := &testutil.Accumulator{} + require.NoError(t, listener.Start(acc)) + defer listener.Stop() + + time.Sleep(time.Millisecond * 25) + + // Post a gigantic metric to the listener and verify that it writes OK this time: + resp, err := http.Post("http://localhost:8296/write?db=mydb", "", bytes.NewBuffer([]byte(hugeMetric))) + require.NoError(t, err) + require.EqualValues(t, 204, resp.StatusCode) +} + +func TestWriteHTTPVerySmallMaxBody(t *testing.T) { + listener := &HTTPListener{ + ServiceAddress: ":8297", + MaxBodySize: 4096, + } + + acc := &testutil.Accumulator{} + require.NoError(t, listener.Start(acc)) + defer listener.Stop() + + time.Sleep(time.Millisecond * 25) + + resp, err := http.Post("http://localhost:8297/write", "", bytes.NewBuffer([]byte(hugeMetric))) + require.NoError(t, err) + require.EqualValues(t, 413, resp.StatusCode) +} + +func TestWriteHTTPVerySmallMaxLineSize(t *testing.T) { + listener := &HTTPListener{ + ServiceAddress: ":8298", + MaxLineSize: 70, + } + + acc := &testutil.Accumulator{} + require.NoError(t, listener.Start(acc)) + defer listener.Stop() + + time.Sleep(time.Millisecond * 25) + + resp, err := http.Post("http://localhost:8298/write", "", bytes.NewBuffer([]byte(testMsgs))) + require.NoError(t, err) + require.EqualValues(t, 204, resp.StatusCode) + + time.Sleep(time.Millisecond * 15) + hostTags := []string{"server02", "server03", + "server04", "server05", "server06"} + for _, hostTag := range hostTags { + acc.AssertContainsTaggedFields(t, "cpu_load_short", + map[string]interface{}{"value": float64(12)}, + map[string]string{"host": hostTag}, + ) + } +} + +func TestWriteHTTPLargeLinesSkipped(t *testing.T) { + listener := &HTTPListener{ + ServiceAddress: ":8300", + MaxLineSize: 100, + } + + acc := &testutil.Accumulator{} + require.NoError(t, listener.Start(acc)) + defer listener.Stop() + + time.Sleep(time.Millisecond * 25) + + resp, err := http.Post("http://localhost:8300/write", "", bytes.NewBuffer([]byte(hugeMetric+testMsgs))) + require.NoError(t, err) + require.EqualValues(t, 400, resp.StatusCode) + + time.Sleep(time.Millisecond * 15) + hostTags := []string{"server02", "server03", + "server04", "server05", "server06"} + for _, hostTag := range hostTags { + acc.AssertContainsTaggedFields(t, "cpu_load_short", + map[string]interface{}{"value": float64(12)}, + map[string]string{"host": hostTag}, + ) + } +} + +// test that writing gzipped data works +func TestWriteHTTPGzippedData(t *testing.T) { + listener := &HTTPListener{ + ServiceAddress: ":8299", + } + + acc := &testutil.Accumulator{} + require.NoError(t, listener.Start(acc)) + defer listener.Stop() + + time.Sleep(time.Millisecond * 25) + + data, err := ioutil.ReadFile("./testdata/testmsgs.gz") + require.NoError(t, err) + + req, err := http.NewRequest("POST", "http://localhost:8299/write", bytes.NewBuffer(data)) + require.NoError(t, err) + req.Header.Set("Content-Encoding", "gzip") + + client := &http.Client{} + resp, err := client.Do(req) + require.NoError(t, err) + require.EqualValues(t, 204, resp.StatusCode) + + time.Sleep(time.Millisecond * 50) + hostTags := []string{"server02", "server03", + "server04", "server05", "server06"} + for _, hostTag := range hostTags { + acc.AssertContainsTaggedFields(t, "cpu_load_short", + map[string]interface{}{"value": float64(12)}, + map[string]string{"host": hostTag}, + ) + } +} + // writes 25,000 metrics to the listener with 10 different writers func TestWriteHTTPHighTraffic(t *testing.T) { - listener := &HttpListener{ServiceAddress: ":8286"} - parser, _ := parsers.NewInfluxParser() - listener.SetParser(parser) + listener := &HTTPListener{ServiceAddress: ":8286"} acc := &testutil.Accumulator{} require.NoError(t, listener.Start(acc)) @@ -110,15 +230,14 @@ func TestWriteHTTPHighTraffic(t *testing.T) { } wg.Wait() - time.Sleep(time.Millisecond * 50) + time.Sleep(time.Millisecond * 250) listener.Gather(acc) require.Equal(t, int64(25000), int64(acc.NMetrics())) } func TestReceive404ForInvalidEndpoint(t *testing.T) { - listener := newTestHttpListener() - listener.parser, _ = parsers.NewInfluxParser() + listener := newTestHTTPListener() acc := &testutil.Accumulator{} require.NoError(t, listener.Start(acc)) @@ -135,8 +254,7 @@ func TestReceive404ForInvalidEndpoint(t *testing.T) { func TestWriteHTTPInvalid(t *testing.T) { time.Sleep(time.Millisecond * 250) - listener := newTestHttpListener() - listener.parser, _ = parsers.NewInfluxParser() + listener := newTestHTTPListener() acc := &testutil.Accumulator{} require.NoError(t, listener.Start(acc)) @@ -153,8 +271,7 @@ func TestWriteHTTPInvalid(t *testing.T) { func TestWriteHTTPEmpty(t *testing.T) { time.Sleep(time.Millisecond * 250) - listener := newTestHttpListener() - listener.parser, _ = parsers.NewInfluxParser() + listener := newTestHTTPListener() acc := &testutil.Accumulator{} require.NoError(t, listener.Start(acc)) @@ -171,8 +288,7 @@ func TestWriteHTTPEmpty(t *testing.T) { func TestQueryAndPingHTTP(t *testing.T) { time.Sleep(time.Millisecond * 250) - listener := newTestHttpListener() - listener.parser, _ = parsers.NewInfluxParser() + listener := newTestHTTPListener() acc := &testutil.Accumulator{} require.NoError(t, listener.Start(acc)) diff --git a/plugins/inputs/http_listener/stoppableListener/LICENSE b/plugins/inputs/http_listener/stoppableListener/LICENSE deleted file mode 100644 index eb07824517a4d..0000000000000 --- a/plugins/inputs/http_listener/stoppableListener/LICENSE +++ /dev/null @@ -1,10 +0,0 @@ -Copyright (c) 2014, Eric Urban -All rights reserved. - -Redistribution and use in source and binary forms, with or without modification, are permitted provided that the following conditions are met: - -1. Redistributions of source code must retain the above copyright notice, this list of conditions and the following disclaimer. - -2. Redistributions in binary form must reproduce the above copyright notice, this list of conditions and the following disclaimer in the documentation and/or other materials provided with the distribution. - -THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. diff --git a/plugins/inputs/http_listener/stoppableListener/listener.go b/plugins/inputs/http_listener/stoppableListener/listener.go deleted file mode 100644 index 69a9f33cc3b46..0000000000000 --- a/plugins/inputs/http_listener/stoppableListener/listener.go +++ /dev/null @@ -1,62 +0,0 @@ -package stoppableListener - -import ( - "errors" - "net" - "time" -) - -type StoppableListener struct { - *net.TCPListener //Wrapped listener - stop chan int //Channel used only to indicate listener should shutdown -} - -func New(l net.Listener) (*StoppableListener, error) { - tcpL, ok := l.(*net.TCPListener) - - if !ok { - return nil, errors.New("Cannot wrap listener") - } - - retval := &StoppableListener{} - retval.TCPListener = tcpL - retval.stop = make(chan int) - - return retval, nil -} - -var StoppedError = errors.New("Listener stopped") - -func (sl *StoppableListener) Accept() (net.Conn, error) { - - for { - //Wait up to one second for a new connection - sl.SetDeadline(time.Now().Add(time.Second)) - - newConn, err := sl.TCPListener.Accept() - - //Check for the channel being closed - select { - case <-sl.stop: - return nil, StoppedError - default: - //If the channel is still open, continue as normal - } - - if err != nil { - netErr, ok := err.(net.Error) - - //If this is a timeout, then continue to wait for - //new connections - if ok && netErr.Timeout() && netErr.Temporary() { - continue - } - } - - return newConn, err - } -} - -func (sl *StoppableListener) Stop() { - close(sl.stop) -} diff --git a/plugins/inputs/http_listener/testdata/testmsgs.gz b/plugins/inputs/http_listener/testdata/testmsgs.gz new file mode 100644 index 0000000000000..f524dc07128b9 Binary files /dev/null and b/plugins/inputs/http_listener/testdata/testmsgs.gz differ diff --git a/plugins/parsers/influx/parser.go b/plugins/parsers/influx/parser.go index 68b7497fe3a61..8ced6ed50e56f 100644 --- a/plugins/parsers/influx/parser.go +++ b/plugins/parsers/influx/parser.go @@ -3,6 +3,7 @@ package influx import ( "bytes" "fmt" + "time" "github.com/influxdata/telegraf" @@ -15,15 +16,10 @@ type InfluxParser struct { DefaultTags map[string]string } -// Parse returns a slice of Metrics from a text representation of a -// metric (in line-protocol format) -// with each metric separated by newlines. If any metrics fail to parse, -// a non-nil error will be returned in addition to the metrics that parsed -// successfully. -func (p *InfluxParser) Parse(buf []byte) ([]telegraf.Metric, error) { +func (p *InfluxParser) ParseWithDefaultTime(buf []byte, t time.Time) ([]telegraf.Metric, error) { // parse even if the buffer begins with a newline buf = bytes.TrimPrefix(buf, []byte("\n")) - points, err := models.ParsePoints(buf) + points, err := models.ParsePointsWithPrecision(buf, t, "n") metrics := make([]telegraf.Metric, len(points)) for i, point := range points { for k, v := range p.DefaultTags { @@ -39,6 +35,15 @@ func (p *InfluxParser) Parse(buf []byte) ([]telegraf.Metric, error) { return metrics, err } +// Parse returns a slice of Metrics from a text representation of a +// metric (in line-protocol format) +// with each metric separated by newlines. If any metrics fail to parse, +// a non-nil error will be returned in addition to the metrics that parsed +// successfully. +func (p *InfluxParser) Parse(buf []byte) ([]telegraf.Metric, error) { + return p.ParseWithDefaultTime(buf, time.Now()) +} + func (p *InfluxParser) ParseLine(line string) (telegraf.Metric, error) { metrics, err := p.Parse([]byte(line + "\n"))