Skip to content

Commit

Permalink
fix pr
Browse files Browse the repository at this point in the history
  • Loading branch information
leon mandel authored and leon mandel committed Sep 17, 2024
1 parent de308d8 commit a11dbdb
Show file tree
Hide file tree
Showing 4 changed files with 111 additions and 127 deletions.
30 changes: 10 additions & 20 deletions protocol/chainlib/consumer_ws_subscription_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,6 @@ import (
spectypes "github.com/lavanet/lava/v3/x/spec/types"
)

// is there a place to declare enums etc?
type RelayDisconnectReasonEnum string

const (
ConsumerDisconnect RelayDisconnectReasonEnum = "ConsumerDisconnect"
ProviderDisconnect RelayDisconnectReasonEnum = "ProviderDisconnect"
UserDisconnect RelayDisconnectReasonEnum = "UserDisconnect"
)

type unsubscribeRelayData struct {
protocolMessage ProtocolMessage
}
Expand Down Expand Up @@ -65,7 +56,7 @@ type ConsumerWSSubscriptionManager struct {
activeSubscriptionProvidersStorage *lavasession.ActiveSubscriptionProvidersStorage
currentlyPendingSubscriptions map[string]*pendingSubscriptionsBroadcastManager
lock sync.RWMutex
rpcConsumerLogs *metrics.RPCConsumerLogs
consumerMetricsManager *metrics.ConsumerMetricsManager
}

func NewConsumerWSSubscriptionManager(
Expand All @@ -75,7 +66,7 @@ func NewConsumerWSSubscriptionManager(
connectionType string,
chainParser ChainParser,
activeSubscriptionProvidersStorage *lavasession.ActiveSubscriptionProvidersStorage,
rpcConsumerLogs *metrics.RPCConsumerLogs,
consumerMetricsManager *metrics.ConsumerMetricsManager,
) *ConsumerWSSubscriptionManager {
return &ConsumerWSSubscriptionManager{
connectedDapps: make(map[string]map[string]*common.SafeChannelSender[*pairingtypes.RelayReply]),
Expand All @@ -87,7 +78,7 @@ func NewConsumerWSSubscriptionManager(
relaySender: relaySender,
connectionType: connectionType,
activeSubscriptionProvidersStorage: activeSubscriptionProvidersStorage,
rpcConsumerLogs: rpcConsumerLogs,
consumerMetricsManager: consumerMetricsManager,
}
}

Expand Down Expand Up @@ -202,7 +193,6 @@ func (cwsm *ConsumerWSSubscriptionManager) StartSubscription(
webSocketConnectionUniqueId string,
metricsData *metrics.RelayMetrics,
) (firstReply *pairingtypes.RelayReply, repliesChan <-chan *pairingtypes.RelayReply, err error) {
go cwsm.rpcConsumerLogs.SetRelaySubscriptionRequestMetric(metricsData.ChainID, metricsData.APIType)
hashedParams, _, err := cwsm.getHashedParams(protocolMessage)
if err != nil {
return nil, nil, utils.LavaFormatError("could not marshal params", err)
Expand Down Expand Up @@ -230,7 +220,7 @@ func (cwsm *ConsumerWSSubscriptionManager) StartSubscription(

// called after send relay failure or parsing failure afterwards
onSubscriptionFailure := func() {
go cwsm.rpcConsumerLogs.SetFailedRelaySubscriptionRequestMetric(metricsData.ChainID, metricsData.APIType)
go cwsm.consumerMetricsManager.SetFailedWsSubscriptionRequestMetric(metricsData.ChainID, metricsData.APIType)
cwsm.failedPendingSubscription(hashedParams)
closeWebsocketRepliesChannel()
}
Expand Down Expand Up @@ -270,7 +260,7 @@ func (cwsm *ConsumerWSSubscriptionManager) StartSubscription(
// Validated there are no active subscriptions that we can use.
firstSubscriptionReply, returnWebsocketRepliesChan := cwsm.checkForActiveSubscriptionWithLock(webSocketCtx, hashedParams, protocolMessage, dappKey, websocketRepliesSafeChannelSender, closeWebsocketRepliesChannel)
if firstSubscriptionReply != nil {
go cwsm.rpcConsumerLogs.SetDuplicatedRelaySubscriptionRequestMetric(metricsData.ChainID, metricsData.APIType)
go cwsm.consumerMetricsManager.SetDuplicatedWsSubscriptionRequestMetric(metricsData.ChainID, metricsData.APIType)
if returnWebsocketRepliesChan {
return firstSubscriptionReply, websocketRepliesChan, nil
}
Expand Down Expand Up @@ -428,7 +418,7 @@ func (cwsm *ConsumerWSSubscriptionManager) StartSubscription(
cwsm.successfulPendingSubscription(hashedParams)
// Need to be run once for subscription
go cwsm.listenForSubscriptionMessages(webSocketCtx, dappID, consumerIp, replyServer, hashedParams, providerAddr, metricsData, closeSubscriptionChan)

go cwsm.consumerMetricsManager.SetWsSubscriptionRequestMetric(metricsData.ChainID, metricsData.APIType)
return &reply, websocketRepliesChan, nil
}

Expand Down Expand Up @@ -540,22 +530,22 @@ func (cwsm *ConsumerWSSubscriptionManager) listenForSubscriptionMessages(
utils.LogAttr("GUID", webSocketCtx),
utils.LogAttr("hashedParams", utils.ToHexString(hashedParams)),
)
go cwsm.rpcConsumerLogs.SetRelaySubscriptioDisconnectRequestMetric(metricsData.ChainID, metricsData.APIType, string(UserDisconnect))
go cwsm.consumerMetricsManager.SetWsSubscriptioDisconnectRequestMetric(metricsData.ChainID, metricsData.APIType, metrics.UserDisconnect)
return
case <-replyServer.Context().Done():
utils.LavaFormatTrace("reply server context canceled",
utils.LogAttr("GUID", webSocketCtx),
utils.LogAttr("hashedParams", utils.ToHexString(hashedParams)),
)
go cwsm.rpcConsumerLogs.SetRelaySubscriptioDisconnectRequestMetric(metricsData.ChainID, metricsData.APIType, string(ConsumerDisconnect))
go cwsm.consumerMetricsManager.SetWsSubscriptioDisconnectRequestMetric(metricsData.ChainID, metricsData.APIType, metrics.ConsumerDisconnect)
return
default:
var reply pairingtypes.RelayReply
err := replyServer.RecvMsg(&reply)
if err != nil {
// The connection was closed by the provider
utils.LavaFormatTrace("error reading from subscription stream", utils.LogAttr("original error", err.Error()))
go cwsm.rpcConsumerLogs.SetRelaySubscriptioDisconnectRequestMetric(metricsData.ChainID, metricsData.APIType, string(ProviderDisconnect))
go cwsm.consumerMetricsManager.SetWsSubscriptioDisconnectRequestMetric(metricsData.ChainID, metricsData.APIType, metrics.ProviderDisconnect)
return
}
err = cwsm.handleIncomingSubscriptionNodeMessage(hashedParams, &reply, providerAddr)
Expand All @@ -564,7 +554,7 @@ func (cwsm *ConsumerWSSubscriptionManager) listenForSubscriptionMessages(
utils.LogAttr("hashedParams", hashedParams),
utils.LogAttr("reply", reply),
)
go cwsm.rpcConsumerLogs.SetFailedRelaySubscriptionRequestMetric(metricsData.ChainID, metricsData.APIType)
go cwsm.consumerMetricsManager.SetFailedWsSubscriptionRequestMetric(metricsData.ChainID, metricsData.APIType)
return
}
}
Expand Down
9 changes: 3 additions & 6 deletions protocol/chainlib/consumer_ws_subscription_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,6 @@ func TestConsumerWSSubscriptionManagerParallelSubscriptionsOnSameDappIdIp(t *tes
subscriptionFirstReply2: []byte(`{"jsonrpc":"2.0","id":4,"result":{}}`),
},
}
rpcconsumerLogs, _ := metrics.NewRPCConsumerLogs(nil, nil)
metricsData := metrics.NewRelayAnalytics(projectHashTest, chainIdTest, apiTypeTest)
for _, play := range playbook {
t.Run(play.name, func(t *testing.T) {
Expand Down Expand Up @@ -140,7 +139,7 @@ func TestConsumerWSSubscriptionManagerParallelSubscriptionsOnSameDappIdIp(t *tes
consumerSessionManager := CreateConsumerSessionManager(play.specId, play.apiInterface, ts.Consumer.Addr.String())

// Create a new ConsumerWSSubscriptionManager
manager := NewConsumerWSSubscriptionManager(consumerSessionManager, relaySender, nil, play.connectionType, chainParser, lavasession.NewActiveSubscriptionProvidersStorage(), rpcconsumerLogs)
manager := NewConsumerWSSubscriptionManager(consumerSessionManager, relaySender, nil, play.connectionType, chainParser, lavasession.NewActiveSubscriptionProvidersStorage(), nil)
uniqueIdentifiers := make([]string, numberOfParallelSubscriptions)
wg := sync.WaitGroup{}
wg.Add(numberOfParallelSubscriptions)
Expand Down Expand Up @@ -294,10 +293,9 @@ func TestConsumerWSSubscriptionManagerParallelSubscriptions(t *testing.T) {
Times(1) // Should call SendParsedRelay, because it is the first time we subscribe

consumerSessionManager := CreateConsumerSessionManager(play.specId, play.apiInterface, ts.Consumer.Addr.String())
rpcconsumerLogs, _ := metrics.NewRPCConsumerLogs(nil, nil)
metricsData := metrics.NewRelayAnalytics(projectHashTest, chainIdTest, apiTypeTest)
// Create a new ConsumerWSSubscriptionManager
manager := NewConsumerWSSubscriptionManager(consumerSessionManager, relaySender, nil, play.connectionType, chainParser, lavasession.NewActiveSubscriptionProvidersStorage(), rpcconsumerLogs)
manager := NewConsumerWSSubscriptionManager(consumerSessionManager, relaySender, nil, play.connectionType, chainParser, lavasession.NewActiveSubscriptionProvidersStorage(), nil)

wg := sync.WaitGroup{}
wg.Add(10)
Expand Down Expand Up @@ -383,7 +381,6 @@ func TestConsumerWSSubscriptionManager(t *testing.T) {
unsubscribeMessage2: []byte(`{"jsonrpc":"2.0","method":"eth_unsubscribe","params":["0x2134567890"],"id":1}`),
},
}
rpcconsumerLogs, _ := metrics.NewRPCConsumerLogs(nil, nil)
metricsData := metrics.NewRelayAnalytics(projectHashTest, chainIdTest, apiTypeTest)

for _, play := range playbook {
Expand Down Expand Up @@ -544,7 +541,7 @@ func TestConsumerWSSubscriptionManager(t *testing.T) {
consumerSessionManager := CreateConsumerSessionManager(play.specId, play.apiInterface, ts.Consumer.Addr.String())

// Create a new ConsumerWSSubscriptionManager
manager := NewConsumerWSSubscriptionManager(consumerSessionManager, relaySender, nil, play.connectionType, chainParser, lavasession.NewActiveSubscriptionProvidersStorage(), rpcconsumerLogs)
manager := NewConsumerWSSubscriptionManager(consumerSessionManager, relaySender, nil, play.connectionType, chainParser, lavasession.NewActiveSubscriptionProvidersStorage(), nil)

// Start a new subscription for the first time, called SendParsedRelay once
ctx = utils.WithUniqueIdentifier(ctx, utils.GenerateUniqueIdentifier())
Expand Down
Loading

0 comments on commit a11dbdb

Please sign in to comment.