keyspace: enhance LockGroup with RemoveEntryOnUnlock LockOption (tikv#6629)

close tikv#6628

Enhance LockGroup with a RemoveEntryOnUnlock option. When enabled, the lock entry for a given key is removed from the lock group on unlock, keeping the working set minimal. This suits low-QPS, non-time-critical scenarios over a large, non-consecutive key space. One example of the last case is keyspace group split, which loads non-consecutive keyspace meta in batches and locks all keyspace meta loaded in a batch at the same time. (A usage sketch follows the change summary below.)

Signed-off-by: Bin Shi <[email protected]>
binshi-bing authored and rleungx committed Aug 2, 2023
1 parent 3c51bd6 commit 5deac5d
Showing 3 changed files with 72 additions and 11 deletions.
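
Before the diffs, a minimal usage sketch of the new option. The exampleUsage function is hypothetical and not part of this commit; the import path assumes this repository's github.com/tikv/pd module and the pkg/utils/syncutil package changed below.

package main

import (
	"fmt"

	"github.com/tikv/pd/pkg/utils/syncutil"
)

// exampleUsage is a hypothetical illustration, not part of this commit.
func exampleUsage() {
	// With RemoveEntryOnUnlock enabled, the entry for each key is dropped from
	// the group's map as soon as its last holder unlocks it.
	lg := syncutil.NewLockGroup(syncutil.WithRemoveEntryOnUnlock(true))
	for id := uint32(0); id < 3; id++ {
		lg.Lock(id)
		// ... read or mutate the keyspace meta guarded by this id ...
		lg.Unlock(id)
	}
}

func main() {
	exampleUsage()
	fmt.Println("lock group entries released")
}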
9 changes: 7 additions & 2 deletions pkg/keyspace/keyspace.go
@@ -106,11 +106,16 @@ func NewKeyspaceManager(
kgm *GroupManager,
) *Manager {
return &Manager{
metaLock: syncutil.NewLockGroup(syncutil.WithHash(keyspaceIDHash)),
ctx: ctx,
// Remove the lock of the given key from the lock group when unlocked to
// keep the working set minimal. This suits low-QPS, non-time-critical
// scenarios over a large, non-consecutive key space. One example of the
// last case is keyspace group split, which loads non-consecutive keyspace
// meta in batches and locks all keyspace meta loaded in a batch at the same time.
metaLock: syncutil.NewLockGroup(syncutil.WithRemoveEntryOnUnlock(true)),
idAllocator: idAllocator,
store: store,
cluster: cluster,
ctx: ctx,
config: config,
kgm: kgm,
nextPatrolStartID: utils.DefaultKeyspaceID,
38 changes: 32 additions & 6 deletions pkg/utils/syncutil/lock_group.go
@@ -16,11 +16,17 @@ package syncutil

import "fmt"

type lockEntry struct {
mu *Mutex
refCount int
}

// LockGroup is a map of mutexes that locks entries with different ids separately.
// It's used to alleviate the lock contention of using a single global lock.
type LockGroup struct {
groupLock Mutex // protects group.
entries map[uint32]*Mutex // map of locks with id as key.
groupLock Mutex // protects group.
removeEntryOnUnlock bool // whether to remove the entry from entries on Unlock().
entries map[uint32]*lockEntry // map of locks with id as key.
// hashFn hashes id to the map key; its main purpose is to limit the total
// number of mutexes in the group, as using a mutex for every id is too memory heavy.
hashFn func(id uint32) uint32
@@ -36,10 +36,17 @@ func WithHash(hashFn func(id uint32) uint32) LockGroupOption {
}
}

// WithRemoveEntryOnUnlock sets the lockGroup's removeEntryOnUnlock to the provided value.
func WithRemoveEntryOnUnlock(removeEntryOnUnlock bool) LockGroupOption {
return func(lg *LockGroup) {
lg.removeEntryOnUnlock = removeEntryOnUnlock
}
}

// NewLockGroup creates and returns an empty lockGroup.
func NewLockGroup(options ...LockGroupOption) *LockGroup {
lockGroup := &LockGroup{
entries: make(map[uint32]*Mutex),
entries: make(map[uint32]*lockEntry),
// If no custom hash function provided, use identity hash.
hashFn: func(id uint32) uint32 { return id },
}
@@ -56,11 +69,15 @@ func (g *LockGroup) Lock(id uint32) {
e, ok := g.entries[hashedID]
// If target id's lock has not been initialized, create a new lock.
if !ok {
e = &Mutex{}
e = &lockEntry{
mu: &Mutex{},
refCount: 0,
}
g.entries[hashedID] = e
}
e.refCount++
g.groupLock.Unlock()
e.Lock()
e.mu.Lock()
}

// Unlock unlocks the target mutex based on the hash of the id.
@@ -73,6 +90,15 @@ func (g *LockGroup) Unlock(id uint32) {
g.groupLock.Unlock()
panic(fmt.Errorf("unlock requested for key %v, but no entry found", id))
}
e.refCount--
if e.refCount == -1 {
// The ref count should never be negative; if it is, panic to surface the run-time error.
g.groupLock.Unlock()
panic(fmt.Errorf("unlock requested for key %v, but ref count is negative", id))
}
if g.removeEntryOnUnlock && e.refCount == 0 {
delete(g.entries, hashedID)
}
g.groupLock.Unlock()
e.Unlock()
e.mu.Unlock()
}
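
For context, the ref count in the diff above ensures an entry is deleted only when no goroutine still holds or is waiting on its mutex: every waiter registers under groupLock before blocking, so the shared mutex survives until the last Unlock. A hedged sketch of that interleaving (exampleContention is hypothetical, not part of this commit; the import path is assumed from this repository's layout):

package main

import (
	"sync"

	"github.com/tikv/pd/pkg/utils/syncutil"
)

// exampleContention is a hypothetical illustration, not part of this commit.
func exampleContention() {
	lg := syncutil.NewLockGroup(syncutil.WithRemoveEntryOnUnlock(true))
	var wg sync.WaitGroup
	for i := 0; i < 2; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			// Both goroutines bump the entry's refCount under groupLock before
			// blocking on the shared mutex, so the first Unlock still sees a
			// positive refCount and keeps the entry; only the last Unlock
			// drops it to 0 and deletes the entry from the map.
			lg.Lock(42)
			defer lg.Unlock(42)
		}()
	}
	wg.Wait()
}

func main() {
	exampleContention()
}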
36 changes: 33 additions & 3 deletions pkg/utils/syncutil/lock_group_test.go
@@ -31,17 +31,47 @@ func TestLockGroup(t *testing.T) {
for i := 0; i < concurrency; i++ {
go func(spaceID uint32) {
defer wg.Done()
mustSequentialUpdateSingle(re, spaceID, group)
mustSequentialUpdateSingle(re, spaceID, group, concurrency)
}(rand.Uint32())
}
wg.Wait()
// Check that size of the lock group is limited.
re.LessOrEqual(len(group.entries), 16)
}

func TestLockGroupWithRemoveEntryOnUnlock(t *testing.T) {
re := require.New(t)
group := NewLockGroup(WithRemoveEntryOnUnlock(true))
maxID := 1024

// Test Concurrent lock/unlock.
var wg sync.WaitGroup
wg.Add(maxID)
for i := 0; i < maxID; i++ {
go func(spaceID uint32) {
defer wg.Done()
mustSequentialUpdateSingle(re, spaceID, group, 10)
}(uint32(i))
}

// Test range lock in a scenario with a large, non-consecutive key space. One example is
// keyspace group split, which loads non-consecutive keyspace meta in batches and locks all
// keyspace meta loaded in a batch at the same time.
for i := 0; i < maxID; i++ {
group.Lock(uint32(i))
}
re.Equal(len(group.entries), maxID)
for i := 0; i < maxID; i++ {
group.Unlock(uint32(i))
}

wg.Wait()
// Check that all entries have been removed from the lock group.
re.Equal(len(group.entries), 0)
}

// mustSequentialUpdateSingle checks that updates to a single spaceID are applied sequentially.
func mustSequentialUpdateSingle(re *require.Assertions, spaceID uint32, group *LockGroup) {
concurrency := 50
func mustSequentialUpdateSingle(re *require.Assertions, spaceID uint32, group *LockGroup, concurrency int) {
total := 0
var wg sync.WaitGroup
wg.Add(concurrency)
