Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Utility] Feat: add client-side session cache #888

Merged
merged 12 commits into from
Jul 25, 2023
87 changes: 87 additions & 0 deletions app/client/cli/cache/session.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
package cache
adshmh marked this conversation as resolved.
Show resolved Hide resolved

// TODO: add a TTL for cached sessions, since we know the sessions' length
import (
"encoding/json"
"errors"
"fmt"

"github.com/pokt-network/pocket/persistence/kvstore"
"github.com/pokt-network/pocket/rpc"
)

var errSessionNotFound = errors.New("session not found in cache")

// SessionCache defines the set of methods used to interact with the client-side session cache
type SessionCache interface {
Get(appAddr, chain string) (*rpc.Session, error)
Set(session *rpc.Session) error
Stop() error
}

// sessionCache stores and retrieves sessions for application+relaychain pairs
//
// It uses a key-value store as backing storage
type sessionCache struct {
// store is the local store for cached sessions
store kvstore.KVStore
}

// NewSessionCache returns a session cache backed by a kvstore using the provided database path.
func NewSessionCache(databasePath string) (SessionCache, error) {
store, err := kvstore.NewKVStore(databasePath)
if err != nil {
return nil, fmt.Errorf("Error initializing key-value store using path %s: %w", databasePath, err)
}

return &sessionCache{
store: store,
}, nil
}

// Get returns the cached session, if found, for an app+chain combination.
// The caller is responsible to verify that the returned session is valid for the current block height.
// Get is NOT safe to use concurrently
adshmh marked this conversation as resolved.
Show resolved Hide resolved
// DISCUSS: do we need concurrency here?
func (s *sessionCache) Get(appAddr, chain string) (*rpc.Session, error) {
key := sessionKey(appAddr, chain)
bz, err := s.store.Get(key)
if err != nil {
return nil, fmt.Errorf("error getting session from the store: %s %w", err.Error(), errSessionNotFound)
}

var session rpc.Session
if err := json.Unmarshal(bz, &session); err != nil {
return nil, fmt.Errorf("error unmarshalling session from store: %w", err)
}

return &session, nil
}

// Set stores the provided session in the cache with the key being the app+chain combination.
// For each app+chain combination, a single session will be stored. Subsequent calls to Set will overwrite the entry for the provided app and chain.
// Set is NOT safe to use concurrently
func (s *sessionCache) Set(session *rpc.Session) error {
bz, err := json.Marshal(*session)
if err != nil {
return fmt.Errorf("error marshalling session for app: %s, chain: %s, session height: %d: %w", session.Application.Address, session.Chain, session.SessionHeight, err)
}

key := sessionKey(session.Application.Address, session.Chain)
if err := s.store.Set(key, bz); err != nil {
return fmt.Errorf("error storing session for app: %s, chain: %s, session height: %d in the cache: %w", session.Application.Address, session.Chain, session.SessionHeight, err)
}
return nil
}

// Stop call stop on the backing store. No calls should be made to Get or Set after calling Stop.
func (s *sessionCache) Stop() error {
return s.store.Stop()
}

// sessionKey returns a key to get/set a session, based on application's address and the relay chain.
//
// The height is not used as part of the key, because for each app+chain combination only one session, i.e. the current one, is of interest.
func sessionKey(appAddr, chain string) []byte {
return []byte(fmt.Sprintf("%s-%s", appAddr, chain))
}
75 changes: 75 additions & 0 deletions app/client/cli/cache/session_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
package cache

import (
"os"
"testing"

"github.com/stretchr/testify/require"

"github.com/pokt-network/pocket/rpc"
)

func TestGet(t *testing.T) {
const (
app1 = "app1Addr"
relaychainEth = "ETH-Goerli"
numSessionBlocks = 4
sessionHeight = 8
sessionNumber = 2
)

session1 := &rpc.Session{
Application: rpc.ProtocolActor{
ActorType: rpc.Application,
Address: "app1Addr",
Chains: []string{relaychainEth},
},
Chain: relaychainEth,
NumSessionBlocks: numSessionBlocks,
SessionHeight: sessionHeight,
SessionNumber: sessionNumber,
}

testCases := []struct {
name string
cacheContents []*rpc.Session
app string
chain string
expected *rpc.Session
expectedErr error
}{
{
name: "Return cached session",
cacheContents: []*rpc.Session{session1},
app: app1,
chain: relaychainEth,
expected: session1,
},
{
name: "Error returned for session not found in cache",
app: "foo",
chain: relaychainEth,
expectedErr: errSessionNotFound,
},
}

for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
dbPath, err := os.MkdirTemp("", "cacheStoragePath")
adshmh marked this conversation as resolved.
Show resolved Hide resolved
require.NoError(t, err)
defer os.RemoveAll(dbPath)

cache, err := NewSessionCache(dbPath)
require.NoError(t, err)

for _, s := range tc.cacheContents {
err := cache.Set(s)
require.NoError(t, err)
}

got, err := cache.Get(tc.app, tc.chain)
require.ErrorIs(t, err, tc.expectedErr)
require.EqualValues(t, tc.expected, got)
})
}
}
62 changes: 61 additions & 1 deletion app/client/cli/servicer.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,19 +4,39 @@ import (
"context"
"encoding/hex"
"encoding/json"
"errors"
"fmt"
"net/http"

"github.com/spf13/cobra"

"github.com/pokt-network/pocket/app/client/cli/cache"
"github.com/pokt-network/pocket/app/client/cli/flags"
"github.com/pokt-network/pocket/logger"
"github.com/pokt-network/pocket/rpc"
coreTypes "github.com/pokt-network/pocket/shared/core/types"
"github.com/pokt-network/pocket/shared/crypto"
)

// IMPROVE: make this configurable
const sessionCacheDBPath = "/tmp"
adshmh marked this conversation as resolved.
Show resolved Hide resolved

var (
errNoSessionCache = errors.New("session cache not set up")
errSessionNotFoundInCache = errors.New("session not found in cache")
errNoMatchingSessionInCache = errors.New("no session matching the requested height found in cache")

sessionCache cache.SessionCache
)

func init() {
rootCmd.AddCommand(NewServicerCommand())

var err error
sessionCache, err = cache.NewSessionCache(sessionCacheDBPath)
if err != nil {
logger.Global.Warn().Err(err).Msg("failed to initialize session cache")
}
}

func NewServicerCommand() *cobra.Command {
Expand Down Expand Up @@ -52,6 +72,12 @@ Will prompt the user for the *application* account passphrase`,
Aliases: []string{},
Args: cobra.ExactArgs(4),
RunE: func(cmd *cobra.Command, args []string) error {
defer func() {
if err := sessionCache.Stop(); err != nil {
logger.Global.Warn().Err(err).Msg("failed to stop session cache")
}
}()

applicationAddr := args[0]
servicerAddr := args[1]
chain := args[2]
Expand Down Expand Up @@ -115,13 +141,37 @@ func validateServicer(session *rpc.Session, servicerAddress string) (*rpc.Protoc
return nil, fmt.Errorf("Error getting servicer: address %s does not match any servicers in the session %d", servicerAddress, session.SessionNumber)
}

// getSessionFromCache uses the client-side session cache to fetch a session for app+chain combination at the provided height, if one has already been retrieved and cached.
func getSessionFromCache(c cache.SessionCache, appAddress, chain string, height int64) (*rpc.Session, error) {
if c == nil {
return nil, errNoSessionCache
}

session, err := c.Get(appAddress, chain)
if err != nil {
return nil, fmt.Errorf("%w: %s", errSessionNotFoundInCache, err.Error())
}

// verify the cached session matches the provided height
if height >= session.SessionHeight && height < session.SessionHeight+session.NumSessionBlocks {
return session, nil
}

return nil, errNoMatchingSessionInCache
}

func getCurrentSession(ctx context.Context, appAddress, chain string) (*rpc.Session, error) {
// CONSIDERATION: passing 0 as the height value to get the current session seems more optimal than this.
currentHeight, err := getCurrentHeight(ctx)
if err != nil {
return nil, fmt.Errorf("Error getting current session: %w", err)
}

session, err := getSessionFromCache(sessionCache, appAddress, chain, currentHeight)
if err == nil {
return session, nil
}

req := rpc.SessionRequest{
AppAddress: appAddress,
Chain: chain,
Expand All @@ -148,7 +198,17 @@ func getCurrentSession(ctx context.Context, appAddress, chain string) (*rpc.Sess
return nil, fmt.Errorf("Error getting current session: Unexpected response %v", resp)
}

return resp.JSON200, nil
session = resp.JSON200
if sessionCache == nil {
logger.Global.Warn().Msg("session cache not available: cannot cache the retrieved session")
return session, nil
adshmh marked this conversation as resolved.
Show resolved Hide resolved
}

if err := sessionCache.Set(session); err != nil {
logger.Global.Warn().Err(err).Msg("failed to store session in cache")
}

return session, nil
}

// REFACTOR: reuse this function in all the query commands
Expand Down
88 changes: 88 additions & 0 deletions app/client/cli/servicer_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
package cli

import (
"os"
"testing"

"github.com/stretchr/testify/require"

"github.com/pokt-network/pocket/app/client/cli/cache"
"github.com/pokt-network/pocket/rpc"
)

const (
testRelaychainEth = "ETH-Goerli"
testSessionHeight = 8
testCurrentHeight = 9
)

func TestGetSessionFromCache(t *testing.T) {
const app1Addr = "app1Addr"

testCases := []struct {
name string
cachedSessions []*rpc.Session
expected *rpc.Session
expectedErr error
}{
{
name: "cached session is returned",
cachedSessions: []*rpc.Session{testSession(app1Addr, testSessionHeight)},
expected: testSession(app1Addr, testSessionHeight),
},
{
name: "nil session cache returns an error",
expectedErr: errNoSessionCache,
},
{
name: "session not found in cache",
cachedSessions: []*rpc.Session{testSession("foo", testSessionHeight)},
expectedErr: errSessionNotFoundInCache,
},
{
name: "cached session does not match the provided height",
cachedSessions: []*rpc.Session{testSession(app1Addr, 9999999)},
expectedErr: errNoMatchingSessionInCache,
},
}

for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
var c cache.SessionCache
// prepare cache with test session for this unit test
if len(tc.cachedSessions) > 0 {
adshmh marked this conversation as resolved.
Show resolved Hide resolved
dbPath, err := os.MkdirTemp("", "cliCacheStoragePath")
require.NoError(t, err)
defer os.RemoveAll(dbPath)

c, err = cache.NewSessionCache(dbPath)
require.NoError(t, err)

for _, s := range tc.cachedSessions {
err := c.Set(s)
require.NoError(t, err)
}
}

got, err := getSessionFromCache(c, app1Addr, testRelaychainEth, testCurrentHeight)
require.ErrorIs(t, err, tc.expectedErr)
require.EqualValues(t, tc.expected, got)
})
}
}

func testSession(appAddr string, height int64) *rpc.Session {
const numSessionBlocks = 4

return &rpc.Session{
Application: rpc.ProtocolActor{
ActorType: rpc.Application,
Address: appAddr,
Chains: []string{testRelaychainEth},
},
Chain: testRelaychainEth,
NumSessionBlocks: numSessionBlocks,
SessionHeight: height,
SessionNumber: (height / numSessionBlocks), // assumes numSessionBlocks never changed
}
}
Loading