Skip to content

Commit

Permalink
feat: updated node services
Browse files Browse the repository at this point in the history
  • Loading branch information
tikazyq committed Jun 11, 2024
1 parent 0a95079 commit d875383
Show file tree
Hide file tree
Showing 16 changed files with 2,051 additions and 41 deletions.
3 changes: 2 additions & 1 deletion apps/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package apps

import (
"context"
"errors"
"github.com/apex/log"
"github.com/crawlab-team/crawlab-core/controllers"
"github.com/crawlab-team/crawlab-core/interfaces"
Expand Down Expand Up @@ -67,7 +68,7 @@ func (app *Api) Start() {

// serve
if err := http.Serve(app.ln, app.app); err != nil {
if err != http.ErrServerClosed {
if !errors.Is(err, http.ErrServerClosed) {
log.Error("run server error:" + err.Error())
} else {
log.Info("server graceful down")
Expand Down
122 changes: 122 additions & 0 deletions apps/api_v2.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
package apps

import (
"context"
"errors"
"github.com/apex/log"
"github.com/crawlab-team/crawlab-core/controllers"
"github.com/crawlab-team/crawlab-core/interfaces"
"github.com/crawlab-team/crawlab-core/middlewares"
"github.com/gin-gonic/gin"
"github.com/spf13/viper"
"net"
"net/http"
"time"
)

func init() {
// set gin mode
if viper.GetString("gin.mode") == "" {
gin.SetMode(gin.ReleaseMode)
} else {
gin.SetMode(viper.GetString("gin.mode"))
}
}

type ApiV2 struct {
// dependencies
interfaces.WithConfigPath

// internals
app *gin.Engine
ln net.Listener
srv *http.Server
ready bool
}

func (app *ApiV2) Init() {
// initialize middlewares
_ = app.initModuleWithApp("middlewares", middlewares.InitMiddlewares)

// initialize routes
_ = app.initModuleWithApp("routes", controllers.InitRoutes)
}

func (app *ApiV2) Start() {
// address
host := viper.GetString("server.host")
port := viper.GetString("server.port")
address := net.JoinHostPort(host, port)

// http server
app.srv = &http.Server{
Handler: app.app,
Addr: address,
}

// listen
var err error
app.ln, err = net.Listen("tcp", address)
if err != nil {
panic(err)
}
app.ready = true

// serve
if err := http.Serve(app.ln, app.app); err != nil {
if !errors.Is(err, http.ErrServerClosed) {
log.Error("run server error:" + err.Error())
} else {
log.Info("server graceful down")
}
}
}

func (app *ApiV2) Wait() {
DefaultWait()
}

func (app *ApiV2) Stop() {
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()

if err := app.srv.Shutdown(ctx); err != nil {
log.Error("run server error:" + err.Error())
}
}

func (app *ApiV2) GetGinEngine() *gin.Engine {
return app.app
}

func (app *ApiV2) GetHttpServer() *http.Server {
return app.srv
}

func (app *ApiV2) Ready() (ok bool) {
return app.ready
}

func (app *ApiV2) initModuleWithApp(name string, fn func(app *gin.Engine) error) (err error) {
return initModule(name, func() error {
return fn(app.app)
})
}

func NewApiV2() *ApiV2 {
api := &ApiV2{
app: gin.New(),
}
api.Init()
return api
}

var apiV2 *ApiV2

func GetApiV2() *ApiV2 {
if apiV2 != nil {
return apiV2
}
apiV2 = NewApiV2()
return apiV2
}
5 changes: 2 additions & 3 deletions apps/interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ type ApiApp interface {
type NodeApp interface {
App
interfaces.WithConfigPath
SetGrpcAddress(address interfaces.Address)
}

type ServerApp interface {
Expand All @@ -34,7 +33,7 @@ type ServerApp interface {

type DockerApp interface {
App
GetParent() (parent ServerApp)
SetParent(parent ServerApp)
GetParent() (parent NodeApp)
SetParent(parent NodeApp)
Ready() (ok bool)
}
16 changes: 1 addition & 15 deletions apps/options.go
Original file line number Diff line number Diff line change
@@ -1,24 +1,10 @@
package apps

import "github.com/crawlab-team/crawlab-core/interfaces"

type ServerOption func(app ServerApp)

func WithServerConfigPath(path string) ServerOption {
return func(app ServerApp) {
app.SetConfigPath(path)
}
}

func WithServerGrpcAddress(address interfaces.Address) ServerOption {
return func(app ServerApp) {
app.SetGrpcAddress(address)
}
}

type DockerOption func(dck DockerApp)

func WithDockerParent(parent ServerApp) DockerOption {
func WithDockerParent(parent NodeApp) DockerOption {
return func(dck DockerApp) {
dck.SetParent(parent)
}
Expand Down
6 changes: 3 additions & 3 deletions apps/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ func (app *Server) initPprof() {
}
}

func NewServer(opts ...ServerOption) (app ServerApp) {
func NewServer(opts ...ServerOption) (app NodeApp) {
// server
svr := &Server{
WithConfigPath: config.NewConfigPathService(),
Expand Down Expand Up @@ -143,9 +143,9 @@ func NewServer(opts ...ServerOption) (app ServerApp) {
return svr
}

var server ServerApp
var server NodeApp

func GetServer(opts ...ServerOption) ServerApp {
func GetServer(opts ...ServerOption) NodeApp {
if server != nil {
return server
}
Expand Down
122 changes: 122 additions & 0 deletions apps/server_v2.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
package apps

import (
"fmt"
"github.com/apex/log"
"github.com/crawlab-team/crawlab-core/config"
"github.com/crawlab-team/crawlab-core/interfaces"
"github.com/crawlab-team/crawlab-core/node/service"
"github.com/crawlab-team/crawlab-core/utils"
"github.com/spf13/viper"
"net/http"
_ "net/http/pprof"
)

func init() {
injectModules()
}

type ServerV2 struct {
// settings
grpcAddress interfaces.Address

// dependencies
interfaces.WithConfigPath

// modules
nodeSvc interfaces.NodeService
api *ApiV2
dck *Docker

// internals
quit chan int
}

func (app *ServerV2) Init() {
// log node info
app.logNodeInfo()

// pprof
app.initPprof()
}

func (app *ServerV2) Start() {
if utils.IsMaster() {
// start docker app
if utils.IsDocker() {
go app.dck.Start()
}

// start api
go app.api.Start()
}

// start node service
go app.nodeSvc.Start()
}

func (app *ServerV2) Wait() {
<-app.quit
}

func (app *ServerV2) Stop() {
app.api.Stop()
app.quit <- 1
}

func (app *ServerV2) logNodeInfo() {
log.Infof("current node type: %s", utils.GetNodeType())
if utils.IsDocker() {
log.Infof("running in docker container")
}
}

func (app *ServerV2) initPprof() {
if viper.GetBool("pprof") {
go func() {
fmt.Println(http.ListenAndServe("0.0.0.0:6060", nil))
}()
}
}

func NewServerV2() (app NodeApp) {
// server
svr := &ServerV2{
WithConfigPath: config.NewConfigPathService(),
quit: make(chan int, 1),
}

// master modules
if utils.IsMaster() {
// api
svr.api = GetApiV2()

// docker
if utils.IsDocker() {
svr.dck = GetDocker(WithDockerParent(svr))
}
}

// node service
var err error
if utils.IsMaster() {
svr.nodeSvc, err = service.NewMasterServiceV2()
} else {
svr.nodeSvc, err = service.NewWorkerServiceV2()
}
if err != nil {
panic(err)
}

return svr
}

var serverV2 NodeApp

func GetServerV2() NodeApp {
if serverV2 != nil {
return serverV2
}
serverV2 = NewServerV2()
return serverV2
}
6 changes: 2 additions & 4 deletions cmd/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,9 @@ var serverCmd = &cobra.Command{
Short: "Start Crawlab server",
Long: `Start Crawlab node server that can serve as API, task scheduler, task runner, etc.`,
Run: func(cmd *cobra.Command, args []string) {
// options
var opts []apps.ServerOption

// app
svr := apps.GetServer(opts...)
//svr := apps.GetServer(opts...)
svr := apps.GetServerV2()

// start
apps.Start(svr)
Expand Down
4 changes: 3 additions & 1 deletion controllers/router_v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ func registerBuiltinHandler(group *gin.RouterGroup, method, path string, handler
group.Handle(method, path, handlerFunc)
}

func InitRoutes(app *gin.Engine) {
func InitRoutes(app *gin.Engine) (err error) {
// routes groups
groups := NewRouterGroups(app)

Expand Down Expand Up @@ -352,4 +352,6 @@ func InitRoutes(app *gin.Engine) {
HandlerFunc: PostLogout,
},
})

return nil
}
7 changes: 7 additions & 0 deletions grpc/client/client_v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ type GrpcClientV2 struct {

// clients
NodeClient grpc2.NodeServiceClient
TaskClient grpc2.TaskServiceClient
ModelBaseServiceV2Client grpc2.ModelBaseServiceV2Client
}

Expand Down Expand Up @@ -94,6 +95,8 @@ func (c *GrpcClientV2) Register() {
c.NodeClient = grpc2.NewNodeServiceClient(c.conn)
// model base service
c.ModelBaseServiceV2Client = grpc2.NewModelBaseServiceV2Client(c.conn)
// task
c.TaskClient = grpc2.NewTaskServiceClient(c.conn)

// log
log.Infof("[GrpcClient] grpc client registered client services")
Expand Down Expand Up @@ -123,6 +126,10 @@ func (c *GrpcClientV2) IsClosed() (res bool) {
return false
}

func (c *GrpcClientV2) GetMessageChannel() (msgCh chan *grpc2.StreamMessage) {
return c.msgCh
}

func (c *GrpcClientV2) getRequestData(d interface{}) (data []byte) {
if d == nil {
return data
Expand Down
Loading

0 comments on commit d875383

Please sign in to comment.