Skip to content

Commit

Permalink
refactor main.go
Browse files Browse the repository at this point in the history
  • Loading branch information
JyotinderSingh committed Oct 21, 2024
1 parent 02466c4 commit 75e3ac3
Show file tree
Hide file tree
Showing 6 changed files with 62 additions and 117 deletions.
2 changes: 2 additions & 0 deletions internal/server/httpServer.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"context"
"encoding/json"
"fmt"
_interface "github.com/dicedb/dice/internal/server/server_interface"
"hash/crc32"
"log/slog"
"net/http"
Expand Down Expand Up @@ -34,6 +35,7 @@ var unimplementedCommands = map[string]bool{
}

type HTTPServer struct {
_interface.AbstractServer
shardManager *shard.ShardManager
ioChan chan *ops.StoreResponse
httpServer *http.Server
Expand Down
2 changes: 2 additions & 0 deletions internal/server/resp/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"errors"
"fmt"
"github.com/dicedb/dice/internal/server/server_interface"
"log/slog"
"net"
"sync"
Expand Down Expand Up @@ -36,6 +37,7 @@ const (
)

type Server struct {
server_interface.AbstractServer
Host string
Port int
serverFD int
Expand Down
2 changes: 2 additions & 0 deletions internal/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"context"
"errors"
"fmt"
"github.com/dicedb/dice/internal/server/server_interface"
"io"
"log/slog"
"net"
Expand All @@ -29,6 +30,7 @@ import (
)

type AsyncServer struct {
server_interface.AbstractServer
serverFD int
maxClients int32
multiplexer iomultiplexer.IOMultiplexer
Expand Down
7 changes: 7 additions & 0 deletions internal/server/server_interface/abstract_server.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
package server_interface

Check failure on line 1 in internal/server/server_interface/abstract_server.go

View workflow job for this annotation

GitHub Actions / lint

ST1003: should not use underscores in package names (stylecheck)

import "context"

type AbstractServer interface {
Run(ctx context.Context) error
}
2 changes: 2 additions & 0 deletions internal/server/websocketServer.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"encoding/json"
"errors"
"fmt"
"github.com/dicedb/dice/internal/server/server_interface"
"log/slog"
"net"
"net/http"
Expand Down Expand Up @@ -35,6 +36,7 @@ var unimplementedCommandsWebsocket = map[string]bool{
}

type WebsocketServer struct {
server_interface.AbstractServer
shardManager *shard.ShardManager
ioChan chan *ops.StoreResponse
websocketServer *http.Server
Expand Down
164 changes: 47 additions & 117 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"errors"
"flag"
"fmt"
"github.com/dicedb/dice/internal/server/server_interface"
"log/slog"
"os"
"os/signal"
Expand Down Expand Up @@ -62,30 +63,29 @@ func main() {
sigs := make(chan os.Signal, 1)
signal.Notify(sigs, syscall.SIGTERM, syscall.SIGINT)

var queryWatchChan chan dstore.QueryWatchEvent = nil
var cmdWatchChan chan dstore.CmdWatchEvent = nil
var (
queryWatchChan chan dstore.QueryWatchEvent
cmdWatchChan chan dstore.CmdWatchEvent
serverErrCh = make(chan error, 2)
)

if config.EnableWatch {
queryWatchChan = make(chan dstore.QueryWatchEvent, config.DiceConfig.Performance.WatchChanBufSize)
cmdWatchChan = make(chan dstore.CmdWatchEvent, config.DiceConfig.Performance.WatchChanBufSize)
bufSize := config.DiceConfig.Performance.WatchChanBufSize
queryWatchChan = make(chan dstore.QueryWatchEvent, bufSize)
cmdWatchChan = make(chan dstore.CmdWatchEvent, bufSize)
}

var serverErrCh chan error

// Get the number of available CPU cores on the machine using runtime.NumCPU().
// This determines the total number of logical processors that can be utilized
// for parallel execution. Setting the maximum number of CPUs to the available
// core count ensures the application can make full use of all available hardware.
// If multithreading is not enabled, server will run on a single core.
var numCores int
if config.EnableMultiThreading {
serverErrCh = make(chan error, 1)
numCores = runtime.NumCPU()
logr.Debug("The DiceDB server has started in multi-threaded mode.", slog.Int("number of cores", numCores))
} else {
serverErrCh = make(chan error, 2)
logr.Debug("The DiceDB server has started in single-threaded mode.")
numCores = 1
}

// The runtime.GOMAXPROCS(numCores) call limits the number of operating system
Expand All @@ -107,132 +107,46 @@ func main() {

var serverWg sync.WaitGroup

// Initialize the AsyncServer server
// Find a port and bind it
if !config.EnableMultiThreading {
asyncServer := server.NewAsyncServer(shardManager, queryWatchChan, logr)
if err := asyncServer.FindPortAndBind(); err != nil {
cancel()
logr.Error("Error finding and binding port", slog.Any("error", err))
os.Exit(1)
}

serverWg.Add(1)
go func() {
defer serverWg.Done()
// Run the server
err := asyncServer.Run(ctx)

// Handling different server errors
if err != nil {
if errors.Is(err, context.Canceled) {
logr.Debug("Server was canceled")
} else if errors.Is(err, diceerrors.ErrAborted) {
logr.Debug("Server received abort command")
} else {
logr.Error(
"Server error",
slog.Any("error", err),
)
}
serverErrCh <- err
} else {
logr.Debug("Server stopped without error")
}
}()

// Goroutine to handle shutdown signals
wg.Add(1)
go func() {
defer wg.Done()
<-sigs
asyncServer.InitiateShutdown()
cancel()
}()

// Initialize the HTTP server
httpServer := server.NewHTTPServer(shardManager, logr)
serverWg.Add(1)
go func() {
defer serverWg.Done()
// Run the HTTP server
err := httpServer.Run(ctx)
if err != nil {
if errors.Is(err, context.Canceled) {
logr.Debug("HTTP Server was canceled")
} else if errors.Is(err, diceerrors.ErrAborted) {
logr.Debug("HTTP received abort command")
} else {
logr.Error("HTTP Server error", slog.Any("error", err))
}
serverErrCh <- err
} else {
logr.Debug("HTTP Server stopped without error")
}
}()
} else {
if config.EnableMultiThreading {
if config.EnableProfiling {
stopProfiling, err := startProfiling(logr)
if err != nil {
logr.Error("Profiling could not be started", slog.Any("error", err))
close(sigs)
os.Exit(1)
}

defer stopProfiling()
}

workerManager := worker.NewWorkerManager(config.DiceConfig.Performance.MaxClients, shardManager)
// Initialize the RESP Server
respServer := resp.NewServer(shardManager, workerManager, cmdWatchChan, serverErrCh, logr)
serverWg.Add(1)
go func() {
defer serverWg.Done()
// Run the server
err := respServer.Run(ctx)
go runServer(ctx, &serverWg, respServer, logr, serverErrCh)
} else {
asyncServer := server.NewAsyncServer(shardManager, queryWatchChan, logr)
if err := asyncServer.FindPortAndBind(); err != nil {
logr.Error("Error finding and binding port", slog.Any("error", err))
close(sigs)
os.Exit(1)

Check failure on line 130 in main.go

View workflow job for this annotation

GitHub Actions / lint

exitAfterDefer: os.Exit will exit, and `defer stopProfiling()` will not run (gocritic)
}

// Handling different server errors
if err != nil {
if errors.Is(err, context.Canceled) {
logr.Debug("Server was canceled")
} else if errors.Is(err, diceerrors.ErrAborted) {
logr.Debug("Server received abort command")
} else {
logr.Error("Server error", "error", err)
}
serverErrCh <- err
} else {
logr.Debug("Server stopped without error")
}
}()

// Goroutine to handle shutdown signals
wg.Add(1)
go func() {
defer wg.Done()
<-sigs
respServer.Shutdown()
cancel()
}()
serverWg.Add(1)
go runServer(ctx, &serverWg, asyncServer, logr, serverErrCh)

httpServer := server.NewHTTPServer(shardManager, logr)
serverWg.Add(1)
go runServer(ctx, &serverWg, httpServer, logr, serverErrCh)
}

websocketServer := server.NewWebSocketServer(shardManager, config.WebsocketPort, logr)
serverWg.Add(1)
go runServer(ctx, &serverWg, websocketServer, logr, serverErrCh)

wg.Add(1)
go func() {
defer serverWg.Done()
// Run the Websocket server
err := websocketServer.Run(ctx)
if err != nil {
if errors.Is(err, context.Canceled) {
logr.Debug("Websocket Server was canceled")
} else if errors.Is(err, diceerrors.ErrAborted) {
logr.Debug("Websocket received abort command")
} else {
logr.Error("Websocket Server error", "error", err)
}
serverErrCh <- err
} else {
logr.Debug("Websocket Server stopped without error")
}
defer wg.Done()
<-sigs
cancel()
}()

go func() {
Expand All @@ -255,6 +169,22 @@ func main() {
logr.Debug("Server has shut down gracefully")
}

func runServer(ctx context.Context, wg *sync.WaitGroup, srv server_interface.AbstractServer, logr *slog.Logger, errCh chan<- error) {
defer wg.Done()
if err := srv.Run(ctx); err != nil {
switch {
case errors.Is(err, context.Canceled):
logr.Debug(fmt.Sprintf("%T was canceled", srv))
case errors.Is(err, diceerrors.ErrAborted):
logr.Debug(fmt.Sprintf("%T received abort command", srv))
default:
logr.Error(fmt.Sprintf("%T error", srv), slog.Any("error", err))
}
errCh <- err
} else {
logr.Debug(fmt.Sprintf("%T stopped without error", srv))
}
}
func startProfiling(logr *slog.Logger) (func(), error) {
// Start CPU profiling
cpuFile, err := os.Create("cpu.prof")
Expand Down

0 comments on commit 75e3ac3

Please sign in to comment.