Skip to content

Commit

Permalink
mcs, tso: fix ts fallback caused by multi-primary of the same keyspac…
Browse files Browse the repository at this point in the history
…e group (#6362)

* Change participant election-prifix from listen-addr to advertise-listen-addr to gurantee uniqueness.

Signed-off-by: Bin Shi <[email protected]>

* Add TestPariticipantStartWithAdvertiseListenAddr

Signed-off-by: Bin Shi <[email protected]>

* Add comments to fix go fmt errors

Signed-off-by: Bin Shi <[email protected]>

---------

Signed-off-by: Bin Shi <[email protected]>
Co-authored-by: Ryan Leung <[email protected]>
  • Loading branch information
binshi-bing and rleungx committed Apr 23, 2023
1 parent 41a9581 commit dbec11e
Show file tree
Hide file tree
Showing 4 changed files with 35 additions and 7 deletions.
2 changes: 1 addition & 1 deletion pkg/mcs/tso/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -467,7 +467,7 @@ func (s *Server) startServer() (err error) {
tsoSvcRootPath := fmt.Sprintf(tsoSvcRootPathFormat, s.clusterID)
s.serviceID = &discovery.ServiceRegistryEntry{ServiceAddr: s.cfg.AdvertiseListenAddr}
s.keyspaceGroupManager = tso.NewKeyspaceGroupManager(
s.serverLoopCtx, s.serviceID, s.etcdClient, s.httpClient, s.listenURL.Host, legacySvcRootPath, tsoSvcRootPath, s.cfg)
s.serverLoopCtx, s.serviceID, s.etcdClient, s.httpClient, s.cfg.AdvertiseListenAddr, legacySvcRootPath, tsoSvcRootPath, s.cfg)
if err := s.keyspaceGroupManager.Initialize(); err != nil {
return err
}
Expand Down
4 changes: 2 additions & 2 deletions tests/integrations/mcs/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,11 +62,11 @@ func (tc *TestTSOCluster) AddServer(addr string) error {
if err != nil {
return err
}
err = initLogger(generatedCfg)
err = InitLogger(generatedCfg)
if err != nil {
return err
}
server, cleanup, err := newTSOTestServer(tc.ctx, generatedCfg)
server, cleanup, err := NewTSOTestServer(tc.ctx, generatedCfg)
if err != nil {
return err
}
Expand Down
10 changes: 6 additions & 4 deletions tests/integrations/mcs/testutil.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,8 @@ import (

var once sync.Once

func initLogger(cfg *tso.Config) (err error) {
// InitLogger initializes the logger for test.
func InitLogger(cfg *tso.Config) (err error) {
once.Do(func() {
// Setup the logger.
err = logutil.SetupLogger(cfg.Log, &cfg.Logger, &cfg.LogProps, cfg.Security.RedactInfoLog)
Expand Down Expand Up @@ -88,10 +89,10 @@ func StartSingleTSOTestServer(ctx context.Context, re *require.Assertions, backe
re.NoError(err)

// Setup the logger.
err = initLogger(cfg)
err = InitLogger(cfg)
re.NoError(err)

s, cleanup, err := newTSOTestServer(ctx, cfg)
s, cleanup, err := NewTSOTestServer(ctx, cfg)
re.NoError(err)
testutil.Eventually(re, func() bool {
return !s.IsClosed()
Expand All @@ -100,7 +101,8 @@ func StartSingleTSOTestServer(ctx context.Context, re *require.Assertions, backe
return s, cleanup
}

func newTSOTestServer(ctx context.Context, cfg *tso.Config) (*tso.Server, testutil.CleanupFunc, error) {
// NewTSOTestServer creates a tso server with given config for testing.
func NewTSOTestServer(ctx context.Context, cfg *tso.Config) (*tso.Server, testutil.CleanupFunc, error) {
s := tso.CreateServer(ctx, cfg)
if err := s.Run(); err != nil {
return nil, nil, err
Expand Down
26 changes: 26 additions & 0 deletions tests/integrations/mcs/tso/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,32 @@ func (suite *tsoServerTestSuite) TestTSOServerStartAndStopNormally() {
}
}

func (suite *tsoServerTestSuite) TestPariticipantStartWithAdvertiseListenAddr() {
re := suite.Require()

cfg := tso.NewConfig()
cfg.BackendEndpoints = suite.backendEndpoints
cfg.ListenAddr = tempurl.Alloc()
cfg.AdvertiseListenAddr = tempurl.Alloc()
cfg, err := tso.GenerateConfig(cfg)
re.NoError(err)

// Setup the logger.
err = mcs.InitLogger(cfg)
re.NoError(err)

s, cleanup, err := mcs.NewTSOTestServer(suite.ctx, cfg)
re.NoError(err)
defer cleanup()
testutil.Eventually(re, func() bool {
return s.IsServing()
}, testutil.WithWaitFor(5*time.Second), testutil.WithTickInterval(50*time.Millisecond))

member, err := s.GetMember(utils.DefaultKeyspaceID, utils.DefaultKeyspaceGroupID)
re.NoError(err)
re.Equal(fmt.Sprintf("%s-%05d", cfg.AdvertiseListenAddr, utils.DefaultKeyspaceGroupID), member.Name())
}

func TestTSOPath(t *testing.T) {
re := require.New(t)
checkTSOPath(re, true /*isAPIServiceMode*/)
Expand Down

0 comments on commit dbec11e

Please sign in to comment.