Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: PRT: Add subscription metrics #1695

Open
wants to merge 22 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 20 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion protocol/chainlib/consumer_websocket_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,7 @@ func (cwm *ConsumerWebsocketManager) ListenToMessages() {
continue
}

// check whether its a normal relay / unsubscribe / unsubscribe_all otherwise its a subscription flow.
// check whether it's a normal relay / unsubscribe / unsubscribe_all otherwise its a subscription flow.
if !IsFunctionTagOfType(protocolMessage, spectypes.FUNCTION_TAG_SUBSCRIBE) {
if IsFunctionTagOfType(protocolMessage, spectypes.FUNCTION_TAG_UNSUBSCRIBE) {
err := cwm.consumerWsSubscriptionManager.Unsubscribe(webSocketCtx, protocolMessage, dappID, userIp, cwm.WebsocketConnectionUID, metricsData)
Expand Down
12 changes: 11 additions & 1 deletion protocol/chainlib/consumer_ws_subscription_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ type ConsumerWSSubscriptionManager struct {
activeSubscriptionProvidersStorage *lavasession.ActiveSubscriptionProvidersStorage
currentlyPendingSubscriptions map[string]*pendingSubscriptionsBroadcastManager
lock sync.RWMutex
consumerMetricsManager *metrics.ConsumerMetricsManager
}

func NewConsumerWSSubscriptionManager(
Expand All @@ -65,6 +66,7 @@ func NewConsumerWSSubscriptionManager(
connectionType string,
chainParser ChainParser,
activeSubscriptionProvidersStorage *lavasession.ActiveSubscriptionProvidersStorage,
consumerMetricsManager *metrics.ConsumerMetricsManager,
) *ConsumerWSSubscriptionManager {
return &ConsumerWSSubscriptionManager{
connectedDapps: make(map[string]map[string]*common.SafeChannelSender[*pairingtypes.RelayReply]),
Expand All @@ -76,6 +78,7 @@ func NewConsumerWSSubscriptionManager(
relaySender: relaySender,
connectionType: connectionType,
activeSubscriptionProvidersStorage: activeSubscriptionProvidersStorage,
consumerMetricsManager: consumerMetricsManager,
}
}

Expand Down Expand Up @@ -201,6 +204,7 @@ func (cwsm *ConsumerWSSubscriptionManager) StartSubscription(
utils.LogAttr("GUID", webSocketCtx),
utils.LogAttr("hashedParams", utils.ToHexString(hashedParams)),
utils.LogAttr("dappKey", dappKey),
utils.LogAttr("connectedDapps", cwsm.connectedDapps),
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is removed in main, why is is added back here?

)

websocketRepliesChan := make(chan *pairingtypes.RelayReply)
Expand All @@ -216,6 +220,7 @@ func (cwsm *ConsumerWSSubscriptionManager) StartSubscription(

// called after send relay failure or parsing failure afterwards
onSubscriptionFailure := func() {
go cwsm.consumerMetricsManager.SetFailedWsSubscriptionRequestMetric(metricsData.ChainID, metricsData.APIType)
cwsm.failedPendingSubscription(hashedParams)
closeWebsocketRepliesChannel()
}
Expand Down Expand Up @@ -255,6 +260,7 @@ func (cwsm *ConsumerWSSubscriptionManager) StartSubscription(
// Validated there are no active subscriptions that we can use.
firstSubscriptionReply, returnWebsocketRepliesChan := cwsm.checkForActiveSubscriptionWithLock(webSocketCtx, hashedParams, protocolMessage, dappKey, websocketRepliesSafeChannelSender, closeWebsocketRepliesChannel)
if firstSubscriptionReply != nil {
go cwsm.consumerMetricsManager.SetDuplicatedWsSubscriptionRequestMetric(metricsData.ChainID, metricsData.APIType)
if returnWebsocketRepliesChan {
return firstSubscriptionReply, websocketRepliesChan, nil
}
Expand Down Expand Up @@ -412,7 +418,7 @@ func (cwsm *ConsumerWSSubscriptionManager) StartSubscription(
cwsm.successfulPendingSubscription(hashedParams)
// Need to be run once for subscription
go cwsm.listenForSubscriptionMessages(webSocketCtx, dappID, consumerIp, replyServer, hashedParams, providerAddr, metricsData, closeSubscriptionChan)

go cwsm.consumerMetricsManager.SetWsSubscriptionRequestMetric(metricsData.ChainID, metricsData.APIType)
return &reply, websocketRepliesChan, nil
}

Expand Down Expand Up @@ -524,19 +530,22 @@ func (cwsm *ConsumerWSSubscriptionManager) listenForSubscriptionMessages(
utils.LogAttr("GUID", webSocketCtx),
utils.LogAttr("hashedParams", utils.ToHexString(hashedParams)),
)
go cwsm.consumerMetricsManager.SetWsSubscriptioDisconnectRequestMetric(metricsData.ChainID, metricsData.APIType, metrics.UserDisconnect)
return
case <-replyServer.Context().Done():
utils.LavaFormatTrace("reply server context canceled",
utils.LogAttr("GUID", webSocketCtx),
utils.LogAttr("hashedParams", utils.ToHexString(hashedParams)),
)
go cwsm.consumerMetricsManager.SetWsSubscriptioDisconnectRequestMetric(metricsData.ChainID, metricsData.APIType, metrics.ConsumerDisconnect)
return
default:
var reply pairingtypes.RelayReply
err := replyServer.RecvMsg(&reply)
if err != nil {
// The connection was closed by the provider
utils.LavaFormatTrace("error reading from subscription stream", utils.LogAttr("original error", err.Error()))
go cwsm.consumerMetricsManager.SetWsSubscriptioDisconnectRequestMetric(metricsData.ChainID, metricsData.APIType, metrics.ProviderDisconnect)
return
}
err = cwsm.handleIncomingSubscriptionNodeMessage(hashedParams, &reply, providerAddr)
Expand All @@ -545,6 +554,7 @@ func (cwsm *ConsumerWSSubscriptionManager) listenForSubscriptionMessages(
utils.LogAttr("hashedParams", hashedParams),
utils.LogAttr("reply", reply),
)
go cwsm.consumerMetricsManager.SetFailedWsSubscriptionRequestMetric(metricsData.ChainID, metricsData.APIType)
return
}
}
Expand Down
35 changes: 19 additions & 16 deletions protocol/chainlib/consumer_ws_subscription_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,9 @@ import (
const (
numberOfParallelSubscriptions = 10
uniqueId = "1234"
projectHashTest = "test_projecthash"
chainIdTest = "test_chainId"
apiTypeTest = "test_apiType"
)

func TestConsumerWSSubscriptionManagerParallelSubscriptionsOnSameDappIdIp(t *testing.T) {
Expand All @@ -51,7 +54,7 @@ func TestConsumerWSSubscriptionManagerParallelSubscriptionsOnSameDappIdIp(t *tes
subscriptionFirstReply2: []byte(`{"jsonrpc":"2.0","id":4,"result":{}}`),
},
}

metricsData := metrics.NewRelayAnalytics(projectHashTest, chainIdTest, apiTypeTest)
for _, play := range playbook {
t.Run(play.name, func(t *testing.T) {
ts := SetupForTests(t, 1, play.specId, "../../")
Expand Down Expand Up @@ -136,7 +139,7 @@ func TestConsumerWSSubscriptionManagerParallelSubscriptionsOnSameDappIdIp(t *tes
consumerSessionManager := CreateConsumerSessionManager(play.specId, play.apiInterface, ts.Consumer.Addr.String())

// Create a new ConsumerWSSubscriptionManager
manager := NewConsumerWSSubscriptionManager(consumerSessionManager, relaySender, nil, play.connectionType, chainParser, lavasession.NewActiveSubscriptionProvidersStorage())
manager := NewConsumerWSSubscriptionManager(consumerSessionManager, relaySender, nil, play.connectionType, chainParser, lavasession.NewActiveSubscriptionProvidersStorage(), nil)
uniqueIdentifiers := make([]string, numberOfParallelSubscriptions)
wg := sync.WaitGroup{}
wg.Add(numberOfParallelSubscriptions)
Expand All @@ -151,7 +154,7 @@ func TestConsumerWSSubscriptionManagerParallelSubscriptionsOnSameDappIdIp(t *tes
var repliesChan <-chan *pairingtypes.RelayReply
var firstReply *pairingtypes.RelayReply

firstReply, repliesChan, err = manager.StartSubscription(ctx, protocolMessage1, dapp, ip, uniqueIdentifiers[index], nil)
firstReply, repliesChan, err = manager.StartSubscription(ctx, protocolMessage1, dapp, ip, uniqueIdentifiers[index], metricsData)
go func() {
for subMsg := range repliesChan {
// utils.LavaFormatInfo("got reply for index", utils.LogAttr("index", index))
Expand All @@ -169,15 +172,15 @@ func TestConsumerWSSubscriptionManagerParallelSubscriptionsOnSameDappIdIp(t *tes
// now we have numberOfParallelSubscriptions subscriptions currently running
require.Len(t, manager.connectedDapps, numberOfParallelSubscriptions)
// remove one
err = manager.Unsubscribe(ts.Ctx, protocolMessage1, dapp, ip, uniqueIdentifiers[0], nil)
err = manager.Unsubscribe(ts.Ctx, protocolMessage1, dapp, ip, uniqueIdentifiers[0], metricsData)
require.NoError(t, err)
// now we have numberOfParallelSubscriptions - 1
require.Len(t, manager.connectedDapps, numberOfParallelSubscriptions-1)
// check we still have an active subscription.
require.Len(t, manager.activeSubscriptions, 1)

// same flow for unsubscribe all
err = manager.UnsubscribeAll(ts.Ctx, dapp, ip, uniqueIdentifiers[1], nil)
err = manager.UnsubscribeAll(ts.Ctx, dapp, ip, uniqueIdentifiers[1], metricsData)
require.NoError(t, err)
// now we have numberOfParallelSubscriptions - 2
require.Len(t, manager.connectedDapps, numberOfParallelSubscriptions-2)
Expand Down Expand Up @@ -209,7 +212,6 @@ func TestConsumerWSSubscriptionManagerParallelSubscriptions(t *testing.T) {
subscriptionFirstReply2: []byte(`{"jsonrpc":"2.0","id":4,"result":{}}`),
},
}

for _, play := range playbook {
t.Run(play.name, func(t *testing.T) {
ts := SetupForTests(t, 1, play.specId, "../../")
Expand Down Expand Up @@ -291,9 +293,9 @@ func TestConsumerWSSubscriptionManagerParallelSubscriptions(t *testing.T) {
Times(1) // Should call SendParsedRelay, because it is the first time we subscribe

consumerSessionManager := CreateConsumerSessionManager(play.specId, play.apiInterface, ts.Consumer.Addr.String())

metricsData := metrics.NewRelayAnalytics(projectHashTest, chainIdTest, apiTypeTest)
// Create a new ConsumerWSSubscriptionManager
manager := NewConsumerWSSubscriptionManager(consumerSessionManager, relaySender, nil, play.connectionType, chainParser, lavasession.NewActiveSubscriptionProvidersStorage())
manager := NewConsumerWSSubscriptionManager(consumerSessionManager, relaySender, nil, play.connectionType, chainParser, lavasession.NewActiveSubscriptionProvidersStorage(), nil)

wg := sync.WaitGroup{}
wg.Add(10)
Expand All @@ -305,7 +307,7 @@ func TestConsumerWSSubscriptionManagerParallelSubscriptions(t *testing.T) {
ctx := utils.WithUniqueIdentifier(ts.Ctx, utils.GenerateUniqueIdentifier())
var repliesChan <-chan *pairingtypes.RelayReply
var firstReply *pairingtypes.RelayReply
firstReply, repliesChan, err = manager.StartSubscription(ctx, protocolMessage1, dapp+strconv.Itoa(index), ts.Consumer.Addr.String(), uniqueId, nil)
firstReply, repliesChan, err = manager.StartSubscription(ctx, protocolMessage1, dapp+strconv.Itoa(index), ts.Consumer.Addr.String(), uniqueId, metricsData)
go func() {
for subMsg := range repliesChan {
require.Equal(t, string(play.subscriptionFirstReply1), string(subMsg.Data))
Expand Down Expand Up @@ -379,6 +381,7 @@ func TestConsumerWSSubscriptionManager(t *testing.T) {
unsubscribeMessage2: []byte(`{"jsonrpc":"2.0","method":"eth_unsubscribe","params":["0x2134567890"],"id":1}`),
},
}
metricsData := metrics.NewRelayAnalytics(projectHashTest, chainIdTest, apiTypeTest)

for _, play := range playbook {
t.Run(play.name, func(t *testing.T) {
Expand Down Expand Up @@ -538,12 +541,12 @@ func TestConsumerWSSubscriptionManager(t *testing.T) {
consumerSessionManager := CreateConsumerSessionManager(play.specId, play.apiInterface, ts.Consumer.Addr.String())

// Create a new ConsumerWSSubscriptionManager
manager := NewConsumerWSSubscriptionManager(consumerSessionManager, relaySender, nil, play.connectionType, chainParser, lavasession.NewActiveSubscriptionProvidersStorage())
manager := NewConsumerWSSubscriptionManager(consumerSessionManager, relaySender, nil, play.connectionType, chainParser, lavasession.NewActiveSubscriptionProvidersStorage(), nil)

// Start a new subscription for the first time, called SendParsedRelay once
ctx = utils.WithUniqueIdentifier(ctx, utils.GenerateUniqueIdentifier())

firstReply, repliesChan1, err := manager.StartSubscription(ctx, subscribeProtocolMessage1, dapp1, ts.Consumer.Addr.String(), uniqueId, nil)
firstReply, repliesChan1, err := manager.StartSubscription(ctx, subscribeProtocolMessage1, dapp1, ts.Consumer.Addr.String(), uniqueId, metricsData)
assert.NoError(t, err)
unsubscribeMessageWg.Add(1)
assert.Equal(t, string(play.subscriptionFirstReply1), string(firstReply.Data))
Expand All @@ -559,7 +562,7 @@ func TestConsumerWSSubscriptionManager(t *testing.T) {

// Start a subscription again, same params, same dappKey, should not call SendParsedRelay
ctx = utils.WithUniqueIdentifier(ctx, utils.GenerateUniqueIdentifier())
firstReply, repliesChan2, err := manager.StartSubscription(ctx, subscribeProtocolMessage1, dapp1, ts.Consumer.Addr.String(), uniqueId, nil)
firstReply, repliesChan2, err := manager.StartSubscription(ctx, subscribeProtocolMessage1, dapp1, ts.Consumer.Addr.String(), uniqueId, metricsData)
assert.NoError(t, err)
assert.Equal(t, string(play.subscriptionFirstReply1), string(firstReply.Data))
assert.Nil(t, repliesChan2) // Same subscription, same dappKey, no need for a new channel
Expand All @@ -568,7 +571,7 @@ func TestConsumerWSSubscriptionManager(t *testing.T) {

// Start a subscription again, same params, different dappKey, should not call SendParsedRelay
ctx = utils.WithUniqueIdentifier(ctx, utils.GenerateUniqueIdentifier())
firstReply, repliesChan3, err := manager.StartSubscription(ctx, subscribeProtocolMessage1, dapp2, ts.Consumer.Addr.String(), uniqueId, nil)
firstReply, repliesChan3, err := manager.StartSubscription(ctx, subscribeProtocolMessage1, dapp2, ts.Consumer.Addr.String(), uniqueId, metricsData)
assert.NoError(t, err)
assert.Equal(t, string(play.subscriptionFirstReply1), string(firstReply.Data))
assert.NotNil(t, repliesChan3) // Same subscription, but different dappKey, so will create new channel
Expand Down Expand Up @@ -652,7 +655,7 @@ func TestConsumerWSSubscriptionManager(t *testing.T) {
// Start a subscription again, different params, same dappKey, should call SendParsedRelay
ctx = utils.WithUniqueIdentifier(ctx, utils.GenerateUniqueIdentifier())

firstReply, repliesChan4, err := manager.StartSubscription(ctx, subscribeProtocolMessage2, dapp1, ts.Consumer.Addr.String(), uniqueId, nil)
firstReply, repliesChan4, err := manager.StartSubscription(ctx, subscribeProtocolMessage2, dapp1, ts.Consumer.Addr.String(), uniqueId, metricsData)
assert.NoError(t, err)
unsubscribeMessageWg.Add(1)
assert.Equal(t, string(play.subscriptionFirstReply2), string(firstReply.Data))
Expand All @@ -671,7 +674,7 @@ func TestConsumerWSSubscriptionManager(t *testing.T) {

ctx = utils.WithUniqueIdentifier(ctx, utils.GenerateUniqueIdentifier())
unsubProtocolMessage := NewProtocolMessage(unsubscribeChainMessage1, nil, relayResult1.Request.RelayData, dapp2, ts.Consumer.Addr.String())
err = manager.Unsubscribe(ctx, unsubProtocolMessage, dapp2, ts.Consumer.Addr.String(), uniqueId, nil)
err = manager.Unsubscribe(ctx, unsubProtocolMessage, dapp2, ts.Consumer.Addr.String(), uniqueId, metricsData)
require.NoError(t, err)

listenForExpectedMessages(ctx, repliesChan1, string(play.subscriptionFirstReply1))
Expand All @@ -697,7 +700,7 @@ func TestConsumerWSSubscriptionManager(t *testing.T) {
Times(2) // Should call SendParsedRelay, because it unsubscribed

ctx = utils.WithUniqueIdentifier(ctx, utils.GenerateUniqueIdentifier())
err = manager.UnsubscribeAll(ctx, dapp1, ts.Consumer.Addr.String(), uniqueId, nil)
err = manager.UnsubscribeAll(ctx, dapp1, ts.Consumer.Addr.String(), uniqueId, metricsData)
require.NoError(t, err)

expectNoMoreMessages(ctx, repliesChan1)
Expand Down
Loading
Loading