Skip to content

Commit

Permalink
receive: Unify error handling (#2899)
Browse files Browse the repository at this point in the history
* Unify error handling

Signed-off-by: Kemal Akkoyun <[email protected]>

* Do not traverse nested multierrors

Signed-off-by: Kemal Akkoyun <[email protected]>

* Trigger CI

Signed-off-by: Kemal Akkoyun <[email protected]>

* Simplify

Signed-off-by: Kemal Akkoyun <[email protected]>

* Rename methods

Signed-off-by: Kemal Akkoyun <[email protected]>

* Address review issues

Signed-off-by: Kemal Akkoyun <[email protected]>
  • Loading branch information
kakkoyun authored Dec 18, 2020
1 parent 5c12d46 commit 4723967
Show file tree
Hide file tree
Showing 2 changed files with 148 additions and 95 deletions.
128 changes: 73 additions & 55 deletions pkg/receive/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
stdlog "log"
"net"
"net/http"
"sort"
"strconv"
"sync"
"time"
Expand Down Expand Up @@ -56,8 +57,8 @@ const (
)

var (
// conflictErr is returned whenever an operation fails due to any conflict-type error.
conflictErr = errors.New("conflict")
// errConflict is returned whenever an operation fails due to any conflict-type error.
errConflict = errors.New("conflict")

errBadReplica = errors.New("replica count exceeds replication factor")
errNotReady = errors.New("target not ready")
Expand Down Expand Up @@ -269,13 +270,7 @@ func (h *Handler) handleRequest(ctx context.Context, rep uint64, tenant string,
// Forward any time series as necessary. All time series
// destined for the local node will be written to the receiver.
// Time series will be replicated as necessary.
if err := h.forward(ctx, tenant, r, wreq); err != nil {
if countCause(err, isConflict) > 0 {
return conflictErr
}
return err
}
return nil
return h.forward(ctx, tenant, r, wreq)
}

func (h *Handler) receiveHTTP(w http.ResponseWriter, r *http.Request) {
Expand Down Expand Up @@ -316,14 +311,18 @@ func (h *Handler) receiveHTTP(w http.ResponseWriter, r *http.Request) {
}

err = h.handleRequest(ctx, rep, tenant, &wreq)
switch err {
if err != nil {
level.Debug(h.logger).Log("msg", "failed to handle request", "err", err)
}

switch determineWriteErrorCause(err, 1) {
case nil:
return
case errNotReady:
http.Error(w, err.Error(), http.StatusServiceUnavailable)
case errUnavailable:
http.Error(w, err.Error(), http.StatusServiceUnavailable)
case conflictErr:
case errConflict:
http.Error(w, err.Error(), http.StatusConflict)
case errBadReplica:
http.Error(w, err.Error(), http.StatusBadRequest)
Expand Down Expand Up @@ -425,7 +424,7 @@ func (h *Handler) fanoutForward(pctx context.Context, tenant string, replicas ma
})
if err != nil {
h.replications.WithLabelValues(labelError).Inc()
ec <- errors.Wrapf(err, "replicate write request, endpoint %v", endpoint)
ec <- errors.Wrapf(err, "replicate write request for endpoint %v", endpoint)
return
}

Expand Down Expand Up @@ -453,16 +452,8 @@ func (h *Handler) fanoutForward(pctx context.Context, tenant string, replicas ma
if err != nil {
// When a MultiError is added to another MultiError, the error slices are concatenated, not nested.
// To avoid breaking the counting logic, we need to flatten the error.
if errs, ok := err.(errutil.MultiError); ok {
if countCause(errs, isConflict) > 0 {
err = errors.Wrap(conflictErr, errs.Error())
} else if countCause(errs, isNotReady) > 0 {
err = errNotReady
} else {
err = errors.New(errs.Error())
}
}
ec <- errors.Wrapf(err, "storing locally, endpoint %v", endpoint)
level.Debug(h.logger).Log("msg", "local tsdb write failed", "err", err.Error())
ec <- errors.Wrapf(determineWriteErrorCause(err, 1), "store locally for endpoint %v", endpoint)
return
}
ec <- nil
Expand Down Expand Up @@ -614,18 +605,8 @@ func (h *Handler) replicate(ctx context.Context, tenant string, wreq *prompb.Wri
quorum := h.writeQuorum()
// fanoutForward only returns an error if successThreshold (quorum) is not reached.
if err := h.fanoutForward(ctx, tenant, replicas, wreqs, quorum); err != nil {
if countCause(err, isNotReady) >= quorum {
return errors.Wrap(errNotReady, "replicate: quorum not reached")
}
if countCause(err, isConflict) >= quorum {
return errors.Wrap(conflictErr, "replicate: quorum not reached")
}
if countCause(err, isUnavailable) >= quorum {
return errors.Wrap(errUnavailable, "replicate: quorum not reached")
}
return errors.Wrap(err, "unexpected error, before quorum is reached")
return errors.Wrap(determineWriteErrorCause(err, quorum), "quorum not reached")
}

return nil
}

Expand All @@ -635,14 +616,17 @@ func (h *Handler) RemoteWrite(ctx context.Context, r *storepb.WriteRequest) (*st
defer span.Finish()

err := h.handleRequest(ctx, uint64(r.Replica), r.Tenant, &prompb.WriteRequest{Timeseries: r.Timeseries})
switch err {
if err != nil {
level.Debug(h.logger).Log("msg", "failed to handle request", "err", err)
}
switch determineWriteErrorCause(err, 1) {
case nil:
return &storepb.WriteResponse{}, nil
case errNotReady:
return nil, status.Error(codes.Unavailable, err.Error())
case errUnavailable:
return nil, status.Error(codes.Unavailable, err.Error())
case conflictErr:
case errConflict:
return nil, status.Error(codes.AlreadyExists, err.Error())
case errBadReplica:
return nil, status.Error(codes.InvalidArgument, err.Error())
Expand All @@ -651,34 +635,15 @@ func (h *Handler) RemoteWrite(ctx context.Context, r *storepb.WriteRequest) (*st
}
}

// countCause reports how many of the errors held by err have a cause accepted by f.
// When err is a MultiError every contained error is checked; any other error is
// treated as a single candidate. Only the direct cause of each candidate is
// examined; nested errors are not traversed any deeper.
func countCause(err error, f func(error) bool) int {
	var candidates []error
	if multi, isMulti := err.(errutil.MultiError); isMulti {
		candidates = multi
	} else {
		candidates = []error{err}
	}

	matched := 0
	for _, candidate := range candidates {
		if !f(errors.Cause(candidate)) {
			continue
		}
		matched++
	}
	return matched
}

// isConflict returns whether or not the given error represents a conflict.
// A nil error is never a conflict. Otherwise the error counts as a conflict when
// it is identical to the package's conflict sentinel or to one of the storage
// package's duplicate-sample, out-of-order-sample or out-of-bounds errors, when
// its message equals the HTTP 409 (Conflict) status code rendered as a decimal
// string, or when status.Code maps it to codes.AlreadyExists.
// The error is compared as given and is not unwrapped here.
func isConflict(err error) bool {
if err == nil {
return false
}
return err == conflictErr ||
return err == errConflict ||
err == storage.ErrDuplicateSampleForTimestamp ||
err == storage.ErrOutOfOrderSample ||
err == storage.ErrOutOfBounds ||
err.Error() == strconv.Itoa(http.StatusConflict) ||
status.Code(err) == codes.AlreadyExists
}

Expand All @@ -702,6 +667,59 @@ type retryState struct {
nextAllowed time.Time
}

// expectedErrors is a list of known sentinel errors, each paired with a
// predicate that recognises it and a tally of how many times it was seen.
// It provides Len, Swap and Less so the list can be ordered by tally with
// the sort package.
type expectedErrors []*struct {
	// err is the sentinel error this entry stands for.
	err error
	// cause reports whether a given error should be counted for this entry.
	cause func(error) bool
	// count is the number of errors counted for this entry so far.
	count int
}

// Len returns the number of entries in the list.
func (e expectedErrors) Len() int {
	return len(e)
}

// Swap exchanges the entries at positions i and j.
func (e expectedErrors) Swap(i, j int) {
	held := e[i]
	e[i] = e[j]
	e[j] = held
}

// Less reports whether the entry at i has a strictly smaller count than the entry at j.
func (e expectedErrors) Less(i, j int) bool {
	return e[j].count > e[i].count
}

// determineWriteErrorCause extracts the sentinel error that occurred at least
// threshold times in the given fan-out error.
//
// The error is first unwrapped with errors.Cause. If the result is a MultiError,
// the cause of each contained error is inspected, but nested errors are not
// traversed any deeper. Otherwise the unwrapped error is the only candidate.
//
// It returns nil when err is nil or unwraps to an empty MultiError. It returns
// err unchanged when threshold is less than 1 or when no sentinel reaches the
// threshold. When several sentinels are tied for the highest count, the one
// listed first wins: conflict, then not ready, then unavailable.
func determineWriteErrorCause(err error, threshold int) error {
	if err == nil {
		return nil
	}

	unwrappedErr := errors.Cause(err)
	errs, ok := unwrappedErr.(errutil.MultiError)
	if !ok {
		errs = []error{unwrappedErr}
	}
	if len(errs) == 0 {
		return nil
	}

	if threshold < 1 {
		return err
	}

	// The declaration order doubles as the tie-breaking priority.
	expErrs := expectedErrors{
		{err: errConflict, cause: isConflict},
		{err: errNotReady, cause: isNotReady},
		{err: errUnavailable, cause: isUnavailable},
	}
	for _, exp := range expErrs {
		// Use a distinct name for the element so the function's err, which is
		// returned below, is never shadowed.
		for _, e := range errs {
			if exp.cause(errors.Cause(e)) {
				exp.count++
			}
		}
	}
	// Determine which error occurred most. A stable sort guarantees that
	// entries with equal counts keep their declared priority order.
	sort.Stable(sort.Reverse(expErrs))
	if exp := expErrs[0]; exp.count >= threshold {
		return exp.err
	}

	return err
}

func newPeerGroup(dialOpts ...grpc.DialOption) *peerGroup {
return &peerGroup{
dialOpts: dialOpts,
Expand Down
Loading

0 comments on commit 4723967

Please sign in to comment.