From 56e5e96b8841c92354fd1bd33beb51f21e610c38 Mon Sep 17 00:00:00 2001 From: leon mandel Date: Thu, 12 Sep 2024 15:19:22 +0200 Subject: [PATCH 01/15] add subscription metrics --- .../chainlib/consumer_websocket_manager.go | 2 +- .../consumer_ws_subscription_manager.go | 19 ++ .../consumer_ws_subscription_manager_test.go | 6 +- protocol/metrics/metrics_consumer_manager.go | 166 ++++++++++++------ protocol/metrics/rpcconsumerlogs.go | 15 ++ protocol/rpcconsumer/rpcconsumer.go | 2 +- .../pre_setups/init_lava_only_with_node.sh | 8 +- 7 files changed, 156 insertions(+), 62 deletions(-) diff --git a/protocol/chainlib/consumer_websocket_manager.go b/protocol/chainlib/consumer_websocket_manager.go index 5b36e81e5b..81a3a8ea8e 100644 --- a/protocol/chainlib/consumer_websocket_manager.go +++ b/protocol/chainlib/consumer_websocket_manager.go @@ -159,7 +159,7 @@ func (cwm *ConsumerWebsocketManager) ListenToMessages() { continue } - // check whether its a normal relay / unsubscribe / unsubscribe_all otherwise its a subscription flow. + // check whether it's a normal relay / unsubscribe / unsubscribe_all otherwise its a subscription flow. if !IsFunctionTagOfType(protocolMessage, spectypes.FUNCTION_TAG_SUBSCRIBE) { if IsFunctionTagOfType(protocolMessage, spectypes.FUNCTION_TAG_UNSUBSCRIBE) { err := cwm.consumerWsSubscriptionManager.Unsubscribe(webSocketCtx, protocolMessage, dappID, userIp, cwm.WebsocketConnectionUID, metricsData) diff --git a/protocol/chainlib/consumer_ws_subscription_manager.go b/protocol/chainlib/consumer_ws_subscription_manager.go index e19420812f..c422e411fe 100644 --- a/protocol/chainlib/consumer_ws_subscription_manager.go +++ b/protocol/chainlib/consumer_ws_subscription_manager.go @@ -18,6 +18,15 @@ import ( spectypes "github.com/lavanet/lava/v3/x/spec/types" ) +// is there a place to declare enums etc? +type RelayDisconnectReasonEnum string + +const ( + ConsumerDisconnect RelayDisconnectReasonEnum = "ConsumerDisconnect" + ProviderDisconnect RelayDisconnectReasonEnum = "ProviderDisconnect" + UserDisconnect RelayDisconnectReasonEnum = "UserDisconnect" +) + type unsubscribeRelayData struct { protocolMessage ProtocolMessage } @@ -56,6 +65,7 @@ type ConsumerWSSubscriptionManager struct { activeSubscriptionProvidersStorage *lavasession.ActiveSubscriptionProvidersStorage currentlyPendingSubscriptions map[string]*pendingSubscriptionsBroadcastManager lock sync.RWMutex + rpcConsumerLogs *metrics.RPCConsumerLogs } func NewConsumerWSSubscriptionManager( @@ -65,6 +75,7 @@ func NewConsumerWSSubscriptionManager( connectionType string, chainParser ChainParser, activeSubscriptionProvidersStorage *lavasession.ActiveSubscriptionProvidersStorage, + rpcConsumerLogs *metrics.RPCConsumerLogs, ) *ConsumerWSSubscriptionManager { return &ConsumerWSSubscriptionManager{ connectedDapps: make(map[string]map[string]*common.SafeChannelSender[*pairingtypes.RelayReply]), @@ -76,6 +87,7 @@ func NewConsumerWSSubscriptionManager( relaySender: relaySender, connectionType: connectionType, activeSubscriptionProvidersStorage: activeSubscriptionProvidersStorage, + rpcConsumerLogs: rpcConsumerLogs, } } @@ -190,6 +202,7 @@ func (cwsm *ConsumerWSSubscriptionManager) StartSubscription( webSocketConnectionUniqueId string, metricsData *metrics.RelayMetrics, ) (firstReply *pairingtypes.RelayReply, repliesChan <-chan *pairingtypes.RelayReply, err error) { + go cwsm.rpcConsumerLogs.SetRelaySubscriptionRequestMetric(metricsData.ChainID, metricsData.APIType) hashedParams, _, err := cwsm.getHashedParams(protocolMessage) if err != nil { return nil, nil, utils.LavaFormatError("could not marshal params", err) @@ -217,6 +230,7 @@ func (cwsm *ConsumerWSSubscriptionManager) StartSubscription( // called after send relay failure or parsing failure afterwards onSubscriptionFailure := func() { + go cwsm.rpcConsumerLogs.SetFailedRelaySubscriptionRequestMetric(metricsData.ChainID, metricsData.APIType) cwsm.failedPendingSubscription(hashedParams) closeWebsocketRepliesChannel() } @@ -256,6 +270,7 @@ func (cwsm *ConsumerWSSubscriptionManager) StartSubscription( // Validated there are no active subscriptions that we can use. firstSubscriptionReply, returnWebsocketRepliesChan := cwsm.checkForActiveSubscriptionWithLock(webSocketCtx, hashedParams, protocolMessage, dappKey, websocketRepliesSafeChannelSender, closeWebsocketRepliesChannel) if firstSubscriptionReply != nil { + go cwsm.rpcConsumerLogs.SetDuplicatedRelaySubscriptionRequestMetric(metricsData.ChainID, metricsData.APIType) if returnWebsocketRepliesChan { return firstSubscriptionReply, websocketRepliesChan, nil } @@ -525,12 +540,14 @@ func (cwsm *ConsumerWSSubscriptionManager) listenForSubscriptionMessages( utils.LogAttr("GUID", webSocketCtx), utils.LogAttr("hashedParams", utils.ToHexString(hashedParams)), ) + go cwsm.rpcConsumerLogs.SetRelaySubscriptioDisconnectRequestMetric(metricsData.ChainID, metricsData.APIType, string(UserDisconnect)) return case <-replyServer.Context().Done(): utils.LavaFormatTrace("reply server context canceled", utils.LogAttr("GUID", webSocketCtx), utils.LogAttr("hashedParams", utils.ToHexString(hashedParams)), ) + go cwsm.rpcConsumerLogs.SetRelaySubscriptioDisconnectRequestMetric(metricsData.ChainID, metricsData.APIType, string(ConsumerDisconnect)) return default: var reply pairingtypes.RelayReply @@ -538,6 +555,7 @@ func (cwsm *ConsumerWSSubscriptionManager) listenForSubscriptionMessages( if err != nil { // The connection was closed by the provider utils.LavaFormatTrace("error reading from subscription stream", utils.LogAttr("original error", err.Error())) + go cwsm.rpcConsumerLogs.SetRelaySubscriptioDisconnectRequestMetric(metricsData.ChainID, metricsData.APIType, string(ProviderDisconnect)) return } err = cwsm.handleIncomingSubscriptionNodeMessage(hashedParams, &reply, providerAddr) @@ -546,6 +564,7 @@ func (cwsm *ConsumerWSSubscriptionManager) listenForSubscriptionMessages( utils.LogAttr("hashedParams", hashedParams), utils.LogAttr("reply", reply), ) + go cwsm.rpcConsumerLogs.SetFailedRelaySubscriptionRequestMetric(metricsData.ChainID, metricsData.APIType) return } } diff --git a/protocol/chainlib/consumer_ws_subscription_manager_test.go b/protocol/chainlib/consumer_ws_subscription_manager_test.go index c549cb6772..b32677acaf 100644 --- a/protocol/chainlib/consumer_ws_subscription_manager_test.go +++ b/protocol/chainlib/consumer_ws_subscription_manager_test.go @@ -136,7 +136,7 @@ func TestConsumerWSSubscriptionManagerParallelSubscriptionsOnSameDappIdIp(t *tes consumerSessionManager := CreateConsumerSessionManager(play.specId, play.apiInterface, ts.Consumer.Addr.String()) // Create a new ConsumerWSSubscriptionManager - manager := NewConsumerWSSubscriptionManager(consumerSessionManager, relaySender, nil, play.connectionType, chainParser, lavasession.NewActiveSubscriptionProvidersStorage()) + manager := NewConsumerWSSubscriptionManager(consumerSessionManager, relaySender, nil, play.connectionType, chainParser, lavasession.NewActiveSubscriptionProvidersStorage(), nil) uniqueIdentifiers := make([]string, numberOfParallelSubscriptions) wg := sync.WaitGroup{} wg.Add(numberOfParallelSubscriptions) @@ -293,7 +293,7 @@ func TestConsumerWSSubscriptionManagerParallelSubscriptions(t *testing.T) { consumerSessionManager := CreateConsumerSessionManager(play.specId, play.apiInterface, ts.Consumer.Addr.String()) // Create a new ConsumerWSSubscriptionManager - manager := NewConsumerWSSubscriptionManager(consumerSessionManager, relaySender, nil, play.connectionType, chainParser, lavasession.NewActiveSubscriptionProvidersStorage()) + manager := NewConsumerWSSubscriptionManager(consumerSessionManager, relaySender, nil, play.connectionType, chainParser, lavasession.NewActiveSubscriptionProvidersStorage(), nil) wg := sync.WaitGroup{} wg.Add(10) @@ -538,7 +538,7 @@ func TestConsumerWSSubscriptionManager(t *testing.T) { consumerSessionManager := CreateConsumerSessionManager(play.specId, play.apiInterface, ts.Consumer.Addr.String()) // Create a new ConsumerWSSubscriptionManager - manager := NewConsumerWSSubscriptionManager(consumerSessionManager, relaySender, nil, play.connectionType, chainParser, lavasession.NewActiveSubscriptionProvidersStorage()) + manager := NewConsumerWSSubscriptionManager(consumerSessionManager, relaySender, nil, play.connectionType, chainParser, lavasession.NewActiveSubscriptionProvidersStorage(), nil) // Start a new subscription for the first time, called SendParsedRelay once ctx = utils.WithUniqueIdentifier(ctx, utils.GenerateUniqueIdentifier()) diff --git a/protocol/metrics/metrics_consumer_manager.go b/protocol/metrics/metrics_consumer_manager.go index 2f3337e432..325ad80328 100644 --- a/protocol/metrics/metrics_consumer_manager.go +++ b/protocol/metrics/metrics_consumer_manager.go @@ -26,33 +26,37 @@ func (lt *LatencyTracker) AddLatency(latency time.Duration) { } type ConsumerMetricsManager struct { - totalCURequestedMetric *prometheus.CounterVec - totalRelaysRequestedMetric *prometheus.CounterVec - totalErroredMetric *prometheus.CounterVec - totalNodeErroredMetric *prometheus.CounterVec - totalNodeErroredRecoveredSuccessfullyMetric *prometheus.CounterVec - totalNodeErroredRecoveryAttemptsMetric *prometheus.CounterVec - totalRelaysSentToProvidersMetric *prometheus.CounterVec - totalRelaysSentByNewBatchTickerMetric *prometheus.CounterVec - blockMetric *prometheus.GaugeVec - latencyMetric *prometheus.GaugeVec - qosMetric *prometheus.GaugeVec - qosExcellenceMetric *prometheus.GaugeVec - LatestBlockMetric *prometheus.GaugeVec - LatestProviderRelay *prometheus.GaugeVec - virtualEpochMetric *prometheus.GaugeVec - apiMethodCalls *prometheus.GaugeVec - endpointsHealthChecksOkMetric prometheus.Gauge - endpointsHealthChecksOk uint64 - lock sync.Mutex - protocolVersionMetric *prometheus.GaugeVec - providerRelays map[string]uint64 - addMethodsApiGauge bool - averageLatencyPerChain map[string]*LatencyTracker // key == chain Id + api interface - averageLatencyMetric *prometheus.GaugeVec - relayProcessingLatencyBeforeProvider *prometheus.GaugeVec - relayProcessingLatencyAfterProvider *prometheus.GaugeVec - averageProcessingLatency map[string]*LatencyTracker + totalCURequestedMetric *prometheus.CounterVec + totalRelaysRequestedMetric *prometheus.CounterVec + totalErroredMetric *prometheus.CounterVec + totalNodeErroredMetric *prometheus.CounterVec + totalNodeErroredRecoveredSuccessfullyMetric *prometheus.CounterVec + totalNodeErroredRecoveryAttemptsMetric *prometheus.CounterVec + totalRelaysSentToProvidersMetric *prometheus.CounterVec + totalRelaysSentByNewBatchTickerMetric *prometheus.CounterVec + totalRelaySubscriptionRequestsMetric *prometheus.CounterVec + totalFailedRelaySubscriptionRequestsMetric *prometheus.CounterVec + totalRelaySubscriptionDissconnectMetric *prometheus.CounterVec + totalDuplicatedRelaySubscriptionRequestsMetric *prometheus.CounterVec + blockMetric *prometheus.GaugeVec + latencyMetric *prometheus.GaugeVec + qosMetric *prometheus.GaugeVec + qosExcellenceMetric *prometheus.GaugeVec + LatestBlockMetric *prometheus.GaugeVec + LatestProviderRelay *prometheus.GaugeVec + virtualEpochMetric *prometheus.GaugeVec + apiMethodCalls *prometheus.GaugeVec + endpointsHealthChecksOkMetric prometheus.Gauge + endpointsHealthChecksOk uint64 + lock sync.Mutex + protocolVersionMetric *prometheus.GaugeVec + providerRelays map[string]uint64 + addMethodsApiGauge bool + averageLatencyPerChain map[string]*LatencyTracker // key == chain Id + api interface + averageLatencyMetric *prometheus.GaugeVec + relayProcessingLatencyBeforeProvider *prometheus.GaugeVec + relayProcessingLatencyAfterProvider *prometheus.GaugeVec + averageProcessingLatency map[string]*LatencyTracker } type ConsumerMetricsManagerOptions struct { @@ -88,6 +92,26 @@ func NewConsumerMetricsManager(options ConsumerMetricsManagerOptions) *ConsumerM Help: "The total number of errors encountered by the consumer over time.", }, []string{"spec", "apiInterface"}) + totalRelaySubscriptionRequestsMetric := prometheus.NewCounterVec(prometheus.CounterOpts{ + Name: "lava_consumer_total_relay_subscription_requests", + Help: "The total number of relay subscription requests by the consumer over time per chain id per api interface.", + }, []string{"spec", "apiInterface"}) + + totalFailedRelaySubscriptionRequestsMetric := prometheus.NewCounterVec(prometheus.CounterOpts{ + Name: "lava_consumer_total_failed_subscription_requests", + Help: "The total number of failed relay subscription requests by the consumer over time per chain id per api interface.", + }, []string{"spec", "apiInterface"}) + + totalDuplicatedRelaySubscriptionRequestsMetric := prometheus.NewCounterVec(prometheus.CounterOpts{ + Name: "lava_consumer_total_duplicated_subscription_requests", + Help: "The total number of duplicated relay subscription requests by the consumer over time per chain id per api interface.", + }, []string{"spec", "apiInterface"}) + + totalRelaySubscriptionDissconnectMetric := prometheus.NewCounterVec(prometheus.CounterOpts{ + Name: "lava_consumer_total_subscription_dissconnect", + Help: "The total number of relay subscription dissconnets over time per chain id per api interface per dissconnect reason.", + }, []string{"spec", "apiInterface", "dissconectReason"}) + blockMetric := prometheus.NewGaugeVec(prometheus.GaugeOpts{ Name: "lava_latest_block", Help: "The latest block measured", @@ -196,34 +220,42 @@ func NewConsumerMetricsManager(options ConsumerMetricsManagerOptions) *ConsumerM prometheus.MustRegister(totalNodeErroredRecoveryAttemptsMetric) prometheus.MustRegister(relayProcessingLatencyBeforeProvider) prometheus.MustRegister(relayProcessingLatencyAfterProvider) + prometheus.MustRegister(totalRelaySubscriptionRequestsMetric) + prometheus.MustRegister(totalFailedRelaySubscriptionRequestsMetric) + prometheus.MustRegister(totalDuplicatedRelaySubscriptionRequestsMetric) + prometheus.MustRegister(totalRelaySubscriptionDissconnectMetric) consumerMetricsManager := &ConsumerMetricsManager{ - totalCURequestedMetric: totalCURequestedMetric, - totalRelaysRequestedMetric: totalRelaysRequestedMetric, - totalErroredMetric: totalErroredMetric, - blockMetric: blockMetric, - latencyMetric: latencyMetric, - qosMetric: qosMetric, - qosExcellenceMetric: qosExcellenceMetric, - LatestBlockMetric: latestBlockMetric, - LatestProviderRelay: latestProviderRelay, - providerRelays: map[string]uint64{}, - averageLatencyPerChain: map[string]*LatencyTracker{}, - virtualEpochMetric: virtualEpochMetric, - endpointsHealthChecksOkMetric: endpointsHealthChecksOkMetric, - endpointsHealthChecksOk: 1, - protocolVersionMetric: protocolVersionMetric, - averageLatencyMetric: averageLatencyMetric, - totalRelaysSentByNewBatchTickerMetric: totalRelaysSentByNewBatchTickerMetric, - apiMethodCalls: apiSpecificsMetric, - addMethodsApiGauge: options.AddMethodsApiGauge, - totalNodeErroredMetric: totalNodeErroredMetric, - totalNodeErroredRecoveredSuccessfullyMetric: totalNodeErroredRecoveredSuccessfullyMetric, - totalNodeErroredRecoveryAttemptsMetric: totalNodeErroredRecoveryAttemptsMetric, - totalRelaysSentToProvidersMetric: totalRelaysSentToProvidersMetric, - relayProcessingLatencyBeforeProvider: relayProcessingLatencyBeforeProvider, - relayProcessingLatencyAfterProvider: relayProcessingLatencyAfterProvider, - averageProcessingLatency: map[string]*LatencyTracker{}, + totalCURequestedMetric: totalCURequestedMetric, + totalRelaysRequestedMetric: totalRelaysRequestedMetric, + totalRelaySubscriptionRequestsMetric: totalRelaySubscriptionRequestsMetric, + totalFailedRelaySubscriptionRequestsMetric: totalFailedRelaySubscriptionRequestsMetric, + totalDuplicatedRelaySubscriptionRequestsMetric: totalDuplicatedRelaySubscriptionRequestsMetric, + totalRelaySubscriptionDissconnectMetric: totalRelaySubscriptionDissconnectMetric, + totalErroredMetric: totalErroredMetric, + blockMetric: blockMetric, + latencyMetric: latencyMetric, + qosMetric: qosMetric, + qosExcellenceMetric: qosExcellenceMetric, + LatestBlockMetric: latestBlockMetric, + LatestProviderRelay: latestProviderRelay, + providerRelays: map[string]uint64{}, + averageLatencyPerChain: map[string]*LatencyTracker{}, + virtualEpochMetric: virtualEpochMetric, + endpointsHealthChecksOkMetric: endpointsHealthChecksOkMetric, + endpointsHealthChecksOk: 1, + protocolVersionMetric: protocolVersionMetric, + averageLatencyMetric: averageLatencyMetric, + totalRelaysSentByNewBatchTickerMetric: totalRelaysSentByNewBatchTickerMetric, + apiMethodCalls: apiSpecificsMetric, + addMethodsApiGauge: options.AddMethodsApiGauge, + totalNodeErroredMetric: totalNodeErroredMetric, + totalNodeErroredRecoveredSuccessfullyMetric: totalNodeErroredRecoveredSuccessfullyMetric, + totalNodeErroredRecoveryAttemptsMetric: totalNodeErroredRecoveryAttemptsMetric, + totalRelaysSentToProvidersMetric: totalRelaysSentToProvidersMetric, + relayProcessingLatencyBeforeProvider: relayProcessingLatencyBeforeProvider, + relayProcessingLatencyAfterProvider: relayProcessingLatencyAfterProvider, + averageProcessingLatency: map[string]*LatencyTracker{}, } http.Handle("/metrics", promhttp.Handler()) @@ -460,3 +492,31 @@ func SetVersionInner(protocolVersionMetric *prometheus.GaugeVec, version string) combined := major*1000000 + minor*1000 + patch protocolVersionMetric.WithLabelValues("version").Set(float64(combined)) } + +func (pme *ConsumerMetricsManager) SetRelaySubscriptionRequestMetric(chainId string, apiInterface string) { + if pme == nil { + return + } + pme.totalRelaySubscriptionRequestsMetric.WithLabelValues(chainId, apiInterface).Inc() +} + +func (pme *ConsumerMetricsManager) SetFailedRelaySubscriptionRequestMetric(chainId string, apiInterface string) { + if pme == nil { + return + } + pme.totalFailedRelaySubscriptionRequestsMetric.WithLabelValues(chainId, apiInterface).Inc() +} + +func (pme *ConsumerMetricsManager) SetDuplicatedRelaySubscriptionRequestMetric(chainId string, apiInterface string) { + if pme == nil { + return + } + pme.totalDuplicatedRelaySubscriptionRequestsMetric.WithLabelValues(chainId, apiInterface).Inc() +} + +func (pme *ConsumerMetricsManager) SetRelaySubscriptioDisconnectRequestMetric(chainId string, apiInterface string, dissconnectReason string) { + if pme == nil { + return + } + pme.totalRelaySubscriptionDissconnectMetric.WithLabelValues(chainId, apiInterface, dissconnectReason).Inc() +} diff --git a/protocol/metrics/rpcconsumerlogs.go b/protocol/metrics/rpcconsumerlogs.go index 606ce8f114..511c1cbc0f 100644 --- a/protocol/metrics/rpcconsumerlogs.go +++ b/protocol/metrics/rpcconsumerlogs.go @@ -263,3 +263,18 @@ func (rpccl *RPCConsumerLogs) LogTestMode(fiberCtx *fiber.Ctx) { } utils.LavaFormatInfo(st) } + +func (rpccl *RPCConsumerLogs) SetRelaySubscriptionRequestMetric(chainId string, apiInterface string) { + rpccl.consumerMetricsManager.SetRelaySubscriptionRequestMetric(chainId, apiInterface) +} + +func (rpccl *RPCConsumerLogs) SetFailedRelaySubscriptionRequestMetric(chainId string, apiInterface string) { + rpccl.consumerMetricsManager.SetFailedRelaySubscriptionRequestMetric(chainId, apiInterface) +} + +func (rpccl *RPCConsumerLogs) SetDuplicatedRelaySubscriptionRequestMetric(chainId string, apiInterface string) { + rpccl.consumerMetricsManager.SetDuplicatedRelaySubscriptionRequestMetric(chainId, apiInterface) +} +func (rpccl *RPCConsumerLogs) SetRelaySubscriptioDisconnectRequestMetric(chainId string, apiInterface string, disconnectReason string) { + rpccl.consumerMetricsManager.SetRelaySubscriptioDisconnectRequestMetric(chainId, apiInterface, disconnectReason) +} diff --git a/protocol/rpcconsumer/rpcconsumer.go b/protocol/rpcconsumer/rpcconsumer.go index cd5e9eb4e7..89f1c50c66 100644 --- a/protocol/rpcconsumer/rpcconsumer.go +++ b/protocol/rpcconsumer/rpcconsumer.go @@ -304,7 +304,7 @@ func (rpcc *RPCConsumer) Start(ctx context.Context, options *rpcConsumerStartOpt if rpcEndpoint.ApiInterface == spectypes.APIInterfaceJsonRPC { specMethodType = http.MethodPost } - consumerWsSubscriptionManager = chainlib.NewConsumerWSSubscriptionManager(consumerSessionManager, rpcConsumerServer, options.refererData, specMethodType, chainParser, activeSubscriptionProvidersStorage) + consumerWsSubscriptionManager = chainlib.NewConsumerWSSubscriptionManager(consumerSessionManager, rpcConsumerServer, options.refererData, specMethodType, chainParser, activeSubscriptionProvidersStorage, rpcConsumerMetrics) utils.LavaFormatInfo("RPCConsumer Listening", utils.Attribute{Key: "endpoints", Value: rpcEndpoint.String()}) err = rpcConsumerServer.ServeRPCRequests(ctx, rpcEndpoint, rpcc.consumerStateTracker, chainParser, finalizationConsensus, consumerSessionManager, options.requiredResponses, privKey, lavaChainID, options.cache, rpcConsumerMetrics, consumerAddr, consumerConsistency, relaysMonitor, options.cmdFlags, options.stateShare, options.refererData, consumerReportsManager, consumerWsSubscriptionManager) diff --git a/scripts/pre_setups/init_lava_only_with_node.sh b/scripts/pre_setups/init_lava_only_with_node.sh index d99ddc4094..a08a3e0965 100755 --- a/scripts/pre_setups/init_lava_only_with_node.sh +++ b/scripts/pre_setups/init_lava_only_with_node.sh @@ -48,16 +48,16 @@ sleep_until_next_epoch wait_next_block screen -d -m -S provider1 bash -c "source ~/.bashrc; lavap rpcprovider \ -$PROVIDER1_LISTENER LAV1 rest '$LAVA_REST' \ -$PROVIDER1_LISTENER LAV1 tendermintrpc '$LAVA_RPC,$LAVA_RPC_WS' \ -$PROVIDER1_LISTENER LAV1 grpc '$LAVA_GRPC' \ +$PROVIDER1_LISTENER LAV1 rest http://0.0.0.0:1317 \ +$PROVIDER1_LISTENER LAV1 tendermintrpc http://0.0.0.0:26657,ws://0.0.0.0:26657/websocket \ +$PROVIDER1_LISTENER LAV1 grpc 0.0.0.0:9090 \ $EXTRA_PROVIDER_FLAGS --geolocation 1 --log_level trace --from servicer1 --chain-id lava --metrics-listen-address ":7776" 2>&1 | tee $LOGS_DIR/PROVIDER1.log" && sleep 0.25 wait_next_block screen -d -m -S consumers bash -c "source ~/.bashrc; lavap rpcconsumer \ 127.0.0.1:3360 LAV1 rest 127.0.0.1:3361 LAV1 tendermintrpc 127.0.0.1:3362 LAV1 grpc \ -$EXTRA_PORTAL_FLAGS --geolocation 1 --log_level trace --from user1 --chain-id lava --add-api-method-metrics --allow-insecure-provider-dialing --metrics-listen-address ":7779" 2>&1 | tee $LOGS_DIR/CONSUMERS.log" && sleep 0.25 +$EXTRA_PORTAL_FLAGS --geolocation 1 --log_level trace --from user1 --chain-id lava --add-api-method-metrics --allow-insecure-provider-dialing --metrics-listen-address ":7780" 2>&1 | tee $LOGS_DIR/CONSUMERS.log" && sleep 0.25 echo "--- setting up screens done ---" screen -ls \ No newline at end of file From 6a4f2c0ac1605adc740e059785f10480b1c010ea Mon Sep 17 00:00:00 2001 From: leon mandel Date: Fri, 13 Sep 2024 14:56:47 +0200 Subject: [PATCH 02/15] fix metric typo --- protocol/metrics/metrics_consumer_manager.go | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/protocol/metrics/metrics_consumer_manager.go b/protocol/metrics/metrics_consumer_manager.go index 325ad80328..253dd0d4bc 100644 --- a/protocol/metrics/metrics_consumer_manager.go +++ b/protocol/metrics/metrics_consumer_manager.go @@ -98,18 +98,18 @@ func NewConsumerMetricsManager(options ConsumerMetricsManagerOptions) *ConsumerM }, []string{"spec", "apiInterface"}) totalFailedRelaySubscriptionRequestsMetric := prometheus.NewCounterVec(prometheus.CounterOpts{ - Name: "lava_consumer_total_failed_subscription_requests", + Name: "lava_consumer_total_failed_relay_subscription_requests", Help: "The total number of failed relay subscription requests by the consumer over time per chain id per api interface.", }, []string{"spec", "apiInterface"}) totalDuplicatedRelaySubscriptionRequestsMetric := prometheus.NewCounterVec(prometheus.CounterOpts{ - Name: "lava_consumer_total_duplicated_subscription_requests", + Name: "lava_consumer_total_duplicated_relay_subscription_requests", Help: "The total number of duplicated relay subscription requests by the consumer over time per chain id per api interface.", }, []string{"spec", "apiInterface"}) totalRelaySubscriptionDissconnectMetric := prometheus.NewCounterVec(prometheus.CounterOpts{ - Name: "lava_consumer_total_subscription_dissconnect", - Help: "The total number of relay subscription dissconnets over time per chain id per api interface per dissconnect reason.", + Name: "lava_consumer_total_relay_subscription_disconnect", + Help: "The total number of relay subscription disconnets over time per chain id per api interface per dissconnect reason.", }, []string{"spec", "apiInterface", "dissconectReason"}) blockMetric := prometheus.NewGaugeVec(prometheus.GaugeOpts{ @@ -514,9 +514,9 @@ func (pme *ConsumerMetricsManager) SetDuplicatedRelaySubscriptionRequestMetric(c pme.totalDuplicatedRelaySubscriptionRequestsMetric.WithLabelValues(chainId, apiInterface).Inc() } -func (pme *ConsumerMetricsManager) SetRelaySubscriptioDisconnectRequestMetric(chainId string, apiInterface string, dissconnectReason string) { +func (pme *ConsumerMetricsManager) SetRelaySubscriptioDisconnectRequestMetric(chainId string, apiInterface string, disconnectReason string) { if pme == nil { return } - pme.totalRelaySubscriptionDissconnectMetric.WithLabelValues(chainId, apiInterface, dissconnectReason).Inc() + pme.totalRelaySubscriptionDissconnectMetric.WithLabelValues(chainId, apiInterface, disconnectReason).Inc() } From 62a7fdf2539bb7a8f8729d12abac776205d4567e Mon Sep 17 00:00:00 2001 From: leon mandel Date: Sun, 15 Sep 2024 10:30:07 +0200 Subject: [PATCH 03/15] fix pr --- scripts/pre_setups/init_lava_only_with_node.sh | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/scripts/pre_setups/init_lava_only_with_node.sh b/scripts/pre_setups/init_lava_only_with_node.sh index a08a3e0965..d99ddc4094 100755 --- a/scripts/pre_setups/init_lava_only_with_node.sh +++ b/scripts/pre_setups/init_lava_only_with_node.sh @@ -48,16 +48,16 @@ sleep_until_next_epoch wait_next_block screen -d -m -S provider1 bash -c "source ~/.bashrc; lavap rpcprovider \ -$PROVIDER1_LISTENER LAV1 rest http://0.0.0.0:1317 \ -$PROVIDER1_LISTENER LAV1 tendermintrpc http://0.0.0.0:26657,ws://0.0.0.0:26657/websocket \ -$PROVIDER1_LISTENER LAV1 grpc 0.0.0.0:9090 \ +$PROVIDER1_LISTENER LAV1 rest '$LAVA_REST' \ +$PROVIDER1_LISTENER LAV1 tendermintrpc '$LAVA_RPC,$LAVA_RPC_WS' \ +$PROVIDER1_LISTENER LAV1 grpc '$LAVA_GRPC' \ $EXTRA_PROVIDER_FLAGS --geolocation 1 --log_level trace --from servicer1 --chain-id lava --metrics-listen-address ":7776" 2>&1 | tee $LOGS_DIR/PROVIDER1.log" && sleep 0.25 wait_next_block screen -d -m -S consumers bash -c "source ~/.bashrc; lavap rpcconsumer \ 127.0.0.1:3360 LAV1 rest 127.0.0.1:3361 LAV1 tendermintrpc 127.0.0.1:3362 LAV1 grpc \ -$EXTRA_PORTAL_FLAGS --geolocation 1 --log_level trace --from user1 --chain-id lava --add-api-method-metrics --allow-insecure-provider-dialing --metrics-listen-address ":7780" 2>&1 | tee $LOGS_DIR/CONSUMERS.log" && sleep 0.25 +$EXTRA_PORTAL_FLAGS --geolocation 1 --log_level trace --from user1 --chain-id lava --add-api-method-metrics --allow-insecure-provider-dialing --metrics-listen-address ":7779" 2>&1 | tee $LOGS_DIR/CONSUMERS.log" && sleep 0.25 echo "--- setting up screens done ---" screen -ls \ No newline at end of file From dfc3cb8d9f61ff3c8c4213112415ad8444d4b746 Mon Sep 17 00:00:00 2001 From: leon mandel Date: Sun, 15 Sep 2024 10:34:27 +0200 Subject: [PATCH 04/15] fix typo --- protocol/metrics/metrics_consumer_manager.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/protocol/metrics/metrics_consumer_manager.go b/protocol/metrics/metrics_consumer_manager.go index 253dd0d4bc..911ea3f04a 100644 --- a/protocol/metrics/metrics_consumer_manager.go +++ b/protocol/metrics/metrics_consumer_manager.go @@ -109,7 +109,7 @@ func NewConsumerMetricsManager(options ConsumerMetricsManagerOptions) *ConsumerM totalRelaySubscriptionDissconnectMetric := prometheus.NewCounterVec(prometheus.CounterOpts{ Name: "lava_consumer_total_relay_subscription_disconnect", - Help: "The total number of relay subscription disconnets over time per chain id per api interface per dissconnect reason.", + Help: "The total number of relay subscription disconnects over time per chain id per api interface per dissconnect reason.", }, []string{"spec", "apiInterface", "dissconectReason"}) blockMetric := prometheus.NewGaugeVec(prometheus.GaugeOpts{ From 849603d63a7a9135a9bfd96f631c062b0916903f Mon Sep 17 00:00:00 2001 From: leon mandel Date: Sun, 15 Sep 2024 10:54:34 +0200 Subject: [PATCH 05/15] fix lint --- protocol/metrics/rpcconsumerlogs.go | 1 + 1 file changed, 1 insertion(+) diff --git a/protocol/metrics/rpcconsumerlogs.go b/protocol/metrics/rpcconsumerlogs.go index 511c1cbc0f..51cf2c567b 100644 --- a/protocol/metrics/rpcconsumerlogs.go +++ b/protocol/metrics/rpcconsumerlogs.go @@ -275,6 +275,7 @@ func (rpccl *RPCConsumerLogs) SetFailedRelaySubscriptionRequestMetric(chainId st func (rpccl *RPCConsumerLogs) SetDuplicatedRelaySubscriptionRequestMetric(chainId string, apiInterface string) { rpccl.consumerMetricsManager.SetDuplicatedRelaySubscriptionRequestMetric(chainId, apiInterface) } + func (rpccl *RPCConsumerLogs) SetRelaySubscriptioDisconnectRequestMetric(chainId string, apiInterface string, disconnectReason string) { rpccl.consumerMetricsManager.SetRelaySubscriptioDisconnectRequestMetric(chainId, apiInterface, disconnectReason) } From c8be64451a6dcec2ab3cd288f03eb06456b8480c Mon Sep 17 00:00:00 2001 From: leon mandel Date: Sun, 15 Sep 2024 12:25:41 +0200 Subject: [PATCH 06/15] fix tests --- .../consumer_ws_subscription_manager_test.go | 38 +++++++++++-------- 1 file changed, 22 insertions(+), 16 deletions(-) diff --git a/protocol/chainlib/consumer_ws_subscription_manager_test.go b/protocol/chainlib/consumer_ws_subscription_manager_test.go index b32677acaf..943f10e3cd 100644 --- a/protocol/chainlib/consumer_ws_subscription_manager_test.go +++ b/protocol/chainlib/consumer_ws_subscription_manager_test.go @@ -27,6 +27,9 @@ import ( const ( numberOfParallelSubscriptions = 10 uniqueId = "1234" + projectHashTest = "test_projecthash" + chainIdTest = "test_chainId" + apiTypeTest = "test_apiType" ) func TestConsumerWSSubscriptionManagerParallelSubscriptionsOnSameDappIdIp(t *testing.T) { @@ -51,7 +54,8 @@ func TestConsumerWSSubscriptionManagerParallelSubscriptionsOnSameDappIdIp(t *tes subscriptionFirstReply2: []byte(`{"jsonrpc":"2.0","id":4,"result":{}}`), }, } - + rpcconsumerLogs, _ := metrics.NewRPCConsumerLogs(nil, nil) + metricsData := metrics.NewRelayAnalytics(projectHashTest, chainIdTest, apiTypeTest) for _, play := range playbook { t.Run(play.name, func(t *testing.T) { ts := SetupForTests(t, 1, play.specId, "../../") @@ -136,7 +140,7 @@ func TestConsumerWSSubscriptionManagerParallelSubscriptionsOnSameDappIdIp(t *tes consumerSessionManager := CreateConsumerSessionManager(play.specId, play.apiInterface, ts.Consumer.Addr.String()) // Create a new ConsumerWSSubscriptionManager - manager := NewConsumerWSSubscriptionManager(consumerSessionManager, relaySender, nil, play.connectionType, chainParser, lavasession.NewActiveSubscriptionProvidersStorage(), nil) + manager := NewConsumerWSSubscriptionManager(consumerSessionManager, relaySender, nil, play.connectionType, chainParser, lavasession.NewActiveSubscriptionProvidersStorage(), rpcconsumerLogs) uniqueIdentifiers := make([]string, numberOfParallelSubscriptions) wg := sync.WaitGroup{} wg.Add(numberOfParallelSubscriptions) @@ -151,7 +155,7 @@ func TestConsumerWSSubscriptionManagerParallelSubscriptionsOnSameDappIdIp(t *tes var repliesChan <-chan *pairingtypes.RelayReply var firstReply *pairingtypes.RelayReply - firstReply, repliesChan, err = manager.StartSubscription(ctx, protocolMessage1, dapp, ip, uniqueIdentifiers[index], nil) + firstReply, repliesChan, err = manager.StartSubscription(ctx, protocolMessage1, dapp, ip, uniqueIdentifiers[index], metricsData) go func() { for subMsg := range repliesChan { // utils.LavaFormatInfo("got reply for index", utils.LogAttr("index", index)) @@ -169,7 +173,7 @@ func TestConsumerWSSubscriptionManagerParallelSubscriptionsOnSameDappIdIp(t *tes // now we have numberOfParallelSubscriptions subscriptions currently running require.Len(t, manager.connectedDapps, numberOfParallelSubscriptions) // remove one - err = manager.Unsubscribe(ts.Ctx, protocolMessage1, dapp, ip, uniqueIdentifiers[0], nil) + err = manager.Unsubscribe(ts.Ctx, protocolMessage1, dapp, ip, uniqueIdentifiers[0], metricsData) require.NoError(t, err) // now we have numberOfParallelSubscriptions - 1 require.Len(t, manager.connectedDapps, numberOfParallelSubscriptions-1) @@ -177,7 +181,7 @@ func TestConsumerWSSubscriptionManagerParallelSubscriptionsOnSameDappIdIp(t *tes require.Len(t, manager.activeSubscriptions, 1) // same flow for unsubscribe all - err = manager.UnsubscribeAll(ts.Ctx, dapp, ip, uniqueIdentifiers[1], nil) + err = manager.UnsubscribeAll(ts.Ctx, dapp, ip, uniqueIdentifiers[1], metricsData) require.NoError(t, err) // now we have numberOfParallelSubscriptions - 2 require.Len(t, manager.connectedDapps, numberOfParallelSubscriptions-2) @@ -209,7 +213,6 @@ func TestConsumerWSSubscriptionManagerParallelSubscriptions(t *testing.T) { subscriptionFirstReply2: []byte(`{"jsonrpc":"2.0","id":4,"result":{}}`), }, } - for _, play := range playbook { t.Run(play.name, func(t *testing.T) { ts := SetupForTests(t, 1, play.specId, "../../") @@ -291,9 +294,10 @@ func TestConsumerWSSubscriptionManagerParallelSubscriptions(t *testing.T) { Times(1) // Should call SendParsedRelay, because it is the first time we subscribe consumerSessionManager := CreateConsumerSessionManager(play.specId, play.apiInterface, ts.Consumer.Addr.String()) - + rpcconsumerLogs, _ := metrics.NewRPCConsumerLogs(nil, nil) + metricsData := metrics.NewRelayAnalytics(projectHashTest, chainIdTest, apiTypeTest) // Create a new ConsumerWSSubscriptionManager - manager := NewConsumerWSSubscriptionManager(consumerSessionManager, relaySender, nil, play.connectionType, chainParser, lavasession.NewActiveSubscriptionProvidersStorage(), nil) + manager := NewConsumerWSSubscriptionManager(consumerSessionManager, relaySender, nil, play.connectionType, chainParser, lavasession.NewActiveSubscriptionProvidersStorage(), rpcconsumerLogs) wg := sync.WaitGroup{} wg.Add(10) @@ -305,7 +309,7 @@ func TestConsumerWSSubscriptionManagerParallelSubscriptions(t *testing.T) { ctx := utils.WithUniqueIdentifier(ts.Ctx, utils.GenerateUniqueIdentifier()) var repliesChan <-chan *pairingtypes.RelayReply var firstReply *pairingtypes.RelayReply - firstReply, repliesChan, err = manager.StartSubscription(ctx, protocolMessage1, dapp+strconv.Itoa(index), ts.Consumer.Addr.String(), uniqueId, nil) + firstReply, repliesChan, err = manager.StartSubscription(ctx, protocolMessage1, dapp+strconv.Itoa(index), ts.Consumer.Addr.String(), uniqueId, metricsData) go func() { for subMsg := range repliesChan { require.Equal(t, string(play.subscriptionFirstReply1), string(subMsg.Data)) @@ -379,6 +383,8 @@ func TestConsumerWSSubscriptionManager(t *testing.T) { unsubscribeMessage2: []byte(`{"jsonrpc":"2.0","method":"eth_unsubscribe","params":["0x2134567890"],"id":1}`), }, } + rpcconsumerLogs, _ := metrics.NewRPCConsumerLogs(nil, nil) + metricsData := metrics.NewRelayAnalytics(projectHashTest, chainIdTest, apiTypeTest) for _, play := range playbook { t.Run(play.name, func(t *testing.T) { @@ -538,12 +544,12 @@ func TestConsumerWSSubscriptionManager(t *testing.T) { consumerSessionManager := CreateConsumerSessionManager(play.specId, play.apiInterface, ts.Consumer.Addr.String()) // Create a new ConsumerWSSubscriptionManager - manager := NewConsumerWSSubscriptionManager(consumerSessionManager, relaySender, nil, play.connectionType, chainParser, lavasession.NewActiveSubscriptionProvidersStorage(), nil) + manager := NewConsumerWSSubscriptionManager(consumerSessionManager, relaySender, nil, play.connectionType, chainParser, lavasession.NewActiveSubscriptionProvidersStorage(), rpcconsumerLogs) // Start a new subscription for the first time, called SendParsedRelay once ctx = utils.WithUniqueIdentifier(ctx, utils.GenerateUniqueIdentifier()) - firstReply, repliesChan1, err := manager.StartSubscription(ctx, subscribeProtocolMessage1, dapp1, ts.Consumer.Addr.String(), uniqueId, nil) + firstReply, repliesChan1, err := manager.StartSubscription(ctx, subscribeProtocolMessage1, dapp1, ts.Consumer.Addr.String(), uniqueId, metricsData) assert.NoError(t, err) unsubscribeMessageWg.Add(1) assert.Equal(t, string(play.subscriptionFirstReply1), string(firstReply.Data)) @@ -559,7 +565,7 @@ func TestConsumerWSSubscriptionManager(t *testing.T) { // Start a subscription again, same params, same dappKey, should not call SendParsedRelay ctx = utils.WithUniqueIdentifier(ctx, utils.GenerateUniqueIdentifier()) - firstReply, repliesChan2, err := manager.StartSubscription(ctx, subscribeProtocolMessage1, dapp1, ts.Consumer.Addr.String(), uniqueId, nil) + firstReply, repliesChan2, err := manager.StartSubscription(ctx, subscribeProtocolMessage1, dapp1, ts.Consumer.Addr.String(), uniqueId, metricsData) assert.NoError(t, err) assert.Equal(t, string(play.subscriptionFirstReply1), string(firstReply.Data)) assert.Nil(t, repliesChan2) // Same subscription, same dappKey, no need for a new channel @@ -568,7 +574,7 @@ func TestConsumerWSSubscriptionManager(t *testing.T) { // Start a subscription again, same params, different dappKey, should not call SendParsedRelay ctx = utils.WithUniqueIdentifier(ctx, utils.GenerateUniqueIdentifier()) - firstReply, repliesChan3, err := manager.StartSubscription(ctx, subscribeProtocolMessage1, dapp2, ts.Consumer.Addr.String(), uniqueId, nil) + firstReply, repliesChan3, err := manager.StartSubscription(ctx, subscribeProtocolMessage1, dapp2, ts.Consumer.Addr.String(), uniqueId, metricsData) assert.NoError(t, err) assert.Equal(t, string(play.subscriptionFirstReply1), string(firstReply.Data)) assert.NotNil(t, repliesChan3) // Same subscription, but different dappKey, so will create new channel @@ -652,7 +658,7 @@ func TestConsumerWSSubscriptionManager(t *testing.T) { // Start a subscription again, different params, same dappKey, should call SendParsedRelay ctx = utils.WithUniqueIdentifier(ctx, utils.GenerateUniqueIdentifier()) - firstReply, repliesChan4, err := manager.StartSubscription(ctx, subscribeProtocolMessage2, dapp1, ts.Consumer.Addr.String(), uniqueId, nil) + firstReply, repliesChan4, err := manager.StartSubscription(ctx, subscribeProtocolMessage2, dapp1, ts.Consumer.Addr.String(), uniqueId, metricsData) assert.NoError(t, err) unsubscribeMessageWg.Add(1) assert.Equal(t, string(play.subscriptionFirstReply2), string(firstReply.Data)) @@ -671,7 +677,7 @@ func TestConsumerWSSubscriptionManager(t *testing.T) { ctx = utils.WithUniqueIdentifier(ctx, utils.GenerateUniqueIdentifier()) unsubProtocolMessage := NewProtocolMessage(unsubscribeChainMessage1, nil, relayResult1.Request.RelayData, dapp2, ts.Consumer.Addr.String()) - err = manager.Unsubscribe(ctx, unsubProtocolMessage, dapp2, ts.Consumer.Addr.String(), uniqueId, nil) + err = manager.Unsubscribe(ctx, unsubProtocolMessage, dapp2, ts.Consumer.Addr.String(), uniqueId, metricsData) require.NoError(t, err) listenForExpectedMessages(ctx, repliesChan1, string(play.subscriptionFirstReply1)) @@ -697,7 +703,7 @@ func TestConsumerWSSubscriptionManager(t *testing.T) { Times(2) // Should call SendParsedRelay, because it unsubscribed ctx = utils.WithUniqueIdentifier(ctx, utils.GenerateUniqueIdentifier()) - err = manager.UnsubscribeAll(ctx, dapp1, ts.Consumer.Addr.String(), uniqueId, nil) + err = manager.UnsubscribeAll(ctx, dapp1, ts.Consumer.Addr.String(), uniqueId, metricsData) require.NoError(t, err) expectNoMoreMessages(ctx, repliesChan1) From a0ebc455d98f66f20178f431fb0b9b046602489e Mon Sep 17 00:00:00 2001 From: Elad Gildnur Date: Sun, 15 Sep 2024 17:11:37 +0300 Subject: [PATCH 07/15] Add "Connection refused" to allowedErrorsDuringEmergencyMode --- testutil/e2e/allowedErrorList.go | 1 + 1 file changed, 1 insertion(+) diff --git a/testutil/e2e/allowedErrorList.go b/testutil/e2e/allowedErrorList.go index bb95b79e5a..5b083bae70 100644 --- a/testutil/e2e/allowedErrorList.go +++ b/testutil/e2e/allowedErrorList.go @@ -13,6 +13,7 @@ var allowedErrors = map[string]string{ var allowedErrorsDuringEmergencyMode = map[string]string{ "connection refused": "Connection to tendermint port sometimes can happen as we shut down the node and we try to fetch info during emergency mode", + "Connection refused": "Connection to tendermint port sometimes can happen as we shut down the node and we try to fetch info during emergency mode", "connection reset by peer": "Connection to tendermint port sometimes can happen as we shut down the node and we try to fetch info during emergency mode", "Failed Querying EpochDetails": "Connection to tendermint port sometimes can happen as we shut down the node and we try to fetch info during emergency mode", } From f265d898871289e78fbf2e484696d836220f1154 Mon Sep 17 00:00:00 2001 From: leon mandel Date: Tue, 17 Sep 2024 14:21:09 +0200 Subject: [PATCH 08/15] remove consumerSessionManager from consumer ws sub --- protocol/chainlib/consumer_ws_subscription_manager.go | 3 --- .../chainlib/consumer_ws_subscription_manager_test.go | 11 +++-------- 2 files changed, 3 insertions(+), 11 deletions(-) diff --git a/protocol/chainlib/consumer_ws_subscription_manager.go b/protocol/chainlib/consumer_ws_subscription_manager.go index c422e411fe..e0bb1abd07 100644 --- a/protocol/chainlib/consumer_ws_subscription_manager.go +++ b/protocol/chainlib/consumer_ws_subscription_manager.go @@ -58,7 +58,6 @@ type ConsumerWSSubscriptionManager struct { connectedDapps map[string]map[string]*common.SafeChannelSender[*pairingtypes.RelayReply] // first key is dapp key, second key is hashed params activeSubscriptions map[string]*activeSubscriptionHolder // key is params hash relaySender RelaySender - consumerSessionManager *lavasession.ConsumerSessionManager chainParser ChainParser refererData *RefererData connectionType string @@ -69,7 +68,6 @@ type ConsumerWSSubscriptionManager struct { } func NewConsumerWSSubscriptionManager( - consumerSessionManager *lavasession.ConsumerSessionManager, relaySender RelaySender, refererData *RefererData, connectionType string, @@ -81,7 +79,6 @@ func NewConsumerWSSubscriptionManager( connectedDapps: make(map[string]map[string]*common.SafeChannelSender[*pairingtypes.RelayReply]), activeSubscriptions: make(map[string]*activeSubscriptionHolder), currentlyPendingSubscriptions: make(map[string]*pendingSubscriptionsBroadcastManager), - consumerSessionManager: consumerSessionManager, chainParser: chainParser, refererData: refererData, relaySender: relaySender, diff --git a/protocol/chainlib/consumer_ws_subscription_manager_test.go b/protocol/chainlib/consumer_ws_subscription_manager_test.go index 943f10e3cd..b34a5dd281 100644 --- a/protocol/chainlib/consumer_ws_subscription_manager_test.go +++ b/protocol/chainlib/consumer_ws_subscription_manager_test.go @@ -137,10 +137,8 @@ func TestConsumerWSSubscriptionManagerParallelSubscriptionsOnSameDappIdIp(t *tes Return(relayResult1, nil). Times(1) // Should call SendParsedRelay, because it is the first time we subscribe - consumerSessionManager := CreateConsumerSessionManager(play.specId, play.apiInterface, ts.Consumer.Addr.String()) - // Create a new ConsumerWSSubscriptionManager - manager := NewConsumerWSSubscriptionManager(consumerSessionManager, relaySender, nil, play.connectionType, chainParser, lavasession.NewActiveSubscriptionProvidersStorage(), rpcconsumerLogs) + manager := NewConsumerWSSubscriptionManager(relaySender, nil, play.connectionType, chainParser, lavasession.NewActiveSubscriptionProvidersStorage(), rpcconsumerLogs) uniqueIdentifiers := make([]string, numberOfParallelSubscriptions) wg := sync.WaitGroup{} wg.Add(numberOfParallelSubscriptions) @@ -293,11 +291,10 @@ func TestConsumerWSSubscriptionManagerParallelSubscriptions(t *testing.T) { Return(relayResult1, nil). Times(1) // Should call SendParsedRelay, because it is the first time we subscribe - consumerSessionManager := CreateConsumerSessionManager(play.specId, play.apiInterface, ts.Consumer.Addr.String()) rpcconsumerLogs, _ := metrics.NewRPCConsumerLogs(nil, nil) metricsData := metrics.NewRelayAnalytics(projectHashTest, chainIdTest, apiTypeTest) // Create a new ConsumerWSSubscriptionManager - manager := NewConsumerWSSubscriptionManager(consumerSessionManager, relaySender, nil, play.connectionType, chainParser, lavasession.NewActiveSubscriptionProvidersStorage(), rpcconsumerLogs) + manager := NewConsumerWSSubscriptionManager(relaySender, nil, play.connectionType, chainParser, lavasession.NewActiveSubscriptionProvidersStorage(), rpcconsumerLogs) wg := sync.WaitGroup{} wg.Add(10) @@ -541,10 +538,8 @@ func TestConsumerWSSubscriptionManager(t *testing.T) { Return(relayResult1, nil). Times(1) // Should call SendParsedRelay, because it is the first time we subscribe - consumerSessionManager := CreateConsumerSessionManager(play.specId, play.apiInterface, ts.Consumer.Addr.String()) - // Create a new ConsumerWSSubscriptionManager - manager := NewConsumerWSSubscriptionManager(consumerSessionManager, relaySender, nil, play.connectionType, chainParser, lavasession.NewActiveSubscriptionProvidersStorage(), rpcconsumerLogs) + manager := NewConsumerWSSubscriptionManager(relaySender, nil, play.connectionType, chainParser, lavasession.NewActiveSubscriptionProvidersStorage(), rpcconsumerLogs) // Start a new subscription for the first time, called SendParsedRelay once ctx = utils.WithUniqueIdentifier(ctx, utils.GenerateUniqueIdentifier()) From de308d8edf1f8945a05d9d4d1d10406d9718c157 Mon Sep 17 00:00:00 2001 From: leon mandel Date: Tue, 17 Sep 2024 14:26:10 +0200 Subject: [PATCH 09/15] add back session t ws sub --- protocol/chainlib/consumer_ws_subscription_manager.go | 4 ++++ .../chainlib/consumer_ws_subscription_manager_test.go | 11 ++++++++--- 2 files changed, 12 insertions(+), 3 deletions(-) diff --git a/protocol/chainlib/consumer_ws_subscription_manager.go b/protocol/chainlib/consumer_ws_subscription_manager.go index e6273f40d5..c422e411fe 100644 --- a/protocol/chainlib/consumer_ws_subscription_manager.go +++ b/protocol/chainlib/consumer_ws_subscription_manager.go @@ -58,6 +58,7 @@ type ConsumerWSSubscriptionManager struct { connectedDapps map[string]map[string]*common.SafeChannelSender[*pairingtypes.RelayReply] // first key is dapp key, second key is hashed params activeSubscriptions map[string]*activeSubscriptionHolder // key is params hash relaySender RelaySender + consumerSessionManager *lavasession.ConsumerSessionManager chainParser ChainParser refererData *RefererData connectionType string @@ -68,6 +69,7 @@ type ConsumerWSSubscriptionManager struct { } func NewConsumerWSSubscriptionManager( + consumerSessionManager *lavasession.ConsumerSessionManager, relaySender RelaySender, refererData *RefererData, connectionType string, @@ -79,6 +81,7 @@ func NewConsumerWSSubscriptionManager( connectedDapps: make(map[string]map[string]*common.SafeChannelSender[*pairingtypes.RelayReply]), activeSubscriptions: make(map[string]*activeSubscriptionHolder), currentlyPendingSubscriptions: make(map[string]*pendingSubscriptionsBroadcastManager), + consumerSessionManager: consumerSessionManager, chainParser: chainParser, refererData: refererData, relaySender: relaySender, @@ -211,6 +214,7 @@ func (cwsm *ConsumerWSSubscriptionManager) StartSubscription( utils.LogAttr("GUID", webSocketCtx), utils.LogAttr("hashedParams", utils.ToHexString(hashedParams)), utils.LogAttr("dappKey", dappKey), + utils.LogAttr("connectedDapps", cwsm.connectedDapps), ) websocketRepliesChan := make(chan *pairingtypes.RelayReply) diff --git a/protocol/chainlib/consumer_ws_subscription_manager_test.go b/protocol/chainlib/consumer_ws_subscription_manager_test.go index b34a5dd281..943f10e3cd 100644 --- a/protocol/chainlib/consumer_ws_subscription_manager_test.go +++ b/protocol/chainlib/consumer_ws_subscription_manager_test.go @@ -137,8 +137,10 @@ func TestConsumerWSSubscriptionManagerParallelSubscriptionsOnSameDappIdIp(t *tes Return(relayResult1, nil). Times(1) // Should call SendParsedRelay, because it is the first time we subscribe + consumerSessionManager := CreateConsumerSessionManager(play.specId, play.apiInterface, ts.Consumer.Addr.String()) + // Create a new ConsumerWSSubscriptionManager - manager := NewConsumerWSSubscriptionManager(relaySender, nil, play.connectionType, chainParser, lavasession.NewActiveSubscriptionProvidersStorage(), rpcconsumerLogs) + manager := NewConsumerWSSubscriptionManager(consumerSessionManager, relaySender, nil, play.connectionType, chainParser, lavasession.NewActiveSubscriptionProvidersStorage(), rpcconsumerLogs) uniqueIdentifiers := make([]string, numberOfParallelSubscriptions) wg := sync.WaitGroup{} wg.Add(numberOfParallelSubscriptions) @@ -291,10 +293,11 @@ func TestConsumerWSSubscriptionManagerParallelSubscriptions(t *testing.T) { Return(relayResult1, nil). Times(1) // Should call SendParsedRelay, because it is the first time we subscribe + consumerSessionManager := CreateConsumerSessionManager(play.specId, play.apiInterface, ts.Consumer.Addr.String()) rpcconsumerLogs, _ := metrics.NewRPCConsumerLogs(nil, nil) metricsData := metrics.NewRelayAnalytics(projectHashTest, chainIdTest, apiTypeTest) // Create a new ConsumerWSSubscriptionManager - manager := NewConsumerWSSubscriptionManager(relaySender, nil, play.connectionType, chainParser, lavasession.NewActiveSubscriptionProvidersStorage(), rpcconsumerLogs) + manager := NewConsumerWSSubscriptionManager(consumerSessionManager, relaySender, nil, play.connectionType, chainParser, lavasession.NewActiveSubscriptionProvidersStorage(), rpcconsumerLogs) wg := sync.WaitGroup{} wg.Add(10) @@ -538,8 +541,10 @@ func TestConsumerWSSubscriptionManager(t *testing.T) { Return(relayResult1, nil). Times(1) // Should call SendParsedRelay, because it is the first time we subscribe + consumerSessionManager := CreateConsumerSessionManager(play.specId, play.apiInterface, ts.Consumer.Addr.String()) + // Create a new ConsumerWSSubscriptionManager - manager := NewConsumerWSSubscriptionManager(relaySender, nil, play.connectionType, chainParser, lavasession.NewActiveSubscriptionProvidersStorage(), rpcconsumerLogs) + manager := NewConsumerWSSubscriptionManager(consumerSessionManager, relaySender, nil, play.connectionType, chainParser, lavasession.NewActiveSubscriptionProvidersStorage(), rpcconsumerLogs) // Start a new subscription for the first time, called SendParsedRelay once ctx = utils.WithUniqueIdentifier(ctx, utils.GenerateUniqueIdentifier()) From a11dbdb92ce1de26202573595558be015feebb9a Mon Sep 17 00:00:00 2001 From: leon mandel Date: Tue, 17 Sep 2024 15:30:56 +0200 Subject: [PATCH 10/15] fix pr --- .../consumer_ws_subscription_manager.go | 30 +-- .../consumer_ws_subscription_manager_test.go | 9 +- protocol/metrics/metrics_consumer_manager.go | 183 ++++++++++-------- protocol/metrics/rpcconsumerlogs.go | 16 -- 4 files changed, 111 insertions(+), 127 deletions(-) diff --git a/protocol/chainlib/consumer_ws_subscription_manager.go b/protocol/chainlib/consumer_ws_subscription_manager.go index c422e411fe..88fd5c5c34 100644 --- a/protocol/chainlib/consumer_ws_subscription_manager.go +++ b/protocol/chainlib/consumer_ws_subscription_manager.go @@ -18,15 +18,6 @@ import ( spectypes "github.com/lavanet/lava/v3/x/spec/types" ) -// is there a place to declare enums etc? -type RelayDisconnectReasonEnum string - -const ( - ConsumerDisconnect RelayDisconnectReasonEnum = "ConsumerDisconnect" - ProviderDisconnect RelayDisconnectReasonEnum = "ProviderDisconnect" - UserDisconnect RelayDisconnectReasonEnum = "UserDisconnect" -) - type unsubscribeRelayData struct { protocolMessage ProtocolMessage } @@ -65,7 +56,7 @@ type ConsumerWSSubscriptionManager struct { activeSubscriptionProvidersStorage *lavasession.ActiveSubscriptionProvidersStorage currentlyPendingSubscriptions map[string]*pendingSubscriptionsBroadcastManager lock sync.RWMutex - rpcConsumerLogs *metrics.RPCConsumerLogs + consumerMetricsManager *metrics.ConsumerMetricsManager } func NewConsumerWSSubscriptionManager( @@ -75,7 +66,7 @@ func NewConsumerWSSubscriptionManager( connectionType string, chainParser ChainParser, activeSubscriptionProvidersStorage *lavasession.ActiveSubscriptionProvidersStorage, - rpcConsumerLogs *metrics.RPCConsumerLogs, + consumerMetricsManager *metrics.ConsumerMetricsManager, ) *ConsumerWSSubscriptionManager { return &ConsumerWSSubscriptionManager{ connectedDapps: make(map[string]map[string]*common.SafeChannelSender[*pairingtypes.RelayReply]), @@ -87,7 +78,7 @@ func NewConsumerWSSubscriptionManager( relaySender: relaySender, connectionType: connectionType, activeSubscriptionProvidersStorage: activeSubscriptionProvidersStorage, - rpcConsumerLogs: rpcConsumerLogs, + consumerMetricsManager: consumerMetricsManager, } } @@ -202,7 +193,6 @@ func (cwsm *ConsumerWSSubscriptionManager) StartSubscription( webSocketConnectionUniqueId string, metricsData *metrics.RelayMetrics, ) (firstReply *pairingtypes.RelayReply, repliesChan <-chan *pairingtypes.RelayReply, err error) { - go cwsm.rpcConsumerLogs.SetRelaySubscriptionRequestMetric(metricsData.ChainID, metricsData.APIType) hashedParams, _, err := cwsm.getHashedParams(protocolMessage) if err != nil { return nil, nil, utils.LavaFormatError("could not marshal params", err) @@ -230,7 +220,7 @@ func (cwsm *ConsumerWSSubscriptionManager) StartSubscription( // called after send relay failure or parsing failure afterwards onSubscriptionFailure := func() { - go cwsm.rpcConsumerLogs.SetFailedRelaySubscriptionRequestMetric(metricsData.ChainID, metricsData.APIType) + go cwsm.consumerMetricsManager.SetFailedWsSubscriptionRequestMetric(metricsData.ChainID, metricsData.APIType) cwsm.failedPendingSubscription(hashedParams) closeWebsocketRepliesChannel() } @@ -270,7 +260,7 @@ func (cwsm *ConsumerWSSubscriptionManager) StartSubscription( // Validated there are no active subscriptions that we can use. firstSubscriptionReply, returnWebsocketRepliesChan := cwsm.checkForActiveSubscriptionWithLock(webSocketCtx, hashedParams, protocolMessage, dappKey, websocketRepliesSafeChannelSender, closeWebsocketRepliesChannel) if firstSubscriptionReply != nil { - go cwsm.rpcConsumerLogs.SetDuplicatedRelaySubscriptionRequestMetric(metricsData.ChainID, metricsData.APIType) + go cwsm.consumerMetricsManager.SetDuplicatedWsSubscriptionRequestMetric(metricsData.ChainID, metricsData.APIType) if returnWebsocketRepliesChan { return firstSubscriptionReply, websocketRepliesChan, nil } @@ -428,7 +418,7 @@ func (cwsm *ConsumerWSSubscriptionManager) StartSubscription( cwsm.successfulPendingSubscription(hashedParams) // Need to be run once for subscription go cwsm.listenForSubscriptionMessages(webSocketCtx, dappID, consumerIp, replyServer, hashedParams, providerAddr, metricsData, closeSubscriptionChan) - + go cwsm.consumerMetricsManager.SetWsSubscriptionRequestMetric(metricsData.ChainID, metricsData.APIType) return &reply, websocketRepliesChan, nil } @@ -540,14 +530,14 @@ func (cwsm *ConsumerWSSubscriptionManager) listenForSubscriptionMessages( utils.LogAttr("GUID", webSocketCtx), utils.LogAttr("hashedParams", utils.ToHexString(hashedParams)), ) - go cwsm.rpcConsumerLogs.SetRelaySubscriptioDisconnectRequestMetric(metricsData.ChainID, metricsData.APIType, string(UserDisconnect)) + go cwsm.consumerMetricsManager.SetWsSubscriptioDisconnectRequestMetric(metricsData.ChainID, metricsData.APIType, metrics.UserDisconnect) return case <-replyServer.Context().Done(): utils.LavaFormatTrace("reply server context canceled", utils.LogAttr("GUID", webSocketCtx), utils.LogAttr("hashedParams", utils.ToHexString(hashedParams)), ) - go cwsm.rpcConsumerLogs.SetRelaySubscriptioDisconnectRequestMetric(metricsData.ChainID, metricsData.APIType, string(ConsumerDisconnect)) + go cwsm.consumerMetricsManager.SetWsSubscriptioDisconnectRequestMetric(metricsData.ChainID, metricsData.APIType, metrics.ConsumerDisconnect) return default: var reply pairingtypes.RelayReply @@ -555,7 +545,7 @@ func (cwsm *ConsumerWSSubscriptionManager) listenForSubscriptionMessages( if err != nil { // The connection was closed by the provider utils.LavaFormatTrace("error reading from subscription stream", utils.LogAttr("original error", err.Error())) - go cwsm.rpcConsumerLogs.SetRelaySubscriptioDisconnectRequestMetric(metricsData.ChainID, metricsData.APIType, string(ProviderDisconnect)) + go cwsm.consumerMetricsManager.SetWsSubscriptioDisconnectRequestMetric(metricsData.ChainID, metricsData.APIType, metrics.ProviderDisconnect) return } err = cwsm.handleIncomingSubscriptionNodeMessage(hashedParams, &reply, providerAddr) @@ -564,7 +554,7 @@ func (cwsm *ConsumerWSSubscriptionManager) listenForSubscriptionMessages( utils.LogAttr("hashedParams", hashedParams), utils.LogAttr("reply", reply), ) - go cwsm.rpcConsumerLogs.SetFailedRelaySubscriptionRequestMetric(metricsData.ChainID, metricsData.APIType) + go cwsm.consumerMetricsManager.SetFailedWsSubscriptionRequestMetric(metricsData.ChainID, metricsData.APIType) return } } diff --git a/protocol/chainlib/consumer_ws_subscription_manager_test.go b/protocol/chainlib/consumer_ws_subscription_manager_test.go index 943f10e3cd..9aebc649a4 100644 --- a/protocol/chainlib/consumer_ws_subscription_manager_test.go +++ b/protocol/chainlib/consumer_ws_subscription_manager_test.go @@ -54,7 +54,6 @@ func TestConsumerWSSubscriptionManagerParallelSubscriptionsOnSameDappIdIp(t *tes subscriptionFirstReply2: []byte(`{"jsonrpc":"2.0","id":4,"result":{}}`), }, } - rpcconsumerLogs, _ := metrics.NewRPCConsumerLogs(nil, nil) metricsData := metrics.NewRelayAnalytics(projectHashTest, chainIdTest, apiTypeTest) for _, play := range playbook { t.Run(play.name, func(t *testing.T) { @@ -140,7 +139,7 @@ func TestConsumerWSSubscriptionManagerParallelSubscriptionsOnSameDappIdIp(t *tes consumerSessionManager := CreateConsumerSessionManager(play.specId, play.apiInterface, ts.Consumer.Addr.String()) // Create a new ConsumerWSSubscriptionManager - manager := NewConsumerWSSubscriptionManager(consumerSessionManager, relaySender, nil, play.connectionType, chainParser, lavasession.NewActiveSubscriptionProvidersStorage(), rpcconsumerLogs) + manager := NewConsumerWSSubscriptionManager(consumerSessionManager, relaySender, nil, play.connectionType, chainParser, lavasession.NewActiveSubscriptionProvidersStorage(), nil) uniqueIdentifiers := make([]string, numberOfParallelSubscriptions) wg := sync.WaitGroup{} wg.Add(numberOfParallelSubscriptions) @@ -294,10 +293,9 @@ func TestConsumerWSSubscriptionManagerParallelSubscriptions(t *testing.T) { Times(1) // Should call SendParsedRelay, because it is the first time we subscribe consumerSessionManager := CreateConsumerSessionManager(play.specId, play.apiInterface, ts.Consumer.Addr.String()) - rpcconsumerLogs, _ := metrics.NewRPCConsumerLogs(nil, nil) metricsData := metrics.NewRelayAnalytics(projectHashTest, chainIdTest, apiTypeTest) // Create a new ConsumerWSSubscriptionManager - manager := NewConsumerWSSubscriptionManager(consumerSessionManager, relaySender, nil, play.connectionType, chainParser, lavasession.NewActiveSubscriptionProvidersStorage(), rpcconsumerLogs) + manager := NewConsumerWSSubscriptionManager(consumerSessionManager, relaySender, nil, play.connectionType, chainParser, lavasession.NewActiveSubscriptionProvidersStorage(), nil) wg := sync.WaitGroup{} wg.Add(10) @@ -383,7 +381,6 @@ func TestConsumerWSSubscriptionManager(t *testing.T) { unsubscribeMessage2: []byte(`{"jsonrpc":"2.0","method":"eth_unsubscribe","params":["0x2134567890"],"id":1}`), }, } - rpcconsumerLogs, _ := metrics.NewRPCConsumerLogs(nil, nil) metricsData := metrics.NewRelayAnalytics(projectHashTest, chainIdTest, apiTypeTest) for _, play := range playbook { @@ -544,7 +541,7 @@ func TestConsumerWSSubscriptionManager(t *testing.T) { consumerSessionManager := CreateConsumerSessionManager(play.specId, play.apiInterface, ts.Consumer.Addr.String()) // Create a new ConsumerWSSubscriptionManager - manager := NewConsumerWSSubscriptionManager(consumerSessionManager, relaySender, nil, play.connectionType, chainParser, lavasession.NewActiveSubscriptionProvidersStorage(), rpcconsumerLogs) + manager := NewConsumerWSSubscriptionManager(consumerSessionManager, relaySender, nil, play.connectionType, chainParser, lavasession.NewActiveSubscriptionProvidersStorage(), nil) // Start a new subscription for the first time, called SendParsedRelay once ctx = utils.WithUniqueIdentifier(ctx, utils.GenerateUniqueIdentifier()) diff --git a/protocol/metrics/metrics_consumer_manager.go b/protocol/metrics/metrics_consumer_manager.go index 911ea3f04a..80dba76ba0 100644 --- a/protocol/metrics/metrics_consumer_manager.go +++ b/protocol/metrics/metrics_consumer_manager.go @@ -13,6 +13,14 @@ import ( "github.com/prometheus/client_golang/prometheus/promhttp" ) +type WsDisconnectReasonEnum int + +const ( + ConsumerDisconnect WsDisconnectReasonEnum = iota + ProviderDisconnect WsDisconnectReasonEnum = iota + UserDisconnect WsDisconnectReasonEnum = iota +) + type LatencyTracker struct { AverageLatency time.Duration // in nano seconds (time.Since result) TotalRequests int @@ -26,37 +34,37 @@ func (lt *LatencyTracker) AddLatency(latency time.Duration) { } type ConsumerMetricsManager struct { - totalCURequestedMetric *prometheus.CounterVec - totalRelaysRequestedMetric *prometheus.CounterVec - totalErroredMetric *prometheus.CounterVec - totalNodeErroredMetric *prometheus.CounterVec - totalNodeErroredRecoveredSuccessfullyMetric *prometheus.CounterVec - totalNodeErroredRecoveryAttemptsMetric *prometheus.CounterVec - totalRelaysSentToProvidersMetric *prometheus.CounterVec - totalRelaysSentByNewBatchTickerMetric *prometheus.CounterVec - totalRelaySubscriptionRequestsMetric *prometheus.CounterVec - totalFailedRelaySubscriptionRequestsMetric *prometheus.CounterVec - totalRelaySubscriptionDissconnectMetric *prometheus.CounterVec - totalDuplicatedRelaySubscriptionRequestsMetric *prometheus.CounterVec - blockMetric *prometheus.GaugeVec - latencyMetric *prometheus.GaugeVec - qosMetric *prometheus.GaugeVec - qosExcellenceMetric *prometheus.GaugeVec - LatestBlockMetric *prometheus.GaugeVec - LatestProviderRelay *prometheus.GaugeVec - virtualEpochMetric *prometheus.GaugeVec - apiMethodCalls *prometheus.GaugeVec - endpointsHealthChecksOkMetric prometheus.Gauge - endpointsHealthChecksOk uint64 - lock sync.Mutex - protocolVersionMetric *prometheus.GaugeVec - providerRelays map[string]uint64 - addMethodsApiGauge bool - averageLatencyPerChain map[string]*LatencyTracker // key == chain Id + api interface - averageLatencyMetric *prometheus.GaugeVec - relayProcessingLatencyBeforeProvider *prometheus.GaugeVec - relayProcessingLatencyAfterProvider *prometheus.GaugeVec - averageProcessingLatency map[string]*LatencyTracker + totalCURequestedMetric *prometheus.CounterVec + totalRelaysRequestedMetric *prometheus.CounterVec + totalErroredMetric *prometheus.CounterVec + totalNodeErroredMetric *prometheus.CounterVec + totalNodeErroredRecoveredSuccessfullyMetric *prometheus.CounterVec + totalNodeErroredRecoveryAttemptsMetric *prometheus.CounterVec + totalRelaysSentToProvidersMetric *prometheus.CounterVec + totalRelaysSentByNewBatchTickerMetric *prometheus.CounterVec + totalWsSubscriptionRequestsMetric *prometheus.CounterVec + totalFailedWsSubscriptionRequestsMetric *prometheus.CounterVec + totalWsSubscriptionDissconnectMetric *prometheus.CounterVec + totalDuplicatedWsSubscriptionRequestsMetric *prometheus.CounterVec + blockMetric *prometheus.GaugeVec + latencyMetric *prometheus.GaugeVec + qosMetric *prometheus.GaugeVec + qosExcellenceMetric *prometheus.GaugeVec + LatestBlockMetric *prometheus.GaugeVec + LatestProviderRelay *prometheus.GaugeVec + virtualEpochMetric *prometheus.GaugeVec + apiMethodCalls *prometheus.GaugeVec + endpointsHealthChecksOkMetric prometheus.Gauge + endpointsHealthChecksOk uint64 + lock sync.Mutex + protocolVersionMetric *prometheus.GaugeVec + providerRelays map[string]uint64 + addMethodsApiGauge bool + averageLatencyPerChain map[string]*LatencyTracker // key == chain Id + api interface + averageLatencyMetric *prometheus.GaugeVec + relayProcessingLatencyBeforeProvider *prometheus.GaugeVec + relayProcessingLatencyAfterProvider *prometheus.GaugeVec + averageProcessingLatency map[string]*LatencyTracker } type ConsumerMetricsManagerOptions struct { @@ -92,24 +100,24 @@ func NewConsumerMetricsManager(options ConsumerMetricsManagerOptions) *ConsumerM Help: "The total number of errors encountered by the consumer over time.", }, []string{"spec", "apiInterface"}) - totalRelaySubscriptionRequestsMetric := prometheus.NewCounterVec(prometheus.CounterOpts{ - Name: "lava_consumer_total_relay_subscription_requests", - Help: "The total number of relay subscription requests by the consumer over time per chain id per api interface.", + totalWsSubscriptionRequestsMetric := prometheus.NewCounterVec(prometheus.CounterOpts{ + Name: "lava_consumer_total_ws_subscription_requests", + Help: "The total number of websocket subscription requests by the consumer over time per chain id per api interface.", }, []string{"spec", "apiInterface"}) - totalFailedRelaySubscriptionRequestsMetric := prometheus.NewCounterVec(prometheus.CounterOpts{ - Name: "lava_consumer_total_failed_relay_subscription_requests", - Help: "The total number of failed relay subscription requests by the consumer over time per chain id per api interface.", + totalFailedWsSubscriptionRequestsMetric := prometheus.NewCounterVec(prometheus.CounterOpts{ + Name: "lava_consumer_total_failed_ws_subscription_requests", + Help: "The total number of failed websocket subscription requests by the consumer over time per chain id per api interface.", }, []string{"spec", "apiInterface"}) - totalDuplicatedRelaySubscriptionRequestsMetric := prometheus.NewCounterVec(prometheus.CounterOpts{ - Name: "lava_consumer_total_duplicated_relay_subscription_requests", - Help: "The total number of duplicated relay subscription requests by the consumer over time per chain id per api interface.", + totalDuplicatedWsSubscriptionRequestsMetric := prometheus.NewCounterVec(prometheus.CounterOpts{ + Name: "lava_consumer_total_duplicated_ws_subscription_requests", + Help: "The total number of duplicated webscket subscription requests by the consumer over time per chain id per api interface.", }, []string{"spec", "apiInterface"}) - totalRelaySubscriptionDissconnectMetric := prometheus.NewCounterVec(prometheus.CounterOpts{ - Name: "lava_consumer_total_relay_subscription_disconnect", - Help: "The total number of relay subscription disconnects over time per chain id per api interface per dissconnect reason.", + totalWsSubscriptionDissconnectMetric := prometheus.NewCounterVec(prometheus.CounterOpts{ + Name: "lava_consumer_total_ws_subscription_disconnect", + Help: "The total number of websocket subscription disconnects over time per chain id per api interface per dissconnect reason.", }, []string{"spec", "apiInterface", "dissconectReason"}) blockMetric := prometheus.NewGaugeVec(prometheus.GaugeOpts{ @@ -220,42 +228,42 @@ func NewConsumerMetricsManager(options ConsumerMetricsManagerOptions) *ConsumerM prometheus.MustRegister(totalNodeErroredRecoveryAttemptsMetric) prometheus.MustRegister(relayProcessingLatencyBeforeProvider) prometheus.MustRegister(relayProcessingLatencyAfterProvider) - prometheus.MustRegister(totalRelaySubscriptionRequestsMetric) - prometheus.MustRegister(totalFailedRelaySubscriptionRequestsMetric) - prometheus.MustRegister(totalDuplicatedRelaySubscriptionRequestsMetric) - prometheus.MustRegister(totalRelaySubscriptionDissconnectMetric) + prometheus.MustRegister(totalWsSubscriptionRequestsMetric) + prometheus.MustRegister(totalFailedWsSubscriptionRequestsMetric) + prometheus.MustRegister(totalDuplicatedWsSubscriptionRequestsMetric) + prometheus.MustRegister(totalWsSubscriptionDissconnectMetric) consumerMetricsManager := &ConsumerMetricsManager{ - totalCURequestedMetric: totalCURequestedMetric, - totalRelaysRequestedMetric: totalRelaysRequestedMetric, - totalRelaySubscriptionRequestsMetric: totalRelaySubscriptionRequestsMetric, - totalFailedRelaySubscriptionRequestsMetric: totalFailedRelaySubscriptionRequestsMetric, - totalDuplicatedRelaySubscriptionRequestsMetric: totalDuplicatedRelaySubscriptionRequestsMetric, - totalRelaySubscriptionDissconnectMetric: totalRelaySubscriptionDissconnectMetric, - totalErroredMetric: totalErroredMetric, - blockMetric: blockMetric, - latencyMetric: latencyMetric, - qosMetric: qosMetric, - qosExcellenceMetric: qosExcellenceMetric, - LatestBlockMetric: latestBlockMetric, - LatestProviderRelay: latestProviderRelay, - providerRelays: map[string]uint64{}, - averageLatencyPerChain: map[string]*LatencyTracker{}, - virtualEpochMetric: virtualEpochMetric, - endpointsHealthChecksOkMetric: endpointsHealthChecksOkMetric, - endpointsHealthChecksOk: 1, - protocolVersionMetric: protocolVersionMetric, - averageLatencyMetric: averageLatencyMetric, - totalRelaysSentByNewBatchTickerMetric: totalRelaysSentByNewBatchTickerMetric, - apiMethodCalls: apiSpecificsMetric, - addMethodsApiGauge: options.AddMethodsApiGauge, - totalNodeErroredMetric: totalNodeErroredMetric, - totalNodeErroredRecoveredSuccessfullyMetric: totalNodeErroredRecoveredSuccessfullyMetric, - totalNodeErroredRecoveryAttemptsMetric: totalNodeErroredRecoveryAttemptsMetric, - totalRelaysSentToProvidersMetric: totalRelaysSentToProvidersMetric, - relayProcessingLatencyBeforeProvider: relayProcessingLatencyBeforeProvider, - relayProcessingLatencyAfterProvider: relayProcessingLatencyAfterProvider, - averageProcessingLatency: map[string]*LatencyTracker{}, + totalCURequestedMetric: totalCURequestedMetric, + totalRelaysRequestedMetric: totalRelaysRequestedMetric, + totalWsSubscriptionRequestsMetric: totalWsSubscriptionRequestsMetric, + totalFailedWsSubscriptionRequestsMetric: totalFailedWsSubscriptionRequestsMetric, + totalDuplicatedWsSubscriptionRequestsMetric: totalDuplicatedWsSubscriptionRequestsMetric, + totalWsSubscriptionDissconnectMetric: totalWsSubscriptionDissconnectMetric, + totalErroredMetric: totalErroredMetric, + blockMetric: blockMetric, + latencyMetric: latencyMetric, + qosMetric: qosMetric, + qosExcellenceMetric: qosExcellenceMetric, + LatestBlockMetric: latestBlockMetric, + LatestProviderRelay: latestProviderRelay, + providerRelays: map[string]uint64{}, + averageLatencyPerChain: map[string]*LatencyTracker{}, + virtualEpochMetric: virtualEpochMetric, + endpointsHealthChecksOkMetric: endpointsHealthChecksOkMetric, + endpointsHealthChecksOk: 1, + protocolVersionMetric: protocolVersionMetric, + averageLatencyMetric: averageLatencyMetric, + totalRelaysSentByNewBatchTickerMetric: totalRelaysSentByNewBatchTickerMetric, + apiMethodCalls: apiSpecificsMetric, + addMethodsApiGauge: options.AddMethodsApiGauge, + totalNodeErroredMetric: totalNodeErroredMetric, + totalNodeErroredRecoveredSuccessfullyMetric: totalNodeErroredRecoveredSuccessfullyMetric, + totalNodeErroredRecoveryAttemptsMetric: totalNodeErroredRecoveryAttemptsMetric, + totalRelaysSentToProvidersMetric: totalRelaysSentToProvidersMetric, + relayProcessingLatencyBeforeProvider: relayProcessingLatencyBeforeProvider, + relayProcessingLatencyAfterProvider: relayProcessingLatencyAfterProvider, + averageProcessingLatency: map[string]*LatencyTracker{}, } http.Handle("/metrics", promhttp.Handler()) @@ -493,30 +501,35 @@ func SetVersionInner(protocolVersionMetric *prometheus.GaugeVec, version string) protocolVersionMetric.WithLabelValues("version").Set(float64(combined)) } -func (pme *ConsumerMetricsManager) SetRelaySubscriptionRequestMetric(chainId string, apiInterface string) { +func (pme *ConsumerMetricsManager) SetWsSubscriptionRequestMetric(chainId string, apiInterface string) { if pme == nil { return } - pme.totalRelaySubscriptionRequestsMetric.WithLabelValues(chainId, apiInterface).Inc() + pme.totalWsSubscriptionRequestsMetric.WithLabelValues(chainId, apiInterface).Inc() } -func (pme *ConsumerMetricsManager) SetFailedRelaySubscriptionRequestMetric(chainId string, apiInterface string) { +func (pme *ConsumerMetricsManager) SetFailedWsSubscriptionRequestMetric(chainId string, apiInterface string) { if pme == nil { return } - pme.totalFailedRelaySubscriptionRequestsMetric.WithLabelValues(chainId, apiInterface).Inc() + pme.totalFailedWsSubscriptionRequestsMetric.WithLabelValues(chainId, apiInterface).Inc() } -func (pme *ConsumerMetricsManager) SetDuplicatedRelaySubscriptionRequestMetric(chainId string, apiInterface string) { +func (pme *ConsumerMetricsManager) SetDuplicatedWsSubscriptionRequestMetric(chainId string, apiInterface string) { if pme == nil { return } - pme.totalDuplicatedRelaySubscriptionRequestsMetric.WithLabelValues(chainId, apiInterface).Inc() + pme.totalDuplicatedWsSubscriptionRequestsMetric.WithLabelValues(chainId, apiInterface).Inc() } -func (pme *ConsumerMetricsManager) SetRelaySubscriptioDisconnectRequestMetric(chainId string, apiInterface string, disconnectReason string) { +func (pme *ConsumerMetricsManager) SetWsSubscriptioDisconnectRequestMetric(chainId string, apiInterface string, disconnectReason WsDisconnectReasonEnum) { if pme == nil { return } - pme.totalRelaySubscriptionDissconnectMetric.WithLabelValues(chainId, apiInterface, disconnectReason).Inc() + var disconnectReasonMap = map[WsDisconnectReasonEnum]string{ + ConsumerDisconnect: "ConsumerDisconnect", + ProviderDisconnect: "ProviderDisconnect", + UserDisconnect: "UserDisconnect", + } + pme.totalWsSubscriptionDissconnectMetric.WithLabelValues(chainId, apiInterface, disconnectReasonMap[disconnectReason]).Inc() } diff --git a/protocol/metrics/rpcconsumerlogs.go b/protocol/metrics/rpcconsumerlogs.go index 51cf2c567b..606ce8f114 100644 --- a/protocol/metrics/rpcconsumerlogs.go +++ b/protocol/metrics/rpcconsumerlogs.go @@ -263,19 +263,3 @@ func (rpccl *RPCConsumerLogs) LogTestMode(fiberCtx *fiber.Ctx) { } utils.LavaFormatInfo(st) } - -func (rpccl *RPCConsumerLogs) SetRelaySubscriptionRequestMetric(chainId string, apiInterface string) { - rpccl.consumerMetricsManager.SetRelaySubscriptionRequestMetric(chainId, apiInterface) -} - -func (rpccl *RPCConsumerLogs) SetFailedRelaySubscriptionRequestMetric(chainId string, apiInterface string) { - rpccl.consumerMetricsManager.SetFailedRelaySubscriptionRequestMetric(chainId, apiInterface) -} - -func (rpccl *RPCConsumerLogs) SetDuplicatedRelaySubscriptionRequestMetric(chainId string, apiInterface string) { - rpccl.consumerMetricsManager.SetDuplicatedRelaySubscriptionRequestMetric(chainId, apiInterface) -} - -func (rpccl *RPCConsumerLogs) SetRelaySubscriptioDisconnectRequestMetric(chainId string, apiInterface string, disconnectReason string) { - rpccl.consumerMetricsManager.SetRelaySubscriptioDisconnectRequestMetric(chainId, apiInterface, disconnectReason) -} From c91b93fe42d46e903b37830298a7968ff090b8ca Mon Sep 17 00:00:00 2001 From: leon mandel Date: Tue, 17 Sep 2024 15:36:05 +0200 Subject: [PATCH 11/15] fix lint --- protocol/rpcconsumer/rpcconsumer.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/protocol/rpcconsumer/rpcconsumer.go b/protocol/rpcconsumer/rpcconsumer.go index 6bad2cbbd0..1223ef34be 100644 --- a/protocol/rpcconsumer/rpcconsumer.go +++ b/protocol/rpcconsumer/rpcconsumer.go @@ -298,7 +298,7 @@ func (rpcc *RPCConsumer) Start(ctx context.Context, options *rpcConsumerStartOpt if rpcEndpoint.ApiInterface == spectypes.APIInterfaceJsonRPC { specMethodType = http.MethodPost } - consumerWsSubscriptionManager = chainlib.NewConsumerWSSubscriptionManager(consumerSessionManager, rpcConsumerServer, options.refererData, specMethodType, chainParser, activeSubscriptionProvidersStorage, rpcConsumerMetrics) + consumerWsSubscriptionManager = chainlib.NewConsumerWSSubscriptionManager(consumerSessionManager, rpcConsumerServer, options.refererData, specMethodType, chainParser, activeSubscriptionProvidersStorage, consumerMetricsManager) utils.LavaFormatInfo("RPCConsumer Listening", utils.Attribute{Key: "endpoints", Value: rpcEndpoint.String()}) err = rpcConsumerServer.ServeRPCRequests(ctx, rpcEndpoint, rpcc.consumerStateTracker, chainParser, finalizationConsensus, consumerSessionManager, options.requiredResponses, privKey, lavaChainID, options.cache, rpcConsumerMetrics, consumerAddr, consumerConsistency, relaysMonitor, options.cmdFlags, options.stateShare, options.refererData, consumerReportsManager, consumerWsSubscriptionManager) From f06dc42e372e13f9e01c4df909559d7a96ee27a9 Mon Sep 17 00:00:00 2001 From: leon mandel Date: Tue, 17 Sep 2024 15:47:56 +0200 Subject: [PATCH 12/15] fix lint --- protocol/metrics/metrics_consumer_manager.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/protocol/metrics/metrics_consumer_manager.go b/protocol/metrics/metrics_consumer_manager.go index 80dba76ba0..580921f0c1 100644 --- a/protocol/metrics/metrics_consumer_manager.go +++ b/protocol/metrics/metrics_consumer_manager.go @@ -526,7 +526,7 @@ func (pme *ConsumerMetricsManager) SetWsSubscriptioDisconnectRequestMetric(chain if pme == nil { return } - var disconnectReasonMap = map[WsDisconnectReasonEnum]string{ + disconnectReasonMap := map[WsDisconnectReasonEnum]string{ ConsumerDisconnect: "ConsumerDisconnect", ProviderDisconnect: "ProviderDisconnect", UserDisconnect: "UserDisconnect", From d0826b982a5b337f7ce0ea6e0684952db8756870 Mon Sep 17 00:00:00 2001 From: leon mandel Date: Tue, 17 Sep 2024 15:52:25 +0200 Subject: [PATCH 13/15] change disconnect reason map --- protocol/metrics/metrics_consumer_manager.go | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/protocol/metrics/metrics_consumer_manager.go b/protocol/metrics/metrics_consumer_manager.go index 580921f0c1..68908aee27 100644 --- a/protocol/metrics/metrics_consumer_manager.go +++ b/protocol/metrics/metrics_consumer_manager.go @@ -21,6 +21,12 @@ const ( UserDisconnect WsDisconnectReasonEnum = iota ) +var disconnectReasonMap = map[WsDisconnectReasonEnum]string{ + ConsumerDisconnect: "ConsumerDisconnect", + ProviderDisconnect: "ProviderDisconnect", + UserDisconnect: "UserDisconnect", +} + type LatencyTracker struct { AverageLatency time.Duration // in nano seconds (time.Since result) TotalRequests int @@ -526,10 +532,5 @@ func (pme *ConsumerMetricsManager) SetWsSubscriptioDisconnectRequestMetric(chain if pme == nil { return } - disconnectReasonMap := map[WsDisconnectReasonEnum]string{ - ConsumerDisconnect: "ConsumerDisconnect", - ProviderDisconnect: "ProviderDisconnect", - UserDisconnect: "UserDisconnect", - } pme.totalWsSubscriptionDissconnectMetric.WithLabelValues(chainId, apiInterface, disconnectReasonMap[disconnectReason]).Inc() } From 08d53e8910904b45b4e3bd694afaa3769df0f919 Mon Sep 17 00:00:00 2001 From: leon mandel Date: Thu, 19 Sep 2024 14:06:28 +0200 Subject: [PATCH 14/15] fix pr --- .../consumer_ws_subscription_manager.go | 7 +++---- protocol/metrics/metrics_consumer_manager.go | 18 +++++++++--------- 2 files changed, 12 insertions(+), 13 deletions(-) diff --git a/protocol/chainlib/consumer_ws_subscription_manager.go b/protocol/chainlib/consumer_ws_subscription_manager.go index 88fd5c5c34..65d9223b80 100644 --- a/protocol/chainlib/consumer_ws_subscription_manager.go +++ b/protocol/chainlib/consumer_ws_subscription_manager.go @@ -204,7 +204,6 @@ func (cwsm *ConsumerWSSubscriptionManager) StartSubscription( utils.LogAttr("GUID", webSocketCtx), utils.LogAttr("hashedParams", utils.ToHexString(hashedParams)), utils.LogAttr("dappKey", dappKey), - utils.LogAttr("connectedDapps", cwsm.connectedDapps), ) websocketRepliesChan := make(chan *pairingtypes.RelayReply) @@ -530,14 +529,14 @@ func (cwsm *ConsumerWSSubscriptionManager) listenForSubscriptionMessages( utils.LogAttr("GUID", webSocketCtx), utils.LogAttr("hashedParams", utils.ToHexString(hashedParams)), ) - go cwsm.consumerMetricsManager.SetWsSubscriptioDisconnectRequestMetric(metricsData.ChainID, metricsData.APIType, metrics.UserDisconnect) + go cwsm.consumerMetricsManager.SetWsSubscriptioDisconnectRequestMetric(metricsData.ChainID, metricsData.APIType, metrics.WS_DISCONNECTION_REASON_USER) return case <-replyServer.Context().Done(): utils.LavaFormatTrace("reply server context canceled", utils.LogAttr("GUID", webSocketCtx), utils.LogAttr("hashedParams", utils.ToHexString(hashedParams)), ) - go cwsm.consumerMetricsManager.SetWsSubscriptioDisconnectRequestMetric(metricsData.ChainID, metricsData.APIType, metrics.ConsumerDisconnect) + go cwsm.consumerMetricsManager.SetWsSubscriptioDisconnectRequestMetric(metricsData.ChainID, metricsData.APIType, metrics.WS_DISCONNECTION_REASON_CONSUMER) return default: var reply pairingtypes.RelayReply @@ -545,7 +544,7 @@ func (cwsm *ConsumerWSSubscriptionManager) listenForSubscriptionMessages( if err != nil { // The connection was closed by the provider utils.LavaFormatTrace("error reading from subscription stream", utils.LogAttr("original error", err.Error())) - go cwsm.consumerMetricsManager.SetWsSubscriptioDisconnectRequestMetric(metricsData.ChainID, metricsData.APIType, metrics.ProviderDisconnect) + go cwsm.consumerMetricsManager.SetWsSubscriptioDisconnectRequestMetric(metricsData.ChainID, metricsData.APIType, metrics.WS_DISCONNECTION_REASON_PROVIDER) return } err = cwsm.handleIncomingSubscriptionNodeMessage(hashedParams, &reply, providerAddr) diff --git a/protocol/metrics/metrics_consumer_manager.go b/protocol/metrics/metrics_consumer_manager.go index 68908aee27..2228a1c579 100644 --- a/protocol/metrics/metrics_consumer_manager.go +++ b/protocol/metrics/metrics_consumer_manager.go @@ -16,15 +16,15 @@ import ( type WsDisconnectReasonEnum int const ( - ConsumerDisconnect WsDisconnectReasonEnum = iota - ProviderDisconnect WsDisconnectReasonEnum = iota - UserDisconnect WsDisconnectReasonEnum = iota + WS_DISCONNECTION_REASON_CONSUMER WsDisconnectReasonEnum = iota + WS_DISCONNECTION_REASON_PROVIDER + WS_DISCONNECTION_REASON_USER ) var disconnectReasonMap = map[WsDisconnectReasonEnum]string{ - ConsumerDisconnect: "ConsumerDisconnect", - ProviderDisconnect: "ProviderDisconnect", - UserDisconnect: "UserDisconnect", + WS_DISCONNECTION_REASON_CONSUMER: "consumer-disconnect", + WS_DISCONNECTION_REASON_PROVIDER: "provider-disconnect", + WS_DISCONNECTION_REASON_USER: "user-disconnect", } type LatencyTracker struct { @@ -108,17 +108,17 @@ func NewConsumerMetricsManager(options ConsumerMetricsManagerOptions) *ConsumerM totalWsSubscriptionRequestsMetric := prometheus.NewCounterVec(prometheus.CounterOpts{ Name: "lava_consumer_total_ws_subscription_requests", - Help: "The total number of websocket subscription requests by the consumer over time per chain id per api interface.", + Help: "The total number of websocket subscription requests over time per chain id per api interface.", }, []string{"spec", "apiInterface"}) totalFailedWsSubscriptionRequestsMetric := prometheus.NewCounterVec(prometheus.CounterOpts{ Name: "lava_consumer_total_failed_ws_subscription_requests", - Help: "The total number of failed websocket subscription requests by the consumer over time per chain id per api interface.", + Help: "The total number of failed websocket subscription requests over time per chain id per api interface.", }, []string{"spec", "apiInterface"}) totalDuplicatedWsSubscriptionRequestsMetric := prometheus.NewCounterVec(prometheus.CounterOpts{ Name: "lava_consumer_total_duplicated_ws_subscription_requests", - Help: "The total number of duplicated webscket subscription requests by the consumer over time per chain id per api interface.", + Help: "The total number of duplicated webscket subscription requests over time per chain id per api interface.", }, []string{"spec", "apiInterface"}) totalWsSubscriptionDissconnectMetric := prometheus.NewCounterVec(prometheus.CounterOpts{ From bfd3e14b181381cd2ef30ce4fe0fce1c68465769 Mon Sep 17 00:00:00 2001 From: leon mandel Date: Tue, 24 Sep 2024 13:01:37 +0200 Subject: [PATCH 15/15] fix pr --- .../consumer_ws_subscription_manager.go | 6 +++--- protocol/metrics/metrics_consumer_manager.go | 18 +++++------------- 2 files changed, 8 insertions(+), 16 deletions(-) diff --git a/protocol/chainlib/consumer_ws_subscription_manager.go b/protocol/chainlib/consumer_ws_subscription_manager.go index 65d9223b80..102bd8240a 100644 --- a/protocol/chainlib/consumer_ws_subscription_manager.go +++ b/protocol/chainlib/consumer_ws_subscription_manager.go @@ -529,14 +529,14 @@ func (cwsm *ConsumerWSSubscriptionManager) listenForSubscriptionMessages( utils.LogAttr("GUID", webSocketCtx), utils.LogAttr("hashedParams", utils.ToHexString(hashedParams)), ) - go cwsm.consumerMetricsManager.SetWsSubscriptioDisconnectRequestMetric(metricsData.ChainID, metricsData.APIType, metrics.WS_DISCONNECTION_REASON_USER) + go cwsm.consumerMetricsManager.SetWsSubscriptioDisconnectRequestMetric(metricsData.ChainID, metricsData.APIType, metrics.WsDisconnectionReasonUser) return case <-replyServer.Context().Done(): utils.LavaFormatTrace("reply server context canceled", utils.LogAttr("GUID", webSocketCtx), utils.LogAttr("hashedParams", utils.ToHexString(hashedParams)), ) - go cwsm.consumerMetricsManager.SetWsSubscriptioDisconnectRequestMetric(metricsData.ChainID, metricsData.APIType, metrics.WS_DISCONNECTION_REASON_CONSUMER) + go cwsm.consumerMetricsManager.SetWsSubscriptioDisconnectRequestMetric(metricsData.ChainID, metricsData.APIType, metrics.WsDisconnectionReasonConsumer) return default: var reply pairingtypes.RelayReply @@ -544,7 +544,7 @@ func (cwsm *ConsumerWSSubscriptionManager) listenForSubscriptionMessages( if err != nil { // The connection was closed by the provider utils.LavaFormatTrace("error reading from subscription stream", utils.LogAttr("original error", err.Error())) - go cwsm.consumerMetricsManager.SetWsSubscriptioDisconnectRequestMetric(metricsData.ChainID, metricsData.APIType, metrics.WS_DISCONNECTION_REASON_PROVIDER) + go cwsm.consumerMetricsManager.SetWsSubscriptioDisconnectRequestMetric(metricsData.ChainID, metricsData.APIType, metrics.WsDisconnectionReasonProvider) return } err = cwsm.handleIncomingSubscriptionNodeMessage(hashedParams, &reply, providerAddr) diff --git a/protocol/metrics/metrics_consumer_manager.go b/protocol/metrics/metrics_consumer_manager.go index 2228a1c579..83cd72d025 100644 --- a/protocol/metrics/metrics_consumer_manager.go +++ b/protocol/metrics/metrics_consumer_manager.go @@ -13,20 +13,12 @@ import ( "github.com/prometheus/client_golang/prometheus/promhttp" ) -type WsDisconnectReasonEnum int - const ( - WS_DISCONNECTION_REASON_CONSUMER WsDisconnectReasonEnum = iota - WS_DISCONNECTION_REASON_PROVIDER - WS_DISCONNECTION_REASON_USER + WsDisconnectionReasonConsumer = "consumer-disconnect" + WsDisconnectionReasonProvider = "provider-disconnect" + WsDisconnectionReasonUser = "user-disconnect" ) -var disconnectReasonMap = map[WsDisconnectReasonEnum]string{ - WS_DISCONNECTION_REASON_CONSUMER: "consumer-disconnect", - WS_DISCONNECTION_REASON_PROVIDER: "provider-disconnect", - WS_DISCONNECTION_REASON_USER: "user-disconnect", -} - type LatencyTracker struct { AverageLatency time.Duration // in nano seconds (time.Since result) TotalRequests int @@ -528,9 +520,9 @@ func (pme *ConsumerMetricsManager) SetDuplicatedWsSubscriptionRequestMetric(chai pme.totalDuplicatedWsSubscriptionRequestsMetric.WithLabelValues(chainId, apiInterface).Inc() } -func (pme *ConsumerMetricsManager) SetWsSubscriptioDisconnectRequestMetric(chainId string, apiInterface string, disconnectReason WsDisconnectReasonEnum) { +func (pme *ConsumerMetricsManager) SetWsSubscriptioDisconnectRequestMetric(chainId string, apiInterface string, disconnectReason string) { if pme == nil { return } - pme.totalWsSubscriptionDissconnectMetric.WithLabelValues(chainId, apiInterface, disconnectReasonMap[disconnectReason]).Inc() + pme.totalWsSubscriptionDissconnectMetric.WithLabelValues(chainId, apiInterface, disconnectReason).Inc() }