Skip to content

Commit

Permalink
Fix writer errors
Browse files Browse the repository at this point in the history
Signed-off-by: fpetkovski <[email protected]>
  • Loading branch information
fpetkovski committed Nov 26, 2022
1 parent b002700 commit d83a624
Show file tree
Hide file tree
Showing 2 changed files with 23 additions and 11 deletions.
32 changes: 22 additions & 10 deletions pkg/receive/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -777,10 +777,9 @@ func (h *Handler) fanoutForward(pctx context.Context, tenant string, wreqs map[e
if !more {
for _, rerr := range seriesErrs {
if seriesReplicated {
errs.Add(rerr.Cause())
errs.Add(rerr.ErrOrNil())
} else {
cause := rerr.replicationErr(quorum)
errs.Add(errors.Wrapf(cause, rerr.Error()))
errs.Add(rerr.replicationErr(quorum).ErrOrNil())
}
}
return errs.ErrOrNil()
Expand Down Expand Up @@ -932,7 +931,13 @@ func (es *writeErrors) Add(err error) {
if es.reasonSet == nil {
es.reasonSet = make(map[string]struct{})
}
es.reasonSet[err.Error()] = struct{}{}
if werr, ok := err.(*writeErrors); ok {
for reason := range werr.reasonSet {
es.reasonSet[reason] = struct{}{}
}
} else {
es.reasonSet[err.Error()] = struct{}{}
}
}

// ErrOrNil returns the writeErrors instance if any
Expand Down Expand Up @@ -981,13 +986,13 @@ func (es *writeErrors) Cause() error {
// If no single error has occurred more than the threshold, but the
// total number of errors meets the threshold,
// replicationErr will return errInternal.
func (es *writeErrors) replicationErr(threshold int) error {
func (es *writeErrors) replicationErr(threshold int) *writeErrors {
if es == nil {
return nil
return &writeErrors{}
}

if len(es.errs) == 0 {
return nil
return &writeErrors{}
}

expErrs := expectedErrors{
Expand All @@ -1003,17 +1008,24 @@ func (es *writeErrors) replicationErr(threshold int) error {
}
}
}

// Determine which error occurred most.
sort.Sort(sort.Reverse(expErrs))
if exp := expErrs[0]; exp.count >= threshold {
return exp.err
return &writeErrors{
errs: []error{exp.err},
reasonSet: es.reasonSet,
}
}

if len(es.errs) >= threshold {
return errInternal
return &writeErrors{
errs: []error{errInternal},
reasonSet: es.reasonSet,
}
}

return nil
return &writeErrors{}
}

// Error returns a string containing a deduplicated set of reasons.
Expand Down
2 changes: 1 addition & 1 deletion pkg/receive/writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -209,5 +209,5 @@ func (r *Writer) Write(ctx context.Context, tenantID string, wreq *prompb.WriteR
if err := app.Commit(); err != nil {
errs.Add(errors.Wrap(err, "commit samples"))
}
return errs.Cause()
return errs.ErrOrNil()
}

0 comments on commit d83a624

Please sign in to comment.