From e3ae4a0754b40aa438b07d9ac3743344edac3e83 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Giedrius=20Statkevi=C4=8Dius?= Date: Wed, 3 Jul 2019 20:24:01 +0300 Subject: [PATCH 1/9] runutil: add Exhaust* fns, initial users --- pkg/promclient/promclient.go | 10 +++++----- pkg/runutil/runutil.go | 19 +++++++++++++++++++ 2 files changed, 24 insertions(+), 5 deletions(-) diff --git a/pkg/promclient/promclient.go b/pkg/promclient/promclient.go index 1da84ec4c0..e35e21a646 100644 --- a/pkg/promclient/promclient.go +++ b/pkg/promclient/promclient.go @@ -68,7 +68,7 @@ func ExternalLabels(ctx context.Context, logger log.Logger, base *url.URL) (labe if err != nil { return nil, errors.Wrapf(err, "request flags against %s", u.String()) } - defer runutil.CloseWithLogOnErr(logger, resp.Body, "query body") + defer runutil.ExhaustCloseWithLogOnErr(logger, resp.Body, "query body") b, err := ioutil.ReadAll(resp.Body) if err != nil { @@ -185,7 +185,7 @@ func ConfiguredFlags(ctx context.Context, logger log.Logger, base *url.URL) (Fla if err != nil { return Flags{}, errors.Wrapf(err, "request config against %s", u.String()) } - defer runutil.CloseWithLogOnErr(logger, resp.Body, "query body") + defer runutil.ExhaustCloseWithLogOnErr(logger, resp.Body, "query body") b, err := ioutil.ReadAll(resp.Body) if err != nil { @@ -234,7 +234,7 @@ func Snapshot(ctx context.Context, logger log.Logger, base *url.URL, skipHead bo if err != nil { return "", errors.Wrapf(err, "request snapshot against %s", u.String()) } - defer runutil.CloseWithLogOnErr(logger, resp.Body, "query body") + defer runutil.ExhaustCloseWithLogOnErr(logger, resp.Body, "query body") b, err := ioutil.ReadAll(resp.Body) if err != nil { @@ -317,7 +317,7 @@ func QueryInstant(ctx context.Context, logger log.Logger, base *url.URL, query s if err != nil { return nil, nil, errors.Wrapf(err, "perform GET request against %s", u.String()) } - defer runutil.CloseWithLogOnErr(logger, resp.Body, "query body") + defer runutil.ExhaustCloseWithLogOnErr(logger, resp.Body, "query body") // Decode only ResultType and load Result only as RawJson since we don't know // structure of the Result yet. @@ -452,7 +452,7 @@ func MetricValues(ctx context.Context, logger log.Logger, base *url.URL, perMetr if err != nil { return errors.Wrapf(err, "perform GET request against %s", u.String()) } - defer runutil.CloseWithLogOnErr(logger, resp.Body, "metrics body") + defer runutil.ExhaustCloseWithLogOnErr(logger, resp.Body, "metrics body") if resp.StatusCode != http.StatusOK { return errors.Errorf("server returned HTTP status %s", resp.Status) diff --git a/pkg/runutil/runutil.go b/pkg/runutil/runutil.go index ae63cae4a2..a46e896c44 100644 --- a/pkg/runutil/runutil.go +++ b/pkg/runutil/runutil.go @@ -38,11 +38,18 @@ // // ... // // If Close() returns error, err will capture it and return by argument. +// +// The rununtil.Exhaust* family of functions provide the same functionality but +// they take an io.ReadCloser and they exhaust the whole reader before closing +// them. They are useful when trying to use http keep-alive connections because +// for the same connection to be re-used the whole response body needs to be +// exhausted. package runutil import ( "fmt" "io" + "io/ioutil" "os" "time" @@ -108,6 +115,12 @@ func CloseWithLogOnErr(logger log.Logger, closer io.Closer, format string, a ... level.Warn(logger).Log("msg", "detected close error", "err", errors.Wrap(err, fmt.Sprintf(format, a...))) } +// ExhaustCloseWithLogOnErr closes the io.ReadCloser with a log message on error but exhausts the reader before. +func ExhaustCloseWithLogOnErr(logger log.Logger, readcloser io.ReadCloser, format string, a ...interface{}) { + io.Copy(ioutil.Discard, readcloser) + CloseWithLogOnErr(logger, readcloser, format, a...) +} + // CloseWithErrCapture runs function and on error return error by argument including the given error (usually // from caller function). func CloseWithErrCapture(err *error, closer io.Closer, format string, a ...interface{}) { @@ -118,3 +131,9 @@ func CloseWithErrCapture(err *error, closer io.Closer, format string, a ...inter *err = merr.Err() } + +// ExhaustCloseWithErrCapture closes the io.ReadCloser with error capture but exhausts the reader before. +func ExhaustCloseWithErrCapture(err *error, readcloser io.ReadCloser, format string, a ...interface{}) { + io.Copy(ioutil.Discard, readcloser) + CloseWithErrCapture(err, readcloser, format, a...) +} From c05d89afa24e2d8d77b3ac02e2a70aa1b6be333f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Giedrius=20Statkevi=C4=8Dius?= Date: Wed, 3 Jul 2019 20:29:13 +0300 Subject: [PATCH 2/9] runutil: explicitly ignore return values --- pkg/runutil/runutil.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pkg/runutil/runutil.go b/pkg/runutil/runutil.go index a46e896c44..2c6ffb0879 100644 --- a/pkg/runutil/runutil.go +++ b/pkg/runutil/runutil.go @@ -117,7 +117,7 @@ func CloseWithLogOnErr(logger log.Logger, closer io.Closer, format string, a ... // ExhaustCloseWithLogOnErr closes the io.ReadCloser with a log message on error but exhausts the reader before. func ExhaustCloseWithLogOnErr(logger log.Logger, readcloser io.ReadCloser, format string, a ...interface{}) { - io.Copy(ioutil.Discard, readcloser) + var _, _ = io.Copy(ioutil.Discard, readcloser) // best effort CloseWithLogOnErr(logger, readcloser, format, a...) } @@ -134,6 +134,6 @@ func CloseWithErrCapture(err *error, closer io.Closer, format string, a ...inter // ExhaustCloseWithErrCapture closes the io.ReadCloser with error capture but exhausts the reader before. func ExhaustCloseWithErrCapture(err *error, readcloser io.ReadCloser, format string, a ...interface{}) { - io.Copy(ioutil.Discard, readcloser) + var _, _ = io.Copy(ioutil.Discard, readcloser) // best effort CloseWithErrCapture(err, readcloser, format, a...) } From a274d7790744d416dddc81065121e26533766bdc Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Giedrius=20Statkevi=C4=8Dius?= Date: Tue, 9 Jul 2019 15:30:47 +0300 Subject: [PATCH 3/9] runutil: fix according to comments --- pkg/runutil/runutil.go | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/pkg/runutil/runutil.go b/pkg/runutil/runutil.go index 2c6ffb0879..41b12ab306 100644 --- a/pkg/runutil/runutil.go +++ b/pkg/runutil/runutil.go @@ -116,9 +116,9 @@ func CloseWithLogOnErr(logger log.Logger, closer io.Closer, format string, a ... } // ExhaustCloseWithLogOnErr closes the io.ReadCloser with a log message on error but exhausts the reader before. -func ExhaustCloseWithLogOnErr(logger log.Logger, readcloser io.ReadCloser, format string, a ...interface{}) { - var _, _ = io.Copy(ioutil.Discard, readcloser) // best effort - CloseWithLogOnErr(logger, readcloser, format, a...) +func ExhaustCloseWithLogOnErr(logger log.Logger, r io.ReadCloser, format string, a ...interface{}) { + io.Copy(ioutil.Discard, r) + CloseWithLogOnErr(logger, r, format, a...) } // CloseWithErrCapture runs function and on error return error by argument including the given error (usually @@ -133,7 +133,7 @@ func CloseWithErrCapture(err *error, closer io.Closer, format string, a ...inter } // ExhaustCloseWithErrCapture closes the io.ReadCloser with error capture but exhausts the reader before. -func ExhaustCloseWithErrCapture(err *error, readcloser io.ReadCloser, format string, a ...interface{}) { - var _, _ = io.Copy(ioutil.Discard, readcloser) // best effort - CloseWithErrCapture(err, readcloser, format, a...) +func ExhaustCloseWithErrCapture(err *error, r io.ReadCloser, format string, a ...interface{}) { + io.Copy(ioutil.Discard, r) + CloseWithErrCapture(err, r, format, a...) } From d601b01920cc13b97916e50427e3c35632210dd5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Giedrius=20Statkevi=C4=8Dius?= Date: Tue, 9 Jul 2019 15:34:54 +0300 Subject: [PATCH 4/9] pkg/*: convert more users to Exhaust*() --- pkg/alert/alert.go | 2 +- pkg/objstore/cos/cos.go | 2 +- pkg/reloader/reloader.go | 2 +- pkg/store/prometheus.go | 6 +++--- 4 files changed, 6 insertions(+), 6 deletions(-) diff --git a/pkg/alert/alert.go b/pkg/alert/alert.go index 87ceab7052..9806802ebb 100644 --- a/pkg/alert/alert.go +++ b/pkg/alert/alert.go @@ -375,7 +375,7 @@ func (s *Sender) sendOne(ctx context.Context, url string, b []byte) error { if err != nil { return errors.Wrapf(err, "send request to %q", url) } - defer runutil.CloseWithLogOnErr(s.logger, resp.Body, "send one alert") + defer runutil.ExhaustCloseWithLogOnErr(s.logger, resp.Body, "send one alert") if resp.StatusCode/100 != 2 { return errors.Errorf("bad response status %v from %q", resp.Status, url) diff --git a/pkg/objstore/cos/cos.go b/pkg/objstore/cos/cos.go index 946d4bd750..7e9a937e2f 100644 --- a/pkg/objstore/cos/cos.go +++ b/pkg/objstore/cos/cos.go @@ -145,7 +145,7 @@ func (b *Bucket) getRange(ctx context.Context, name string, off, length int64) ( return nil, err } if _, err := resp.Body.Read(nil); err != nil { - runutil.CloseWithLogOnErr(b.logger, resp.Body, "cos get range obj close") + runutil.ExhaustCloseWithLogOnErr(b.logger, resp.Body, "cos get range obj close") return nil, err } diff --git a/pkg/reloader/reloader.go b/pkg/reloader/reloader.go index 88b87a80bb..7cd14b051c 100644 --- a/pkg/reloader/reloader.go +++ b/pkg/reloader/reloader.go @@ -329,7 +329,7 @@ func (r *Reloader) triggerReload(ctx context.Context) error { if err != nil { return errors.Wrap(err, "reload request failed") } - defer runutil.CloseWithLogOnErr(r.logger, resp.Body, "trigger reload resp body") + defer runutil.ExhaustCloseWithLogOnErr(r.logger, resp.Body, "trigger reload resp body") if resp.StatusCode != 200 { return errors.Errorf("received non-200 response: %s; have you set `--web.enable-lifecycle` Prometheus flag?", resp.Status) diff --git a/pkg/store/prometheus.go b/pkg/store/prometheus.go index 12cff5d085..b0eef1e10b 100644 --- a/pkg/store/prometheus.go +++ b/pkg/store/prometheus.go @@ -251,7 +251,7 @@ func (p *PrometheusStore) promSeries(ctx context.Context, q prompb.Query) (*prom return nil, errors.Wrap(err, "send request") } spanReqDo.Finish() - defer runutil.CloseWithLogOnErr(p.logger, presp.Body, "prom series request body") + defer runutil.ExhaustCloseWithLogOnErr(p.logger, presp.Body, "prom series request body") if presp.StatusCode/100 != 2 { return nil, errors.Errorf("request failed with code %s", presp.Status) @@ -377,7 +377,7 @@ func (p *PrometheusStore) LabelNames(ctx context.Context, _ *storepb.LabelNamesR if err != nil { return nil, status.Error(codes.Internal, err.Error()) } - defer runutil.CloseWithLogOnErr(p.logger, resp.Body, "label names request body") + defer runutil.ExhaustCloseWithLogOnErr(p.logger, resp.Body, "label names request body") if resp.StatusCode/100 != 2 { return nil, status.Error(codes.Internal, fmt.Sprintf("request Prometheus server failed, code %s", resp.Status)) @@ -437,7 +437,7 @@ func (p *PrometheusStore) LabelValues(ctx context.Context, r *storepb.LabelValue if err != nil { return nil, status.Error(codes.Internal, err.Error()) } - defer runutil.CloseWithLogOnErr(p.logger, resp.Body, "label values request body") + defer runutil.ExhaustCloseWithLogOnErr(p.logger, resp.Body, "label values request body") if resp.StatusCode/100 != 2 { return nil, status.Error(codes.Internal, fmt.Sprintf("request Prometheus server failed, code %s", resp.Status)) From 38d38c7e2a9b698a0f6b790000a0bd9746be38ac Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Giedrius=20Statkevi=C4=8Dius?= Date: Tue, 9 Jul 2019 15:37:39 +0300 Subject: [PATCH 5/9] runutil: fix errcheck --- pkg/runutil/runutil.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pkg/runutil/runutil.go b/pkg/runutil/runutil.go index 41b12ab306..b425f40043 100644 --- a/pkg/runutil/runutil.go +++ b/pkg/runutil/runutil.go @@ -117,7 +117,7 @@ func CloseWithLogOnErr(logger log.Logger, closer io.Closer, format string, a ... // ExhaustCloseWithLogOnErr closes the io.ReadCloser with a log message on error but exhausts the reader before. func ExhaustCloseWithLogOnErr(logger log.Logger, r io.ReadCloser, format string, a ...interface{}) { - io.Copy(ioutil.Discard, r) + _, _ = io.Copy(ioutil.Discard, r) CloseWithLogOnErr(logger, r, format, a...) } @@ -134,6 +134,6 @@ func CloseWithErrCapture(err *error, closer io.Closer, format string, a ...inter // ExhaustCloseWithErrCapture closes the io.ReadCloser with error capture but exhausts the reader before. func ExhaustCloseWithErrCapture(err *error, r io.ReadCloser, format string, a ...interface{}) { - io.Copy(ioutil.Discard, r) + _, _ = io.Copy(ioutil.Discard, r) CloseWithErrCapture(err, r, format, a...) } From 3fcb06af923c46a208ae0a64cf6376465390b79c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Giedrius=20Statkevi=C4=8Dius?= Date: Tue, 9 Jul 2019 15:41:33 +0300 Subject: [PATCH 6/9] CHANGELOG: add item --- CHANGELOG.md | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 923e6cd0e5..e61890d953 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -22,6 +22,7 @@ We use *breaking* word for marking changes that are not backward compatible (rel ### Changed - [#1284](https://github.com/improbable-eng/thanos/pull/1284) Add support for multiple label-sets in Info gRPC service. This deprecates the single `Labels` slice of the `InfoResponse`, in a future release backward compatible handling for the single set of Labels will be removed. Upgrading to v0.6.0 or higher is advised. +- [#1302](https://github.com/improbable-eng/thanos/pull/1302) Thanos now efficiently reuses HTTP keep-alive connections ## [v0.5.0](https://github.com/improbable-eng/thanos/releases/tag/v0.5.0) - 2019.06.05 From a1cad0cc5a6dc9bb74ac31450af438f85d08a90e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Giedrius=20Statkevi=C4=8Dius?= Date: Fri, 19 Jul 2019 13:56:13 +0300 Subject: [PATCH 7/9] runutil: inform the user if exhaustion fails --- pkg/runutil/runutil.go | 14 ++++++++++++-- 1 file changed, 12 insertions(+), 2 deletions(-) diff --git a/pkg/runutil/runutil.go b/pkg/runutil/runutil.go index b425f40043..80417fa8d0 100644 --- a/pkg/runutil/runutil.go +++ b/pkg/runutil/runutil.go @@ -117,7 +117,9 @@ func CloseWithLogOnErr(logger log.Logger, closer io.Closer, format string, a ... // ExhaustCloseWithLogOnErr closes the io.ReadCloser with a log message on error but exhausts the reader before. func ExhaustCloseWithLogOnErr(logger log.Logger, r io.ReadCloser, format string, a ...interface{}) { - _, _ = io.Copy(ioutil.Discard, r) + _, err := io.Copy(ioutil.Discard, r) + level.Warn(logger).Log("msg", "failed to exhaust reader, performance may be impeded", "err", err) + CloseWithLogOnErr(logger, r, format, a...) } @@ -134,6 +136,14 @@ func CloseWithErrCapture(err *error, closer io.Closer, format string, a ...inter // ExhaustCloseWithErrCapture closes the io.ReadCloser with error capture but exhausts the reader before. func ExhaustCloseWithErrCapture(err *error, r io.ReadCloser, format string, a ...interface{}) { - _, _ = io.Copy(ioutil.Discard, r) + _, copyErr := io.Copy(ioutil.Discard, r) + CloseWithErrCapture(err, r, format, a...) + + // Prepend the io.Copy error. + merr := tsdberrors.MultiError{} + merr.Add(copyErr) + merr.Add(*err) + + *err = merr.Err() } From 4a575b47f46ebc90c8d13c406f3eb0cdda85aeea Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Giedrius=20Statkevi=C4=8Dius?= Date: Fri, 19 Jul 2019 15:39:52 +0300 Subject: [PATCH 8/9] CHANGELOG: fix up the merge --- CHANGELOG.md | 2 -- 1 file changed, 2 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index edd0b42fe6..5f5269e30b 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -59,8 +59,6 @@ The other `type` you can use is `JAEGER` now. The `config` keys and values are J ### Changed -- [#1284](https://github.com/improbable-eng/thanos/pull/1284) Add support for multiple label-sets in Info gRPC service. This deprecates the single `Labels` slice of the `InfoResponse`, in a future release backward compatible handling for the single set of Labels will be removed. Upgrading to v0.6.0 or higher is advised. -======= - [#1284](https://github.com/improbable-eng/thanos/pull/1284) Add support for multiple label-sets in Info gRPC service. This deprecates the single `Labels` slice of the `InfoResponse`, in a future release backward compatible handling for the single set of Labels will be removed. Upgrading to v0.6.0 or higher is advised. *breaking* If you run have duplicate queries in your Querier configuration with hierarchical federation of multiple Queries this PR makes Thanos Querier to detect this case and block all duplicates. Refer to 0.6.1 which at least allows for single replica to work. From 22e52ebbb24bea4198956b7dfcbd601450ae7e36 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Giedrius=20Statkevi=C4=8Dius?= Date: Mon, 22 Jul 2019 10:28:31 +0300 Subject: [PATCH 9/9] runutil: add missing `if err != nil` --- pkg/runutil/runutil.go | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/pkg/runutil/runutil.go b/pkg/runutil/runutil.go index 80417fa8d0..48ceb0a124 100644 --- a/pkg/runutil/runutil.go +++ b/pkg/runutil/runutil.go @@ -118,7 +118,9 @@ func CloseWithLogOnErr(logger log.Logger, closer io.Closer, format string, a ... // ExhaustCloseWithLogOnErr closes the io.ReadCloser with a log message on error but exhausts the reader before. func ExhaustCloseWithLogOnErr(logger log.Logger, r io.ReadCloser, format string, a ...interface{}) { _, err := io.Copy(ioutil.Discard, r) - level.Warn(logger).Log("msg", "failed to exhaust reader, performance may be impeded", "err", err) + if err != nil { + level.Warn(logger).Log("msg", "failed to exhaust reader, performance may be impeded", "err", err) + } CloseWithLogOnErr(logger, r, format, a...) }