
influxdb output is messed if server shutdown the connection non-gracefully #6614

Closed
cfz opened this issue Nov 5, 2019 · 9 comments · Fixed by #6621
Labels
area/influxdb bug unexpected problem or unintended behavior

@cfz
Contributor

cfz commented Nov 5, 2019

Relevant telegraf.conf:

[agent]
  interval = "100ms"
  metric_batch_size = 100
  metric_buffer_limit = 1000
  flush_interval = "1500ms"

[[outputs.influxdb]]
  urls = ["http://127.0.0.1:6086", "http://127.0.0.1:6086"]
  content_encoding = "gzip"
[[inputs.cpu]]
[[inputs.disk]]
[[inputs.diskio]]
[[inputs.kernel]]
[[inputs.kernel_vmstat]]
[[inputs.mem]]
[[inputs.processes]]
[[inputs.swap]]
[[inputs.system]]
[[inputs.net]]
[[inputs.netstat]]

System info:

telegraf version: master branch 9efc376
os: Ubuntu 19.04

Steps to reproduce:

  1. set up an haproxy in front of influxdb to simulate hard connection shutdowns; the config looks like the following:
cfz@cfz-desktop:/etc/haproxy$ cat haproxy.cfg
global
        chroot /var/lib/haproxy
        user haproxy
        group haproxy
        daemon

defaults
        mode    http
        timeout connect 5s
        timeout server  40s
        timeout client  30s
        timeout http-request 40s
        http-reuse safe

frontend http_server
        bind 0.0.0.0:6086

        http-request deny deny_status 429 if { rand(10) eq 5 } # ~10% chance to return 429 directly
        use_backend backend_tsdb
        option httpclose

backend backend_tsdb
        server  server1 localhost:8086
  2. start telegraf

Expected behavior:

all data is written correctly, with some "429 Too Many Requests" errors logged

Actual behavior:

output is corrupted during the retry against the second url: many malformed line protocol errors are printed:

2019-11-05T02:22:12Z E! [outputs.influxdb] When writing to [http://127.0.0.1:6086]: 429 Too Many Requests
2019-11-05T02:22:12Z E! [outputs.influxdb] When writing to [http://127.0.0.1:6086]: received error partial write: unable to parse 'mem,host=cfz-desktop -desktop,host=cfz-desktop,name,name=loop16 available_perce,io_time=28i,weighted_io_time=0i,reads=57i,writes=0i,huge_page_size=40i,read_time=13i,write_time=us,iops_i=5597335552ii 1572920521302000000': invalid boolean
unable to parse 'diskio,host=cfz-desktop,name=sda2 ,,compact_migrate_scanned=104677iread_time=thp,,read_bytes=nr_anon_transparent_hugepages=,nr_file_pag=0i,numa_huge_pte_updates=write_time=0i,pageoutrunpte_updates=write_time=0ikswapd_high_wmark_hit_quickly=io_time=44i,weighted_io_time=0i 1572920521302000000': invalid number

Additional info:

i think the root cause is that multiple clients share a single Serializer instance: the first async gzip pipe (CompressWithGzip) is still consuming the serializer after the output receives an error response and retries on the second url with another client

i have 2 solutions, both tested:

  1. use a separate Serializer for each http client
  2. add a close flag in influx.Reader, which is set when an error response arrives

the first one is preferred: the Serializer is a lightweight object, and one Serializer per client makes the code cleaner and more bug-proof.

@danielnelson
Contributor

The separate Serializer sounds good, can you open a pull request?

@danielnelson danielnelson added the bug unexpected problem or unintended behavior label Nov 5, 2019
@danielnelson danielnelson added this to the 1.12.5 milestone Nov 5, 2019
cfz added a commit to cfz/telegraf that referenced this issue Nov 6, 2019
cfz added a commit to cfz/telegraf that referenced this issue Nov 6, 2019
the reader may be used in an async manner, e.g. by CompressWithGzip, which
may continue to consume the serializer even after the server returns an
error. If the client retries in a short period, there may be more than
one reader consuming the single serializer, which leads to malformed
output data

refer to influxdata#6614 for details
@cfz
Contributor Author

cfz commented Nov 6, 2019

@danielnelson i found the first solution is not enough: if a single client sends output at a very short interval, there is still a chance of more than one Reader consuming the serializer, since CompressWithGzip has no way to know the first request is gone. to overcome this, we should either create a new serializer for each request, or find a way to tell CompressWithGzip to stop.

the second commit adds a "close flag" to the reader, which is set when the request finishes; subsequent reads on the reader then receive an EOF.

@cfz
Contributor Author

cfz commented Nov 6, 2019

or, i think the most "correct" way is to use a sync.Pool for the serializer, and for each write request grab an instance from the pool.

@danielnelson
Contributor

I'd really like to avoid a sync.Pool. Only one query at a time is run, so it seems like what we really need to do is ensure the goroutine within internal.CompressWithGzip has stopped before making another request.

Could we have CompressWithGzip return a new type that wraps the PipeReader with a Close function that will wait for the goroutine to finish?

type CloseWaitReader struct {
	reader io.ReadCloser
	wg     sync.WaitGroup
}

func (r *CloseWaitReader) Close() error {
	r.wg.Wait()
	return r.reader.Close()
}

The goroutine in CompressWithGzip would call Done() on the waitgroup as its last action.

cfz added a commit to cfz/telegraf that referenced this issue Nov 7, 2019
@cfz
Contributor Author

cfz commented Nov 7, 2019

sounds good, but i found it's better to implement the logic in the source (influx.Reader). the http client will not automatically close the reader when it receives an error response, which means we may wait for a pretty long time before CompressWithGzip's goroutine returns.
by putting the close method in influx.Reader, we can fail fast and may save some network bandwidth.

i have updated the second commit, please have a look

@danielnelson
Contributor

My issue with this method is that it still leaves the argument to CompressWithGzip unsafe for future use without similar code for every caller, since we can't determine when the goroutine has exited. It imposes the concurrency-related synchronization on every user of the function, instead of encapsulating the concurrency, which is a detail of its implementation. Often influx.Reader isn't even using gzip, so it doesn't feel like it should contain sync code. To me it feels like it's in the wrong spot.

But I see what you mean: until io.Copy completes we can't exit the goroutine, which means we would need to compress the full body even after the request has already failed.

I think what this means for my previous solution is that we would need to pass an io.ReadCloser (not just the influx.Reader) as the argument to CompressWithGzip, and close it when the returned io.ReadCloser is closed, so io.Copy will return immediately. We need to make sure we have a handle to the returned reader, so maybe add a function to the output plugin that creates the body io.Reader?

func (c *httpClient) requestBodyReader(metrics []telegraf.Metric) (io.ReadCloser, error) {
	reader := influx.NewReader(metrics, c.config.Serializer)
	if c.config.ContentEncoding == "gzip" {
		rc, err := internal.CompressWithGzip(reader)
		if err != nil {
			return nil, err
		}
		return rc, nil
	}
	return ioutil.NopCloser(reader), nil
}

I hope that makes sense... it's all fairly tricky code in my book.

@cfz
Contributor Author

cfz commented Nov 9, 2019

i got your point; makes sense to me. i've updated the PR, and i will add unit tests later if you think the api is ok.
thanks for the detailed explanation~~

@danielnelson
Contributor

PR looks great, thanks so much. Let's go ahead with some unit tests if possible.

@danielnelson danielnelson modified the milestones: 1.12.5, 1.13.0 Nov 12, 2019
@cfz
Contributor Author

cfz commented Nov 13, 2019

i added a test for CompressWithGzip. the "request body makers" that call CompressWithGzip are pretty straightforward, so i didn't write unit tests for them.

please have a look~
