Skip to content

Commit

Permalink
mcs, tso: fix potential inconsistency caused by non-atomic applying k…
Browse files Browse the repository at this point in the history
…eyspace movement state change in the persistent store (#6596)

ref #5895

fix potential inconsistency caused by non-atomic applying the state change in the persistent in the following cases:
1. Keyspace group split/merge
2. Keyspace movement across keyspace groups.

Signed-off-by: Bin Shi <[email protected]>
  • Loading branch information
binshi-bing authored Jun 14, 2023
1 parent 7158cb3 commit 7ce61d6
Show file tree
Hide file tree
Showing 2 changed files with 84 additions and 4 deletions.
12 changes: 10 additions & 2 deletions pkg/tso/keyspace_group_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -563,7 +563,14 @@ func (kgm *KeyspaceGroupManager) updateKeyspaceGroupMembership(
i++
j++
} else if i < oldLen && j < newLen && oldKeyspaces[i] < newKeyspaces[j] || j == newLen {
delete(kgm.keyspaceLookupTable, oldKeyspaces[i])
// kgm.keyspaceLookupTable is a global lookup table for all keyspace groups, storing the
// keyspace group ID for each keyspace. If the keyspace group of this keyspace in this
// lookup table isn't the current keyspace group, it means the keyspace has been moved
// to another keyspace group which has already declared the ownership of the keyspace,
// and we shouldn't delete and overwrite the ownership.
if curGroupID, ok := kgm.keyspaceLookupTable[oldKeyspaces[i]]; ok && curGroupID == groupID {
delete(kgm.keyspaceLookupTable, oldKeyspaces[i])
}
i++
} else {
newGroup.KeyspaceLookupTable[newKeyspaces[j]] = struct{}{}
Expand Down Expand Up @@ -621,7 +628,8 @@ func (kgm *KeyspaceGroupManager) deleteKeyspaceGroup(groupID uint32) {
// if kid == kg.ID, it means the keyspace still belongs to this keyspace group,
// so we decouple the relationship in the global keyspace lookup table.
// if kid != kg.ID, it means the keyspace has been moved to another keyspace group
// which has already declared the ownership of the keyspace.
// which has already declared the ownership of the keyspace, so we don't need
// delete it from the global keyspace lookup table and overwrite the ownership.
if kid == kg.ID {
delete(kgm.keyspaceLookupTable, kid)
}
Expand Down
76 changes: 74 additions & 2 deletions pkg/tso/keyspace_group_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -490,8 +490,8 @@ func (suite *keyspaceGroupManagerTestSuite) TestDefaultMembershipRestriction() {
err = putKeyspaceGroupToEtcd(suite.ctx, suite.etcdClient, rootPath, event.ksg)
re.NoError(err)

// Sleep for a while to wait for the events to propagate. If the restriction is not working,
// it will cause random failure.
// Sleep for a while to wait for the events to propagate. If the logic doesn't work
// as expected, it will cause random failure.
time.Sleep(1 * time.Second)
// Should still be able to get AM for keyspace 0 in keyspace group 0.
am, kg, kgid, err = mgr.getKeyspaceGroupMetaWithCheck(
Expand All @@ -508,6 +508,78 @@ func (suite *keyspaceGroupManagerTestSuite) TestDefaultMembershipRestriction() {
re.NotNil(kg)
}

// TestKeyspaceMovementConsistency tests the consistency of keyspace movement.
// When a keyspace is moved from one keyspace group to another, the allocator manager
// update source group and target group state in etcd atomically. The TSO keyspace group
// manager watches the state change in persistent store but hard to apply the movement state
// change across two groups atomically. This test case is to test the movement state is
// eventually consistent, for example, if a keyspace "move to group B" event is applied
// before "move away from group A" event, the second event shouldn't overwrite the global
// state, such as the global keyspace group lookup table.
func (suite *keyspaceGroupManagerTestSuite) TestKeyspaceMovementConsistency() {
re := suite.Require()

mgr := suite.newUniqueKeyspaceGroupManager(1)
re.NotNil(mgr)
defer mgr.Close()

rootPath := mgr.legacySvcRootPath
svcAddr := mgr.tsoServiceID.ServiceAddr

var (
am *AllocatorManager
kg *endpoint.KeyspaceGroup
kgid uint32
err error
event *etcdEvent
)

// Create keyspace group 0 which contains keyspace 0, 1, 2.
addKeyspaceGroupAssignment(
suite.ctx, suite.etcdClient, true,
mgr.legacySvcRootPath, mgr.tsoServiceID.ServiceAddr,
mcsutils.DefaultKeyspaceGroupID, []uint32{mcsutils.DefaultKeyspaceID, 10, 20})
// Create keyspace group 1 which contains keyspace 3, 4.
addKeyspaceGroupAssignment(
suite.ctx, suite.etcdClient, true,
mgr.legacySvcRootPath, mgr.tsoServiceID.ServiceAddr,
uint32(1), []uint32{11, 21})

err = mgr.Initialize()
re.NoError(err)

// Should be able to get AM for keyspace 10 in keyspace group 0.
am, kg, kgid, err = mgr.getKeyspaceGroupMetaWithCheck(10, mcsutils.DefaultKeyspaceGroupID)
re.NoError(err)
re.Equal(mcsutils.DefaultKeyspaceGroupID, kgid)
re.NotNil(am)
re.NotNil(kg)

// Move keyspace 10 from keyspace group 0 to keyspace group 1 and apply this state change
// to TSO first.
event = generateKeyspaceGroupPutEvent(1, []uint32{10, 11, 21}, []string{svcAddr})
err = putKeyspaceGroupToEtcd(suite.ctx, suite.etcdClient, rootPath, event.ksg)
re.NoError(err)
// Wait until the keyspace 10 is served by keyspace group 1.
testutil.Eventually(re, func() bool {
_, _, kgid, err = mgr.getKeyspaceGroupMetaWithCheck(10, 1)
return err == nil && kgid == 1
}, testutil.WithWaitFor(3*time.Second), testutil.WithTickInterval(50*time.Millisecond))

event = generateKeyspaceGroupPutEvent(
mcsutils.DefaultKeyspaceGroupID, []uint32{mcsutils.DefaultKeyspaceID, 20}, []string{svcAddr})
err = putKeyspaceGroupToEtcd(suite.ctx, suite.etcdClient, rootPath, event.ksg)
re.NoError(err)

// Sleep for a while to wait for the events to propagate. If the restriction is not working,
// it will cause random failure.
time.Sleep(1 * time.Second)
// Should still be able to get AM for keyspace 10 in keyspace group 1.
_, _, kgid, err = mgr.getKeyspaceGroupMetaWithCheck(10, 1)
re.NoError(err)
re.Equal(uint32(1), kgid)
}

// TestHandleTSORequestWithWrongMembership tests the case that HandleTSORequest receives
// a tso request with mismatched keyspace and keyspace group.
func (suite *keyspaceGroupManagerTestSuite) TestHandleTSORequestWithWrongMembership() {
Expand Down

0 comments on commit 7ce61d6

Please sign in to comment.