Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

runutil: add Exhaust* fns, initial users #1302

Merged
merged 11 commits into from
Jul 22, 2019
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ We use *breaking* word for marking changes that are not backward compatible (rel
### Changed

- [#1284](https://github.com/improbable-eng/thanos/pull/1284) Add support for multiple label-sets in Info gRPC service. This deprecates the single `Labels` slice of the `InfoResponse`, in a future release backward compatible handling for the single set of Labels will be removed. Upgrading to v0.6.0 or higher is advised.
- [#1302](https://github.com/improbable-eng/thanos/pull/1302) Thanos now efficiently reuses HTTP keep-alive connections

## [v0.5.0](https://github.com/improbable-eng/thanos/releases/tag/v0.5.0) - 2019.06.05

Expand Down
2 changes: 1 addition & 1 deletion pkg/alert/alert.go
Original file line number Diff line number Diff line change
Expand Up @@ -375,7 +375,7 @@ func (s *Sender) sendOne(ctx context.Context, url string, b []byte) error {
if err != nil {
return errors.Wrapf(err, "send request to %q", url)
}
defer runutil.CloseWithLogOnErr(s.logger, resp.Body, "send one alert")
defer runutil.ExhaustCloseWithLogOnErr(s.logger, resp.Body, "send one alert")

if resp.StatusCode/100 != 2 {
return errors.Errorf("bad response status %v from %q", resp.Status, url)
Expand Down
2 changes: 1 addition & 1 deletion pkg/objstore/cos/cos.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ func (b *Bucket) getRange(ctx context.Context, name string, off, length int64) (
return nil, err
}
if _, err := resp.Body.Read(nil); err != nil {
runutil.CloseWithLogOnErr(b.logger, resp.Body, "cos get range obj close")
runutil.ExhaustCloseWithLogOnErr(b.logger, resp.Body, "cos get range obj close")
return nil, err
}

Expand Down
10 changes: 5 additions & 5 deletions pkg/promclient/promclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ func ExternalLabels(ctx context.Context, logger log.Logger, base *url.URL) (labe
if err != nil {
return nil, errors.Wrapf(err, "request flags against %s", u.String())
}
defer runutil.CloseWithLogOnErr(logger, resp.Body, "query body")
defer runutil.ExhaustCloseWithLogOnErr(logger, resp.Body, "query body")

b, err := ioutil.ReadAll(resp.Body)
if err != nil {
Expand Down Expand Up @@ -185,7 +185,7 @@ func ConfiguredFlags(ctx context.Context, logger log.Logger, base *url.URL) (Fla
if err != nil {
return Flags{}, errors.Wrapf(err, "request config against %s", u.String())
}
defer runutil.CloseWithLogOnErr(logger, resp.Body, "query body")
defer runutil.ExhaustCloseWithLogOnErr(logger, resp.Body, "query body")

b, err := ioutil.ReadAll(resp.Body)
if err != nil {
Expand Down Expand Up @@ -234,7 +234,7 @@ func Snapshot(ctx context.Context, logger log.Logger, base *url.URL, skipHead bo
if err != nil {
return "", errors.Wrapf(err, "request snapshot against %s", u.String())
}
defer runutil.CloseWithLogOnErr(logger, resp.Body, "query body")
defer runutil.ExhaustCloseWithLogOnErr(logger, resp.Body, "query body")

b, err := ioutil.ReadAll(resp.Body)
if err != nil {
Expand Down Expand Up @@ -317,7 +317,7 @@ func QueryInstant(ctx context.Context, logger log.Logger, base *url.URL, query s
if err != nil {
return nil, nil, errors.Wrapf(err, "perform GET request against %s", u.String())
}
defer runutil.CloseWithLogOnErr(logger, resp.Body, "query body")
defer runutil.ExhaustCloseWithLogOnErr(logger, resp.Body, "query body")

// Decode only ResultType and load Result only as RawJson since we don't know
// structure of the Result yet.
Expand Down Expand Up @@ -452,7 +452,7 @@ func MetricValues(ctx context.Context, logger log.Logger, base *url.URL, perMetr
if err != nil {
return errors.Wrapf(err, "perform GET request against %s", u.String())
}
defer runutil.CloseWithLogOnErr(logger, resp.Body, "metrics body")
defer runutil.ExhaustCloseWithLogOnErr(logger, resp.Body, "metrics body")

if resp.StatusCode != http.StatusOK {
return errors.Errorf("server returned HTTP status %s", resp.Status)
Expand Down
2 changes: 1 addition & 1 deletion pkg/reloader/reloader.go
Original file line number Diff line number Diff line change
Expand Up @@ -329,7 +329,7 @@ func (r *Reloader) triggerReload(ctx context.Context) error {
if err != nil {
return errors.Wrap(err, "reload request failed")
}
defer runutil.CloseWithLogOnErr(r.logger, resp.Body, "trigger reload resp body")
defer runutil.ExhaustCloseWithLogOnErr(r.logger, resp.Body, "trigger reload resp body")

if resp.StatusCode != 200 {
return errors.Errorf("received non-200 response: %s; have you set `--web.enable-lifecycle` Prometheus flag?", resp.Status)
Expand Down
19 changes: 19 additions & 0 deletions pkg/runutil/runutil.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,11 +38,18 @@
// // ...
//
// If Close() returns error, err will capture it and return by argument.
//
// The rununtil.Exhaust* family of functions provide the same functionality but
// they take an io.ReadCloser and they exhaust the whole reader before closing
// them. They are useful when trying to use http keep-alive connections because
// for the same connection to be re-used the whole response body needs to be
// exhausted.
package runutil

import (
"fmt"
"io"
"io/ioutil"
"os"
"time"

Expand Down Expand Up @@ -108,6 +115,12 @@ func CloseWithLogOnErr(logger log.Logger, closer io.Closer, format string, a ...
level.Warn(logger).Log("msg", "detected close error", "err", errors.Wrap(err, fmt.Sprintf(format, a...)))
}

// ExhaustCloseWithLogOnErr closes the io.ReadCloser with a log message on error but exhausts the reader before.
func ExhaustCloseWithLogOnErr(logger log.Logger, r io.ReadCloser, format string, a ...interface{}) {
_, _ = io.Copy(ioutil.Discard, r)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Technically log on error might mean checking err here as well (: But I think it's fine for now.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's true but such an error message would be harmless and users would just ignore it, no?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No if you want to exhaust and that failed, right?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah but I'm not sure what an user could do. Plus, if that doesn't have any negative consequences besides just a little bit lower performance then I'm not sure that it's that important. I will add checking here as well, I just hope that it will not lead to unnecessary noise in logs 😄

CloseWithLogOnErr(logger, r, format, a...)
}

// CloseWithErrCapture runs function and on error return error by argument including the given error (usually
// from caller function).
func CloseWithErrCapture(err *error, closer io.Closer, format string, a ...interface{}) {
Expand All @@ -118,3 +131,9 @@ func CloseWithErrCapture(err *error, closer io.Closer, format string, a ...inter

*err = merr.Err()
}

// ExhaustCloseWithErrCapture closes the io.ReadCloser with error capture but exhausts the reader before.
func ExhaustCloseWithErrCapture(err *error, r io.ReadCloser, format string, a ...interface{}) {
_, _ = io.Copy(ioutil.Discard, r)
CloseWithErrCapture(err, r, format, a...)
}
6 changes: 3 additions & 3 deletions pkg/store/prometheus.go
Original file line number Diff line number Diff line change
Expand Up @@ -251,7 +251,7 @@ func (p *PrometheusStore) promSeries(ctx context.Context, q prompb.Query) (*prom
return nil, errors.Wrap(err, "send request")
}
spanReqDo.Finish()
defer runutil.CloseWithLogOnErr(p.logger, presp.Body, "prom series request body")
defer runutil.ExhaustCloseWithLogOnErr(p.logger, presp.Body, "prom series request body")

if presp.StatusCode/100 != 2 {
return nil, errors.Errorf("request failed with code %s", presp.Status)
Expand Down Expand Up @@ -377,7 +377,7 @@ func (p *PrometheusStore) LabelNames(ctx context.Context, _ *storepb.LabelNamesR
if err != nil {
return nil, status.Error(codes.Internal, err.Error())
}
defer runutil.CloseWithLogOnErr(p.logger, resp.Body, "label names request body")
defer runutil.ExhaustCloseWithLogOnErr(p.logger, resp.Body, "label names request body")

if resp.StatusCode/100 != 2 {
return nil, status.Error(codes.Internal, fmt.Sprintf("request Prometheus server failed, code %s", resp.Status))
Expand Down Expand Up @@ -437,7 +437,7 @@ func (p *PrometheusStore) LabelValues(ctx context.Context, r *storepb.LabelValue
if err != nil {
return nil, status.Error(codes.Internal, err.Error())
}
defer runutil.CloseWithLogOnErr(p.logger, resp.Body, "label values request body")
defer runutil.ExhaustCloseWithLogOnErr(p.logger, resp.Body, "label values request body")

if resp.StatusCode/100 != 2 {
return nil, status.Error(codes.Internal, fmt.Sprintf("request Prometheus server failed, code %s", resp.Status))
Expand Down