ms, tso: pick some enhancements (tikv#113)
* Add keyspace and keyspace group info to the time fallback log. (tikv#6581)

ref tikv#5895

Add keyspace and keyspace group info to the time fallback log to help debug time fallback issues in multi-timeline scenarios.
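
For illustration, a message produced by the new format looks roughly like the following (all values are made up):

    global timestamp fallback, new ts (1686633600000, 5) <= the last one (1686633600000, 9). keyspace: 3, keyspace group: 1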

Signed-off-by: Bin Shi <[email protected]>

* config, server: fault injection in TSO Proxy (tikv#6588)

ref tikv#5895

Add fault-injection test cases for the TSO proxy.
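
A rough sketch of how one such case can be driven with the github.com/pingcap/failpoint library. The failpoint name matches the one added in this commit (tsoProxyRecvFromClientTimeout), but the import-path prefix and the test harness are assumptions, not the actual test code:

package server_test

import (
    "testing"

    "github.com/pingcap/failpoint"
    "github.com/stretchr/testify/require"
)

// Minimal sketch: shrink the TSO proxy's receive-from-client timeout to one
// second via the failpoint, then drive a TSO stream without sending requests
// and expect the server to close it with ErrTSOProxyRecvFromClientTimeout.
func TestTSOProxyRecvFromClientTimeout(t *testing.T) {
    re := require.New(t)
    fp := "github.com/tikv/pd/server/tsoProxyRecvFromClientTimeout" // assumed failpoint path
    re.NoError(failpoint.Enable(fp, `return(1)`))
    defer func() {
        re.NoError(failpoint.Disable(fp))
    }()

    // ... start a test cluster, open a TSO stream, stop sending requests,
    // and assert the server closes the stream ...
}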

Signed-off-by: Bin Shi <[email protected]>

* mcs, tso: fix stream Send() and CloseSend() data race issue in TSO proxy (tikv#6591)

close tikv#6590

There is no need to call CloseSend(): the TSO proxy cancels the stream context, which causes the corresponding gRPC stream on the server side to exit.
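
A minimal sketch of the pattern this relies on, using plain gRPC rather than the actual PD code: the forwarded stream is created from its own cancellable context, and cancelling that context tears the stream down on both ends, so a separate CloseSend() call (which could race with a concurrent Send()) is not needed.

package tsoproxy

import (
    "context"

    "google.golang.org/grpc"
)

// openForwardStream derives the client stream from a cancellable context.
// When the returned cancel function is called, the gRPC runtime closes the
// stream and the server-side handler observes its stream context as done,
// so no explicit CloseSend() is required.
func openForwardStream(parent context.Context, cc *grpc.ClientConn, desc *grpc.StreamDesc, method string) (grpc.ClientStream, context.CancelFunc, error) {
    ctx, cancel := context.WithCancel(parent)
    stream, err := cc.NewStream(ctx, desc, method)
    if err != nil {
        cancel()
        return nil, nil, err
    }
    return stream, cancel, nil
}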

Signed-off-by: Bin Shi <[email protected]>

Co-authored-by: ti-chi-bot[bot] <108142056+ti-chi-bot[bot]@users.noreply.github.com>

* tests: make TestSplitKeyspaceGroup stable (tikv#6584)

close tikv#6571

Signed-off-by: lhy1024 <[email protected]>

---------

Signed-off-by: Bin Shi <[email protected]>
Signed-off-by: lhy1024 <[email protected]>
Co-authored-by: ti-chi-bot[bot] <108142056+ti-chi-bot[bot]@users.noreply.github.com>
Co-authored-by: lhy1024 <[email protected]>
3 people authored Jun 13, 2023
1 parent 1007f82 commit 188d0d8
Showing 11 changed files with 203 additions and 54 deletions.
client/tso_dispatcher.go: 6 changes (4 additions, 2 deletions)
@@ -739,8 +739,10 @@ func (c *tsoClient) compareAndSwapTS(dcLocation string, physical, firstLogical i
// to compare with the new TSO's first logical. For example, if we have a TSO resp with logical 10, count 5, then
// all TSOs we get will be [6, 7, 8, 9, 10].
if tsoutil.TSLessEqual(physical, firstLogical, lastPhysical, lastLogical) {
panic(errors.Errorf("%s timestamp fallback, newly acquired ts (%d, %d) is less or equal to last one (%d, %d)",
dcLocation, physical, firstLogical, lastPhysical, lastLogical))
panic(errors.Errorf(
"%s timestamp fallback, new ts (%d, %d) <= the last one (%d, %d). keyspace: %d, keyspace group: %d",
dcLocation, physical, firstLogical, lastPhysical, lastLogical,
c.svcDiscovery.GetKeyspaceID(), c.svcDiscovery.GetKeyspaceGroupID()))
}
lastTSOPointer.physical = physical
// Same as above, we save the largest logical part here.
pkg/keyspace/keyspace_test.go: 2 changes (1 addition, 1 deletion)
@@ -80,7 +80,7 @@ func (suite *keyspaceTestSuite) SetupTest() {
allocator := mockid.NewIDAllocator()
kgm := NewKeyspaceGroupManager(suite.ctx, store, nil, 0)
suite.manager = NewKeyspaceManager(suite.ctx, store, nil, allocator, &mockConfig{}, kgm)
suite.NoError(kgm.Bootstrap())
suite.NoError(kgm.Bootstrap(suite.ctx))
suite.NoError(suite.manager.Bootstrap())
}

pkg/keyspace/tso_keyspace_group.go: 55 changes (36 additions, 19 deletions)
@@ -18,6 +18,7 @@ import (
"context"
"encoding/json"
"strconv"
"strings"
"sync"
"time"

@@ -51,9 +52,11 @@

// GroupManager is the manager of keyspace group related data.
type GroupManager struct {
ctx context.Context
cancel context.CancelFunc
wg sync.WaitGroup
ctx context.Context
cancel context.CancelFunc
wg sync.WaitGroup
client *clientv3.Client
clusterID uint64

sync.RWMutex
// groups is the cache of keyspace group related information.
@@ -90,24 +93,24 @@ func NewKeyspaceGroupManager(
cancel: cancel,
store: store,
groups: groups,
client: client,
clusterID: clusterID,
nodesBalancer: balancer.GenByPolicy[string](defaultBalancerPolicy),
serviceRegistryMap: make(map[string]string),
}

// If the etcd client is not nil, start the watch loop for the registered tso servers.
// The PD(TSO) Client relies on this info to discover tso servers.
if client != nil {
m.initTSONodesWatcher(client, clusterID)
m.wg.Add(2)
if m.client != nil {
m.initTSONodesWatcher(m.client, m.clusterID)
m.wg.Add(1)
go m.tsoNodesWatcher.StartWatchLoop()
go m.allocNodesToAllKeyspaceGroups()
}

return m
}

// Bootstrap saves default keyspace group info and init group mapping in the memory.
func (m *GroupManager) Bootstrap() error {
func (m *GroupManager) Bootstrap(ctx context.Context) error {
// Force the membership restriction that the default keyspace must belong to default keyspace group.
// Have no information to specify the distribution of the default keyspace group replicas, so just
// leave the replica/member list empty. The TSO service will assign the default keyspace group replica
@@ -137,6 +140,11 @@ func (m *GroupManager) Bootstrap() error {
m.groups[userKind].Put(group)
}

// It will only alloc node when the group manager is on API leader.
if m.client != nil {
m.wg.Add(1)
go m.allocNodesToAllKeyspaceGroups(ctx)
}
return nil
}

@@ -146,7 +154,7 @@ func (m *GroupManager) Close() {
m.wg.Wait()
}

func (m *GroupManager) allocNodesToAllKeyspaceGroups() {
func (m *GroupManager) allocNodesToAllKeyspaceGroups(ctx context.Context) {
defer logutil.LogPanic()
defer m.wg.Done()
ticker := time.NewTicker(allocNodesToKeyspaceGroupsInterval)
@@ -158,7 +166,7 @@ func (m *GroupManager) allocNodesToAllKeyspaceGroups() {
log.Info("start to alloc nodes to all keyspace groups")
for {
select {
case <-m.ctx.Done():
case <-ctx.Done():
log.Info("stop to alloc nodes to all keyspace groups")
return
case <-ticker.C:
@@ -338,11 +346,6 @@ func (m *GroupManager) saveKeyspaceGroups(keyspaceGroups []*endpoint.KeyspaceGro
Members: keyspaceGroup.Members,
Keyspaces: keyspaceGroup.Keyspaces,
}
if oldKG.IsSplitting() {
newKG.SplitState = &endpoint.SplitState{
SplitSource: oldKG.SplitState.SplitSource,
}
}
err = m.store.SaveKeyspaceGroup(txn, newKG)
if err != nil {
return err
@@ -380,6 +383,8 @@ func (m *GroupManager) getKeyspaceConfigByKindLocked(userKind endpoint.UserKind)
return config, nil
}

var failpointOnce sync.Once

// UpdateKeyspaceForGroup updates the keyspace field for the keyspace group.
func (m *GroupManager) UpdateKeyspaceForGroup(userKind endpoint.UserKind, groupID string, keyspaceID uint32, mutation int) error {
// when server is not in API mode, we don't need to update the keyspace for keyspace group
@@ -391,6 +396,12 @@ func (m *GroupManager) UpdateKeyspaceForGroup(userKind endpoint.UserKind, groupI
return err
}

failpoint.Inject("externalAllocNode", func(val failpoint.Value) {
failpointOnce.Do(func() {
addrs := val.(string)
m.SetNodesForKeyspaceGroup(utils.DefaultKeyspaceGroupID, strings.Split(addrs, ","))
})
})
m.Lock()
defer m.Unlock()
return m.updateKeyspaceForGroupLocked(userKind, id, keyspaceID, mutation)
@@ -425,7 +436,6 @@ func (m *GroupManager) updateKeyspaceForGroupLocked(userKind endpoint.UserKind,
if err := m.saveKeyspaceGroups([]*endpoint.KeyspaceGroup{kg}, true); err != nil {
return err
}

m.groups[userKind].Put(kg)
}
return nil
@@ -696,8 +706,10 @@ func (m *GroupManager) AllocNodesForKeyspaceGroup(id uint32, desiredReplicaCount
func (m *GroupManager) SetNodesForKeyspaceGroup(id uint32, nodes []string) error {
m.Lock()
defer m.Unlock()
return m.store.RunInTxn(m.ctx, func(txn kv.Txn) error {
kg, err := m.store.LoadKeyspaceGroup(txn, id)
var kg *endpoint.KeyspaceGroup
err := m.store.RunInTxn(m.ctx, func(txn kv.Txn) error {
var err error
kg, err = m.store.LoadKeyspaceGroup(txn, id)
if err != nil {
return err
}
@@ -714,6 +726,11 @@ func (m *GroupManager) SetNodesForKeyspaceGroup(id uint32, nodes []string) error
kg.Members = members
return m.store.SaveKeyspaceGroup(txn, kg)
})
if err != nil {
return err
}
m.groups[endpoint.StringUserKind(kg.UserKind)].Put(kg)
return nil
}

// IsExistNode checks if the node exists.
pkg/keyspace/tso_keyspace_group_test.go: 2 changes (1 addition, 1 deletion)
@@ -48,7 +48,7 @@ func (suite *keyspaceGroupTestSuite) SetupTest() {
idAllocator := mockid.NewIDAllocator()
cluster := mockcluster.NewCluster(suite.ctx, config.NewTestOptions())
suite.kg = NewKeyspaceManager(suite.ctx, store, cluster, idAllocator, &mockConfig{}, suite.kgm)
suite.NoError(suite.kgm.Bootstrap())
suite.NoError(suite.kgm.Bootstrap(suite.ctx))
}

func (suite *keyspaceGroupTestSuite) TearDownTest() {
server/cluster/cluster.go: 2 changes (1 addition, 1 deletion)
@@ -270,7 +270,7 @@ func (c *RaftCluster) Start(s Server) error {
}

if s.IsAPIServiceMode() {
err = c.keyspaceGroupManager.Bootstrap()
err = c.keyspaceGroupManager.Bootstrap(c.ctx)
if err != nil {
return err
}
server/config/config.go: 14 changes (7 additions, 7 deletions)
@@ -85,9 +85,9 @@ type Config struct {
// Set this to 0 will disable TSO Proxy.
// Set this to the negative value to disable the limit.
MaxConcurrentTSOProxyStreamings int `toml:"max-concurrent-tso-proxy-streamings" json:"max-concurrent-tso-proxy-streamings"`
// TSOProxyClientRecvTimeout is the timeout for the TSO proxy to receive a tso request from a client via grpc TSO stream.
// TSOProxyRecvFromClientTimeout is the timeout for the TSO proxy to receive a tso request from a client via grpc TSO stream.
// After the timeout, the TSO proxy will close the grpc TSO stream.
TSOProxyClientRecvTimeout typeutil.Duration `toml:"tso-proxy-client-recv-timeout" json:"tso-proxy-client-recv-timeout"`
TSOProxyRecvFromClientTimeout typeutil.Duration `toml:"tso-proxy-recv-from-client-timeout" json:"tso-proxy-recv-from-client-timeout"`

// TSOSaveInterval is the interval to save timestamp.
TSOSaveInterval typeutil.Duration `toml:"tso-save-interval" json:"tso-save-interval"`
@@ -228,7 +228,7 @@ const (
defaultDRWaitStoreTimeout = time.Minute

defaultMaxConcurrentTSOProxyStreamings = 5000
defaultTSOProxyClientRecvTimeout = 1 * time.Hour
defaultTSOProxyRecvFromClientTimeout = 1 * time.Hour

defaultTSOSaveInterval = time.Duration(defaultLeaderLease) * time.Second
// defaultTSOUpdatePhysicalInterval is the default value of the config `TSOUpdatePhysicalInterval`.
@@ -458,7 +458,7 @@ func (c *Config) Adjust(meta *toml.MetaData, reloading bool) error {
}

configutil.AdjustInt(&c.MaxConcurrentTSOProxyStreamings, defaultMaxConcurrentTSOProxyStreamings)
configutil.AdjustDuration(&c.TSOProxyClientRecvTimeout, defaultTSOProxyClientRecvTimeout)
configutil.AdjustDuration(&c.TSOProxyRecvFromClientTimeout, defaultTSOProxyRecvFromClientTimeout)

configutil.AdjustInt64(&c.LeaderLease, defaultLeaderLease)
configutil.AdjustDuration(&c.TSOSaveInterval, defaultTSOSaveInterval)
@@ -1266,9 +1266,9 @@ func (c *Config) GetMaxConcurrentTSOProxyStreamings() int {
return c.MaxConcurrentTSOProxyStreamings
}

// GetTSOProxyClientRecvTimeout returns the TSO proxy client receive timeout.
func (c *Config) GetTSOProxyClientRecvTimeout() time.Duration {
return c.TSOProxyClientRecvTimeout.Duration
// GetTSOProxyRecvFromClientTimeout returns timeout value for TSO proxy receiving from the client.
func (c *Config) GetTSOProxyRecvFromClientTimeout() time.Duration {
return c.TSOProxyRecvFromClientTimeout.Duration
}

// GetTSOUpdatePhysicalInterval returns TSO update physical interval.
server/grpc_service.go: 40 changes (23 additions, 17 deletions)
@@ -69,7 +69,7 @@ var (
ErrNotFoundTSOAddr = status.Errorf(codes.NotFound, "not found tso address")
ErrForwardTSOTimeout = status.Errorf(codes.DeadlineExceeded, "forward tso request timeout")
ErrMaxCountTSOProxyRoutinesExceeded = status.Errorf(codes.ResourceExhausted, "max count of concurrent tso proxy routines exceeded")
ErrTSOProxyClientRecvTimeout = status.Errorf(codes.DeadlineExceeded, "tso proxy client recv timeout. stream closed by server")
ErrTSOProxyRecvFromClientTimeout = status.Errorf(codes.DeadlineExceeded, "tso proxy timeout when receiving from client; stream closed by server")
)

// GrpcServer wraps Server to provide grpc service.
@@ -409,9 +409,6 @@ func (s *GrpcServer) forwardTSO(stream pdpb.PD_TsoServer) error {
)
defer func() {
s.concurrentTSOProxyStreamings.Add(-1)
if forwardStream != nil {
forwardStream.CloseSend()
}
// cancel the forward stream
if cancel != nil {
cancel()
@@ -433,7 +430,7 @@ func (s *GrpcServer) forwardTSO(stream pdpb.PD_TsoServer) error {
default:
}

request, err := server.Recv(s.GetTSOProxyClientRecvTimeout())
request, err := server.Recv(s.GetTSOProxyRecvFromClientTimeout())
if err == io.EOF {
return nil
}
@@ -450,9 +447,6 @@ func (s *GrpcServer) forwardTSO(stream pdpb.PD_TsoServer) error {
return errors.WithStack(ErrNotFoundTSOAddr)
}
if forwardStream == nil || lastForwardedHost != forwardedHost {
if forwardStream != nil {
forwardStream.CloseSend()
}
if cancel != nil {
cancel()
}
@@ -548,6 +542,11 @@ func (s *GrpcServer) forwardTSORequestAsync(
DcLocation: request.GetDcLocation(),
}

failpoint.Inject("tsoProxySendToTSOTimeout", func() {
<-ctxTimeout.Done()
failpoint.Return()
})

if err := forwardStream.Send(tsopbReq); err != nil {
select {
case <-ctxTimeout.Done():
@@ -563,23 +562,21 @@
default:
}

failpoint.Inject("tsoProxyRecvFromTSOTimeout", func() {
<-ctxTimeout.Done()
failpoint.Return()
})

response, err := forwardStream.Recv()
if err != nil {
if strings.Contains(err.Error(), errs.NotLeaderErr) {
s.tsoPrimaryWatcher.ForceLoad()
}
select {
case <-ctxTimeout.Done():
return
case tsoRespCh <- &tsopbTSOResponse{err: err}:
}
return
}

select {
case <-ctxTimeout.Done():
return
case tsoRespCh <- &tsopbTSOResponse{response: response}:
case tsoRespCh <- &tsopbTSOResponse{response: response, err: err}:
}
}

@@ -607,6 +604,10 @@ func (s *tsoServer) Send(m *pdpb.TsoResponse) error {
done := make(chan error, 1)
go func() {
defer logutil.LogPanic()
failpoint.Inject("tsoProxyFailToSendToClient", func() {
done <- errors.New("injected error")
failpoint.Return()
})
done <- s.stream.Send(m)
}()
select {
@@ -625,6 +626,11 @@ func (s *tsoServer) Recv(timeout time.Duration) (*pdpb.TsoRequest, error) {
if atomic.LoadInt32(&s.closed) == 1 {
return nil, io.EOF
}
failpoint.Inject("tsoProxyRecvFromClientTimeout", func(val failpoint.Value) {
if customTimeoutInSeconds, ok := val.(int); ok {
timeout = time.Duration(customTimeoutInSeconds) * time.Second
}
})
requestCh := make(chan *pdpbTSORequest, 1)
go func() {
defer logutil.LogPanic()
@@ -640,7 +646,7 @@ func (s *tsoServer) Recv(timeout time.Duration) (*pdpb.TsoRequest, error) {
return req.request, nil
case <-time.After(timeout):
atomic.StoreInt32(&s.closed, 1)
return nil, ErrTSOProxyClientRecvTimeout
return nil, ErrTSOProxyRecvFromClientTimeout
}
}

server/server.go: 6 changes (3 additions, 3 deletions)
@@ -1844,9 +1844,9 @@ func (s *Server) GetMaxConcurrentTSOProxyStreamings() int {
return s.cfg.GetMaxConcurrentTSOProxyStreamings()
}

// GetTSOProxyClientRecvTimeout returns the TSO proxy client receive timeout.
func (s *Server) GetTSOProxyClientRecvTimeout() time.Duration {
return s.cfg.GetTSOProxyClientRecvTimeout()
// GetTSOProxyRecvFromClientTimeout returns timeout value for TSO proxy receiving from the client.
func (s *Server) GetTSOProxyRecvFromClientTimeout() time.Duration {
return s.cfg.GetTSOProxyRecvFromClientTimeout()
}

// GetLeaderLease returns the leader lease.
(Diffs for the remaining 3 changed files are not included here.)
