From 303f8611c18fcdfc07c17ba35a58da6b950e2266 Mon Sep 17 00:00:00 2001
From: YaoZengzeng
Date: Mon, 5 Aug 2019 19:31:26 +0800
Subject: [PATCH] receiver: avoid race of hashring

Signed-off-by: YaoZengzeng
---
Review notes (text in this section is not part of the commit message):

* Handler.hashring is now guarded by Handler.mtx: Hashring() takes the
  write lock, forward() and replicate() hold the read lock while they
  call h.hashring.GetN and release it before parallelizeRequests.
* Fixed in this revision: the nil-hashring early return in replicate()
  called h.mtx.RLock() a second time instead of h.mtx.RUnlock(), which
  leaked the read lock and would block every later Hashring() call.
* NOTE(review): this patch text was re-flowed from a whitespace-collapsed
  copy. Hunk line counts match the hunk headers, but tabs/column alignment
  on context lines are reconstructed; use `git apply --ignore-whitespace`
  if a hunk is rejected, and expect the post-image blob id on the `index`
  line to be stale.

 pkg/receive/handler.go | 30 +++++++++++++++++++++++++++++-
 1 file changed, 29 insertions(+), 1 deletion(-)

diff --git a/pkg/receive/handler.go b/pkg/receive/handler.go
index 5e16bdd3ba1..34e512a9b2a 100644
--- a/pkg/receive/handler.go
+++ b/pkg/receive/handler.go
@@ -9,6 +9,7 @@ import (
 	"net"
 	"net/http"
 	"strconv"
+	"sync"
 	"sync/atomic"
 
 	"github.com/go-kit/kit/log"
@@ -46,10 +47,12 @@ type Handler struct {
 	logger   log.Logger
 	receiver *Writer
 	router   *route.Router
-	hashring Hashring
 	options  *Options
 	listener net.Listener
 
+	mtx      sync.RWMutex
+	hashring Hashring
+
 	// Metrics
 	requestDuration      *prometheus.HistogramVec
 	requestsTotal        *prometheus.CounterVec
@@ -140,6 +143,9 @@ func (h *Handler) StorageReady() {
 // Hashring sets the hashring for the handler and marks the hashring as ready.
 // If the hashring is nil, then the hashring is marked as not ready.
 func (h *Handler) Hashring(hashring Hashring) {
+	h.mtx.Lock()
+	defer h.mtx.Unlock()
+
 	if hashring == nil {
 		atomic.StoreUint32(&h.hashringReady, 0)
 		h.hashring = nil
@@ -275,6 +281,15 @@ func (h *Handler) receive(w http.ResponseWriter, r *http.Request) {
 func (h *Handler) forward(ctx context.Context, tenant string, r replica, wreq *prompb.WriteRequest) error {
 	wreqs := make(map[string]*prompb.WriteRequest)
 	replicas := make(map[string]replica)
+
+	h.mtx.RLock()
+	// The hashring may have been ready in testReady() but become unready since,
+	// so re-check it here while holding the read lock.
+	if h.hashring == nil {
+		h.mtx.RUnlock()
+		return errors.New("hashring is not ready")
+	}
+
 	// Batch all of the time series in the write request
 	// into several smaller write requests that are
 	// grouped by target endpoint. This ensures that
@@ -285,6 +300,7 @@ func (h *Handler) forward(ctx context.Context, tenant string, r replica, wreq *p
 	for i := range wreq.Timeseries {
 		endpoint, err := h.hashring.GetN(tenant, &wreq.Timeseries[i], r.n)
 		if err != nil {
+			h.mtx.RUnlock()
 			return err
 		}
 		if _, ok := wreqs[endpoint]; !ok {
@@ -294,6 +310,7 @@ func (h *Handler) forward(ctx context.Context, tenant string, r replica, wreq *p
 		wr := wreqs[endpoint]
 		wr.Timeseries = append(wr.Timeseries, wreq.Timeseries[i])
 	}
+	h.mtx.RUnlock()
 
 	return h.parallelizeRequests(ctx, tenant, replicas, wreqs)
 }
@@ -400,14 +417,25 @@ func (h *Handler) replicate(ctx context.Context, tenant string, wreq *prompb.Wri
 	wreqs := make(map[string]*prompb.WriteRequest)
 	replicas := make(map[string]replica)
 	var i uint64
+
+	h.mtx.RLock()
+	// The hashring may have been ready in testReady() but become unready since,
+	// so re-check it here while holding the read lock.
+	if h.hashring == nil {
+		h.mtx.RUnlock()
+		return errors.New("hashring is not ready")
+	}
+
 	for i = 0; i < h.options.ReplicationFactor; i++ {
 		endpoint, err := h.hashring.GetN(tenant, &wreq.Timeseries[0], i)
 		if err != nil {
+			h.mtx.RUnlock()
 			return err
 		}
 		wreqs[endpoint] = wreq
 		replicas[endpoint] = replica{i, true}
 	}
+	h.mtx.RUnlock()
 
 	err := h.parallelizeRequests(ctx, tenant, replicas, wreqs)
 	if errs, ok := err.(terrors.MultiError); ok {