From 334bcf851c4d309c631d232b5575c5c281f3527a Mon Sep 17 00:00:00 2001 From: dkeysil Date: Fri, 19 Apr 2024 13:45:32 +0200 Subject: [PATCH 1/3] Remove receipts from the publisher --- clients/alertapi/api.go | 12 ++---- clients/interfaces.go | 2 +- services/publisher/publisher.go | 75 ++------------------------------- 3 files changed, 9 insertions(+), 80 deletions(-) diff --git a/clients/alertapi/api.go b/clients/alertapi/api.go index 53ea4fea..4cb59632 100644 --- a/clients/alertapi/api.go +++ b/clients/alertapi/api.go @@ -16,7 +16,7 @@ type client struct { apiUrl string } -func (c *client) post(path string, body interface{}, headers map[string]string, target interface{}) error { +func (c *client) post(path string, body interface{}, headers map[string]string) error { jsonVal, err := json.Marshal(body) if err != nil { return err @@ -47,20 +47,16 @@ func (c *client) post(path string, body interface{}, headers map[string]string, }).Error("alert api error") return fmt.Errorf("%d error: %s", resp.StatusCode, string(b)) } - return json.Unmarshal(b, target) + return nil } -func (c *client) PostBatch(batch *domain.AlertBatchRequest, token string) (*domain.AlertBatchResponse, error) { +func (c *client) PostBatch(batch *domain.AlertBatchRequest, token string) error { path := fmt.Sprintf("/batch/%s", batch.Ref) headers := map[string]string{ "content-type": "application/json", "Authorization": fmt.Sprintf("Bearer %s", token), } - var resp domain.AlertBatchResponse - if err := c.post(path, batch, headers, &resp); err != nil { - return nil, err - } - return &resp, nil + return c.post(path, batch, headers) } func NewClient(apiUrl string) *client { diff --git a/clients/interfaces.go b/clients/interfaces.go index 0c550456..99f9f96b 100644 --- a/clients/interfaces.go +++ b/clients/interfaces.go @@ -60,7 +60,7 @@ type MessageClient interface { // AlertAPIClient calls an http api on the analyzer to store alerts type AlertAPIClient interface { - PostBatch(batch *domain.AlertBatchRequest, token string) (*domain.AlertBatchResponse, error) + PostBatch(batch *domain.AlertBatchRequest, token string) error } type IPAuthenticator interface { diff --git a/services/publisher/publisher.go b/services/publisher/publisher.go index a3fe463c..f016b201 100644 --- a/services/publisher/publisher.go +++ b/services/publisher/publisher.go @@ -29,12 +29,10 @@ import ( "github.com/forta-network/forta-node/clients" "github.com/forta-network/forta-node/clients/alertapi" "github.com/forta-network/forta-node/clients/messaging" - "github.com/forta-network/forta-node/clients/storagegrpc" "github.com/forta-network/forta-node/config" "github.com/forta-network/forta-node/services/components/estimation" "github.com/forta-network/forta-node/services/components/metrics" "github.com/forta-network/forta-node/services/publisher/webhooklog" - "github.com/forta-network/forta-node/services/storage" "github.com/forta-network/forta-node/store" ipfsapi "github.com/ipfs/go-ipfs-api" log "github.com/sirupsen/logrus" @@ -59,7 +57,6 @@ type Publisher struct { cfg PublisherConfig contract AlertsContract ipfs ipfs.Client - storage protocol.StorageClient metricsAggregator *AgentMetricsAggregator messageClient clients.MessageClient alertClient clients.AlertAPIClient @@ -67,8 +64,7 @@ type Publisher struct { lifecycleMetrics metrics.Lifecycle - batchRefStore store.StringStore - lastReceiptStore store.StringStore + batchRefStore store.StringStore blockTimeline estimation.BlockTimeline @@ -109,9 +105,6 @@ type Publisher struct { // LocalAlertClient sends the local alerts. type LocalAlertClient webhook.AlertWebhookClient -// StorageClient stores content. -type StorageClient protocol.StorageClient - // EthClient interacts with the Ethereum API. type EthClient interface { BlockNumber(ctx context.Context) (uint64, error) @@ -276,12 +269,6 @@ func (pub *Publisher) publishNextBatch(batch *protocol.AlertBatch) (published bo }, ) - var lastReceipt string - lr, err := pub.lastReceiptStore.Get() - if err == nil { - lastReceipt = lr - } - signedBatchSummary, err := security.SignBatchSummary( pub.cfg.Key, &protocol.BatchSummary{ Batch: cid, @@ -290,7 +277,6 @@ func (pub *Publisher) publishNextBatch(batch *protocol.AlertBatch) (published bo BlockEnd: batch.BlockEnd, AlertCount: batch.AlertCount, ScannerVersion: batch.ScannerVersion, - PreviousReceipt: lastReceipt, LatestBlockInput: batch.LatestBlockInput, Timestamp: time.Now().UTC().Format(time.RFC3339), }, @@ -302,7 +288,6 @@ func (pub *Publisher) publishNextBatch(batch *protocol.AlertBatch) (published bo scannerAddr := pub.cfg.Key.Address.Hex() - var resp *domain.AlertBatchResponse for i := 0; i < defaultBatchSendRetryTimes; i++ { var scannerJwt string scannerJwt, err = security.CreateScannerJWT( @@ -314,7 +299,7 @@ func (pub *Publisher) publishNextBatch(batch *protocol.AlertBatch) (published bo logger.WithError(err).Error("failed to create batch jwt") return false, err } - resp, err = pub.alertClient.PostBatch(&domain.AlertBatchRequest{ + err = pub.alertClient.PostBatch(&domain.AlertBatchRequest{ Scanner: scannerAddr, ChainID: int64(batch.ChainId), BlockStart: int64(batch.BlockStart), @@ -340,47 +325,6 @@ func (pub *Publisher) publishNextBatch(batch *protocol.AlertBatch) (published bo return false, fmt.Errorf("failed to send the alert tx: %v", err) } - if resp.SignedReceipt != nil { - // store off receipt id - if err := pub.lastReceiptStore.Put(resp.ReceiptID); err != nil { - logger.WithError(err).Error("failed to marshal receipt") - return true, err - } - logger = logger.WithFields( - log.Fields{ - "receiptId": resp.ReceiptID, - }, - ) - - // if for some reason receipt can't marshal, log and move on - b, err := json.Marshal(resp.SignedReceipt) - if err != nil { - logger.WithError(err).Error("failed to marshal receipt (not saving receipt)") - return true, nil - } - logger = logger.WithFields(log.Fields{ - "receipt": string(b), - }) - - if pub.cfg.Config.AdvancedConfig.IPFSExperiment { - ctx, cancel := context.WithTimeout(pub.ctx, time.Second*10) - defer cancel() - putResp, err := pub.storage.Put(ctx, &protocol.PutRequest{ - User: scannerAddr, - Kind: storage.KindBatchReceipt, - Bytes: b, - }) - if err != nil { - logger.WithError(err).Warn("failed to store batch receipt") - } else { - logger = logger.WithFields(log.Fields{ - "storedReceiptRef": putResp.ContentId, - "storedReceiptPath": putResp.ContentPath, - }) - } - } - } - logger.Info("alert batch") return true, nil @@ -864,15 +808,7 @@ func NewPublisher(ctx context.Context, blockTimeline estimation.BlockTimeline, c apiClient := alertapi.NewClient(cfg.Publish.APIURL) - var storageClient protocol.StorageClient - if !cfg.LocalModeConfig.Enable && cfg.AdvancedConfig.IPFSExperiment { - storageClient, err = storagegrpc.DialContext(ctx, fmt.Sprintf("%s:%s", config.DockerStorageContainerName, config.DefaultStoragePort)) - if err != nil { - return nil, fmt.Errorf("failed to dial the storage client: %v", err) - } - } - - return initPublisher(ctx, msgClient, lifecycleMetrics, apiClient, storageClient, blockTimeline, + return initPublisher(ctx, msgClient, lifecycleMetrics, apiClient, blockTimeline, PublisherConfig{ ChainID: cfg.ChainID, Key: key, @@ -886,8 +822,7 @@ func NewPublisher(ctx context.Context, blockTimeline estimation.BlockTimeline, c func initPublisher( ctx context.Context, mc clients.MessageClient, lifecycleMetrics metrics.Lifecycle, alertClient clients.AlertAPIClient, - storageClient StorageClient, blockTimeline estimation.BlockTimeline, - cfg PublisherConfig, + blockTimeline estimation.BlockTimeline, cfg PublisherConfig, ) (*Publisher, error) { ipfsClient, err := ipfs.NewClient(fmt.Sprintf("http://%s:5001", config.DockerIpfsContainerName)) if err != nil { @@ -929,14 +864,12 @@ func initPublisher( ctx: ctx, cfg: cfg, ipfs: ipfsClient, - storage: storageClient, metricsAggregator: NewMetricsAggregator(time.Duration(*cfg.PublisherConfig.Batch.MetricsBucketIntervalSeconds)*time.Second, int64(cfg.ChainID)), messageClient: mc, alertClient: alertClient, localAlertClient: localAlertClient, lifecycleMetrics: lifecycleMetrics, batchRefStore: store.NewFileStringStore(path.Join(cfg.Config.FortaDir, ".last-batch")), - lastReceiptStore: store.NewFileStringStore(path.Join(cfg.Config.FortaDir, ".last-receipt")), blockTimeline: blockTimeline, skipEmpty: cfg.PublisherConfig.Batch.SkipEmpty, From 3c16858e0d9683742e20de001cc852ca72403b0a Mon Sep 17 00:00:00 2001 From: dkeysil Date: Tue, 23 Apr 2024 09:19:28 +0200 Subject: [PATCH 2/3] Update forta-core-go --- go.mod | 2 +- go.sum | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/go.mod b/go.mod index b9c2da3b..7fba9056 100644 --- a/go.mod +++ b/go.mod @@ -42,7 +42,7 @@ require ( github.com/cenkalti/backoff/v4 v4.1.3 github.com/docker/docker v1.6.2 github.com/docker/go-connections v0.4.0 - github.com/forta-network/forta-core-go v0.0.0-20240410112425-80ab9a87259c + github.com/forta-network/forta-core-go v0.0.0-20240423071831-edccde967e5b github.com/prometheus/client_golang v1.14.0 github.com/prometheus/client_model v0.3.0 github.com/prometheus/common v0.39.0 diff --git a/go.sum b/go.sum index 368cb8ad..a2a35f1e 100644 --- a/go.sum +++ b/go.sum @@ -329,8 +329,8 @@ github.com/flynn/noise v0.0.0-20180327030543-2492fe189ae6/go.mod h1:1i71OnUq3iUe github.com/flynn/noise v1.0.0 h1:DlTHqmzmvcEiKj+4RYo/imoswx/4r6iBlCMfVtrMXpQ= github.com/flynn/noise v1.0.0/go.mod h1:xbMo+0i6+IGbYdJhF31t2eR1BIU0CYc12+BNAKwUTag= github.com/fogleman/gg v1.2.1-0.20190220221249-0403632d5b90/go.mod h1:R/bRT+9gY/C5z7JzPU0zXsXHKM4/ayA+zqcVNZzPa1k= -github.com/forta-network/forta-core-go v0.0.0-20240410112425-80ab9a87259c h1:f+lF42ZkBK6PnAY6yQXXn46xzRxk+e/alsito9AtByA= -github.com/forta-network/forta-core-go v0.0.0-20240410112425-80ab9a87259c/go.mod h1:iNehCWOypwVeO8b1GKmsrEWReHTvO5qw8SsGvZsBINo= +github.com/forta-network/forta-core-go v0.0.0-20240423071831-edccde967e5b h1:YoX67i29YPnXGXXvQYGm9oyFdC2MY10zMJqiZHxUd84= +github.com/forta-network/forta-core-go v0.0.0-20240423071831-edccde967e5b/go.mod h1:iNehCWOypwVeO8b1GKmsrEWReHTvO5qw8SsGvZsBINo= github.com/forta-network/go-multicall v0.0.0-20230609185354-1436386c6707 h1:f6I7K43i2m6AwHSsDxh0Mf3qFzYt8BKnabSl/zGFmh0= github.com/forta-network/go-multicall v0.0.0-20230609185354-1436386c6707/go.mod h1:nqTUF1REklpWLZ/M5HfzqhSHNz4dPVKzJvbLziqTZpw= github.com/fortytw2/leaktest v1.3.0/go.mod h1:jDsjWgpAGjm2CA7WthBh/CdZYEPF31XHquHwclZch5g= From 52ec5376ae0aee502d6fe054af807ef4e8dbea3d Mon Sep 17 00:00:00 2001 From: dkeysil Date: Tue, 23 Apr 2024 09:20:50 +0200 Subject: [PATCH 3/3] Re-generate mocks --- clients/mocks/mock_clients.go | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/clients/mocks/mock_clients.go b/clients/mocks/mock_clients.go index 85ef8ce4..688f4b2a 100644 --- a/clients/mocks/mock_clients.go +++ b/clients/mocks/mock_clients.go @@ -614,12 +614,11 @@ func (m *MockAlertAPIClient) EXPECT() *MockAlertAPIClientMockRecorder { } // PostBatch mocks base method. -func (m *MockAlertAPIClient) PostBatch(batch *domain.AlertBatchRequest, token string) (*domain.AlertBatchResponse, error) { +func (m *MockAlertAPIClient) PostBatch(batch *domain.AlertBatchRequest, token string) error { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "PostBatch", batch, token) - ret0, _ := ret[0].(*domain.AlertBatchResponse) - ret1, _ := ret[1].(error) - return ret0, ret1 + ret0, _ := ret[0].(error) + return ret0 } // PostBatch indicates an expected call of PostBatch.