diff --git a/cluster/cluster.go b/cluster/cluster.go index 51364707..5767f4df 100644 --- a/cluster/cluster.go +++ b/cluster/cluster.go @@ -24,8 +24,10 @@ import ( "context" "fmt" "sync" + "time" "github.com/lonng/nano/cluster/clusterpb" + "github.com/lonng/nano/internal/env" "github.com/lonng/nano/internal/log" ) @@ -42,7 +44,11 @@ type cluster struct { } func newCluster(currentNode *Node) *cluster { - return &cluster{currentNode: currentNode} + c := &cluster{currentNode: currentNode} + if currentNode.IsMaster { + c.checkMemberHeartbeat() + } + return c } // Register implements the MasterServer gRPC service @@ -50,7 +56,6 @@ func (c *cluster) Register(_ context.Context, req *clusterpb.RegisterRequest) (* if req.MemberInfo == nil { return nil, ErrInvalidRegisterReq } - resp := &clusterpb.RegisterResponse{} c.mu.Lock() for k, m := range c.members { @@ -90,12 +95,12 @@ func (c *cluster) Register(_ context.Context, req *clusterpb.RegisterRequest) (* // Register services to current node c.currentNode.handler.addRemoteService(req.MemberInfo) c.mu.Lock() - c.members = append(c.members, &Member{isMaster: false, memberInfo: req.MemberInfo}) + c.members = append(c.members, &Member{isMaster: false, memberInfo: req.MemberInfo, lastHeartbeatAt: time.Now()}) c.mu.Unlock() return resp, nil } -// Register implements the MasterServer gRPC service +// Unregister implements the MasterServer gRPC service func (c *cluster) Unregister(_ context.Context, req *clusterpb.UnregisterRequest) (*clusterpb.UnregisterResponse, error) { if req.ServiceAddr == "" { return nil, ErrInvalidRegisterReq @@ -110,12 +115,17 @@ func (c *cluster) Unregister(_ context.Context, req *clusterpb.UnregisterRequest } } if index < 0 { - return nil, fmt.Errorf("address %s has notregistered", req.ServiceAddr) + return nil, fmt.Errorf("address %s has not registered", req.ServiceAddr) } // Notify registered node to update remote services delMember := &clusterpb.DelMemberRequest{ServiceAddr: req.ServiceAddr} - for _, m := range c.members { + for i, m := range c.members { + if i == index { + // this node is down. + continue + } + if m.MemberInfo().ServiceAddr == c.currentNode.ServiceAddr { continue } @@ -132,6 +142,10 @@ func (c *cluster) Unregister(_ context.Context, req *clusterpb.UnregisterRequest log.Println("Exists peer unregister to cluster", req.ServiceAddr) + if c.currentNode.UnregisterCallback != nil { + c.currentNode.UnregisterCallback(*c.members[index]) + } + // Register services to current node c.currentNode.handler.delMember(req.ServiceAddr) c.mu.Lock() @@ -141,9 +155,69 @@ func (c *cluster) Unregister(_ context.Context, req *clusterpb.UnregisterRequest c.members = append(c.members[:index], c.members[index+1:]...) } c.mu.Unlock() + return resp, nil } +func (c *cluster) Heartbeat(_ context.Context, req *clusterpb.HeartbeatRequest) (*clusterpb.HeartbeatResponse, error) { + c.mu.Lock() + defer c.mu.Unlock() + + isHit := false + for i, m := range c.members { + if m.MemberInfo().GetServiceAddr() == req.GetMemberInfo().GetServiceAddr() { + c.members[i].lastHeartbeatAt = time.Now() + isHit = true + } + } + if !isHit { + // master local not binding this node, other members do not need to be notified, because this node registered. + // maybe the master process reload + m := &Member{ + isMaster: false, + memberInfo: req.GetMemberInfo(), + lastHeartbeatAt: time.Now(), + } + c.members = append(c.members, m) + c.currentNode.handler.addRemoteService(req.MemberInfo) + log.Println("Heartbeat peer register to cluster", req.MemberInfo.ServiceAddr) + } + return &clusterpb.HeartbeatResponse{}, nil +} + +func (c *cluster) checkMemberHeartbeat() { + check := func() { + unregisterMembers := make([]*Member, 0) + // check heartbeat time + for _, m := range c.members { + if time.Now().Sub(m.lastHeartbeatAt) > 4*env.Heartbeat && !m.isMaster { + unregisterMembers = append(unregisterMembers, m) + } + } + + for _, m := range unregisterMembers { + if _, err := c.Unregister(context.Background(), &clusterpb.UnregisterRequest{ + ServiceAddr: m.MemberInfo().ServiceAddr, + }); err != nil { + log.Println("Heartbeat unregister error", err) + } + } + } + go func() { + ticker := time.NewTicker(env.Heartbeat) + for { + select { + case <-ticker.C: + if !c.currentNode.IsMaster { + ticker.Stop() + return + } + check() + } + } + }() +} + func (c *cluster) setRpcClient(client *rpcClient) { c.rpcClient = client } diff --git a/cluster/clusterpb/cluster.pb.go b/cluster/clusterpb/cluster.pb.go index 766676df..71f1023e 100644 --- a/cluster/clusterpb/cluster.pb.go +++ b/cluster/clusterpb/cluster.pb.go @@ -262,6 +262,91 @@ func (*UnregisterResponse) Descriptor() ([]byte, []int) { return file_cluster_proto_rawDescGZIP(), []int{4} } +type HeartbeatRequest struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + MemberInfo *MemberInfo `protobuf:"bytes,1,opt,name=memberInfo,proto3" json:"memberInfo,omitempty"` +} + +func (x *HeartbeatRequest) Reset() { + *x = HeartbeatRequest{} + if protoimpl.UnsafeEnabled { + mi := &file_cluster_proto_msgTypes[5] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *HeartbeatRequest) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*HeartbeatRequest) ProtoMessage() {} + +func (x *HeartbeatRequest) ProtoReflect() protoreflect.Message { + mi := &file_cluster_proto_msgTypes[5] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use HeartbeatRequest.ProtoReflect.Descriptor instead. +func (*HeartbeatRequest) Descriptor() ([]byte, []int) { + return file_cluster_proto_rawDescGZIP(), []int{5} +} + +func (x *HeartbeatRequest) GetMemberInfo() *MemberInfo { + if x != nil { + return x.MemberInfo + } + return nil +} + +type HeartbeatResponse struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields +} + +func (x *HeartbeatResponse) Reset() { + *x = HeartbeatResponse{} + if protoimpl.UnsafeEnabled { + mi := &file_cluster_proto_msgTypes[6] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *HeartbeatResponse) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*HeartbeatResponse) ProtoMessage() {} + +func (x *HeartbeatResponse) ProtoReflect() protoreflect.Message { + mi := &file_cluster_proto_msgTypes[6] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use HeartbeatResponse.ProtoReflect.Descriptor instead. +func (*HeartbeatResponse) Descriptor() ([]byte, []int) { + return file_cluster_proto_rawDescGZIP(), []int{6} +} + type RequestMessage struct { state protoimpl.MessageState sizeCache protoimpl.SizeCache @@ -277,7 +362,7 @@ type RequestMessage struct { func (x *RequestMessage) Reset() { *x = RequestMessage{} if protoimpl.UnsafeEnabled { - mi := &file_cluster_proto_msgTypes[5] + mi := &file_cluster_proto_msgTypes[7] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -290,7 +375,7 @@ func (x *RequestMessage) String() string { func (*RequestMessage) ProtoMessage() {} func (x *RequestMessage) ProtoReflect() protoreflect.Message { - mi := &file_cluster_proto_msgTypes[5] + mi := &file_cluster_proto_msgTypes[7] if protoimpl.UnsafeEnabled && x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -303,7 +388,7 @@ func (x *RequestMessage) ProtoReflect() protoreflect.Message { // Deprecated: Use RequestMessage.ProtoReflect.Descriptor instead. func (*RequestMessage) Descriptor() ([]byte, []int) { - return file_cluster_proto_rawDescGZIP(), []int{5} + return file_cluster_proto_rawDescGZIP(), []int{7} } func (x *RequestMessage) GetGateAddr() string { @@ -355,7 +440,7 @@ type NotifyMessage struct { func (x *NotifyMessage) Reset() { *x = NotifyMessage{} if protoimpl.UnsafeEnabled { - mi := &file_cluster_proto_msgTypes[6] + mi := &file_cluster_proto_msgTypes[8] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -368,7 +453,7 @@ func (x *NotifyMessage) String() string { func (*NotifyMessage) ProtoMessage() {} func (x *NotifyMessage) ProtoReflect() protoreflect.Message { - mi := &file_cluster_proto_msgTypes[6] + mi := &file_cluster_proto_msgTypes[8] if protoimpl.UnsafeEnabled && x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -381,7 +466,7 @@ func (x *NotifyMessage) ProtoReflect() protoreflect.Message { // Deprecated: Use NotifyMessage.ProtoReflect.Descriptor instead. func (*NotifyMessage) Descriptor() ([]byte, []int) { - return file_cluster_proto_rawDescGZIP(), []int{6} + return file_cluster_proto_rawDescGZIP(), []int{8} } func (x *NotifyMessage) GetGateAddr() string { @@ -425,7 +510,7 @@ type ResponseMessage struct { func (x *ResponseMessage) Reset() { *x = ResponseMessage{} if protoimpl.UnsafeEnabled { - mi := &file_cluster_proto_msgTypes[7] + mi := &file_cluster_proto_msgTypes[9] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -438,7 +523,7 @@ func (x *ResponseMessage) String() string { func (*ResponseMessage) ProtoMessage() {} func (x *ResponseMessage) ProtoReflect() protoreflect.Message { - mi := &file_cluster_proto_msgTypes[7] + mi := &file_cluster_proto_msgTypes[9] if protoimpl.UnsafeEnabled && x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -451,7 +536,7 @@ func (x *ResponseMessage) ProtoReflect() protoreflect.Message { // Deprecated: Use ResponseMessage.ProtoReflect.Descriptor instead. func (*ResponseMessage) Descriptor() ([]byte, []int) { - return file_cluster_proto_rawDescGZIP(), []int{7} + return file_cluster_proto_rawDescGZIP(), []int{9} } func (x *ResponseMessage) GetSessionId() int64 { @@ -488,7 +573,7 @@ type PushMessage struct { func (x *PushMessage) Reset() { *x = PushMessage{} if protoimpl.UnsafeEnabled { - mi := &file_cluster_proto_msgTypes[8] + mi := &file_cluster_proto_msgTypes[10] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -501,7 +586,7 @@ func (x *PushMessage) String() string { func (*PushMessage) ProtoMessage() {} func (x *PushMessage) ProtoReflect() protoreflect.Message { - mi := &file_cluster_proto_msgTypes[8] + mi := &file_cluster_proto_msgTypes[10] if protoimpl.UnsafeEnabled && x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -514,7 +599,7 @@ func (x *PushMessage) ProtoReflect() protoreflect.Message { // Deprecated: Use PushMessage.ProtoReflect.Descriptor instead. func (*PushMessage) Descriptor() ([]byte, []int) { - return file_cluster_proto_rawDescGZIP(), []int{8} + return file_cluster_proto_rawDescGZIP(), []int{10} } func (x *PushMessage) GetSessionId() int64 { @@ -547,7 +632,7 @@ type MemberHandleResponse struct { func (x *MemberHandleResponse) Reset() { *x = MemberHandleResponse{} if protoimpl.UnsafeEnabled { - mi := &file_cluster_proto_msgTypes[9] + mi := &file_cluster_proto_msgTypes[11] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -560,7 +645,7 @@ func (x *MemberHandleResponse) String() string { func (*MemberHandleResponse) ProtoMessage() {} func (x *MemberHandleResponse) ProtoReflect() protoreflect.Message { - mi := &file_cluster_proto_msgTypes[9] + mi := &file_cluster_proto_msgTypes[11] if protoimpl.UnsafeEnabled && x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -573,7 +658,7 @@ func (x *MemberHandleResponse) ProtoReflect() protoreflect.Message { // Deprecated: Use MemberHandleResponse.ProtoReflect.Descriptor instead. func (*MemberHandleResponse) Descriptor() ([]byte, []int) { - return file_cluster_proto_rawDescGZIP(), []int{9} + return file_cluster_proto_rawDescGZIP(), []int{11} } type NewMemberRequest struct { @@ -587,7 +672,7 @@ type NewMemberRequest struct { func (x *NewMemberRequest) Reset() { *x = NewMemberRequest{} if protoimpl.UnsafeEnabled { - mi := &file_cluster_proto_msgTypes[10] + mi := &file_cluster_proto_msgTypes[12] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -600,7 +685,7 @@ func (x *NewMemberRequest) String() string { func (*NewMemberRequest) ProtoMessage() {} func (x *NewMemberRequest) ProtoReflect() protoreflect.Message { - mi := &file_cluster_proto_msgTypes[10] + mi := &file_cluster_proto_msgTypes[12] if protoimpl.UnsafeEnabled && x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -613,7 +698,7 @@ func (x *NewMemberRequest) ProtoReflect() protoreflect.Message { // Deprecated: Use NewMemberRequest.ProtoReflect.Descriptor instead. func (*NewMemberRequest) Descriptor() ([]byte, []int) { - return file_cluster_proto_rawDescGZIP(), []int{10} + return file_cluster_proto_rawDescGZIP(), []int{12} } func (x *NewMemberRequest) GetMemberInfo() *MemberInfo { @@ -632,7 +717,7 @@ type NewMemberResponse struct { func (x *NewMemberResponse) Reset() { *x = NewMemberResponse{} if protoimpl.UnsafeEnabled { - mi := &file_cluster_proto_msgTypes[11] + mi := &file_cluster_proto_msgTypes[13] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -645,7 +730,7 @@ func (x *NewMemberResponse) String() string { func (*NewMemberResponse) ProtoMessage() {} func (x *NewMemberResponse) ProtoReflect() protoreflect.Message { - mi := &file_cluster_proto_msgTypes[11] + mi := &file_cluster_proto_msgTypes[13] if protoimpl.UnsafeEnabled && x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -658,7 +743,7 @@ func (x *NewMemberResponse) ProtoReflect() protoreflect.Message { // Deprecated: Use NewMemberResponse.ProtoReflect.Descriptor instead. func (*NewMemberResponse) Descriptor() ([]byte, []int) { - return file_cluster_proto_rawDescGZIP(), []int{11} + return file_cluster_proto_rawDescGZIP(), []int{13} } type DelMemberRequest struct { @@ -672,7 +757,7 @@ type DelMemberRequest struct { func (x *DelMemberRequest) Reset() { *x = DelMemberRequest{} if protoimpl.UnsafeEnabled { - mi := &file_cluster_proto_msgTypes[12] + mi := &file_cluster_proto_msgTypes[14] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -685,7 +770,7 @@ func (x *DelMemberRequest) String() string { func (*DelMemberRequest) ProtoMessage() {} func (x *DelMemberRequest) ProtoReflect() protoreflect.Message { - mi := &file_cluster_proto_msgTypes[12] + mi := &file_cluster_proto_msgTypes[14] if protoimpl.UnsafeEnabled && x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -698,7 +783,7 @@ func (x *DelMemberRequest) ProtoReflect() protoreflect.Message { // Deprecated: Use DelMemberRequest.ProtoReflect.Descriptor instead. func (*DelMemberRequest) Descriptor() ([]byte, []int) { - return file_cluster_proto_rawDescGZIP(), []int{12} + return file_cluster_proto_rawDescGZIP(), []int{14} } func (x *DelMemberRequest) GetServiceAddr() string { @@ -717,7 +802,7 @@ type DelMemberResponse struct { func (x *DelMemberResponse) Reset() { *x = DelMemberResponse{} if protoimpl.UnsafeEnabled { - mi := &file_cluster_proto_msgTypes[13] + mi := &file_cluster_proto_msgTypes[15] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -730,7 +815,7 @@ func (x *DelMemberResponse) String() string { func (*DelMemberResponse) ProtoMessage() {} func (x *DelMemberResponse) ProtoReflect() protoreflect.Message { - mi := &file_cluster_proto_msgTypes[13] + mi := &file_cluster_proto_msgTypes[15] if protoimpl.UnsafeEnabled && x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -743,7 +828,7 @@ func (x *DelMemberResponse) ProtoReflect() protoreflect.Message { // Deprecated: Use DelMemberResponse.ProtoReflect.Descriptor instead. func (*DelMemberResponse) Descriptor() ([]byte, []int) { - return file_cluster_proto_rawDescGZIP(), []int{13} + return file_cluster_proto_rawDescGZIP(), []int{15} } type SessionClosedRequest struct { @@ -757,7 +842,7 @@ type SessionClosedRequest struct { func (x *SessionClosedRequest) Reset() { *x = SessionClosedRequest{} if protoimpl.UnsafeEnabled { - mi := &file_cluster_proto_msgTypes[14] + mi := &file_cluster_proto_msgTypes[16] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -770,7 +855,7 @@ func (x *SessionClosedRequest) String() string { func (*SessionClosedRequest) ProtoMessage() {} func (x *SessionClosedRequest) ProtoReflect() protoreflect.Message { - mi := &file_cluster_proto_msgTypes[14] + mi := &file_cluster_proto_msgTypes[16] if protoimpl.UnsafeEnabled && x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -783,7 +868,7 @@ func (x *SessionClosedRequest) ProtoReflect() protoreflect.Message { // Deprecated: Use SessionClosedRequest.ProtoReflect.Descriptor instead. func (*SessionClosedRequest) Descriptor() ([]byte, []int) { - return file_cluster_proto_rawDescGZIP(), []int{14} + return file_cluster_proto_rawDescGZIP(), []int{16} } func (x *SessionClosedRequest) GetSessionId() int64 { @@ -802,7 +887,7 @@ type SessionClosedResponse struct { func (x *SessionClosedResponse) Reset() { *x = SessionClosedResponse{} if protoimpl.UnsafeEnabled { - mi := &file_cluster_proto_msgTypes[15] + mi := &file_cluster_proto_msgTypes[17] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -815,7 +900,7 @@ func (x *SessionClosedResponse) String() string { func (*SessionClosedResponse) ProtoMessage() {} func (x *SessionClosedResponse) ProtoReflect() protoreflect.Message { - mi := &file_cluster_proto_msgTypes[15] + mi := &file_cluster_proto_msgTypes[17] if protoimpl.UnsafeEnabled && x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -828,7 +913,7 @@ func (x *SessionClosedResponse) ProtoReflect() protoreflect.Message { // Deprecated: Use SessionClosedResponse.ProtoReflect.Descriptor instead. func (*SessionClosedResponse) Descriptor() ([]byte, []int) { - return file_cluster_proto_rawDescGZIP(), []int{15} + return file_cluster_proto_rawDescGZIP(), []int{17} } type CloseSessionRequest struct { @@ -842,7 +927,7 @@ type CloseSessionRequest struct { func (x *CloseSessionRequest) Reset() { *x = CloseSessionRequest{} if protoimpl.UnsafeEnabled { - mi := &file_cluster_proto_msgTypes[16] + mi := &file_cluster_proto_msgTypes[18] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -855,7 +940,7 @@ func (x *CloseSessionRequest) String() string { func (*CloseSessionRequest) ProtoMessage() {} func (x *CloseSessionRequest) ProtoReflect() protoreflect.Message { - mi := &file_cluster_proto_msgTypes[16] + mi := &file_cluster_proto_msgTypes[18] if protoimpl.UnsafeEnabled && x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -868,7 +953,7 @@ func (x *CloseSessionRequest) ProtoReflect() protoreflect.Message { // Deprecated: Use CloseSessionRequest.ProtoReflect.Descriptor instead. func (*CloseSessionRequest) Descriptor() ([]byte, []int) { - return file_cluster_proto_rawDescGZIP(), []int{16} + return file_cluster_proto_rawDescGZIP(), []int{18} } func (x *CloseSessionRequest) GetSessionId() int64 { @@ -887,7 +972,7 @@ type CloseSessionResponse struct { func (x *CloseSessionResponse) Reset() { *x = CloseSessionResponse{} if protoimpl.UnsafeEnabled { - mi := &file_cluster_proto_msgTypes[17] + mi := &file_cluster_proto_msgTypes[19] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -900,7 +985,7 @@ func (x *CloseSessionResponse) String() string { func (*CloseSessionResponse) ProtoMessage() {} func (x *CloseSessionResponse) ProtoReflect() protoreflect.Message { - mi := &file_cluster_proto_msgTypes[17] + mi := &file_cluster_proto_msgTypes[19] if protoimpl.UnsafeEnabled && x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -913,7 +998,7 @@ func (x *CloseSessionResponse) ProtoReflect() protoreflect.Message { // Deprecated: Use CloseSessionResponse.ProtoReflect.Descriptor instead. func (*CloseSessionResponse) Descriptor() ([]byte, []int) { - return file_cluster_proto_rawDescGZIP(), []int{17} + return file_cluster_proto_rawDescGZIP(), []int{19} } var File_cluster_proto protoreflect.FileDescriptor @@ -940,6 +1025,12 @@ var file_cluster_proto_rawDesc = []byte{ 0x12, 0x20, 0x0a, 0x0b, 0x73, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x41, 0x64, 0x64, 0x72, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0b, 0x73, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x41, 0x64, 0x64, 0x72, 0x22, 0x14, 0x0a, 0x12, 0x55, 0x6e, 0x72, 0x65, 0x67, 0x69, 0x73, 0x74, 0x65, 0x72, + 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x49, 0x0a, 0x10, 0x48, 0x65, 0x61, 0x72, + 0x74, 0x62, 0x65, 0x61, 0x74, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x35, 0x0a, 0x0a, + 0x6d, 0x65, 0x6d, 0x62, 0x65, 0x72, 0x49, 0x6e, 0x66, 0x6f, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, + 0x32, 0x15, 0x2e, 0x63, 0x6c, 0x75, 0x73, 0x74, 0x65, 0x72, 0x70, 0x62, 0x2e, 0x4d, 0x65, 0x6d, + 0x62, 0x65, 0x72, 0x49, 0x6e, 0x66, 0x6f, 0x52, 0x0a, 0x6d, 0x65, 0x6d, 0x62, 0x65, 0x72, 0x49, + 0x6e, 0x66, 0x6f, 0x22, 0x13, 0x0a, 0x11, 0x48, 0x65, 0x61, 0x72, 0x74, 0x62, 0x65, 0x61, 0x74, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x84, 0x01, 0x0a, 0x0e, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x12, 0x1a, 0x0a, 0x08, 0x67, 0x61, 0x74, 0x65, 0x41, 0x64, 0x64, 0x72, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x08, 0x67, @@ -989,7 +1080,7 @@ var file_cluster_proto_rawDesc = []byte{ 0x73, 0x69, 0x6f, 0x6e, 0x49, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x03, 0x52, 0x09, 0x73, 0x65, 0x73, 0x73, 0x69, 0x6f, 0x6e, 0x49, 0x64, 0x22, 0x16, 0x0a, 0x14, 0x43, 0x6c, 0x6f, 0x73, 0x65, 0x53, 0x65, 0x73, 0x73, 0x69, 0x6f, 0x6e, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x32, - 0x9c, 0x01, 0x0a, 0x06, 0x4d, 0x61, 0x73, 0x74, 0x65, 0x72, 0x12, 0x45, 0x0a, 0x08, 0x52, 0x65, + 0xe6, 0x01, 0x0a, 0x06, 0x4d, 0x61, 0x73, 0x74, 0x65, 0x72, 0x12, 0x45, 0x0a, 0x08, 0x52, 0x65, 0x67, 0x69, 0x73, 0x74, 0x65, 0x72, 0x12, 0x1a, 0x2e, 0x63, 0x6c, 0x75, 0x73, 0x74, 0x65, 0x72, 0x70, 0x62, 0x2e, 0x52, 0x65, 0x67, 0x69, 0x73, 0x74, 0x65, 0x72, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x1b, 0x2e, 0x63, 0x6c, 0x75, 0x73, 0x74, 0x65, 0x72, 0x70, 0x62, 0x2e, 0x52, @@ -998,49 +1089,53 @@ var file_cluster_proto_rawDesc = []byte{ 0x1c, 0x2e, 0x63, 0x6c, 0x75, 0x73, 0x74, 0x65, 0x72, 0x70, 0x62, 0x2e, 0x55, 0x6e, 0x72, 0x65, 0x67, 0x69, 0x73, 0x74, 0x65, 0x72, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x1d, 0x2e, 0x63, 0x6c, 0x75, 0x73, 0x74, 0x65, 0x72, 0x70, 0x62, 0x2e, 0x55, 0x6e, 0x72, 0x65, 0x67, 0x69, - 0x73, 0x74, 0x65, 0x72, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x32, 0xfb, - 0x04, 0x0a, 0x06, 0x4d, 0x65, 0x6d, 0x62, 0x65, 0x72, 0x12, 0x4d, 0x0a, 0x0d, 0x48, 0x61, 0x6e, - 0x64, 0x6c, 0x65, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x19, 0x2e, 0x63, 0x6c, 0x75, - 0x73, 0x74, 0x65, 0x72, 0x70, 0x62, 0x2e, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x4d, 0x65, - 0x73, 0x73, 0x61, 0x67, 0x65, 0x1a, 0x1f, 0x2e, 0x63, 0x6c, 0x75, 0x73, 0x74, 0x65, 0x72, 0x70, - 0x62, 0x2e, 0x4d, 0x65, 0x6d, 0x62, 0x65, 0x72, 0x48, 0x61, 0x6e, 0x64, 0x6c, 0x65, 0x52, 0x65, - 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x12, 0x4b, 0x0a, 0x0c, 0x48, 0x61, 0x6e, 0x64, - 0x6c, 0x65, 0x4e, 0x6f, 0x74, 0x69, 0x66, 0x79, 0x12, 0x18, 0x2e, 0x63, 0x6c, 0x75, 0x73, 0x74, - 0x65, 0x72, 0x70, 0x62, 0x2e, 0x4e, 0x6f, 0x74, 0x69, 0x66, 0x79, 0x4d, 0x65, 0x73, 0x73, 0x61, - 0x67, 0x65, 0x1a, 0x1f, 0x2e, 0x63, 0x6c, 0x75, 0x73, 0x74, 0x65, 0x72, 0x70, 0x62, 0x2e, 0x4d, - 0x65, 0x6d, 0x62, 0x65, 0x72, 0x48, 0x61, 0x6e, 0x64, 0x6c, 0x65, 0x52, 0x65, 0x73, 0x70, 0x6f, - 0x6e, 0x73, 0x65, 0x22, 0x00, 0x12, 0x47, 0x0a, 0x0a, 0x48, 0x61, 0x6e, 0x64, 0x6c, 0x65, 0x50, - 0x75, 0x73, 0x68, 0x12, 0x16, 0x2e, 0x63, 0x6c, 0x75, 0x73, 0x74, 0x65, 0x72, 0x70, 0x62, 0x2e, - 0x50, 0x75, 0x73, 0x68, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x1a, 0x1f, 0x2e, 0x63, 0x6c, - 0x75, 0x73, 0x74, 0x65, 0x72, 0x70, 0x62, 0x2e, 0x4d, 0x65, 0x6d, 0x62, 0x65, 0x72, 0x48, 0x61, - 0x6e, 0x64, 0x6c, 0x65, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x12, 0x4f, - 0x0a, 0x0e, 0x48, 0x61, 0x6e, 0x64, 0x6c, 0x65, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, - 0x12, 0x1a, 0x2e, 0x63, 0x6c, 0x75, 0x73, 0x74, 0x65, 0x72, 0x70, 0x62, 0x2e, 0x52, 0x65, 0x73, - 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x1a, 0x1f, 0x2e, 0x63, + 0x73, 0x74, 0x65, 0x72, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x12, 0x48, + 0x0a, 0x09, 0x48, 0x65, 0x61, 0x72, 0x74, 0x62, 0x65, 0x61, 0x74, 0x12, 0x1b, 0x2e, 0x63, 0x6c, + 0x75, 0x73, 0x74, 0x65, 0x72, 0x70, 0x62, 0x2e, 0x48, 0x65, 0x61, 0x72, 0x74, 0x62, 0x65, 0x61, + 0x74, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x1c, 0x2e, 0x63, 0x6c, 0x75, 0x73, 0x74, + 0x65, 0x72, 0x70, 0x62, 0x2e, 0x48, 0x65, 0x61, 0x72, 0x74, 0x62, 0x65, 0x61, 0x74, 0x52, 0x65, + 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x32, 0xfb, 0x04, 0x0a, 0x06, 0x4d, 0x65, 0x6d, + 0x62, 0x65, 0x72, 0x12, 0x4d, 0x0a, 0x0d, 0x48, 0x61, 0x6e, 0x64, 0x6c, 0x65, 0x52, 0x65, 0x71, + 0x75, 0x65, 0x73, 0x74, 0x12, 0x19, 0x2e, 0x63, 0x6c, 0x75, 0x73, 0x74, 0x65, 0x72, 0x70, 0x62, + 0x2e, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x1a, + 0x1f, 0x2e, 0x63, 0x6c, 0x75, 0x73, 0x74, 0x65, 0x72, 0x70, 0x62, 0x2e, 0x4d, 0x65, 0x6d, 0x62, + 0x65, 0x72, 0x48, 0x61, 0x6e, 0x64, 0x6c, 0x65, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, + 0x22, 0x00, 0x12, 0x4b, 0x0a, 0x0c, 0x48, 0x61, 0x6e, 0x64, 0x6c, 0x65, 0x4e, 0x6f, 0x74, 0x69, + 0x66, 0x79, 0x12, 0x18, 0x2e, 0x63, 0x6c, 0x75, 0x73, 0x74, 0x65, 0x72, 0x70, 0x62, 0x2e, 0x4e, + 0x6f, 0x74, 0x69, 0x66, 0x79, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x1a, 0x1f, 0x2e, 0x63, 0x6c, 0x75, 0x73, 0x74, 0x65, 0x72, 0x70, 0x62, 0x2e, 0x4d, 0x65, 0x6d, 0x62, 0x65, 0x72, 0x48, 0x61, 0x6e, 0x64, 0x6c, 0x65, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x12, - 0x48, 0x0a, 0x09, 0x4e, 0x65, 0x77, 0x4d, 0x65, 0x6d, 0x62, 0x65, 0x72, 0x12, 0x1b, 0x2e, 0x63, - 0x6c, 0x75, 0x73, 0x74, 0x65, 0x72, 0x70, 0x62, 0x2e, 0x4e, 0x65, 0x77, 0x4d, 0x65, 0x6d, 0x62, - 0x65, 0x72, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x1c, 0x2e, 0x63, 0x6c, 0x75, 0x73, - 0x74, 0x65, 0x72, 0x70, 0x62, 0x2e, 0x4e, 0x65, 0x77, 0x4d, 0x65, 0x6d, 0x62, 0x65, 0x72, 0x52, - 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x12, 0x48, 0x0a, 0x09, 0x44, 0x65, 0x6c, + 0x47, 0x0a, 0x0a, 0x48, 0x61, 0x6e, 0x64, 0x6c, 0x65, 0x50, 0x75, 0x73, 0x68, 0x12, 0x16, 0x2e, + 0x63, 0x6c, 0x75, 0x73, 0x74, 0x65, 0x72, 0x70, 0x62, 0x2e, 0x50, 0x75, 0x73, 0x68, 0x4d, 0x65, + 0x73, 0x73, 0x61, 0x67, 0x65, 0x1a, 0x1f, 0x2e, 0x63, 0x6c, 0x75, 0x73, 0x74, 0x65, 0x72, 0x70, + 0x62, 0x2e, 0x4d, 0x65, 0x6d, 0x62, 0x65, 0x72, 0x48, 0x61, 0x6e, 0x64, 0x6c, 0x65, 0x52, 0x65, + 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x12, 0x4f, 0x0a, 0x0e, 0x48, 0x61, 0x6e, 0x64, + 0x6c, 0x65, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x1a, 0x2e, 0x63, 0x6c, 0x75, + 0x73, 0x74, 0x65, 0x72, 0x70, 0x62, 0x2e, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x4d, + 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x1a, 0x1f, 0x2e, 0x63, 0x6c, 0x75, 0x73, 0x74, 0x65, 0x72, + 0x70, 0x62, 0x2e, 0x4d, 0x65, 0x6d, 0x62, 0x65, 0x72, 0x48, 0x61, 0x6e, 0x64, 0x6c, 0x65, 0x52, + 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x12, 0x48, 0x0a, 0x09, 0x4e, 0x65, 0x77, 0x4d, 0x65, 0x6d, 0x62, 0x65, 0x72, 0x12, 0x1b, 0x2e, 0x63, 0x6c, 0x75, 0x73, 0x74, 0x65, 0x72, - 0x70, 0x62, 0x2e, 0x44, 0x65, 0x6c, 0x4d, 0x65, 0x6d, 0x62, 0x65, 0x72, 0x52, 0x65, 0x71, 0x75, + 0x70, 0x62, 0x2e, 0x4e, 0x65, 0x77, 0x4d, 0x65, 0x6d, 0x62, 0x65, 0x72, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x1c, 0x2e, 0x63, 0x6c, 0x75, 0x73, 0x74, 0x65, 0x72, 0x70, 0x62, 0x2e, - 0x44, 0x65, 0x6c, 0x4d, 0x65, 0x6d, 0x62, 0x65, 0x72, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, - 0x65, 0x22, 0x00, 0x12, 0x54, 0x0a, 0x0d, 0x53, 0x65, 0x73, 0x73, 0x69, 0x6f, 0x6e, 0x43, 0x6c, - 0x6f, 0x73, 0x65, 0x64, 0x12, 0x1f, 0x2e, 0x63, 0x6c, 0x75, 0x73, 0x74, 0x65, 0x72, 0x70, 0x62, - 0x2e, 0x53, 0x65, 0x73, 0x73, 0x69, 0x6f, 0x6e, 0x43, 0x6c, 0x6f, 0x73, 0x65, 0x64, 0x52, 0x65, - 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x20, 0x2e, 0x63, 0x6c, 0x75, 0x73, 0x74, 0x65, 0x72, 0x70, - 0x62, 0x2e, 0x53, 0x65, 0x73, 0x73, 0x69, 0x6f, 0x6e, 0x43, 0x6c, 0x6f, 0x73, 0x65, 0x64, 0x52, - 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x12, 0x51, 0x0a, 0x0c, 0x43, 0x6c, 0x6f, - 0x73, 0x65, 0x53, 0x65, 0x73, 0x73, 0x69, 0x6f, 0x6e, 0x12, 0x1e, 0x2e, 0x63, 0x6c, 0x75, 0x73, - 0x74, 0x65, 0x72, 0x70, 0x62, 0x2e, 0x43, 0x6c, 0x6f, 0x73, 0x65, 0x53, 0x65, 0x73, 0x73, 0x69, - 0x6f, 0x6e, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x1f, 0x2e, 0x63, 0x6c, 0x75, 0x73, - 0x74, 0x65, 0x72, 0x70, 0x62, 0x2e, 0x43, 0x6c, 0x6f, 0x73, 0x65, 0x53, 0x65, 0x73, 0x73, 0x69, - 0x6f, 0x6e, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x42, 0x0c, 0x5a, 0x0a, - 0x2f, 0x63, 0x6c, 0x75, 0x73, 0x74, 0x65, 0x72, 0x70, 0x62, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, - 0x6f, 0x33, + 0x4e, 0x65, 0x77, 0x4d, 0x65, 0x6d, 0x62, 0x65, 0x72, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, + 0x65, 0x22, 0x00, 0x12, 0x48, 0x0a, 0x09, 0x44, 0x65, 0x6c, 0x4d, 0x65, 0x6d, 0x62, 0x65, 0x72, + 0x12, 0x1b, 0x2e, 0x63, 0x6c, 0x75, 0x73, 0x74, 0x65, 0x72, 0x70, 0x62, 0x2e, 0x44, 0x65, 0x6c, + 0x4d, 0x65, 0x6d, 0x62, 0x65, 0x72, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x1c, 0x2e, + 0x63, 0x6c, 0x75, 0x73, 0x74, 0x65, 0x72, 0x70, 0x62, 0x2e, 0x44, 0x65, 0x6c, 0x4d, 0x65, 0x6d, + 0x62, 0x65, 0x72, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x12, 0x54, 0x0a, + 0x0d, 0x53, 0x65, 0x73, 0x73, 0x69, 0x6f, 0x6e, 0x43, 0x6c, 0x6f, 0x73, 0x65, 0x64, 0x12, 0x1f, + 0x2e, 0x63, 0x6c, 0x75, 0x73, 0x74, 0x65, 0x72, 0x70, 0x62, 0x2e, 0x53, 0x65, 0x73, 0x73, 0x69, + 0x6f, 0x6e, 0x43, 0x6c, 0x6f, 0x73, 0x65, 0x64, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, + 0x20, 0x2e, 0x63, 0x6c, 0x75, 0x73, 0x74, 0x65, 0x72, 0x70, 0x62, 0x2e, 0x53, 0x65, 0x73, 0x73, + 0x69, 0x6f, 0x6e, 0x43, 0x6c, 0x6f, 0x73, 0x65, 0x64, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, + 0x65, 0x22, 0x00, 0x12, 0x51, 0x0a, 0x0c, 0x43, 0x6c, 0x6f, 0x73, 0x65, 0x53, 0x65, 0x73, 0x73, + 0x69, 0x6f, 0x6e, 0x12, 0x1e, 0x2e, 0x63, 0x6c, 0x75, 0x73, 0x74, 0x65, 0x72, 0x70, 0x62, 0x2e, + 0x43, 0x6c, 0x6f, 0x73, 0x65, 0x53, 0x65, 0x73, 0x73, 0x69, 0x6f, 0x6e, 0x52, 0x65, 0x71, 0x75, + 0x65, 0x73, 0x74, 0x1a, 0x1f, 0x2e, 0x63, 0x6c, 0x75, 0x73, 0x74, 0x65, 0x72, 0x70, 0x62, 0x2e, + 0x43, 0x6c, 0x6f, 0x73, 0x65, 0x53, 0x65, 0x73, 0x73, 0x69, 0x6f, 0x6e, 0x52, 0x65, 0x73, 0x70, + 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x42, 0x0c, 0x5a, 0x0a, 0x2f, 0x63, 0x6c, 0x75, 0x73, 0x74, + 0x65, 0x72, 0x70, 0x62, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, } var ( @@ -1055,56 +1150,61 @@ func file_cluster_proto_rawDescGZIP() []byte { return file_cluster_proto_rawDescData } -var file_cluster_proto_msgTypes = make([]protoimpl.MessageInfo, 18) +var file_cluster_proto_msgTypes = make([]protoimpl.MessageInfo, 20) var file_cluster_proto_goTypes = []interface{}{ (*MemberInfo)(nil), // 0: clusterpb.MemberInfo (*RegisterRequest)(nil), // 1: clusterpb.RegisterRequest (*RegisterResponse)(nil), // 2: clusterpb.RegisterResponse (*UnregisterRequest)(nil), // 3: clusterpb.UnregisterRequest (*UnregisterResponse)(nil), // 4: clusterpb.UnregisterResponse - (*RequestMessage)(nil), // 5: clusterpb.RequestMessage - (*NotifyMessage)(nil), // 6: clusterpb.NotifyMessage - (*ResponseMessage)(nil), // 7: clusterpb.ResponseMessage - (*PushMessage)(nil), // 8: clusterpb.PushMessage - (*MemberHandleResponse)(nil), // 9: clusterpb.MemberHandleResponse - (*NewMemberRequest)(nil), // 10: clusterpb.NewMemberRequest - (*NewMemberResponse)(nil), // 11: clusterpb.NewMemberResponse - (*DelMemberRequest)(nil), // 12: clusterpb.DelMemberRequest - (*DelMemberResponse)(nil), // 13: clusterpb.DelMemberResponse - (*SessionClosedRequest)(nil), // 14: clusterpb.SessionClosedRequest - (*SessionClosedResponse)(nil), // 15: clusterpb.SessionClosedResponse - (*CloseSessionRequest)(nil), // 16: clusterpb.CloseSessionRequest - (*CloseSessionResponse)(nil), // 17: clusterpb.CloseSessionResponse + (*HeartbeatRequest)(nil), // 5: clusterpb.HeartbeatRequest + (*HeartbeatResponse)(nil), // 6: clusterpb.HeartbeatResponse + (*RequestMessage)(nil), // 7: clusterpb.RequestMessage + (*NotifyMessage)(nil), // 8: clusterpb.NotifyMessage + (*ResponseMessage)(nil), // 9: clusterpb.ResponseMessage + (*PushMessage)(nil), // 10: clusterpb.PushMessage + (*MemberHandleResponse)(nil), // 11: clusterpb.MemberHandleResponse + (*NewMemberRequest)(nil), // 12: clusterpb.NewMemberRequest + (*NewMemberResponse)(nil), // 13: clusterpb.NewMemberResponse + (*DelMemberRequest)(nil), // 14: clusterpb.DelMemberRequest + (*DelMemberResponse)(nil), // 15: clusterpb.DelMemberResponse + (*SessionClosedRequest)(nil), // 16: clusterpb.SessionClosedRequest + (*SessionClosedResponse)(nil), // 17: clusterpb.SessionClosedResponse + (*CloseSessionRequest)(nil), // 18: clusterpb.CloseSessionRequest + (*CloseSessionResponse)(nil), // 19: clusterpb.CloseSessionResponse } var file_cluster_proto_depIdxs = []int32{ 0, // 0: clusterpb.RegisterRequest.memberInfo:type_name -> clusterpb.MemberInfo 0, // 1: clusterpb.RegisterResponse.members:type_name -> clusterpb.MemberInfo - 0, // 2: clusterpb.NewMemberRequest.memberInfo:type_name -> clusterpb.MemberInfo - 1, // 3: clusterpb.Master.Register:input_type -> clusterpb.RegisterRequest - 3, // 4: clusterpb.Master.Unregister:input_type -> clusterpb.UnregisterRequest - 5, // 5: clusterpb.Member.HandleRequest:input_type -> clusterpb.RequestMessage - 6, // 6: clusterpb.Member.HandleNotify:input_type -> clusterpb.NotifyMessage - 8, // 7: clusterpb.Member.HandlePush:input_type -> clusterpb.PushMessage - 7, // 8: clusterpb.Member.HandleResponse:input_type -> clusterpb.ResponseMessage - 10, // 9: clusterpb.Member.NewMember:input_type -> clusterpb.NewMemberRequest - 12, // 10: clusterpb.Member.DelMember:input_type -> clusterpb.DelMemberRequest - 14, // 11: clusterpb.Member.SessionClosed:input_type -> clusterpb.SessionClosedRequest - 16, // 12: clusterpb.Member.CloseSession:input_type -> clusterpb.CloseSessionRequest - 2, // 13: clusterpb.Master.Register:output_type -> clusterpb.RegisterResponse - 4, // 14: clusterpb.Master.Unregister:output_type -> clusterpb.UnregisterResponse - 9, // 15: clusterpb.Member.HandleRequest:output_type -> clusterpb.MemberHandleResponse - 9, // 16: clusterpb.Member.HandleNotify:output_type -> clusterpb.MemberHandleResponse - 9, // 17: clusterpb.Member.HandlePush:output_type -> clusterpb.MemberHandleResponse - 9, // 18: clusterpb.Member.HandleResponse:output_type -> clusterpb.MemberHandleResponse - 11, // 19: clusterpb.Member.NewMember:output_type -> clusterpb.NewMemberResponse - 13, // 20: clusterpb.Member.DelMember:output_type -> clusterpb.DelMemberResponse - 15, // 21: clusterpb.Member.SessionClosed:output_type -> clusterpb.SessionClosedResponse - 17, // 22: clusterpb.Member.CloseSession:output_type -> clusterpb.CloseSessionResponse - 13, // [13:23] is the sub-list for method output_type - 3, // [3:13] is the sub-list for method input_type - 3, // [3:3] is the sub-list for extension type_name - 3, // [3:3] is the sub-list for extension extendee - 0, // [0:3] is the sub-list for field type_name + 0, // 2: clusterpb.HeartbeatRequest.memberInfo:type_name -> clusterpb.MemberInfo + 0, // 3: clusterpb.NewMemberRequest.memberInfo:type_name -> clusterpb.MemberInfo + 1, // 4: clusterpb.Master.Register:input_type -> clusterpb.RegisterRequest + 3, // 5: clusterpb.Master.Unregister:input_type -> clusterpb.UnregisterRequest + 5, // 6: clusterpb.Master.Heartbeat:input_type -> clusterpb.HeartbeatRequest + 7, // 7: clusterpb.Member.HandleRequest:input_type -> clusterpb.RequestMessage + 8, // 8: clusterpb.Member.HandleNotify:input_type -> clusterpb.NotifyMessage + 10, // 9: clusterpb.Member.HandlePush:input_type -> clusterpb.PushMessage + 9, // 10: clusterpb.Member.HandleResponse:input_type -> clusterpb.ResponseMessage + 12, // 11: clusterpb.Member.NewMember:input_type -> clusterpb.NewMemberRequest + 14, // 12: clusterpb.Member.DelMember:input_type -> clusterpb.DelMemberRequest + 16, // 13: clusterpb.Member.SessionClosed:input_type -> clusterpb.SessionClosedRequest + 18, // 14: clusterpb.Member.CloseSession:input_type -> clusterpb.CloseSessionRequest + 2, // 15: clusterpb.Master.Register:output_type -> clusterpb.RegisterResponse + 4, // 16: clusterpb.Master.Unregister:output_type -> clusterpb.UnregisterResponse + 6, // 17: clusterpb.Master.Heartbeat:output_type -> clusterpb.HeartbeatResponse + 11, // 18: clusterpb.Member.HandleRequest:output_type -> clusterpb.MemberHandleResponse + 11, // 19: clusterpb.Member.HandleNotify:output_type -> clusterpb.MemberHandleResponse + 11, // 20: clusterpb.Member.HandlePush:output_type -> clusterpb.MemberHandleResponse + 11, // 21: clusterpb.Member.HandleResponse:output_type -> clusterpb.MemberHandleResponse + 13, // 22: clusterpb.Member.NewMember:output_type -> clusterpb.NewMemberResponse + 15, // 23: clusterpb.Member.DelMember:output_type -> clusterpb.DelMemberResponse + 17, // 24: clusterpb.Member.SessionClosed:output_type -> clusterpb.SessionClosedResponse + 19, // 25: clusterpb.Member.CloseSession:output_type -> clusterpb.CloseSessionResponse + 15, // [15:26] is the sub-list for method output_type + 4, // [4:15] is the sub-list for method input_type + 4, // [4:4] is the sub-list for extension type_name + 4, // [4:4] is the sub-list for extension extendee + 0, // [0:4] is the sub-list for field type_name } func init() { file_cluster_proto_init() } @@ -1174,7 +1274,7 @@ func file_cluster_proto_init() { } } file_cluster_proto_msgTypes[5].Exporter = func(v interface{}, i int) interface{} { - switch v := v.(*RequestMessage); i { + switch v := v.(*HeartbeatRequest); i { case 0: return &v.state case 1: @@ -1186,7 +1286,7 @@ func file_cluster_proto_init() { } } file_cluster_proto_msgTypes[6].Exporter = func(v interface{}, i int) interface{} { - switch v := v.(*NotifyMessage); i { + switch v := v.(*HeartbeatResponse); i { case 0: return &v.state case 1: @@ -1198,7 +1298,7 @@ func file_cluster_proto_init() { } } file_cluster_proto_msgTypes[7].Exporter = func(v interface{}, i int) interface{} { - switch v := v.(*ResponseMessage); i { + switch v := v.(*RequestMessage); i { case 0: return &v.state case 1: @@ -1210,7 +1310,7 @@ func file_cluster_proto_init() { } } file_cluster_proto_msgTypes[8].Exporter = func(v interface{}, i int) interface{} { - switch v := v.(*PushMessage); i { + switch v := v.(*NotifyMessage); i { case 0: return &v.state case 1: @@ -1222,7 +1322,7 @@ func file_cluster_proto_init() { } } file_cluster_proto_msgTypes[9].Exporter = func(v interface{}, i int) interface{} { - switch v := v.(*MemberHandleResponse); i { + switch v := v.(*ResponseMessage); i { case 0: return &v.state case 1: @@ -1234,7 +1334,7 @@ func file_cluster_proto_init() { } } file_cluster_proto_msgTypes[10].Exporter = func(v interface{}, i int) interface{} { - switch v := v.(*NewMemberRequest); i { + switch v := v.(*PushMessage); i { case 0: return &v.state case 1: @@ -1246,7 +1346,7 @@ func file_cluster_proto_init() { } } file_cluster_proto_msgTypes[11].Exporter = func(v interface{}, i int) interface{} { - switch v := v.(*NewMemberResponse); i { + switch v := v.(*MemberHandleResponse); i { case 0: return &v.state case 1: @@ -1258,7 +1358,7 @@ func file_cluster_proto_init() { } } file_cluster_proto_msgTypes[12].Exporter = func(v interface{}, i int) interface{} { - switch v := v.(*DelMemberRequest); i { + switch v := v.(*NewMemberRequest); i { case 0: return &v.state case 1: @@ -1270,7 +1370,7 @@ func file_cluster_proto_init() { } } file_cluster_proto_msgTypes[13].Exporter = func(v interface{}, i int) interface{} { - switch v := v.(*DelMemberResponse); i { + switch v := v.(*NewMemberResponse); i { case 0: return &v.state case 1: @@ -1282,7 +1382,7 @@ func file_cluster_proto_init() { } } file_cluster_proto_msgTypes[14].Exporter = func(v interface{}, i int) interface{} { - switch v := v.(*SessionClosedRequest); i { + switch v := v.(*DelMemberRequest); i { case 0: return &v.state case 1: @@ -1294,7 +1394,7 @@ func file_cluster_proto_init() { } } file_cluster_proto_msgTypes[15].Exporter = func(v interface{}, i int) interface{} { - switch v := v.(*SessionClosedResponse); i { + switch v := v.(*DelMemberResponse); i { case 0: return &v.state case 1: @@ -1306,7 +1406,7 @@ func file_cluster_proto_init() { } } file_cluster_proto_msgTypes[16].Exporter = func(v interface{}, i int) interface{} { - switch v := v.(*CloseSessionRequest); i { + switch v := v.(*SessionClosedRequest); i { case 0: return &v.state case 1: @@ -1318,6 +1418,30 @@ func file_cluster_proto_init() { } } file_cluster_proto_msgTypes[17].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*SessionClosedResponse); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_cluster_proto_msgTypes[18].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*CloseSessionRequest); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_cluster_proto_msgTypes[19].Exporter = func(v interface{}, i int) interface{} { switch v := v.(*CloseSessionResponse); i { case 0: return &v.state @@ -1336,7 +1460,7 @@ func file_cluster_proto_init() { GoPackagePath: reflect.TypeOf(x{}).PkgPath(), RawDescriptor: file_cluster_proto_rawDesc, NumEnums: 0, - NumMessages: 18, + NumMessages: 20, NumExtensions: 0, NumServices: 2, }, diff --git a/cluster/clusterpb/cluster_grpc.pb.go b/cluster/clusterpb/cluster_grpc.pb.go index 155ca2c6..58fe57eb 100644 --- a/cluster/clusterpb/cluster_grpc.pb.go +++ b/cluster/clusterpb/cluster_grpc.pb.go @@ -1,4 +1,8 @@ // Code generated by protoc-gen-go-grpc. DO NOT EDIT. +// versions: +// - protoc-gen-go-grpc v1.1.0 +// - protoc v3.13.0 +// source: cluster.proto package clusterpb @@ -20,6 +24,7 @@ const _ = grpc.SupportPackageIsVersion7 type MasterClient interface { Register(ctx context.Context, in *RegisterRequest, opts ...grpc.CallOption) (*RegisterResponse, error) Unregister(ctx context.Context, in *UnregisterRequest, opts ...grpc.CallOption) (*UnregisterResponse, error) + Heartbeat(ctx context.Context, in *HeartbeatRequest, opts ...grpc.CallOption) (*HeartbeatResponse, error) } type masterClient struct { @@ -48,12 +53,22 @@ func (c *masterClient) Unregister(ctx context.Context, in *UnregisterRequest, op return out, nil } +func (c *masterClient) Heartbeat(ctx context.Context, in *HeartbeatRequest, opts ...grpc.CallOption) (*HeartbeatResponse, error) { + out := new(HeartbeatResponse) + err := c.cc.Invoke(ctx, "/clusterpb.Master/Heartbeat", in, out, opts...) + if err != nil { + return nil, err + } + return out, nil +} + // MasterServer is the server API for Master service. // All implementations should embed UnimplementedMasterServer // for forward compatibility type MasterServer interface { Register(context.Context, *RegisterRequest) (*RegisterResponse, error) Unregister(context.Context, *UnregisterRequest) (*UnregisterResponse, error) + Heartbeat(context.Context, *HeartbeatRequest) (*HeartbeatResponse, error) } // UnimplementedMasterServer should be embedded to have forward compatible implementations. @@ -66,6 +81,9 @@ func (UnimplementedMasterServer) Register(context.Context, *RegisterRequest) (*R func (UnimplementedMasterServer) Unregister(context.Context, *UnregisterRequest) (*UnregisterResponse, error) { return nil, status.Errorf(codes.Unimplemented, "method Unregister not implemented") } +func (UnimplementedMasterServer) Heartbeat(context.Context, *HeartbeatRequest) (*HeartbeatResponse, error) { + return nil, status.Errorf(codes.Unimplemented, "method Heartbeat not implemented") +} // UnsafeMasterServer may be embedded to opt out of forward compatibility for this service. // Use of this interface is not recommended, as added methods to MasterServer will @@ -114,6 +132,24 @@ func _Master_Unregister_Handler(srv interface{}, ctx context.Context, dec func(i return interceptor(ctx, in, info, handler) } +func _Master_Heartbeat_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(HeartbeatRequest) + if err := dec(in); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(MasterServer).Heartbeat(ctx, in) + } + info := &grpc.UnaryServerInfo{ + Server: srv, + FullMethod: "/clusterpb.Master/Heartbeat", + } + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(MasterServer).Heartbeat(ctx, req.(*HeartbeatRequest)) + } + return interceptor(ctx, in, info, handler) +} + // Master_ServiceDesc is the grpc.ServiceDesc for Master service. // It's only intended for direct use with grpc.RegisterService, // and not to be introspected or modified (even as a copy) @@ -129,6 +165,10 @@ var Master_ServiceDesc = grpc.ServiceDesc{ MethodName: "Unregister", Handler: _Master_Unregister_Handler, }, + { + MethodName: "Heartbeat", + Handler: _Master_Heartbeat_Handler, + }, }, Streams: []grpc.StreamDesc{}, Metadata: "cluster.proto", diff --git a/cluster/clusterpb/proto/cluster.proto b/cluster/clusterpb/proto/cluster.proto index 6c5504d2..e6223b61 100644 --- a/cluster/clusterpb/proto/cluster.proto +++ b/cluster/clusterpb/proto/cluster.proto @@ -22,9 +22,17 @@ message UnregisterRequest { message UnregisterResponse {} +message HeartbeatRequest { + MemberInfo memberInfo = 1; +} + +message HeartbeatResponse { +} + service Master { rpc Register (RegisterRequest) returns (RegisterResponse) {} rpc Unregister (UnregisterRequest) returns (UnregisterResponse) {} + rpc Heartbeat (HeartbeatRequest) returns (HeartbeatResponse) {} } message RequestMessage { diff --git a/cluster/member.go b/cluster/member.go index 0064c55e..bf342682 100644 --- a/cluster/member.go +++ b/cluster/member.go @@ -20,13 +20,23 @@ package cluster -import "github.com/lonng/nano/cluster/clusterpb" +import ( + "fmt" + "time" + + "github.com/lonng/nano/cluster/clusterpb" +) type Member struct { - isMaster bool - memberInfo *clusterpb.MemberInfo + isMaster bool + memberInfo *clusterpb.MemberInfo + lastHeartbeatAt time.Time // cluster member report heartbeat time to the master } func (m *Member) MemberInfo() *clusterpb.MemberInfo { return m.memberInfo } + +func (m *Member) String() string { + return fmt.Sprintf("Master: %t MemberInfo: %s LastHeartbeatAt: %s", m.isMaster, m.memberInfo.String(), m.lastHeartbeatAt.String()) +} diff --git a/cluster/node.go b/cluster/node.go index 6a7b797a..d8508d93 100644 --- a/cluster/node.go +++ b/cluster/node.go @@ -44,16 +44,17 @@ import ( // Options contains some configurations for current node type Options struct { - Pipeline pipeline.Pipeline - IsMaster bool - AdvertiseAddr string - RetryInterval time.Duration - ClientAddr string - Components *component.Components - Label string - IsWebsocket bool - TSLCertificate string - TSLKey string + Pipeline pipeline.Pipeline + IsMaster bool + AdvertiseAddr string + RetryInterval time.Duration + ClientAddr string + Components *component.Components + Label string + IsWebsocket bool + TSLCertificate string + TSLKey string + UnregisterCallback func(Member) } // Node represents a node in nano cluster, which will contains a group of services. @@ -70,6 +71,9 @@ type Node struct { mu sync.RWMutex sessions map[int64]*session.Session + + once sync.Once + keepaliveExit chan struct{} } func (n *Node) Startup() error { @@ -180,9 +184,8 @@ func (n *Node) initNode() error { log.Println("Register current node to cluster failed", err, "and will retry in", n.RetryInterval.String()) time.Sleep(n.RetryInterval) } - + n.once.Do(n.keepalive) } - return nil } @@ -200,7 +203,10 @@ func (n *Node) Shutdown() { for i := length - 1; i >= 0; i-- { components[i].Comp.Shutdown() } - + // close sendHeartbeat + if n.keepaliveExit != nil { + close(n.keepaliveExit) + } if !n.IsMaster && n.AdvertiseAddr != "" { pool, err := n.rpcClient.getConnPool(n.AdvertiseAddr) if err != nil { @@ -384,6 +390,7 @@ func (n *Node) NewMember(_ context.Context, req *clusterpb.NewMemberRequest) (*c } func (n *Node) DelMember(_ context.Context, req *clusterpb.DelMemberRequest) (*clusterpb.DelMemberResponse, error) { + log.Println("DelMember member", req.String()) n.handler.delMember(req.ServiceAddr) n.cluster.delMember(req.ServiceAddr) return &clusterpb.DelMemberResponse{}, nil @@ -412,3 +419,43 @@ func (n *Node) CloseSession(_ context.Context, req *clusterpb.CloseSessionReques } return &clusterpb.CloseSessionResponse{}, nil } + +// ticker send heartbeat register info to master +func (n *Node) keepalive() { + if n.keepaliveExit == nil { + n.keepaliveExit = make(chan struct{}) + } + if n.AdvertiseAddr == "" || n.IsMaster { + return + } + heartbeat := func() { + pool, err := n.rpcClient.getConnPool(n.AdvertiseAddr) + if err != nil { + log.Println("rpcClient master conn", err) + return + } + masterCli := clusterpb.NewMasterClient(pool.Get()) + if _, err := masterCli.Heartbeat(context.Background(), &clusterpb.HeartbeatRequest{ + MemberInfo: &clusterpb.MemberInfo{ + Label: n.Label, + ServiceAddr: n.ServiceAddr, + Services: n.handler.LocalService(), + }, + }); err != nil { + log.Println("Member send heartbeat error", err) + } + } + go func() { + ticker := time.NewTicker(env.Heartbeat) + for { + select { + case <-ticker.C: + heartbeat() + case <-n.keepaliveExit: + log.Println("Exit member node heartbeat ") + ticker.Stop() + return + } + } + }() +} diff --git a/examples/cluster/main.go b/examples/cluster/main.go index c2e31c0c..e4a4250f 100644 --- a/examples/cluster/main.go +++ b/examples/cluster/main.go @@ -8,6 +8,7 @@ import ( "runtime" "github.com/lonng/nano" + "github.com/lonng/nano/cluster" "github.com/lonng/nano/examples/cluster/chat" "github.com/lonng/nano/examples/cluster/gate" "github.com/lonng/nano/examples/cluster/master" @@ -111,6 +112,9 @@ func runMaster(args *cli.Context) error { nano.WithComponents(master.Services), nano.WithSerializer(json.NewSerializer()), nano.WithDebugMode(), + nano.WithUnregisterCallback(func(m cluster.Member) { + log.Println("Todo alarm unregister:", m.String()) + }), ) return nil diff --git a/options.go b/options.go index 8f656213..533817af 100644 --- a/options.go +++ b/options.go @@ -160,3 +160,10 @@ func WithNodeId(nodeId uint64) Option { service.ResetNodeId(nodeId) } } + +// WithUnregisterCallback master unregister member event call fn +func WithUnregisterCallback(fn func(member cluster.Member)) Option { + return func(opt *cluster.Options) { + opt.UnregisterCallback = fn + } +}