Skip to content

Commit

Permalink
Use go-cache package
Browse files Browse the repository at this point in the history
  • Loading branch information
dkeysil committed Mar 12, 2024
1 parent b8d2119 commit d12aca9
Show file tree
Hide file tree
Showing 5 changed files with 32 additions and 104 deletions.
22 changes: 9 additions & 13 deletions clients/r2cbe/events_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,34 +4,26 @@ import (
"encoding/json"
"fmt"
"io"
"net/http"
"net/url"
"time"

"github.com/andybalholm/brotli"
"github.com/forta-network/forta-core-go/protocol"
"github.com/forta-network/forta-core-go/utils/httpclient"
"google.golang.org/protobuf/proto"
)

type combinedBlockEventsClient struct {
dispatcherClient *http.Client
r2Client *http.Client

dispatcherURL *url.URL
dispatcherPath string
}

func NewCombinedBlockEventsClient(dispatcherURL string) *combinedBlockEventsClient {
u, _ := url.Parse(dispatcherURL)

dispatcherClient := http.DefaultClient
dispatcherClient.Timeout = 10 * time.Second

return &combinedBlockEventsClient{
dispatcherClient: dispatcherClient,
r2Client: http.DefaultClient,
dispatcherURL: u,
dispatcherPath: u.Path,
dispatcherURL: u,
dispatcherPath: u.Path,
}
}

Expand All @@ -47,7 +39,7 @@ func (c *combinedBlockEventsClient) GetCombinedBlockEvents(bucket int64) (_ *pro
return nil, err
}

resp, err := c.dispatcherClient.Get(c.dispatcherURL.String())
resp, err := httpclient.Default.Get(c.dispatcherURL.String())
if err != nil {
return nil, err
}
Expand All @@ -60,7 +52,11 @@ func (c *combinedBlockEventsClient) GetCombinedBlockEvents(bucket int64) (_ *pro
return nil, err
}

resp, err = c.r2Client.Get(item.PresignedURL)
if item.ExpiresAt < time.Now().Unix() {
return nil, fmt.Errorf("presigned URL expired")
}

resp, err = httpclient.Default.Get(item.PresignedURL)
if err != nil {
return nil, err
}
Expand Down
95 changes: 18 additions & 77 deletions services/json-rpc/cache/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,132 +4,73 @@ import (
"fmt"
"strconv"
"strings"
"sync"
"time"

"github.com/forta-network/forta-core-go/domain"
"github.com/forta-network/forta-core-go/protocol"
"github.com/patrickmn/go-cache"
)

type cache struct {
chains map[uint64]*chainCache
cacheExpire time.Duration
type inMemory struct {
cache *cache.Cache
}

func (c *cache) collectGarbage(garbage map[uint64][]string) {
for chainID, keys := range garbage {
c.chains[chainID].collectGarbage(keys)
func NewCache(expire time.Duration) *inMemory {
return &inMemory{
cache: cache.New(expire, expire),
}
}

func (c *cache) Append(events *protocol.CombinedBlockEvents) {
garbage := make(map[uint64][]string, 0)
defer func() {
go func() {
time.Sleep(c.cacheExpire)
c.collectGarbage(garbage)
}()
}()

func (c *inMemory) Append(events *protocol.CombinedBlockEvents) {
for _, event := range events.Events {
chainID := event.ChainID
cc, ok := c.chains[chainID]
if !ok {
cc = &chainCache{
mu: &sync.RWMutex{},
data: make(map[string]interface{}),
}
c.chains[chainID] = cc
}

keys := make([]string, 0)

// eth_blockNumber
method := "eth_blockNumber"
params := "[]"

if val, ok := cc.get(method, params); ok {
if val, ok := c.cache.Get(cacheKey(chainID, method, params)); ok {
blockNumber, ok := val.(string)

// if the new block number is later than the cached one, update the cache
// if the new block number is later than the cached one, update the inMemory
if ok && isLater(blockNumber, event.Block.Number) {
cc.put(method, params, event.Block.Number)
c.cache.SetDefault(cacheKey(chainID, method, params), event.Block.Number)
}
} else {
cc.put(method, params, event.Block.Number)
c.cache.SetDefault(cacheKey(chainID, method, params), event.Block.Number)
}

keys = append(keys, cacheKey(method, params))

// eth_getBlockByNumber
method = "eth_getBlockByNumber"
params = fmt.Sprintf(`["%s", "true"]`, event.Block.Number)

block := domain.BlockFromCombinedBlockEvent(event)
cc.put(method, params, block)
keys = append(keys, cacheKey(method, params))
c.cache.SetDefault(cacheKey(chainID, method, params), block)

// eth_getLogs
method = "eth_getLogs"
params = fmt.Sprintf(`[{"fromBlock":"%s","toBlock":"%s"}]`, event.Block.Number, event.Block.Number)

logs := domain.LogsFromCombinedBlockEvent(event)
cc.put(method, params, logs)
keys = append(keys, cacheKey(method, params))
c.cache.SetDefault(cacheKey(chainID, method, params), logs)

// trace_block
method = "trace_block"
params = fmt.Sprintf(`["%s"]`, event.Block.Number)

traces, err := domain.TracesFromCombinedBlockEvent(event)
if err == nil {
cc.put(method, params, traces)
keys = append(keys, cacheKey(method, params))
c.cache.SetDefault(cacheKey(chainID, method, params), traces)
}

garbage[chainID] = keys
}
}

func (c *cache) Get(chainId uint64, method string, params string) (interface{}, bool) {
cc, ok := c.chains[chainId]
if ok {
return cc.get(method, params)
}

return nil, false
}

type chainCache struct {
mu *sync.RWMutex
// key is method + params
data map[string]interface{}
}

func (c *chainCache) put(method string, params string, result interface{}) {
c.mu.Lock()
defer c.mu.Unlock()
c.data[cacheKey(method, params)] = result
}

func (c *chainCache) get(method string, params string) (interface{}, bool) {
c.mu.RLock()
defer c.mu.RUnlock()
result, ok := c.data[cacheKey(method, params)]

return result, ok
}

func (c *chainCache) collectGarbage(keys []string) {
c.mu.Lock()
defer c.mu.Unlock()
for _, key := range keys {
delete(c.data, key)
}
func (c *inMemory) Get(chainId uint64, method string, params string) (interface{}, bool) {
return c.cache.Get(cacheKey(chainId, method, params))
}

func cacheKey(method, params string) string {
return method + params
func cacheKey(chainId uint64, method, params string) string {
return fmt.Sprintf("%d-%s-%s", chainId, method, params)
}

func isLater(actual, new string) bool {
Expand Down
5 changes: 1 addition & 4 deletions services/json-rpc/cache/cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,7 @@ import (
)

func TestCache(t *testing.T) {
cache := cache{
chains: make(map[uint64]*chainCache),
cacheExpire: time.Millisecond * 500,
}
cache := NewCache(time.Millisecond * 500)

cache.Append(events)

Expand Down
9 changes: 3 additions & 6 deletions services/json-rpc/cache/json_rpc_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ type JsonRpcCache struct {

server *http.Server

cache *cache
cache *inMemory

cbeClient clients.CombinedBlockEventsClient
}
Expand All @@ -45,10 +45,7 @@ func NewJsonRpcCache(ctx context.Context, cfg config.JsonRpcCacheConfig) (*JsonR
}

func (c *JsonRpcCache) Start() error {
c.cache = &cache{
chains: make(map[uint64]*chainCache),
cacheExpire: time.Duration(c.cfg.CacheExpirePeriodSeconds) * time.Second,
}
c.cache = NewCache(time.Duration(c.cfg.CacheExpirePeriodSeconds) * time.Second)

c.server = &http.Server{
Addr: ":8575",
Expand Down Expand Up @@ -145,6 +142,6 @@ func (c *JsonRpcCache) pollEvents() {

log.Info("Added combined block events to local cache", "bucket", bucket, "events", len(events.Events))
c.cache.Append(events)
bucket += 10
bucket += 10 // 10 seconds
}
}
5 changes: 1 addition & 4 deletions services/json-rpc/cache/json_rpc_cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,10 +44,7 @@ func TestJsonRpcCache(t *testing.T) {
cfg: config.JsonRpcCacheConfig{
CacheExpirePeriodSeconds: 300,
},
cache: &cache{
chains: make(map[uint64]*chainCache),
cacheExpire: 300 * time.Second,
},
cache: NewCache(300 * time.Second),
}

go jrpCache.pollEvents()
Expand Down

0 comments on commit d12aca9

Please sign in to comment.