From 5deac5dd6843ec71772f6e4471b3839f135b8b5a Mon Sep 17 00:00:00 2001
From: Bin Shi <39923490+binshi-bing@users.noreply.github.com>
Date: Sun, 18 Jun 2023 18:51:10 -0700
Subject: [PATCH] keyspace: enhance LockGroup with RemoveEntryOnUnlock
 LockOption (#6629)

close tikv/pd#6628

Enhance LockGroup with RemoveEntryOnUnlock. Remove the lock of the given
key from the lock group on unlock to keep a minimal working set, which
suits low-QPS, non-time-critical, and non-consecutive large key space
scenarios. One example of the last use case is keyspace group split,
which loads non-consecutive keyspace meta in batches and locks all
loaded keyspace meta within a batch at the same time.

Signed-off-by: Bin Shi
---
 pkg/keyspace/keyspace.go              |  9 +++++--
 pkg/utils/syncutil/lock_group.go      | 38 ++++++++++++++++++++++-----
 pkg/utils/syncutil/lock_group_test.go | 36 ++++++++++++++++++++++---
 3 files changed, 72 insertions(+), 11 deletions(-)

diff --git a/pkg/keyspace/keyspace.go b/pkg/keyspace/keyspace.go
index 302e18f4562..34057016125 100644
--- a/pkg/keyspace/keyspace.go
+++ b/pkg/keyspace/keyspace.go
@@ -106,11 +106,16 @@ func NewKeyspaceManager(
 	kgm *GroupManager,
 ) *Manager {
 	return &Manager{
-		metaLock:    syncutil.NewLockGroup(syncutil.WithHash(keyspaceIDHash)),
+		ctx: ctx,
+		// Remove the lock of the given key from the lock group on unlock to
+		// keep a minimal working set, which suits low-QPS, non-time-critical
+		// and non-consecutive large key space scenarios. One example of the last
+		// use case is keyspace group split, which loads non-consecutive keyspace
+		// meta in batches and locks all loaded meta within a batch at the same time.
+		metaLock:    syncutil.NewLockGroup(syncutil.WithRemoveEntryOnUnlock(true)),
 		idAllocator: idAllocator,
 		store:       store,
 		cluster:     cluster,
-		ctx:         ctx,
 		config:      config,
 		kgm:         kgm,
 		nextPatrolStartID: utils.DefaultKeyspaceID,
diff --git a/pkg/utils/syncutil/lock_group.go b/pkg/utils/syncutil/lock_group.go
index e6abe21c9e8..a38f490368c 100644
--- a/pkg/utils/syncutil/lock_group.go
+++ b/pkg/utils/syncutil/lock_group.go
@@ -16,11 +16,17 @@ package syncutil

 import "fmt"

+type lockEntry struct {
+	mu       *Mutex
+	refCount int
+}
+
 // LockGroup is a map of mutexes that locks entries with different ids separately.
 // It's used to alleviate the lock contention of using a single global lock.
 type LockGroup struct {
-	groupLock Mutex             // protects group.
-	entries   map[uint32]*Mutex // map of locks with id as key.
+	groupLock           Mutex                 // protects group.
+	removeEntryOnUnlock bool                  // whether to remove the entry from entries on Unlock().
+	entries             map[uint32]*lockEntry // map of locks with id as key.
 	// hashFn hashes id to map key, its main purpose is to limit the total
 	// number of mutexes in the group, as using a mutex for every id is too memory heavy.
 	hashFn func(id uint32) uint32
@@ -36,10 +42,17 @@ func WithHash(hashFn func(id uint32) uint32) LockGroupOption {
 	}
 }

+// WithRemoveEntryOnUnlock sets the lockGroup's removeEntryOnUnlock to the provided value.
+func WithRemoveEntryOnUnlock(removeEntryOnUnlock bool) LockGroupOption {
+	return func(lg *LockGroup) {
+		lg.removeEntryOnUnlock = removeEntryOnUnlock
+	}
+}
+
 // NewLockGroup creates and returns an empty lockGroup.
 func NewLockGroup(options ...LockGroupOption) *LockGroup {
 	lockGroup := &LockGroup{
-		entries: make(map[uint32]*Mutex),
+		entries: make(map[uint32]*lockEntry),
 		// If no custom hash function provided, use identity hash.
 		hashFn: func(id uint32) uint32 { return id },
 	}
@@ -56,11 +69,15 @@ func (g *LockGroup) Lock(id uint32) {
 	e, ok := g.entries[hashedID]
 	// If target id's lock has not been initialized, create a new lock.
 	if !ok {
-		e = &Mutex{}
+		e = &lockEntry{
+			mu:       &Mutex{},
+			refCount: 0,
+		}
 		g.entries[hashedID] = e
 	}
+	e.refCount++
 	g.groupLock.Unlock()
-	e.Lock()
+	e.mu.Lock()
 }

 // Unlock unlocks the target mutex based on the hash of the id.
@@ -73,6 +90,15 @@
 		g.groupLock.Unlock()
 		panic(fmt.Errorf("unlock requested for key %v, but no entry found", id))
 	}
+	e.refCount--
+	if e.refCount == -1 {
+		// The ref count should never be negative; if it is, panic to surface the logic error.
+		g.groupLock.Unlock()
+		panic(fmt.Errorf("unlock requested for key %v, but ref count is negative", id))
+	}
+	if g.removeEntryOnUnlock && e.refCount == 0 {
+		delete(g.entries, hashedID)
+	}
 	g.groupLock.Unlock()
-	e.Unlock()
+	e.mu.Unlock()
 }
diff --git a/pkg/utils/syncutil/lock_group_test.go b/pkg/utils/syncutil/lock_group_test.go
index 4e7dd123700..ff306983e05 100644
--- a/pkg/utils/syncutil/lock_group_test.go
+++ b/pkg/utils/syncutil/lock_group_test.go
@@ -31,7 +31,7 @@ func TestLockGroup(t *testing.T) {
 	for i := 0; i < concurrency; i++ {
 		go func(spaceID uint32) {
 			defer wg.Done()
-			mustSequentialUpdateSingle(re, spaceID, group)
+			mustSequentialUpdateSingle(re, spaceID, group, concurrency)
 		}(rand.Uint32())
 	}
 	wg.Wait()
@@ -39,9 +39,39 @@
 	re.LessOrEqual(len(group.entries), 16)
 }

+func TestLockGroupWithRemoveEntryOnUnlock(t *testing.T) {
+	re := require.New(t)
+	group := NewLockGroup(WithRemoveEntryOnUnlock(true))
+	maxID := 1024
+
+	// Test concurrent lock/unlock.
+	var wg sync.WaitGroup
+	wg.Add(maxID)
+	for i := 0; i < maxID; i++ {
+		go func(spaceID uint32) {
+			defer wg.Done()
+			mustSequentialUpdateSingle(re, spaceID, group, 10)
+		}(uint32(i))
+	}
+
+	// Test range locking in a scenario with a non-consecutive large key space. One example is
+	// keyspace group split, which loads non-consecutive keyspace meta in batches and locks all
+	// loaded keyspace meta within a batch at the same time.
+	for i := 0; i < maxID; i++ {
+		group.Lock(uint32(i))
+	}
+	re.Equal(maxID, len(group.entries))
+	for i := 0; i < maxID; i++ {
+		group.Unlock(uint32(i))
+	}
+
+	wg.Wait()
+	// Check that every entry has been removed from the lock group on unlock.
+	re.Equal(0, len(group.entries))
+}
+
 // mustSequentialUpdateSingle checks that for any given update, update is sequential.
-func mustSequentialUpdateSingle(re *require.Assertions, spaceID uint32, group *LockGroup) {
-	concurrency := 50
+func mustSequentialUpdateSingle(re *require.Assertions, spaceID uint32, group *LockGroup, concurrency int) {
 	total := 0
 	var wg sync.WaitGroup
 	wg.Add(concurrency)
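
For illustration, a minimal usage sketch of the new option (not part of the
patch): it assumes the import path github.com/tikv/pd/pkg/utils/syncutil and a
standalone main package; the ids and the critical-section body are made up for
the example.

package main

import (
	"fmt"
	"sync"

	"github.com/tikv/pd/pkg/utils/syncutil"
)

func main() {
	// Entries are reference counted: Lock creates (or reuses) the entry for
	// an id and bumps its count; Unlock drops the count and, because
	// RemoveEntryOnUnlock is enabled, deletes the entry once it hits zero.
	group := syncutil.NewLockGroup(syncutil.WithRemoveEntryOnUnlock(true))

	var wg sync.WaitGroup
	for _, id := range []uint32{7, 42, 100003} { // sparse, non-consecutive ids
		wg.Add(1)
		go func(id uint32) {
			defer wg.Done()
			group.Lock(id)
			defer group.Unlock(id) // frees the entry if no other holder remains
			fmt.Printf("updating keyspace meta %d\n", id)
		}(id)
	}
	wg.Wait()
}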
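Design-wise, this complements rather than replaces WithHash: WithHash bounds
memory by folding many ids onto a fixed set of mutexes (the existing test
asserts at most 16 entries), at the price of false contention between ids that
share a bucket, whereas WithRemoveEntryOnUnlock keeps a dedicated,
reference-counted mutex per live id and bounds memory by deleting each entry
as soon as its count reaches zero. The extra map delete and re-allocation per
lock cycle is why the commit message scopes this option to low-QPS,
non-time-critical workloads such as the batched keyspace group split.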