diff --git a/pkg/receive/handler.go b/pkg/receive/handler.go index c436d32e8f1..9b93431be10 100644 --- a/pkg/receive/handler.go +++ b/pkg/receive/handler.go @@ -767,8 +767,11 @@ func (h *Handler) fanoutForward(pctx context.Context, tenant string, wreqs map[e }() quorum := h.writeQuorum() + if seriesReplicated { + quorum = 1 + } successes := make([]int, numSeries) - seriesErrs := make([]writeErrors, numSeries) + seriesErrs := newSeriesErrors(quorum, numSeries) for { select { case <-fctx.Done(): @@ -776,11 +779,7 @@ func (h *Handler) fanoutForward(pctx context.Context, tenant string, wreqs map[e case wresp, more := <-ec: if !more { for _, rerr := range seriesErrs { - if seriesReplicated { - errs.Add(rerr.ErrOrNil()) - } else { - errs.Add(rerr.replicationErr(quorum).ErrOrNil()) - } + errs.Add(rerr) } return errs.ErrOrNil() } @@ -911,18 +910,46 @@ func (a expectedErrors) Len() int { return len(a) } func (a expectedErrors) Swap(i, j int) { a[i], a[j] = a[j], a[i] } func (a expectedErrors) Less(i, j int) bool { return a[i].count < a[j].count } -// writeErrors contains all errors that have -// occurred during a remote-write request. -type writeErrors struct { +// errorSet is a set of errors. +type errorSet struct { reasonSet map[string]struct{} errs []error } -// Add adds an error to the writeErrors. -func (es *writeErrors) Add(err error) { +// Error returns a string containing a deduplicated set of reasons. +func (es errorSet) Error() string { + if len(es.reasonSet) == 0 { + return "" + } + reasons := make([]string, 0, len(es.reasonSet)) + for reason := range es.reasonSet { + reasons = append(reasons, reason) + } + sort.Strings(reasons) + + var buf bytes.Buffer + if len(reasons) > 1 { + fmt.Fprintf(&buf, "%d errors: ", len(es.reasonSet)) + } + + var more bool + for _, reason := range reasons { + if more { + buf.WriteString("; ") + } + buf.WriteString(reason) + more = true + } + + return buf.String() +} + +// Add adds an error to the errorSet. 
+func (es *errorSet) Add(err error) { if err == nil { return } + if len(es.errs) == 0 { es.errs = []error{err} } else { @@ -931,18 +958,30 @@ func (es *writeErrors) Add(err error) { if es.reasonSet == nil { es.reasonSet = make(map[string]struct{}) } - if werr, ok := err.(*writeErrors); ok { + + switch werr := err.(type) { + case *replicationErrors: for reason := range werr.reasonSet { es.reasonSet[reason] = struct{}{} } - } else { + case *writeErrors: + for reason := range werr.reasonSet { + es.reasonSet[reason] = struct{}{} + } + default: es.reasonSet[err.Error()] = struct{}{} } } +// writeErrors contains all errors that have +// occurred during a remote-write request. +type writeErrors struct { + errorSet +} + // ErrOrNil returns the writeErrors instance if any // errors are contained in it. -// Otherwise it returns nil. +// Otherwise, it returns nil. func (es *writeErrors) ErrOrNil() error { if len(es.errs) == 0 { return nil @@ -981,18 +1020,21 @@ func (es *writeErrors) Cause() error { return nonRecoverableErr } -// replicationErr extracts a sentinel error with the highest occurrence -// that has happened more than the given threshold. +// replicationErrors contains errors that have happened while +// replicating a time series within a remote-write request. +type replicationErrors struct { + errorSet + threshold int +} + +// Cause extracts a sentinel error with the highest occurrence that +// has happened at least the given threshold number of times. -// If no single error has occurred more than the threshold, but the +// If no single error has reached the threshold, but the // total number of errors meets the threshold, -// replicationErr will return errInternal. +// Cause will return errInternal. 
-func (es *writeErrors) replicationErr(threshold int) *writeErrors { - if es == nil { - return &writeErrors{} - } - +func (es *replicationErrors) Cause() error { if len(es.errs) == 0 { - return &writeErrors{} + return errorSet{} } expErrs := expectedErrors{ @@ -1011,49 +1053,23 @@ func (es *writeErrors) replicationErr(threshold int) *writeErrors { // Determine which error occurred most. sort.Sort(sort.Reverse(expErrs)) - if exp := expErrs[0]; exp.count >= threshold { - return &writeErrors{ - errs: []error{exp.err}, - reasonSet: es.reasonSet, - } + if exp := expErrs[0]; exp.count >= es.threshold { + return exp.err } - if len(es.errs) >= threshold { - return &writeErrors{ - errs: []error{errInternal}, - reasonSet: es.reasonSet, - } + if len(es.errs) >= es.threshold { + return errInternal } - return &writeErrors{} + return nil } -// Error returns a string containing a deduplicated set of reasons. -func (es *writeErrors) Error() string { - if es == nil || len(es.reasonSet) == 0 { - return "" - } - reasons := make([]string, 0, len(es.reasonSet)) - for reason := range es.reasonSet { - reasons = append(reasons, reason) - } - sort.Strings(reasons) - - var buf bytes.Buffer - if len(reasons) > 1 { - fmt.Fprintf(&buf, "%d errors: ", len(es.reasonSet)) - } - - var more bool - for _, reason := range reasons { - if more { - buf.WriteString("; ") - } - buf.WriteString(reason) - more = true +func newSeriesErrors(threshold, numErrors int) []*replicationErrors { + errs := make([]*replicationErrors, numErrors) + for i := range errs { + errs[i] = &replicationErrors{threshold: threshold} } - - return buf.String() + return errs } func newPeerGroup(dialOpts ...grpc.DialOption) *peerGroup { diff --git a/pkg/receive/writer.go b/pkg/receive/writer.go index 31877826318..7c10a8649da 100644 --- a/pkg/receive/writer.go +++ b/pkg/receive/writer.go @@ -72,7 +72,7 @@ func (r *Writer) Write(ctx context.Context, tenantID string, wreq *prompb.WriteR var ( ref storage.SeriesRef - errs writeErrors + 
errs = &writeErrors{} ) for _, t := range wreq.Timeseries { // Check if time series labels are valid. If not, skip the time series