From 54ed67f8df335ceadd2e8eacfc277d5bce28567c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?V=C3=A1clav=20Pavl=C3=ADn?= Date: Thu, 8 Aug 2024 08:17:48 +0200 Subject: [PATCH] chore(telemetry)_: use imported telemetry types, clean bandwith metrics --- go.mod | 3 +- go.sum | 5 +- telemetry/client.go | 257 +++++++++------- telemetry/client_test.go | 80 +++-- telemetry/common/common.go | 33 +++ vendor/github.com/rs/cors/.travis.yml | 9 - vendor/github.com/rs/cors/README.md | 32 +- vendor/github.com/rs/cors/cors.go | 279 +++++++++++------- .../github.com/rs/cors/internal/sortedset.go | 113 +++++++ vendor/github.com/rs/cors/utils.go | 69 +---- .../status-im/telemetry/LICENSE-APACHEv2 | 205 +++++++++++++ .../status-im/telemetry/LICENSE-MIT | 25 ++ .../status-im/telemetry/pkg/types/types.go | 113 +++++++ vendor/modules.txt | 8 +- wakuv2/common/publish_method.go | 19 ++ wakuv2/message_publishing.go | 36 +-- wakuv2/telemetry.go | 64 ---- wakuv2/waku.go | 42 +-- wakuv2/waku_test.go | 31 -- 19 files changed, 961 insertions(+), 462 deletions(-) create mode 100644 telemetry/common/common.go delete mode 100644 vendor/github.com/rs/cors/.travis.yml create mode 100644 vendor/github.com/rs/cors/internal/sortedset.go create mode 100644 vendor/github.com/status-im/telemetry/LICENSE-APACHEv2 create mode 100644 vendor/github.com/status-im/telemetry/LICENSE-MIT create mode 100644 vendor/github.com/status-im/telemetry/pkg/types/types.go create mode 100644 wakuv2/common/publish_method.go delete mode 100644 wakuv2/telemetry.go diff --git a/go.mod b/go.mod index 8805d71988..b6e031d468 100644 --- a/go.mod +++ b/go.mod @@ -95,6 +95,7 @@ require ( github.com/mutecomm/go-sqlcipher/v4 v4.4.2 github.com/schollz/peerdiscovery v1.7.0 github.com/siphiuel/lc-proxy-wrapper v0.0.0-20230516150924-246507cee8c7 + github.com/status-im/telemetry v0.0.0-20240813130615-b4e1a202a77b github.com/urfave/cli/v2 v2.27.2 github.com/waku-org/go-waku v0.8.1-0.20240807022408-c2e6320953b1 github.com/wk8/go-ordered-map/v2 v2.1.7 @@ -249,7 +250,7 @@ require ( github.com/remyoudompheng/bigfft v0.0.0-20200410134404-eec4a21b6bb0 // indirect github.com/rivo/uniseg v0.2.0 // indirect github.com/rjeczalik/notify v0.9.3 // indirect - github.com/rs/cors v1.7.0 // indirect + github.com/rs/cors v1.11.0 // indirect github.com/rs/dnscache v0.0.0-20210201191234-295bba877686 // indirect github.com/russolsen/ohyeah v0.0.0-20160324131710-f4938c005315 // indirect github.com/russolsen/same v0.0.0-20160222130632-f089df61f51d // indirect diff --git a/go.sum b/go.sum index 668d506261..facdf62c75 100644 --- a/go.sum +++ b/go.sum @@ -1906,8 +1906,9 @@ github.com/rogpeppe/go-internal v1.6.1/go.mod h1:xXDCJY+GAPziupqXw64V24skbSoqbTE github.com/rogpeppe/go-internal v1.8.0/go.mod h1:WmiCO8CzOY8rg0OYDC4/i/2WRWAB6poM+XZ2dLUbcbE= github.com/rogpeppe/go-internal v1.10.0 h1:TMyTOH3F/DB16zRVcYyreMH6GnZZrwQVAoYjRBZyWFQ= github.com/rogpeppe/go-internal v1.10.0/go.mod h1:UQnix2H7Ngw/k4C5ijL5+65zddjncjaFoBhdsK/akog= -github.com/rs/cors v1.7.0 h1:+88SsELBHx5r+hZ8TCkggzSstaWNbDvThkVK8H6f9ik= github.com/rs/cors v1.7.0/go.mod h1:gFx+x8UowdsKA9AchylcLynDq+nNFfI8FkUZdN/jGCU= +github.com/rs/cors v1.11.0 h1:0B9GE/r9Bc2UxRMMtymBkHTenPkHDv0CW4Y98GBY+po= +github.com/rs/cors v1.11.0/go.mod h1:XyqrcTp5zjWr1wsJ8PIRZssZ8b/WMcMf71DJnit4EMU= github.com/rs/dnscache v0.0.0-20190621150935-06bb5526f76b/go.mod h1:qe5TWALJ8/a1Lqznoc5BDHpYX/8HU60Hm2AwRmqzxqA= github.com/rs/dnscache v0.0.0-20210201191234-295bba877686 h1:IJ6Df0uxPDtNoByV0KkzVKNseWvZFCNM/S9UoyOMCSI= github.com/rs/dnscache v0.0.0-20210201191234-295bba877686/go.mod h1:qe5TWALJ8/a1Lqznoc5BDHpYX/8HU60Hm2AwRmqzxqA= @@ -2048,6 +2049,8 @@ github.com/status-im/status-go/extkeys v1.1.2 h1:FSjARgDathJ3rIapJt851LsIXP9Oyuu github.com/status-im/status-go/extkeys v1.1.2/go.mod h1:hCmFzb2jiiVF2voZKYbzuhOQiHHCmyLJsZJXrFFg7BY= github.com/status-im/tcp-shaker v1.1.1-status h1:TnVeeWlq2SKCWotHc4Vi6qZQfY8TTe3VLmu1xpEFYhg= github.com/status-im/tcp-shaker v1.1.1-status/go.mod h1:RYo/itke1oU5k/6sj9DNM3QAwtE5rZSgg5JnkOv83hk= +github.com/status-im/telemetry v0.0.0-20240813130615-b4e1a202a77b h1:ELIoZLYXqcvcgZ3cB5i4EiI7oHIbT2Qcv36GiPwoS5o= +github.com/status-im/telemetry v0.0.0-20240813130615-b4e1a202a77b/go.mod h1:fkDGhVFJeQxBlYrhsRhIsujZEN2nNPmTXqp1M5x7HeY= github.com/status-im/zxcvbn-go v0.0.0-20220311183720-5e8676676857 h1:sPkzT7Z7uLmejOsBRlZ0kwDWpqjpHJsp834o5nbhqho= github.com/status-im/zxcvbn-go v0.0.0-20220311183720-5e8676676857/go.mod h1:lq9I5ROto5tcua65GmCE6SIW7VE0ucdEBs1fn4z7uWU= github.com/stefanberger/go-pkcs11uri v0.0.0-20201008174630-78d3cae3a980/go.mod h1:AO3tvPzVZ/ayst6UlUKUv6rcPQInYe3IknH3jYhAKu8= diff --git a/telemetry/client.go b/telemetry/client.go index 85b6112bb9..6c1201a9dd 100644 --- a/telemetry/client.go +++ b/telemetry/client.go @@ -12,41 +12,34 @@ import ( "go.uber.org/zap" + "github.com/libp2p/go-libp2p/core/protocol" + "github.com/status-im/status-go/eth-node/types" "github.com/status-im/status-go/protocol/transport" - "github.com/status-im/status-go/wakuv2" + + "github.com/waku-org/go-waku/waku/v2/protocol/filter" + "github.com/waku-org/go-waku/waku/v2/protocol/legacy_store" + "github.com/waku-org/go-waku/waku/v2/protocol/lightpush" + "github.com/waku-org/go-waku/waku/v2/protocol/relay" v2protocol "github.com/waku-org/go-waku/waku/v2/protocol" v1protocol "github.com/status-im/status-go/protocol/v1" -) -type TelemetryType string + telemetrytypes "github.com/status-im/telemetry/pkg/types" -const ( - ProtocolStatsMetric TelemetryType = "ProtocolStats" - ReceivedEnvelopeMetric TelemetryType = "ReceivedEnvelope" - SentEnvelopeMetric TelemetryType = "SentEnvelope" - UpdateEnvelopeMetric TelemetryType = "UpdateEnvelope" - ReceivedMessagesMetric TelemetryType = "ReceivedMessages" - ErrorSendingEnvelopeMetric TelemetryType = "ErrorSendingEnvelope" - PeerCountMetric TelemetryType = "PeerCount" - PeerConnFailuresMetric TelemetryType = "PeerConnFailure" + telemetry "github.com/status-im/status-go/telemetry/common" +) +const ( MaxRetryCache = 5000 ) -type TelemetryRequest struct { - Id int `json:"id"` - TelemetryType TelemetryType `json:"telemetry_type"` - TelemetryData *json.RawMessage `json:"telemetry_data"` -} - func (c *Client) PushReceivedMessages(ctx context.Context, receivedMessages ReceivedMessages) { c.processAndPushTelemetry(ctx, receivedMessages) } -func (c *Client) PushSentEnvelope(ctx context.Context, sentEnvelope wakuv2.SentEnvelope) { +func (c *Client) PushSentEnvelope(ctx context.Context, sentEnvelope telemetry.SentEnvelope) { c.processAndPushTelemetry(ctx, sentEnvelope) } @@ -54,10 +47,14 @@ func (c *Client) PushReceivedEnvelope(ctx context.Context, receivedEnvelope *v2p c.processAndPushTelemetry(ctx, receivedEnvelope) } -func (c *Client) PushErrorSendingEnvelope(ctx context.Context, errorSendingEnvelope wakuv2.ErrorSendingEnvelope) { +func (c *Client) PushErrorSendingEnvelope(ctx context.Context, errorSendingEnvelope telemetry.ErrorSendingEnvelope) { c.processAndPushTelemetry(ctx, errorSendingEnvelope) } +func (c *Client) PushProtocolStats(ctx context.Context, stats telemetry.ProtocolStatsMap) { + c.processAndPushTelemetry(ctx, stats) +} + func (c *Client) PushPeerCount(ctx context.Context, peerCount int) { if peerCount != c.lastPeerCount { c.lastPeerCount = peerCount @@ -93,6 +90,7 @@ type PeerConnFailure struct { } type Client struct { + telemetry.ITelemetryClient serverURL string httpClient *http.Client logger *zap.Logger @@ -100,10 +98,10 @@ type Client struct { nodeName string peerId string version string - telemetryCh chan TelemetryRequest + telemetryCh chan telemetrytypes.TelemetryRequest telemetryCacheLock sync.Mutex - telemetryCache []TelemetryRequest - telemetryRetryCache []TelemetryRequest + telemetryCache []telemetrytypes.TelemetryRequest + telemetryRetryCache []telemetrytypes.TelemetryRequest nextIdLock sync.Mutex nextId int sendPeriod time.Duration @@ -134,10 +132,10 @@ func NewClient(logger *zap.Logger, serverURL string, keyUID string, nodeName str keyUID: keyUID, nodeName: nodeName, version: version, - telemetryCh: make(chan TelemetryRequest), + telemetryCh: make(chan telemetrytypes.TelemetryRequest), telemetryCacheLock: sync.Mutex{}, - telemetryCache: make([]TelemetryRequest, 0), - telemetryRetryCache: make([]TelemetryRequest, 0), + telemetryCache: make([]telemetrytypes.TelemetryRequest, 0), + telemetryRetryCache: make([]telemetrytypes.TelemetryRequest, 0), nextId: 0, nextIdLock: sync.Mutex{}, sendPeriod: 10 * time.Second, // default value @@ -174,7 +172,7 @@ func (c *Client) Start(ctx context.Context) { select { case <-timer.C: c.telemetryCacheLock.Lock() - telemetryRequests := make([]TelemetryRequest, len(c.telemetryCache)) + telemetryRequests := make([]telemetrytypes.TelemetryRequest, len(c.telemetryCache)) copy(telemetryRequests, c.telemetryCache) c.telemetryCache = nil c.telemetryCacheLock.Unlock() @@ -199,44 +197,50 @@ func (c *Client) Start(ctx context.Context) { } func (c *Client) processAndPushTelemetry(ctx context.Context, data interface{}) { - var telemetryRequest TelemetryRequest + var telemetryRequest telemetrytypes.TelemetryRequest switch v := data.(type) { case ReceivedMessages: - telemetryRequest = TelemetryRequest{ + telemetryRequest = telemetrytypes.TelemetryRequest{ Id: c.nextId, - TelemetryType: ReceivedMessagesMetric, + TelemetryType: telemetrytypes.ReceivedMessagesMetric, TelemetryData: c.ProcessReceivedMessages(v), } case *v2protocol.Envelope: - telemetryRequest = TelemetryRequest{ + telemetryRequest = telemetrytypes.TelemetryRequest{ Id: c.nextId, - TelemetryType: ReceivedEnvelopeMetric, + TelemetryType: telemetrytypes.ReceivedEnvelopeMetric, TelemetryData: c.ProcessReceivedEnvelope(v), } - case wakuv2.SentEnvelope: - telemetryRequest = TelemetryRequest{ + case telemetry.SentEnvelope: + telemetryRequest = telemetrytypes.TelemetryRequest{ Id: c.nextId, - TelemetryType: SentEnvelopeMetric, + TelemetryType: telemetrytypes.SentEnvelopeMetric, TelemetryData: c.ProcessSentEnvelope(v), } - case wakuv2.ErrorSendingEnvelope: - telemetryRequest = TelemetryRequest{ + case telemetry.ErrorSendingEnvelope: + telemetryRequest = telemetrytypes.TelemetryRequest{ Id: c.nextId, - TelemetryType: ErrorSendingEnvelopeMetric, + TelemetryType: telemetrytypes.ErrorSendingEnvelopeMetric, TelemetryData: c.ProcessErrorSendingEnvelope(v), } case PeerCount: - telemetryRequest = TelemetryRequest{ + telemetryRequest = telemetrytypes.TelemetryRequest{ Id: c.nextId, - TelemetryType: PeerCountMetric, + TelemetryType: telemetrytypes.PeerCountMetric, TelemetryData: c.ProcessPeerCount(v), } case PeerConnFailure: - telemetryRequest = TelemetryRequest{ + telemetryRequest = telemetrytypes.TelemetryRequest{ Id: c.nextId, - TelemetryType: PeerConnFailuresMetric, + TelemetryType: telemetrytypes.PeerConnFailureMetric, TelemetryData: c.ProcessPeerConnFailure(v), } + case telemetry.ProtocolStatsMap: + telemetryRequest = telemetrytypes.TelemetryRequest{ + Id: c.nextId, + TelemetryType: telemetrytypes.ProtocolStatsMetric, + TelemetryData: c.ProcessProtocolStats(v), + } default: c.logger.Error("Unknown telemetry data type") return @@ -254,7 +258,7 @@ func (c *Client) processAndPushTelemetry(ctx context.Context, data interface{}) } // This is assuming to not run concurrently as we are not locking the `telemetryRetryCache` -func (c *Client) pushTelemetryRequest(request []TelemetryRequest) error { +func (c *Client) pushTelemetryRequest(request []telemetrytypes.TelemetryRequest) error { if len(c.telemetryRetryCache) > MaxRetryCache { //Limit the size of the cache to not grow the slice indefinitely in case the Telemetry server is gone for longer time removeNum := len(c.telemetryRetryCache) - MaxRetryCache c.telemetryRetryCache = c.telemetryRetryCache[removeNum:] @@ -288,21 +292,21 @@ func (c *Client) pushTelemetryRequest(request []TelemetryRequest) error { } func (c *Client) ProcessReceivedMessages(receivedMessages ReceivedMessages) *json.RawMessage { - var postBody []map[string]interface{} + var postBody []telemetrytypes.ReceivedMessage for _, message := range receivedMessages.Messages { - postBody = append(postBody, map[string]interface{}{ - "chatId": receivedMessages.Filter.ChatID, - "messageHash": types.EncodeHex(receivedMessages.SSHMessage.Hash), - "messageId": message.ApplicationLayer.ID, - "sentAt": receivedMessages.SSHMessage.Timestamp, - "pubsubTopic": receivedMessages.Filter.PubsubTopic, - "topic": receivedMessages.Filter.ContentTopic.String(), - "messageType": message.ApplicationLayer.Type.String(), - "receiverKeyUID": c.keyUID, - "peerId": c.peerId, - "nodeName": c.nodeName, - "messageSize": len(receivedMessages.SSHMessage.Payload), - "statusVersion": c.version, + postBody = append(postBody, telemetrytypes.ReceivedMessage{ + ChatID: receivedMessages.Filter.ChatID, + MessageHash: types.EncodeHex(receivedMessages.SSHMessage.Hash), + MessageID: message.ApplicationLayer.ID.String(), + SentAt: int64(receivedMessages.SSHMessage.Timestamp), + PubsubTopic: receivedMessages.Filter.PubsubTopic, + Topic: receivedMessages.Filter.ContentTopic.String(), + MessageType: message.ApplicationLayer.Type.String(), + ReceiverKeyUID: c.keyUID, + PeerID: c.peerId, + NodeName: c.nodeName, + MessageSize: len(receivedMessages.SSHMessage.Payload), + StatusVersion: c.version, }) } body, _ := json.Marshal(postBody) @@ -311,79 +315,83 @@ func (c *Client) ProcessReceivedMessages(receivedMessages ReceivedMessages) *jso } func (c *Client) ProcessReceivedEnvelope(envelope *v2protocol.Envelope) *json.RawMessage { - postBody := map[string]interface{}{ - "messageHash": envelope.Hash().String(), - "sentAt": uint32(envelope.Message().GetTimestamp() / int64(time.Second)), - "pubsubTopic": envelope.PubsubTopic(), - "topic": envelope.Message().ContentTopic, - "receiverKeyUID": c.keyUID, - "peerId": c.peerId, - "nodeName": c.nodeName, - "statusVersion": c.version, + postBody := telemetrytypes.ReceivedEnvelope{ + MessageHash: envelope.Hash().String(), + SentAt: envelope.Message().GetTimestamp() / int64(time.Second), + PubsubTopic: envelope.PubsubTopic(), + Topic: envelope.Message().ContentTopic, + ReceiverKeyUID: c.keyUID, + PeerID: c.peerId, + NodeName: c.nodeName, + StatusVersion: c.version, } body, _ := json.Marshal(postBody) jsonRawMessage := json.RawMessage(body) return &jsonRawMessage } -func (c *Client) ProcessSentEnvelope(sentEnvelope wakuv2.SentEnvelope) *json.RawMessage { - postBody := map[string]interface{}{ - "messageHash": sentEnvelope.Envelope.Hash().String(), - "sentAt": uint32(sentEnvelope.Envelope.Message().GetTimestamp() / int64(time.Second)), - "pubsubTopic": sentEnvelope.Envelope.PubsubTopic(), - "topic": sentEnvelope.Envelope.Message().ContentTopic, - "senderKeyUID": c.keyUID, - "peerId": c.peerId, - "nodeName": c.nodeName, - "publishMethod": sentEnvelope.PublishMethod.String(), - "statusVersion": c.version, +func (c *Client) ProcessSentEnvelope(sentEnvelope telemetry.SentEnvelope) *json.RawMessage { + postBody := telemetrytypes.SentEnvelope{ + MessageHash: sentEnvelope.Envelope.Hash().String(), + SentAt: sentEnvelope.Envelope.Message().GetTimestamp() / int64(time.Second), + PubsubTopic: sentEnvelope.Envelope.PubsubTopic(), + Topic: sentEnvelope.Envelope.Message().ContentTopic, + SenderKeyUID: c.keyUID, + PeerID: c.peerId, + NodeName: c.nodeName, + PublishMethod: sentEnvelope.PublishMethod.String(), + StatusVersion: c.version, } body, _ := json.Marshal(postBody) jsonRawMessage := json.RawMessage(body) return &jsonRawMessage } -func (c *Client) ProcessErrorSendingEnvelope(errorSendingEnvelope wakuv2.ErrorSendingEnvelope) *json.RawMessage { - postBody := map[string]interface{}{ - "messageHash": errorSendingEnvelope.SentEnvelope.Envelope.Hash().String(), - "sentAt": uint32(errorSendingEnvelope.SentEnvelope.Envelope.Message().GetTimestamp() / int64(time.Second)), - "pubsubTopic": errorSendingEnvelope.SentEnvelope.Envelope.PubsubTopic(), - "topic": errorSendingEnvelope.SentEnvelope.Envelope.Message().ContentTopic, - "senderKeyUID": c.keyUID, - "peerId": c.peerId, - "nodeName": c.nodeName, - "publishMethod": errorSendingEnvelope.SentEnvelope.PublishMethod.String(), - "statusVersion": c.version, - "error": errorSendingEnvelope.Error.Error(), +func (c *Client) ProcessErrorSendingEnvelope(errorSendingEnvelope telemetry.ErrorSendingEnvelope) *json.RawMessage { + postBody := telemetrytypes.ErrorSendingEnvelope{ + SentEnvelope: telemetrytypes.SentEnvelope{ + MessageHash: errorSendingEnvelope.SentEnvelope.Envelope.Hash().String(), + SentAt: errorSendingEnvelope.SentEnvelope.Envelope.Message().GetTimestamp() / int64(time.Second), + PubsubTopic: errorSendingEnvelope.SentEnvelope.Envelope.PubsubTopic(), + Topic: errorSendingEnvelope.SentEnvelope.Envelope.Message().ContentTopic, + SenderKeyUID: c.keyUID, + PeerID: c.peerId, + NodeName: c.nodeName, + PublishMethod: errorSendingEnvelope.SentEnvelope.PublishMethod.String(), + StatusVersion: c.version, + }, + Error: errorSendingEnvelope.Error.Error(), } + body, _ := json.Marshal(postBody) jsonRawMessage := json.RawMessage(body) return &jsonRawMessage } func (c *Client) ProcessPeerCount(peerCount PeerCount) *json.RawMessage { - postBody := map[string]interface{}{ - "peerCount": peerCount.PeerCount, - "nodeName": c.nodeName, - "nodeKeyUID": c.keyUID, - "peerId": c.peerId, - "statusVersion": c.version, - "timestamp": time.Now().Unix(), + postBody := telemetrytypes.PeerCount{ + PeerCount: peerCount.PeerCount, + NodeName: c.nodeName, + NodeKeyUid: c.keyUID, + PeerID: c.peerId, + StatusVersion: c.version, + Timestamp: time.Now().Unix(), } + body, _ := json.Marshal(postBody) jsonRawMessage := json.RawMessage(body) return &jsonRawMessage } func (c *Client) ProcessPeerConnFailure(peerConnFailure PeerConnFailure) *json.RawMessage { - postBody := map[string]interface{}{ - "failedPeerId": peerConnFailure.FailedPeerId, - "failureCount": peerConnFailure.FailureCount, - "nodeName": c.nodeName, - "nodeKeyUID": c.keyUID, - "peerId": c.peerId, - "statusVersion": c.version, - "timestamp": time.Now().Unix(), + postBody := telemetrytypes.PeerConnFailure{ + FailedPeerId: peerConnFailure.FailedPeerId, + FailureCount: peerConnFailure.FailureCount, + NodeName: c.nodeName, + NodeKeyUid: c.keyUID, + PeerId: c.peerId, + StatusVersion: c.version, + Timestamp: time.Now().Unix(), } body, _ := json.Marshal(postBody) jsonRawMessage := json.RawMessage(body) @@ -397,15 +405,15 @@ func (c *Client) UpdateEnvelopeProcessingError(shhMessage *types.Message, proces if processingError != nil { errorString = processingError.Error() } - postBody := map[string]interface{}{ - "messageHash": types.EncodeHex(shhMessage.Hash), - "sentAt": shhMessage.Timestamp, - "pubsubTopic": shhMessage.PubsubTopic, - "topic": shhMessage.Topic, - "receiverKeyUID": c.keyUID, - "peerId": c.peerId, - "nodeName": c.nodeName, - "processingError": errorString, + postBody := telemetrytypes.ReceivedEnvelope{ + MessageHash: types.EncodeHex(shhMessage.Hash), + SentAt: int64(shhMessage.Timestamp), + PubsubTopic: shhMessage.PubsubTopic, + Topic: shhMessage.Topic.String(), + ReceiverKeyUID: c.keyUID, + PeerID: c.peerId, + NodeName: c.nodeName, + ProcessingError: errorString, } body, _ := json.Marshal(postBody) _, err := c.httpClient.Post(url, "application/json", bytes.NewBuffer(body)) @@ -413,3 +421,26 @@ func (c *Client) UpdateEnvelopeProcessingError(shhMessage *types.Message, proces c.logger.Error("Error sending envelope update to telemetry server", zap.Error(err)) } } + +func (c *Client) ProcessProtocolStats(stats telemetry.ProtocolStatsMap) *json.RawMessage { + getStatsPerProtocol := func(protocolID protocol.ID, stats telemetry.ProtocolStatsMap) telemetrytypes.Metric { + return telemetrytypes.Metric{ + RateIn: stats[protocolID].RateIn, + RateOut: stats[protocolID].RateOut, + TotalIn: stats[protocolID].TotalIn, + TotalOut: stats[protocolID].TotalOut, + } + } + + postBody := telemetrytypes.ProtocolStats{ + PeerID: c.peerId, + Relay: getStatsPerProtocol(relay.WakuRelayID_v200, stats), + Store: getStatsPerProtocol(legacy_store.StoreID_v20beta4, stats), + FilterPush: getStatsPerProtocol(filter.FilterPushID_v20beta1, stats), + FilterSubscribe: getStatsPerProtocol(filter.FilterSubscribeID_v20beta1, stats), + Lightpush: getStatsPerProtocol(lightpush.LightPushID_v20beta1, stats), + } + body, _ := json.Marshal(postBody) + jsonRawMessage := json.RawMessage(body) + return &jsonRawMessage +} diff --git a/telemetry/client_test.go b/telemetry/client_test.go index d6482bb8d7..e8bb75e4b1 100644 --- a/telemetry/client_test.go +++ b/telemetry/client_test.go @@ -15,23 +15,33 @@ import ( "go.uber.org/zap" "google.golang.org/protobuf/proto" + "github.com/libp2p/go-libp2p/core/metrics" + v2protocol "github.com/waku-org/go-waku/waku/v2/protocol" + "github.com/waku-org/go-waku/waku/v2/protocol/filter" + "github.com/waku-org/go-waku/waku/v2/protocol/legacy_store" + "github.com/waku-org/go-waku/waku/v2/protocol/lightpush" "github.com/waku-org/go-waku/waku/v2/protocol/pb" + "github.com/waku-org/go-waku/waku/v2/protocol/relay" "github.com/stretchr/testify/require" + telemetrytypes "github.com/status-im/telemetry/pkg/types" + "github.com/status-im/status-go/eth-node/types" "github.com/status-im/status-go/protocol/transport" "github.com/status-im/status-go/protocol/tt" v1protocol "github.com/status-im/status-go/protocol/v1" + telemetry "github.com/status-im/status-go/telemetry/common" "github.com/status-im/status-go/wakuv2" + "github.com/status-im/status-go/wakuv2/common" ) var ( testContentTopic = "/waku/1/0x12345679/rfc26" ) -func createMockServer(t *testing.T, wg *sync.WaitGroup, expectedType TelemetryType, expectedCondition func(received []TelemetryRequest) (shouldSucceed bool, shouldFail bool)) *httptest.Server { +func createMockServer(t *testing.T, wg *sync.WaitGroup, expectedType telemetrytypes.TelemetryType, expectedCondition func(received []telemetrytypes.TelemetryRequest) (shouldSucceed bool, shouldFail bool)) *httptest.Server { return httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { if r.Method != "POST" { t.Errorf("Expected 'POST' request, got '%s'", r.Method) @@ -41,7 +51,7 @@ func createMockServer(t *testing.T, wg *sync.WaitGroup, expectedType TelemetryTy } // Check the request body is as expected - var received []TelemetryRequest + var received []telemetrytypes.TelemetryRequest err := json.NewDecoder(r.Body).Decode(&received) if err != nil { t.Fatal(err) @@ -95,9 +105,9 @@ func createClient(t *testing.T, mockServerURL string) *Client { return NewClient(logger, mockServerURL, "testUID", "testNode", "1.0", WithSendPeriod(100*time.Millisecond), WithPeerID("16Uiu2HAkvWiyFsgRhuJEb9JfjYxEkoHLgnUQmr1N5mKWnYjxYRVm")) } -type expectedCondition func(received []TelemetryRequest) (shouldSucceed bool, shouldFail bool) +type expectedCondition func(received []telemetrytypes.TelemetryRequest) (shouldSucceed bool, shouldFail bool) -func withMockServer(t *testing.T, expectedType TelemetryType, expectedCondition expectedCondition, testFunc func(ctx context.Context, t *testing.T, client *Client, wg *sync.WaitGroup)) { +func withMockServer(t *testing.T, expectedType telemetrytypes.TelemetryType, expectedCondition expectedCondition, testFunc func(ctx context.Context, t *testing.T, client *Client, wg *sync.WaitGroup)) { var wg sync.WaitGroup wg.Add(1) // Expecting one request @@ -116,7 +126,7 @@ func withMockServer(t *testing.T, expectedType TelemetryType, expectedCondition } func TestClient_ProcessReceivedMessages(t *testing.T) { - withMockServer(t, ReceivedMessagesMetric, nil, func(ctx context.Context, t *testing.T, client *Client, wg *sync.WaitGroup) { + withMockServer(t, telemetrytypes.ReceivedMessagesMetric, nil, func(ctx context.Context, t *testing.T, client *Client, wg *sync.WaitGroup) { // Create a telemetry request to send data := ReceivedMessages{ Filter: transport.Filter{ @@ -145,7 +155,7 @@ func TestClient_ProcessReceivedMessages(t *testing.T) { } func TestClient_ProcessReceivedEnvelope(t *testing.T) { - withMockServer(t, ReceivedEnvelopeMetric, nil, func(ctx context.Context, t *testing.T, client *Client, wg *sync.WaitGroup) { + withMockServer(t, telemetrytypes.ReceivedEnvelopeMetric, nil, func(ctx context.Context, t *testing.T, client *Client, wg *sync.WaitGroup) { // Create a telemetry request to send envelope := v2protocol.NewEnvelope(&pb.WakuMessage{ Payload: []byte{1, 2, 3, 4, 5}, @@ -161,16 +171,16 @@ func TestClient_ProcessReceivedEnvelope(t *testing.T) { } func TestClient_ProcessSentEnvelope(t *testing.T) { - withMockServer(t, SentEnvelopeMetric, nil, func(ctx context.Context, t *testing.T, client *Client, wg *sync.WaitGroup) { + withMockServer(t, telemetrytypes.SentEnvelopeMetric, nil, func(ctx context.Context, t *testing.T, client *Client, wg *sync.WaitGroup) { // Create a telemetry request to send - sentEnvelope := wakuv2.SentEnvelope{ + sentEnvelope := telemetry.SentEnvelope{ Envelope: v2protocol.NewEnvelope(&pb.WakuMessage{ Payload: []byte{1, 2, 3, 4, 5}, ContentTopic: testContentTopic, Version: proto.Uint32(0), Timestamp: proto.Int64(time.Now().Unix()), }, 0, ""), - PublishMethod: wakuv2.LightPush, + PublishMethod: common.LightPush, } // Send the telemetry request @@ -184,7 +194,7 @@ var ( ) func TestTelemetryUponPublishError(t *testing.T) { - withMockServer(t, ErrorSendingEnvelopeMetric, nil, func(ctx context.Context, t *testing.T, client *Client, wg *sync.WaitGroup) { + withMockServer(t, telemetrytypes.ErrorSendingEnvelopeMetric, nil, func(ctx context.Context, t *testing.T, client *Client, wg *sync.WaitGroup) { enrTreeAddress := testENRBootstrap envEnrTreeAddress := os.Getenv("ENRTREE_ADDRESS") if envEnrTreeAddress != "" { @@ -241,7 +251,7 @@ func TestRetryCache(t *testing.T) { } // Check the request body is as expected - var received []TelemetryRequest + var received []telemetrytypes.TelemetryRequest err := json.NewDecoder(r.Body).Decode(&received) if err != nil { t.Fatal(err) @@ -358,14 +368,14 @@ var testStoreENRBootstrap = "enrtree://AI4W5N5IFEUIHF5LESUAOSMV6TKWF2MB6GU2YK7PU func TestPeerCount(t *testing.T) { t.Skip("flaky test") - expectedCondition := func(received []TelemetryRequest) (shouldSucceed bool, shouldFail bool) { - found := slices.ContainsFunc(received, func(req TelemetryRequest) bool { + expectedCondition := func(received []telemetrytypes.TelemetryRequest) (shouldSucceed bool, shouldFail bool) { + found := slices.ContainsFunc(received, func(req telemetrytypes.TelemetryRequest) bool { t.Log(req) - return req.TelemetryType == PeerCountMetric + return req.TelemetryType == telemetrytypes.PeerCountMetric }) return found, false } - withMockServer(t, PeerCountMetric, expectedCondition, func(ctx context.Context, t *testing.T, client *Client, wg *sync.WaitGroup) { + withMockServer(t, telemetrytypes.PeerCountMetric, expectedCondition, func(ctx context.Context, t *testing.T, client *Client, wg *sync.WaitGroup) { config := &wakuv2.Config{} setDefaultConfig(config, false) config.DiscV5BootstrapNodes = []string{testStoreENRBootstrap} @@ -392,7 +402,7 @@ func TestPeerCount(t *testing.T) { } func TestPeerId(t *testing.T) { - expectedCondition := func(received []TelemetryRequest) (shouldSucceed bool, shouldFail bool) { + expectedCondition := func(received []telemetrytypes.TelemetryRequest) (shouldSucceed bool, shouldFail bool) { var data map[string]interface{} err := json.Unmarshal(*received[0].TelemetryData, &data) @@ -404,7 +414,7 @@ func TestPeerId(t *testing.T) { require.True(t, ok) return ok, false } - withMockServer(t, ReceivedEnvelopeMetric, expectedCondition, func(ctx context.Context, t *testing.T, client *Client, wg *sync.WaitGroup) { + withMockServer(t, telemetrytypes.ReceivedEnvelopeMetric, expectedCondition, func(ctx context.Context, t *testing.T, client *Client, wg *sync.WaitGroup) { client.Start(ctx) client.PushReceivedEnvelope(ctx, v2protocol.NewEnvelope(&pb.WakuMessage{ @@ -417,3 +427,39 @@ func TestPeerId(t *testing.T) { }) } + +func TestProtocolStats(t *testing.T) { + expectedCondition := func(received []telemetrytypes.TelemetryRequest) (shouldSucceed bool, shouldFail bool) { + var data telemetrytypes.ProtocolStats + + err := json.Unmarshal(*received[0].TelemetryData, &data) + if err != nil { + return false, true + } + + ok := data.FilterPush.RateIn == 30 && data.Lightpush.TotalOut == 20 + require.True(t, ok) + return ok, false + } + + withMockServer(t, telemetrytypes.ReceivedEnvelopeMetric, expectedCondition, func(ctx context.Context, t *testing.T, client *Client, wg *sync.WaitGroup) { + client.Start(ctx) + + s := metrics.Stats{ + TotalIn: 10, + TotalOut: 20, + RateIn: 30, + RateOut: 40, + } + + m := make(telemetry.ProtocolStatsMap) + m[relay.WakuRelayID_v200] = s + m[filter.FilterPushID_v20beta1] = s + m[filter.FilterSubscribeID_v20beta1] = s + m[legacy_store.StoreID_v20beta4] = s + m[lightpush.LightPushID_v20beta1] = s + + client.PushProtocolStats(ctx, m) + }) + +} diff --git a/telemetry/common/common.go b/telemetry/common/common.go new file mode 100644 index 0000000000..ed29e5a589 --- /dev/null +++ b/telemetry/common/common.go @@ -0,0 +1,33 @@ +package common + +import ( + "context" + + "github.com/status-im/status-go/wakuv2/common" + wakuv2protocol "github.com/waku-org/go-waku/waku/v2/protocol" + + "github.com/libp2p/go-libp2p/core/protocol" + + "github.com/libp2p/go-libp2p/core/metrics" +) + +type ProtocolStatsMap map[protocol.ID]metrics.Stats + +type SentEnvelope struct { + Envelope *wakuv2protocol.Envelope + PublishMethod common.PublishMethod +} + +type ErrorSendingEnvelope struct { + Error error + SentEnvelope SentEnvelope +} + +type ITelemetryClient interface { + PushReceivedEnvelope(ctx context.Context, receivedEnvelope *wakuv2protocol.Envelope) + PushSentEnvelope(ctx context.Context, sentEnvelope SentEnvelope) + PushErrorSendingEnvelope(ctx context.Context, errorSendingEnvelope ErrorSendingEnvelope) + PushPeerCount(ctx context.Context, peerCount int) + PushPeerConnFailures(ctx context.Context, peerConnFailures map[string]int) + PushProtocolStats(ctx context.Context, stats ProtocolStatsMap) +} diff --git a/vendor/github.com/rs/cors/.travis.yml b/vendor/github.com/rs/cors/.travis.yml deleted file mode 100644 index 9a68b56762..0000000000 --- a/vendor/github.com/rs/cors/.travis.yml +++ /dev/null @@ -1,9 +0,0 @@ -language: go -go: -- "1.10" -- "1.11" -- "1.12" -- tip -matrix: - allow_failures: - - go: tip diff --git a/vendor/github.com/rs/cors/README.md b/vendor/github.com/rs/cors/README.md index ecc83b2951..c7fbea0034 100644 --- a/vendor/github.com/rs/cors/README.md +++ b/vendor/github.com/rs/cors/README.md @@ -1,4 +1,4 @@ -# Go CORS handler [![godoc](http://img.shields.io/badge/godoc-reference-blue.svg?style=flat)](https://godoc.org/github.com/rs/cors) [![license](http://img.shields.io/badge/license-MIT-red.svg?style=flat)](https://raw.githubusercontent.com/rs/cors/master/LICENSE) [![build](https://img.shields.io/travis/rs/cors.svg?style=flat)](https://travis-ci.org/rs/cors) [![Coverage](http://gocover.io/_badge/github.com/rs/cors)](http://gocover.io/github.com/rs/cors) +# Go CORS handler [![godoc](http://img.shields.io/badge/godoc-reference-blue.svg?style=flat)](https://godoc.org/github.com/rs/cors) [![license](http://img.shields.io/badge/license-MIT-red.svg?style=flat)](https://raw.githubusercontent.com/rs/cors/master/LICENSE) [![Go Coverage](https://github.com/rs/cors/wiki/coverage.svg)](https://raw.githack.com/wiki/rs/cors/coverage.html) CORS is a `net/http` handler implementing [Cross Origin Resource Sharing W3 specification](http://www.w3.org/TR/cors/) in Golang. @@ -88,27 +88,37 @@ handler = c.Handler(handler) * **AllowedOrigins** `[]string`: A list of origins a cross-domain request can be executed from. If the special `*` value is present in the list, all origins will be allowed. An origin may contain a wildcard (`*`) to replace 0 or more characters (i.e.: `http://*.domain.com`). Usage of wildcards implies a small performance penality. Only one wildcard can be used per origin. The default value is `*`. * **AllowOriginFunc** `func (origin string) bool`: A custom function to validate the origin. It takes the origin as an argument and returns true if allowed, or false otherwise. If this option is set, the content of `AllowedOrigins` is ignored. -* **AllowOriginRequestFunc** `func (r *http.Request origin string) bool`: A custom function to validate the origin. It takes the HTTP Request object and the origin as argument and returns true if allowed or false otherwise. If this option is set, the content of `AllowedOrigins` and `AllowOriginFunc` is ignored +* **AllowOriginRequestFunc** `func (r *http.Request, origin string) bool`: A custom function to validate the origin. It takes the HTTP Request object and the origin as argument and returns true if allowed or false otherwise. If this option is set, the contents of `AllowedOrigins` and `AllowOriginFunc` are ignored. +Deprecated: use `AllowOriginVaryRequestFunc` instead. +* **AllowOriginVaryRequestFunc** `func(r *http.Request, origin string) (bool, []string)`: A custom function to validate the origin. It takes the HTTP Request object and the origin as argument and returns true if allowed or false otherwise with a list of headers used to take that decision if any so they can be added to the Vary header. If this option is set, the contents of `AllowedOrigins`, `AllowOriginFunc` and `AllowOriginRequestFunc` are ignored. * **AllowedMethods** `[]string`: A list of methods the client is allowed to use with cross-domain requests. Default value is simple methods (`GET` and `POST`). * **AllowedHeaders** `[]string`: A list of non simple headers the client is allowed to use with cross-domain requests. -* **ExposedHeaders** `[]string`: Indicates which headers are safe to expose to the API of a CORS API specification +* **ExposedHeaders** `[]string`: Indicates which headers are safe to expose to the API of a CORS API specification. * **AllowCredentials** `bool`: Indicates whether the request can include user credentials like cookies, HTTP authentication or client side SSL certificates. The default is `false`. +* **AllowPrivateNetwork** `bool`: Indicates whether to accept cross-origin requests over a private network. * **MaxAge** `int`: Indicates how long (in seconds) the results of a preflight request can be cached. The default is `0` which stands for no max age. * **OptionsPassthrough** `bool`: Instructs preflight to let other potential next handlers to process the `OPTIONS` method. Turn this on if your application handles `OPTIONS`. +* **OptionsSuccessStatus** `int`: Provides a status code to use for successful OPTIONS requests. Default value is `http.StatusNoContent` (`204`). * **Debug** `bool`: Debugging flag adds additional output to debug server side CORS issues. See [API documentation](http://godoc.org/github.com/rs/cors) for more info. ## Benchmarks - BenchmarkWithout 20000000 64.6 ns/op 8 B/op 1 allocs/op - BenchmarkDefault 3000000 469 ns/op 114 B/op 2 allocs/op - BenchmarkAllowedOrigin 3000000 608 ns/op 114 B/op 2 allocs/op - BenchmarkPreflight 20000000 73.2 ns/op 0 B/op 0 allocs/op - BenchmarkPreflightHeader 20000000 73.6 ns/op 0 B/op 0 allocs/op - BenchmarkParseHeaderList 2000000 847 ns/op 184 B/op 6 allocs/op - BenchmarkParse…Single 5000000 290 ns/op 32 B/op 3 allocs/op - BenchmarkParse…Normalized 2000000 776 ns/op 160 B/op 6 allocs/op +``` +goos: darwin +goarch: arm64 +pkg: github.com/rs/cors +BenchmarkWithout-10 135325480 8.124 ns/op 0 B/op 0 allocs/op +BenchmarkDefault-10 24082140 51.40 ns/op 0 B/op 0 allocs/op +BenchmarkAllowedOrigin-10 16424518 88.25 ns/op 0 B/op 0 allocs/op +BenchmarkPreflight-10 8010259 147.3 ns/op 0 B/op 0 allocs/op +BenchmarkPreflightHeader-10 6850962 175.0 ns/op 0 B/op 0 allocs/op +BenchmarkWildcard/match-10 253275342 4.714 ns/op 0 B/op 0 allocs/op +BenchmarkWildcard/too_short-10 1000000000 0.6235 ns/op 0 B/op 0 allocs/op +PASS +ok github.com/rs/cors 99.131s +``` ## Licenses diff --git a/vendor/github.com/rs/cors/cors.go b/vendor/github.com/rs/cors/cors.go index 2730934630..da80d343b3 100644 --- a/vendor/github.com/rs/cors/cors.go +++ b/vendor/github.com/rs/cors/cors.go @@ -4,15 +4,15 @@ as defined by http://www.w3.org/TR/cors/ You can configure it by passing an option struct to cors.New: - c := cors.New(cors.Options{ - AllowedOrigins: []string{"foo.com"}, - AllowedMethods: []string{http.MethodGet, http.MethodPost, http.MethodDelete}, - AllowCredentials: true, - }) + c := cors.New(cors.Options{ + AllowedOrigins: []string{"foo.com"}, + AllowedMethods: []string{http.MethodGet, http.MethodPost, http.MethodDelete}, + AllowCredentials: true, + }) Then insert the handler in the chain: - handler = c.Handler(handler) + handler = c.Handler(handler) See Options documentation for more options. @@ -26,8 +26,14 @@ import ( "os" "strconv" "strings" + + "github.com/rs/cors/internal" ) +var headerVaryOrigin = []string{"Origin"} +var headerOriginAll = []string{"*"} +var headerTrue = []string{"true"} + // Options is a configuration container to setup the CORS middleware. type Options struct { // AllowedOrigins is a list of origins a cross-domain request can be executed from. @@ -37,36 +43,58 @@ type Options struct { // Only one wildcard can be used per origin. // Default value is ["*"] AllowedOrigins []string - // AllowOriginFunc is a custom function to validate the origin. It take the origin - // as argument and returns true if allowed or false otherwise. If this option is - // set, the content of AllowedOrigins is ignored. + // AllowOriginFunc is a custom function to validate the origin. It take the + // origin as argument and returns true if allowed or false otherwise. If + // this option is set, the content of `AllowedOrigins` is ignored. AllowOriginFunc func(origin string) bool - // AllowOriginFunc is a custom function to validate the origin. It takes the HTTP Request object and the origin as - // argument and returns true if allowed or false otherwise. If this option is set, the content of `AllowedOrigins` - // and `AllowOriginFunc` is ignored. + // AllowOriginRequestFunc is a custom function to validate the origin. It + // takes the HTTP Request object and the origin as argument and returns true + // if allowed or false otherwise. If headers are used take the decision, + // consider using AllowOriginVaryRequestFunc instead. If this option is set, + // the contents of `AllowedOrigins`, `AllowOriginFunc` are ignored. + // + // Deprecated: use `AllowOriginVaryRequestFunc` instead. AllowOriginRequestFunc func(r *http.Request, origin string) bool + // AllowOriginVaryRequestFunc is a custom function to validate the origin. + // It takes the HTTP Request object and the origin as argument and returns + // true if allowed or false otherwise with a list of headers used to take + // that decision if any so they can be added to the Vary header. If this + // option is set, the contents of `AllowedOrigins`, `AllowOriginFunc` and + // `AllowOriginRequestFunc` are ignored. + AllowOriginVaryRequestFunc func(r *http.Request, origin string) (bool, []string) // AllowedMethods is a list of methods the client is allowed to use with // cross-domain requests. Default value is simple methods (HEAD, GET and POST). AllowedMethods []string // AllowedHeaders is list of non simple headers the client is allowed to use with // cross-domain requests. // If the special "*" value is present in the list, all headers will be allowed. - // Default value is [] but "Origin" is always appended to the list. + // Default value is []. AllowedHeaders []string // ExposedHeaders indicates which headers are safe to expose to the API of a CORS // API specification ExposedHeaders []string // MaxAge indicates how long (in seconds) the results of a preflight request - // can be cached + // can be cached. Default value is 0, which stands for no + // Access-Control-Max-Age header to be sent back, resulting in browsers + // using their default value (5s by spec). If you need to force a 0 max-age, + // set `MaxAge` to a negative value (ie: -1). MaxAge int // AllowCredentials indicates whether the request can include user credentials like // cookies, HTTP authentication or client side SSL certificates. AllowCredentials bool + // AllowPrivateNetwork indicates whether to accept cross-origin requests over a + // private network. + AllowPrivateNetwork bool // OptionsPassthrough instructs preflight to let other potential next handlers to // process the OPTIONS method. Turn this on if your application handles OPTIONS. OptionsPassthrough bool + // Provides a status code to use for successful OPTIONS requests. + // Default value is http.StatusNoContent (204). + OptionsSuccessStatus int // Debugging flag adds additional output to debug server side CORS issues Debug bool + // Adds a custom logger, implies Debug is true + Logger Logger } // Logger generic interface for logger @@ -83,53 +111,66 @@ type Cors struct { // List of allowed origins containing wildcards allowedWOrigins []wildcard // Optional origin validator function - allowOriginFunc func(origin string) bool - // Optional origin validator (with request) function - allowOriginRequestFunc func(r *http.Request, origin string) bool + allowOriginFunc func(r *http.Request, origin string) (bool, []string) // Normalized list of allowed headers - allowedHeaders []string + // Note: the Fetch standard guarantees that CORS-unsafe request-header names + // (i.e. the values listed in the Access-Control-Request-Headers header) + // are unique and sorted; + // see https://fetch.spec.whatwg.org/#cors-unsafe-request-header-names. + allowedHeaders internal.SortedSet // Normalized list of allowed methods allowedMethods []string - // Normalized list of exposed headers + // Pre-computed normalized list of exposed headers exposedHeaders []string - maxAge int + // Pre-computed maxAge header value + maxAge []string // Set to true when allowed origins contains a "*" allowedOriginsAll bool // Set to true when allowed headers contains a "*" allowedHeadersAll bool - allowCredentials bool - optionPassthrough bool + // Status code to use for successful OPTIONS requests + optionsSuccessStatus int + allowCredentials bool + allowPrivateNetwork bool + optionPassthrough bool + preflightVary []string } // New creates a new Cors handler with the provided options. func New(options Options) *Cors { c := &Cors{ - exposedHeaders: convert(options.ExposedHeaders, http.CanonicalHeaderKey), - allowOriginFunc: options.AllowOriginFunc, - allowOriginRequestFunc: options.AllowOriginRequestFunc, - allowCredentials: options.AllowCredentials, - maxAge: options.MaxAge, - optionPassthrough: options.OptionsPassthrough, + allowCredentials: options.AllowCredentials, + allowPrivateNetwork: options.AllowPrivateNetwork, + optionPassthrough: options.OptionsPassthrough, + Log: options.Logger, } if options.Debug && c.Log == nil { c.Log = log.New(os.Stdout, "[cors] ", log.LstdFlags) } - // Normalize options - // Note: for origins and methods matching, the spec requires a case-sensitive matching. - // As it may error prone, we chose to ignore the spec here. - - // Allowed Origins - if len(options.AllowedOrigins) == 0 { - if options.AllowOriginFunc == nil && options.AllowOriginRequestFunc == nil { + // Allowed origins + switch { + case options.AllowOriginVaryRequestFunc != nil: + c.allowOriginFunc = options.AllowOriginVaryRequestFunc + case options.AllowOriginRequestFunc != nil: + c.allowOriginFunc = func(r *http.Request, origin string) (bool, []string) { + return options.AllowOriginRequestFunc(r, origin), nil + } + case options.AllowOriginFunc != nil: + c.allowOriginFunc = func(r *http.Request, origin string) (bool, []string) { + return options.AllowOriginFunc(origin), nil + } + case len(options.AllowedOrigins) == 0: + if c.allowOriginFunc == nil { // Default is all origins c.allowedOriginsAll = true } - } else { + default: c.allowedOrigins = []string{} c.allowedWOrigins = []wildcard{} for _, origin := range options.AllowedOrigins { - // Normalize + // Note: for origins matching, the spec requires a case-sensitive matching. + // As it may error prone, we chose to ignore the spec here. origin = strings.ToLower(origin) if origin == "*" { // If "*" is present in the list, turn the whole list into a match all @@ -148,16 +189,19 @@ func New(options Options) *Cors { } // Allowed Headers + // Note: the Fetch standard guarantees that CORS-unsafe request-header names + // (i.e. the values listed in the Access-Control-Request-Headers header) + // are lowercase; see https://fetch.spec.whatwg.org/#cors-unsafe-request-header-names. if len(options.AllowedHeaders) == 0 { // Use sensible defaults - c.allowedHeaders = []string{"Origin", "Accept", "Content-Type", "X-Requested-With"} + c.allowedHeaders = internal.NewSortedSet("accept", "content-type", "x-requested-with") } else { - // Origin is always appended as some browsers will always request for this header at preflight - c.allowedHeaders = convert(append(options.AllowedHeaders, "Origin"), http.CanonicalHeaderKey) + normalized := convert(options.AllowedHeaders, strings.ToLower) + c.allowedHeaders = internal.NewSortedSet(normalized...) for _, h := range options.AllowedHeaders { if h == "*" { c.allowedHeadersAll = true - c.allowedHeaders = nil + c.allowedHeaders = internal.SortedSet{} break } } @@ -168,7 +212,33 @@ func New(options Options) *Cors { // Default is spec's "simple" methods c.allowedMethods = []string{http.MethodGet, http.MethodPost, http.MethodHead} } else { - c.allowedMethods = convert(options.AllowedMethods, strings.ToUpper) + c.allowedMethods = options.AllowedMethods + } + + // Options Success Status Code + if options.OptionsSuccessStatus == 0 { + c.optionsSuccessStatus = http.StatusNoContent + } else { + c.optionsSuccessStatus = options.OptionsSuccessStatus + } + + // Pre-compute exposed headers header value + if len(options.ExposedHeaders) > 0 { + c.exposedHeaders = []string{strings.Join(convert(options.ExposedHeaders, http.CanonicalHeaderKey), ", ")} + } + + // Pre-compute prefight Vary header to save allocations + if c.allowPrivateNetwork { + c.preflightVary = []string{"Origin, Access-Control-Request-Method, Access-Control-Request-Headers, Access-Control-Request-Private-Network"} + } else { + c.preflightVary = []string{"Origin, Access-Control-Request-Method, Access-Control-Request-Headers"} + } + + // Precompute max-age + if options.MaxAge > 0 { + c.maxAge = []string{strconv.Itoa(options.MaxAge)} + } else if options.MaxAge < 0 { + c.maxAge = []string{"0"} } return c @@ -211,7 +281,7 @@ func (c *Cors) Handler(h http.Handler) http.Handler { if c.optionPassthrough { h.ServeHTTP(w, r) } else { - w.WriteHeader(http.StatusOK) + w.WriteHeader(c.optionsSuccessStatus) } } else { c.logf("Handler: Actual request") @@ -226,6 +296,8 @@ func (c *Cors) HandlerFunc(w http.ResponseWriter, r *http.Request) { if r.Method == http.MethodOptions && r.Header.Get("Access-Control-Request-Method") != "" { c.logf("HandlerFunc: Preflight request") c.handlePreflight(w, r) + + w.WriteHeader(c.optionsSuccessStatus) } else { c.logf("HandlerFunc: Actual request") c.handleActualRequest(w, r) @@ -244,7 +316,7 @@ func (c *Cors) ServeHTTP(w http.ResponseWriter, r *http.Request, next http.Handl if c.optionPassthrough { next(w, r) } else { - w.WriteHeader(http.StatusOK) + w.WriteHeader(c.optionsSuccessStatus) } } else { c.logf("ServeHTTP: Actual request") @@ -265,15 +337,21 @@ func (c *Cors) handlePreflight(w http.ResponseWriter, r *http.Request) { // Always set Vary headers // see https://github.com/rs/cors/issues/10, // https://github.com/rs/cors/commit/dbdca4d95feaa7511a46e6f1efb3b3aa505bc43f#commitcomment-12352001 - headers.Add("Vary", "Origin") - headers.Add("Vary", "Access-Control-Request-Method") - headers.Add("Vary", "Access-Control-Request-Headers") + if vary, found := headers["Vary"]; found { + headers["Vary"] = append(vary, c.preflightVary[0]) + } else { + headers["Vary"] = c.preflightVary + } + allowed, additionalVaryHeaders := c.isOriginAllowed(r, origin) + if len(additionalVaryHeaders) > 0 { + headers.Add("Vary", strings.Join(convert(additionalVaryHeaders, http.CanonicalHeaderKey), ", ")) + } if origin == "" { c.logf(" Preflight aborted: empty origin") return } - if !c.isOriginAllowed(r, origin) { + if !allowed { c.logf(" Preflight aborted: origin '%s' not allowed", origin) return } @@ -283,32 +361,39 @@ func (c *Cors) handlePreflight(w http.ResponseWriter, r *http.Request) { c.logf(" Preflight aborted: method '%s' not allowed", reqMethod) return } - reqHeaders := parseHeaderList(r.Header.Get("Access-Control-Request-Headers")) - if !c.areHeadersAllowed(reqHeaders) { - c.logf(" Preflight aborted: headers '%v' not allowed", reqHeaders) + // Note: the Fetch standard guarantees that at most one + // Access-Control-Request-Headers header is present in the preflight request; + // see step 5.2 in https://fetch.spec.whatwg.org/#cors-preflight-fetch-0. + reqHeaders, found := first(r.Header, "Access-Control-Request-Headers") + if found && !c.allowedHeadersAll && !c.allowedHeaders.Subsumes(reqHeaders[0]) { + c.logf(" Preflight aborted: headers '%v' not allowed", reqHeaders[0]) return } if c.allowedOriginsAll { - headers.Set("Access-Control-Allow-Origin", "*") + headers["Access-Control-Allow-Origin"] = headerOriginAll } else { - headers.Set("Access-Control-Allow-Origin", origin) + headers["Access-Control-Allow-Origin"] = r.Header["Origin"] } // Spec says: Since the list of methods can be unbounded, simply returning the method indicated // by Access-Control-Request-Method (if supported) can be enough - headers.Set("Access-Control-Allow-Methods", strings.ToUpper(reqMethod)) - if len(reqHeaders) > 0 { - + headers["Access-Control-Allow-Methods"] = r.Header["Access-Control-Request-Method"] + if found && len(reqHeaders[0]) > 0 { // Spec says: Since the list of headers can be unbounded, simply returning supported headers // from Access-Control-Request-Headers can be enough - headers.Set("Access-Control-Allow-Headers", strings.Join(reqHeaders, ", ")) + headers["Access-Control-Allow-Headers"] = reqHeaders } if c.allowCredentials { - headers.Set("Access-Control-Allow-Credentials", "true") + headers["Access-Control-Allow-Credentials"] = headerTrue } - if c.maxAge > 0 { - headers.Set("Access-Control-Max-Age", strconv.Itoa(c.maxAge)) + if c.allowPrivateNetwork && r.Header.Get("Access-Control-Request-Private-Network") == "true" { + headers["Access-Control-Allow-Private-Network"] = headerTrue + } + if len(c.maxAge) > 0 { + headers["Access-Control-Max-Age"] = c.maxAge + } + if c.Log != nil { + c.logf(" Preflight response headers: %v", headers) } - c.logf(" Preflight response headers: %v", headers) } // handleActualRequest handles simple cross-origin requests, actual request or redirects @@ -316,13 +401,22 @@ func (c *Cors) handleActualRequest(w http.ResponseWriter, r *http.Request) { headers := w.Header() origin := r.Header.Get("Origin") + allowed, additionalVaryHeaders := c.isOriginAllowed(r, origin) + // Always set Vary, see https://github.com/rs/cors/issues/10 - headers.Add("Vary", "Origin") + if vary := headers["Vary"]; vary == nil { + headers["Vary"] = headerVaryOrigin + } else { + headers["Vary"] = append(vary, headerVaryOrigin[0]) + } + if len(additionalVaryHeaders) > 0 { + headers.Add("Vary", strings.Join(convert(additionalVaryHeaders, http.CanonicalHeaderKey), ", ")) + } if origin == "" { c.logf(" Actual request no headers added: missing origin") return } - if !c.isOriginAllowed(r, origin) { + if !allowed { c.logf(" Actual request no headers added: origin '%s' not allowed", origin) return } @@ -333,21 +427,22 @@ func (c *Cors) handleActualRequest(w http.ResponseWriter, r *http.Request) { // We think it's a nice feature to be able to have control on those methods though. if !c.isMethodAllowed(r.Method) { c.logf(" Actual request no headers added: method '%s' not allowed", r.Method) - return } if c.allowedOriginsAll { - headers.Set("Access-Control-Allow-Origin", "*") + headers["Access-Control-Allow-Origin"] = headerOriginAll } else { - headers.Set("Access-Control-Allow-Origin", origin) + headers["Access-Control-Allow-Origin"] = r.Header["Origin"] } if len(c.exposedHeaders) > 0 { - headers.Set("Access-Control-Expose-Headers", strings.Join(c.exposedHeaders, ", ")) + headers["Access-Control-Expose-Headers"] = c.exposedHeaders } if c.allowCredentials { - headers.Set("Access-Control-Allow-Credentials", "true") + headers["Access-Control-Allow-Credentials"] = headerTrue + } + if c.Log != nil { + c.logf(" Actual response added headers: %v", headers) } - c.logf(" Actual response added headers: %v", headers) } // convenience method. checks if a logger is set. @@ -357,40 +452,43 @@ func (c *Cors) logf(format string, a ...interface{}) { } } +// check the Origin of a request. No origin at all is also allowed. +func (c *Cors) OriginAllowed(r *http.Request) bool { + origin := r.Header.Get("Origin") + allowed, _ := c.isOriginAllowed(r, origin) + return allowed +} + // isOriginAllowed checks if a given origin is allowed to perform cross-domain requests // on the endpoint -func (c *Cors) isOriginAllowed(r *http.Request, origin string) bool { - if c.allowOriginRequestFunc != nil { - return c.allowOriginRequestFunc(r, origin) - } +func (c *Cors) isOriginAllowed(r *http.Request, origin string) (allowed bool, varyHeaders []string) { if c.allowOriginFunc != nil { - return c.allowOriginFunc(origin) + return c.allowOriginFunc(r, origin) } if c.allowedOriginsAll { - return true + return true, nil } origin = strings.ToLower(origin) for _, o := range c.allowedOrigins { if o == origin { - return true + return true, nil } } for _, w := range c.allowedWOrigins { if w.match(origin) { - return true + return true, nil } } - return false + return false, nil } // isMethodAllowed checks if a given method can be used as part of a cross-domain request -// on the endpoing +// on the endpoint func (c *Cors) isMethodAllowed(method string) bool { if len(c.allowedMethods) == 0 { // If no method allowed, always return false, even for preflight request return false } - method = strings.ToUpper(method) if method == http.MethodOptions { // Always allow preflight requests return true @@ -402,24 +500,3 @@ func (c *Cors) isMethodAllowed(method string) bool { } return false } - -// areHeadersAllowed checks if a given list of headers are allowed to used within -// a cross-domain request. -func (c *Cors) areHeadersAllowed(requestedHeaders []string) bool { - if c.allowedHeadersAll || len(requestedHeaders) == 0 { - return true - } - for _, header := range requestedHeaders { - header = http.CanonicalHeaderKey(header) - found := false - for _, h := range c.allowedHeaders { - if h == header { - found = true - } - } - if !found { - return false - } - } - return true -} diff --git a/vendor/github.com/rs/cors/internal/sortedset.go b/vendor/github.com/rs/cors/internal/sortedset.go new file mode 100644 index 0000000000..513da20f7d --- /dev/null +++ b/vendor/github.com/rs/cors/internal/sortedset.go @@ -0,0 +1,113 @@ +// adapted from github.com/jub0bs/cors +package internal + +import ( + "sort" + "strings" +) + +// A SortedSet represents a mathematical set of strings sorted in +// lexicographical order. +// Each element has a unique position ranging from 0 (inclusive) +// to the set's cardinality (exclusive). +// The zero value represents an empty set. +type SortedSet struct { + m map[string]int + maxLen int +} + +// NewSortedSet returns a SortedSet that contains all of elems, +// but no other elements. +func NewSortedSet(elems ...string) SortedSet { + sort.Strings(elems) + m := make(map[string]int) + var maxLen int + i := 0 + for _, s := range elems { + if _, exists := m[s]; exists { + continue + } + m[s] = i + i++ + maxLen = max(maxLen, len(s)) + } + return SortedSet{ + m: m, + maxLen: maxLen, + } +} + +// Size returns the cardinality of set. +func (set SortedSet) Size() int { + return len(set.m) +} + +// String sorts joins the elements of set (in lexicographical order) +// with a comma and returns the resulting string. +func (set SortedSet) String() string { + elems := make([]string, len(set.m)) + for elem, i := range set.m { + elems[i] = elem // safe indexing, by construction of SortedSet + } + return strings.Join(elems, ",") +} + +// Subsumes reports whether csv is a sequence of comma-separated names that are +// - all elements of set, +// - sorted in lexicographically order, +// - unique. +func (set SortedSet) Subsumes(csv string) bool { + if csv == "" { + return true + } + posOfLastNameSeen := -1 + chunkSize := set.maxLen + 1 // (to accommodate for at least one comma) + for { + // As a defense against maliciously long names in csv, + // we only process at most chunkSize bytes per iteration. + end := min(len(csv), chunkSize) + comma := strings.IndexByte(csv[:end], ',') + var name string + if comma == -1 { + name = csv + } else { + name = csv[:comma] + } + pos, found := set.m[name] + if !found { + return false + } + // The names in csv are expected to be sorted in lexicographical order + // and appear at most once in csv. + // Therefore, the positions (in set) of the names that + // appear in csv should form a strictly increasing sequence. + // If that's not actually the case, bail out. + if pos <= posOfLastNameSeen { + return false + } + posOfLastNameSeen = pos + if comma < 0 { // We've now processed all the names in csv. + break + } + csv = csv[comma+1:] + } + return true +} + +// TODO: when updating go directive to 1.21 or later, +// use min builtin instead. +func min(a, b int) int { + if a < b { + return a + } + return b +} + +// TODO: when updating go directive to 1.21 or later, +// use max builtin instead. +func max(a, b int) int { + if a > b { + return a + } + return b +} diff --git a/vendor/github.com/rs/cors/utils.go b/vendor/github.com/rs/cors/utils.go index db83ac3ea9..7019f45cd9 100644 --- a/vendor/github.com/rs/cors/utils.go +++ b/vendor/github.com/rs/cors/utils.go @@ -1,10 +1,9 @@ package cors -import "strings" - -const toLower = 'a' - 'A' - -type converter func(string) string +import ( + "net/http" + "strings" +) type wildcard struct { prefix string @@ -12,60 +11,24 @@ type wildcard struct { } func (w wildcard) match(s string) bool { - return len(s) >= len(w.prefix)+len(w.suffix) && strings.HasPrefix(s, w.prefix) && strings.HasSuffix(s, w.suffix) + return len(s) >= len(w.prefix)+len(w.suffix) && + strings.HasPrefix(s, w.prefix) && + strings.HasSuffix(s, w.suffix) } // convert converts a list of string using the passed converter function -func convert(s []string, c converter) []string { - out := []string{} - for _, i := range s { - out = append(out, c(i)) +func convert(s []string, f func(string) string) []string { + out := make([]string, len(s)) + for i := range s { + out[i] = f(s[i]) } return out } -// parseHeaderList tokenize + normalize a string containing a list of headers -func parseHeaderList(headerList string) []string { - l := len(headerList) - h := make([]byte, 0, l) - upper := true - // Estimate the number headers in order to allocate the right splice size - t := 0 - for i := 0; i < l; i++ { - if headerList[i] == ',' { - t++ - } - } - headers := make([]string, 0, t) - for i := 0; i < l; i++ { - b := headerList[i] - switch { - case b >= 'a' && b <= 'z': - if upper { - h = append(h, b-toLower) - } else { - h = append(h, b) - } - case b >= 'A' && b <= 'Z': - if !upper { - h = append(h, b+toLower) - } else { - h = append(h, b) - } - case b == '-' || b == '_' || (b >= '0' && b <= '9'): - h = append(h, b) - } - - if b == ' ' || b == ',' || i == l-1 { - if len(h) > 0 { - // Flush the found header - headers = append(headers, string(h)) - h = h[:0] - upper = true - } - } else { - upper = b == '-' || b == '_' - } +func first(hdrs http.Header, k string) ([]string, bool) { + v, found := hdrs[k] + if !found || len(v) == 0 { + return nil, false } - return headers + return v[:1], true } diff --git a/vendor/github.com/status-im/telemetry/LICENSE-APACHEv2 b/vendor/github.com/status-im/telemetry/LICENSE-APACHEv2 new file mode 100644 index 0000000000..29616de6e6 --- /dev/null +++ b/vendor/github.com/status-im/telemetry/LICENSE-APACHEv2 @@ -0,0 +1,205 @@ +go-waku is licensed under the Apache License version 2 +Copyright (c) 2018 Status Research & Development GmbH +----------------------------------------------------- + + Apache License + Version 2.0, January 2004 + http://www.apache.org/licenses/ + + TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION + + 1. Definitions. + + "License" shall mean the terms and conditions for use, reproduction, + and distribution as defined by Sections 1 through 9 of this document. + + "Licensor" shall mean the copyright owner or entity authorized by + the copyright owner that is granting the License. + + "Legal Entity" shall mean the union of the acting entity and all + other entities that control, are controlled by, or are under common + control with that entity. For the purposes of this definition, + "control" means (i) the power, direct or indirect, to cause the + direction or management of such entity, whether by contract or + otherwise, or (ii) ownership of fifty percent (50%) or more of the + outstanding shares, or (iii) beneficial ownership of such entity. + + "You" (or "Your") shall mean an individual or Legal Entity + exercising permissions granted by this License. + + "Source" form shall mean the preferred form for making modifications, + including but not limited to software source code, documentation + source, and configuration files. + + "Object" form shall mean any form resulting from mechanical + transformation or translation of a Source form, including but + not limited to compiled object code, generated documentation, + and conversions to other media types. + + "Work" shall mean the work of authorship, whether in Source or + Object form, made available under the License, as indicated by a + copyright notice that is included in or attached to the work + (an example is provided in the Appendix below). + + "Derivative Works" shall mean any work, whether in Source or Object + form, that is based on (or derived from) the Work and for which the + editorial revisions, annotations, elaborations, or other modifications + represent, as a whole, an original work of authorship. For the purposes + of this License, Derivative Works shall not include works that remain + separable from, or merely link (or bind by name) to the interfaces of, + the Work and Derivative Works thereof. + + "Contribution" shall mean any work of authorship, including + the original version of the Work and any modifications or additions + to that Work or Derivative Works thereof, that is intentionally + submitted to Licensor for inclusion in the Work by the copyright owner + or by an individual or Legal Entity authorized to submit on behalf of + the copyright owner. For the purposes of this definition, "submitted" + means any form of electronic, verbal, or written communication sent + to the Licensor or its representatives, including but not limited to + communication on electronic mailing lists, source code control systems, + and issue tracking systems that are managed by, or on behalf of, the + Licensor for the purpose of discussing and improving the Work, but + excluding communication that is conspicuously marked or otherwise + designated in writing by the copyright owner as "Not a Contribution." + + "Contributor" shall mean Licensor and any individual or Legal Entity + on behalf of whom a Contribution has been received by Licensor and + subsequently incorporated within the Work. + + 2. Grant of Copyright License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + copyright license to reproduce, prepare Derivative Works of, + publicly display, publicly perform, sublicense, and distribute the + Work and such Derivative Works in Source or Object form. + + 3. Grant of Patent License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + (except as stated in this section) patent license to make, have made, + use, offer to sell, sell, import, and otherwise transfer the Work, + where such license applies only to those patent claims licensable + by such Contributor that are necessarily infringed by their + Contribution(s) alone or by combination of their Contribution(s) + with the Work to which such Contribution(s) was submitted. If You + institute patent litigation against any entity (including a + cross-claim or counterclaim in a lawsuit) alleging that the Work + or a Contribution incorporated within the Work constitutes direct + or contributory patent infringement, then any patent licenses + granted to You under this License for that Work shall terminate + as of the date such litigation is filed. + + 4. Redistribution. You may reproduce and distribute copies of the + Work or Derivative Works thereof in any medium, with or without + modifications, and in Source or Object form, provided that You + meet the following conditions: + + (a) You must give any other recipients of the Work or + Derivative Works a copy of this License; and + + (b) You must cause any modified files to carry prominent notices + stating that You changed the files; and + + (c) You must retain, in the Source form of any Derivative Works + that You distribute, all copyright, patent, trademark, and + attribution notices from the Source form of the Work, + excluding those notices that do not pertain to any part of + the Derivative Works; and + + (d) If the Work includes a "NOTICE" text file as part of its + distribution, then any Derivative Works that You distribute must + include a readable copy of the attribution notices contained + within such NOTICE file, excluding those notices that do not + pertain to any part of the Derivative Works, in at least one + of the following places: within a NOTICE text file distributed + as part of the Derivative Works; within the Source form or + documentation, if provided along with the Derivative Works; or, + within a display generated by the Derivative Works, if and + wherever such third-party notices normally appear. The contents + of the NOTICE file are for informational purposes only and + do not modify the License. You may add Your own attribution + notices within Derivative Works that You distribute, alongside + or as an addendum to the NOTICE text from the Work, provided + that such additional attribution notices cannot be construed + as modifying the License. + + You may add Your own copyright statement to Your modifications and + may provide additional or different license terms and conditions + for use, reproduction, or distribution of Your modifications, or + for any such Derivative Works as a whole, provided Your use, + reproduction, and distribution of the Work otherwise complies with + the conditions stated in this License. + + 5. Submission of Contributions. Unless You explicitly state otherwise, + any Contribution intentionally submitted for inclusion in the Work + by You to the Licensor shall be under the terms and conditions of + this License, without any additional terms or conditions. + Notwithstanding the above, nothing herein shall supersede or modify + the terms of any separate license agreement you may have executed + with Licensor regarding such Contributions. + + 6. Trademarks. This License does not grant permission to use the trade + names, trademarks, service marks, or product names of the Licensor, + except as required for reasonable and customary use in describing the + origin of the Work and reproducing the content of the NOTICE file. + + 7. Disclaimer of Warranty. Unless required by applicable law or + agreed to in writing, Licensor provides the Work (and each + Contributor provides its Contributions) on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + implied, including, without limitation, any warranties or conditions + of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A + PARTICULAR PURPOSE. You are solely responsible for determining the + appropriateness of using or redistributing the Work and assume any + risks associated with Your exercise of permissions under this License. + + 8. Limitation of Liability. In no event and under no legal theory, + whether in tort (including negligence), contract, or otherwise, + unless required by applicable law (such as deliberate and grossly + negligent acts) or agreed to in writing, shall any Contributor be + liable to You for damages, including any direct, indirect, special, + incidental, or consequential damages of any character arising as a + result of this License or out of the use or inability to use the + Work (including but not limited to damages for loss of goodwill, + work stoppage, computer failure or malfunction, or any and all + other commercial damages or losses), even if such Contributor + has been advised of the possibility of such damages. + + 9. Accepting Warranty or Additional Liability. While redistributing + the Work or Derivative Works thereof, You may choose to offer, + and charge a fee for, acceptance of support, warranty, indemnity, + or other liability obligations and/or rights consistent with this + License. However, in accepting such obligations, You may act only + on Your own behalf and on Your sole responsibility, not on behalf + of any other Contributor, and only if You agree to indemnify, + defend, and hold each Contributor harmless for any liability + incurred by, or claims asserted against, such Contributor by reason + of your accepting any such warranty or additional liability. + + END OF TERMS AND CONDITIONS + + APPENDIX: How to apply the Apache License to your work. + + To apply the Apache License to your work, attach the following + boilerplate notice, with the fields enclosed by brackets "[]" + replaced with your own identifying information. (Don't include + the brackets!) The text should be enclosed in the appropriate + comment syntax for the file format. We also recommend that a + file or class name and description of purpose be included on the + same "printed page" as the copyright notice for easier + identification within third-party archives. + + Copyright 2018 Status Research & Development GmbH + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. diff --git a/vendor/github.com/status-im/telemetry/LICENSE-MIT b/vendor/github.com/status-im/telemetry/LICENSE-MIT new file mode 100644 index 0000000000..b4d78784d2 --- /dev/null +++ b/vendor/github.com/status-im/telemetry/LICENSE-MIT @@ -0,0 +1,25 @@ +go-waku is licensed under the MIT License +Copyright (c) 2018 Status Research & Development GmbH +----------------------------------------------------- + +The MIT License (MIT) + +Copyright (c) 2018 Status Research & Development GmbH + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +SOFTWARE. diff --git a/vendor/github.com/status-im/telemetry/pkg/types/types.go b/vendor/github.com/status-im/telemetry/pkg/types/types.go new file mode 100644 index 0000000000..15da27ff53 --- /dev/null +++ b/vendor/github.com/status-im/telemetry/pkg/types/types.go @@ -0,0 +1,113 @@ +package types + +import "encoding/json" + +type TelemetryType string + +const ( + ProtocolStatsMetric TelemetryType = "ProtocolStats" + ReceivedEnvelopeMetric TelemetryType = "ReceivedEnvelope" + SentEnvelopeMetric TelemetryType = "SentEnvelope" + UpdateEnvelopeMetric TelemetryType = "UpdateEnvelope" + ReceivedMessagesMetric TelemetryType = "ReceivedMessages" + ErrorSendingEnvelopeMetric TelemetryType = "ErrorSendingEnvelope" + PeerCountMetric TelemetryType = "PeerCount" + PeerConnFailureMetric TelemetryType = "PeerConnFailure" +) + +type TelemetryRequest struct { + Id int `json:"id"` + TelemetryType TelemetryType `json:"telemetry_type"` + TelemetryData *json.RawMessage `json:"telemetry_data"` +} + +type PeerCount struct { + ID int `json:"id"` + CreatedAt int64 `json:"createdAt"` + Timestamp int64 `json:"timestamp"` + NodeName string `json:"nodeName"` + NodeKeyUid string `json:"nodeKeyUid"` + PeerID string `json:"peerId"` + PeerCount int `json:"peerCount"` + StatusVersion string `json:"statusVersion"` +} + +type PeerConnFailure struct { + ID int `json:"id"` + CreatedAt int64 `json:"createdAt"` + Timestamp int64 `json:"timestamp"` + NodeName string `json:"nodeName"` + NodeKeyUid string `json:"nodeKeyUid"` + PeerId string `json:"peerId"` + StatusVersion string `json:"statusVersion"` + FailedPeerId string `json:"failedPeerId"` + FailureCount int `json:"failureCount"` +} + +type SentEnvelope struct { + ID int `json:"id"` + MessageHash string `json:"messageHash"` + SentAt int64 `json:"sentAt"` + CreatedAt int64 `json:"createdAt"` + PubsubTopic string `json:"pubsubTopic"` + Topic string `json:"topic"` + SenderKeyUID string `json:"senderKeyUID"` + PeerID string `json:"peerId"` + NodeName string `json:"nodeName"` + ProcessingError string `json:"processingError"` + PublishMethod string `json:"publishMethod"` + StatusVersion string `json:"statusVersion"` +} + +type ErrorSendingEnvelope struct { + CreatedAt int64 `json:"createdAt"` + Error string `json:"error"` + SentEnvelope SentEnvelope `json:"sentEnvelope"` +} + +type ReceivedEnvelope struct { + ID int `json:"id"` + MessageHash string `json:"messageHash"` + SentAt int64 `json:"sentAt"` + CreatedAt int64 `json:"createdAt"` + PubsubTopic string `json:"pubsubTopic"` + Topic string `json:"topic"` + ReceiverKeyUID string `json:"receiverKeyUID"` + PeerID string `json:"peerId"` + NodeName string `json:"nodeName"` + ProcessingError string `json:"processingError"` + StatusVersion string `json:"statusVersion"` +} + +type Metric struct { + TotalIn int64 `json:"totalIn"` + TotalOut int64 `json:"totalOut"` + RateIn float64 `json:"rateIn"` + RateOut float64 `json:"rateOut"` +} + +type ProtocolStats struct { + PeerID string `json:"hostID"` + Relay Metric `json:"relay"` + Store Metric `json:"store"` + FilterPush Metric `json:"filter-push"` + FilterSubscribe Metric `json:"filter-subscribe"` + Lightpush Metric `json:"lightpush"` +} + +type ReceivedMessage struct { + ID int `json:"id"` + ChatID string `json:"chatId"` + MessageHash string `json:"messageHash"` + MessageID string `json:"messageId"` + MessageType string `json:"messageType"` + MessageSize int `json:"messageSize"` + ReceiverKeyUID string `json:"receiverKeyUID"` + PeerID string `json:"peerId"` + NodeName string `json:"nodeName"` + SentAt int64 `json:"sentAt"` + Topic string `json:"topic"` + PubsubTopic string `json:"pubsubTopic"` + CreatedAt int64 `json:"createdAt"` + StatusVersion string `json:"statusVersion"` +} diff --git a/vendor/modules.txt b/vendor/modules.txt index 556c56a1e7..62e481d7df 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -881,9 +881,10 @@ github.com/rivo/uniseg # github.com/rjeczalik/notify v0.9.3 => github.com/status-im/notify v1.0.2-status ## explicit; go 1.11 github.com/rjeczalik/notify -# github.com/rs/cors v1.7.0 -## explicit +# github.com/rs/cors v1.11.0 +## explicit; go 1.13 github.com/rs/cors +github.com/rs/cors/internal # github.com/rs/dnscache v0.0.0-20210201191234-295bba877686 ## explicit; go 1.12 github.com/rs/dnscache @@ -958,6 +959,9 @@ github.com/status-im/status-go/extkeys # github.com/status-im/tcp-shaker v1.1.1-status ## explicit; go 1.13 github.com/status-im/tcp-shaker +# github.com/status-im/telemetry v0.0.0-20240813130615-b4e1a202a77b +## explicit; go 1.20 +github.com/status-im/telemetry/pkg/types # github.com/status-im/zxcvbn-go v0.0.0-20220311183720-5e8676676857 ## explicit; go 1.14 github.com/status-im/zxcvbn-go diff --git a/wakuv2/common/publish_method.go b/wakuv2/common/publish_method.go new file mode 100644 index 0000000000..01310f4dac --- /dev/null +++ b/wakuv2/common/publish_method.go @@ -0,0 +1,19 @@ +package common + +type PublishMethod int + +const ( + LightPush PublishMethod = iota + Relay +) + +func (pm PublishMethod) String() string { + switch pm { + case LightPush: + return "LightPush" + case Relay: + return "Relay" + default: + return "Unknown" + } +} diff --git a/wakuv2/message_publishing.go b/wakuv2/message_publishing.go index 9fbf44fa8a..a5954b5f17 100644 --- a/wakuv2/message_publishing.go +++ b/wakuv2/message_publishing.go @@ -5,34 +5,16 @@ import ( "go.uber.org/zap" + gethcommon "github.com/ethereum/go-ethereum/common" + telecommon "github.com/status-im/status-go/telemetry/common" + "github.com/status-im/status-go/wakuv2/common" "github.com/waku-org/go-waku/waku/v2/api/publish" "github.com/waku-org/go-waku/waku/v2/protocol" "github.com/waku-org/go-waku/waku/v2/protocol/lightpush" "github.com/waku-org/go-waku/waku/v2/protocol/pb" "github.com/waku-org/go-waku/waku/v2/protocol/relay" - - gethcommon "github.com/ethereum/go-ethereum/common" - "github.com/status-im/status-go/wakuv2/common" ) -type PublishMethod int - -const ( - LightPush PublishMethod = iota - Relay -) - -func (pm PublishMethod) String() string { - switch pm { - case LightPush: - return "LightPush" - case Relay: - return "Relay" - default: - return "Unknown" - } -} - // Send injects a message into the waku send queue, to be distributed in the // network in the coming cycles. func (w *Waku) Send(pubsubTopic string, msg *pb.WakuMessage, priority *int) ([]byte, error) { @@ -91,23 +73,23 @@ func (w *Waku) broadcast() { logger := w.logger.With(zap.Stringer("envelopeHash", envelope.Hash()), zap.String("pubsubTopic", envelope.PubsubTopic()), zap.String("contentTopic", envelope.Message().ContentTopic), zap.Int64("timestamp", envelope.Message().GetTimestamp())) var fn publish.PublishFn - var publishMethod PublishMethod + var publishMethod common.PublishMethod if w.cfg.SkipPublishToTopic { // For now only used in testing to simulate going offline - publishMethod = LightPush + publishMethod = common.LightPush fn = func(env *protocol.Envelope, logger *zap.Logger) error { return errors.New("test send failure") } } else if w.cfg.LightClient { - publishMethod = LightPush + publishMethod = common.LightPush fn = func(env *protocol.Envelope, logger *zap.Logger) error { logger.Info("publishing message via lightpush") _, err := w.node.Lightpush().Publish(w.ctx, env.Message(), lightpush.WithPubSubTopic(env.PubsubTopic()), lightpush.WithMaxPeers(peersToPublishForLightpush)) return err } } else { - publishMethod = Relay + publishMethod = common.Relay fn = func(env *protocol.Envelope, logger *zap.Logger) error { peerCnt := len(w.node.Relay().PubSub().ListPeers(env.PubsubTopic())) logger.Info("publishing message via relay", zap.Int("peerCnt", peerCnt)) @@ -122,9 +104,9 @@ func (w *Waku) broadcast() { fn = func(env *protocol.Envelope, logger *zap.Logger) error { err := sendFn(env, logger) if err == nil { - w.statusTelemetryClient.PushSentEnvelope(w.ctx, SentEnvelope{Envelope: env, PublishMethod: publishMethod}) + w.statusTelemetryClient.PushSentEnvelope(w.ctx, telecommon.SentEnvelope{Envelope: env, PublishMethod: publishMethod}) } else { - w.statusTelemetryClient.PushErrorSendingEnvelope(w.ctx, ErrorSendingEnvelope{Error: err, SentEnvelope: SentEnvelope{Envelope: env, PublishMethod: publishMethod}}) + w.statusTelemetryClient.PushErrorSendingEnvelope(w.ctx, telecommon.ErrorSendingEnvelope{Error: err, SentEnvelope: telecommon.SentEnvelope{Envelope: env, PublishMethod: publishMethod}}) } return err } diff --git a/wakuv2/telemetry.go b/wakuv2/telemetry.go deleted file mode 100644 index 581225d526..0000000000 --- a/wakuv2/telemetry.go +++ /dev/null @@ -1,64 +0,0 @@ -package wakuv2 - -import ( - "bytes" - "encoding/json" - "fmt" - "net/http" - "time" - - "github.com/google/uuid" - "github.com/libp2p/go-libp2p/core/metrics" - "github.com/libp2p/go-libp2p/core/protocol" - "go.uber.org/zap" - - "github.com/waku-org/go-waku/waku/v2/protocol/filter" - "github.com/waku-org/go-waku/waku/v2/protocol/legacy_store" - "github.com/waku-org/go-waku/waku/v2/protocol/lightpush" - "github.com/waku-org/go-waku/waku/v2/protocol/relay" -) - -type BandwidthTelemetryClient struct { - serverURL string - httpClient *http.Client - hostID string - logger *zap.Logger -} - -func NewBandwidthTelemetryClient(logger *zap.Logger, serverURL string) *BandwidthTelemetryClient { - return &BandwidthTelemetryClient{ - serverURL: serverURL, - httpClient: &http.Client{Timeout: time.Minute}, - hostID: uuid.NewString(), - logger: logger.Named("bandwidth-telemetry"), - } -} - -func getStatsPerProtocol(protocolID protocol.ID, stats map[protocol.ID]metrics.Stats) map[string]interface{} { - return map[string]interface{}{ - "rateIn": stats[protocolID].RateIn, - "rateOut": stats[protocolID].RateOut, - "totalIn": stats[protocolID].TotalIn, - "totalOut": stats[protocolID].TotalOut, - } -} - -func (c *BandwidthTelemetryClient) getTelemetryRequestBody(stats map[protocol.ID]metrics.Stats) map[string]interface{} { - return map[string]interface{}{ - "hostID": c.hostID, - "relay": getStatsPerProtocol(relay.WakuRelayID_v200, stats), - "store": getStatsPerProtocol(legacy_store.StoreID_v20beta4, stats), - "filter-push": getStatsPerProtocol(filter.FilterPushID_v20beta1, stats), - "filter-subscribe": getStatsPerProtocol(filter.FilterSubscribeID_v20beta1, stats), - "lightpush": getStatsPerProtocol(lightpush.LightPushID_v20beta1, stats), - } -} - -func (c *BandwidthTelemetryClient) PushProtocolStats(stats map[protocol.ID]metrics.Stats) { - url := fmt.Sprintf("%s/protocol-stats", c.serverURL) - body, _ := json.Marshal(c.getTelemetryRequestBody(stats)) - _, err := c.httpClient.Post(url, "application/json", bytes.NewBuffer(body)) - if err != nil { - c.logger.Error("Error sending message to telemetry server", zap.Error(err)) - } -} diff --git a/wakuv2/waku.go b/wakuv2/waku.go index 980be1f1a6..091474ddd7 100644 --- a/wakuv2/waku.go +++ b/wakuv2/waku.go @@ -79,6 +79,8 @@ import ( node "github.com/waku-org/go-waku/waku/v2/node" "github.com/waku-org/go-waku/waku/v2/protocol/pb" + + telemetry "github.com/status-im/status-go/telemetry/common" ) const messageQueueLimit = 1024 @@ -93,24 +95,6 @@ const peersToPublishForLightpush = 2 const publishingLimiterRate = rate.Limit(2) const publishingLimitBurst = 4 -type SentEnvelope struct { - Envelope *protocol.Envelope - PublishMethod PublishMethod -} - -type ErrorSendingEnvelope struct { - Error error - SentEnvelope SentEnvelope -} - -type ITelemetryClient interface { - PushReceivedEnvelope(ctx context.Context, receivedEnvelope *protocol.Envelope) - PushSentEnvelope(ctx context.Context, sentEnvelope SentEnvelope) - PushErrorSendingEnvelope(ctx context.Context, errorSendingEnvelope ErrorSendingEnvelope) - PushPeerCount(ctx context.Context, peerCount int) - PushPeerConnFailures(ctx context.Context, peerConnFailures map[string]int) -} - // Waku represents a dark communication interface through the Ethereum // network, using its very own P2P communication layer. type Waku struct { @@ -181,12 +165,12 @@ type Waku struct { onHistoricMessagesRequestFailed func([]byte, peer.ID, error) onPeerStats func(types.ConnStatus) - statusTelemetryClient ITelemetryClient + statusTelemetryClient telemetry.ITelemetryClient defaultShardInfo protocol.RelayShards } -func (w *Waku) SetStatusTelemetryClient(client ITelemetryClient) { +func (w *Waku) SetStatusTelemetryClient(client telemetry.ITelemetryClient) { w.statusTelemetryClient = client } @@ -509,16 +493,7 @@ func (w *Waku) connect(peerInfo peer.AddrInfo, enr *enode.Node, origin wps.Origi w.node.AddDiscoveredPeer(peerInfo.ID, peerInfo.Addrs, origin, w.cfg.DefaultShardedPubsubTopics, enr, true) } -func (w *Waku) telemetryBandwidthStats(telemetryServerURL string) { - w.wg.Add(1) - defer w.wg.Done() - - if telemetryServerURL == "" { - return - } - - telemetry := NewBandwidthTelemetryClient(w.logger, telemetryServerURL) - +func (w *Waku) telemetryBandwidthStats() { ticker := time.NewTicker(time.Second * 20) defer ticker.Stop() @@ -535,7 +510,7 @@ func (w *Waku) telemetryBandwidthStats(telemetryServerURL string) { w.bandwidthCounter.Reset() } - go telemetry.PushProtocolStats(w.bandwidthCounter.GetBandwidthByProtocol()) + w.statusTelemetryClient.PushProtocolStats(w.ctx, w.bandwidthCounter.GetBandwidthByProtocol()) } } } @@ -1107,7 +1082,10 @@ func (w *Waku) Start() error { } }() - go w.telemetryBandwidthStats(w.cfg.TelemetryServerURL) + if w.statusTelemetryClient != nil { + go w.telemetryBandwidthStats() + } + //TODO: commenting for now so that only fleet nodes are used. //Need to uncomment once filter peer scoring etc is implemented. go w.runPeerExchangeLoop() diff --git a/wakuv2/waku_test.go b/wakuv2/waku_test.go index e37f1a8d24..d322d2ae55 100644 --- a/wakuv2/waku_test.go +++ b/wakuv2/waku_test.go @@ -3,7 +3,6 @@ package wakuv2 import ( "context" "crypto/rand" - "encoding/json" "errors" "math/big" "os" @@ -14,8 +13,6 @@ import ( "go.uber.org/zap" "github.com/cenkalti/backoff/v3" - "github.com/libp2p/go-libp2p/core/metrics" - libp2pprotocol "github.com/libp2p/go-libp2p/core/protocol" "github.com/ethereum/go-ethereum/common/hexutil" "github.com/ethereum/go-ethereum/crypto" @@ -30,10 +27,7 @@ import ( wps "github.com/waku-org/go-waku/waku/v2/peerstore" "github.com/waku-org/go-waku/waku/v2/protocol" "github.com/waku-org/go-waku/waku/v2/protocol/filter" - "github.com/waku-org/go-waku/waku/v2/protocol/legacy_store" - "github.com/waku-org/go-waku/waku/v2/protocol/lightpush" "github.com/waku-org/go-waku/waku/v2/protocol/pb" - "github.com/waku-org/go-waku/waku/v2/protocol/relay" "github.com/waku-org/go-waku/waku/v2/protocol/store" "github.com/status-im/status-go/appdatabase" @@ -736,28 +730,3 @@ func TestLightpushRateLimit(t *testing.T) { require.Len(t, messages, 2) } - -func TestTelemetryFormat(t *testing.T) { - logger, err := zap.NewDevelopment() - require.NoError(t, err) - - tc := NewBandwidthTelemetryClient(logger, "#") - - s := metrics.Stats{ - TotalIn: 10, - TotalOut: 20, - RateIn: 30, - RateOut: 40, - } - - m := make(map[libp2pprotocol.ID]metrics.Stats) - m[relay.WakuRelayID_v200] = s - m[filter.FilterPushID_v20beta1] = s - m[filter.FilterSubscribeID_v20beta1] = s - m[legacy_store.StoreID_v20beta4] = s - m[lightpush.LightPushID_v20beta1] = s - - requestBody := tc.getTelemetryRequestBody(m) - _, err = json.Marshal(requestBody) - require.NoError(t, err) -}