Skip to content

Commit

Permalink
receiver: avoid race of hashring
Browse files Browse the repository at this point in the history
Signed-off-by: YaoZengzeng <[email protected]>
  • Loading branch information
YaoZengzeng committed Aug 5, 2019
1 parent 4cf32d0 commit 303f861
Showing 1 changed file with 29 additions and 1 deletion.
30 changes: 29 additions & 1 deletion pkg/receive/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"net"
"net/http"
"strconv"
"sync"
"sync/atomic"

"github.com/go-kit/kit/log"
Expand Down Expand Up @@ -46,10 +47,12 @@ type Handler struct {
logger log.Logger
receiver *Writer
router *route.Router
hashring Hashring
options *Options
listener net.Listener

mtx sync.RWMutex
hashring Hashring

// Metrics
requestDuration *prometheus.HistogramVec
requestsTotal *prometheus.CounterVec
Expand Down Expand Up @@ -140,6 +143,9 @@ func (h *Handler) StorageReady() {
// Hashring sets the hashring for the handler and marks the hashring as ready.
// If the hashring is nil, then the hashring is marked as not ready.
func (h *Handler) Hashring(hashring Hashring) {
h.mtx.Lock()
defer h.mtx.Unlock()

if hashring == nil {
atomic.StoreUint32(&h.hashringReady, 0)
h.hashring = nil
Expand Down Expand Up @@ -275,6 +281,15 @@ func (h *Handler) receive(w http.ResponseWriter, r *http.Request) {
func (h *Handler) forward(ctx context.Context, tenant string, r replica, wreq *prompb.WriteRequest) error {
wreqs := make(map[string]*prompb.WriteRequest)
replicas := make(map[string]replica)

h.mtx.RLock()
// The hashring may have been ready in testReady() but been set to nil since,
// so re-check it here while holding the read lock.
if h.hashring == nil {
h.mtx.RUnlock()
return errors.New("hashring is not ready")
}

// Batch all of the time series in the write request
// into several smaller write requests that are
// grouped by target endpoint. This ensures that
Expand All @@ -285,6 +300,7 @@ func (h *Handler) forward(ctx context.Context, tenant string, r replica, wreq *p
for i := range wreq.Timeseries {
endpoint, err := h.hashring.GetN(tenant, &wreq.Timeseries[i], r.n)
if err != nil {
h.mtx.RUnlock()
return err
}
if _, ok := wreqs[endpoint]; !ok {
Expand All @@ -294,6 +310,7 @@ func (h *Handler) forward(ctx context.Context, tenant string, r replica, wreq *p
wr := wreqs[endpoint]
wr.Timeseries = append(wr.Timeseries, wreq.Timeseries[i])
}
h.mtx.RUnlock()

return h.parallelizeRequests(ctx, tenant, replicas, wreqs)
}
Expand Down Expand Up @@ -400,14 +417,25 @@ func (h *Handler) replicate(ctx context.Context, tenant string, wreq *prompb.Wri
wreqs := make(map[string]*prompb.WriteRequest)
replicas := make(map[string]replica)
var i uint64

h.mtx.RLock()
// The hashring may have been ready in testReady() but been set to nil since,
// so re-check it here while holding the read lock.
if h.hashring == nil {
h.mtx.RUnlock()
return errors.New("hashring is not ready")
}

for i = 0; i < h.options.ReplicationFactor; i++ {
endpoint, err := h.hashring.GetN(tenant, &wreq.Timeseries[0], i)
if err != nil {
h.mtx.RUnlock()
return err
}
wreqs[endpoint] = wreq
replicas[endpoint] = replica{i, true}
}
h.mtx.RUnlock()

err := h.parallelizeRequests(ctx, tenant, replicas, wreqs)
if errs, ok := err.(terrors.MultiError); ok {
Expand Down

0 comments on commit 303f861

Please sign in to comment.