From dbec11e4dc7de4e246de9353a092f2239968ec4f Mon Sep 17 00:00:00 2001 From: Bin Shi <39923490+binshi-bing@users.noreply.github.com> Date: Sun, 23 Apr 2023 02:50:44 -0700 Subject: [PATCH] mcs, tso: fix ts fallback caused by multi-primary of the same keyspace group (#6362) * Change participant election-prifix from listen-addr to advertise-listen-addr to gurantee uniqueness. Signed-off-by: Bin Shi * Add TestPariticipantStartWithAdvertiseListenAddr Signed-off-by: Bin Shi * Add comments to fix go fmt errors Signed-off-by: Bin Shi --------- Signed-off-by: Bin Shi Co-authored-by: Ryan Leung --- pkg/mcs/tso/server/server.go | 2 +- tests/integrations/mcs/cluster.go | 4 ++-- tests/integrations/mcs/testutil.go | 10 +++++---- tests/integrations/mcs/tso/server_test.go | 26 +++++++++++++++++++++++ 4 files changed, 35 insertions(+), 7 deletions(-) diff --git a/pkg/mcs/tso/server/server.go b/pkg/mcs/tso/server/server.go index be4f2be00a5..36207ebf4e0 100644 --- a/pkg/mcs/tso/server/server.go +++ b/pkg/mcs/tso/server/server.go @@ -467,7 +467,7 @@ func (s *Server) startServer() (err error) { tsoSvcRootPath := fmt.Sprintf(tsoSvcRootPathFormat, s.clusterID) s.serviceID = &discovery.ServiceRegistryEntry{ServiceAddr: s.cfg.AdvertiseListenAddr} s.keyspaceGroupManager = tso.NewKeyspaceGroupManager( - s.serverLoopCtx, s.serviceID, s.etcdClient, s.httpClient, s.listenURL.Host, legacySvcRootPath, tsoSvcRootPath, s.cfg) + s.serverLoopCtx, s.serviceID, s.etcdClient, s.httpClient, s.cfg.AdvertiseListenAddr, legacySvcRootPath, tsoSvcRootPath, s.cfg) if err := s.keyspaceGroupManager.Initialize(); err != nil { return err } diff --git a/tests/integrations/mcs/cluster.go b/tests/integrations/mcs/cluster.go index e0802b9f974..228f506454d 100644 --- a/tests/integrations/mcs/cluster.go +++ b/tests/integrations/mcs/cluster.go @@ -62,11 +62,11 @@ func (tc *TestTSOCluster) AddServer(addr string) error { if err != nil { return err } - err = initLogger(generatedCfg) + err = InitLogger(generatedCfg) if err != nil { return err } - server, cleanup, err := newTSOTestServer(tc.ctx, generatedCfg) + server, cleanup, err := NewTSOTestServer(tc.ctx, generatedCfg) if err != nil { return err } diff --git a/tests/integrations/mcs/testutil.go b/tests/integrations/mcs/testutil.go index 7a006289fec..da23d2fe7f2 100644 --- a/tests/integrations/mcs/testutil.go +++ b/tests/integrations/mcs/testutil.go @@ -34,7 +34,8 @@ import ( var once sync.Once -func initLogger(cfg *tso.Config) (err error) { +// InitLogger initializes the logger for test. +func InitLogger(cfg *tso.Config) (err error) { once.Do(func() { // Setup the logger. err = logutil.SetupLogger(cfg.Log, &cfg.Logger, &cfg.LogProps, cfg.Security.RedactInfoLog) @@ -88,10 +89,10 @@ func StartSingleTSOTestServer(ctx context.Context, re *require.Assertions, backe re.NoError(err) // Setup the logger. - err = initLogger(cfg) + err = InitLogger(cfg) re.NoError(err) - s, cleanup, err := newTSOTestServer(ctx, cfg) + s, cleanup, err := NewTSOTestServer(ctx, cfg) re.NoError(err) testutil.Eventually(re, func() bool { return !s.IsClosed() @@ -100,7 +101,8 @@ func StartSingleTSOTestServer(ctx context.Context, re *require.Assertions, backe return s, cleanup } -func newTSOTestServer(ctx context.Context, cfg *tso.Config) (*tso.Server, testutil.CleanupFunc, error) { +// NewTSOTestServer creates a tso server with given config for testing. +func NewTSOTestServer(ctx context.Context, cfg *tso.Config) (*tso.Server, testutil.CleanupFunc, error) { s := tso.CreateServer(ctx, cfg) if err := s.Run(); err != nil { return nil, nil, err diff --git a/tests/integrations/mcs/tso/server_test.go b/tests/integrations/mcs/tso/server_test.go index 740de03e718..64c59685d51 100644 --- a/tests/integrations/mcs/tso/server_test.go +++ b/tests/integrations/mcs/tso/server_test.go @@ -123,6 +123,32 @@ func (suite *tsoServerTestSuite) TestTSOServerStartAndStopNormally() { } } +func (suite *tsoServerTestSuite) TestPariticipantStartWithAdvertiseListenAddr() { + re := suite.Require() + + cfg := tso.NewConfig() + cfg.BackendEndpoints = suite.backendEndpoints + cfg.ListenAddr = tempurl.Alloc() + cfg.AdvertiseListenAddr = tempurl.Alloc() + cfg, err := tso.GenerateConfig(cfg) + re.NoError(err) + + // Setup the logger. + err = mcs.InitLogger(cfg) + re.NoError(err) + + s, cleanup, err := mcs.NewTSOTestServer(suite.ctx, cfg) + re.NoError(err) + defer cleanup() + testutil.Eventually(re, func() bool { + return s.IsServing() + }, testutil.WithWaitFor(5*time.Second), testutil.WithTickInterval(50*time.Millisecond)) + + member, err := s.GetMember(utils.DefaultKeyspaceID, utils.DefaultKeyspaceGroupID) + re.NoError(err) + re.Equal(fmt.Sprintf("%s-%05d", cfg.AdvertiseListenAddr, utils.DefaultKeyspaceGroupID), member.Name()) +} + func TestTSOPath(t *testing.T) { re := require.New(t) checkTSOPath(re, true /*isAPIServiceMode*/)