Skip to content

Commit

Permalink
Merge pull request #4 from malwaredllc/integration-tests
Browse files Browse the repository at this point in the history
Integration tests
  • Loading branch information
danielvegamyhre authored May 15, 2022
2 parents 6700ab1 + 8de4e81 commit f9c55af
Show file tree
Hide file tree
Showing 11 changed files with 274 additions and 130 deletions.
7 changes: 3 additions & 4 deletions .github/workflows/go.yml
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ jobs:
run: go build -v ./...

- name: Test LRU Cache
run: go test -v ./lru_cache

- name: Test Consistent Hashing Ring
run: go test -v ./ring
run: go test -v ./...
env:
GODEBUG: x509sha1=1
9 changes: 6 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,10 @@

This may not be the best distributed cache, but it is a distributed cache.

Features:

### LRU eviction policy with O(1) operations
### Thread-safe LRU cache with O(1) operations
- Least-recently-used eviction policy with a configurable cache capacity ensures low cache-miss rate
- Get/Put operations and eviction run all run in **O(1) time**
- LRU cache implementation is thread safe

### Consistent Hashing
- Client uses **consistent hashing** to uniformly distribute requests and minimize required re-mappings when servers join/leave the cluster
Expand All @@ -21,3 +20,7 @@ Features:

### No single point of failure
- The distributed election algorithm allows any nodes to arbitrarily join/leave cluster at any time, and there is always guaranteed to be a leader tracking the state of nodes in the cluster to provide to clients for consistent hashing.

------------

## Example Usage
34 changes: 20 additions & 14 deletions client/cache_client/cache_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ type Payload struct {
type ClientWrapper struct {
Config nodelib.NodesConfig
Ring *ring.Ring
CertDir string
}

// Checks cluster config every 5 seconds and updates ring with any changes. Runs in infinite loop.
Expand All @@ -53,7 +54,7 @@ func (c *ClientWrapper) StartClusterConfigWatcher() {
attempted[randnode.Id] = true

// skip node we can't connect to
client, err := NewCacheClient(randnode.Host, int(randnode.GrpcPort))
client, err := NewCacheClient(c.CertDir, randnode.Host, int(randnode.GrpcPort))
if err != nil {
continue
}
Expand Down Expand Up @@ -81,7 +82,7 @@ func (c *ClientWrapper) StartClusterConfigWatcher() {
defer cancel()

// restart process if we can't connect to leader
client, err := NewCacheClient(leader.Host, int(leader.GrpcPort))
client, err := NewCacheClient(c.CertDir, leader.Host, int(leader.GrpcPort))
if err != nil {
continue
}
Expand Down Expand Up @@ -125,14 +126,14 @@ func (c *ClientWrapper) StartClusterConfigWatcher() {
}

// Create new Client struct instance and sets up node ring with consistent hashing
func NewClientWrapper(config_file string) *ClientWrapper {
func NewClientWrapper(cert_dir string, config_file string) *ClientWrapper {
// get initial nodes from config file and add them to the ring
init_nodes_config := nodelib.LoadNodesConfig(config_file)
ring := ring.NewRing()
var cluster_config []*pb.Node

for _, node := range init_nodes_config.Nodes {
c, err := NewCacheClient(node.Host, int(node.GrpcPort))
c, err := NewCacheClient(cert_dir, node.Host, int(node.GrpcPort))
if err != nil {
log.Printf("error: %v", err)
continue
Expand Down Expand Up @@ -164,21 +165,21 @@ func NewClientWrapper(config_file string) *ClientWrapper {
ring.AddNode(node.Id, node.Host, node.RestPort, node.GrpcPort)

// attempt to create client
c, err := NewCacheClient(node.Host, int(node.GrpcPort))
c, err := NewCacheClient(cert_dir, node.Host, int(node.GrpcPort))
if err != nil {
log.Printf("error: %v", err)
continue
}
config_map[node.Id].SetGrpcClient(c)
}
config := nodelib.NodesConfig{Nodes: config_map}
return &ClientWrapper{Config: config, Ring: ring}
return &ClientWrapper{Config: config, Ring: ring, CertDir: cert_dir}
}

// Utility funciton to get a new Cache Client which uses gRPC secured with mTLS
func NewCacheClient(server_host string, server_port int) (pb.CacheServiceClient, error) {
func NewCacheClient(cert_dir string, server_host string, server_port int) (pb.CacheServiceClient, error) {
// set up TLS
creds, err := LoadTLSCredentials()
creds, err := LoadTLSCredentials(cert_dir)
if err != nil {
log.Fatalf("failed to create credentials: %v", err)
}
Expand Down Expand Up @@ -248,7 +249,9 @@ func (c *ClientWrapper) Put(key string, value string) error {
}

// check response
_, err = new(http.Client).Do(req)
res, err := new(http.Client).Do(req)
defer res.Body.Close()

if err != nil {
return errors.New(fmt.Sprintf("error sending POST request: %s", err))
}
Expand All @@ -262,7 +265,7 @@ func (c *ClientWrapper) GetGrpc(key string) (string, error) {

// make new client if necessary
if nodeInfo.GrpcClient == nil {
c, err := NewCacheClient(nodeInfo.Host, int(nodeInfo.GrpcPort))
c, err := NewCacheClient(c.CertDir, nodeInfo.Host, int(nodeInfo.GrpcPort))
if err != nil {
return "", errors.New(fmt.Sprintf("error making gRPC client: %s", err))
}
Expand All @@ -288,7 +291,7 @@ func (c *ClientWrapper) PutGrpc(key string, value string) error {

// make new client if necessary
if nodeInfo.GrpcClient == nil {
c, err := NewCacheClient(nodeInfo.Host, int(nodeInfo.GrpcPort))
c, err := NewCacheClient(c.CertDir, nodeInfo.Host, int(nodeInfo.GrpcPort))
if err != nil {
return errors.New(fmt.Sprintf("error making gRPC client: %s", err))
}
Expand All @@ -308,9 +311,9 @@ func (c *ClientWrapper) PutGrpc(key string, value string) error {
}

// Utility function to set up mTLS config and credentials
func LoadTLSCredentials() (credentials.TransportCredentials, error) {
func LoadTLSCredentials(cert_dir string) (credentials.TransportCredentials, error) {
// Load certificate of the CA who signed server's certificate
pemServerCA, err := ioutil.ReadFile("../../certs/ca-cert.pem")
pemServerCA, err := ioutil.ReadFile(fmt.Sprintf("%s/ca-cert.pem", cert_dir))
if err != nil {
return nil, err
}
Expand All @@ -321,7 +324,10 @@ func LoadTLSCredentials() (credentials.TransportCredentials, error) {
}

// Load client's certificate and private key
clientCert, err := tls.LoadX509KeyPair("../../certs/client-cert.pem", "../../certs/client-key.pem")
clientCert, err := tls.LoadX509KeyPair(
fmt.Sprintf("%s/client-cert.pem", cert_dir),
fmt.Sprintf("%s/client-key.pem", cert_dir),
)
if err != nil {
return nil, err
}
Expand Down
64 changes: 0 additions & 64 deletions client/cache_client/cache_client_test.go

This file was deleted.

26 changes: 0 additions & 26 deletions client/main.go

This file was deleted.

25 changes: 25 additions & 0 deletions configs/nodes-local.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
{
"nodes": {
"node0": {
"id": "node0",
"host": "localhost",
"rest_port": 8080,
"grpc_port": 5005,
"HashId": ""
},
"node1": {
"id": "node1",
"host": "localhost",
"rest_port": 8081,
"grpc_port": 5006,
"HashId": ""
},
"node2": {
"id": "node2",
"host": "localhost",
"rest_port": 8082,
"grpc_port": 5007,
"HashId": ""
}
}
}
7 changes: 2 additions & 5 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,15 +25,12 @@ func main() {
}

// get new grpc id server
grpc_server, cache_server := server.GetNewCacheServer(*capacity, *config_file, *verbose)
grpc_server, cache_server := server.NewCacheServer(*capacity, *config_file, *verbose, server.DYNAMIC)

// run gRPC server
log.Printf("Running gRPC server on port %d...", *grpc_port)
go grpc_server.Serve(listener)

// set up grpc clients with each node in cluster
//cache_server.CreateAllGrpcClients()

// register node with cluster
cache_server.RegisterNodeInternal()

Expand All @@ -46,5 +43,5 @@ func main() {

// run HTTP server
log.Printf("Running REST API server on port %d...", *rest_port)
cache_server.RunHttpServer(*rest_port)
cache_server.RunAndReturnHttpServer(*rest_port)
}
Loading

0 comments on commit f9c55af

Please sign in to comment.