diff --git a/clients/r2cbe/events_client.go b/clients/r2cbe/events_client.go index e3b0c410..33fc19f1 100644 --- a/clients/r2cbe/events_client.go +++ b/clients/r2cbe/events_client.go @@ -4,19 +4,16 @@ import ( "encoding/json" "fmt" "io" - "net/http" "net/url" "time" "github.com/andybalholm/brotli" "github.com/forta-network/forta-core-go/protocol" + "github.com/forta-network/forta-core-go/utils/httpclient" "google.golang.org/protobuf/proto" ) type combinedBlockEventsClient struct { - dispatcherClient *http.Client - r2Client *http.Client - dispatcherURL *url.URL dispatcherPath string } @@ -24,14 +21,9 @@ type combinedBlockEventsClient struct { func NewCombinedBlockEventsClient(dispatcherURL string) *combinedBlockEventsClient { u, _ := url.Parse(dispatcherURL) - dispatcherClient := http.DefaultClient - dispatcherClient.Timeout = 10 * time.Second - return &combinedBlockEventsClient{ - dispatcherClient: dispatcherClient, - r2Client: http.DefaultClient, - dispatcherURL: u, - dispatcherPath: u.Path, + dispatcherURL: u, + dispatcherPath: u.Path, } } @@ -47,7 +39,7 @@ func (c *combinedBlockEventsClient) GetCombinedBlockEvents(bucket int64) (_ *pro return nil, err } - resp, err := c.dispatcherClient.Get(c.dispatcherURL.String()) + resp, err := httpclient.Default.Get(c.dispatcherURL.String()) if err != nil { return nil, err } @@ -60,7 +52,11 @@ func (c *combinedBlockEventsClient) GetCombinedBlockEvents(bucket int64) (_ *pro return nil, err } - resp, err = c.r2Client.Get(item.PresignedURL) + if item.ExpiresAt < time.Now().Unix() { + return nil, fmt.Errorf("presigned URL expired") + } + + resp, err = httpclient.Default.Get(item.PresignedURL) if err != nil { return nil, err } diff --git a/services/json-rpc/cache/cache.go b/services/json-rpc/cache/cache.go index 83fbe0e2..ea61bc67 100644 --- a/services/json-rpc/cache/cache.go +++ b/services/json-rpc/cache/cache.go @@ -4,78 +4,55 @@ import ( "fmt" "strconv" "strings" - "sync" "time" "github.com/forta-network/forta-core-go/domain" "github.com/forta-network/forta-core-go/protocol" + "github.com/patrickmn/go-cache" ) -type cache struct { - chains map[uint64]*chainCache - cacheExpire time.Duration +type inMemory struct { + cache *cache.Cache } -func (c *cache) collectGarbage(garbage map[uint64][]string) { - for chainID, keys := range garbage { - c.chains[chainID].collectGarbage(keys) +func NewCache(expire time.Duration) *inMemory { + return &inMemory{ + cache: cache.New(expire, expire), } } -func (c *cache) Append(events *protocol.CombinedBlockEvents) { - garbage := make(map[uint64][]string, 0) - defer func() { - go func() { - time.Sleep(c.cacheExpire) - c.collectGarbage(garbage) - }() - }() - +func (c *inMemory) Append(events *protocol.CombinedBlockEvents) { for _, event := range events.Events { chainID := event.ChainID - cc, ok := c.chains[chainID] - if !ok { - cc = &chainCache{ - mu: &sync.RWMutex{}, - data: make(map[string]interface{}), - } - c.chains[chainID] = cc - } - - keys := make([]string, 0) // eth_blockNumber method := "eth_blockNumber" params := "[]" - if val, ok := cc.get(method, params); ok { + if val, ok := c.cache.Get(cacheKey(chainID, method, params)); ok { blockNumber, ok := val.(string) - // if the new block number is later than the cached one, update the cache + // if the new block number is later than the cached one, update the inMemory if ok && isLater(blockNumber, event.Block.Number) { - cc.put(method, params, event.Block.Number) + c.cache.SetDefault(cacheKey(chainID, method, params), event.Block.Number) } } else { - cc.put(method, params, event.Block.Number) + c.cache.SetDefault(cacheKey(chainID, method, params), event.Block.Number) } - keys = append(keys, cacheKey(method, params)) - // eth_getBlockByNumber method = "eth_getBlockByNumber" params = fmt.Sprintf(`["%s", "true"]`, event.Block.Number) block := domain.BlockFromCombinedBlockEvent(event) - cc.put(method, params, block) - keys = append(keys, cacheKey(method, params)) + c.cache.SetDefault(cacheKey(chainID, method, params), block) // eth_getLogs method = "eth_getLogs" params = fmt.Sprintf(`[{"fromBlock":"%s","toBlock":"%s"}]`, event.Block.Number, event.Block.Number) logs := domain.LogsFromCombinedBlockEvent(event) - cc.put(method, params, logs) - keys = append(keys, cacheKey(method, params)) + c.cache.SetDefault(cacheKey(chainID, method, params), logs) // trace_block method = "trace_block" @@ -83,53 +60,17 @@ func (c *cache) Append(events *protocol.CombinedBlockEvents) { traces, err := domain.TracesFromCombinedBlockEvent(event) if err == nil { - cc.put(method, params, traces) - keys = append(keys, cacheKey(method, params)) + c.cache.SetDefault(cacheKey(chainID, method, params), traces) } - - garbage[chainID] = keys - } -} - -func (c *cache) Get(chainId uint64, method string, params string) (interface{}, bool) { - cc, ok := c.chains[chainId] - if ok { - return cc.get(method, params) } - - return nil, false -} - -type chainCache struct { - mu *sync.RWMutex - // key is method + params - data map[string]interface{} -} - -func (c *chainCache) put(method string, params string, result interface{}) { - c.mu.Lock() - defer c.mu.Unlock() - c.data[cacheKey(method, params)] = result } -func (c *chainCache) get(method string, params string) (interface{}, bool) { - c.mu.RLock() - defer c.mu.RUnlock() - result, ok := c.data[cacheKey(method, params)] - - return result, ok -} - -func (c *chainCache) collectGarbage(keys []string) { - c.mu.Lock() - defer c.mu.Unlock() - for _, key := range keys { - delete(c.data, key) - } +func (c *inMemory) Get(chainId uint64, method string, params string) (interface{}, bool) { + return c.cache.Get(cacheKey(chainId, method, params)) } -func cacheKey(method, params string) string { - return method + params +func cacheKey(chainId uint64, method, params string) string { + return fmt.Sprintf("%d-%s-%s", chainId, method, params) } func isLater(actual, new string) bool { diff --git a/services/json-rpc/cache/cache_test.go b/services/json-rpc/cache/cache_test.go index d7c8a481..0aa98d7f 100644 --- a/services/json-rpc/cache/cache_test.go +++ b/services/json-rpc/cache/cache_test.go @@ -9,10 +9,7 @@ import ( ) func TestCache(t *testing.T) { - cache := cache{ - chains: make(map[uint64]*chainCache), - cacheExpire: time.Millisecond * 500, - } + cache := NewCache(time.Millisecond * 500) cache.Append(events) diff --git a/services/json-rpc/cache/json_rpc_cache.go b/services/json-rpc/cache/json_rpc_cache.go index f9246d5c..3fc9ad17 100644 --- a/services/json-rpc/cache/json_rpc_cache.go +++ b/services/json-rpc/cache/json_rpc_cache.go @@ -26,7 +26,7 @@ type JsonRpcCache struct { server *http.Server - cache *cache + cache *inMemory cbeClient clients.CombinedBlockEventsClient } @@ -45,10 +45,7 @@ func NewJsonRpcCache(ctx context.Context, cfg config.JsonRpcCacheConfig) (*JsonR } func (c *JsonRpcCache) Start() error { - c.cache = &cache{ - chains: make(map[uint64]*chainCache), - cacheExpire: time.Duration(c.cfg.CacheExpirePeriodSeconds) * time.Second, - } + c.cache = NewCache(time.Duration(c.cfg.CacheExpirePeriodSeconds) * time.Second) c.server = &http.Server{ Addr: ":8575", @@ -145,6 +142,6 @@ func (c *JsonRpcCache) pollEvents() { log.Info("Added combined block events to local cache", "bucket", bucket, "events", len(events.Events)) c.cache.Append(events) - bucket += 10 + bucket += 10 // 10 seconds } } diff --git a/services/json-rpc/cache/json_rpc_cache_test.go b/services/json-rpc/cache/json_rpc_cache_test.go index f97e0bce..34fac22c 100644 --- a/services/json-rpc/cache/json_rpc_cache_test.go +++ b/services/json-rpc/cache/json_rpc_cache_test.go @@ -44,10 +44,7 @@ func TestJsonRpcCache(t *testing.T) { cfg: config.JsonRpcCacheConfig{ CacheExpirePeriodSeconds: 300, }, - cache: &cache{ - chains: make(map[uint64]*chainCache), - cacheExpire: 300 * time.Second, - }, + cache: NewCache(300 * time.Second), } go jrpCache.pollEvents()