From c359d9c07c69a001434ba893cf0afd997def16ba Mon Sep 17 00:00:00 2001 From: malwaredllc <30509968+malwaredllc@users.noreply.github.com> Date: Sat, 14 May 2022 17:40:24 -0600 Subject: [PATCH 1/3] int tests --- README.md | 6 +- client/cache_client/cache_client.go | 34 +++++---- client/cache_client/cache_client_test.go | 64 ----------------- client/main.go | 26 ------- configs/nodes-local.json | 25 +++++++ main.go | 5 +- main_test.go | 92 ++++++++++++++++++++++++ ring/ring.go | 4 ++ server/election.go | 13 ++-- server/server.go | 66 +++++++++++++++-- 10 files changed, 214 insertions(+), 121 deletions(-) delete mode 100644 client/cache_client/cache_client_test.go delete mode 100644 client/main.go create mode 100644 configs/nodes-local.json create mode 100644 main_test.go diff --git a/README.md b/README.md index d63a033..9df93fc 100644 --- a/README.md +++ b/README.md @@ -4,7 +4,7 @@ This may not be the best distributed cache, but it is a distributed cache. -Features: +## Features: ### LRU eviction policy with O(1) operations - Least-recently-used eviction policy with a configurable cache capacity ensures low cache-miss rate @@ -21,3 +21,7 @@ Features: ### No single point of failure - The distributed election algorithm allows any nodes to arbitrarily join/leave cluster at any time, and there is always guaranteed to be a leader tracking the state of nodes in the cluster to provide to clients for consistent hashing. + +------------ + +## Example Usage diff --git a/client/cache_client/cache_client.go b/client/cache_client/cache_client.go index 21e45e7..b0912a8 100644 --- a/client/cache_client/cache_client.go +++ b/client/cache_client/cache_client.go @@ -30,6 +30,7 @@ type Payload struct { type ClientWrapper struct { Config nodelib.NodesConfig Ring *ring.Ring + CertDir string } // Checks cluster config every 5 seconds and updates ring with any changes. Runs in infinite loop. @@ -53,7 +54,7 @@ func (c *ClientWrapper) StartClusterConfigWatcher() { attempted[randnode.Id] = true // skip node we can't connect to - client, err := NewCacheClient(randnode.Host, int(randnode.GrpcPort)) + client, err := NewCacheClient(c.CertDir, randnode.Host, int(randnode.GrpcPort)) if err != nil { continue } @@ -81,7 +82,7 @@ func (c *ClientWrapper) StartClusterConfigWatcher() { defer cancel() // restart process if we can't connect to leader - client, err := NewCacheClient(leader.Host, int(leader.GrpcPort)) + client, err := NewCacheClient(c.CertDir, leader.Host, int(leader.GrpcPort)) if err != nil { continue } @@ -125,14 +126,14 @@ func (c *ClientWrapper) StartClusterConfigWatcher() { } // Create new Client struct instance and sets up node ring with consistent hashing -func NewClientWrapper(config_file string) *ClientWrapper { +func NewClientWrapper(cert_dir string, config_file string) *ClientWrapper { // get initial nodes from config file and add them to the ring init_nodes_config := nodelib.LoadNodesConfig(config_file) ring := ring.NewRing() var cluster_config []*pb.Node for _, node := range init_nodes_config.Nodes { - c, err := NewCacheClient(node.Host, int(node.GrpcPort)) + c, err := NewCacheClient(cert_dir, node.Host, int(node.GrpcPort)) if err != nil { log.Printf("error: %v", err) continue @@ -164,7 +165,7 @@ func NewClientWrapper(config_file string) *ClientWrapper { ring.AddNode(node.Id, node.Host, node.RestPort, node.GrpcPort) // attempt to create client - c, err := NewCacheClient(node.Host, int(node.GrpcPort)) + c, err := NewCacheClient(cert_dir, node.Host, int(node.GrpcPort)) if err != nil { log.Printf("error: %v", err) continue @@ -172,13 +173,13 @@ func NewClientWrapper(config_file string) *ClientWrapper { config_map[node.Id].SetGrpcClient(c) } config := nodelib.NodesConfig{Nodes: config_map} - return &ClientWrapper{Config: config, Ring: ring} + return &ClientWrapper{Config: config, Ring: ring, CertDir: cert_dir} } // Utility funciton to get a new Cache Client which uses gRPC secured with mTLS -func NewCacheClient(server_host string, server_port int) (pb.CacheServiceClient, error) { +func NewCacheClient(cert_dir string, server_host string, server_port int) (pb.CacheServiceClient, error) { // set up TLS - creds, err := LoadTLSCredentials() + creds, err := LoadTLSCredentials(cert_dir) if err != nil { log.Fatalf("failed to create credentials: %v", err) } @@ -248,7 +249,9 @@ func (c *ClientWrapper) Put(key string, value string) error { } // check response - _, err = new(http.Client).Do(req) + res, err := new(http.Client).Do(req) + defer res.Body.Close() + if err != nil { return errors.New(fmt.Sprintf("error sending POST request: %s", err)) } @@ -262,7 +265,7 @@ func (c *ClientWrapper) GetGrpc(key string) (string, error) { // make new client if necessary if nodeInfo.GrpcClient == nil { - c, err := NewCacheClient(nodeInfo.Host, int(nodeInfo.GrpcPort)) + c, err := NewCacheClient(c.CertDir, nodeInfo.Host, int(nodeInfo.GrpcPort)) if err != nil { return "", errors.New(fmt.Sprintf("error making gRPC client: %s", err)) } @@ -288,7 +291,7 @@ func (c *ClientWrapper) PutGrpc(key string, value string) error { // make new client if necessary if nodeInfo.GrpcClient == nil { - c, err := NewCacheClient(nodeInfo.Host, int(nodeInfo.GrpcPort)) + c, err := NewCacheClient(c.CertDir, nodeInfo.Host, int(nodeInfo.GrpcPort)) if err != nil { return errors.New(fmt.Sprintf("error making gRPC client: %s", err)) } @@ -308,9 +311,9 @@ func (c *ClientWrapper) PutGrpc(key string, value string) error { } // Utility function to set up mTLS config and credentials -func LoadTLSCredentials() (credentials.TransportCredentials, error) { +func LoadTLSCredentials(cert_dir string) (credentials.TransportCredentials, error) { // Load certificate of the CA who signed server's certificate - pemServerCA, err := ioutil.ReadFile("../../certs/ca-cert.pem") + pemServerCA, err := ioutil.ReadFile(fmt.Sprintf("%s/ca-cert.pem", cert_dir)) if err != nil { return nil, err } @@ -321,7 +324,10 @@ func LoadTLSCredentials() (credentials.TransportCredentials, error) { } // Load client's certificate and private key - clientCert, err := tls.LoadX509KeyPair("../../certs/client-cert.pem", "../../certs/client-key.pem") + clientCert, err := tls.LoadX509KeyPair( + fmt.Sprintf("%s/client-cert.pem", cert_dir), + fmt.Sprintf("%s/client-key.pem", cert_dir), + ) if err != nil { return nil, err } diff --git a/client/cache_client/cache_client_test.go b/client/cache_client/cache_client_test.go deleted file mode 100644 index 96dc8e8..0000000 --- a/client/cache_client/cache_client_test.go +++ /dev/null @@ -1,64 +0,0 @@ -package cache_client - -import ( - "testing" - "strconv" - "sync" -) - -// 10 goroutines make 10k requests each via REST API. Count cache misses. -func Test10kConcurrentRestApiPutsDocker(t *testing.T) { - c := NewClientWrapper("../../configs/nodes-docker.json") - c.StartClusterConfigWatcher() - - var wg sync.WaitGroup - var mutex sync.Mutex - miss := 0.0 - - for i := 0; i < 10; i++ { - wg.Add(1) - go func() { - defer wg.Done() - for i := 1; i <= 1000; i++ { - v := strconv.Itoa(i) - err := c.Put(v, v) - if err != nil { - mutex.Lock() - miss += 1 - mutex.Unlock() - } - } - }() - } - wg.Wait() - t.Logf("Cache misses: %d/10,000 (%f%%)", int(miss), miss/10000) -} - - -// 10 goroutines make 10k requests each vi gRPC. Count cache misses. -func Test10kConcurrentGrpcPutsDocker(t *testing.T) { - c := NewClientWrapper("../../configs/nodes-docker.json") - c.StartClusterConfigWatcher() - - var wg sync.WaitGroup - var mutex sync.Mutex - miss := 0.0 - - for i := 0; i < 10; i++ { - wg.Add(1) - go func() { - defer wg.Done() - for i := 1; i <= 1000; i++ { - v := strconv.Itoa(i) - err := c.PutGrpc(v, v) - if err != nil { - mutex.Lock() - miss += 1 - mutex.Unlock() - } - } - }() - } - wg.Wait() - t.Logf("Cache misses: %df10,000 (%f%%)", int(miss), miss/10000) -} \ No newline at end of file diff --git a/client/main.go b/client/main.go deleted file mode 100644 index 2a6024a..0000000 --- a/client/main.go +++ /dev/null @@ -1,26 +0,0 @@ -package main - -import ( - "strconv" - "sync" - "github.com/malwaredllc/minicache/client/cache_client" -) - -// 10 goroutines make 1k requests each -func main() { - c := cache_client.NewClientWrapper("../configs/nodes-docker.json") - c.StartClusterConfigWatcher() - - var wg sync.WaitGroup - for i := 0; i < 10; i++ { - wg.Add(1) - go func() { - defer wg.Done() - for i := 1; i <= 1000; i++ { - v := strconv.Itoa(i) - c.Put(v, v) - } - }() - } - wg.Wait() -} \ No newline at end of file diff --git a/configs/nodes-local.json b/configs/nodes-local.json new file mode 100644 index 0000000..8ec929e --- /dev/null +++ b/configs/nodes-local.json @@ -0,0 +1,25 @@ +{ + "nodes": { + "node0": { + "id": "node0", + "host": "localhost", + "rest_port": 8080, + "grpc_port": 5005, + "HashId": "" + }, + "node1": { + "id": "node1", + "host": "localhost", + "rest_port": 8081, + "grpc_port": 5006, + "HashId": "" + }, + "node2": { + "id": "node2", + "host": "localhost", + "rest_port": 8082, + "grpc_port": 5007, + "HashId": "" + } + } +} diff --git a/main.go b/main.go index 65586d1..fabe27d 100644 --- a/main.go +++ b/main.go @@ -25,15 +25,12 @@ func main() { } // get new grpc id server - grpc_server, cache_server := server.GetNewCacheServer(*capacity, *config_file, *verbose) + grpc_server, cache_server := server.GetNewCacheServer(*capacity, *config_file, *verbose, server.DYNAMIC) // run gRPC server log.Printf("Running gRPC server on port %d...", *grpc_port) go grpc_server.Serve(listener) - // set up grpc clients with each node in cluster - //cache_server.CreateAllGrpcClients() - // register node with cluster cache_server.RegisterNodeInternal() diff --git a/main_test.go b/main_test.go new file mode 100644 index 0000000..363cc9d --- /dev/null +++ b/main_test.go @@ -0,0 +1,92 @@ +package main + +import ( + "testing" + "strconv" + "sync" + "path/filepath" + "github.com/malwaredllc/minicache/server" + "github.com/malwaredllc/minicache/client/cache_client" +) + +const ( + RELATIVE_CONFIG_PATH = "configs/nodes-local.json" + RELATIVE_CLIENT_CERT_DIR = "certs" +) + + +// 10 goroutines make 10k requests each via REST API. Count cache misses. +func Test10kConcurrentRestApiPuts(t *testing.T) { + // start servers + capacity := 100 + verbose := true + abs_cert_dir, _ := filepath.Abs(RELATIVE_CLIENT_CERT_DIR) + abs_config_path, _ := filepath.Abs(RELATIVE_CONFIG_PATH) + + server.CreateAndRunAllFromConfig(capacity, abs_config_path, verbose) + + // start client + c := cache_client.NewClientWrapper(abs_cert_dir, abs_config_path) + c.StartClusterConfigWatcher() + + var wg sync.WaitGroup + var mutex sync.Mutex + miss := 0.0 + + for i := 0; i < 10; i++ { + wg.Add(1) + go func() { + defer wg.Done() + for i := 1; i <= 1000; i++ { + v := strconv.Itoa(i) + err := c.Put(v, v) + if err != nil { + mutex.Lock() + miss += 1 + mutex.Unlock() + } + } + }() + } + wg.Wait() + t.Logf("Cache misses: %d/10,000 (%f%%)", int(miss), miss/10000) +} + + +// 10 goroutines make 10k requests each vi gRPC. Count cache misses. +func Test10kConcurrentGrpcPuts(t *testing.T) { + // start servers + capacity := 100 + verbose := true + abs_cert_dir, _ := filepath.Abs(RELATIVE_CLIENT_CERT_DIR) + abs_config_path, _ := filepath.Abs(RELATIVE_CONFIG_PATH) + + server.CreateAndRunAllFromConfig(capacity, abs_config_path, verbose) + + // start client + c := cache_client.NewClientWrapper(abs_cert_dir, abs_config_path) + c.StartClusterConfigWatcher() + + + var wg sync.WaitGroup + var mutex sync.Mutex + miss := 0.0 + + for i := 0; i < 10; i++ { + wg.Add(1) + go func() { + defer wg.Done() + for i := 1; i <= 1000; i++ { + v := strconv.Itoa(i) + err := c.PutGrpc(v, v) + if err != nil { + mutex.Lock() + miss += 1 + mutex.Unlock() + } + } + }() + } + wg.Wait() + t.Logf("Cache misses: %d/10,000 (%f%%)", int(miss), miss/10000) +} \ No newline at end of file diff --git a/ring/ring.go b/ring/ring.go index e432160..9b781c8 100644 --- a/ring/ring.go +++ b/ring/ring.go @@ -43,6 +43,10 @@ func (r *Ring) RemoveNode(id string) error { } func (r *Ring) Get(id string) string { + // handle empty ring + if len(r.Nodes) == 0 { + panic("CONSISTENT HASHING RING IS EMPTY") + } i := r.search(id) if i >= r.Nodes.Len() { i = 0 diff --git a/server/election.go b/server/election.go index c9fe9ad..422ec5d 100644 --- a/server/election.go +++ b/server/election.go @@ -73,10 +73,13 @@ func (s *CacheServer) RunElection() { s.logger.Info("Waiting for decision...") // if after 5 seconds we receive no winner announcement, start the election process over select { - case s.leader_id = <-s.decision_chan: - s.logger.Infof("Received decision: Leader is node %s", s.leader_id) - s.election_status = NO_ELECTION_RUNNING - return + case winner := <-s.decision_chan: + if winner != "" { + s.leader_id = winner + s.logger.Infof("Received decision: Leader is node %s", s.leader_id) + s.election_status = NO_ELECTION_RUNNING + return + } case <-time.After(5*time.Second): s.logger.Info("Timed out waiting for decision. Starting new election.") s.RunElection() @@ -219,7 +222,7 @@ func (s *CacheServer) IsLeaderAlive() bool { s.logger.Infof("leader is %s", s.leader_id) leader, ok := s.nodes_config.Nodes[s.leader_id] if !ok { - s.logger.Infof("leader %s does not exist", leader) + s.logger.Infof("leader %s does not exist", s.leader_id) return true } diff --git a/server/server.go b/server/server.go index ca5423d..f2bfd4b 100644 --- a/server/server.go +++ b/server/server.go @@ -15,7 +15,9 @@ import ( "io/ioutil" "net/http" "os" + "log" "time" + "net" "google.golang.org/grpc" "google.golang.org/grpc/credentials" @@ -59,9 +61,15 @@ type Pair struct { Value string `json:"value"` } +const ( + DYNAMIC = "DYNAMIC" +) + // Utility function for creating a new gRPC server secured with mTLS, and registering a cache server service with it. +// Set node_id param to DYNAMIC to dynamically discover node id. +// Otherwise, manually set it to a valid node_id from the config file. // Returns tuple of (gRPC server instance, registered Cache CacheServer instance). -func GetNewCacheServer(capacity int, config_file string, verbose bool) (*grpc.Server, *CacheServer) { +func GetNewCacheServer(capacity int, config_file string, verbose bool, node_id string) (*grpc.Server, *CacheServer) { // set up logging sugared_logger := GetSugaredZapLogger(verbose) @@ -69,14 +77,25 @@ func GetNewCacheServer(capacity int, config_file string, verbose bool) (*grpc.Se nodes_config := node.LoadNodesConfig(config_file) // determine which node id we are and which group we are in - node_id := node.GetCurrentNodeId(nodes_config) + var final_node_id string + if node_id == DYNAMIC { + final_node_id = node.GetCurrentNodeId(nodes_config) + + // if this is not one of the initial nodes in the config file, add it dynamically + if _, ok := nodes_config.Nodes[final_node_id]; !ok { + host, _ := os.Hostname() + nodes_config.Nodes[final_node_id] = node.NewNode(final_node_id, host, 8080, 5005) + } + } else { + final_node_id = node_id - // if this is not one of the initial nodes in the config file, add it dynamically - if _, ok := nodes_config.Nodes[node_id]; !ok { - host, _ := os.Hostname() - nodes_config.Nodes[node_id] = node.NewNode(node_id, host, 8080, 5005) + // if this is not one of the initial nodes in the config file, panic + if _, ok := nodes_config.Nodes[final_node_id]; !ok { + panic("given node ID not found in config file") + } } + // set up gin router router := gin.New() router.Use(gin.Recovery()) @@ -91,6 +110,7 @@ func GetNewCacheServer(capacity int, config_file string, verbose bool) (*grpc.Se logger: sugared_logger, nodes_config: nodes_config, node_id: node_id, + leader_id: NO_LEADER, decision_chan: make(chan string, 1), } @@ -301,4 +321,36 @@ func (s *CacheServer) RegisterNodeInternal() { } } - +func CreateAndRunAllFromConfig(capacity int, config_file string, verbose bool) { + log.Printf("Creating and running all nodes from config file: %s", config_file) + config := node.LoadNodesConfig(config_file) + + for _, nodeInfo := range config.Nodes { + // set up listener TCP connectiion + listener, err := net.Listen("tcp", fmt.Sprintf(":%d", nodeInfo.GrpcPort)) + if err != nil { + panic(err) + } + + // get new grpc id server + grpc_server, cache_server := GetNewCacheServer(capacity, config_file, verbose, nodeInfo.Id) + + // run gRPC server + log.Printf("Running gRPC server on port %d...", nodeInfo.GrpcPort) + go grpc_server.Serve(listener) + + // register node with cluster + cache_server.RegisterNodeInternal() + + // run initial election + cache_server.RunElection() + + // start leader heartbeat monitor + go cache_server.StartLeaderHeartbeatMonitor() + + + // run HTTP server + log.Printf("Running REST API server on port %d...", nodeInfo.RestPort) + go cache_server.RunHttpServer(int(nodeInfo.RestPort)) + } +} From 24a2217dfd7243ec7251ee7718ad4c37aa92ffba Mon Sep 17 00:00:00 2001 From: malwaredllc <30509968+malwaredllc@users.noreply.github.com> Date: Sat, 14 May 2022 18:04:30 -0600 Subject: [PATCH 2/3] int tests --- .github/workflows/go.yml | 3 +++ README.md | 5 ++--- main.go | 4 ++-- main_test.go | 30 ++++++++++++++++++++++++++++-- server/server.go | 39 +++++++++++++++++++++++++++++++++------ 5 files changed, 68 insertions(+), 13 deletions(-) diff --git a/.github/workflows/go.yml b/.github/workflows/go.yml index 380bb17..929c5b2 100644 --- a/.github/workflows/go.yml +++ b/.github/workflows/go.yml @@ -26,3 +26,6 @@ jobs: - name: Test Consistent Hashing Ring run: go test -v ./ring + + - name: Integration Tests + run: go test -v main_test.go \ No newline at end of file diff --git a/README.md b/README.md index 9df93fc..6f7ddb7 100644 --- a/README.md +++ b/README.md @@ -4,11 +4,10 @@ This may not be the best distributed cache, but it is a distributed cache. -## Features: - -### LRU eviction policy with O(1) operations +### Thread-safe LRU cache with O(1) operations - Least-recently-used eviction policy with a configurable cache capacity ensures low cache-miss rate - Get/Put operations and eviction run all run in **O(1) time** +- LRU cache implementation is thread safe ### Consistent Hashing - Client uses **consistent hashing** to uniformly distribute requests and minimize required re-mappings when servers join/leave the cluster diff --git a/main.go b/main.go index fabe27d..c9d392d 100644 --- a/main.go +++ b/main.go @@ -25,7 +25,7 @@ func main() { } // get new grpc id server - grpc_server, cache_server := server.GetNewCacheServer(*capacity, *config_file, *verbose, server.DYNAMIC) + grpc_server, cache_server := server.NewCacheServer(*capacity, *config_file, *verbose, server.DYNAMIC) // run gRPC server log.Printf("Running gRPC server on port %d...", *grpc_port) @@ -43,5 +43,5 @@ func main() { // run HTTP server log.Printf("Running REST API server on port %d...", *rest_port) - cache_server.RunHttpServer(*rest_port) + cache_server.RunAndReturnHttpServer(*rest_port) } diff --git a/main_test.go b/main_test.go index 363cc9d..e5d903e 100644 --- a/main_test.go +++ b/main_test.go @@ -1,9 +1,11 @@ package main import ( + "context" "testing" "strconv" "sync" + "time" "path/filepath" "github.com/malwaredllc/minicache/server" "github.com/malwaredllc/minicache/client/cache_client" @@ -23,7 +25,7 @@ func Test10kConcurrentRestApiPuts(t *testing.T) { abs_cert_dir, _ := filepath.Abs(RELATIVE_CLIENT_CERT_DIR) abs_config_path, _ := filepath.Abs(RELATIVE_CONFIG_PATH) - server.CreateAndRunAllFromConfig(capacity, abs_config_path, verbose) + components := server.CreateAndRunAllFromConfig(capacity, abs_config_path, verbose) // start client c := cache_client.NewClientWrapper(abs_cert_dir, abs_config_path) @@ -50,6 +52,18 @@ func Test10kConcurrentRestApiPuts(t *testing.T) { } wg.Wait() t.Logf("Cache misses: %d/10,000 (%f%%)", int(miss), miss/10000) + + // cleanup + for _, srv_comps := range components { + srv_comps.GrpcServer.Stop() + + ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second) + defer cancel() + + if err := srv_comps.HttpServer.Shutdown(ctx); err != nil { + t.Logf("Http server shutdown error: %s", err) + } + } } @@ -61,7 +75,7 @@ func Test10kConcurrentGrpcPuts(t *testing.T) { abs_cert_dir, _ := filepath.Abs(RELATIVE_CLIENT_CERT_DIR) abs_config_path, _ := filepath.Abs(RELATIVE_CONFIG_PATH) - server.CreateAndRunAllFromConfig(capacity, abs_config_path, verbose) + components := server.CreateAndRunAllFromConfig(capacity, abs_config_path, verbose) // start client c := cache_client.NewClientWrapper(abs_cert_dir, abs_config_path) @@ -89,4 +103,16 @@ func Test10kConcurrentGrpcPuts(t *testing.T) { } wg.Wait() t.Logf("Cache misses: %d/10,000 (%f%%)", int(miss), miss/10000) + + // cleanup + for _, srv_comps := range components { + srv_comps.GrpcServer.Stop() + + ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second) + defer cancel() + + if err := srv_comps.HttpServer.Shutdown(ctx); err != nil { + t.Logf("Http server shutdown error: %s", err) + } + } } \ No newline at end of file diff --git a/server/server.go b/server/server.go index f2bfd4b..215de43 100644 --- a/server/server.go +++ b/server/server.go @@ -61,6 +61,11 @@ type Pair struct { Value string `json:"value"` } +type ServerComponents struct { + GrpcServer *grpc.Server + HttpServer *http.Server +} + const ( DYNAMIC = "DYNAMIC" ) @@ -69,7 +74,7 @@ const ( // Set node_id param to DYNAMIC to dynamically discover node id. // Otherwise, manually set it to a valid node_id from the config file. // Returns tuple of (gRPC server instance, registered Cache CacheServer instance). -func GetNewCacheServer(capacity int, config_file string, verbose bool, node_id string) (*grpc.Server, *CacheServer) { +func NewCacheServer(capacity int, config_file string, verbose bool, node_id string) (*grpc.Server, *CacheServer) { // set up logging sugared_logger := GetSugaredZapLogger(verbose) @@ -161,8 +166,24 @@ func (s *CacheServer) PutHandler(c *gin.Context) { c.IndentedJSON(http.StatusCreated, <-result) } -func (s *CacheServer) RunHttpServer(port int) { - s.router.Run(fmt.Sprintf(":%d", port)) +func (s *CacheServer) RunAndReturnHttpServer(port int) *http.Server { + // setup http server + addr := fmt.Sprintf(":%d", port) + srv := &http.Server{ + Addr: addr, + Handler: s.router, + } + + // run in background + go func() { + // service connections + if err := srv.ListenAndServe(); err != nil { + log.Printf("listen: %s\n", err) + } + }() + + // return server object so we can shutdown gracefully later + return srv } // gRPC handler for getting item from cache. Any replica in the group can serve read requests. @@ -321,10 +342,13 @@ func (s *CacheServer) RegisterNodeInternal() { } } -func CreateAndRunAllFromConfig(capacity int, config_file string, verbose bool) { +// Create and run all servers defined in config file and return list of server components +func CreateAndRunAllFromConfig(capacity int, config_file string, verbose bool) []ServerComponents { log.Printf("Creating and running all nodes from config file: %s", config_file) config := node.LoadNodesConfig(config_file) + var components []ServerComponents + for _, nodeInfo := range config.Nodes { // set up listener TCP connectiion listener, err := net.Listen("tcp", fmt.Sprintf(":%d", nodeInfo.GrpcPort)) @@ -333,7 +357,7 @@ func CreateAndRunAllFromConfig(capacity int, config_file string, verbose bool) { } // get new grpc id server - grpc_server, cache_server := GetNewCacheServer(capacity, config_file, verbose, nodeInfo.Id) + grpc_server, cache_server := NewCacheServer(capacity, config_file, verbose, nodeInfo.Id) // run gRPC server log.Printf("Running gRPC server on port %d...", nodeInfo.GrpcPort) @@ -351,6 +375,9 @@ func CreateAndRunAllFromConfig(capacity int, config_file string, verbose bool) { // run HTTP server log.Printf("Running REST API server on port %d...", nodeInfo.RestPort) - go cache_server.RunHttpServer(int(nodeInfo.RestPort)) + http_server := cache_server.RunAndReturnHttpServer(int(nodeInfo.RestPort)) + + components = append(components, ServerComponents{GrpcServer: grpc_server, HttpServer: http_server}) } + return components } From 71d764dd69c8e2041ae53e221b6c4397fd321790 Mon Sep 17 00:00:00 2001 From: malwaredllc <30509968+malwaredllc@users.noreply.github.com> Date: Sat, 14 May 2022 18:07:38 -0600 Subject: [PATCH 3/3] Env var --- .github/workflows/go.yml | 10 +++------- 1 file changed, 3 insertions(+), 7 deletions(-) diff --git a/.github/workflows/go.yml b/.github/workflows/go.yml index 929c5b2..27d44fc 100644 --- a/.github/workflows/go.yml +++ b/.github/workflows/go.yml @@ -22,10 +22,6 @@ jobs: run: go build -v ./... - name: Test LRU Cache - run: go test -v ./lru_cache - - - name: Test Consistent Hashing Ring - run: go test -v ./ring - - - name: Integration Tests - run: go test -v main_test.go \ No newline at end of file + run: go test -v ./... + env: + GODEBUG: x509sha1=1 \ No newline at end of file