Skip to content

Commit

Permalink
mcs: some bug fix for keyspace group (tikv#118)
Browse files Browse the repository at this point in the history
* mcs, tso: fix potential inconsistency caused by non-atomic applying keyspace movement state change in the persistent store (tikv#6596)

ref tikv#5895

fix potential inconsistency caused by non-atomic applying the state change in the persistent in the following cases:
1. Keyspace group split/merge
2. Keyspace movement across keyspace groups.

Signed-off-by: Bin Shi <[email protected]>

* client: fix keyspace update in `tsoSvcDiscovery` (tikv#6612)

close tikv#6611

Signed-off-by: lhy1024 <[email protected]>

---------

Signed-off-by: Bin Shi <[email protected]>
Signed-off-by: lhy1024 <[email protected]>
Co-authored-by: Bin Shi <[email protected]>
  • Loading branch information
lhy1024 and binshi-bing authored Jun 16, 2023
1 parent 188d0d8 commit 63635ec
Show file tree
Hide file tree
Showing 6 changed files with 212 additions and 14 deletions.
21 changes: 15 additions & 6 deletions client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -263,6 +263,14 @@ type serviceModeKeeper struct {
tsoSvcDiscovery ServiceDiscovery
}

func (k *serviceModeKeeper) SetKeyspaceID(keyspaceID uint32) {
k.Lock()
defer k.Unlock()
if k.serviceMode == pdpb.ServiceMode_API_SVC_MODE {
k.tsoSvcDiscovery.SetKeyspaceID(keyspaceID)
}
}

func (k *serviceModeKeeper) close() {
k.Lock()
defer k.Unlock()
Expand Down Expand Up @@ -457,9 +465,6 @@ func newClientWithKeyspaceName(
ctx context.Context, keyspaceName string, svrAddrs []string,
security SecurityOption, opts ...ClientOption,
) (Client, error) {
log.Info("[pd] create pd client with endpoints and keyspace",
zap.Strings("pd-address", svrAddrs), zap.String("keyspace-name", keyspaceName))

tlsCfg := &tlsutil.TLSConfig{
CAPath: security.CAPath,
CertPath: security.CertPath,
Expand Down Expand Up @@ -496,8 +501,12 @@ func newClientWithKeyspaceName(
if err := c.initRetry(c.loadKeyspaceMeta, keyspaceName); err != nil {
return nil, err
}
// We call "c.pdSvcDiscovery.SetKeyspaceID(c.keyspaceID)" after service mode already switching to API mode
// and tso service discovery already initialized, so here we need to set the tso_service_discovery's keyspace id too.
c.pdSvcDiscovery.SetKeyspaceID(c.keyspaceID)

c.serviceModeKeeper.SetKeyspaceID(c.keyspaceID)
log.Info("[pd] create pd client with endpoints and keyspace",
zap.Strings("pd-address", svrAddrs), zap.String("keyspace-name", keyspaceName), zap.Uint32("keyspace-id", c.keyspaceID))
return c, nil
}

Expand Down Expand Up @@ -574,15 +583,15 @@ func (c *client) setServiceMode(newMode pdpb.ServiceMode) {
)
switch newMode {
case pdpb.ServiceMode_PD_SVC_MODE:
newTSOCli = newTSOClient(c.ctx, c.option, c.keyspaceID,
newTSOCli = newTSOClient(c.ctx, c.option,
c.pdSvcDiscovery, &pdTSOStreamBuilderFactory{})
case pdpb.ServiceMode_API_SVC_MODE:
newTSOSvcDiscovery = newTSOServiceDiscovery(
c.ctx, MetaStorageClient(c), c.pdSvcDiscovery,
c.GetClusterID(c.ctx), c.keyspaceID, c.tlsCfg, c.option)
// At this point, the keyspace group isn't known yet. Starts from the default keyspace group,
// and will be updated later.
newTSOCli = newTSOClient(c.ctx, c.option, c.keyspaceID,
newTSOCli = newTSOClient(c.ctx, c.option,
newTSOSvcDiscovery, &tsoTSOStreamBuilderFactory{})
if err := newTSOSvcDiscovery.Init(); err != nil {
log.Error("[pd] failed to initialize tso service discovery. keep the current service mode",
Expand Down
4 changes: 1 addition & 3 deletions client/tso_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,6 @@ type tsoClient struct {
wg sync.WaitGroup
option *option

keyspaceID uint32
svcDiscovery ServiceDiscovery
tsoStreamBuilderFactory
// tsoAllocators defines the mapping {dc-location -> TSO allocator leader URL}
Expand All @@ -94,15 +93,14 @@ type tsoClient struct {

// newTSOClient returns a new TSO client.
func newTSOClient(
ctx context.Context, option *option, keyspaceID uint32,
ctx context.Context, option *option,
svcDiscovery ServiceDiscovery, factory tsoStreamBuilderFactory,
) *tsoClient {
ctx, cancel := context.WithCancel(ctx)
c := &tsoClient{
ctx: ctx,
cancel: cancel,
option: option,
keyspaceID: keyspaceID,
svcDiscovery: svcDiscovery,
tsoStreamBuilderFactory: factory,
checkTSDeadlineCh: make(chan struct{}),
Expand Down
3 changes: 2 additions & 1 deletion pkg/balancer/round_robin.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,8 @@ func (r *RoundRobin[T]) Next() (t T) {
func (r *RoundRobin[T]) GetAll() []T {
r.RLock()
defer r.RUnlock()
return r.nodes
// return a copy to avoid data race
return append(r.nodes[:0:0], r.nodes...)
}

// Put puts one into balancer.
Expand Down
18 changes: 16 additions & 2 deletions pkg/tso/keyspace_group_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -563,7 +563,14 @@ func (kgm *KeyspaceGroupManager) updateKeyspaceGroupMembership(
i++
j++
} else if i < oldLen && j < newLen && oldKeyspaces[i] < newKeyspaces[j] || j == newLen {
delete(kgm.keyspaceLookupTable, oldKeyspaces[i])
// kgm.keyspaceLookupTable is a global lookup table for all keyspace groups, storing the
// keyspace group ID for each keyspace. If the keyspace group of this keyspace in this
// lookup table isn't the current keyspace group, it means the keyspace has been moved
// to another keyspace group which has already declared the ownership of the keyspace,
// and we shouldn't delete and overwrite the ownership.
if curGroupID, ok := kgm.keyspaceLookupTable[oldKeyspaces[i]]; ok && curGroupID == groupID {
delete(kgm.keyspaceLookupTable, oldKeyspaces[i])
}
i++
} else {
newGroup.KeyspaceLookupTable[newKeyspaces[j]] = struct{}{}
Expand Down Expand Up @@ -621,7 +628,8 @@ func (kgm *KeyspaceGroupManager) deleteKeyspaceGroup(groupID uint32) {
// if kid == kg.ID, it means the keyspace still belongs to this keyspace group,
// so we decouple the relationship in the global keyspace lookup table.
// if kid != kg.ID, it means the keyspace has been moved to another keyspace group
// which has already declared the ownership of the keyspace.
// which has already declared the ownership of the keyspace, so we don't need
// delete it from the global keyspace lookup table and overwrite the ownership.
if kid == kg.ID {
delete(kgm.keyspaceLookupTable, kid)
}
Expand Down Expand Up @@ -821,6 +829,12 @@ func (kgm *KeyspaceGroupManager) checkTSOSplit(
return err
}
if tsoutil.CompareTimestamp(&splitSourceTSO, &splitTSO) <= 0 {
log.Debug("the split source TSO is not greater than the newly split TSO",
zap.Int64("split-source-tso-physical", splitSourceTSO.Physical),
zap.Int64("split-source-tso-logical", splitSourceTSO.Logical),
zap.Int64("split-tso-physical", splitTSO.Physical),
zap.Int64("split-tso-logical", splitTSO.Logical),
)
return nil
}
// If the split source TSO is greater than the newly split TSO, we need to update the split
Expand Down
76 changes: 74 additions & 2 deletions pkg/tso/keyspace_group_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -490,8 +490,8 @@ func (suite *keyspaceGroupManagerTestSuite) TestDefaultMembershipRestriction() {
err = putKeyspaceGroupToEtcd(suite.ctx, suite.etcdClient, rootPath, event.ksg)
re.NoError(err)

// Sleep for a while to wait for the events to propagate. If the restriction is not working,
// it will cause random failure.
// Sleep for a while to wait for the events to propagate. If the logic doesn't work
// as expected, it will cause random failure.
time.Sleep(1 * time.Second)
// Should still be able to get AM for keyspace 0 in keyspace group 0.
am, kg, kgid, err = mgr.getKeyspaceGroupMetaWithCheck(
Expand All @@ -508,6 +508,78 @@ func (suite *keyspaceGroupManagerTestSuite) TestDefaultMembershipRestriction() {
re.NotNil(kg)
}

// TestKeyspaceMovementConsistency tests the consistency of keyspace movement.
// When a keyspace is moved from one keyspace group to another, the allocator manager
// update source group and target group state in etcd atomically. The TSO keyspace group
// manager watches the state change in persistent store but hard to apply the movement state
// change across two groups atomically. This test case is to test the movement state is
// eventually consistent, for example, if a keyspace "move to group B" event is applied
// before "move away from group A" event, the second event shouldn't overwrite the global
// state, such as the global keyspace group lookup table.
func (suite *keyspaceGroupManagerTestSuite) TestKeyspaceMovementConsistency() {
re := suite.Require()

mgr := suite.newUniqueKeyspaceGroupManager(1)
re.NotNil(mgr)
defer mgr.Close()

rootPath := mgr.legacySvcRootPath
svcAddr := mgr.tsoServiceID.ServiceAddr

var (
am *AllocatorManager
kg *endpoint.KeyspaceGroup
kgid uint32
err error
event *etcdEvent
)

// Create keyspace group 0 which contains keyspace 0, 1, 2.
addKeyspaceGroupAssignment(
suite.ctx, suite.etcdClient, true,
mgr.legacySvcRootPath, mgr.tsoServiceID.ServiceAddr,
mcsutils.DefaultKeyspaceGroupID, []uint32{mcsutils.DefaultKeyspaceID, 10, 20})
// Create keyspace group 1 which contains keyspace 3, 4.
addKeyspaceGroupAssignment(
suite.ctx, suite.etcdClient, true,
mgr.legacySvcRootPath, mgr.tsoServiceID.ServiceAddr,
uint32(1), []uint32{11, 21})

err = mgr.Initialize()
re.NoError(err)

// Should be able to get AM for keyspace 10 in keyspace group 0.
am, kg, kgid, err = mgr.getKeyspaceGroupMetaWithCheck(10, mcsutils.DefaultKeyspaceGroupID)
re.NoError(err)
re.Equal(mcsutils.DefaultKeyspaceGroupID, kgid)
re.NotNil(am)
re.NotNil(kg)

// Move keyspace 10 from keyspace group 0 to keyspace group 1 and apply this state change
// to TSO first.
event = generateKeyspaceGroupPutEvent(1, []uint32{10, 11, 21}, []string{svcAddr})
err = putKeyspaceGroupToEtcd(suite.ctx, suite.etcdClient, rootPath, event.ksg)
re.NoError(err)
// Wait until the keyspace 10 is served by keyspace group 1.
testutil.Eventually(re, func() bool {
_, _, kgid, err = mgr.getKeyspaceGroupMetaWithCheck(10, 1)
return err == nil && kgid == 1
}, testutil.WithWaitFor(3*time.Second), testutil.WithTickInterval(50*time.Millisecond))

event = generateKeyspaceGroupPutEvent(
mcsutils.DefaultKeyspaceGroupID, []uint32{mcsutils.DefaultKeyspaceID, 20}, []string{svcAddr})
err = putKeyspaceGroupToEtcd(suite.ctx, suite.etcdClient, rootPath, event.ksg)
re.NoError(err)

// Sleep for a while to wait for the events to propagate. If the restriction is not working,
// it will cause random failure.
time.Sleep(1 * time.Second)
// Should still be able to get AM for keyspace 10 in keyspace group 1.
_, _, kgid, err = mgr.getKeyspaceGroupMetaWithCheck(10, 1)
re.NoError(err)
re.Equal(uint32(1), kgid)
}

// TestHandleTSORequestWithWrongMembership tests the case that HandleTSORequest receives
// a tso request with mismatched keyspace and keyspace group.
func (suite *keyspaceGroupManagerTestSuite) TestHandleTSORequestWithWrongMembership() {
Expand Down
104 changes: 104 additions & 0 deletions tests/integrations/mcs/tso/keyspace_group_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ import (
"github.com/tikv/pd/pkg/utils/testutil"
"github.com/tikv/pd/pkg/utils/tsoutil"
"github.com/tikv/pd/server/apiv2/handlers"
"github.com/tikv/pd/server/config"
"github.com/tikv/pd/tests"
"github.com/tikv/pd/tests/integrations/mcs"
handlersutil "github.com/tikv/pd/tests/server/apiv2/handlers"
Expand Down Expand Up @@ -465,3 +466,106 @@ func (suite *tsoKeyspaceGroupManagerTestSuite) TestTSOKeyspaceGroupMembers() {
re.NoError(failpoint.Disable("github.com/tikv/pd/pkg/keyspace/skipSplitRegion"))
re.NoError(failpoint.Disable("github.com/tikv/pd/pkg/keyspace/acceleratedAllocNodes"))
}

func TestTwiceSplitKeyspaceGroup(t *testing.T) {
re := require.New(t)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
re.NoError(failpoint.Enable("github.com/tikv/pd/pkg/keyspace/acceleratedAllocNodes", `return(true)`))

// Init api server config but not start.
tc, err := tests.NewTestAPICluster(ctx, 1, func(conf *config.Config, serverName string) {
conf.Keyspace.PreAlloc = []string{
"keyspace_a", "keyspace_b",
}
})
re.NoError(err)
pdAddr := tc.GetConfig().GetClientURL()

// Start pd client and wait pd server start.
var clients sync.Map
go func() {
apiCtx := pd.NewAPIContextV2("keyspace_b") // its keyspace id is 2.
cli, err := pd.NewClientWithAPIContext(ctx, apiCtx, []string{pdAddr}, pd.SecurityOption{})
re.NoError(err)
clients.Store("keyspace_b", cli)
}()
go func() {
apiCtx := pd.NewAPIContextV2("keyspace_a") // its keyspace id is 1.
cli, err := pd.NewClientWithAPIContext(ctx, apiCtx, []string{pdAddr}, pd.SecurityOption{})
re.NoError(err)
clients.Store("keyspace_a", cli)
}()

// Start api server and tso server.
err = tc.RunInitialServers()
re.NoError(err)
defer tc.Destroy()
tc.WaitLeader()
leaderServer := tc.GetServer(tc.GetLeader())
re.NoError(leaderServer.BootstrapCluster())

tsoCluster, err := mcs.NewTestTSOCluster(ctx, 2, pdAddr)
re.NoError(err)
defer tsoCluster.Destroy()
tsoCluster.WaitForDefaultPrimaryServing(re)

// Wait pd clients are ready.
testutil.Eventually(re, func() bool {
count := 0
clients.Range(func(key, value interface{}) bool {
count++
return true
})
return count == 2
})
clientA, ok := clients.Load("keyspace_a")
re.True(ok)
clientB, ok := clients.Load("keyspace_b")
re.True(ok)

// First split keyspace group 0 to 1 with keyspace 2.
kgm := leaderServer.GetServer().GetKeyspaceGroupManager()
re.NotNil(kgm)
testutil.Eventually(re, func() bool {
err = kgm.SplitKeyspaceGroupByID(0, 1, []uint32{2})
return err == nil
})

// Trigger checkTSOSplit to ensure the split is finished.
testutil.Eventually(re, func() bool {
_, _, err = clientB.(pd.Client).GetTS(ctx)
re.NoError(err)
kg := handlersutil.MustLoadKeyspaceGroupByID(re, leaderServer, 0)
return !kg.IsSplitting()
})
clientB.(pd.Client).Close()

// Then split keyspace group 0 to 2 with keyspace 1.
testutil.Eventually(re, func() bool {
err = kgm.SplitKeyspaceGroupByID(0, 2, []uint32{1})
return err == nil
})

// Trigger checkTSOSplit to ensure the split is finished.
testutil.Eventually(re, func() bool {
_, _, err = clientA.(pd.Client).GetTS(ctx)
re.NoError(err)
kg := handlersutil.MustLoadKeyspaceGroupByID(re, leaderServer, 0)
return !kg.IsSplitting()
})
clientA.(pd.Client).Close()

// Check the keyspace group 0 is split to 1 and 2.
kg0 := handlersutil.MustLoadKeyspaceGroupByID(re, leaderServer, 0)
kg1 := handlersutil.MustLoadKeyspaceGroupByID(re, leaderServer, 1)
kg2 := handlersutil.MustLoadKeyspaceGroupByID(re, leaderServer, 2)
re.Equal([]uint32{0}, kg0.Keyspaces)
re.Equal([]uint32{2}, kg1.Keyspaces)
re.Equal([]uint32{1}, kg2.Keyspaces)
re.False(kg0.IsSplitting())
re.False(kg1.IsSplitting())
re.False(kg2.IsSplitting())

re.NoError(failpoint.Disable("github.com/tikv/pd/pkg/keyspace/acceleratedAllocNodes"))
}

0 comments on commit 63635ec

Please sign in to comment.