Skip to content

Commit

Permalink
diag: race in updateIndexingStatus (#11274)
Browse files Browse the repository at this point in the history
for #11268
  • Loading branch information
AskAlexSharov authored Jul 22, 2024
1 parent 3f9d0d6 commit 9316c6e
Show file tree
Hide file tree
Showing 2 changed files with 20 additions and 9 deletions.
23 changes: 14 additions & 9 deletions erigon-lib/diagnostics/snapshots.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ func (d *DiagnosticClient) runSnapshotListener(rootCtx context.Context) {
downloadTimeLeft := CalculateTime(remainingBytes, info.DownloadRate)
totalDownloadTimeString := time.Duration(info.TotalTime) * time.Second

d.updateSnapshotStageStats(SyncStageStats{
d.UpdateSnapshotStageStats(SyncStageStats{
TimeElapsed: totalDownloadTimeString.String(),
TimeLeft: downloadTimeLeft,
Progress: downloadedPercent,
Expand Down Expand Up @@ -105,10 +105,14 @@ func GetShanpshotsPercentDownloaded(downloaded uint64, total uint64, torrentMeta
return fmt.Sprintf("%.2f%%", percent)
}

func (d *DiagnosticClient) updateSnapshotStageStats(stats SyncStageStats, subStageInfo string) {
func (d *DiagnosticClient) UpdateSnapshotStageStats(stats SyncStageStats, subStageInfo string) {
d.mu.Lock()
defer d.mu.Unlock()
idxs := d.GetCurrentSyncIdxs()
d.updateSnapshotStageStats(stats, subStageInfo)
}

func (d *DiagnosticClient) updateSnapshotStageStats(stats SyncStageStats, subStageInfo string) {
idxs := d.getCurrentSyncIdxs()
if idxs.Stage == -1 || idxs.SubStage == -1 {
log.Debug("[Diagnostics] Can't find running stage or substage while updating Snapshots stage stats.", "stages:", d.syncStages, "stats:", stats, "subStageInfo:", subStageInfo)
return
Expand Down Expand Up @@ -181,8 +185,8 @@ func (d *DiagnosticClient) runSegmentIndexingListener(rootCtx context.Context) {
return
case info := <-ch:
d.addOrUpdateSegmentIndexingState(info)
d.updateIndexingStatus()
if d.syncStats.SnapshotIndexing.IndexingFinished {
indexingFinished := d.UpdateIndexingStatus()
if indexingFinished {
d.SaveData()
return
}
Expand Down Expand Up @@ -222,20 +226,20 @@ func (d *DiagnosticClient) runSegmentIndexingFinishedListener(rootCtx context.Co

d.mu.Unlock()

d.updateIndexingStatus()
d.UpdateIndexingStatus()
}
}
}()
}

func (d *DiagnosticClient) updateIndexingStatus() {
func (d *DiagnosticClient) UpdateIndexingStatus() (indexingFinished bool) {
totalProgressPercent := 0
d.mu.Lock()
defer d.mu.Unlock()

for _, seg := range d.syncStats.SnapshotIndexing.Segments {
totalProgressPercent += seg.Percent
}
d.mu.Unlock()

totalProgress := totalProgressPercent / len(d.syncStats.SnapshotIndexing.Segments)

Expand All @@ -248,6 +252,7 @@ func (d *DiagnosticClient) updateIndexingStatus() {
if totalProgress >= 100 {
d.syncStats.SnapshotIndexing.IndexingFinished = true
}
return d.syncStats.SnapshotIndexing.IndexingFinished
}

func (d *DiagnosticClient) addOrUpdateSegmentIndexingState(upd SnapshotIndexingStatistics) {
Expand Down Expand Up @@ -399,7 +404,7 @@ func (d *DiagnosticClient) runFillDBListener(rootCtx context.Context) {

totalTimeString := time.Duration(info.TimeElapsed) * time.Second

d.updateSnapshotStageStats(SyncStageStats{
d.UpdateSnapshotStageStats(SyncStageStats{
TimeElapsed: totalTimeString.String(),
TimeLeft: "unknown",
Progress: fmt.Sprintf("%d%%", (info.Stage.Current*100)/info.Stage.Total),
Expand Down
6 changes: 6 additions & 0 deletions erigon-lib/diagnostics/stages.go
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,12 @@ func (d *DiagnosticClient) runSubStageListener(rootCtx context.Context) {
}

func (d *DiagnosticClient) GetCurrentSyncIdxs() CurrentSyncStagesIdxs {
d.mu.Lock()
defer d.mu.Unlock()
return d.getCurrentSyncIdxs()
}

func (d *DiagnosticClient) getCurrentSyncIdxs() CurrentSyncStagesIdxs {
currentIdxs := CurrentSyncStagesIdxs{
Stage: -1,
SubStage: -1,
Expand Down

0 comments on commit 9316c6e

Please sign in to comment.