Skip to content

Commit

Permalink
Limit the maximum size of a single http body read
Browse files Browse the repository at this point in the history
if the content-length of the http request is smaller than the maximum
    size, then we will read the entire body at once.

if the content-length of the http request is larger than the maximum
    size, then we will read the request body in chunks no larger than
    10MB at a time. This is to prevent the http handler from creating
    huge allocations and potentially OOMing if a user posts a large
    file of metrics to the /write endpoint.
  • Loading branch information
sparrc committed Oct 17, 2016
1 parent c73964c commit dee15aa
Show file tree
Hide file tree
Showing 3 changed files with 152 additions and 41 deletions.
10 changes: 9 additions & 1 deletion plugins/inputs/http_listener/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,11 @@
The HTTP listener is a service input plugin that listens for messages sent via HTTP POST.
The plugin expects messages in the InfluxDB line-protocol ONLY, other Telegraf input data formats are not supported.
The intent of the plugin is to allow Telegraf to serve as a proxy/router for the /write endpoint of the InfluxDB HTTP API.
When chaining Telegraf instances using this plugin, CREATE DATABASE requests receive a 200 OK response with message body `{"results":[]}` but they are not relayed. The output configuration of the Telegraf instance which ultimately submits data to InfluxDB determines the destination database.
When chaining Telegraf instances using this plugin, CREATE DATABASE requests receive a 200 OK response with message body `{"results":[]}` but they are not relayed.
The output configuration of the Telegraf instance which ultimately submits data to InfluxDB determines the destination database.

See: [Telegraf Input Data Formats](https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md#influx).

Example: curl -i -XPOST 'http://localhost:8186/write' --data-binary 'cpu_load_short,host=server01,region=us-west value=0.64 1434055562000000000'

### Configuration:
Expand All @@ -19,6 +21,12 @@ This is a sample configuration for the plugin.
service_address = ":8186"

## timeouts
## maximum duration before timing out read of the request
read_timeout = "10s"
## maximum duration before timing out write of the response
write_timeout = "10s"

## Maximum allowed http request body size in bytes.
## 0 means to use the default of 1,000,000,000 bytes (1 gigabyte)
max_body_size = 0
```
162 changes: 125 additions & 37 deletions plugins/inputs/http_listener/http_listener.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
package http_listener

import (
"bufio"
"bytes"
"compress/gzip"
"fmt"
"io"
"log"
"net"
"net/http"
Expand All @@ -17,15 +19,25 @@ import (
"github.com/influxdata/telegraf/plugins/parsers"
)

const MAX_REQUEST_BODY_SIZE = 50 * 1024 * 1024
const (
	// DEFAULT_REQUEST_BODY_MAX is the default maximum request body size, in bytes.
	// If the request body is over this size, we will return an HTTP 413 error.
	// 1 GB
	DEFAULT_REQUEST_BODY_MAX = 1 * 1000 * 1000 * 1000

	// MAX_ALLOCATION_SIZE is the maximum size, in bytes, of a single allocation
	// of bytes that will be made handling a single HTTP request.
	// 10 MB
	MAX_ALLOCATION_SIZE = 10 * 1000 * 1000
)

type HttpListener struct {
ServiceAddress string
ReadTimeout internal.Duration
WriteTimeout internal.Duration
MaxBodySize int64

sync.Mutex
wg sync.WaitGroup

listener *stoppableListener.StoppableListener

Expand All @@ -38,8 +50,14 @@ const sampleConfig = `
service_address = ":8186"
## timeouts
## maximum duration before timing out read of the request
read_timeout = "10s"
## maximum duration before timing out write of the response
write_timeout = "10s"
## Maximum allowed http request body size in bytes.
## 0 means to use the default of 1,000,000,000 bytes (1 gigabyte)
max_body_size = 0
`

func (t *HttpListener) SampleConfig() string {
Expand All @@ -63,6 +81,10 @@ func (t *HttpListener) Start(acc telegraf.Accumulator) error {
t.Lock()
defer t.Unlock()

if t.MaxBodySize == 0 {
t.MaxBodySize = DEFAULT_REQUEST_BODY_MAX
}

t.acc = acc

var rawListener, err = net.Listen("tcp", t.ServiceAddress)
Expand All @@ -89,8 +111,6 @@ func (t *HttpListener) Stop() {
t.listener.Stop()
t.listener.Close()

t.wg.Wait()

log.Println("I! Stopped HTTP listener service on ", t.ServiceAddress)
}

Expand All @@ -113,58 +133,112 @@ func (t *HttpListener) httpListen() error {
}

func (t *HttpListener) ServeHTTP(res http.ResponseWriter, req *http.Request) {
t.wg.Add(1)
defer t.wg.Done()

switch req.URL.Path {
case "/write":
var http400msg bytes.Buffer
var msg413 bytes.Buffer
var msg400 bytes.Buffer
defer func() {
if http400msg.Len() > 0 {
if msg413.Len() > 0 {
res.WriteHeader(http.StatusRequestEntityTooLarge)
res.Write([]byte(fmt.Sprintf(`{"error":"%s"}`, msg413.String())))
} else if msg400.Len() > 0 {
res.Header().Set("Content-Type", "application/json")
res.Header().Set("X-Influxdb-Version", "1.0")
res.WriteHeader(http.StatusBadRequest)
res.Write([]byte(fmt.Sprintf(`{"error":"%s"}`, http400msg.String())))
res.Write([]byte(fmt.Sprintf(`{"error":"%s"}`, msg400.String())))
} else {
res.WriteHeader(http.StatusNoContent)
}
}()

body := req.Body
// Check that the content length is not too large for us to handle.
if req.ContentLength > t.MaxBodySize {
msg413.WriteString("http: request body too large")
return
}

// Handle gzip request bodies
var body io.ReadCloser
var err error
if req.Header.Get("Content-Encoding") == "gzip" {
b, err := gzip.NewReader(req.Body)
body, err = gzip.NewReader(http.MaxBytesReader(res, req.Body, t.MaxBodySize))
if err != nil {
http400msg.WriteString(err.Error() + " ")
msg400.WriteString(err.Error() + " ")
return
}
defer b.Close()
body = b
} else {
body = http.MaxBytesReader(res, req.Body, t.MaxBodySize)
}

allocSize := 512
if req.ContentLength < MAX_REQUEST_BODY_SIZE {
allocSize = int(req.ContentLength)
}
buf := bytes.NewBuffer(make([]byte, 0, allocSize))
_, err := buf.ReadFrom(http.MaxBytesReader(res, body, MAX_REQUEST_BODY_SIZE))
if err != nil {
log.Printf("E! HttpListener unable to read request body. error: %s\n", err.Error())
http400msg.WriteString("HttpHandler unable to read from request body: " + err.Error())
return
}

metrics, err := t.parser.Parse(buf.Bytes())
if err != nil {
log.Printf("E! HttpListener unable to parse metrics. error: %s \n", err.Error())
if len(metrics) == 0 {
http400msg.WriteString(err.Error())
} else {
http400msg.WriteString("partial write: " + err.Error())
var buffer *bytes.Buffer
if req.ContentLength < MAX_ALLOCATION_SIZE {
// if the content length is less than the max allocation size, then
// read in the whole request at once:
buffer = bytes.NewBuffer(make([]byte, 0, req.ContentLength+1))
_, err := buffer.ReadFrom(body)
if err != nil {
msg := "E! "
if netErr, ok := err.(net.Error); ok {
if netErr.Timeout() {
msg += "Read timeout error, you may want to increase the read_timeout setting. "
}
}
log.Printf(msg + err.Error())
msg400.WriteString("Error reading request body: " + err.Error())
return
}
} else {
// If the body is larger than the max allocation size then set the
// maximum size of the buffer that we will allocate at a time.
// The following loop goes through the request body byte-by-byte.
// If there is a newline within 256 kilobytes of the end of the body
// we will attempt to parse metrics, reset the buffer, and continue.
buffer = bytes.NewBuffer(make([]byte, 0, MAX_ALLOCATION_SIZE))
reader := bufio.NewReader(body)
for {
b, err := reader.ReadByte()
if err != nil {
if err != io.EOF {
msg := "E! "
if netErr, ok := err.(net.Error); ok {
if netErr.Timeout() {
msg += "Read timeout error, you may want to increase the read_timeout setting. "
}
} else {
// if it's not an EOF or a net.Error, then it's almost certainly a
// tooLarge error coming from http.MaxBytesReader. It's unlikely
// that this code path will get hit because the client should
// be setting the ContentLength header, unless it's malicious.
msg413.WriteString(err.Error())
}
log.Printf(msg + err.Error())
return
}
break
}
// returned error is always nil:
// https://golang.org/pkg/bytes/#Buffer.WriteByte
buffer.WriteByte(b)
// if we have a newline and we're nearing the end of the buffer,
// do a write and continue with a fresh buffer.
if buffer.Len() > MAX_ALLOCATION_SIZE-256*1000 && b == '\n' {
t.parse(buffer.Bytes(), &msg400)
buffer.Reset()
} else if buffer.Len() == buffer.Cap() {
// we've reached the end of our buffer without finding a newline
// in the body, so we insert a newline here and attempt to parse.
if buffer.Len() == 0 {
continue
}
buffer.WriteByte('\n')
t.parse(buffer.Bytes(), &msg400)
buffer.Reset()
}
}
}

for _, m := range metrics {
t.acc.AddFields(m.Name(), m.Fields(), m.Tags(), m.Time())
if buffer.Len() != 0 {
t.parse(buffer.Bytes(), &msg400)
}
case "/query":
// Deliver a dummy response to the query endpoint, as some InfluxDB
Expand All @@ -177,11 +251,25 @@ func (t *HttpListener) ServeHTTP(res http.ResponseWriter, req *http.Request) {
// respond to ping requests
res.WriteHeader(http.StatusNoContent)
default:
// Don't know how to respond to calls to other endpoints
http.NotFound(res, req)
}
}

// parse runs the configured parser over the given line-protocol bytes and
// adds every resulting metric to the accumulator. Any parse error is
// appended to errmsg; when some metrics were still produced alongside the
// error, it is reported as a "partial write" instead.
func (t *HttpListener) parse(b []byte, errmsg *bytes.Buffer) {
	metrics, err := t.parser.Parse(b)
	if err != nil {
		// Distinguish a total failure from a partial one so the caller's
		// HTTP 400 body reflects whether anything was accepted.
		switch {
		case len(metrics) == 0:
			errmsg.WriteString(err.Error())
		default:
			errmsg.WriteString("partial write: " + err.Error())
		}
	}

	for _, metric := range metrics {
		t.acc.AddFields(metric.Name(), metric.Fields(), metric.Tags(), metric.Time())
	}
}

func init() {
inputs.Add("http_listener", func() telegraf.Input {
return &HttpListener{}
Expand Down
21 changes: 18 additions & 3 deletions plugins/inputs/http_listener/http_listener_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"testing"
"time"

"github.com/influxdata/telegraf/internal"
"github.com/influxdata/telegraf/plugins/parsers"
"github.com/influxdata/telegraf/testutil"

Expand Down Expand Up @@ -133,7 +134,11 @@ func TestWriteHTTPHighTraffic(t *testing.T) {

// // writes 5000 metrics to the listener with 10 different writers
func TestWriteHTTPHighBatchSize(t *testing.T) {
listener := &HttpListener{ServiceAddress: ":8287"}
listener := &HttpListener{
ServiceAddress: ":8287",
ReadTimeout: internal.Duration{Duration: time.Second * 30},
WriteTimeout: internal.Duration{Duration: time.Second * 30},
}
parser, _ := parsers.NewInfluxParser()
listener.SetParser(parser)

Expand All @@ -143,19 +148,29 @@ func TestWriteHTTPHighBatchSize(t *testing.T) {

time.Sleep(time.Millisecond * 25)

type result struct {
err error
resp *http.Response
}
results := make(chan *result, 10)
// post many messages to listener
var wg sync.WaitGroup
for i := 0; i < 10; i++ {
wg.Add(1)
go func() {
defer wg.Done()
resp, err := http.Post("http://localhost:8287/write?db=mydb", "", bytes.NewBuffer(makeMetricsBatch(5000)))
require.NoError(t, err)
require.EqualValues(t, 204, resp.StatusCode)
results <- &result{err: err, resp: resp}
}()
}

wg.Wait()
close(results)
for result := range results {
require.NoError(t, result.err)
require.EqualValues(t, 204, result.resp.StatusCode)
}

time.Sleep(time.Millisecond * 50)
listener.Gather(acc)

Expand Down

0 comments on commit dee15aa

Please sign in to comment.