From 1ae3e2f38ca1b4d5f59e5cc06fc212f4766fab96 Mon Sep 17 00:00:00 2001 From: Bin Shi Date: Tue, 18 Apr 2023 17:27:08 -0700 Subject: [PATCH] Improve the server/client side logic Signed-off-by: Bin Shi --- client/tso_service_discovery.go | 274 ++++++++++++++++------------- pkg/mcs/tso/server/grpc_service.go | 30 ++-- pkg/tso/allocator_manager.go | 33 +++- pkg/tso/keyspace_group_manager.go | 10 +- 4 files changed, 196 insertions(+), 151 deletions(-) diff --git a/client/tso_service_discovery.go b/client/tso_service_discovery.go index a6e5efb28824..2e520e0d3488 100644 --- a/client/tso_service_discovery.go +++ b/client/tso_service_discovery.go @@ -36,42 +36,71 @@ import ( const ( msServiceRootPath = "/ms" tsoServiceName = "tso" - // tspSvcDiscoveryFormat defines the key prefix for keyspace group primary election. + // tsoSvcDiscoveryFormat defines the key prefix for keyspace group primary election. // The entire key is in the format of "/ms//tso//primary". // The is 5 digits integer with leading zeros. - tspSvcDiscoveryFormat = msServiceRootPath + "/%d/" + tsoServiceName + "/%05d/primary" - // tsoRPCTimeout is the timeout for TSO RPC requests. - tsoRPCTimeout = time.Second + tsoSvcDiscoveryFormat = msServiceRootPath + "/%d/" + tsoServiceName + "/%05d/primary" + // initRetryInterval is the rpc retry interval during the initialization phase. + initRetryInterval = time.Second + // tsoQueryRetryMaxTimes is the max retry times for querying TSO. + tsoQueryRetryMaxTimes = 10 + // tsoQueryRetryInterval is the retry interval for querying TSO. + tsoQueryRetryInterval = 500 * time.Millisecond ) var _ ServiceDiscovery = (*tsoServiceDiscovery)(nil) var _ tsoAllocatorEventSource = (*tsoServiceDiscovery)(nil) -type keyspaceGroup struct { +// keyspaceGroupSvcDiscovery is used for discovering the serving endpoints of the keyspace +// group to which the keyspace belongs +type keyspaceGroupSvcDiscovery struct { sync.RWMutex - id uint32 group *tsopb.KeyspaceGroup - // primaryAddr is the TSO primary address of this keyspace group + // primaryAddr is the primary serving address primaryAddr string - // secondaryAddrs are TSO secondary addresses of this keyspace group + // secondaryAddrs are TSO secondary serving addresses secondaryAddrs []string - // allAddrs are all TSO addresses of this keyspace group - allAddrs []string + // addrs are the primary/secondary serving addresses + addrs []string +} + +func (k *keyspaceGroupSvcDiscovery) update( + keyspaceGroup *tsopb.KeyspaceGroup, + newPrimaryAddr string, + secondaryAddrs, addrs []string, +) (oldPrimaryAddr string, primarySwitched bool) { + k.Lock() + defer k.Unlock() + + oldPrimaryAddr = k.primaryAddr + primarySwitched = !strings.EqualFold(oldPrimaryAddr, newPrimaryAddr) + + k.group = keyspaceGroup + k.primaryAddr = newPrimaryAddr + k.secondaryAddrs = secondaryAddrs + k.addrs = addrs + + return } // tsoServiceDiscovery is the service discovery client of the independent TSO service type tsoServiceDiscovery struct { + metacli MetaStorageClient clusterID uint64 keyspaceID uint32 + // apiSvcUrls is the API service addresses apiSvcUrls []string - // tsoSrvAddrs is the TSO server addresses - tsoSrvAddrs []string + // defaultDiscoveryKey is the etcd path used for discovering the serving endpoints of // the default keyspace group defaultDiscoveryKey string - metacli MetaStorageClient + // tsoSrvAddrs is the TSO server addresses + // TODO: dynamically update the TSO server addresses in the case of TSO server failover + // and scale-out/in. + tsoSrvAddrs []string - *keyspaceGroup + // keyspaceGroupSD is for discovering the serving endpoints of the keyspace group + keyspaceGroupSD *keyspaceGroupSvcDiscovery // addr -> a gRPC connection clientConns sync.Map // Store as map[string]*grpc.ClientConn @@ -114,13 +143,12 @@ func newTSOServiceDiscovery( } // Start with the default keyspace group. The actual keyspace group, to which the keyspace belongs, // will be discovered later. - c.keyspaceGroup = &keyspaceGroup{ - id: defaultKeySpaceGroupID, + c.keyspaceGroupSD = &keyspaceGroupSvcDiscovery{ primaryAddr: "", secondaryAddrs: make([]string, 0), - allAddrs: make([]string, 0), + addrs: make([]string, 0), } - c.defaultDiscoveryKey = fmt.Sprintf(tspSvcDiscoveryFormat, clusterID, defaultKeySpaceGroupID) + c.defaultDiscoveryKey = fmt.Sprintf(tsoSvcDiscoveryFormat, clusterID, defaultKeySpaceGroupID) log.Info("created tso service discovery", zap.Uint64("cluster-id", clusterID), @@ -132,7 +160,12 @@ func newTSOServiceDiscovery( // Init initialize the concrete client underlying func (c *tsoServiceDiscovery) Init() error { - if err := c.initRetry(c.updateMember); err != nil { + maxRetryTimes := c.option.maxRetryTimes + if err := c.retry(maxRetryTimes, initRetryInterval, c.updateTSOServers); err != nil { + c.cancel() + return err + } + if err := c.retry(maxRetryTimes, initRetryInterval, c.updateMember); err != nil { c.cancel() return err } @@ -141,16 +174,18 @@ func (c *tsoServiceDiscovery) Init() error { return nil } -func (c *tsoServiceDiscovery) initRetry(f func() error) error { +func (c *tsoServiceDiscovery) retry( + maxRetryTimes int, retryInterval time.Duration, f func() error, +) error { var err error - for i := 0; i < c.option.maxRetryTimes; i++ { + for i := 0; i < maxRetryTimes; i++ { if err = f(); err == nil { return nil } select { case <-c.ctx.Done(): return err - case <-time.After(time.Second): + case <-time.After(retryInterval): } } return errors.WithStack(err) @@ -188,7 +223,7 @@ func (c *tsoServiceDiscovery) startCheckMemberLoop() { log.Info("[tso] exit check member loop") return } - if err := c.updateMember(); err != nil { + if err := c.retry(tsoQueryRetryMaxTimes, tsoQueryRetryInterval, c.updateMember); err != nil { log.Error("[tso] failed to update member", errs.ZapError(err)) } } @@ -207,20 +242,20 @@ func (c *tsoServiceDiscovery) GetKeyspaceID() uint32 { // GetKeyspaceGroupID returns the ID of the keyspace group. If the keyspace group is unknown, // it returns the default keyspace group ID. func (c *tsoServiceDiscovery) GetKeyspaceGroupID() uint32 { - c.keyspaceGroup.RLock() - defer c.keyspaceGroup.RUnlock() - return c.keyspaceGroup.id + c.keyspaceGroupSD.RLock() + defer c.keyspaceGroupSD.RUnlock() + if c.keyspaceGroupSD.group == nil { + return defaultKeySpaceGroupID + } + return c.keyspaceGroupSD.group.Id } // GetURLs returns the URLs of the tso primary/secondary addresses of this keyspace group. // For testing use. It should only be called when the client is closed. func (c *tsoServiceDiscovery) GetURLs() []string { - c.keyspaceGroup.RLock() - defer c.keyspaceGroup.RUnlock() - if c.keyspaceGroup == nil { - return nil - } - return c.keyspaceGroup.allAddrs + c.keyspaceGroupSD.RLock() + defer c.keyspaceGroupSD.RUnlock() + return c.keyspaceGroupSD.addrs } // GetServingAddr returns the grpc client connection of the serving endpoint @@ -266,7 +301,7 @@ func (c *tsoServiceDiscovery) ScheduleCheckMemberChanged() { // Immediately check if there is any membership change among the primary/secondaries in // a primary/secondary configured cluster. func (c *tsoServiceDiscovery) CheckMemberChanged() error { - return c.updateMember() + return c.retry(tsoQueryRetryMaxTimes, tsoQueryRetryInterval, c.updateMember) } // AddServingAddrSwitchedCallback adds callbacks which will be called when the primary in @@ -297,122 +332,106 @@ func (c *tsoServiceDiscovery) SetTSOGlobalServAddrUpdatedCallback(callback tsoGl // getPrimaryAddr returns the primary address. func (c *tsoServiceDiscovery) getPrimaryAddr() string { - c.keyspaceGroup.RLock() - defer c.keyspaceGroup.RUnlock() - if c.keyspaceGroup == nil { - return "" - } - return c.keyspaceGroup.primaryAddr + c.keyspaceGroupSD.RLock() + defer c.keyspaceGroupSD.RUnlock() + return c.keyspaceGroupSD.primaryAddr } // getSecondaryAddrs returns the secondary addresses. func (c *tsoServiceDiscovery) getSecondaryAddrs() []string { - c.keyspaceGroup.RLock() - defer c.keyspaceGroup.RUnlock() - if c.keyspaceGroup == nil { - return nil - } - return c.keyspaceGroup.secondaryAddrs + c.keyspaceGroupSD.RLock() + defer c.keyspaceGroupSD.RUnlock() + return c.keyspaceGroupSD.secondaryAddrs } -func (c *tsoServiceDiscovery) switchPrimary(primaryAddr string) error { - oldPrimary := c.keyspaceGroup.primaryAddr - if primaryAddr == oldPrimary { - return nil - } - - if _, err := c.GetOrCreateGRPCConn(primaryAddr); err != nil { - log.Warn("[tso] failed to connect primary", - zap.String("new-primary", primaryAddr), errs.ZapError(err)) - return err - } - // Set PD primary and Global TSO Allocator (which is also the PD primary) - c.primaryAddr = primaryAddr +func (c *tsoServiceDiscovery) afterPrimarySwitched(oldPrimary, newPrimary string) error { // Run callbacks if c.globalAllocPrimariesUpdatedCb != nil { - if err := c.globalAllocPrimariesUpdatedCb(primaryAddr); err != nil { + if err := c.globalAllocPrimariesUpdatedCb(newPrimary); err != nil { return err } } log.Info("[tso] switch primary", - zap.String("new-primary", primaryAddr), + zap.String("new-primary", newPrimary), zap.String("old-primary", oldPrimary)) return nil } -func (c *tsoServiceDiscovery) updateMember() (err error) { - if len(c.tsoSrvAddrs) == 0 { - // TODO: discover all registered TSO servers instead of just the servers serving - // the default keyspace group. - c.tsoSrvAddrs, err = c.getDefaultGroupSvcAddrs() - if err != nil { - return err - } - if len(c.tsoSrvAddrs) == 0 { - return errors.New("no tso server address found") - } - } - - // Randomly choose a TSO server to query the keyspace group to which the keyspace belongs. - randIdx := rand.Intn(len(c.tsoSrvAddrs)) - kg, err := c.getKeyspaceGroup(c.tsoSrvAddrs[randIdx], defaultKeySpaceGroupID, tsoRPCTimeout) +func (c *tsoServiceDiscovery) updateTSOServers() error { + // TODO: discover all registered TSO servers instead of just the servers serving + // the default keyspace group. + tsoSrvAddrs, err := c.getDefaultGroupSvcAddrs() if err != nil { - if !strings.Contains(err.Error(), "Unimplemented") { - return err - } + return err + } + if len(tsoSrvAddrs) == 0 { + return errors.New("no tso server address found") + } + c.tsoSrvAddrs = tsoSrvAddrs + return nil +} - log.Warn("[tso] the server doesn't support the method tsopb.FindGroupByKeyspaceID") - - // TODO: it's a hack way to solve the compatibility issue just in case the server side - // doesn't support the method tsopb.FindGroupByKeyspaceID. We should remove this after - // all maintained version supports the method. - members := make([]*tsopb.KeyspaceGroupMember, 0, len(c.tsoSrvAddrs)) - if len(c.tsoSrvAddrs) > 0 { - members = append(members, &tsopb.KeyspaceGroupMember{ - Address: c.tsoSrvAddrs[0], - IsPrimary: true, - }) - } +func (c *tsoServiceDiscovery) updateMember() error { + var tsoServerAddr string - for _, addr := range c.tsoSrvAddrs[1:] { - members = append(members, &tsopb.KeyspaceGroupMember{ - Address: addr, - }) - } + // The keyspace membership or the primary serving address of the keyspace group, to which this + // keyspace belongs, might have been changed. We need to query tso servers to get the latest info. + // If there are known tso servers serving this keyspace, we start with them; otherwise, we start + // with any tso server. - kg = &tsopb.KeyspaceGroup{ - Id: defaultKeySpaceGroupID, - Members: members, - } + // If there are known tso servers serving this keyspace, we start by randomly choosing one of them. + c.keyspaceGroupSD.RLock() + kgAddrs := c.keyspaceGroupSD.addrs + if len(kgAddrs) > 0 { + tsoServerAddr = kgAddrs[rand.Intn(len(kgAddrs))] } - if kg == nil { - return errors.New("no keyspace group found") - } - if len(kg.Members) == 0 { - return errors.New("no keyspace group member found") + c.keyspaceGroupSD.RUnlock() + + // If there is no known tso server serving this keyspace, we start with any tso server randomly. + if len(tsoServerAddr) == 0 { + // The length of c.tsoSrvAddrs must be greater than 0 at this moment, otherwise the + // initialization failed. + tsoServerAddr = c.tsoSrvAddrs[rand.Intn(len(c.tsoSrvAddrs))] } - c.keyspaceGroup.Lock() - defer c.keyspaceGroup.Unlock() + // Query the keyspace group info from the tso server by the keyspace ID. The server side will return + // the info of the keyspace group to which this keyspace belongs. + keyspaceGroup, err := c.findGroupByKeyspaceID(c.keyspaceID, tsoServerAddr, updateMemberTimeout) + if err != nil { + return err + } - c.keyspaceGroup.group = kg - c.keyspaceGroup.id = kg.Id - c.keyspaceGroup.primaryAddr = "" - c.keyspaceGroup.secondaryAddrs = make([]string, 0) - c.keyspaceGroup.allAddrs = make([]string, 0, len(kg.Members)) - for _, m := range kg.Members { - c.keyspaceGroup.allAddrs = append(c.keyspaceGroup.allAddrs, m.Address) + // Initialize the all types of serving addresses from the returned keyspace group info. + primaryAddr := "" + secondaryAddrs := make([]string, 0) + addrs := make([]string, 0, len(keyspaceGroup.Members)) + for _, m := range keyspaceGroup.Members { + addrs = append(addrs, m.Address) if m.IsPrimary { - c.keyspaceGroup.primaryAddr = m.Address + primaryAddr = m.Address } else { - c.keyspaceGroup.secondaryAddrs = append(c.keyspaceGroup.secondaryAddrs, m.Address) + secondaryAddrs = append(secondaryAddrs, m.Address) } } - if len(c.keyspaceGroup.primaryAddr) == 0 { + if len(primaryAddr) == 0 { return errors.New("no primary address found") } - c.switchPrimary(c.keyspaceGroup.primaryAddr) + + if primarySwitched := !strings.EqualFold(primaryAddr, c.getPrimaryAddr()); primarySwitched { + if _, err := c.GetOrCreateGRPCConn(primaryAddr); err != nil { + log.Warn("[tso] failed to connect the next primary", + zap.String("next-primary", primaryAddr), errs.ZapError(err)) + return err + } + } + + oldPrimary, primarySwitched := c.keyspaceGroupSD.update(keyspaceGroup, primaryAddr, secondaryAddrs, addrs) + if primarySwitched { + if err := c.afterPrimarySwitched(oldPrimary, primaryAddr); err != nil { + return err + } + } return nil } @@ -439,30 +458,32 @@ func (c *tsoServiceDiscovery) getDefaultGroupSvcAddrs() ([]string, error) { } listenUrls := primary.GetListenUrls() if len(listenUrls) == 0 { - log.Error("[tso] the keyspace serving endpoint list is empty", zap.String("primary-key", c.defaultDiscoveryKey)) + log.Error("[tso] the keyspace serving endpoint list is empty", + zap.String("primary-key", c.defaultDiscoveryKey)) return nil, errs.ErrClientGetServingEndpoint } return listenUrls, nil } -func (c *tsoServiceDiscovery) getKeyspaceGroup( - url string, keyspaceGroupID uint32, timeout time.Duration, +func (c *tsoServiceDiscovery) findGroupByKeyspaceID( + keyspaceID uint32, tsoSrvAddr string, timeout time.Duration, ) (*tsopb.KeyspaceGroup, error) { ctx, cancel := context.WithTimeout(c.ctx, timeout) defer cancel() - cc, err := c.GetOrCreateGRPCConn(url) + cc, err := c.GetOrCreateGRPCConn(tsoSrvAddr) if err != nil { return nil, err } + resp, err := tsopb.NewTSOClient(cc).FindGroupByKeyspaceID( ctx, &tsopb.FindGroupByKeyspaceIDRequest{ Header: &tsopb.RequestHeader{ ClusterId: c.clusterID, - KeyspaceId: c.keyspaceID, - KeyspaceGroupId: keyspaceGroupID, + KeyspaceId: keyspaceID, + KeyspaceGroupId: defaultKeySpaceGroupID, }, - KeyspaceId: c.keyspaceID, + KeyspaceId: keyspaceID, }) if err != nil { @@ -477,8 +498,9 @@ func (c *tsoServiceDiscovery) getKeyspaceGroup( } if resp.KeyspaceGroup == nil { attachErr := errors.Errorf("error:%s target:%s status:%s", - "empty keyspace group", cc.Target(), cc.GetState().String()) + "no keyspace group found", cc.Target(), cc.GetState().String()) return nil, errs.ErrClientFindGroupByKeyspaceID.Wrap(attachErr).GenWithStackByCause() } + return resp.KeyspaceGroup, nil } diff --git a/pkg/mcs/tso/server/grpc_service.go b/pkg/mcs/tso/server/grpc_service.go index c10b39317613..ea8368bfbc10 100644 --- a/pkg/mcs/tso/server/grpc_service.go +++ b/pkg/mcs/tso/server/grpc_service.go @@ -18,6 +18,7 @@ import ( "context" "io" "net/http" + "strings" "time" "github.com/pingcap/kvproto/pkg/tsopb" @@ -157,33 +158,34 @@ func (s *Service) FindGroupByKeyspaceID( ctx context.Context, request *tsopb.FindGroupByKeyspaceIDRequest, ) (*tsopb.FindGroupByKeyspaceIDResponse, error) { keyspaceID := request.GetKeyspaceId() - curKeyspaceGroup, curKeyspaceGroupID, err := s.keyspaceGroupManager.FindGroupByKeyspaceID(keyspaceID) + am, keyspaceGroup, keyspaceGroupID, err := s.keyspaceGroupManager.FindGroupByKeyspaceID(keyspaceID) if err != nil { return &tsopb.FindGroupByKeyspaceIDResponse{ - Header: s.wrapErrorToHeader(tsopb.ErrorType_UNKNOWN, err.Error(), curKeyspaceGroupID), + Header: s.wrapErrorToHeader(tsopb.ErrorType_UNKNOWN, err.Error(), keyspaceGroupID), }, nil } - if curKeyspaceGroup == nil { + if keyspaceGroup == nil { return &tsopb.FindGroupByKeyspaceIDResponse{ Header: s.wrapErrorToHeader( - tsopb.ErrorType_UNKNOWN, "keyspace group not found", curKeyspaceGroupID), + tsopb.ErrorType_UNKNOWN, "keyspace group not found", keyspaceGroupID), }, nil } - var respMembers []*tsopb.KeyspaceGroupMember - for _, member := range curKeyspaceGroup.Members { - respMembers = append(respMembers, &tsopb.KeyspaceGroupMember{ - Address: member.Address, + var members []*tsopb.KeyspaceGroupMember + for _, member := range keyspaceGroup.Members { + members = append(members, &tsopb.KeyspaceGroupMember{ + Address: member.Address, + IsPrimary: strings.EqualFold(member.Address, am.GetLeaderAddr()), }) } return &tsopb.FindGroupByKeyspaceIDResponse{ - Header: s.header(curKeyspaceGroupID), + Header: s.header(keyspaceGroupID), KeyspaceGroup: &tsopb.KeyspaceGroup{ - Id: curKeyspaceGroupID, - UserKind: curKeyspaceGroup.UserKind, - InSplit: curKeyspaceGroup.InSplit, - SplitFrom: curKeyspaceGroup.SplitFrom, - Members: respMembers, + Id: keyspaceGroupID, + UserKind: keyspaceGroup.UserKind, + InSplit: keyspaceGroup.InSplit, + SplitFrom: keyspaceGroup.SplitFrom, + Members: members, }, }, nil } diff --git a/pkg/tso/allocator_manager.go b/pkg/tso/allocator_manager.go index 2c98fcb5988b..0c905293c2bc 100644 --- a/pkg/tso/allocator_manager.go +++ b/pkg/tso/allocator_manager.go @@ -128,12 +128,13 @@ type ElectionMember interface { // Basically it will reset the leader lease and unset leader info. ResetLeader() // GetLeaderListenUrls returns current leader's listen urls + // The first element is the leader/primary url GetLeaderListenUrls() []string // GetLeaderID returns current leader's member ID. GetLeaderID() uint64 // GetLeaderPath returns the path of the leader. GetLeaderPath() string - // GetLeadership returns the leadership of the PD member. + // GetLeadership returns the leadership of the election member. GetLeadership() *election.Leadership // GetDCLocationPathPrefix returns the dc-location path prefix of the cluster. GetDCLocationPathPrefix() string @@ -812,7 +813,7 @@ func (am *AllocatorManager) ClusterDCLocationChecker() { } } // Only leader can write the TSO suffix to etcd in order to make it consistent in the cluster - if am.member.IsLeader() { + if am.IsLeader() { for dcLocation, info := range am.mu.clusterDCLocations { if info.Suffix > 0 { continue @@ -1186,7 +1187,7 @@ func (am *AllocatorManager) getOrCreateGRPCConn(ctx context.Context, addr string } func (am *AllocatorManager) getDCLocationInfoFromLeader(ctx context.Context, dcLocation string) (bool, *pdpb.GetDCLocationInfoResponse, error) { - if am.member.IsLeader() { + if am.IsLeader() { info, ok := am.GetDCLocationInfo(dcLocation) if !ok { return false, &pdpb.GetDCLocationInfoResponse{}, nil @@ -1199,11 +1200,11 @@ func (am *AllocatorManager) getDCLocationInfoFromLeader(ctx context.Context, dcL return ok, dcLocationInfo, nil } - leaderAddrs := am.member.GetLeaderListenUrls() - if leaderAddrs == nil || len(leaderAddrs) < 1 { + leaderAddr := am.GetLeaderAddr() + if len(leaderAddr) < 1 { return false, &pdpb.GetDCLocationInfoResponse{}, fmt.Errorf("failed to get leader client url") } - conn, err := am.getOrCreateGRPCConn(ctx, leaderAddrs[0]) + conn, err := am.getOrCreateGRPCConn(ctx, leaderAddr) if err != nil { return false, &pdpb.GetDCLocationInfoResponse{}, err } @@ -1310,3 +1311,23 @@ func (am *AllocatorManager) nextLeaderKey(dcLocation string) string { func (am *AllocatorManager) EnableLocalTSO() bool { return am.enableLocalTSO } + +// IsLeader returns whether the current member is the leader in the election group. +func (am *AllocatorManager) IsLeader() bool { + if am == nil || am.member == nil || !am.member.IsLeader() { + return false + } + return true +} + +// GetLeaderAddr returns the address of leader in the election group. +func (am *AllocatorManager) GetLeaderAddr() string { + if am == nil || am.member == nil { + return "" + } + leaderAddrs := am.member.GetLeaderListenUrls() + if len(leaderAddrs) < 1 { + return "" + } + return leaderAddrs[0] +} diff --git a/pkg/tso/keyspace_group_manager.go b/pkg/tso/keyspace_group_manager.go index e37884cc7782..a41de4bd107e 100644 --- a/pkg/tso/keyspace_group_manager.go +++ b/pkg/tso/keyspace_group_manager.go @@ -649,7 +649,7 @@ func (kgm *KeyspaceGroupManager) deleteKeyspaceGroup(groupID uint32) { if groupID == mcsutils.DefaultKeyspaceGroupID { log.Info("removed default keyspace group meta config from the storage. " + - "now every tso node/pod will initialize it") + "now every tso node/pod will initialize it") group := &endpoint.KeyspaceGroup{ ID: mcsutils.DefaultKeyspaceGroupID, Members: []endpoint.KeyspaceGroupMember{{Address: kgm.tsoServiceID.ServiceAddr}}, @@ -719,13 +719,13 @@ func (kgm *KeyspaceGroupManager) GetAllocatorManager(keyspaceGroupID uint32) (*A // FindGroupByKeyspaceID returns the keyspace group that contains the keyspace with the given ID. func (kgm *KeyspaceGroupManager) FindGroupByKeyspaceID( keyspaceID uint32, -) (*endpoint.KeyspaceGroup, uint32, error) { - _, curKeyspaceGroup, curKeyspaceGroupID, err := +) (*AllocatorManager, *endpoint.KeyspaceGroup, uint32, error) { + curAM, curKeyspaceGroup, curKeyspaceGroupID, err := kgm.getKeyspaceGroupMetaWithCheck(keyspaceID, mcsutils.DefaultKeyspaceGroupID) if err != nil { - return nil, curKeyspaceGroupID, err + return nil, nil, curKeyspaceGroupID, err } - return curKeyspaceGroup, curKeyspaceGroupID, nil + return curAM, curKeyspaceGroup, curKeyspaceGroupID, nil } // GetElectionMember returns the election member of the given keyspace group