Skip to content

Commit

Permalink
MB-25156 scan-coordinator: fix stats update panic with drop index
Browse files Browse the repository at this point in the history
scan_coordinator has race condition in stats update with drop index.
drop index can cleanup the stats while the scan has acquired the snapshot
and will execute the request. In all such cases, the stat update needs
to be skipped.

Change-Id: If03ec2ebce841255074eb7efc578bbb3a0432cf2
  • Loading branch information
deepkaran authored and Deepkaran Salooja committed Jul 11, 2017
1 parent 1f522c9 commit 801ffee
Showing 1 changed file with 16 additions and 9 deletions.
25 changes: 16 additions & 9 deletions secondary/indexer/scan_coordinator.go
Original file line number Diff line number Diff line change
Expand Up @@ -1481,16 +1481,19 @@ func (s *scanCoordinator) serverCallback(protoReq interface{}, conn net.Conn,
return
}

req.Stats.scanReqAllocDuration.Add(time.Now().Sub(atime).Nanoseconds())
if req.Stats != nil {
req.Stats.scanReqAllocDuration.Add(time.Now().Sub(atime).Nanoseconds())
}

if err := s.isScanAllowed(*req.Consistency, req); err != nil {
s.tryRespondWithError(w, req, err)
return
}

req.Stats.numRequests.Add(1)

req.Stats.scanReqInitDuration.Add(time.Now().Sub(ttime).Nanoseconds())
if req.Stats != nil {
req.Stats.numRequests.Add(1)
req.Stats.scanReqInitDuration.Add(time.Now().Sub(ttime).Nanoseconds())
}

t0 := time.Now()
is, err := s.getRequestedIndexSnapshot(req)
Expand All @@ -1506,7 +1509,9 @@ func (s *scanCoordinator) serverCallback(protoReq interface{}, conn net.Conn,
})

defer func() {
req.Stats.scanReqDuration.Add(time.Now().Sub(ttime).Nanoseconds())
if req.Stats != nil {
req.Stats.scanReqDuration.Add(time.Now().Sub(ttime).Nanoseconds())
}
}()

s.processRequest(req, w, is, t0)
Expand Down Expand Up @@ -1546,10 +1551,12 @@ func (s *scanCoordinator) handleScanRequest(req *ScanRequest, w ScanResponseWrit
err := scanPipeline.Execute()
scanTime := time.Now().Sub(t0)

req.Stats.numRowsReturned.Add(int64(scanPipeline.RowsReturned()))
req.Stats.scanBytesRead.Add(int64(scanPipeline.BytesRead()))
req.Stats.scanDuration.Add(scanTime.Nanoseconds())
req.Stats.scanWaitDuration.Add(waitTime.Nanoseconds())
if req.Stats != nil {
req.Stats.numRowsReturned.Add(int64(scanPipeline.RowsReturned()))
req.Stats.scanBytesRead.Add(int64(scanPipeline.BytesRead()))
req.Stats.scanDuration.Add(scanTime.Nanoseconds())
req.Stats.scanWaitDuration.Add(waitTime.Nanoseconds())
}

if err != nil {
status := fmt.Sprintf("(error = %s)", err)
Expand Down

0 comments on commit 801ffee

Please sign in to comment.