Skip to content

Commit

Permalink
[Utility] Feat: add client-side session cache (#888)
Browse files Browse the repository at this point in the history
## Description

Add a client-side cache for sessions and use it in the `servicer`
command.

## Issue

Fixes #791

## Type of change

Please mark the relevant option(s):

- [x] New feature, functionality or library
- [ ] Bug fix
- [ ] Code health or cleanup
- [ ] Major breaking change
- [ ] Documentation
- [ ] Other <!-- add details here if it a different type of change -->

## List of changes

- A session cache in the client package
- Use the new session cache in the servicer command

## Testing

- [x] `make develop_test`; if any code changes were made
- [x] `make test_e2e` on [k8s
LocalNet](https://github.com/pokt-network/pocket/blob/main/build/localnet/README.md);
if any code changes were made
- [ ] `e2e-devnet-test` passes tests on
[DevNet](https://pocketnetwork.notion.site/How-to-DevNet-ff1598f27efe44c09f34e2aa0051f0dd);
if any code was changed
- [ ] [Docker Compose
LocalNet](https://github.com/pokt-network/pocket/blob/main/docs/development/README.md);
if any major functionality was changed or introduced
- [x] [k8s
LocalNet](https://github.com/pokt-network/pocket/blob/main/build/localnet/README.md);
if any infrastructure or configuration changes were made

## Required Checklist

- [x] I have performed a self-review of my own code
- [x] I have commented my code, particularly in hard-to-understand areas
- [x] I have added, or updated, [`godoc` format
comments](https://go.dev/blog/godoc) on touched members (see:
[tip.golang.org/doc/comment](https://tip.golang.org/doc/comment))
- [x] I have tested my changes using the available tooling

### If Applicable Checklist

- [ ] I have updated the corresponding README(s); local and/or global
- [x] I have added tests that prove my fix is effective or that my
feature works
- [ ] I have added, or updated,
[mermaid.js](https://mermaid-js.github.io) diagrams in the corresponding
README(s)
- [ ] I have added, or updated, documentation and
[mermaid.js](https://mermaid-js.github.io) diagrams in `shared/docs/*`
if I updated `shared/*`README(s)

---------

Co-authored-by: Daniel Olshansky <[email protected]>
  • Loading branch information
adshmh and Olshansk authored Jul 25, 2023
1 parent 98bac7c commit 77dc729
Show file tree
Hide file tree
Showing 4 changed files with 311 additions and 1 deletion.
87 changes: 87 additions & 0 deletions app/client/cli/cache/session.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
package cache

// TODO: add a TTL for cached sessions, since we know the sessions' length
import (
"encoding/json"
"errors"
"fmt"

"github.com/pokt-network/pocket/persistence/kvstore"
"github.com/pokt-network/pocket/rpc"
)

var errSessionNotFound = errors.New("session not found in cache")

// SessionCache defines the set of methods used to interact with the client-side session cache
type SessionCache interface {
Get(appAddr, chain string) (*rpc.Session, error)
Set(session *rpc.Session) error
Stop() error
}

// sessionCache stores and retrieves sessions for application+relaychain pairs
//
// It uses a key-value store as backing storage
type sessionCache struct {
// store is the local store for cached sessions
store kvstore.KVStore
}

// NewSessionCache returns a session cache backed by a kvstore using the provided database path.
func NewSessionCache(databasePath string) (SessionCache, error) {
store, err := kvstore.NewKVStore(databasePath)
if err != nil {
return nil, fmt.Errorf("Error initializing key-value store using path %s: %w", databasePath, err)
}

return &sessionCache{
store: store,
}, nil
}

// Get returns the cached session, if found, for an app+chain combination.
// The caller is responsible to verify that the returned session is valid for the current block height.
// Get is NOT safe to use concurrently
// DISCUSS: do we need concurrency here?
func (s *sessionCache) Get(appAddr, chain string) (*rpc.Session, error) {
key := sessionKey(appAddr, chain)
bz, err := s.store.Get(key)
if err != nil {
return nil, fmt.Errorf("error getting session from the store: %s %w", err.Error(), errSessionNotFound)
}

var session rpc.Session
if err := json.Unmarshal(bz, &session); err != nil {
return nil, fmt.Errorf("error unmarshalling session from store: %w", err)
}

return &session, nil
}

// Set stores the provided session in the cache with the key being the app+chain combination.
// For each app+chain combination, a single session will be stored. Subsequent calls to Set will overwrite the entry for the provided app and chain.
// Set is NOT safe to use concurrently
func (s *sessionCache) Set(session *rpc.Session) error {
bz, err := json.Marshal(*session)
if err != nil {
return fmt.Errorf("error marshalling session for app: %s, chain: %s, session height: %d: %w", session.Application.Address, session.Chain, session.SessionHeight, err)
}

key := sessionKey(session.Application.Address, session.Chain)
if err := s.store.Set(key, bz); err != nil {
return fmt.Errorf("error storing session for app: %s, chain: %s, session height: %d in the cache: %w", session.Application.Address, session.Chain, session.SessionHeight, err)
}
return nil
}

// Stop call stop on the backing store. No calls should be made to Get or Set after calling Stop.
func (s *sessionCache) Stop() error {
return s.store.Stop()
}

// sessionKey returns a key to get/set a session, based on application's address and the relay chain.
//
// The height is not used as part of the key, because for each app+chain combination only one session, i.e. the current one, is of interest.
func sessionKey(appAddr, chain string) []byte {
return []byte(fmt.Sprintf("%s-%s", appAddr, chain))
}
75 changes: 75 additions & 0 deletions app/client/cli/cache/session_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
package cache

import (
"os"
"testing"

"github.com/stretchr/testify/require"

"github.com/pokt-network/pocket/rpc"
)

func TestGet(t *testing.T) {
const (
app1 = "app1Addr"
relaychainEth = "ETH-Goerli"
numSessionBlocks = 4
sessionHeight = 8
sessionNumber = 2
)

session1 := &rpc.Session{
Application: rpc.ProtocolActor{
ActorType: rpc.Application,
Address: "app1Addr",
Chains: []string{relaychainEth},
},
Chain: relaychainEth,
NumSessionBlocks: numSessionBlocks,
SessionHeight: sessionHeight,
SessionNumber: sessionNumber,
}

testCases := []struct {
name string
cacheContents []*rpc.Session
app string
chain string
expected *rpc.Session
expectedErr error
}{
{
name: "Return cached session",
cacheContents: []*rpc.Session{session1},
app: app1,
chain: relaychainEth,
expected: session1,
},
{
name: "Error returned for session not found in cache",
app: "foo",
chain: relaychainEth,
expectedErr: errSessionNotFound,
},
}

for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
dbPath, err := os.MkdirTemp("", "cacheStoragePath")
require.NoError(t, err)
defer os.RemoveAll(dbPath)

cache, err := NewSessionCache(dbPath)
require.NoError(t, err)

for _, s := range tc.cacheContents {
err := cache.Set(s)
require.NoError(t, err)
}

got, err := cache.Get(tc.app, tc.chain)
require.ErrorIs(t, err, tc.expectedErr)
require.EqualValues(t, tc.expected, got)
})
}
}
62 changes: 61 additions & 1 deletion app/client/cli/servicer.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,19 +4,39 @@ import (
"context"
"encoding/hex"
"encoding/json"
"errors"
"fmt"
"net/http"

"github.com/spf13/cobra"

"github.com/pokt-network/pocket/app/client/cli/cache"
"github.com/pokt-network/pocket/app/client/cli/flags"
"github.com/pokt-network/pocket/logger"
"github.com/pokt-network/pocket/rpc"
coreTypes "github.com/pokt-network/pocket/shared/core/types"
"github.com/pokt-network/pocket/shared/crypto"
)

// IMPROVE: make this configurable
const sessionCacheDBPath = "/tmp"

var (
errNoSessionCache = errors.New("session cache not set up")
errSessionNotFoundInCache = errors.New("session not found in cache")
errNoMatchingSessionInCache = errors.New("no session matching the requested height found in cache")

sessionCache cache.SessionCache
)

func init() {
rootCmd.AddCommand(NewServicerCommand())

var err error
sessionCache, err = cache.NewSessionCache(sessionCacheDBPath)
if err != nil {
logger.Global.Warn().Err(err).Msg("failed to initialize session cache")
}
}

func NewServicerCommand() *cobra.Command {
Expand Down Expand Up @@ -52,6 +72,12 @@ Will prompt the user for the *application* account passphrase`,
Aliases: []string{},
Args: cobra.ExactArgs(4),
RunE: func(cmd *cobra.Command, args []string) error {
defer func() {
if err := sessionCache.Stop(); err != nil {
logger.Global.Warn().Err(err).Msg("failed to stop session cache")
}
}()

applicationAddr := args[0]
servicerAddr := args[1]
chain := args[2]
Expand Down Expand Up @@ -115,13 +141,37 @@ func validateServicer(session *rpc.Session, servicerAddress string) (*rpc.Protoc
return nil, fmt.Errorf("Error getting servicer: address %s does not match any servicers in the session %d", servicerAddress, session.SessionNumber)
}

// getSessionFromCache uses the client-side session cache to fetch a session for app+chain combination at the provided height, if one has already been retrieved and cached.
func getSessionFromCache(c cache.SessionCache, appAddress, chain string, height int64) (*rpc.Session, error) {
if c == nil {
return nil, errNoSessionCache
}

session, err := c.Get(appAddress, chain)
if err != nil {
return nil, fmt.Errorf("%w: %s", errSessionNotFoundInCache, err.Error())
}

// verify the cached session matches the provided height
if height >= session.SessionHeight && height < session.SessionHeight+session.NumSessionBlocks {
return session, nil
}

return nil, errNoMatchingSessionInCache
}

func getCurrentSession(ctx context.Context, appAddress, chain string) (*rpc.Session, error) {
// CONSIDERATION: passing 0 as the height value to get the current session seems more optimal than this.
currentHeight, err := getCurrentHeight(ctx)
if err != nil {
return nil, fmt.Errorf("Error getting current session: %w", err)
}

session, err := getSessionFromCache(sessionCache, appAddress, chain, currentHeight)
if err == nil {
return session, nil
}

req := rpc.SessionRequest{
AppAddress: appAddress,
Chain: chain,
Expand All @@ -148,7 +198,17 @@ func getCurrentSession(ctx context.Context, appAddress, chain string) (*rpc.Sess
return nil, fmt.Errorf("Error getting current session: Unexpected response %v", resp)
}

return resp.JSON200, nil
session = resp.JSON200
if sessionCache == nil {
logger.Global.Warn().Msg("session cache not available: cannot cache the retrieved session")
return session, nil
}

if err := sessionCache.Set(session); err != nil {
logger.Global.Warn().Err(err).Msg("failed to store session in cache")
}

return session, nil
}

// REFACTOR: reuse this function in all the query commands
Expand Down
88 changes: 88 additions & 0 deletions app/client/cli/servicer_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
package cli

import (
"os"
"testing"

"github.com/stretchr/testify/require"

"github.com/pokt-network/pocket/app/client/cli/cache"
"github.com/pokt-network/pocket/rpc"
)

const (
testRelaychainEth = "ETH-Goerli"
testSessionHeight = 8
testCurrentHeight = 9
)

func TestGetSessionFromCache(t *testing.T) {
const app1Addr = "app1Addr"

testCases := []struct {
name string
cachedSessions []*rpc.Session
expected *rpc.Session
expectedErr error
}{
{
name: "cached session is returned",
cachedSessions: []*rpc.Session{testSession(app1Addr, testSessionHeight)},
expected: testSession(app1Addr, testSessionHeight),
},
{
name: "nil session cache returns an error",
expectedErr: errNoSessionCache,
},
{
name: "session not found in cache",
cachedSessions: []*rpc.Session{testSession("foo", testSessionHeight)},
expectedErr: errSessionNotFoundInCache,
},
{
name: "cached session does not match the provided height",
cachedSessions: []*rpc.Session{testSession(app1Addr, 9999999)},
expectedErr: errNoMatchingSessionInCache,
},
}

for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
var c cache.SessionCache
// prepare cache with test session for this unit test
if len(tc.cachedSessions) > 0 {
dbPath, err := os.MkdirTemp("", "cliCacheStoragePath")
require.NoError(t, err)
defer os.RemoveAll(dbPath)

c, err = cache.NewSessionCache(dbPath)
require.NoError(t, err)

for _, s := range tc.cachedSessions {
err := c.Set(s)
require.NoError(t, err)
}
}

got, err := getSessionFromCache(c, app1Addr, testRelaychainEth, testCurrentHeight)
require.ErrorIs(t, err, tc.expectedErr)
require.EqualValues(t, tc.expected, got)
})
}
}

func testSession(appAddr string, height int64) *rpc.Session {
const numSessionBlocks = 4

return &rpc.Session{
Application: rpc.ProtocolActor{
ActorType: rpc.Application,
Address: appAddr,
Chains: []string{testRelaychainEth},
},
Chain: testRelaychainEth,
NumSessionBlocks: numSessionBlocks,
SessionHeight: height,
SessionNumber: (height / numSessionBlocks), // assumes numSessionBlocks never changed
}
}

0 comments on commit 77dc729

Please sign in to comment.