From 2512d2b8ae62e3f9684e8d960b74182452a1e59e Mon Sep 17 00:00:00 2001 From: Haibin Xie Date: Mon, 25 Feb 2019 19:47:25 +0800 Subject: [PATCH] stats: reduce stats collecor's lock contention (#9233) --- statistics/update.go | 70 +++++++++++++++------------------- statistics/update_list_test.go | 6 +-- 2 files changed, 33 insertions(+), 43 deletions(-) diff --git a/statistics/update.go b/statistics/update.go index 00d81258a12d9..04cf2d37c7ee5 100644 --- a/statistics/update.go +++ b/statistics/update.go @@ -112,18 +112,14 @@ func (m errorRateDeltaMap) clear(tableID int64, histID int64, isIndex bool) { m[tableID] = item } -func (h *Handle) merge(s *SessionStatsCollector) { - s.Lock() - defer s.Unlock() +func (h *Handle) merge(s *SessionStatsCollector, rateMap errorRateDeltaMap) { for id, item := range s.mapper { h.globalMap.update(id, item.Delta, item.Count, &item.ColSize) } - h.mu.Lock() - h.mu.rateMap.merge(s.rateMap) - h.mu.Unlock() + s.mapper = make(tableDeltaMap) + rateMap.merge(s.rateMap) s.rateMap = make(errorRateDeltaMap) h.feedback = mergeQueryFeedback(h.feedback, s.feedback) - s.mapper = make(tableDeltaMap) s.feedback = s.feedback[:0] } @@ -134,7 +130,6 @@ type SessionStatsCollector struct { mapper tableDeltaMap feedback []*QueryFeedback rateMap errorRateDeltaMap - prev *SessionStatsCollector next *SessionStatsCollector // deleted is set to true when a session is closed. Every time we sweep the list, we will remove the useless collector. deleted bool @@ -207,21 +202,6 @@ func (s *SessionStatsCollector) StoreQueryFeedback(feedback interface{}, h *Hand return nil } -// tryToRemoveFromList will remove this collector from the list if it's deleted flag is set. -func (s *SessionStatsCollector) tryToRemoveFromList() { - s.Lock() - defer s.Unlock() - if !s.deleted { - return - } - next := s.next - prev := s.prev - prev.next = next - if next != nil { - next.prev = prev - } -} - // NewSessionStatsCollector allocates a stats collector for a session. func (h *Handle) NewSessionStatsCollector() *SessionStatsCollector { h.listHead.Lock() @@ -230,10 +210,6 @@ func (h *Handle) NewSessionStatsCollector() *SessionStatsCollector { mapper: make(tableDeltaMap), rateMap: make(errorRateDeltaMap), next: h.listHead.next, - prev: h.listHead, - } - if h.listHead.next != nil { - h.listHead.next.prev = newCollector } h.listHead.next = newCollector return newCollector @@ -275,15 +251,36 @@ const ( DumpDelta = false ) +// sweepList will loop over the list, merge each session's local stats into handle +// and remove closed session's collector. +func (h *Handle) sweepList() { + prev := h.listHead + prev.Lock() + errorRateMap := make(errorRateDeltaMap) + for curr := prev.next; curr != nil; curr = curr.next { + curr.Lock() + // Merge the session stats into handle and error rate map. + h.merge(curr, errorRateMap) + if curr.deleted { + prev.next = curr.next + // Since the session is already closed, we can safely unlock it here. + curr.Unlock() + } else { + // Unlock the previous lock, so we only holds at most two session's lock at the same time. + prev.Unlock() + prev = curr + } + } + prev.Unlock() + h.mu.Lock() + h.mu.rateMap.merge(errorRateMap) + h.mu.Unlock() +} + // DumpStatsDeltaToKV sweeps the whole list and updates the global map, then we dumps every table that held in map to KV. // If the `dumpAll` is false, it will only dump that delta info that `Modify Count / Table Count` greater than a ratio. func (h *Handle) DumpStatsDeltaToKV(dumpMode bool) error { - h.listHead.Lock() - for collector := h.listHead.next; collector != nil; collector = collector.next { - collector.tryToRemoveFromList() - h.merge(collector) - } - h.listHead.Unlock() + h.sweepList() currentTime := time.Now() for id, item := range h.globalMap { if dumpMode == DumpDelta && !needDumpStatsDelta(h, id, item, currentTime) { @@ -416,12 +413,7 @@ func (h *Handle) dumpFeedbackToKV(fb *QueryFeedback) error { // it takes 10 minutes for a feedback to take effect. However, we can use the // feedback locally on this tidb-server, so it could be used more timely. func (h *Handle) UpdateStatsByLocalFeedback(is infoschema.InfoSchema) { - h.listHead.Lock() - for collector := h.listHead.next; collector != nil; collector = collector.next { - collector.tryToRemoveFromList() - h.merge(collector) - } - h.listHead.Unlock() + h.sweepList() for _, fb := range h.feedback { table, ok := is.TableByID(fb.tableID) if !ok { diff --git a/statistics/update_list_test.go b/statistics/update_list_test.go index 8fffa041ad0fc..f8428be602986 100644 --- a/statistics/update_list_test.go +++ b/statistics/update_list_test.go @@ -31,17 +31,15 @@ func (s *testUpdateListSuite) TestInsertAndDelete(c *C) { items[0].Delete() // delete tail items[2].Delete() // delete middle items[4].Delete() // delete head - h.DumpStatsDeltaToKV(DumpAll) + h.sweepList() c.Assert(h.listHead.next, Equals, items[3]) c.Assert(items[3].next, Equals, items[1]) c.Assert(items[1].next, IsNil) - c.Assert(items[1].prev, Equals, items[3]) - c.Assert(items[3].prev, Equals, h.listHead) // delete rest items[1].Delete() items[3].Delete() - h.DumpStatsDeltaToKV(DumpAll) + h.sweepList() c.Assert(h.listHead.next, IsNil) }