Skip to content

Commit

Permalink
int tests
Browse files Browse the repository at this point in the history
  • Loading branch information
malwaredllc committed May 15, 2022
1 parent c359d9c commit 24a2217
Show file tree
Hide file tree
Showing 5 changed files with 68 additions and 13 deletions.
3 changes: 3 additions & 0 deletions .github/workflows/go.yml
Original file line number Diff line number Diff line change
Expand Up @@ -26,3 +26,6 @@ jobs:

- name: Test Consistent Hashing Ring
run: go test -v ./ring

- name: Integration Tests
run: go test -v main_test.go
5 changes: 2 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,10 @@

This may not be the best distributed cache, but it is a distributed cache.

## Features:

### LRU eviction policy with O(1) operations
### Thread-safe LRU cache with O(1) operations
- Least-recently-used eviction policy with a configurable cache capacity ensures low cache-miss rate
- Get/Put operations and eviction run all run in **O(1) time**
- LRU cache implementation is thread safe

### Consistent Hashing
- Client uses **consistent hashing** to uniformly distribute requests and minimize required re-mappings when servers join/leave the cluster
Expand Down
4 changes: 2 additions & 2 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ func main() {
}

// get new grpc id server
grpc_server, cache_server := server.GetNewCacheServer(*capacity, *config_file, *verbose, server.DYNAMIC)
grpc_server, cache_server := server.NewCacheServer(*capacity, *config_file, *verbose, server.DYNAMIC)

// run gRPC server
log.Printf("Running gRPC server on port %d...", *grpc_port)
Expand All @@ -43,5 +43,5 @@ func main() {

// run HTTP server
log.Printf("Running REST API server on port %d...", *rest_port)
cache_server.RunHttpServer(*rest_port)
cache_server.RunAndReturnHttpServer(*rest_port)
}
30 changes: 28 additions & 2 deletions main_test.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
package main

import (
"context"
"testing"
"strconv"
"sync"
"time"
"path/filepath"
"github.com/malwaredllc/minicache/server"
"github.com/malwaredllc/minicache/client/cache_client"
Expand All @@ -23,7 +25,7 @@ func Test10kConcurrentRestApiPuts(t *testing.T) {
abs_cert_dir, _ := filepath.Abs(RELATIVE_CLIENT_CERT_DIR)
abs_config_path, _ := filepath.Abs(RELATIVE_CONFIG_PATH)

server.CreateAndRunAllFromConfig(capacity, abs_config_path, verbose)
components := server.CreateAndRunAllFromConfig(capacity, abs_config_path, verbose)

// start client
c := cache_client.NewClientWrapper(abs_cert_dir, abs_config_path)
Expand All @@ -50,6 +52,18 @@ func Test10kConcurrentRestApiPuts(t *testing.T) {
}
wg.Wait()
t.Logf("Cache misses: %d/10,000 (%f%%)", int(miss), miss/10000)

// cleanup
for _, srv_comps := range components {
srv_comps.GrpcServer.Stop()

ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
defer cancel()

if err := srv_comps.HttpServer.Shutdown(ctx); err != nil {
t.Logf("Http server shutdown error: %s", err)
}
}
}


Expand All @@ -61,7 +75,7 @@ func Test10kConcurrentGrpcPuts(t *testing.T) {
abs_cert_dir, _ := filepath.Abs(RELATIVE_CLIENT_CERT_DIR)
abs_config_path, _ := filepath.Abs(RELATIVE_CONFIG_PATH)

server.CreateAndRunAllFromConfig(capacity, abs_config_path, verbose)
components := server.CreateAndRunAllFromConfig(capacity, abs_config_path, verbose)

// start client
c := cache_client.NewClientWrapper(abs_cert_dir, abs_config_path)
Expand Down Expand Up @@ -89,4 +103,16 @@ func Test10kConcurrentGrpcPuts(t *testing.T) {
}
wg.Wait()
t.Logf("Cache misses: %d/10,000 (%f%%)", int(miss), miss/10000)

// cleanup
for _, srv_comps := range components {
srv_comps.GrpcServer.Stop()

ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
defer cancel()

if err := srv_comps.HttpServer.Shutdown(ctx); err != nil {
t.Logf("Http server shutdown error: %s", err)
}
}
}
39 changes: 33 additions & 6 deletions server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,11 @@ type Pair struct {
Value string `json:"value"`
}

type ServerComponents struct {
GrpcServer *grpc.Server
HttpServer *http.Server
}

const (
DYNAMIC = "DYNAMIC"
)
Expand All @@ -69,7 +74,7 @@ const (
// Set node_id param to DYNAMIC to dynamically discover node id.
// Otherwise, manually set it to a valid node_id from the config file.
// Returns tuple of (gRPC server instance, registered Cache CacheServer instance).
func GetNewCacheServer(capacity int, config_file string, verbose bool, node_id string) (*grpc.Server, *CacheServer) {
func NewCacheServer(capacity int, config_file string, verbose bool, node_id string) (*grpc.Server, *CacheServer) {
// set up logging
sugared_logger := GetSugaredZapLogger(verbose)

Expand Down Expand Up @@ -161,8 +166,24 @@ func (s *CacheServer) PutHandler(c *gin.Context) {
c.IndentedJSON(http.StatusCreated, <-result)
}

func (s *CacheServer) RunHttpServer(port int) {
s.router.Run(fmt.Sprintf(":%d", port))
func (s *CacheServer) RunAndReturnHttpServer(port int) *http.Server {
// setup http server
addr := fmt.Sprintf(":%d", port)
srv := &http.Server{
Addr: addr,
Handler: s.router,
}

// run in background
go func() {
// service connections
if err := srv.ListenAndServe(); err != nil {
log.Printf("listen: %s\n", err)
}
}()

// return server object so we can shutdown gracefully later
return srv
}

// gRPC handler for getting item from cache. Any replica in the group can serve read requests.
Expand Down Expand Up @@ -321,10 +342,13 @@ func (s *CacheServer) RegisterNodeInternal() {
}
}

func CreateAndRunAllFromConfig(capacity int, config_file string, verbose bool) {
// Create and run all servers defined in config file and return list of server components
func CreateAndRunAllFromConfig(capacity int, config_file string, verbose bool) []ServerComponents {
log.Printf("Creating and running all nodes from config file: %s", config_file)
config := node.LoadNodesConfig(config_file)

var components []ServerComponents

for _, nodeInfo := range config.Nodes {
// set up listener TCP connectiion
listener, err := net.Listen("tcp", fmt.Sprintf(":%d", nodeInfo.GrpcPort))
Expand All @@ -333,7 +357,7 @@ func CreateAndRunAllFromConfig(capacity int, config_file string, verbose bool) {
}

// get new grpc id server
grpc_server, cache_server := GetNewCacheServer(capacity, config_file, verbose, nodeInfo.Id)
grpc_server, cache_server := NewCacheServer(capacity, config_file, verbose, nodeInfo.Id)

// run gRPC server
log.Printf("Running gRPC server on port %d...", nodeInfo.GrpcPort)
Expand All @@ -351,6 +375,9 @@ func CreateAndRunAllFromConfig(capacity int, config_file string, verbose bool) {

// run HTTP server
log.Printf("Running REST API server on port %d...", nodeInfo.RestPort)
go cache_server.RunHttpServer(int(nodeInfo.RestPort))
http_server := cache_server.RunAndReturnHttpServer(int(nodeInfo.RestPort))

components = append(components, ServerComponents{GrpcServer: grpc_server, HttpServer: http_server})
}
return components
}

0 comments on commit 24a2217

Please sign in to comment.