Skip to content

Commit

Permalink
revert: median concurrency changes (#1213)
Browse files Browse the repository at this point in the history
* Revert "refactor: storing medians concurrently (#1172)"

This reverts commit 230a971.

* Revert "refactor: optimized GetSortedRevealedValues (#1173)"

This reverts commit 9842a61.

* fix: fixed API tests by updating RPC provider to op sepolia staging in POST request tests

* refactor: rewritten tests extensively for GetSortedRevealedValues()

* Revert "fix: Increased the buffer size of errChan in `GetBiggestStakerId` (#1175)"

This reverts commit 99de721.

* Revert "refactor: biggest staker calculation implemented using goroutines (#1171)"

This reverts commit ef41b96.
  • Loading branch information
Yashk767 authored Jun 13, 2024
1 parent 5ae7e15 commit e210cf5
Show file tree
Hide file tree
Showing 5 changed files with 221 additions and 273 deletions.
220 changes: 60 additions & 160 deletions cmd/propose.go
Original file line number Diff line number Diff line change
Expand Up @@ -208,6 +208,8 @@ func (*UtilsStruct) GetBiggestStakeAndId(client *ethclient.Client, address strin
if numberOfStakers == 0 {
return nil, 0, errors.New("numberOfStakers is 0")
}
var biggestStakerId uint32
biggestStake := big.NewInt(0)

bufferPercent, err := cmdUtils.GetBufferPercent()
if err != nil {
Expand All @@ -222,53 +224,30 @@ func (*UtilsStruct) GetBiggestStakeAndId(client *ethclient.Client, address strin
log.Debug("GetBiggestStakeAndId: State remaining time: ", stateRemainingTime)
stateTimeout := time.NewTimer(time.Second * time.Duration(stateRemainingTime))

var (
biggestStake = big.NewInt(0)
biggestStakerId uint32
mu sync.Mutex
wg sync.WaitGroup
errChan = make(chan error, numberOfStakers)
)

log.Debug("Iterating over all the stakers...")
loop:
for i := 1; i <= int(numberOfStakers); i++ {
wg.Add(1)
go func(stakerId int) {
defer wg.Done()

select {
case <-stateTimeout.C:
errChan <- errors.New("state timeout error")
return
default:
stake, err := razorUtils.GetStakeSnapshot(client, uint32(stakerId), epoch)
if err != nil {
errChan <- err
return
}

mu.Lock()
defer mu.Unlock()
if stake.Cmp(biggestStake) > 0 {
biggestStake = stake
biggestStakerId = uint32(stakerId)
}
select {
case <-stateTimeout.C:
log.Error("State timeout!")
err = errors.New("state timeout error")
break loop
default:
log.Debug("Propose: Staker Id: ", i)
stake, err := razorUtils.GetStakeSnapshot(client, uint32(i), epoch)
if err != nil {
return nil, 0, err
}
}(i)
}

wg.Wait()

select {
case err := <-errChan:
return nil, 0, err
default:
log.Debugf("Stake Snapshot of staker having stakerId %d is %s", i, stake)
if stake.Cmp(biggestStake) > 0 {
biggestStake = stake
biggestStakerId = uint32(i)
}
}
}

if err != nil {
return nil, 0, err
}

log.Debug("Propose: BiggestStake: ", biggestStake)
log.Debug("Propose: Biggest Staker Id: ", biggestStakerId)
return biggestStake, biggestStakerId, nil
Expand Down Expand Up @@ -391,92 +370,45 @@ func (*UtilsStruct) GetSortedRevealedValues(client *ethclient.Client, blockNumbe
return nil, err
}
log.Debugf("GetSortedRevealedValues: Revealed Data: %+v", assignedAsset)

var wg sync.WaitGroup
resultsChan := make(chan *types.AssetResult, len(assignedAsset))

for _, asset := range assignedAsset {
wg.Add(1)
go processAsset(asset, resultsChan, &wg)
}

wg.Wait()
close(resultsChan)

revealedValuesWithIndex := make(map[uint16][]*big.Int)
voteWeights := make(map[string]*big.Int)
influenceSum := make(map[uint16]*big.Int)

for result := range resultsChan {
for leafId, values := range result.RevealedValuesWithIndex {
revealedValuesWithIndex[leafId] = append(revealedValuesWithIndex[leafId], values...)
}
for value, weight := range result.VoteWeights {
if voteWeights[value] == nil {
voteWeights[value] = weight
log.Debug("Calculating sorted revealed values, vote weights and influence sum...")
for _, asset := range assignedAsset {
for _, assetValue := range asset.RevealedValues {
if revealedValuesWithIndex[assetValue.LeafId] == nil {
revealedValuesWithIndex[assetValue.LeafId] = []*big.Int{assetValue.Value}
} else {
voteWeights[value].Add(voteWeights[value], weight)
if !utils.ContainsBigInteger(revealedValuesWithIndex[assetValue.LeafId], assetValue.Value) {
revealedValuesWithIndex[assetValue.LeafId] = append(revealedValuesWithIndex[assetValue.LeafId], assetValue.Value)
}
}
}
for leafId, sum := range result.InfluenceSum {
if influenceSum[leafId] == nil {
influenceSum[leafId] = sum
} else {
influenceSum[leafId].Add(influenceSum[leafId], sum)
//Calculate vote weights
if voteWeights[assetValue.Value.String()] == nil {
voteWeights[assetValue.Value.String()] = big.NewInt(0)
}
voteWeights[assetValue.Value.String()] = big.NewInt(0).Add(voteWeights[assetValue.Value.String()], asset.Influence)

//Calculate influence sum
if influenceSum[assetValue.LeafId] == nil {
influenceSum[assetValue.LeafId] = big.NewInt(0)
}
influenceSum[assetValue.LeafId] = big.NewInt(0).Add(influenceSum[assetValue.LeafId], asset.Influence)
}
}

for _, values := range revealedValuesWithIndex {
sort.Slice(values, func(i, j int) bool {
return values[i].Cmp(values[j]) == -1
//sort revealed values
for _, element := range revealedValuesWithIndex {
sort.Slice(element, func(i, j int) bool {
return element[i].Cmp(element[j]) == -1
})
}

return &types.RevealedDataMaps{
SortedRevealedValues: revealedValuesWithIndex,
VoteWeights: voteWeights,
InfluenceSum: influenceSum,
}, nil
}

func processAsset(asset types.RevealedStruct, resultsChan chan<- *types.AssetResult, wg *sync.WaitGroup) {
defer wg.Done()

revealedValuesWithIndex := make(map[uint16][]*big.Int)
voteWeights := make(map[string]*big.Int)
influenceSum := make(map[uint16]*big.Int)

for _, assetValue := range asset.RevealedValues {
leafId := assetValue.LeafId
valueStr := assetValue.Value.String()
influence := asset.Influence

// Append the leaf value to the revealed values slice if it's not already present
if !utils.ContainsBigInteger(revealedValuesWithIndex[leafId], assetValue.Value) {
revealedValuesWithIndex[leafId] = append(revealedValuesWithIndex[leafId], assetValue.Value)
}

// Calculate vote weights
if voteWeights[valueStr] == nil {
voteWeights[valueStr] = big.NewInt(0)
}
voteWeights[valueStr] = voteWeights[valueStr].Add(voteWeights[valueStr], influence)

// Calculate influence sum
if influenceSum[leafId] == nil {
influenceSum[leafId] = big.NewInt(0)
}
influenceSum[leafId] = influenceSum[leafId].Add(influenceSum[leafId], influence)
}

resultsChan <- &types.AssetResult{
RevealedValuesWithIndex: revealedValuesWithIndex,
VoteWeights: voteWeights,
InfluenceSum: influenceSum,
}
}

//This function returns the medians, idsRevealedInThisEpoch and revealedDataMaps
func (*UtilsStruct) MakeBlock(client *ethclient.Client, blockNumber *big.Int, epoch uint32, rogueData types.Rogue) ([]*big.Int, []uint16, *types.RevealedDataMaps, error) {
log.Debugf("MakeBlock: Calling GetSortedRevealedValues with arguments blockNumber = %s, epoch = %d", blockNumber, epoch)
Expand All @@ -492,42 +424,31 @@ func (*UtilsStruct) MakeBlock(client *ethclient.Client, blockNumber *big.Int, ep
}
log.Debug("MakeBlock: Active collections: ", activeCollections)

resultsChan := make(chan types.MedianResult, len(activeCollections))
var wg sync.WaitGroup
var (
medians []*big.Int
idsRevealedInThisEpoch []uint16
)

log.Debug("Iterating over all the active collections for medians calculation....")
for leafId := uint16(0); leafId < uint16(len(activeCollections)); leafId++ {
wg.Add(1)
go func(leafId uint16) {
defer wg.Done()
median := calculateMedianForLeafId(revealedDataMaps, leafId, rogueData)
if median != nil {
resultsChan <- types.MedianResult{LeafId: leafId, Median: median}
}
}(leafId)
}

wg.Wait()
close(resultsChan)

// Storing the median results temporarily in a map
medianResults := make(map[uint16]*big.Int)
for result := range resultsChan {
medianResults[result.LeafId] = result.Median
}

var medians []*big.Int
var idsRevealedInThisEpoch []uint16

// Storing medians in order of increasing leafIds starting from 0
for leafId := uint16(0); leafId < uint16(len(activeCollections)); leafId++ {
if median, exists := medianResults[leafId]; exists {
medians = append(medians, median)
influenceSum := revealedDataMaps.InfluenceSum[leafId]
if influenceSum != nil && influenceSum.Cmp(big.NewInt(0)) != 0 {
idsRevealedInThisEpoch = append(idsRevealedInThisEpoch, activeCollections[leafId])
if rogueData.IsRogue && utils.Contains(rogueData.RogueMode, "medians") {
medians = append(medians, razorUtils.GetRogueRandomValue(10000000))
continue
}
accWeight := big.NewInt(0)
for i := 0; i < len(revealedDataMaps.SortedRevealedValues[leafId]); i++ {
revealedValue := revealedDataMaps.SortedRevealedValues[leafId][i]
accWeight = accWeight.Add(accWeight, revealedDataMaps.VoteWeights[revealedValue.String()])
if accWeight.Cmp(influenceSum.Div(influenceSum, big.NewInt(2))) > 0 {
medians = append(medians, revealedValue)
break
}
}
}
}

// Handling rogue data
if rogueData.IsRogue && utils.Contains(rogueData.RogueMode, "missingIds") {
log.Warn("YOU ARE PROPOSING IDS REVEALED IN ROGUE MODE, THIS CAN INCUR PENALTIES!")
//Replacing the last ID: id with id+1 in idsRevealed array if rogueMode == missingIds
Expand All @@ -546,30 +467,9 @@ func (*UtilsStruct) MakeBlock(client *ethclient.Client, blockNumber *big.Int, ep
idsRevealedInThisEpoch[0] = idsRevealedInThisEpoch[1]
idsRevealedInThisEpoch[1] = temp
}

return medians, idsRevealedInThisEpoch, revealedDataMaps, nil
}

func calculateMedianForLeafId(revealedDataMaps *types.RevealedDataMaps, leafId uint16, rogueData types.Rogue) *big.Int {
influenceSum := revealedDataMaps.InfluenceSum[leafId]
if influenceSum != nil && influenceSum.Cmp(big.NewInt(0)) != 0 {
if rogueData.IsRogue && utils.Contains(rogueData.RogueMode, "medians") {
return razorUtils.GetRogueRandomValue(10000000)
}
accWeight := big.NewInt(0)
for _, revealedValue := range revealedDataMaps.SortedRevealedValues[leafId] {
accWeight = accWeight.Add(accWeight, revealedDataMaps.VoteWeights[revealedValue.String()])
if accWeight.Cmp(influenceSum.Div(influenceSum, big.NewInt(2))) > 0 {
log.Debugf("LeafId: %d, Calculated value %v", leafId, revealedValue)
return revealedValue
}
}
}
// Returning nil if no median is found or if influenceSum is nil or zero
log.Debugf("No median found for LeafId %d", leafId)
return nil
}

func (*UtilsStruct) GetSmallestStakeAndId(client *ethclient.Client, epoch uint32) (*big.Int, uint32, error) {
numberOfStakers, err := razorUtils.GetNumberOfStakers(client)
if err != nil {
Expand Down
Loading

0 comments on commit e210cf5

Please sign in to comment.