diff --git a/pkg/tso/keyspace_group_manager.go b/pkg/tso/keyspace_group_manager.go index 798a61046f4e..3159518387fa 100644 --- a/pkg/tso/keyspace_group_manager.go +++ b/pkg/tso/keyspace_group_manager.go @@ -628,7 +628,8 @@ func (kgm *KeyspaceGroupManager) deleteKeyspaceGroup(groupID uint32) { // if kid == kg.ID, it means the keyspace still belongs to this keyspace group, // so we decouple the relationship in the global keyspace lookup table. // if kid != kg.ID, it means the keyspace has been moved to another keyspace group - // which has already declared the ownership of the keyspace. + // which has already declared the ownership of the keyspace, so we don't need + // delete it from the global keyspace lookup table and overwrite the ownership. if kid == kg.ID { delete(kgm.keyspaceLookupTable, kid) } diff --git a/pkg/tso/keyspace_group_manager_test.go b/pkg/tso/keyspace_group_manager_test.go index 7817057aaec3..95af4d7e3d8e 100644 --- a/pkg/tso/keyspace_group_manager_test.go +++ b/pkg/tso/keyspace_group_manager_test.go @@ -490,8 +490,8 @@ func (suite *keyspaceGroupManagerTestSuite) TestDefaultMembershipRestriction() { err = putKeyspaceGroupToEtcd(suite.ctx, suite.etcdClient, rootPath, event.ksg) re.NoError(err) - // Sleep for a while to wait for the events to propagate. If the restriction is not working, - // it will cause random failure. + // Sleep for a while to wait for the events to propagate. If the logic doesn't work + // as expected, it will cause random failure. time.Sleep(1 * time.Second) // Should still be able to get AM for keyspace 0 in keyspace group 0. am, kg, kgid, err = mgr.getKeyspaceGroupMetaWithCheck( @@ -508,6 +508,78 @@ func (suite *keyspaceGroupManagerTestSuite) TestDefaultMembershipRestriction() { re.NotNil(kg) } +// TestKeyspaceMovementConsistency tests the consistency of keyspace movement. +// When a keyspace is moved from one keyspace group to another, the allocator manager +// update source group and target group state in etcd atomically. The TSO keyspace group +// manager watches the keyspace group state change but hard to apply the movement state +// change across two groups atomically. This test case is to test the movement state is +// eventually consistent, for example, if a keyspace "move to group B" event is applied +// before "move away from group A" event, the second event shouldn't overwrite the global +// state, such as the global keyspace group lookup table. +func (suite *keyspaceGroupManagerTestSuite) TestKeyspaceMovementConsistency() { + re := suite.Require() + + mgr := suite.newUniqueKeyspaceGroupManager(1) + re.NotNil(mgr) + defer mgr.Close() + + rootPath := mgr.legacySvcRootPath + svcAddr := mgr.tsoServiceID.ServiceAddr + + var ( + am *AllocatorManager + kg *endpoint.KeyspaceGroup + kgid uint32 + err error + event *etcdEvent + ) + + // Create keyspace group 0 which contains keyspace 0, 1, 2. + addKeyspaceGroupAssignment( + suite.ctx, suite.etcdClient, true, + mgr.legacySvcRootPath, mgr.tsoServiceID.ServiceAddr, + mcsutils.DefaultKeyspaceGroupID, []uint32{mcsutils.DefaultKeyspaceID, 10, 20}) + // Create keyspace group 1 which contains keyspace 3, 4. + addKeyspaceGroupAssignment( + suite.ctx, suite.etcdClient, true, + mgr.legacySvcRootPath, mgr.tsoServiceID.ServiceAddr, + uint32(1), []uint32{11, 21}) + + err = mgr.Initialize() + re.NoError(err) + + // Should be able to get AM for keyspace 10 in keyspace group 0. + am, kg, kgid, err = mgr.getKeyspaceGroupMetaWithCheck(10, mcsutils.DefaultKeyspaceGroupID) + re.NoError(err) + re.Equal(mcsutils.DefaultKeyspaceGroupID, kgid) + re.NotNil(am) + re.NotNil(kg) + + // Move keyspace 10 from keyspace group 0 to keyspace group 1 and apply this state change + // to TSO first. + event = generateKeyspaceGroupPutEvent(1, []uint32{10, 11, 21}, []string{svcAddr}) + err = putKeyspaceGroupToEtcd(suite.ctx, suite.etcdClient, rootPath, event.ksg) + re.NoError(err) + // Wait until the keyspace 10 is served by keyspace group 1. + testutil.Eventually(re, func() bool { + _, _, kgid, err = mgr.getKeyspaceGroupMetaWithCheck(10, 1) + return err == nil && kgid == 1 + }, testutil.WithWaitFor(3*time.Second), testutil.WithTickInterval(50*time.Millisecond)) + + event = generateKeyspaceGroupPutEvent( + mcsutils.DefaultKeyspaceGroupID, []uint32{mcsutils.DefaultKeyspaceID, 20}, []string{svcAddr}) + err = putKeyspaceGroupToEtcd(suite.ctx, suite.etcdClient, rootPath, event.ksg) + re.NoError(err) + + // Sleep for a while to wait for the events to propagate. If the restriction is not working, + // it will cause random failure. + time.Sleep(1 * time.Second) + // Should still be able to get AM for keyspace 10 in keyspace group 1. + _, _, kgid, err = mgr.getKeyspaceGroupMetaWithCheck(10, 1) + re.NoError(err) + re.Equal(uint32(1), kgid) +} + // TestHandleTSORequestWithWrongMembership tests the case that HandleTSORequest receives // a tso request with mismatched keyspace and keyspace group. func (suite *keyspaceGroupManagerTestSuite) TestHandleTSORequestWithWrongMembership() {