From 8039222846a28d2a313ac9479e4e58d9e922f6c1 Mon Sep 17 00:00:00 2001 From: Kemal Akkoyun Date: Thu, 17 Dec 2020 11:20:54 +0300 Subject: [PATCH] Address review issues Signed-off-by: Kemal Akkoyun --- pkg/receive/handler.go | 50 ++++++++++++++++++++++-------------------- 1 file changed, 26 insertions(+), 24 deletions(-) diff --git a/pkg/receive/handler.go b/pkg/receive/handler.go index 6c229428609..589a9f32a64 100644 --- a/pkg/receive/handler.go +++ b/pkg/receive/handler.go @@ -315,7 +315,7 @@ func (h *Handler) receiveHTTP(w http.ResponseWriter, r *http.Request) { level.Debug(h.logger).Log("msg", "failed to handle request", "err", err) } - switch writeErrorCause(err) { + switch determineWriteErrorCause(err, 1) { case nil: return case errNotReady: @@ -453,7 +453,7 @@ func (h *Handler) fanoutForward(pctx context.Context, tenant string, replicas ma // When a MultiError is added to another MultiError, the error slices are concatenated, not nested. // To avoid breaking the counting logic, we need to flatten the error. level.Debug(h.logger).Log("msg", "local tsdb write failed", "err", err.Error()) - ec <- errors.Wrapf(writeErrorCause(err), "store locally for endpoint %v", endpoint) + ec <- errors.Wrapf(determineWriteErrorCause(err, 1), "store locally for endpoint %v", endpoint) return } ec <- nil @@ -619,7 +619,7 @@ func (h *Handler) RemoteWrite(ctx context.Context, r *storepb.WriteRequest) (*st if err != nil { level.Debug(h.logger).Log("msg", "failed to handle request", "err", err) } - switch writeErrorCause(err) { + switch determineWriteErrorCause(err, 1) { case nil: return &storepb.WriteResponse{}, nil case errNotReady: @@ -684,39 +684,41 @@ func determineWriteErrorCause(err error, threshold int) error { if err == nil { return nil } + unwrappedErr := errors.Cause(err) errs, ok := unwrappedErr.(errutil.MultiError) if !ok { errs = []error{unwrappedErr} } - if threshold > 0 { - expErrs := expectedErrors{ - {err: errConflict, cause: isConflict}, - {err: errNotReady, cause: isNotReady}, - {err: errUnavailable, cause: isUnavailable}, - } - for _, exp := range expErrs { - exp.count = 0 - for _, err := range errs { - if exp.cause(errors.Cause(err)) { - exp.count++ - } + if len(errs) == 0 { + return nil + } + + if threshold == 0 { + return nil + } + expErrs := expectedErrors{ + {err: errConflict, cause: isConflict}, + {err: errNotReady, cause: isNotReady}, + {err: errUnavailable, cause: isUnavailable}, + } + for _, exp := range expErrs { + exp.count = 0 + for _, err := range errs { + if exp.cause(errors.Cause(err)) { + exp.count++ } } - sort.Sort(sort.Reverse(expErrs)) - if exp := expErrs[0]; exp.count >= threshold { - return exp.err - } } - if len(errs) == 0 { - return nil + // Determine which error occurred most. + sort.Sort(sort.Reverse(expErrs)) + if exp := expErrs[0]; exp.count >= threshold { + return exp.err } + return err } -// writeErrorCause extracts the root cause of a write error. -func writeErrorCause(err error) error { return determineWriteErrorCause(err, 1) } - func newPeerGroup(dialOpts ...grpc.DialOption) *peerGroup { return &peerGroup{ dialOpts: dialOpts,