From f3b2489fc70c84edfa564617768f4e39bf2f57ea Mon Sep 17 00:00:00 2001 From: omerlavanet Date: Fri, 13 Sep 2024 23:18:26 +0300 Subject: [PATCH] remove duplication of batches --- .../consumer_relay_state_machine.go | 39 +++++++------------ 1 file changed, 14 insertions(+), 25 deletions(-) diff --git a/protocol/rpcconsumer/consumer_relay_state_machine.go b/protocol/rpcconsumer/consumer_relay_state_machine.go index 348757245..b7fd41f68 100644 --- a/protocol/rpcconsumer/consumer_relay_state_machine.go +++ b/protocol/rpcconsumer/consumer_relay_state_machine.go @@ -114,7 +114,6 @@ func (rssi *RelayStateSendInstructions) IsDone() bool { func (crsm *ConsumerRelayStateMachine) GetRelayTaskChannel() chan RelayStateSendInstructions { relayTaskChannel := make(chan RelayStateSendInstructions) go func() { - batchNumber := 0 // Set batch number // A channel to be notified processing was done, true means we have results and can return gotResults := make(chan bool, 1) processingTimeout, relayTimeout := crsm.relaySender.getProcessingTimeout(crsm.GetProtocolMessage()) @@ -127,7 +126,7 @@ func (crsm *ConsumerRelayStateMachine) GetRelayTaskChannel() chan RelayStateSend readResultsFromProcessor := func() { // ProcessResults is reading responses while blocking until the conditions are met - utils.LavaFormatTrace("[StateMachine] Waiting for results", utils.LogAttr("batch", batchNumber)) + utils.LavaFormatTrace("[StateMachine] Waiting for results", utils.LogAttr("batch", crsm.usedProviders.BatchNumber())) crsm.parentRelayProcessor.WaitForResults(processingCtx) // Decide if we need to resend or not if crsm.parentRelayProcessor.HasRequiredNodeResults() { @@ -140,11 +139,11 @@ func (crsm *ConsumerRelayStateMachine) GetRelayTaskChannel() chan RelayStateSend returnCondition := make(chan error, 1) // Used for checking whether to return an error to the user or to allow other channels return their result first see detailed description on the switch case below validateReturnCondition := func(err error) { - batchOnStart := batchNumber + batchOnStart := crsm.usedProviders.BatchNumber() time.Sleep(15 * time.Millisecond) - utils.LavaFormatTrace("[StateMachine] validating return condition", utils.LogAttr("batch", batchNumber)) + utils.LavaFormatTrace("[StateMachine] validating return condition", utils.LogAttr("batch", crsm.usedProviders.BatchNumber())) if batchOnStart == crsm.usedProviders.BatchNumber() && crsm.usedProviders.CurrentlyUsed() == 0 { - utils.LavaFormatTrace("[StateMachine] return condition triggered", utils.LogAttr("batch", batchNumber), utils.LogAttr("err", err)) + utils.LavaFormatTrace("[StateMachine] return condition triggered", utils.LogAttr("batch", crsm.usedProviders.BatchNumber()), utils.LogAttr("err", err)) returnCondition <- err } } @@ -165,16 +164,16 @@ func (crsm *ConsumerRelayStateMachine) GetRelayTaskChannel() chan RelayStateSend // Getting batch update for either errors sending message or successful batches case err := <-crsm.batchUpdate: if err != nil { // Error handling - utils.LavaFormatTrace("[StateMachine] err := <-crsm.batchUpdate", utils.LogAttr("err", err), utils.LogAttr("batch", batchNumber), utils.LogAttr("consecutiveBatchErrors", consecutiveBatchErrors)) + utils.LavaFormatTrace("[StateMachine] err := <-crsm.batchUpdate", utils.LogAttr("err", err), utils.LogAttr("batch", crsm.usedProviders.BatchNumber()), utils.LogAttr("consecutiveBatchErrors", consecutiveBatchErrors)) // Sending a new batch failed (consumer's protocol side), handling the state machine consecutiveBatchErrors++ // Increase consecutive error counter if consecutiveBatchErrors > SendRelayAttempts { // If we failed sending a message more than "SendRelayAttempts" time in a row. - if batchNumber == 0 && consecutiveBatchErrors == SendRelayAttempts+1 { // First relay attempt. print on first failure only. + if crsm.usedProviders.BatchNumber() == 0 && consecutiveBatchErrors == SendRelayAttempts+1 { // First relay attempt. print on first failure only. utils.LavaFormatWarning("Failed Sending First Message", err, utils.LogAttr("consecutive errors", consecutiveBatchErrors)) } go validateReturnCondition(err) // Check if we have ongoing messages pending return. } else { - utils.LavaFormatTrace("[StateMachine] batchUpdate - err != nil - batch fail retry attempt", utils.LogAttr("batch", batchNumber), utils.LogAttr("consecutiveBatchErrors", consecutiveBatchErrors)) + utils.LavaFormatTrace("[StateMachine] batchUpdate - err != nil - batch fail retry attempt", utils.LogAttr("batch", crsm.usedProviders.BatchNumber()), utils.LogAttr("consecutiveBatchErrors", consecutiveBatchErrors)) // Failed sending message, but we still want to attempt sending more. relayTaskChannel <- RelayStateSendInstructions{ protocolMessage: crsm.GetProtocolMessage(), @@ -183,20 +182,10 @@ func (crsm *ConsumerRelayStateMachine) GetRelayTaskChannel() chan RelayStateSend continue } // Successfully sent message. - batchNumber++ // Reset consecutiveBatchErrors consecutiveBatchErrors = 0 - // Batch number validation, should never happen. - if batchNumber != crsm.usedProviders.BatchNumber() { - // Mismatch, return error - relayTaskChannel <- RelayStateSendInstructions{ - err: utils.LavaFormatError("Batch Number mismatch between state machine and used providers", nil, utils.LogAttr("batchNumber", batchNumber), utils.LogAttr("crsm.parentRelayProcessor.usedProviders.BatchNumber()", crsm.usedProviders.BatchNumber())), - done: true, - } - return - } case success := <-gotResults: - utils.LavaFormatTrace("[StateMachine] success := <-gotResults", utils.LogAttr("batch", batchNumber)) + utils.LavaFormatTrace("[StateMachine] success := <-gotResults", utils.LogAttr("batch", crsm.usedProviders.BatchNumber())) // If we had a successful result return what we currently have // Or we are done sending relays, and we have no other relays pending results. if success { // Check wether we can return the valid results or we need to send another relay @@ -204,8 +193,8 @@ func (crsm *ConsumerRelayStateMachine) GetRelayTaskChannel() chan RelayStateSend return } // If should retry == true, send a new batch. (success == false) - if crsm.ShouldRetry(batchNumber) { - utils.LavaFormatTrace("[StateMachine] success := <-gotResults - crsm.ShouldRetry(batchNumber)", utils.LogAttr("batch", batchNumber)) + if crsm.ShouldRetry(crsm.usedProviders.BatchNumber()) { + utils.LavaFormatTrace("[StateMachine] success := <-gotResults - crsm.ShouldRetry(batchNumber)", utils.LogAttr("batch", crsm.usedProviders.BatchNumber())) relayTaskChannel <- RelayStateSendInstructions{protocolMessage: crsm.GetProtocolMessage()} } else { go validateReturnCondition(nil) @@ -213,14 +202,14 @@ func (crsm *ConsumerRelayStateMachine) GetRelayTaskChannel() chan RelayStateSend go readResultsFromProcessor() case <-startNewBatchTicker.C: // Only trigger another batch for non BestResult relays or if we didn't pass the retry limit. - if crsm.ShouldRetry(batchNumber) { - utils.LavaFormatTrace("[StateMachine] ticker triggered", utils.LogAttr("batch", batchNumber)) + if crsm.ShouldRetry(crsm.usedProviders.BatchNumber()) { + utils.LavaFormatTrace("[StateMachine] ticker triggered", utils.LogAttr("batch", crsm.usedProviders.BatchNumber())) relayTaskChannel <- RelayStateSendInstructions{protocolMessage: crsm.GetProtocolMessage()} // Add ticker launch metrics go crsm.tickerMetricSetter.SetRelaySentByNewBatchTickerMetric(crsm.relaySender.GetChainIdAndApiInterface()) } case returnErr := <-returnCondition: - utils.LavaFormatTrace("[StateMachine] returnErr := <-returnCondition", utils.LogAttr("batch", batchNumber)) + utils.LavaFormatTrace("[StateMachine] returnErr := <-returnCondition", utils.LogAttr("batch", crsm.usedProviders.BatchNumber())) // we use this channel because there could be a race condition between us releasing the provider and about to send the return // to an error happening on another relay processor's routine. this can cause an error that returns to the user // if we don't release the case, it will cause the success case condition to not be executed @@ -239,7 +228,7 @@ func (crsm *ConsumerRelayStateMachine) GetRelayTaskChannel() chan RelayStateSend utils.LogAttr("consumerIp", userData.ConsumerIp), utils.LogAttr("protocolMessage.GetApi().Name", crsm.GetProtocolMessage().GetApi().Name), utils.LogAttr("GUID", crsm.ctx), - utils.LogAttr("batchNumber", batchNumber), + utils.LogAttr("batchNumber", crsm.usedProviders.BatchNumber()), utils.LogAttr("consecutiveBatchErrors", consecutiveBatchErrors), ) // returning the context error