Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: PRT provider load rate #1720

Merged
merged 58 commits into from
Oct 30, 2024
Merged
Show file tree
Hide file tree
Changes from 44 commits
Commits
Show all changes
58 commits
Select commit Hold shift + click to select a range
7b8ea69
load rate report in trailer
Sep 30, 2024
4f7838a
fix trailer name
Sep 30, 2024
4fead99
merge main
Sep 30, 2024
ea1598b
fix lint
Sep 30, 2024
04f1762
fix load manager logic
Sep 30, 2024
eb2b345
fix lint
Sep 30, 2024
2c33935
fix spelling
Sep 30, 2024
46e6faf
fix logic
Sep 30, 2024
1f977ae
fixed flag & header names
Oct 1, 2024
6db7dd3
fix load provider manager and creation logic
Oct 1, 2024
b9e199b
fix logs for relay load rate
Oct 1, 2024
8b8a05a
fix rpcprovider server relay load handling
Oct 1, 2024
5e3b7a4
fix tests
Oct 1, 2024
199ff2c
fix typo
Oct 1, 2024
9faf8ef
fix init lava script
Oct 1, 2024
cc1af1e
Merge branch 'main' into prt-add-provider-relay-load-trailer
Oct 1, 2024
56191f6
fix provider load manager
Oct 1, 2024
9eabb5a
fix provider server and load manager
Oct 1, 2024
ffdb986
fix lint - fix protocol test
Oct 1, 2024
19cc454
fix provider load manager applyProviderLoadMetadataToContextTrailer
Oct 1, 2024
b73d267
Merge branch 'main' into prt-add-provider-relay-load-trailer
Oct 1, 2024
5a29efd
change cmdRPCProvider load rate flag to uint64
Oct 1, 2024
93c3220
try fix
Oct 1, 2024
eeb46ea
fix cmd flag reading
Oct 1, 2024
55221f6
adjusting uint64
ranlavanet Oct 2, 2024
e81afb9
fix redundent nil check in provider load manager
Oct 3, 2024
56c2b3f
Merge branch 'prt-add-provider-relay-load-trailer' of github.com:lava…
Oct 3, 2024
725f40a
fix providerLoadManager per chain creation
Oct 3, 2024
6da0e99
rename and fix instance passing unnecessarily
ranlavanet Oct 3, 2024
9458d6c
fixed chainlib common formatting
Oct 6, 2024
44a5e5c
fix provider load manager comments
Oct 6, 2024
03e1b17
fix e2e tests
Oct 6, 2024
c4bc4ec
fix pr - unite add relay load and set trailer
Oct 6, 2024
2bd9032
fix common.go provider load header
Oct 6, 2024
a44f0ab
fix edge case of getProviderLoad
Oct 6, 2024
50dab3f
Merge branch 'main' into prt-add-provider-relay-load-trailer
Oct 7, 2024
c88aee2
fix command flag description
Oct 8, 2024
30efbf1
fix command flag description
Oct 8, 2024
810db13
add metric for load rate
Oct 8, 2024
82f3020
fix division to be float and not uint
Oct 8, 2024
14d8c68
roll back init lava only with node two consumers
Oct 8, 2024
2d61289
fix load metric
Oct 8, 2024
280074b
merge branch 'main' into prt-add-provider-relay-load-trailer
Oct 9, 2024
97f72ec
merge main
Oct 9, 2024
454db08
Update protocol/chainlib/common.go
Oct 9, 2024
78ed595
Merge branch 'main' into prt-add-provider-relay-load-trailer
Oct 9, 2024
63a0e89
Merge branch 'prt-add-provider-relay-load-trailer' of github.com:lava…
Oct 9, 2024
d4c7258
fix load calculation
ranlavanet Oct 10, 2024
273a32a
tidy code
ranlavanet Oct 10, 2024
6fb7276
changing rate limit to 1k
ranlavanet Oct 10, 2024
272e676
fix bug
ranlavanet Oct 10, 2024
42486ac
fix pr
ranlavanet Oct 10, 2024
9484f9f
Merge branch 'main' into prt-add-provider-relay-load-trailer
omerlavanet Oct 15, 2024
ecd416c
Merge branch 'main' into prt-add-provider-relay-load-trailer
ranlavanet Oct 27, 2024
974dbcb
v4
ranlavanet Oct 27, 2024
258578f
Merge branch 'main' into prt-add-provider-relay-load-trailer
ranlavanet Oct 27, 2024
feeb373
fix pr
ranlavanet Oct 27, 2024
51894ce
fix
ranlavanet Oct 27, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions protocol/chainlib/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ const (
relayMsgLogMaxChars = 200
RPCProviderNodeAddressHash = "Lava-Provider-Node-Address-Hash"
RPCProviderNodeExtension = "Lava-Provider-Node-Extension"
RpcProviderLoadRateHeader = "Lava-Provider-load-rate"
lyomagma marked this conversation as resolved.
Show resolved Hide resolved
RpcProviderUniqueIdHeader = "Lava-Provider-Unique-Id"
WebSocketExtension = "websocket"
)
Expand Down
1 change: 1 addition & 0 deletions protocol/common/cobra_common.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ const (
// websocket flags
RateLimitWebSocketFlag = "rate-limit-websocket-requests-per-connection"
BanDurationForWebsocketRateLimitExceededFlag = "ban-duration-for-websocket-rate-limit-exceeded"
RateLimitRequestPerSecondFlag = "rate-limit-requests-per-second"
)

const (
Expand Down
2 changes: 1 addition & 1 deletion protocol/integration/protocol_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -344,7 +344,7 @@ func createRpcProvider(t *testing.T, ctx context.Context, rpcProviderOptions rpc
chainTracker.StartAndServe(ctx)
reliabilityManager := reliabilitymanager.NewReliabilityManager(chainTracker, &mockProviderStateTracker, rpcProviderOptions.account.Addr.String(), chainRouter, chainParser)
mockReliabilityManager := NewMockReliabilityManager(reliabilityManager)
rpcProviderServer.ServeRPCRequests(ctx, rpcProviderEndpoint, chainParser, rws, providerSessionManager, mockReliabilityManager, rpcProviderOptions.account.SK, cache, chainRouter, &mockProviderStateTracker, rpcProviderOptions.account.Addr, rpcProviderOptions.lavaChainID, rpcprovider.DEFAULT_ALLOWED_MISSING_CU, nil, nil, nil, false)
rpcProviderServer.ServeRPCRequests(ctx, rpcProviderEndpoint, chainParser, rws, providerSessionManager, mockReliabilityManager, rpcProviderOptions.account.SK, cache, chainRouter, &mockProviderStateTracker, rpcProviderOptions.account.Addr, rpcProviderOptions.lavaChainID, rpcprovider.DEFAULT_ALLOWED_MISSING_CU, nil, nil, nil, false, nil)
listener := rpcprovider.NewProviderListener(ctx, rpcProviderEndpoint.NetworkAddress, "/health")
err = listener.RegisterReceiver(rpcProviderServer, rpcProviderEndpoint)
require.NoError(t, err)
Expand Down
7 changes: 7 additions & 0 deletions protocol/metrics/metrics_provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ type ProviderMetrics struct {
totalRelaysServicedMetric *prometheus.CounterVec
totalErroredMetric *prometheus.CounterVec
consumerQoSMetric *prometheus.GaugeVec
loadRateMetric *prometheus.GaugeVec
}

func (pm *ProviderMetrics) AddRelay(consumerAddress string, cu uint64, qos *pairingtypes.QualityOfServiceReport) {
Expand Down Expand Up @@ -49,6 +50,10 @@ func (pm *ProviderMetrics) AddRelay(consumerAddress string, cu uint64, qos *pair
}
}

func (pm *ProviderMetrics) SetLoadRate(loatRate float64) {
pm.loadRateMetric.WithLabelValues(pm.specID).Set(loatRate)
}

func (pm *ProviderMetrics) AddPayment(cu uint64) {
if pm == nil {
return
Expand All @@ -72,6 +77,7 @@ func NewProviderMetrics(specID, apiInterface string, totalCUServicedMetric *prom
totalRelaysServicedMetric *prometheus.CounterVec,
totalErroredMetric *prometheus.CounterVec,
consumerQoSMetric *prometheus.GaugeVec,
loadRateMetric *prometheus.GaugeVec,
) *ProviderMetrics {
pm := &ProviderMetrics{
specID: specID,
Expand All @@ -82,6 +88,7 @@ func NewProviderMetrics(specID, apiInterface string, totalCUServicedMetric *prom
totalRelaysServicedMetric: totalRelaysServicedMetric,
totalErroredMetric: totalErroredMetric,
consumerQoSMetric: consumerQoSMetric,
loadRateMetric: loadRateMetric,
}
return pm
}
10 changes: 9 additions & 1 deletion protocol/metrics/metrics_provider_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ type ProviderMetricsManager struct {
endpointsHealthChecksOk uint64
relaysMonitors map[string]*RelaysMonitor
relaysMonitorsLock sync.RWMutex
loadRateMetric *prometheus.GaugeVec
}

func NewProviderMetricsManager(networkAddress string) *ProviderMetricsManager {
Expand Down Expand Up @@ -107,6 +108,11 @@ func NewProviderMetricsManager(networkAddress string) *ProviderMetricsManager {
Help: "The total number of get latest block queries that succeeded by chainfetcher",
}, []string{"spec"})

loadRateMetric := prometheus.NewGaugeVec(prometheus.GaugeOpts{
Name: "lava_provider_load_rate",
Help: "The load rate according to the load rate limit - Given Y simultaneous relay calls, a value of X and will measure Y/X load rate.",
}, []string{"spec"})

fetchBlockSuccessMetric := prometheus.NewCounterVec(prometheus.CounterOpts{
Name: "lava_provider_fetch_block_success",
Help: "The total number of get specific block queries that succeeded by chainfetcher",
Expand Down Expand Up @@ -141,6 +147,7 @@ func NewProviderMetricsManager(networkAddress string) *ProviderMetricsManager {
prometheus.MustRegister(virtualEpochMetric)
prometheus.MustRegister(endpointsHealthChecksOkMetric)
prometheus.MustRegister(protocolVersionMetric)
prometheus.MustRegister(loadRateMetric)

providerMetricsManager := &ProviderMetricsManager{
providerMetrics: map[string]*ProviderMetrics{},
Expand All @@ -161,6 +168,7 @@ func NewProviderMetricsManager(networkAddress string) *ProviderMetricsManager {
endpointsHealthChecksOk: 1,
protocolVersionMetric: protocolVersionMetric,
relaysMonitors: map[string]*RelaysMonitor{},
loadRateMetric: loadRateMetric,
}

http.Handle("/metrics", promhttp.Handler())
Expand Down Expand Up @@ -209,7 +217,7 @@ func (pme *ProviderMetricsManager) AddProviderMetrics(specID, apiInterface strin
}

if pme.getProviderMetric(specID, apiInterface) == nil {
providerMetric := NewProviderMetrics(specID, apiInterface, pme.totalCUServicedMetric, pme.totalCUPaidMetric, pme.totalRelaysServicedMetric, pme.totalErroredMetric, pme.consumerQoSMetric)
providerMetric := NewProviderMetrics(specID, apiInterface, pme.totalCUServicedMetric, pme.totalCUPaidMetric, pme.totalRelaysServicedMetric, pme.totalErroredMetric, pme.consumerQoSMetric, pme.loadRateMetric)
pme.setProviderMetric(providerMetric)

endpoint := fmt.Sprintf("/metrics/%s/%s/health", specID, apiInterface)
Expand Down
21 changes: 13 additions & 8 deletions protocol/rpcconsumer/rpcconsumer_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -1261,6 +1261,17 @@ func (rpccs *RPCConsumerServer) HandleDirectiveHeadersForMessage(chainMessage ch
chainMessage.SetForceCacheRefresh(ok)
}

func (rpccs *RPCConsumerServer) getMetadataFromRelayTrailer(metadataHeader string, relayResult *common.RelayResult) {
trailerValue := relayResult.ProviderTrailer.Get(metadataHeader)
if len(trailerValue) > 0 {
extensionMD := pairingtypes.Metadata{
Name: metadataHeader,
Value: trailerValue[0],
}
relayResult.Reply.Metadata = append(relayResult.Reply.Metadata, extensionMD)
}
}

func (rpccs *RPCConsumerServer) appendHeadersToRelayResult(ctx context.Context, relayResult *common.RelayResult, protocolErrors uint64, relayProcessor *RelayProcessor, protocolMessage chainlib.ProtocolMessage, apiName string) {
if relayResult == nil {
return
Expand Down Expand Up @@ -1333,14 +1344,8 @@ func (rpccs *RPCConsumerServer) appendHeadersToRelayResult(ctx context.Context,
}

// fetch trailer information from the provider by using the provider trailer field.
providerNodeExtensions := relayResult.ProviderTrailer.Get(chainlib.RPCProviderNodeExtension)
if len(providerNodeExtensions) > 0 {
extensionMD := pairingtypes.Metadata{
Name: chainlib.RPCProviderNodeExtension,
Value: providerNodeExtensions[0],
}
relayResult.Reply.Metadata = append(relayResult.Reply.Metadata, extensionMD)
}
rpccs.getMetadataFromRelayTrailer(chainlib.RPCProviderNodeExtension, relayResult)
rpccs.getMetadataFromRelayTrailer(chainlib.RpcProviderLoadRateHeader, relayResult)

directiveHeaders := protocolMessage.GetDirectiveHeaders()
_, debugRelays := directiveHeaders[common.LAVA_DEBUG_RELAY]
Expand Down
74 changes: 74 additions & 0 deletions protocol/rpcprovider/provider_load_manager.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
package rpcprovider

import (
"context"
"strconv"
"sync/atomic"

"github.com/lavanet/lava/v3/protocol/chainlib"
grpc "google.golang.org/grpc"
"google.golang.org/grpc/metadata"
)

type ProviderLoadManager struct {
rateLimitThreshold atomic.Uint64
activeRequestsPerSecond atomic.Uint64
}

func NewProviderLoadManager(rateLimitThreshold uint64) *ProviderLoadManager {
if rateLimitThreshold == 0 {
return nil
}
loadManager := &ProviderLoadManager{}

loadManager.rateLimitThreshold.Store(rateLimitThreshold)

return loadManager
}

func (loadManager *ProviderLoadManager) addRelayCall() {
if loadManager == nil {
return
}
loadManager.activeRequestsPerSecond.Add(1)
}

func (loadManager *ProviderLoadManager) subtractRelayCall() {
if loadManager == nil {
return
}
loadManager.activeRequestsPerSecond.Add(^uint64(0))
shleikes marked this conversation as resolved.
Show resolved Hide resolved
}

func (loadManager *ProviderLoadManager) getProviderLoad() float64 {
if loadManager == nil {
return 0
}
rateLimitThreshold := loadManager.rateLimitThreshold.Load()
shleikes marked this conversation as resolved.
Show resolved Hide resolved
if rateLimitThreshold == 0 {
return 0
}
activeRequests := loadManager.activeRequestsPerSecond.Load()
return float64(activeRequests) / float64(rateLimitThreshold)
}

func (loadManager *ProviderLoadManager) applyProviderLoadMetadataToContextTrailer(ctx context.Context) bool {
provideRelayLoad := loadManager.getProviderLoad()
shleikes marked this conversation as resolved.
Show resolved Hide resolved
if provideRelayLoad == 0 {
return false
}
formattedProviderLoad := strconv.FormatFloat(provideRelayLoad, 'f', -1, 64)

trailerMd := metadata.Pairs(chainlib.RpcProviderLoadRateHeader, formattedProviderLoad)
grpc.SetTrailer(ctx, trailerMd)
return true
}

func (loadManager *ProviderLoadManager) addAndSetRelayLoadToContextTrailer(ctx context.Context) bool {
loadManager.addRelayCall()
return loadManager.applyProviderLoadMetadataToContextTrailer(ctx)
}

func (loadManager *ProviderLoadManager) getActiveRequestsPerSecond() uint64 {
return loadManager.activeRequestsPerSecond.Load()
}
59 changes: 36 additions & 23 deletions protocol/rpcprovider/rpcprovider.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,7 @@ type rpcProviderStartOptions struct {
healthCheckMetricsOptions *rpcProviderHealthCheckMetricsOptions
staticProvider bool
staticSpecPath string
relayLoadLimit uint64
}

type rpcProviderHealthCheckMetricsOptions struct {
Expand All @@ -123,24 +124,26 @@ type RPCProvider struct {
rpcProviderListeners map[string]*ProviderListener
lock sync.Mutex
// all of the following members need to be concurrency proof
providerMetricsManager *metrics.ProviderMetricsManager
rewardServer *rewardserver.RewardServer
privKey *btcec.PrivateKey
lavaChainID string
addr sdk.AccAddress
blockMemorySize uint64
chainMutexes map[string]*sync.Mutex
parallelConnections uint
cache *performance.Cache
shardID uint // shardID is a flag that allows setting up multiple provider databases of the same chain
chainTrackers *common.SafeSyncMap[string, *chaintracker.ChainTracker]
relaysMonitorAggregator *metrics.RelaysMonitorAggregator
relaysHealthCheckEnabled bool
relaysHealthCheckInterval time.Duration
grpcHealthCheckEndpoint string
providerUniqueId string
staticProvider bool
staticSpecPath string
providerMetricsManager *metrics.ProviderMetricsManager
rewardServer *rewardserver.RewardServer
privKey *btcec.PrivateKey
lavaChainID string
addr sdk.AccAddress
blockMemorySize uint64
chainMutexes map[string]*sync.Mutex
parallelConnections uint
cache *performance.Cache
shardID uint // shardID is a flag that allows setting up multiple provider databases of the same chain
chainTrackers *common.SafeSyncMap[string, *chaintracker.ChainTracker]
relaysMonitorAggregator *metrics.RelaysMonitorAggregator
relaysHealthCheckEnabled bool
relaysHealthCheckInterval time.Duration
grpcHealthCheckEndpoint string
providerUniqueId string
staticProvider bool
staticSpecPath string
relayLoadLimit uint64
providerLoadManagersPerChain map[string]*ProviderLoadManager
}

func (rpcp *RPCProvider) Start(options *rpcProviderStartOptions) (err error) {
Expand All @@ -165,6 +168,8 @@ func (rpcp *RPCProvider) Start(options *rpcProviderStartOptions) (err error) {
rpcp.grpcHealthCheckEndpoint = options.healthCheckMetricsOptions.grpcHealthCheckEndpoint
rpcp.staticProvider = options.staticProvider
rpcp.staticSpecPath = options.staticSpecPath
rpcp.relayLoadLimit = options.relayLoadLimit
rpcp.providerLoadManagersPerChain = make(map[string]*ProviderLoadManager)

// single state tracker
lavaChainFetcher := chainlib.NewLavaChainFetcher(ctx, options.clientCtx)
Expand Down Expand Up @@ -307,9 +312,7 @@ func (rpcp *RPCProvider) SetupProviderEndpoints(rpcProviderEndpoints []*lavasess
wg.Add(parallelJobs)
disabledEndpoints := make(chan *lavasession.RPCProviderEndpoint, parallelJobs)
// validate static spec configuration is used only on a single chain setup.
chainIds := make(map[string]struct{})
for _, rpcProviderEndpoint := range rpcProviderEndpoints {
chainIds[rpcProviderEndpoint.ChainID] = struct{}{}
setupEndpoint := func(rpcProviderEndpoint *lavasession.RPCProviderEndpoint, specValidator *SpecValidator) {
defer wg.Done()
err := rpcp.SetupEndpoint(context.Background(), rpcProviderEndpoint, specValidator)
Expand Down Expand Up @@ -404,6 +407,7 @@ func (rpcp *RPCProvider) SetupEndpoint(ctx context.Context, rpcProviderEndpoint
utils.Attribute{Key: "Chain", Value: rpcProviderEndpoint.ChainID},
utils.Attribute{Key: "apiInterface", Value: apiInterface})
}
var providerLoadManager *ProviderLoadManager

// in order to utilize shared resources between chains we need go routines with the same chain to wait for one another here
chainCommonSetup := func() error {
Expand Down Expand Up @@ -450,6 +454,14 @@ func (rpcp *RPCProvider) SetupEndpoint(ctx context.Context, rpcProviderEndpoint
chainTracker = chainTrackerLoaded
utils.LavaFormatDebug("reusing chain tracker", utils.Attribute{Key: "chain", Value: rpcProviderEndpoint.ChainID})
}

// create provider load manager per chain ID
var keyExists bool
providerLoadManager, keyExists = rpcp.providerLoadManagersPerChain[rpcProviderEndpoint.ChainID]
if !keyExists {
providerLoadManager = NewProviderLoadManager(rpcp.relayLoadLimit)
rpcp.providerLoadManagersPerChain[rpcProviderEndpoint.ChainID] = providerLoadManager
}
return nil
}
err = chainCommonSetup()
Expand Down Expand Up @@ -485,8 +497,7 @@ func (rpcp *RPCProvider) SetupEndpoint(ctx context.Context, rpcProviderEndpoint
utils.LavaFormatTrace("Creating provider node subscription manager", utils.LogAttr("rpcProviderEndpoint", rpcProviderEndpoint))
providerNodeSubscriptionManager = chainlib.NewProviderNodeSubscriptionManager(chainRouter, chainParser, rpcProviderServer, rpcp.privKey)
}

rpcProviderServer.ServeRPCRequests(ctx, rpcProviderEndpoint, chainParser, rpcp.rewardServer, providerSessionManager, reliabilityManager, rpcp.privKey, rpcp.cache, chainRouter, rpcp.providerStateTracker, rpcp.addr, rpcp.lavaChainID, DEFAULT_ALLOWED_MISSING_CU, providerMetrics, relaysMonitor, providerNodeSubscriptionManager, rpcp.staticProvider)
rpcProviderServer.ServeRPCRequests(ctx, rpcProviderEndpoint, chainParser, rpcp.rewardServer, providerSessionManager, reliabilityManager, rpcp.privKey, rpcp.cache, chainRouter, rpcp.providerStateTracker, rpcp.addr, rpcp.lavaChainID, DEFAULT_ALLOWED_MISSING_CU, providerMetrics, relaysMonitor, providerNodeSubscriptionManager, rpcp.staticProvider, providerLoadManager)
// set up grpc listener
var listener *ProviderListener
func() {
Expand Down Expand Up @@ -717,6 +728,7 @@ rpcprovider 127.0.0.1:3333 OSMOSIS tendermintrpc "wss://www.node-path.com:80,htt
if stickinessHeaderName != "" {
RPCProviderStickinessHeaderName = stickinessHeaderName
}
relayLoadLimit := viper.GetUint64(common.RateLimitRequestPerSecondFlag)
prometheusListenAddr := viper.GetString(metrics.MetricsListenFlagName)
rewardStoragePath := viper.GetString(rewardserver.RewardServerStorageFlagName)
rewardTTL := viper.GetDuration(rewardserver.RewardTTLFlagName)
Expand Down Expand Up @@ -754,6 +766,7 @@ rpcprovider 127.0.0.1:3333 OSMOSIS tendermintrpc "wss://www.node-path.com:80,htt
&rpcProviderHealthCheckMetricsOptions,
staticProvider,
offlineSpecPath,
relayLoadLimit,
}

rpcProvider := RPCProvider{}
Expand Down Expand Up @@ -790,7 +803,7 @@ rpcprovider 127.0.0.1:3333 OSMOSIS tendermintrpc "wss://www.node-path.com:80,htt
cmdRPCProvider.Flags().BoolVar(&chainlib.IgnoreSubscriptionNotConfiguredError, chainlib.IgnoreSubscriptionNotConfiguredErrorFlag, chainlib.IgnoreSubscriptionNotConfiguredError, "ignore webSocket node url not configured error, when subscription is enabled in spec")
cmdRPCProvider.Flags().IntVar(&numberOfRetriesAllowedOnNodeErrors, common.SetRelayCountOnNodeErrorFlag, 2, "set the number of retries attempt on node errors")
cmdRPCProvider.Flags().String(common.UseStaticSpecFlag, "", "load offline spec provided path to spec file, used to test specs before they are proposed on chain, example for spec with inheritance: --use-static-spec ./cookbook/specs/ibc.json,./cookbook/specs/tendermint.json,./cookbook/specs/cosmossdk.json,./cookbook/specs/ethermint.json,./cookbook/specs/ethereum.json,./cookbook/specs/evmos.json")

cmdRPCProvider.Flags().Uint64(common.RateLimitRequestPerSecondFlag, 0, "Measuring the load relative to this number for feedback - per second - per chain - default unlimited. Given Y simultaneous relay calls, a value of X and will measure Y/X load rate.")
common.AddRollingLogConfig(cmdRPCProvider)
return cmdRPCProvider
}
17 changes: 16 additions & 1 deletion protocol/rpcprovider/rpcprovider_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ type RPCProviderServer struct {
providerUniqueId string
StaticProvider bool
providerStateMachine *ProviderStateMachine
providerLoadManager *ProviderLoadManager
}

type ReliabilityManagerInf interface {
Expand Down Expand Up @@ -112,6 +113,7 @@ func (rpcps *RPCProviderServer) ServeRPCRequests(
relaysMonitor *metrics.RelaysMonitor,
providerNodeSubscriptionManager *chainlib.ProviderNodeSubscriptionManager,
staticProvider bool,
providerLoadManager *ProviderLoadManager,
) {
rpcps.cache = cache
rpcps.chainRouter = chainRouter
Expand All @@ -134,6 +136,7 @@ func (rpcps *RPCProviderServer) ServeRPCRequests(
rpcps.relaysMonitor = relaysMonitor
rpcps.providerNodeSubscriptionManager = providerNodeSubscriptionManager
rpcps.providerStateMachine = NewProviderStateMachine(rpcProviderEndpoint.ChainID, lavaprotocol.NewRelayRetriesManager(), chainRouter)
rpcps.providerLoadManager = providerLoadManager

rpcps.initRelaysMonitor(ctx)
}
Expand All @@ -156,6 +159,11 @@ func (rpcps *RPCProviderServer) initRelaysMonitor(ctx context.Context) {
rpcps.relaysMonitor.Start(ctx)
}

func (rpcps *RPCProviderServer) setLoadMetric() {
loadRate := rpcps.providerLoadManager.getProviderLoad()
rpcps.metrics.SetLoadRate(loadRate)
}

func (rpcps *RPCProviderServer) craftChainMessage() (chainMessage chainlib.ChainMessage, err error) {
parsing, apiCollection, ok := rpcps.chainParser.GetParsingByTag(spectypes.FUNCTION_TAG_GET_BLOCKNUM)
if !ok {
Expand All @@ -180,7 +188,14 @@ func (rpcps *RPCProviderServer) craftChainMessage() (chainMessage chainlib.Chain

// function used to handle relay requests from a consumer, it is called by a provider_listener by calling RegisterReceiver
func (rpcps *RPCProviderServer) Relay(ctx context.Context, request *pairingtypes.RelayRequest) (*pairingtypes.RelayReply, error) {
grpc.SetTrailer(ctx, metadata.Pairs(chainlib.RpcProviderUniqueIdHeader, rpcps.providerUniqueId))
// count the number of simultaneous relay calls
isLoadRateSet := rpcps.providerLoadManager.addAndSetRelayLoadToContextTrailer(ctx)
defer func() { go rpcps.providerLoadManager.subtractRelayCall() }()
if isLoadRateSet {
go rpcps.setLoadMetric()
}
trailerMd := metadata.Pairs(chainlib.RpcProviderUniqueIdHeader, rpcps.providerUniqueId)
grpc.SetTrailer(ctx, trailerMd)
if request.RelayData == nil || request.RelaySession == nil {
return nil, utils.LavaFormatWarning("invalid relay request, internal fields are nil", nil)
}
Expand Down
Loading
Loading