Skip to content

Commit

Permalink
Address review issues
Browse files Browse the repository at this point in the history
Signed-off-by: Kemal Akkoyun <[email protected]>
  • Loading branch information
kakkoyun committed Dec 17, 2020
1 parent 0b41518 commit 8039222
Showing 1 changed file with 26 additions and 24 deletions.
50 changes: 26 additions & 24 deletions pkg/receive/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -315,7 +315,7 @@ func (h *Handler) receiveHTTP(w http.ResponseWriter, r *http.Request) {
level.Debug(h.logger).Log("msg", "failed to handle request", "err", err)
}

switch writeErrorCause(err) {
switch determineWriteErrorCause(err, 1) {
case nil:
return
case errNotReady:
Expand Down Expand Up @@ -453,7 +453,7 @@ func (h *Handler) fanoutForward(pctx context.Context, tenant string, replicas ma
// When a MultiError is added to another MultiError, the error slices are concatenated, not nested.
// To avoid breaking the counting logic, we need to flatten the error.
level.Debug(h.logger).Log("msg", "local tsdb write failed", "err", err.Error())
ec <- errors.Wrapf(writeErrorCause(err), "store locally for endpoint %v", endpoint)
ec <- errors.Wrapf(determineWriteErrorCause(err, 1), "store locally for endpoint %v", endpoint)
return
}
ec <- nil
Expand Down Expand Up @@ -619,7 +619,7 @@ func (h *Handler) RemoteWrite(ctx context.Context, r *storepb.WriteRequest) (*st
if err != nil {
level.Debug(h.logger).Log("msg", "failed to handle request", "err", err)
}
switch writeErrorCause(err) {
switch determineWriteErrorCause(err, 1) {
case nil:
return &storepb.WriteResponse{}, nil
case errNotReady:
Expand Down Expand Up @@ -684,39 +684,41 @@ func determineWriteErrorCause(err error, threshold int) error {
if err == nil {
return nil
}

unwrappedErr := errors.Cause(err)
errs, ok := unwrappedErr.(errutil.MultiError)
if !ok {
errs = []error{unwrappedErr}
}
if threshold > 0 {
expErrs := expectedErrors{
{err: errConflict, cause: isConflict},
{err: errNotReady, cause: isNotReady},
{err: errUnavailable, cause: isUnavailable},
}
for _, exp := range expErrs {
exp.count = 0
for _, err := range errs {
if exp.cause(errors.Cause(err)) {
exp.count++
}
if len(errs) == 0 {
return nil
}

if threshold == 0 {
return nil
}
expErrs := expectedErrors{
{err: errConflict, cause: isConflict},
{err: errNotReady, cause: isNotReady},
{err: errUnavailable, cause: isUnavailable},
}
for _, exp := range expErrs {
exp.count = 0
for _, err := range errs {
if exp.cause(errors.Cause(err)) {
exp.count++
}
}
sort.Sort(sort.Reverse(expErrs))
if exp := expErrs[0]; exp.count >= threshold {
return exp.err
}
}
if len(errs) == 0 {
return nil
// Determine which error occurred most.
sort.Sort(sort.Reverse(expErrs))
if exp := expErrs[0]; exp.count >= threshold {
return exp.err
}

return err
}

// writeErrorCause extracts the root cause of a write error.
func writeErrorCause(err error) error { return determineWriteErrorCause(err, 1) }

func newPeerGroup(dialOpts ...grpc.DialOption) *peerGroup {
return &peerGroup{
dialOpts: dialOpts,
Expand Down

0 comments on commit 8039222

Please sign in to comment.