http_listener: pre-allocate buffer for reading request body #1892
Conversation
@supershal do you happen to have any steps to reproduce this? I haven't been able to reproduce it under very high load using our influx stress tool (>300,000 points/s). I think this is going to have the same issue that #1826 was intending to solve, which is that we are basically doing an `ioutil.ReadAll` on the request body, from what I can tell of this change. We will need to test with very large request bodies, larger than 100,000 points.
@sparrc I noticed higher memory usage when http_listener timed out while reading the request body. With the fix above, using the same timeout for the same traffic, I do see a lot of timeout errors in the log, but memory does not go up. I was not able to reproduce it in my local testing; I suspect the GC collects memory faster on my local box, and I had to create a configuration similar to a c3.large (2 cores) to reproduce it consistently. I will try to create a comprehensive stress test tonight. I think the difference between `ioutil.ReadAll` and the fix above is that pre-allocating the buffer capacity means the byte slice does not have to expand repeatedly while reading bytes from the request body. I think that could be the reason for the timeouts.
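A minimal sketch of that pre-allocation idea, assuming the size comes from the Content-Length header (names here are illustrative, not the actual patch):

```go
package listener

import (
	"bytes"
	"io"
	"net/http"
)

// readBody sketches the pre-allocation idea: size the buffer from
// Content-Length up front so the byte slice never has to grow while
// the body is being read.
func readBody(req *http.Request) ([]byte, error) {
	size := req.ContentLength
	if size < 0 {
		size = 0 // length unknown; fall back to grow-as-needed
	}
	buf := bytes.NewBuffer(make([]byte, 0, int(size)))
	if _, err := io.Copy(buf, req.Body); err != nil {
		return nil, err
	}
	return buf.Bytes(), nil
}
```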
It makes some sense that pre-allocating the buffer would perform better than starting with a 512-byte buffer and expanding it, though I'm not sure I understand why the memory usage is so much lower either.
We may want to look into using that as well. I know that in InfluxDB it's much more efficient to batch and write all the points together, and this might be why they didn't do it incrementally.
@supershal I've been experiencing better performance and more consistent writes using your changes 👍. I've made a few adjustments.
@sparrc Thanks for making enhancements to the fix.
force-pushed from 1fb42ff to c73964c
force-pushed from bd0af9a to 9ced250
@sparrc These changes help a lot to read the http body faster. While going through the code, I noticed that metrics get flushed to the output writer when `metric_batch_size` is reached while adding a point to each output buffer. I will play around with a couple of fixes to see if I get similar performance against a single influxdb. Let me know your thoughts and suggestions for improving the performance.
@supershal I've made some changes to your original PR. I don't think that reading the entire http body as fast as possible is the absolute best solution, because in some instances the body might be very large and we don't want to create very large allocations. For that reason I've changed it to read the body byte-by-byte into a buffer of at most 10 MB, flushing each time it fills. Any perf-improvement PRs are appreciated, but I wouldn't recommend relying on Telegraf as a high-throughput relay to InfluxDB, and I wouldn't recommend that you implement this setup in your environment, regardless of the performance improvements we can make. From what you've said so far I'm fairly certain that if you simply use the …
force-pushed from ce24c8f to 2504b97
force-pushed from 8760187 to 71aa0d5
If the content-length of the http request is smaller than the maximum size, we read the entire body at once. If it is larger, we read the request body in chunks no larger than 20 MB at a time. This prevents the http handler from creating huge allocations and potentially OOMing when a user posts a large file of metrics to the /write endpoint.
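A rough sketch of that strategy (constants and names are illustrative; a real implementation would also need to cut chunks on newline boundaries so metric lines aren't split in half):

```go
package listener

import (
	"io"
	"io/ioutil"
	"net/http"
)

const maxChunk = 20 * 1000 * 1000 // 20 MB, per the commit message

// handleBody reads small bodies in one shot and large (or unknown-length)
// bodies in bounded chunks, so no single allocation exceeds maxChunk.
func handleBody(req *http.Request, process func([]byte) error) error {
	if n := req.ContentLength; n >= 0 && n <= maxChunk {
		buf, err := ioutil.ReadAll(req.Body) // body fits in one allocation
		if err != nil {
			return err
		}
		return process(buf)
	}
	chunk := make([]byte, maxChunk)
	for {
		n, err := io.ReadFull(req.Body, chunk)
		if n > 0 {
			if perr := process(chunk[:n]); perr != nil {
				return perr
			}
		}
		if err == io.EOF || err == io.ErrUnexpectedEOF {
			return nil // end of body
		}
		if err != nil {
			return err
		}
	}
}
```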
force-pushed from 71aa0d5 to dee15aa
I think this change is unnecessarily complex. The "chunking" behavior can be done with a simple `[]byte`, `io.ReadFull`, and `bytes.LastIndexByte`, rather than the convoluted usage of `bufio.Reader`, `bytes.Buffer`, reading one byte at a time, guessing when we might be filling the buffer, and arbitrarily modifying the input data to attempt to parse it.
The simplified approach of grabbing a timestamp ahead of time would also eliminate the difference between the "chunked" and "read full" behavior, although the `MaxBytesReader` should be added after the gzip reader, to prevent a seemingly small payload from causing problems as it gets processed.
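A minimal sketch of that suggestion, with a hypothetical `parseLines` callback standing in for the parser: fill a fixed `[]byte` with `io.ReadFull`, cut at the last newline found by `bytes.LastIndexByte`, and carry the partial tail into the next read.

```go
package listener

import (
	"bytes"
	"io"
)

// chunkedParse fills a fixed buffer, parses only whole lines from it,
// and carries any trailing partial line into the next read.
func chunkedParse(r io.Reader, bufSize int, parseLines func([]byte) error) error {
	buf := make([]byte, bufSize)
	carry := 0 // bytes of a partial line carried from the previous read
	for {
		n, err := io.ReadFull(r, buf[carry:])
		total := carry + n
		if err == io.EOF || err == io.ErrUnexpectedEOF {
			// stream ended: parse whatever remains, including a final
			// line that may lack a trailing newline
			if total > 0 {
				return parseLines(buf[:total])
			}
			return nil
		}
		if err != nil {
			return err
		}
		// find the last complete line in the buffer
		i := bytes.LastIndexByte(buf[:total], '\n')
		if i == -1 {
			// a single line longer than the whole buffer; a real
			// implementation would reject or grow here
			return io.ErrShortBuffer
		}
		if perr := parseLines(buf[:i+1]); perr != nil {
			return perr
		}
		// move the partial trailing line to the front of the buffer
		carry = copy(buf, buf[i+1:total])
	}
}
```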
```go
// DEFAULT_REQUEST_BODY_MAX is the default maximum request body size, in bytes.
// if the request body is over this size, we will return an HTTP 413 error.
// 1 GB
DEFAULT_REQUEST_BODY_MAX = 1 * 1000 * 1000 * 1000
```
This might be a little high as a default...
```go
	t.acc.AddFields(m.Name(), m.Fields(), m.Tags(), m.Time())
}
partial = "partial write: "
var msg413 bytes.Buffer
```
I would suggest breaking these out as separate handlers, rather than embedded in the switch statement.
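For example, a sketch with hypothetical handler names (assuming the plugin's /write, /query, and /ping routes):

```go
// Sketch: route each endpoint to its own handler method instead of a
// switch inside ServeHTTP. serveWrite/serveQuery/servePing are
// hypothetical method names.
func (t *HttpListener) routes() *http.ServeMux {
	mux := http.NewServeMux()
	mux.HandleFunc("/write", t.serveWrite)
	mux.HandleFunc("/query", t.serveQuery)
	mux.HandleFunc("/ping", t.servePing)
	return mux
}
```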
```go
} else {
	res.WriteHeader(http.StatusNoContent)
body = http.MaxBytesReader(res, req.Body, t.MaxBodySize)
```
`MaxBytesReader` should be set before the gzip check, so that this line isn't duplicated.
```go
var body io.ReadCloser
var err error
if req.Header.Get("Content-Encoding") == "gzip" {
	body, err = gzip.NewReader(http.MaxBytesReader(res, req.Body, t.MaxBodySize))
```
The gzip reader (and not the underlying connection) should be closed, probably with a `defer`.
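Taken together, the two points above amount to something like this (a sketch; `maxBodySize` and the function name are illustrative):

```go
package listener

import (
	"compress/gzip"
	"io"
	"net/http"
)

// requestBody wraps the body once with MaxBytesReader before the gzip
// branch, and returns a cleanup that closes only the gzip reader, not
// the underlying connection.
func requestBody(res http.ResponseWriter, req *http.Request, maxBodySize int64) (io.ReadCloser, func(), error) {
	body := http.MaxBytesReader(res, req.Body, maxBodySize)
	if req.Header.Get("Content-Encoding") == "gzip" {
		gz, err := gzip.NewReader(body)
		if err != nil {
			return nil, nil, err
		}
		// caller defers the returned cleanup
		return gz, func() { gz.Close() }, nil
	}
	return body, func() {}, nil
}
```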
```go
var msg413 bytes.Buffer
var msg400 bytes.Buffer
defer func() {
	if msg413.Len() > 0 {
```
This should probably be a utility function, and not logic embedded within a defer checking the state of a shared variable...
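For instance, a small helper along these lines (hypothetical name and body), called directly at the point where the oversized body is detected:

```go
// tooLarge is a hypothetical helper standing in for the deferred state
// check: it writes the 413 response where the condition is found.
func tooLarge(res http.ResponseWriter) {
	res.WriteHeader(http.StatusRequestEntityTooLarge)
	res.Write([]byte(`{"error":"http: request body too large"}`))
}
```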
```go
for {
	b, err := reader.ReadByte()
	if err != nil {
		if err != io.EOF {
```
This multi-nested conditional should be restructured to prevent the deep nesting. First example: `if err == io.EOF { break }`. The `!=` case has an unconditional return at the end of the block, meaning the only way the `break` on line 217 gets called is if `err == io.EOF`.
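Restructured along those lines (a sketch with a hypothetical `handle` callback for the per-byte work):

```go
package listener

import (
	"bufio"
	"io"
)

// readBytes shows the suggested flattening: io.EOF breaks out early,
// any other error returns immediately, and the happy path stays at one
// level of nesting.
func readBytes(reader *bufio.Reader, handle func(byte)) error {
	for {
		b, err := reader.ReadByte()
		if err == io.EOF {
			break // normal end of input
		}
		if err != nil {
			return err // non-EOF errors are fatal
		}
		handle(b)
	}
	return nil
}
```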
```diff
@@ -24,6 +24,14 @@ func (p *InfluxParser) Parse(buf []byte) ([]telegraf.Metric, error) {
 	// parse even if the buffer begins with a newline
 	buf = bytes.TrimPrefix(buf, []byte("\n"))
```
Not related to this PR, but this could just as well be `bytes.TrimLeftFunc(buf, unicode.IsSpace)`.
```go
if err != nil {
	if len(err.Error()) > 1024 {
		err = fmt.Errorf("Error parsing influx line-protocol (error truncated): %s",
			err.Error()[0:256])
```
Extra `0`; this can just be `[:256]`.
```go
		http.NotFound(res, req)
	}
}

func (t *HttpListener) parse(b []byte, errmsg *bytes.Buffer) {
```
I don't like the idea of passing in a buffer, just to modify its state (which then gets checked in a deferred function later...)
I would suggest returning an error and changing the order of the statements of this function: Iterate over the metrics returned first since that is unconditional, then conditionally return a (potentially modified) error.
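A sketch of that refactor, extending the names visible in the diffs above (`t.parser`, `t.acc`); the error wording is illustrative:

```go
// parse returns an error instead of appending to a shared bytes.Buffer.
// Metrics that parsed successfully are added first (that part is
// unconditional), then any parse error is handed back to the caller,
// which maps it to an HTTP status.
func (t *HttpListener) parse(b []byte) error {
	metrics, err := t.parser.Parse(b)
	for _, m := range metrics {
		t.acc.AddFields(m.Name(), m.Fields(), m.Tags(), m.Time())
	}
	if err != nil {
		return fmt.Errorf("partial write: %s", err)
	}
	return nil
}
```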
```go
// if we have a newline and we're nearing the end of the buffer,
// do a write and continue with a fresh buffer.
if buffer.Len() > MAX_ALLOCATION_SIZE-256*1000 && b == '\n' {
	t.parse(buffer.Bytes(), &msg400)
```
This chunked processing differs in behavior from the standard influxdb handler in that these "chunks" will end up with separate timestamps if they weren't set on the original points. If there's going to be chunked processing, the current time should be grabbed at the beginning of the processing, then passed around as the default time to `models.ParsePointsWithPrecision`. I see that `models.ParsePoints` is used in the parser, which just calls `models.ParsePointsWithPrecision(buf, time.Now(), "n")`. Capturing the timestamp earlier and replacing all uses of `ParsePoints(buf)` with `ParsePointsWithPrecision(buf, ts, "n")` would keep the behavior similar to the database handler.
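In code, the suggestion looks roughly like this (a sketch; how the chunks are collected is elided):

```go
package listener

import (
	"time"

	"github.com/influxdata/influxdb/models"
)

// parseChunks captures one timestamp per request and uses it as the
// default time for every chunk, so points without explicit timestamps
// don't drift between chunks.
func parseChunks(chunks [][]byte) ([]models.Point, error) {
	ts := time.Now() // one timestamp for the whole request
	var points []models.Point
	for _, buf := range chunks {
		pts, err := models.ParsePointsWithPrecision(buf, ts, "n")
		if err != nil {
			return points, err
		}
		points = append(points, pts...)
	}
	return points, nil
}
```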
Thank you for the contribution @supershal, I'm going to close this in favor of #1915. Our hope is that #1915 will perform better in the long run than reading the request body all at once. It's possible that in some situations it may seem as if the reading of the request body is timing out more, because it reads the body iteratively. That being said, it should allocate significantly less memory in regular average usage, and should also put substantially less pressure on the GC.
Fixes: #1823 and #1856
We are using http_listener as a relay to route traffic between two influxdb instances in our HA setup. We noticed memory leaks when sending on average 800 metrics per batch from around 500 instances running telegraf. The bugfix #1826 caused an issue with metrics parsing (#1856).
This fix handles incoming requests the same way as influxdb's HTTP write handler: it reads the content-length header, preallocates a buffer of that size, and reads all the data before parsing it with a scanner. We have done some stress tests on the fix and have been running the fixed version for two days without any issue. The telegraf instances do not seem to leak memory, and memory usage remains low.