Skip to content

Commit

Permalink
remove duplication of batches
Browse files Browse the repository at this point in the history
  • Loading branch information
omerlavanet committed Sep 13, 2024
1 parent dd307cd commit f3b2489
Showing 1 changed file with 14 additions and 25 deletions.
39 changes: 14 additions & 25 deletions protocol/rpcconsumer/consumer_relay_state_machine.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,6 @@ func (rssi *RelayStateSendInstructions) IsDone() bool {
func (crsm *ConsumerRelayStateMachine) GetRelayTaskChannel() chan RelayStateSendInstructions {
relayTaskChannel := make(chan RelayStateSendInstructions)
go func() {
batchNumber := 0 // Set batch number
// A channel to be notified processing was done, true means we have results and can return
gotResults := make(chan bool, 1)
processingTimeout, relayTimeout := crsm.relaySender.getProcessingTimeout(crsm.GetProtocolMessage())
Expand All @@ -127,7 +126,7 @@ func (crsm *ConsumerRelayStateMachine) GetRelayTaskChannel() chan RelayStateSend

readResultsFromProcessor := func() {
// ProcessResults is reading responses while blocking until the conditions are met
utils.LavaFormatTrace("[StateMachine] Waiting for results", utils.LogAttr("batch", batchNumber))
utils.LavaFormatTrace("[StateMachine] Waiting for results", utils.LogAttr("batch", crsm.usedProviders.BatchNumber()))
crsm.parentRelayProcessor.WaitForResults(processingCtx)
// Decide if we need to resend or not
if crsm.parentRelayProcessor.HasRequiredNodeResults() {
Expand All @@ -140,11 +139,11 @@ func (crsm *ConsumerRelayStateMachine) GetRelayTaskChannel() chan RelayStateSend
returnCondition := make(chan error, 1)
// Used for checking whether to return an error to the user or to allow other channels return their result first see detailed description on the switch case below
validateReturnCondition := func(err error) {
batchOnStart := batchNumber
batchOnStart := crsm.usedProviders.BatchNumber()
time.Sleep(15 * time.Millisecond)
utils.LavaFormatTrace("[StateMachine] validating return condition", utils.LogAttr("batch", batchNumber))
utils.LavaFormatTrace("[StateMachine] validating return condition", utils.LogAttr("batch", crsm.usedProviders.BatchNumber()))
if batchOnStart == crsm.usedProviders.BatchNumber() && crsm.usedProviders.CurrentlyUsed() == 0 {
utils.LavaFormatTrace("[StateMachine] return condition triggered", utils.LogAttr("batch", batchNumber), utils.LogAttr("err", err))
utils.LavaFormatTrace("[StateMachine] return condition triggered", utils.LogAttr("batch", crsm.usedProviders.BatchNumber()), utils.LogAttr("err", err))
returnCondition <- err
}
}
Expand All @@ -165,16 +164,16 @@ func (crsm *ConsumerRelayStateMachine) GetRelayTaskChannel() chan RelayStateSend
// Getting batch update for either errors sending message or successful batches
case err := <-crsm.batchUpdate:
if err != nil { // Error handling
utils.LavaFormatTrace("[StateMachine] err := <-crsm.batchUpdate", utils.LogAttr("err", err), utils.LogAttr("batch", batchNumber), utils.LogAttr("consecutiveBatchErrors", consecutiveBatchErrors))
utils.LavaFormatTrace("[StateMachine] err := <-crsm.batchUpdate", utils.LogAttr("err", err), utils.LogAttr("batch", crsm.usedProviders.BatchNumber()), utils.LogAttr("consecutiveBatchErrors", consecutiveBatchErrors))
// Sending a new batch failed (consumer's protocol side), handling the state machine
consecutiveBatchErrors++ // Increase consecutive error counter
if consecutiveBatchErrors > SendRelayAttempts { // If we failed sending a message more than "SendRelayAttempts" time in a row.
if batchNumber == 0 && consecutiveBatchErrors == SendRelayAttempts+1 { // First relay attempt. print on first failure only.
if crsm.usedProviders.BatchNumber() == 0 && consecutiveBatchErrors == SendRelayAttempts+1 { // First relay attempt. print on first failure only.
utils.LavaFormatWarning("Failed Sending First Message", err, utils.LogAttr("consecutive errors", consecutiveBatchErrors))
}
go validateReturnCondition(err) // Check if we have ongoing messages pending return.
} else {
utils.LavaFormatTrace("[StateMachine] batchUpdate - err != nil - batch fail retry attempt", utils.LogAttr("batch", batchNumber), utils.LogAttr("consecutiveBatchErrors", consecutiveBatchErrors))
utils.LavaFormatTrace("[StateMachine] batchUpdate - err != nil - batch fail retry attempt", utils.LogAttr("batch", crsm.usedProviders.BatchNumber()), utils.LogAttr("consecutiveBatchErrors", consecutiveBatchErrors))
// Failed sending message, but we still want to attempt sending more.
relayTaskChannel <- RelayStateSendInstructions{
protocolMessage: crsm.GetProtocolMessage(),
Expand All @@ -183,44 +182,34 @@ func (crsm *ConsumerRelayStateMachine) GetRelayTaskChannel() chan RelayStateSend
continue
}
// Successfully sent message.
batchNumber++
// Reset consecutiveBatchErrors
consecutiveBatchErrors = 0
// Batch number validation, should never happen.
if batchNumber != crsm.usedProviders.BatchNumber() {
// Mismatch, return error
relayTaskChannel <- RelayStateSendInstructions{
err: utils.LavaFormatError("Batch Number mismatch between state machine and used providers", nil, utils.LogAttr("batchNumber", batchNumber), utils.LogAttr("crsm.parentRelayProcessor.usedProviders.BatchNumber()", crsm.usedProviders.BatchNumber())),
done: true,
}
return
}
case success := <-gotResults:
utils.LavaFormatTrace("[StateMachine] success := <-gotResults", utils.LogAttr("batch", batchNumber))
utils.LavaFormatTrace("[StateMachine] success := <-gotResults", utils.LogAttr("batch", crsm.usedProviders.BatchNumber()))
// If we had a successful result return what we currently have
// Or we are done sending relays, and we have no other relays pending results.
if success { // Check wether we can return the valid results or we need to send another relay
relayTaskChannel <- RelayStateSendInstructions{done: true}
return
}
// If should retry == true, send a new batch. (success == false)
if crsm.ShouldRetry(batchNumber) {
utils.LavaFormatTrace("[StateMachine] success := <-gotResults - crsm.ShouldRetry(batchNumber)", utils.LogAttr("batch", batchNumber))
if crsm.ShouldRetry(crsm.usedProviders.BatchNumber()) {
utils.LavaFormatTrace("[StateMachine] success := <-gotResults - crsm.ShouldRetry(batchNumber)", utils.LogAttr("batch", crsm.usedProviders.BatchNumber()))
relayTaskChannel <- RelayStateSendInstructions{protocolMessage: crsm.GetProtocolMessage()}
} else {
go validateReturnCondition(nil)
}
go readResultsFromProcessor()
case <-startNewBatchTicker.C:
// Only trigger another batch for non BestResult relays or if we didn't pass the retry limit.
if crsm.ShouldRetry(batchNumber) {
utils.LavaFormatTrace("[StateMachine] ticker triggered", utils.LogAttr("batch", batchNumber))
if crsm.ShouldRetry(crsm.usedProviders.BatchNumber()) {
utils.LavaFormatTrace("[StateMachine] ticker triggered", utils.LogAttr("batch", crsm.usedProviders.BatchNumber()))
relayTaskChannel <- RelayStateSendInstructions{protocolMessage: crsm.GetProtocolMessage()}
// Add ticker launch metrics
go crsm.tickerMetricSetter.SetRelaySentByNewBatchTickerMetric(crsm.relaySender.GetChainIdAndApiInterface())
}
case returnErr := <-returnCondition:
utils.LavaFormatTrace("[StateMachine] returnErr := <-returnCondition", utils.LogAttr("batch", batchNumber))
utils.LavaFormatTrace("[StateMachine] returnErr := <-returnCondition", utils.LogAttr("batch", crsm.usedProviders.BatchNumber()))
// we use this channel because there could be a race condition between us releasing the provider and about to send the return
// to an error happening on another relay processor's routine. this can cause an error that returns to the user
// if we don't release the case, it will cause the success case condition to not be executed
Expand All @@ -239,7 +228,7 @@ func (crsm *ConsumerRelayStateMachine) GetRelayTaskChannel() chan RelayStateSend
utils.LogAttr("consumerIp", userData.ConsumerIp),
utils.LogAttr("protocolMessage.GetApi().Name", crsm.GetProtocolMessage().GetApi().Name),
utils.LogAttr("GUID", crsm.ctx),
utils.LogAttr("batchNumber", batchNumber),
utils.LogAttr("batchNumber", crsm.usedProviders.BatchNumber()),
utils.LogAttr("consecutiveBatchErrors", consecutiveBatchErrors),
)
// returning the context error
Expand Down

0 comments on commit f3b2489

Please sign in to comment.