Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

expose remote service route for user custom #93

Merged
merged 2 commits into from
Mar 29, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 24 additions & 6 deletions cluster/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,9 @@ var (

type rpcHandler func(session *session.Session, msg *message.Message, noCopy bool)

// CustomerRemoteServiceRoute customer remote service route
type CustomerRemoteServiceRoute func(service string, session *session.Session, members []*clusterpb.MemberInfo) *clusterpb.MemberInfo

func cache() {
hrdata := map[string]interface{}{
"code": 200,
Expand Down Expand Up @@ -336,14 +339,29 @@ func (h *LocalHandler) remoteProcess(session *session.Session, msg *message.Mess
}

// Select a remote service address
// 1. Use the service address directly if the router contains binding item
// 2. Select a remote service address randomly and bind to router
// 1. if exist customer remote service route ,use it, otherwise use default strategy
// 2. Use the service address directly if the router contains binding item
// 3. Select a remote service address randomly and bind to router
var remoteAddr string
if addr, found := session.Router().Find(service); found {
remoteAddr = addr
if h.currentNode.Options.RemoteServiceRoute != nil {
if addr, found := session.Router().Find(service); found {
remoteAddr = addr
} else {
member := h.currentNode.Options.RemoteServiceRoute(service, session, members)
if member == nil {
log.Println(fmt.Sprintf("customize remoteServiceRoute handler: %s is not found", msg.Route))
return
}
remoteAddr = member.ServiceAddr
session.Router().Bind(service, remoteAddr)
}
} else {
remoteAddr = members[rand.Intn(len(members))].ServiceAddr
session.Router().Bind(service, remoteAddr)
if addr, found := session.Router().Find(service); found {
remoteAddr = addr
} else {
remoteAddr = members[rand.Intn(len(members))].ServiceAddr
session.Router().Bind(service, remoteAddr)
}
}
pool, err := h.currentNode.rpcClient.getConnPool(remoteAddr)
if err != nil {
Expand Down
1 change: 1 addition & 0 deletions cluster/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ type Options struct {
TSLCertificate string
TSLKey string
UnregisterCallback func(Member)
RemoteServiceRoute CustomerRemoteServiceRoute
}

// Node represents a node in nano cluster, which will contains a group of services.
Expand Down
27 changes: 27 additions & 0 deletions examples/customerroute/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# Nano cluster example

## About this example



## How to run the example?

```shell
cd examples/customerroute
go build

# run master server
./customerroute master
./customerroute chat --listen "127.0.0.1:34580"
./customerroute chat --listen "127.0.0.1:34581"
./customerroute gate --listen "127.0.0.1:34570" --gate-address "127.0.0.1:34590"
```

## open browser and visit url for 4 times
```
http://127.0.0.1:12345/web/
http://127.0.0.1:12345/web/
http://127.0.0.1:12345/web/
http://127.0.0.1:12345/web/
```
input content and send, the same ChatRoomService node will sync the message each other
186 changes: 186 additions & 0 deletions examples/customerroute/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,186 @@
package main

import (
"fmt"
"github.com/lonng/nano/cluster/clusterpb"
"log"
"net/http"
"os"
"path/filepath"
"runtime"

"github.com/lonng/nano"
"github.com/lonng/nano/examples/customerroute/onegate"
"github.com/lonng/nano/examples/customerroute/tworoom"
"github.com/lonng/nano/serialize/json"
"github.com/lonng/nano/session"
"github.com/pingcap/errors"
"github.com/urfave/cli"
)

func main() {
app := cli.NewApp()
app.Name = "NanoCustomerRouteDemo"
app.Author = "Lonng"
app.Email = "[email protected]"
app.Description = "Nano cluster demo"
app.Commands = []cli.Command{
{
Name: "master",
Flags: []cli.Flag{
cli.StringFlag{
Name: "listen,l",
Usage: "Master service listen address",
Value: "127.0.0.1:34567",
},
},
Action: runMaster,
},
{
Name: "gate",
Flags: []cli.Flag{
cli.StringFlag{
Name: "master",
Usage: "master server address",
Value: "127.0.0.1:34567",
},
cli.StringFlag{
Name: "listen,l",
Usage: "Gate service listen address",
Value: "",
},
cli.StringFlag{
Name: "gate-address",
Usage: "Client connect address",
Value: "",
},
},
Action: runGate,
},
{
Name: "chat",
Flags: []cli.Flag{
cli.StringFlag{
Name: "master",
Usage: "master server address",
Value: "127.0.0.1:34567",
},
cli.StringFlag{
Name: "listen,l",
Usage: "Chat service listen address",
Value: "",
},
},
Action: runChat,
},
}
log.SetFlags(log.LstdFlags | log.Lshortfile)
if err := app.Run(os.Args); err != nil {
log.Fatalf("Startup server error %+v", err)
}
}

func srcPath() string {
_, file, _, _ := runtime.Caller(0)
return filepath.Dir(file)
}

func runMaster(args *cli.Context) error {
listen := args.String("listen")
if listen == "" {
return errors.Errorf("master listen address cannot empty")
}

webDir := filepath.Join(srcPath(), "onemaster", "web")
log.Println("Nano master server web content directory", webDir)
log.Println("Nano master listen address", listen)
log.Println("Open http://127.0.0.1:12345/web/ in browser")

http.Handle("/web/", http.StripPrefix("/web/", http.FileServer(http.Dir(webDir))))
go func() {
if err := http.ListenAndServe(":12345", nil); err != nil {
panic(err)
}
}()

// Startup Nano server with the specified listen address
nano.Listen(listen,
nano.WithMaster(),
nano.WithSerializer(json.NewSerializer()),
nano.WithDebugMode(),
)

return nil
}

func runGate(args *cli.Context) error {
listen := args.String("listen")
if listen == "" {
return errors.Errorf("gate listen address cannot empty")
}

masterAddr := args.String("master")
if masterAddr == "" {
return errors.Errorf("master address cannot empty")
}

gateAddr := args.String("gate-address")
if gateAddr == "" {
return errors.Errorf("gate address cannot empty")
}

log.Println("Current server listen address", listen)
log.Println("Current gate server address", gateAddr)
log.Println("Remote master server address", masterAddr)

// Startup Nano server with the specified listen address
nano.Listen(listen,
nano.WithAdvertiseAddr(masterAddr),
nano.WithClientAddr(gateAddr),
nano.WithComponents(onegate.Services),
nano.WithSerializer(json.NewSerializer()),
nano.WithIsWebsocket(true),
nano.WithWSPath("/nano"),
nano.WithCheckOriginFunc(func(_ *http.Request) bool { return true }),
nano.WithDebugMode(),
//set remote service route for gate
nano.WithCustomerRemoteServiceRoute(customerRemoteServiceRoute),
nano.WithNodeId(2), // if you deploy multi gate, option set nodeId, default nodeId = os.Getpid()
)
return nil
}

func runChat(args *cli.Context) error {
listen := args.String("listen")
if listen == "" {
return errors.Errorf("chat listen address cannot empty")
}

masterAddr := args.String("master")
if listen == "" {
return errors.Errorf("master address cannot empty")
}

log.Println("Current chat server listen address", listen)
log.Println("Remote master server address", masterAddr)

// Register session closed callback
session.Lifetime.OnClosed(tworoom.OnSessionClosed)

// Startup Nano server with the specified listen address
nano.Listen(listen,
nano.WithAdvertiseAddr(masterAddr),
nano.WithComponents(tworoom.Services),
nano.WithSerializer(json.NewSerializer()),
nano.WithDebugMode(),
)

return nil
}

func customerRemoteServiceRoute(service string, session *session.Session, members []*clusterpb.MemberInfo) *clusterpb.MemberInfo {
count := int64(len(members))
var index = session.UID() % count
fmt.Printf("remote service:%s route to :%v \n", service, members[index])
return members[index]
}
43 changes: 43 additions & 0 deletions examples/customerroute/onegate/gate_service.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
package onegate

import (
"fmt"
"github.com/lonng/nano/component"
"github.com/lonng/nano/examples/cluster/protocol"
"github.com/lonng/nano/session"
"github.com/pingcap/errors"
)

type RegisterService struct {
component.Base
nextGateUid int64
}

func newRegisterService() *RegisterService {
return &RegisterService{}
}

type (
RegisterRequest struct {
Nickname string `json:"nickname"`
}
RegisterResponse struct {
Code int `json:"code"`
}
)

func (bs *RegisterService) Login(s *session.Session, msg *RegisterRequest) error {
bs.nextGateUid++
uid := bs.nextGateUid
s.Bind(uid)
fmt.Println("Login uid:", uid)
chat := &protocol.JoinRoomRequest{
Nickname: msg.Nickname,
GateUid: uid,
MasterUid: uid,
}
if err := s.RPC("ChatRoomService.JoinRoom", chat); err != nil {
return errors.Trace(err)
}
return s.Response(&RegisterResponse{})
}
14 changes: 14 additions & 0 deletions examples/customerroute/onegate/init.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
package onegate

import "github.com/lonng/nano/component"

var (
// All services in master server
Services = &component.Components{}

bindService = newRegisterService()
)

func init() {
Services.Register(bindService)
}
Loading