-
Notifications
You must be signed in to change notification settings - Fork 207
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
feat: PRT- adding provider retry mechanism on node error for better QOS (#1660)
* feat: PRT - adding relay processor retry options * rename * improving the features * removed user data * fix lint * fix test * fixed all comments * llinty * bugberan * fix comment * feat: PRT- adding provider retry mechanism on node error for better QOS * fix race on tests. * remove spam, and add logs * fix retry 0 * implement provider relay state machine * fixed all comments * fix all comments * lint
- Loading branch information
1 parent
0f407c2
commit 4793c0b
Showing
13 changed files
with
253 additions
and
36 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,86 @@ | ||
package rpcprovider | ||
|
||
import ( | ||
"context" | ||
"time" | ||
|
||
"github.com/lavanet/lava/v2/protocol/chainlib" | ||
"github.com/lavanet/lava/v2/protocol/chainlib/chainproxy/rpcclient" | ||
"github.com/lavanet/lava/v2/protocol/common" | ||
"github.com/lavanet/lava/v2/protocol/lavaprotocol" | ||
"github.com/lavanet/lava/v2/utils" | ||
pairingtypes "github.com/lavanet/lava/v2/x/pairing/types" | ||
) | ||
|
||
type RelaySender interface { | ||
SendNodeMsg(ctx context.Context, ch chan interface{}, chainMessage chainlib.ChainMessageForSend, extensions []string) (relayReply *chainlib.RelayReplyWrapper, subscriptionID string, relayReplyServer *rpcclient.ClientSubscription, proxyUrl common.NodeUrl, chainId string, err error) | ||
} | ||
|
||
type ProviderStateMachine struct { | ||
relayRetriesManager lavaprotocol.RelayRetriesManagerInf | ||
chainId string | ||
relaySender RelaySender | ||
} | ||
|
||
func NewProviderStateMachine(chainId string, relayRetriesManager lavaprotocol.RelayRetriesManagerInf, relaySender RelaySender) *ProviderStateMachine { | ||
return &ProviderStateMachine{ | ||
relayRetriesManager: relayRetriesManager, | ||
chainId: chainId, | ||
relaySender: relaySender, | ||
} | ||
} | ||
|
||
func (psm *ProviderStateMachine) SendNodeMessage(ctx context.Context, chainMsg chainlib.ChainMessage, request *pairingtypes.RelayRequest) (*chainlib.RelayReplyWrapper, error) { | ||
hash, err := chainMsg.GetRawRequestHash() | ||
requestHashString := "" | ||
if err != nil { | ||
utils.LavaFormatWarning("Failed converting message to hash", err, utils.LogAttr("url", request.RelayData.ApiUrl), utils.LogAttr("data", string(request.RelayData.Data))) | ||
} else { | ||
requestHashString = string(hash) | ||
} | ||
|
||
var replyWrapper *chainlib.RelayReplyWrapper | ||
var isNodeError bool | ||
for retryAttempt := 0; retryAttempt <= numberOfRetriesAllowedOnNodeErrors; retryAttempt++ { | ||
sendTime := time.Now() | ||
replyWrapper, _, _, _, _, err = psm.relaySender.SendNodeMsg(ctx, nil, chainMsg, request.RelayData.Extensions) | ||
if err != nil { | ||
return nil, utils.LavaFormatError("Sending chainMsg failed", err, utils.LogAttr("attempt", retryAttempt), utils.LogAttr("GUID", ctx), utils.LogAttr("specID", psm.chainId)) | ||
} | ||
|
||
if replyWrapper == nil || replyWrapper.RelayReply == nil { | ||
return nil, utils.LavaFormatError("Relay Wrapper returned nil without an error", nil, utils.LogAttr("attempt", retryAttempt), utils.LogAttr("GUID", ctx), utils.LogAttr("specID", psm.chainId)) | ||
} | ||
|
||
if debugLatency { | ||
utils.LavaFormatDebug("node reply received", utils.LogAttr("attempt", retryAttempt), utils.LogAttr("timeTaken", time.Since(sendTime)), utils.LogAttr("GUID", ctx), utils.LogAttr("specID", psm.chainId)) | ||
} | ||
|
||
// Failed fetching hash return the reply. | ||
if requestHashString == "" { | ||
utils.LavaFormatWarning("Failed to hash request, shouldn't happen", nil, utils.LogAttr("url", request.RelayData.ApiUrl), utils.LogAttr("data", string(request.RelayData.Data))) | ||
break // We can't perform the retries as we failed fetching the request hash. | ||
} | ||
|
||
// Check for node errors | ||
isNodeError, _ = chainMsg.CheckResponseError(replyWrapper.RelayReply.Data, replyWrapper.StatusCode) | ||
if !isNodeError { | ||
// Successful relay, remove it from the cache if we have it and return a valid response. | ||
go psm.relayRetriesManager.RemoveHashFromCache(requestHashString) | ||
return replyWrapper, nil | ||
} | ||
|
||
// On the first retry, check if this hash has already failed previously | ||
if retryAttempt == 0 && psm.relayRetriesManager.CheckHashInCache(requestHashString) { | ||
utils.LavaFormatTrace("received node error, request hash was already in cache, skipping retry") | ||
break | ||
} | ||
utils.LavaFormatTrace("Errored Node Message, retrying message", utils.LogAttr("retry", retryAttempt)) | ||
} | ||
|
||
if isNodeError { | ||
utils.LavaFormatTrace("failed all relay retries for message") | ||
go psm.relayRetriesManager.AddHashToCache(requestHashString) | ||
} | ||
return replyWrapper, nil | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,132 @@ | ||
package rpcprovider | ||
|
||
import ( | ||
"context" | ||
"testing" | ||
"time" | ||
|
||
"github.com/golang/mock/gomock" | ||
"github.com/lavanet/lava/v2/protocol/chainlib" | ||
"github.com/lavanet/lava/v2/protocol/chainlib/chainproxy/rpcclient" | ||
"github.com/lavanet/lava/v2/protocol/common" | ||
"github.com/lavanet/lava/v2/protocol/lavaprotocol" | ||
types "github.com/lavanet/lava/v2/x/pairing/types" | ||
"github.com/stretchr/testify/require" | ||
) | ||
|
||
// relaySenderMock is a RelaySender stub for the state machine tests.
type relaySenderMock struct {
	// numberOfTimesHitSendNodeMsg counts the calls made to SendNodeMsg.
	numberOfTimesHitSendNodeMsg int
}
|
||
func (rs *relaySenderMock) SendNodeMsg(ctx context.Context, ch chan interface{}, chainMessage chainlib.ChainMessageForSend, extensions []string) (relayReply *chainlib.RelayReplyWrapper, subscriptionID string, relayReplyServer *rpcclient.ClientSubscription, proxyUrl common.NodeUrl, chainId string, err error) { | ||
rs.numberOfTimesHitSendNodeMsg++ | ||
return &chainlib.RelayReplyWrapper{RelayReply: &types.RelayReply{}}, "", nil, common.NodeUrl{}, "", nil | ||
} | ||
|
||
func TestStateMachineHappyFlow(t *testing.T) { | ||
relaySender := &relaySenderMock{} | ||
stateMachine := NewProviderStateMachine("test", lavaprotocol.NewRelayRetriesManager(), relaySender) | ||
chainMsgMock := chainlib.NewMockChainMessage(gomock.NewController(t)) | ||
chainMsgMock. | ||
EXPECT(). | ||
GetRawRequestHash(). | ||
Return([]byte{1, 2, 3}, nil). | ||
AnyTimes() | ||
chainMsgMock. | ||
EXPECT(). | ||
CheckResponseError(gomock.Any(), gomock.Any()). | ||
DoAndReturn(func(msg interface{}, msg2 interface{}) (interface{}, interface{}) { | ||
if relaySender.numberOfTimesHitSendNodeMsg < numberOfRetriesAllowedOnNodeErrors { | ||
return true, "" | ||
} | ||
return false, "" | ||
}). | ||
AnyTimes() | ||
stateMachine.SendNodeMessage(context.Background(), chainMsgMock, &types.RelayRequest{RelayData: &types.RelayPrivateData{Extensions: []string{}}}) | ||
hash, _ := chainMsgMock.GetRawRequestHash() | ||
require.Equal(t, relaySender.numberOfTimesHitSendNodeMsg, numberOfRetriesAllowedOnNodeErrors) | ||
require.False(t, stateMachine.relayRetriesManager.CheckHashInCache(string(hash))) | ||
} | ||
|
||
func TestStateMachineAllFailureFlows(t *testing.T) { | ||
relaySender := &relaySenderMock{} | ||
stateMachine := NewProviderStateMachine("test", lavaprotocol.NewRelayRetriesManager(), relaySender) | ||
chainMsgMock := chainlib.NewMockChainMessage(gomock.NewController(t)) | ||
returnFalse := false | ||
chainMsgMock. | ||
EXPECT(). | ||
GetRawRequestHash(). | ||
Return([]byte{1, 2, 3}, nil). | ||
AnyTimes() | ||
chainMsgMock. | ||
EXPECT(). | ||
CheckResponseError(gomock.Any(), gomock.Any()). | ||
DoAndReturn(func(msg interface{}, msg2 interface{}) (interface{}, interface{}) { | ||
if returnFalse { | ||
return false, "" | ||
} | ||
return true, "" | ||
}). | ||
AnyTimes() | ||
stateMachine.SendNodeMessage(context.Background(), chainMsgMock, &types.RelayRequest{RelayData: &types.RelayPrivateData{Extensions: []string{}}}) | ||
hash, _ := chainMsgMock.GetRawRequestHash() | ||
require.Equal(t, numberOfRetriesAllowedOnNodeErrors+1, relaySender.numberOfTimesHitSendNodeMsg) | ||
for i := 0; i < 10; i++ { | ||
// wait for routine to end.. | ||
if stateMachine.relayRetriesManager.CheckHashInCache(string(hash)) { | ||
break | ||
} | ||
time.Sleep(100 * time.Millisecond) | ||
} | ||
require.True(t, stateMachine.relayRetriesManager.CheckHashInCache(string(hash))) | ||
|
||
// send second relay with same hash. | ||
stateMachine.SendNodeMessage(context.Background(), chainMsgMock, &types.RelayRequest{RelayData: &types.RelayPrivateData{Extensions: []string{}}}) | ||
require.Equal(t, 4, relaySender.numberOfTimesHitSendNodeMsg) // no retries. | ||
} | ||
|
||
func TestStateMachineFailureAndRecoveryFlow(t *testing.T) { | ||
relaySender := &relaySenderMock{} | ||
stateMachine := NewProviderStateMachine("test", lavaprotocol.NewRelayRetriesManager(), relaySender) | ||
chainMsgMock := chainlib.NewMockChainMessage(gomock.NewController(t)) | ||
returnFalse := false | ||
chainMsgMock. | ||
EXPECT(). | ||
GetRawRequestHash(). | ||
Return([]byte{1, 2, 3}, nil). | ||
AnyTimes() | ||
chainMsgMock. | ||
EXPECT(). | ||
CheckResponseError(gomock.Any(), gomock.Any()). | ||
DoAndReturn(func(msg interface{}, msg2 interface{}) (interface{}, interface{}) { | ||
if returnFalse { | ||
return false, "" | ||
} | ||
return true, "" | ||
}). | ||
AnyTimes() | ||
stateMachine.SendNodeMessage(context.Background(), chainMsgMock, &types.RelayRequest{RelayData: &types.RelayPrivateData{Extensions: []string{}}}) | ||
hash, _ := chainMsgMock.GetRawRequestHash() | ||
require.Equal(t, numberOfRetriesAllowedOnNodeErrors+1, relaySender.numberOfTimesHitSendNodeMsg) | ||
for i := 0; i < 10; i++ { | ||
// wait for routine to end.. | ||
if stateMachine.relayRetriesManager.CheckHashInCache(string(hash)) { | ||
break | ||
} | ||
time.Sleep(100 * time.Millisecond) | ||
} | ||
require.True(t, stateMachine.relayRetriesManager.CheckHashInCache(string(hash))) | ||
|
||
// send second relay with same hash. | ||
returnFalse = true | ||
stateMachine.SendNodeMessage(context.Background(), chainMsgMock, &types.RelayRequest{RelayData: &types.RelayPrivateData{Extensions: []string{}}}) | ||
require.Equal(t, 4, relaySender.numberOfTimesHitSendNodeMsg) // no retries, first success. | ||
// wait for routine to end.. | ||
for i := 0; i < 10; i++ { | ||
if !stateMachine.relayRetriesManager.CheckHashInCache(string(hash)) { | ||
break | ||
} | ||
time.Sleep(100 * time.Millisecond) | ||
} | ||
require.False(t, stateMachine.relayRetriesManager.CheckHashInCache(string(hash))) | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.