Skip to content

Commit

Permalink
Unify error handling
Browse files Browse the repository at this point in the history
Signed-off-by: Kemal Akkoyun <[email protected]>
  • Loading branch information
kakkoyun committed Jul 15, 2020
1 parent f15b0c0 commit f3f2cc5
Show file tree
Hide file tree
Showing 2 changed files with 235 additions and 96 deletions.
168 changes: 112 additions & 56 deletions pkg/receive/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
stdlog "log"
"net"
"net/http"
"sort"
"strconv"
"sync"
"time"
Expand Down Expand Up @@ -54,10 +55,13 @@ const (
labelError = "error"
)

// conflictErr is returned whenever an operation fails due to any conflict-type error.
var conflictErr = errors.New("conflict")
var (
// errConflict is returned whenever an operation fails due to any conflict-type error.
errConflict = errors.New("conflict")

var errBadReplica = errors.New("replica count exceeds replication factor")
errBadReplica = errors.New("replica count exceeds replication factor")
errNotReady = errors.New("target not ready")
)

// Options for the web Handler.
type Options struct {
Expand Down Expand Up @@ -248,13 +252,7 @@ func (h *Handler) handleRequest(ctx context.Context, rep uint64, tenant string,
// Forward any time series as necessary. All time series
// destined for the local node will be written to the receiver.
// Time series will be replicated as necessary.
if err := h.forward(ctx, tenant, r, wreq); err != nil {
if countCause(err, isConflict) > 0 {
return conflictErr
}
return err
}
return nil
return h.forward(ctx, tenant, r, wreq)
}

func (h *Handler) receiveHTTP(w http.ResponseWriter, r *http.Request) {
Expand Down Expand Up @@ -295,12 +293,16 @@ func (h *Handler) receiveHTTP(w http.ResponseWriter, r *http.Request) {
}

err = h.handleRequest(ctx, rep, tenant, &wreq)
switch err {
if err != nil {
level.Debug(h.logger).Log("msg", "failed to handle request", "err", err)
}

switch rootCause(err) {
case nil:
return
case tsdb.ErrNotReady:
case errNotReady:
http.Error(w, err.Error(), http.StatusServiceUnavailable)
case conflictErr:
case errConflict:
http.Error(w, err.Error(), http.StatusConflict)
case errBadReplica:
http.Error(w, err.Error(), http.StatusBadRequest)
Expand Down Expand Up @@ -402,7 +404,7 @@ func (h *Handler) fanoutForward(pctx context.Context, tenant string, replicas ma
})
if err != nil {
h.replications.WithLabelValues(labelError).Inc()
ec <- errors.Wrapf(err, "replicate write request, endpoint %v", endpoint)
ec <- errors.Wrapf(err, "replicate write request for endpoint %v", endpoint)
return
}

Expand Down Expand Up @@ -430,16 +432,8 @@ func (h *Handler) fanoutForward(pctx context.Context, tenant string, replicas ma
if err != nil {
// When a MultiError is added to another MultiError, the error slices are concatenated, not nested.
// To avoid breaking the counting logic, we need to flatten the error.
if errs, ok := err.(terrors.MultiError); ok {
if countCause(errs, isConflict) > 0 {
err = errors.Wrap(conflictErr, errs.Error())
} else if countCause(errs, isNotReady) > 0 {
err = tsdb.ErrNotReady
} else {
err = errors.New(errs.Error())
}
}
ec <- errors.Wrapf(err, "storing locally, endpoint %v", endpoint)
level.Debug(h.logger).Log("msg", "local tsdb write failed", "err", err.Error())
ec <- errors.Wrapf(cause(err, 1), "store locally for endpoint %v", endpoint)
return
}
ec <- nil
Expand Down Expand Up @@ -560,15 +554,8 @@ func (h *Handler) replicate(ctx context.Context, tenant string, wreq *prompb.Wri
quorum := h.writeQuorum()
// fanoutForward only returns an error if successThreshold (quorum) is not reached.
if err := h.fanoutForward(ctx, tenant, replicas, wreqs, quorum); err != nil {
if countCause(err, isNotReady) >= quorum {
return errors.Wrap(tsdb.ErrNotReady, "replicate: quorum not reached")
}
if countCause(err, isConflict) >= quorum {
return errors.Wrap(conflictErr, "replicate: quorum not reached")
}
return errors.Wrap(err, "unexpected error, before quorum is reached")
return errors.Wrap(cause(err, quorum), "quorum not reached")
}

return nil
}

Expand All @@ -578,12 +565,15 @@ func (h *Handler) RemoteWrite(ctx context.Context, r *storepb.WriteRequest) (*st
defer span.Finish()

err := h.handleRequest(ctx, uint64(r.Replica), r.Tenant, &prompb.WriteRequest{Timeseries: r.Timeseries})
switch err {
if err != nil {
level.Debug(h.logger).Log("msg", "failed to handle request", "err", err)
}
switch rootCause(err) {
case nil:
return &storepb.WriteResponse{}, nil
case tsdb.ErrNotReady:
case errNotReady:
return nil, status.Error(codes.Unavailable, err.Error())
case conflictErr:
case errConflict:
return nil, status.Error(codes.AlreadyExists, err.Error())
case errBadReplica:
return nil, status.Error(codes.InvalidArgument, err.Error())
Expand All @@ -592,43 +582,109 @@ func (h *Handler) RemoteWrite(ctx context.Context, r *storepb.WriteRequest) (*st
}
}

// countCause counts the number of errors within the given error
// whose causes satisfy the given function.
// countCause will inspect the error's cause or, if the error is a MultiError,
// the cause of each contained error but will not traverse any deeper.
func countCause(err error, f func(error) bool) int {
errs, ok := err.(terrors.MultiError)
if !ok {
errs = []error{err}
}
var n int
for i := range errs {
if f(errors.Cause(errs[i])) {
n++
}
}
return n
}

// isConflict returns whether or not the given error represents a conflict.
func isConflict(err error) bool {
if err == nil {
return false
}
return err == conflictErr ||
return err == errConflict ||
err == storage.ErrDuplicateSampleForTimestamp ||
err == storage.ErrOutOfOrderSample ||
err == storage.ErrOutOfBounds ||
err.Error() == strconv.Itoa(http.StatusConflict) ||
status.Code(err) == codes.AlreadyExists
}

// isNotReady returns whether or not the given error represents a not ready error.
func isNotReady(err error) bool {
return err == tsdb.ErrNotReady ||
return err == errNotReady ||
err == tsdb.ErrNotReady ||
status.Code(err) == codes.Unavailable
}

// countCause counts the number of errors within the given error slice
// whose causes satisfy the given function.
// cause will inspect the error's cause or, if the error is a MultiError,
// the cause of each contained error but will not traverse any deeper.
func countCause(errs []error, f func(error) bool) int {
var n int
for _, err := range errs {
errs, ok := err.(terrors.MultiError)
if ok {
n += countCause(errs, f)
continue
}

if f(errors.Cause(err)) {
n++
}
}
return n
}

type expectedErrors []*struct {
err error
cause func(error) bool
count int
}

func (a expectedErrors) Len() int { return len(a) }
func (a expectedErrors) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
func (a expectedErrors) Less(i, j int) bool { return a[i].count < a[j].count }

// cause returns a known error which has occurred more than the given threshold.
// cause will inspect the error's cause or, if the error is a MultiError,
// the cause of each contained error but will not traverse any deeper.
func cause(err error, threshold int) error {
if err == nil {
return nil
}
unwrappedErr := errors.Cause(err)
errs, ok := unwrappedErr.(terrors.MultiError)
if !ok {
errs = []error{unwrappedErr}
}
if threshold > 0 {
expErrs := expectedErrors{
{err: errConflict, cause: isConflict},
{err: errNotReady, cause: isNotReady},
}
for _, exp := range expErrs {
exp.count = countCause(errs, exp.cause)
}
sort.Sort(sort.Reverse(expErrs))
if exp := expErrs[0]; exp.count >= threshold {
return exp.err
}
}
if len(errs) == 1 {
return err
}
if len(errs) == 0 {
return nil
}
return errors.Wrap(err, "unexpected error")
}

// rootCause extracts the root cause of a fan-out error.
func rootCause(err error) error {
if err == nil {
return nil
}
err = errors.Cause(cause(err, 1))
errs, ok := err.(terrors.MultiError)
if !ok {
return err
}
return errors.New(errs.Error())
}

// peerState encapsulates the number of request attempt made against a peer and,
// next allowed time for the next attempt.
type peerState struct {
attempt float64
nextAllowed time.Time
}

func newPeerGroup(dialOpts ...grpc.DialOption) *peerGroup {
return &peerGroup{
dialOpts: dialOpts,
Expand Down
Loading

0 comments on commit f3f2cc5

Please sign in to comment.