Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: PRT: Add subscription metrics #1695

Open
wants to merge 21 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion protocol/chainlib/consumer_websocket_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,7 @@ func (cwm *ConsumerWebsocketManager) ListenToMessages() {
continue
}

// check whether its a normal relay / unsubscribe / unsubscribe_all otherwise its a subscription flow.
// check whether it's a normal relay / unsubscribe / unsubscribe_all otherwise its a subscription flow.
if !IsFunctionTagOfType(protocolMessage, spectypes.FUNCTION_TAG_SUBSCRIBE) {
if IsFunctionTagOfType(protocolMessage, spectypes.FUNCTION_TAG_UNSUBSCRIBE) {
err := cwm.consumerWsSubscriptionManager.Unsubscribe(webSocketCtx, protocolMessage, dappID, userIp, cwm.WebsocketConnectionUID, metricsData)
Expand Down
11 changes: 10 additions & 1 deletion protocol/chainlib/consumer_ws_subscription_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ type ConsumerWSSubscriptionManager struct {
activeSubscriptionProvidersStorage *lavasession.ActiveSubscriptionProvidersStorage
currentlyPendingSubscriptions map[string]*pendingSubscriptionsBroadcastManager
lock sync.RWMutex
consumerMetricsManager *metrics.ConsumerMetricsManager
}

func NewConsumerWSSubscriptionManager(
Expand All @@ -65,6 +66,7 @@ func NewConsumerWSSubscriptionManager(
connectionType string,
chainParser ChainParser,
activeSubscriptionProvidersStorage *lavasession.ActiveSubscriptionProvidersStorage,
consumerMetricsManager *metrics.ConsumerMetricsManager,
) *ConsumerWSSubscriptionManager {
return &ConsumerWSSubscriptionManager{
connectedDapps: make(map[string]map[string]*common.SafeChannelSender[*pairingtypes.RelayReply]),
Expand All @@ -76,6 +78,7 @@ func NewConsumerWSSubscriptionManager(
relaySender: relaySender,
connectionType: connectionType,
activeSubscriptionProvidersStorage: activeSubscriptionProvidersStorage,
consumerMetricsManager: consumerMetricsManager,
}
}

Expand Down Expand Up @@ -216,6 +219,7 @@ func (cwsm *ConsumerWSSubscriptionManager) StartSubscription(

// called after send relay failure or parsing failure afterwards
onSubscriptionFailure := func() {
go cwsm.consumerMetricsManager.SetFailedWsSubscriptionRequestMetric(metricsData.ChainID, metricsData.APIType)
cwsm.failedPendingSubscription(hashedParams)
closeWebsocketRepliesChannel()
}
Expand Down Expand Up @@ -255,6 +259,7 @@ func (cwsm *ConsumerWSSubscriptionManager) StartSubscription(
// Validated there are no active subscriptions that we can use.
firstSubscriptionReply, returnWebsocketRepliesChan := cwsm.checkForActiveSubscriptionWithLock(webSocketCtx, hashedParams, protocolMessage, dappKey, websocketRepliesSafeChannelSender, closeWebsocketRepliesChannel)
if firstSubscriptionReply != nil {
go cwsm.consumerMetricsManager.SetDuplicatedWsSubscriptionRequestMetric(metricsData.ChainID, metricsData.APIType)
if returnWebsocketRepliesChan {
return firstSubscriptionReply, websocketRepliesChan, nil
}
Expand Down Expand Up @@ -412,7 +417,7 @@ func (cwsm *ConsumerWSSubscriptionManager) StartSubscription(
cwsm.successfulPendingSubscription(hashedParams)
// Need to be run once for subscription
go cwsm.listenForSubscriptionMessages(webSocketCtx, dappID, consumerIp, replyServer, hashedParams, providerAddr, metricsData, closeSubscriptionChan)

go cwsm.consumerMetricsManager.SetWsSubscriptionRequestMetric(metricsData.ChainID, metricsData.APIType)
return &reply, websocketRepliesChan, nil
}

Expand Down Expand Up @@ -524,19 +529,22 @@ func (cwsm *ConsumerWSSubscriptionManager) listenForSubscriptionMessages(
utils.LogAttr("GUID", webSocketCtx),
utils.LogAttr("hashedParams", utils.ToHexString(hashedParams)),
)
go cwsm.consumerMetricsManager.SetWsSubscriptioDisconnectRequestMetric(metricsData.ChainID, metricsData.APIType, metrics.WS_DISCONNECTION_REASON_USER)
return
case <-replyServer.Context().Done():
utils.LavaFormatTrace("reply server context canceled",
utils.LogAttr("GUID", webSocketCtx),
utils.LogAttr("hashedParams", utils.ToHexString(hashedParams)),
)
go cwsm.consumerMetricsManager.SetWsSubscriptioDisconnectRequestMetric(metricsData.ChainID, metricsData.APIType, metrics.WS_DISCONNECTION_REASON_CONSUMER)
return
default:
var reply pairingtypes.RelayReply
err := replyServer.RecvMsg(&reply)
if err != nil {
// The connection was closed by the provider
utils.LavaFormatTrace("error reading from subscription stream", utils.LogAttr("original error", err.Error()))
go cwsm.consumerMetricsManager.SetWsSubscriptioDisconnectRequestMetric(metricsData.ChainID, metricsData.APIType, metrics.WS_DISCONNECTION_REASON_PROVIDER)
return
}
err = cwsm.handleIncomingSubscriptionNodeMessage(hashedParams, &reply, providerAddr)
Expand All @@ -545,6 +553,7 @@ func (cwsm *ConsumerWSSubscriptionManager) listenForSubscriptionMessages(
utils.LogAttr("hashedParams", hashedParams),
utils.LogAttr("reply", reply),
)
go cwsm.consumerMetricsManager.SetFailedWsSubscriptionRequestMetric(metricsData.ChainID, metricsData.APIType)
return
}
}
Expand Down
35 changes: 19 additions & 16 deletions protocol/chainlib/consumer_ws_subscription_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,9 @@ import (
const (
numberOfParallelSubscriptions = 10
uniqueId = "1234"
projectHashTest = "test_projecthash"
chainIdTest = "test_chainId"
apiTypeTest = "test_apiType"
)

func TestConsumerWSSubscriptionManagerParallelSubscriptionsOnSameDappIdIp(t *testing.T) {
Expand All @@ -51,7 +54,7 @@ func TestConsumerWSSubscriptionManagerParallelSubscriptionsOnSameDappIdIp(t *tes
subscriptionFirstReply2: []byte(`{"jsonrpc":"2.0","id":4,"result":{}}`),
},
}

metricsData := metrics.NewRelayAnalytics(projectHashTest, chainIdTest, apiTypeTest)
for _, play := range playbook {
t.Run(play.name, func(t *testing.T) {
ts := SetupForTests(t, 1, play.specId, "../../")
Expand Down Expand Up @@ -136,7 +139,7 @@ func TestConsumerWSSubscriptionManagerParallelSubscriptionsOnSameDappIdIp(t *tes
consumerSessionManager := CreateConsumerSessionManager(play.specId, play.apiInterface, ts.Consumer.Addr.String())

// Create a new ConsumerWSSubscriptionManager
manager := NewConsumerWSSubscriptionManager(consumerSessionManager, relaySender, nil, play.connectionType, chainParser, lavasession.NewActiveSubscriptionProvidersStorage())
manager := NewConsumerWSSubscriptionManager(consumerSessionManager, relaySender, nil, play.connectionType, chainParser, lavasession.NewActiveSubscriptionProvidersStorage(), nil)
uniqueIdentifiers := make([]string, numberOfParallelSubscriptions)
wg := sync.WaitGroup{}
wg.Add(numberOfParallelSubscriptions)
Expand All @@ -151,7 +154,7 @@ func TestConsumerWSSubscriptionManagerParallelSubscriptionsOnSameDappIdIp(t *tes
var repliesChan <-chan *pairingtypes.RelayReply
var firstReply *pairingtypes.RelayReply

firstReply, repliesChan, err = manager.StartSubscription(ctx, protocolMessage1, dapp, ip, uniqueIdentifiers[index], nil)
firstReply, repliesChan, err = manager.StartSubscription(ctx, protocolMessage1, dapp, ip, uniqueIdentifiers[index], metricsData)
go func() {
for subMsg := range repliesChan {
// utils.LavaFormatInfo("got reply for index", utils.LogAttr("index", index))
Expand All @@ -169,15 +172,15 @@ func TestConsumerWSSubscriptionManagerParallelSubscriptionsOnSameDappIdIp(t *tes
// now we have numberOfParallelSubscriptions subscriptions currently running
require.Len(t, manager.connectedDapps, numberOfParallelSubscriptions)
// remove one
err = manager.Unsubscribe(ts.Ctx, protocolMessage1, dapp, ip, uniqueIdentifiers[0], nil)
err = manager.Unsubscribe(ts.Ctx, protocolMessage1, dapp, ip, uniqueIdentifiers[0], metricsData)
require.NoError(t, err)
// now we have numberOfParallelSubscriptions - 1
require.Len(t, manager.connectedDapps, numberOfParallelSubscriptions-1)
// check we still have an active subscription.
require.Len(t, manager.activeSubscriptions, 1)

// same flow for unsubscribe all
err = manager.UnsubscribeAll(ts.Ctx, dapp, ip, uniqueIdentifiers[1], nil)
err = manager.UnsubscribeAll(ts.Ctx, dapp, ip, uniqueIdentifiers[1], metricsData)
require.NoError(t, err)
// now we have numberOfParallelSubscriptions - 2
require.Len(t, manager.connectedDapps, numberOfParallelSubscriptions-2)
Expand Down Expand Up @@ -209,7 +212,6 @@ func TestConsumerWSSubscriptionManagerParallelSubscriptions(t *testing.T) {
subscriptionFirstReply2: []byte(`{"jsonrpc":"2.0","id":4,"result":{}}`),
},
}

for _, play := range playbook {
t.Run(play.name, func(t *testing.T) {
ts := SetupForTests(t, 1, play.specId, "../../")
Expand Down Expand Up @@ -291,9 +293,9 @@ func TestConsumerWSSubscriptionManagerParallelSubscriptions(t *testing.T) {
Times(1) // Should call SendParsedRelay, because it is the first time we subscribe

consumerSessionManager := CreateConsumerSessionManager(play.specId, play.apiInterface, ts.Consumer.Addr.String())

metricsData := metrics.NewRelayAnalytics(projectHashTest, chainIdTest, apiTypeTest)
// Create a new ConsumerWSSubscriptionManager
manager := NewConsumerWSSubscriptionManager(consumerSessionManager, relaySender, nil, play.connectionType, chainParser, lavasession.NewActiveSubscriptionProvidersStorage())
manager := NewConsumerWSSubscriptionManager(consumerSessionManager, relaySender, nil, play.connectionType, chainParser, lavasession.NewActiveSubscriptionProvidersStorage(), nil)

wg := sync.WaitGroup{}
wg.Add(10)
Expand All @@ -305,7 +307,7 @@ func TestConsumerWSSubscriptionManagerParallelSubscriptions(t *testing.T) {
ctx := utils.WithUniqueIdentifier(ts.Ctx, utils.GenerateUniqueIdentifier())
var repliesChan <-chan *pairingtypes.RelayReply
var firstReply *pairingtypes.RelayReply
firstReply, repliesChan, err = manager.StartSubscription(ctx, protocolMessage1, dapp+strconv.Itoa(index), ts.Consumer.Addr.String(), uniqueId, nil)
firstReply, repliesChan, err = manager.StartSubscription(ctx, protocolMessage1, dapp+strconv.Itoa(index), ts.Consumer.Addr.String(), uniqueId, metricsData)
go func() {
for subMsg := range repliesChan {
require.Equal(t, string(play.subscriptionFirstReply1), string(subMsg.Data))
Expand Down Expand Up @@ -379,6 +381,7 @@ func TestConsumerWSSubscriptionManager(t *testing.T) {
unsubscribeMessage2: []byte(`{"jsonrpc":"2.0","method":"eth_unsubscribe","params":["0x2134567890"],"id":1}`),
},
}
metricsData := metrics.NewRelayAnalytics(projectHashTest, chainIdTest, apiTypeTest)

for _, play := range playbook {
t.Run(play.name, func(t *testing.T) {
Expand Down Expand Up @@ -538,12 +541,12 @@ func TestConsumerWSSubscriptionManager(t *testing.T) {
consumerSessionManager := CreateConsumerSessionManager(play.specId, play.apiInterface, ts.Consumer.Addr.String())

// Create a new ConsumerWSSubscriptionManager
manager := NewConsumerWSSubscriptionManager(consumerSessionManager, relaySender, nil, play.connectionType, chainParser, lavasession.NewActiveSubscriptionProvidersStorage())
manager := NewConsumerWSSubscriptionManager(consumerSessionManager, relaySender, nil, play.connectionType, chainParser, lavasession.NewActiveSubscriptionProvidersStorage(), nil)

// Start a new subscription for the first time, called SendParsedRelay once
ctx = utils.WithUniqueIdentifier(ctx, utils.GenerateUniqueIdentifier())

firstReply, repliesChan1, err := manager.StartSubscription(ctx, subscribeProtocolMessage1, dapp1, ts.Consumer.Addr.String(), uniqueId, nil)
firstReply, repliesChan1, err := manager.StartSubscription(ctx, subscribeProtocolMessage1, dapp1, ts.Consumer.Addr.String(), uniqueId, metricsData)
assert.NoError(t, err)
unsubscribeMessageWg.Add(1)
assert.Equal(t, string(play.subscriptionFirstReply1), string(firstReply.Data))
Expand All @@ -559,7 +562,7 @@ func TestConsumerWSSubscriptionManager(t *testing.T) {

// Start a subscription again, same params, same dappKey, should not call SendParsedRelay
ctx = utils.WithUniqueIdentifier(ctx, utils.GenerateUniqueIdentifier())
firstReply, repliesChan2, err := manager.StartSubscription(ctx, subscribeProtocolMessage1, dapp1, ts.Consumer.Addr.String(), uniqueId, nil)
firstReply, repliesChan2, err := manager.StartSubscription(ctx, subscribeProtocolMessage1, dapp1, ts.Consumer.Addr.String(), uniqueId, metricsData)
assert.NoError(t, err)
assert.Equal(t, string(play.subscriptionFirstReply1), string(firstReply.Data))
assert.Nil(t, repliesChan2) // Same subscription, same dappKey, no need for a new channel
Expand All @@ -568,7 +571,7 @@ func TestConsumerWSSubscriptionManager(t *testing.T) {

// Start a subscription again, same params, different dappKey, should not call SendParsedRelay
ctx = utils.WithUniqueIdentifier(ctx, utils.GenerateUniqueIdentifier())
firstReply, repliesChan3, err := manager.StartSubscription(ctx, subscribeProtocolMessage1, dapp2, ts.Consumer.Addr.String(), uniqueId, nil)
firstReply, repliesChan3, err := manager.StartSubscription(ctx, subscribeProtocolMessage1, dapp2, ts.Consumer.Addr.String(), uniqueId, metricsData)
assert.NoError(t, err)
assert.Equal(t, string(play.subscriptionFirstReply1), string(firstReply.Data))
assert.NotNil(t, repliesChan3) // Same subscription, but different dappKey, so will create new channel
Expand Down Expand Up @@ -652,7 +655,7 @@ func TestConsumerWSSubscriptionManager(t *testing.T) {
// Start a subscription again, different params, same dappKey, should call SendParsedRelay
ctx = utils.WithUniqueIdentifier(ctx, utils.GenerateUniqueIdentifier())

firstReply, repliesChan4, err := manager.StartSubscription(ctx, subscribeProtocolMessage2, dapp1, ts.Consumer.Addr.String(), uniqueId, nil)
firstReply, repliesChan4, err := manager.StartSubscription(ctx, subscribeProtocolMessage2, dapp1, ts.Consumer.Addr.String(), uniqueId, metricsData)
assert.NoError(t, err)
unsubscribeMessageWg.Add(1)
assert.Equal(t, string(play.subscriptionFirstReply2), string(firstReply.Data))
Expand All @@ -671,7 +674,7 @@ func TestConsumerWSSubscriptionManager(t *testing.T) {

ctx = utils.WithUniqueIdentifier(ctx, utils.GenerateUniqueIdentifier())
unsubProtocolMessage := NewProtocolMessage(unsubscribeChainMessage1, nil, relayResult1.Request.RelayData, dapp2, ts.Consumer.Addr.String())
err = manager.Unsubscribe(ctx, unsubProtocolMessage, dapp2, ts.Consumer.Addr.String(), uniqueId, nil)
err = manager.Unsubscribe(ctx, unsubProtocolMessage, dapp2, ts.Consumer.Addr.String(), uniqueId, metricsData)
require.NoError(t, err)

listenForExpectedMessages(ctx, repliesChan1, string(play.subscriptionFirstReply1))
Expand All @@ -697,7 +700,7 @@ func TestConsumerWSSubscriptionManager(t *testing.T) {
Times(2) // Should call SendParsedRelay, because it unsubscribed

ctx = utils.WithUniqueIdentifier(ctx, utils.GenerateUniqueIdentifier())
err = manager.UnsubscribeAll(ctx, dapp1, ts.Consumer.Addr.String(), uniqueId, nil)
err = manager.UnsubscribeAll(ctx, dapp1, ts.Consumer.Addr.String(), uniqueId, metricsData)
require.NoError(t, err)

expectNoMoreMessages(ctx, repliesChan1)
Expand Down
74 changes: 74 additions & 0 deletions protocol/metrics/metrics_consumer_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,20 @@ import (
"github.com/prometheus/client_golang/prometheus/promhttp"
)

type WsDisconnectReasonEnum int

const (
WS_DISCONNECTION_REASON_CONSUMER WsDisconnectReasonEnum = iota
WS_DISCONNECTION_REASON_PROVIDER
WS_DISCONNECTION_REASON_USER
)

var disconnectReasonMap = map[WsDisconnectReasonEnum]string{
WS_DISCONNECTION_REASON_CONSUMER: "consumer-disconnect",
WS_DISCONNECTION_REASON_PROVIDER: "provider-disconnect",
WS_DISCONNECTION_REASON_USER: "user-disconnect",
}

type LatencyTracker struct {
AverageLatency time.Duration // in nano seconds (time.Since result)
TotalRequests int
Expand All @@ -34,6 +48,10 @@ type ConsumerMetricsManager struct {
totalNodeErroredRecoveryAttemptsMetric *prometheus.CounterVec
totalRelaysSentToProvidersMetric *prometheus.CounterVec
totalRelaysSentByNewBatchTickerMetric *prometheus.CounterVec
totalWsSubscriptionRequestsMetric *prometheus.CounterVec
totalFailedWsSubscriptionRequestsMetric *prometheus.CounterVec
totalWsSubscriptionDissconnectMetric *prometheus.CounterVec
totalDuplicatedWsSubscriptionRequestsMetric *prometheus.CounterVec
blockMetric *prometheus.GaugeVec
latencyMetric *prometheus.GaugeVec
qosMetric *prometheus.GaugeVec
Expand Down Expand Up @@ -88,6 +106,26 @@ func NewConsumerMetricsManager(options ConsumerMetricsManagerOptions) *ConsumerM
Help: "The total number of errors encountered by the consumer over time.",
}, []string{"spec", "apiInterface"})

totalWsSubscriptionRequestsMetric := prometheus.NewCounterVec(prometheus.CounterOpts{
Name: "lava_consumer_total_ws_subscription_requests",
Help: "The total number of websocket subscription requests over time per chain id per api interface.",
}, []string{"spec", "apiInterface"})

totalFailedWsSubscriptionRequestsMetric := prometheus.NewCounterVec(prometheus.CounterOpts{
Name: "lava_consumer_total_failed_ws_subscription_requests",
Help: "The total number of failed websocket subscription requests over time per chain id per api interface.",
}, []string{"spec", "apiInterface"})

totalDuplicatedWsSubscriptionRequestsMetric := prometheus.NewCounterVec(prometheus.CounterOpts{
Name: "lava_consumer_total_duplicated_ws_subscription_requests",
Help: "The total number of duplicated webscket subscription requests over time per chain id per api interface.",
}, []string{"spec", "apiInterface"})

totalWsSubscriptionDissconnectMetric := prometheus.NewCounterVec(prometheus.CounterOpts{
Name: "lava_consumer_total_ws_subscription_disconnect",
Help: "The total number of websocket subscription disconnects over time per chain id per api interface per dissconnect reason.",
}, []string{"spec", "apiInterface", "dissconectReason"})

blockMetric := prometheus.NewGaugeVec(prometheus.GaugeOpts{
Name: "lava_latest_block",
Help: "The latest block measured",
Expand Down Expand Up @@ -196,10 +234,18 @@ func NewConsumerMetricsManager(options ConsumerMetricsManagerOptions) *ConsumerM
prometheus.MustRegister(totalNodeErroredRecoveryAttemptsMetric)
prometheus.MustRegister(relayProcessingLatencyBeforeProvider)
prometheus.MustRegister(relayProcessingLatencyAfterProvider)
prometheus.MustRegister(totalWsSubscriptionRequestsMetric)
prometheus.MustRegister(totalFailedWsSubscriptionRequestsMetric)
prometheus.MustRegister(totalDuplicatedWsSubscriptionRequestsMetric)
prometheus.MustRegister(totalWsSubscriptionDissconnectMetric)

consumerMetricsManager := &ConsumerMetricsManager{
totalCURequestedMetric: totalCURequestedMetric,
totalRelaysRequestedMetric: totalRelaysRequestedMetric,
totalWsSubscriptionRequestsMetric: totalWsSubscriptionRequestsMetric,
totalFailedWsSubscriptionRequestsMetric: totalFailedWsSubscriptionRequestsMetric,
totalDuplicatedWsSubscriptionRequestsMetric: totalDuplicatedWsSubscriptionRequestsMetric,
totalWsSubscriptionDissconnectMetric: totalWsSubscriptionDissconnectMetric,
totalErroredMetric: totalErroredMetric,
blockMetric: blockMetric,
latencyMetric: latencyMetric,
Expand Down Expand Up @@ -460,3 +506,31 @@ func SetVersionInner(protocolVersionMetric *prometheus.GaugeVec, version string)
combined := major*1000000 + minor*1000 + patch
protocolVersionMetric.WithLabelValues("version").Set(float64(combined))
}

func (pme *ConsumerMetricsManager) SetWsSubscriptionRequestMetric(chainId string, apiInterface string) {
if pme == nil {
return
}
pme.totalWsSubscriptionRequestsMetric.WithLabelValues(chainId, apiInterface).Inc()
}

func (pme *ConsumerMetricsManager) SetFailedWsSubscriptionRequestMetric(chainId string, apiInterface string) {
if pme == nil {
return
}
pme.totalFailedWsSubscriptionRequestsMetric.WithLabelValues(chainId, apiInterface).Inc()
}

func (pme *ConsumerMetricsManager) SetDuplicatedWsSubscriptionRequestMetric(chainId string, apiInterface string) {
if pme == nil {
return
}
pme.totalDuplicatedWsSubscriptionRequestsMetric.WithLabelValues(chainId, apiInterface).Inc()
}

func (pme *ConsumerMetricsManager) SetWsSubscriptioDisconnectRequestMetric(chainId string, apiInterface string, disconnectReason WsDisconnectReasonEnum) {
if pme == nil {
return
}
pme.totalWsSubscriptionDissconnectMetric.WithLabelValues(chainId, apiInterface, disconnectReasonMap[disconnectReason]).Inc()
}
Loading
Loading