Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

allow reactivity features to be toggled with flag #1176

Merged
merged 3 commits into from
Oct 21, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,8 @@ var (
KeysLimit = DefaultKeysLimit

EnableProfiling = false

EnableWatch = true
)

type Config struct {
Expand Down
7 changes: 7 additions & 0 deletions internal/server/abstractserver/abstract_server.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
package abstractserver

import "context"

type AbstractServer interface {
Run(ctx context.Context) error
}
2 changes: 2 additions & 0 deletions internal/server/httpServer.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"context"
"encoding/json"
"fmt"
"github.com/dicedb/dice/internal/server/abstractserver"
"hash/crc32"
"log/slog"
"net/http"
Expand Down Expand Up @@ -34,6 +35,7 @@ var unimplementedCommands = map[string]bool{
}

type HTTPServer struct {
abstractserver.AbstractServer
shardManager *shard.ShardManager
ioChan chan *ops.StoreResponse
httpServer *http.Server
Expand Down
15 changes: 10 additions & 5 deletions internal/server/resp/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"errors"
"fmt"
"github.com/dicedb/dice/internal/server/abstractserver"
"log/slog"
"net"
"sync"
Expand Down Expand Up @@ -36,6 +37,7 @@ const (
)

type Server struct {
abstractserver.AbstractServer
Host string
Port int
serverFD int
Expand Down Expand Up @@ -75,12 +77,15 @@ func (s *Server) Run(ctx context.Context) (err error) {
errChan := make(chan error, 1)
wg := &sync.WaitGroup{}

wg.Add(2)
go func() {
defer wg.Done()
s.watchManager.Run(ctx, s.cmdWatchChan)
}()
if s.cmdWatchChan != nil {
wg.Add(1)
go func() {
defer wg.Done()
s.watchManager.Run(ctx, s.cmdWatchChan)
}()
}

wg.Add(1)
go func(wg *sync.WaitGroup) {
defer wg.Done()
if err := s.AcceptConnectionRequests(ctx, wg); err != nil {
Expand Down
2 changes: 2 additions & 0 deletions internal/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"context"
"errors"
"fmt"
"github.com/dicedb/dice/internal/server/abstractserver"
"io"
"log/slog"
"net"
Expand All @@ -29,6 +30,7 @@ import (
)

type AsyncServer struct {
abstractserver.AbstractServer
serverFD int
maxClients int32
multiplexer iomultiplexer.IOMultiplexer
Expand Down
2 changes: 2 additions & 0 deletions internal/server/websocketServer.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"encoding/json"
"errors"
"fmt"
"github.com/dicedb/dice/internal/server/abstractserver"
"log/slog"
"net"
"net/http"
Expand Down Expand Up @@ -35,6 +36,7 @@ var unimplementedCommandsWebsocket = map[string]bool{
}

type WebsocketServer struct {
abstractserver.AbstractServer
shardManager *shard.ShardManager
ioChan chan *ops.StoreResponse
websocketServer *http.Server
Expand Down
166 changes: 51 additions & 115 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"errors"
"flag"
"fmt"
"github.com/dicedb/dice/internal/server/abstractserver"
"log/slog"
"os"
"os/signal"
Expand Down Expand Up @@ -40,6 +41,8 @@ func init() {
"This flag controls the number of keys each shard holds at startup. You can multiply this number with the "+
"total number of shard threads to estimate how much memory will be required at system start up.")
flag.BoolVar(&config.EnableProfiling, "enable-profiling", false, "enable profiling for the dicedb server")
flag.BoolVar(&config.EnableWatch, "enable-watch", false, "enable reactivity features which power the .WATCH commands")

flag.Parse()

config.SetupConfig()
Expand All @@ -60,9 +63,17 @@ func main() {
sigs := make(chan os.Signal, 1)
signal.Notify(sigs, syscall.SIGTERM, syscall.SIGINT)

queryWatchChan := make(chan dstore.QueryWatchEvent, config.DiceConfig.Performance.WatchChanBufSize)
cmdWatchChan := make(chan dstore.CmdWatchEvent, config.DiceConfig.Memory.KeysLimit)
var serverErrCh chan error
var (
queryWatchChan chan dstore.QueryWatchEvent
cmdWatchChan chan dstore.CmdWatchEvent
serverErrCh = make(chan error, 2)
)

if config.EnableWatch {
bufSize := config.DiceConfig.Performance.WatchChanBufSize
queryWatchChan = make(chan dstore.QueryWatchEvent, bufSize)
cmdWatchChan = make(chan dstore.CmdWatchEvent, bufSize)
}

// Get the number of available CPU cores on the machine using runtime.NumCPU().
// This determines the total number of logical processors that can be utilized
Expand All @@ -71,13 +82,10 @@ func main() {
// If multithreading is not enabled, server will run on a single core.
var numCores int
if config.EnableMultiThreading {
serverErrCh = make(chan error, 1)
numCores = runtime.NumCPU()
logr.Debug("The DiceDB server has started in multi-threaded mode.", slog.Int("number of cores", numCores))
} else {
serverErrCh = make(chan error, 2)
logr.Debug("The DiceDB server has started in single-threaded mode.")
numCores = 1
}

// The runtime.GOMAXPROCS(numCores) call limits the number of operating system
Expand All @@ -99,132 +107,44 @@ func main() {

var serverWg sync.WaitGroup

// Initialize the AsyncServer server
// Find a port and bind it
if !config.EnableMultiThreading {
asyncServer := server.NewAsyncServer(shardManager, queryWatchChan, logr)
if err := asyncServer.FindPortAndBind(); err != nil {
cancel()
logr.Error("Error finding and binding port", slog.Any("error", err))
os.Exit(1)
}

serverWg.Add(1)
go func() {
defer serverWg.Done()
// Run the server
err := asyncServer.Run(ctx)

// Handling different server errors
if err != nil {
if errors.Is(err, context.Canceled) {
logr.Debug("Server was canceled")
} else if errors.Is(err, diceerrors.ErrAborted) {
logr.Debug("Server received abort command")
} else {
logr.Error(
"Server error",
slog.Any("error", err),
)
}
serverErrCh <- err
} else {
logr.Debug("Server stopped without error")
}
}()

// Goroutine to handle shutdown signals
wg.Add(1)
go func() {
defer wg.Done()
<-sigs
asyncServer.InitiateShutdown()
cancel()
}()

// Initialize the HTTP server
httpServer := server.NewHTTPServer(shardManager, logr)
serverWg.Add(1)
go func() {
defer serverWg.Done()
// Run the HTTP server
err := httpServer.Run(ctx)
if err != nil {
if errors.Is(err, context.Canceled) {
logr.Debug("HTTP Server was canceled")
} else if errors.Is(err, diceerrors.ErrAborted) {
logr.Debug("HTTP received abort command")
} else {
logr.Error("HTTP Server error", slog.Any("error", err))
}
serverErrCh <- err
} else {
logr.Debug("HTTP Server stopped without error")
}
}()
} else {
if config.EnableMultiThreading {
if config.EnableProfiling {
stopProfiling, err := startProfiling(logr)
if err != nil {
logr.Error("Profiling could not be started", slog.Any("error", err))
os.Exit(1)
sigs <- syscall.SIGKILL
}

defer stopProfiling()
}

workerManager := worker.NewWorkerManager(config.DiceConfig.Performance.MaxClients, shardManager)
// Initialize the RESP Server
respServer := resp.NewServer(shardManager, workerManager, cmdWatchChan, serverErrCh, logr)
serverWg.Add(1)
go func() {
defer serverWg.Done()
// Run the server
err := respServer.Run(ctx)
go runServer(ctx, &serverWg, respServer, logr, serverErrCh)
} else {
asyncServer := server.NewAsyncServer(shardManager, queryWatchChan, logr)
if err := asyncServer.FindPortAndBind(); err != nil {
logr.Error("Error finding and binding port", slog.Any("error", err))
sigs <- syscall.SIGKILL
}

// Handling different server errors
if err != nil {
if errors.Is(err, context.Canceled) {
logr.Debug("Server was canceled")
} else if errors.Is(err, diceerrors.ErrAborted) {
logr.Debug("Server received abort command")
} else {
logr.Error("Server error", "error", err)
}
serverErrCh <- err
} else {
logr.Debug("Server stopped without error")
}
}()

// Goroutine to handle shutdown signals
wg.Add(1)
go func() {
defer wg.Done()
<-sigs
respServer.Shutdown()
cancel()
}()
serverWg.Add(1)
go runServer(ctx, &serverWg, asyncServer, logr, serverErrCh)

httpServer := server.NewHTTPServer(shardManager, logr)
serverWg.Add(1)
go runServer(ctx, &serverWg, httpServer, logr, serverErrCh)
}

websocketServer := server.NewWebSocketServer(shardManager, config.WebsocketPort, logr)
serverWg.Add(1)
go runServer(ctx, &serverWg, websocketServer, logr, serverErrCh)

wg.Add(1)
go func() {
defer serverWg.Done()
// Run the Websocket server
err := websocketServer.Run(ctx)
if err != nil {
if errors.Is(err, context.Canceled) {
logr.Debug("Websocket Server was canceled")
} else if errors.Is(err, diceerrors.ErrAborted) {
logr.Debug("Websocket received abort command")
} else {
logr.Error("Websocket Server error", "error", err)
}
serverErrCh <- err
} else {
logr.Debug("Websocket Server stopped without error")
}
defer wg.Done()
<-sigs
cancel()
}()

go func() {
Expand All @@ -247,6 +167,22 @@ func main() {
logr.Debug("Server has shut down gracefully")
}

func runServer(ctx context.Context, wg *sync.WaitGroup, srv abstractserver.AbstractServer, logr *slog.Logger, errCh chan<- error) {
defer wg.Done()
if err := srv.Run(ctx); err != nil {
switch {
case errors.Is(err, context.Canceled):
logr.Debug(fmt.Sprintf("%T was canceled", srv))
case errors.Is(err, diceerrors.ErrAborted):
logr.Debug(fmt.Sprintf("%T received abort command", srv))
default:
logr.Error(fmt.Sprintf("%T error", srv), slog.Any("error", err))
}
errCh <- err
} else {
logr.Debug(fmt.Sprintf("%T stopped without error", srv))
}
}
func startProfiling(logr *slog.Logger) (func(), error) {
// Start CPU profiling
cpuFile, err := os.Create("cpu.prof")
Expand Down