Skip to content

Commit

Permalink
feat(xconsul): add xconsul with client, service, name
Browse files Browse the repository at this point in the history
  • Loading branch information
Aoi-hosizora committed Aug 13, 2020
1 parent 7fbc645 commit 35245b7
Show file tree
Hide file tree
Showing 8 changed files with 423 additions and 8 deletions.
12 changes: 7 additions & 5 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,25 +8,27 @@
+ xtime
+ **xgin**
+ **xgorm**
+ xredis (`redigo/redis`)
+ **xredis**
+ xneo4j
+ **xfiber**
+ xdto
+ **xtelebot**
+ xserverchan
+ **xstatus**
+ xconsul

### Dependencies

+ See [go.mod](./go.mod) and [go.sum](./go.sum)
+ `github.com/Aoi-hosizora/ahlib v1.2.8`
+ `github.com/gin-gonic/gin v1.6.3`
+ `github.com/go-sql-driver/mysql v1.4.1`
+ `github.com/gofiber/fiber v1.12.6`
+ `github.com/gomodule/redigo/redis v0.0.0-20200429221454-e14091dffc1b`
+ `github.com/jinzhu/gorm v1.9.12`
+ `gopkg.in/go-playground/validator.v9 v9.29.1`
+ `github.com/gomodule/redigo/redis v0.0.0-20200429221454-e14091dffc1b`
+ `github.com/neo4j/neo4j-go-driver v1.7.4`
+ `github.com/sirupsen/logrus v1.6.0`
+ `gopkg.in/go-playground/validator.v9 v9.29.1`
+ `Aoi-hosizora/go-serverchan v1.0.0`
+ `gopkg.in/tucnak/telebot.v2 v2.3.3`
+ `Aoi-hosizora/go-serverchan v1.0.0`
+ `github.com/hashicorp/consul/api v1.6.0`
+ `google.golang.org/grpc v1.31.0`
2 changes: 2 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,13 @@ require (
github.com/gofiber/fiber v1.12.6
github.com/golang/mock v1.4.3 // indirect
github.com/gomodule/redigo/redis v0.0.0-20200429221454-e14091dffc1b
github.com/hashicorp/consul/api v1.6.0
github.com/jinzhu/gorm v1.9.15
github.com/neo4j-drivers/gobolt v1.7.4 // indirect
github.com/neo4j/neo4j-go-driver v1.7.4
github.com/onsi/ginkgo v1.14.0 // indirect
github.com/sirupsen/logrus v1.6.0
google.golang.org/grpc v1.31.0
gopkg.in/go-playground/assert.v1 v1.2.1 // indirect
gopkg.in/go-playground/validator.v9 v9.29.1
gopkg.in/tucnak/telebot.v2 v2.3.3
Expand Down
138 changes: 135 additions & 3 deletions go.sum

Large diffs are not rendered by default.

16 changes: 16 additions & 0 deletions xconsul/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
# xconsul

### Service

+ `type ConsulService struct {}`
+ `RegisterConsulService(ca string, srv *ConsulService) error`

### Client

+ `RegisterConsulResolver()`
+ `NewConsulBuilder() resolver.Builder`

### Name

+ `type NameHandler struct {}`
+ `SetDefaultNameHandler(hdr *NameHandler)`
86 changes: 86 additions & 0 deletions xconsul/client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
package xconsul

import (
"fmt"
"github.com/hashicorp/consul/api"
"google.golang.org/grpc/resolver"
"log"
"sync"
)

// RegisterConsulResolver will register consul build to grpc resolver, used in client.
func RegisterConsulResolver() {
resolver.Register(NewConsulBuilder())
}

type ConsulBuilder struct{}

func NewConsulBuilder() resolver.Builder {
return &ConsulBuilder{}
}

func (cb *ConsulBuilder) Build(target resolver.Target, cc resolver.ClientConn, opts resolver.BuildOptions) (resolver.Resolver, error) {
ta := fmt.Sprintf("%s/%s", target.Authority, target.Endpoint)
host, port, name, err := defaultNameHandler.ParseGrpcTarget(ta)
if err != nil {
return nil, err
}

cr := &consulResolver{
host: host,
port: port,
name: name,
cc: cc,
disableServiceConfig: opts.DisableServiceConfig,
lastIndex: 0,
}

cr.wg.Add(1)
go cr.watcher()
return cr, nil
}

func (cb *ConsulBuilder) Scheme() string {
return "consul"
}

type consulResolver struct {
host string
port int
wg sync.WaitGroup
cc resolver.ClientConn
name string
disableServiceConfig bool
lastIndex uint64
}

func (cr *consulResolver) ResolveNow(resolver.ResolveNowOptions) {}

func (cr *consulResolver) Close() {}

func (cr *consulResolver) watcher() {
config := api.DefaultConfig()
config.Address = fmt.Sprintf("%s:%d", cr.host, cr.port)
client, err := api.NewClient(config)
if err != nil {
log.Println("Failed to create consul client:", err)
return
}

for {
services, metaInfo, err := client.Health().Service(cr.name, cr.name, true, &api.QueryOptions{WaitIndex: cr.lastIndex})
if err != nil {
log.Println("Failed to retrieve instances from consul:", err)
}

cr.lastIndex = metaInfo.LastIndex
addresses := make([]resolver.Address, len(services))
for idx, service := range services {
addr := fmt.Sprintf("%v:%v", service.Service.Address, service.Service.Port)
addresses[idx] = resolver.Address{Addr: addr}
}

log.Println("Update service addresses:", len(addresses))
cr.cc.UpdateState(resolver.State{Addresses: addresses})
}
}
69 changes: 69 additions & 0 deletions xconsul/name.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
package xconsul

import (
"fmt"
"github.com/Aoi-hosizora/ahlib/xnumber"
"regexp"
)

type NameHandler struct {
// Used in api.AgentServiceRegistration `ID` field.
GetConsulID func(ip string, port int, name string) string

// Used in api.AgentServiceCheck `GRPC` field.
GetGrpcTarget func(ip string, port int, name string) string

// Used to parse api.AgentServiceCheck `GRPC` field in ConsulBuilder.Build.
ParseGrpcTarget func(schema string) (ip string, port int, name string, err error)
}

var defaultNameHandler = &NameHandler{
GetConsulID: getConsulIDDefault,
GetGrpcTarget: getGrpcTargetDefault,
ParseGrpcTarget: parseGrpcTargetDefault,
}

// Change default NameHandler.
func SetDefaultNameHandler(hdr *NameHandler) {
if hdr == nil || hdr.GetConsulID == nil || hdr.GetGrpcTarget == nil || hdr.ParseGrpcTarget == nil {
panic("invalid name handler, could not be nil")
}

defaultNameHandler = hdr
}

func getConsulIDDefault(ip string, port int, name string) string {
return fmt.Sprintf("%s-%d-%s", ip, port, name)
}

func getGrpcTargetDefault(ip string, port int, name string) string {
return fmt.Sprintf("%s:%d/%s", ip, port, name)
}

func parseGrpcTargetDefault(target string) (host string, port int, name string, err error) {
if target == "" {
return "", 0, "", fmt.Errorf("consul resolver: missing address")
}

// localhost:8500/xxx
regexConsul, err := regexp.Compile(`^([A-z0-9.]+)(?::([0-9]{1,5}))?/([A-z_-]+)$`)
if err != nil {
return "", 0, "", err
}
if !regexConsul.MatchString(target) {
return "", 0, "", fmt.Errorf("consul resolver: invalid uri")
}

groups := regexConsul.FindStringSubmatch(target)
host = groups[1] // localhost
name = groups[3] // xxx
port, err = xnumber.Atoi(groups[2]) // 8500
if err != nil {
return "", 0, "", err
}
if port == 0 {
port = 8500
}

return host, port, name, nil
}
74 changes: 74 additions & 0 deletions xconsul/service.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
package xconsul

import (
"fmt"
"github.com/hashicorp/consul/api"
"time"
)

type ConsulService struct {
IP string
Port int
Tag []string
Name string

Interval time.Duration
Deregister time.Duration

Namespace string
Weights *api.AgentWeights
Proxy *api.AgentServiceConnectProxyConfig
Connect *api.AgentServiceConnect
}

// RegisterConsulService will connect consul agent, and register a service into it.
func RegisterConsulService(consulAddress string, srv *ConsulService) error {
// connect consul
cfg := api.DefaultConfig()
cfg.Address = consulAddress
client, err := api.NewClient(cfg)
if err != nil {
return err
}
agent := client.Agent()

// check some parameter
if srv.IP == "" || srv.Name == "" {
return fmt.Errorf("expected non-empty IP and Name")
}
if srv.Port <= 0 || srv.Port >= 65536 {
return fmt.Errorf("invalid port value")
}
if srv.Interval == 0 {
srv.Interval = time.Duration(10) * time.Second
}
if srv.Deregister == 0 {
srv.Deregister = time.Duration(1) * time.Minute
}

// set registration
registration := &api.AgentServiceRegistration{
ID: defaultNameHandler.GetConsulID(srv.IP, srv.Port, srv.Name),
Name: srv.Name,
Tags: srv.Tag,
Port: srv.Port,
Address: srv.IP,
Check: &api.AgentServiceCheck{
Interval: srv.Interval.String(),
DeregisterCriticalServiceAfter: srv.Deregister.String(),
GRPC: defaultNameHandler.GetGrpcTarget(srv.IP, srv.Port, srv.Name),
},

Namespace: srv.Namespace,
Weights: srv.Weights,
Proxy: srv.Proxy,
Connect: srv.Connect,
}

// register to agent
err = agent.ServiceRegister(registration)
if err != nil {
return err
}
return nil
}
34 changes: 34 additions & 0 deletions xconsul/xcomsul_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
package xconsul

import (
"log"
"testing"
)

func TestConsul(t *testing.T) {
RegisterConsulResolver()

for i := 0; i < 5; i++ {
err := RegisterConsulService("127.0.0.1:8500", &ConsulService{
IP: "127.0.0.1",
Port: 5555 + i,
Name: "test",
})
log.Println(err)
}
}

func TestDefaultNameHandler(t *testing.T) {
log.Println(defaultNameHandler.GetConsulID("127.0.0.1", 8500, "aaa"))
log.Println(defaultNameHandler.GetGrpcTarget("127.0.0.1", 8500, "aaa"))
log.Println(defaultNameHandler.ParseGrpcTarget("127.0.0.1:8500/aaa"))

func() {
defer func() {
if err := recover(); err != nil {
log.Println(err)
}
}()
SetDefaultNameHandler(nil)
}()
}

0 comments on commit 35245b7

Please sign in to comment.