diff --git a/erigon-lib/diagnostics/client.go b/erigon-lib/diagnostics/client.go index ecadc345126..5e0ca6a5c4f 100644 --- a/erigon-lib/diagnostics/client.go +++ b/erigon-lib/diagnostics/client.go @@ -3,8 +3,12 @@ package diagnostics import ( "context" "net/http" + "os" + "os/signal" "path/filepath" "sync" + "syscall" + "time" "github.com/c2h5oh/datasize" "golang.org/x/sync/semaphore" @@ -101,8 +105,59 @@ func (d *DiagnosticClient) Setup() { d.setupBodiesDiagnostics(rootCtx) d.setupResourcesUsageDiagnostics(rootCtx) d.setupSpeedtestDiagnostics(rootCtx) + d.runSaveProcess(rootCtx) + d.runStopNodeListener(rootCtx) //d.logDiagMsgs() + +} + +func (d *DiagnosticClient) runStopNodeListener(rootCtx context.Context) { + go func() { + ch := make(chan os.Signal, 1) + signal.Notify(ch, os.Interrupt, syscall.SIGTERM) + select { + case <-ch: + d.SaveData() + case <-rootCtx.Done(): + } + }() +} + +// Save diagnostic data by time interval to reduce save events +func (d *DiagnosticClient) runSaveProcess(rootCtx context.Context) { + ticker := time.NewTicker(5 * time.Minute) + go func() { + for { + select { + case <-ticker.C: + d.SaveData() + case <-rootCtx.Done(): + ticker.Stop() + return + } + } + }() +} + +func (d *DiagnosticClient) SaveData() { + var funcs []func(tx kv.RwTx) error + funcs = append(funcs, SnapshotDownloadUpdater(d.syncStats.SnapshotDownload), StagesListUpdater(d.syncStages), SnapshotIndexingUpdater(d.syncStats.SnapshotIndexing)) + + err := d.db.Update(d.ctx, func(tx kv.RwTx) error { + for _, updater := range funcs { + updErr := updater(tx) + if updErr != nil { + return updErr + } + } + + return nil + }) + + if err != nil { + log.Warn("Failed to save diagnostics data", "err", err) + } } /*func (d *DiagnosticClient) logDiagMsgs() { diff --git a/erigon-lib/diagnostics/entities.go b/erigon-lib/diagnostics/entities.go index 5eaea58f081..e9200bf25bc 100644 --- a/erigon-lib/diagnostics/entities.go +++ b/erigon-lib/diagnostics/entities.go @@ -115,8 +115,9 @@ type SegmentPeer struct { } type SnapshotIndexingStatistics struct { - Segments []SnapshotSegmentIndexingStatistics `json:"segments"` - TimeElapsed float64 `json:"timeElapsed"` + Segments []SnapshotSegmentIndexingStatistics `json:"segments"` + TimeElapsed float64 `json:"timeElapsed"` + IndexingFinished bool `json:"indexingFinished"` } type SnapshotSegmentIndexingStatistics struct { diff --git a/erigon-lib/diagnostics/snapshots.go b/erigon-lib/diagnostics/snapshots.go index f042510c6d4..85579efebaa 100644 --- a/erigon-lib/diagnostics/snapshots.go +++ b/erigon-lib/diagnostics/snapshots.go @@ -49,6 +49,7 @@ func (d *DiagnosticClient) runSnapshotListener(rootCtx context.Context) { d.syncStats.SnapshotDownload.Sys = info.Sys d.syncStats.SnapshotDownload.DownloadFinished = info.DownloadFinished d.syncStats.SnapshotDownload.TorrentMetadataReady = info.TorrentMetadataReady + d.mu.Unlock() downloadedPercent := getPercentDownloaded(info.Downloaded, info.Total) remainingBytes := info.Total - info.Downloaded @@ -61,15 +62,8 @@ func (d *DiagnosticClient) runSnapshotListener(rootCtx context.Context) { Progress: downloadedPercent, }, "Downloading snapshots") - if err := d.db.Update(d.ctx, SnapshotDownloadUpdater(d.syncStats.SnapshotDownload)); err != nil { - log.Error("[Diagnostics] Failed to update snapshot download info", "err", err) - } - - d.saveSyncStagesToDB() - - d.mu.Unlock() - - if d.snapshotStageFinished() { + if info.DownloadFinished { + d.SaveData() return } } @@ -88,24 +82,17 @@ func getPercentDownloaded(downloaded, total uint64) string { } func (d *DiagnosticClient) updateSnapshotStageStats(stats SyncStageStats, subStageInfo string) { + d.mu.Lock() + defer d.mu.Unlock() idxs := d.getCurrentSyncIdxs() if idxs.Stage == -1 || idxs.SubStage == -1 { - log.Warn("[Diagnostics] Can't find running stage or substage while updating Snapshots stage stats.", "stages:", d.syncStages, "stats:", stats, "subStageInfo:", subStageInfo) + log.Debug("[Diagnostics] Can't find running stage or substage while updating Snapshots stage stats.", "stages:", d.syncStages, "stats:", stats, "subStageInfo:", subStageInfo) return } d.syncStages[idxs.Stage].SubStages[idxs.SubStage].Stats = stats } -func (d *DiagnosticClient) snapshotStageFinished() bool { - idx := d.getCurrentSyncIdxs() - if idx.Stage > 0 { - return true - } else { - return false - } -} - func (d *DiagnosticClient) runSegmentDownloadingListener(rootCtx context.Context) { go func() { ctx, ch, closeChannel := Context[SegmentDownloadStatistics](rootCtx, 1) @@ -132,11 +119,6 @@ func (d *DiagnosticClient) runSegmentDownloadingListener(rootCtx context.Context } else { d.syncStats.SnapshotDownload.SegmentsDownloading[info.Name] = info } - - if err := d.db.Update(d.ctx, SnapshotDownloadUpdater(d.syncStats.SnapshotDownload)); err != nil { - log.Error("[Diagnostics] Failed to update snapshot download info", "err", err) - } - d.mu.Unlock() } } @@ -155,8 +137,11 @@ func (d *DiagnosticClient) runSegmentIndexingListener(rootCtx context.Context) { return case info := <-ch: d.addOrUpdateSegmentIndexingState(info) - if err := d.db.Update(d.ctx, SnapshotIndexingUpdater(d.syncStats.SnapshotIndexing)); err != nil { - log.Error("[Diagnostics] Failed to update snapshot indexing info", "err", err) + d.updateIndexingStatus() + + if d.syncStats.SnapshotIndexing.IndexingFinished { + d.SaveData() + return } } } @@ -192,16 +177,33 @@ func (d *DiagnosticClient) runSegmentIndexingFinishedListener(rootCtx context.Co }) } - if err := d.db.Update(d.ctx, SnapshotIndexingUpdater(d.syncStats.SnapshotIndexing)); err != nil { - log.Error("[Diagnostics] Failed to update snapshot indexing info", "err", err) - } - d.mu.Unlock() + + d.updateIndexingStatus() } } }() } +func (d *DiagnosticClient) updateIndexingStatus() { + totalProgressPercent := 0 + for _, seg := range d.syncStats.SnapshotIndexing.Segments { + totalProgressPercent += seg.Percent + } + + totalProgress := totalProgressPercent / len(d.syncStats.SnapshotIndexing.Segments) + + d.updateSnapshotStageStats(SyncStageStats{ + TimeElapsed: SecondsToHHMMString(uint64(d.syncStats.SnapshotIndexing.TimeElapsed)), + TimeLeft: "unknown", + Progress: fmt.Sprintf("%d%%", totalProgress), + }, "Indexing snapshots") + + if totalProgress >= 100 { + d.syncStats.SnapshotIndexing.IndexingFinished = true + } +} + func (d *DiagnosticClient) addOrUpdateSegmentIndexingState(upd SnapshotIndexingStatistics) { d.mu.Lock() defer d.mu.Unlock() @@ -227,19 +229,6 @@ func (d *DiagnosticClient) addOrUpdateSegmentIndexingState(upd SnapshotIndexingS } d.syncStats.SnapshotIndexing.TimeElapsed = upd.TimeElapsed - - totalProgress := 0 - for _, seg := range d.syncStats.SnapshotIndexing.Segments { - totalProgress += seg.Percent - } - - d.updateSnapshotStageStats(SyncStageStats{ - TimeElapsed: SecondsToHHMMString(uint64(upd.TimeElapsed)), - TimeLeft: "unknown", - Progress: fmt.Sprintf("%d%%", totalProgress/len(d.syncStats.SnapshotIndexing.Segments)), - }, "Indexing snapshots") - - d.saveSyncStagesToDB() } func (d *DiagnosticClient) runSnapshotFilesListListener(rootCtx context.Context) { diff --git a/erigon-lib/diagnostics/stages.go b/erigon-lib/diagnostics/stages.go index bc7670609e2..47bae6d807c 100644 --- a/erigon-lib/diagnostics/stages.go +++ b/erigon-lib/diagnostics/stages.go @@ -100,11 +100,7 @@ func (d *DiagnosticClient) runSyncStagesListListener(rootCtx context.Context) { case <-rootCtx.Done(): return case info := <-ch: - d.mu.Lock() d.SetStagesList(info.StagesList) - d.mu.Unlock() - - d.saveSyncStagesToDB() } } }() @@ -121,11 +117,7 @@ func (d *DiagnosticClient) runCurrentSyncStageListener(rootCtx context.Context) case <-rootCtx.Done(): return case info := <-ch: - d.mu.Lock() d.SetCurrentSyncStage(info) - d.mu.Unlock() - - d.saveSyncStagesToDB() } } }() @@ -142,11 +134,7 @@ func (d *DiagnosticClient) runCurrentSyncSubStageListener(rootCtx context.Contex case <-rootCtx.Done(): return case info := <-ch: - d.mu.Lock() d.SetCurrentSyncSubStage(info) - d.mu.Unlock() - - d.saveSyncStagesToDB() } } }() @@ -163,22 +151,12 @@ func (d *DiagnosticClient) runSubStageListener(rootCtx context.Context) { case <-rootCtx.Done(): return case info := <-ch: - d.mu.Lock() d.SetSubStagesList(info.Stage, info.List) - d.mu.Unlock() - - d.saveSyncStagesToDB() } } }() } -func (d *DiagnosticClient) saveSyncStagesToDB() { - if err := d.db.Update(d.ctx, StagesListUpdater(d.syncStages)); err != nil { - log.Error("[Diagnostics] Failed to update stages list", "err", err) - } -} - func (d *DiagnosticClient) getCurrentSyncIdxs() CurrentSyncStagesIdxs { currentIdxs := CurrentSyncStagesIdxs{ Stage: -1, @@ -202,12 +180,17 @@ func (d *DiagnosticClient) getCurrentSyncIdxs() CurrentSyncStagesIdxs { } func (d *DiagnosticClient) SetStagesList(stages []SyncStage) { + d.mu.Lock() + defer d.mu.Unlock() + if len(d.syncStages) != len(stages) { d.syncStages = stages } } func (d *DiagnosticClient) SetSubStagesList(stageId string, subStages []SyncSubStage) { + d.mu.Lock() + defer d.mu.Unlock() for idx, stage := range d.syncStages { if stage.ID == stageId { if len(d.syncStages[idx].SubStages) != len(subStages) { @@ -219,6 +202,8 @@ func (d *DiagnosticClient) SetSubStagesList(stageId string, subStages []SyncSubS } func (d *DiagnosticClient) SetCurrentSyncStage(css CurrentSyncStage) { + d.mu.Lock() + defer d.mu.Unlock() isSet := false for idx, stage := range d.syncStages { if !isSet { @@ -246,6 +231,9 @@ func (d *DiagnosticClient) setSubStagesState(stadeIdx int, state StageState) { } func (d *DiagnosticClient) SetCurrentSyncSubStage(css CurrentSyncSubStage) { + d.mu.Lock() + defer d.mu.Unlock() + for idx, stage := range d.syncStages { if stage.State == Running { for subIdx, subStage := range stage.SubStages { diff --git a/erigon-lib/diagnostics/sys_info.go b/erigon-lib/diagnostics/sys_info.go index b870e649324..68a7ce7d4c1 100644 --- a/erigon-lib/diagnostics/sys_info.go +++ b/erigon-lib/diagnostics/sys_info.go @@ -20,16 +20,23 @@ var ( func (d *DiagnosticClient) setupSysInfoDiagnostics() { sysInfo := GetSysInfo(d.dataDirPath) - if err := d.db.Update(d.ctx, RAMInfoUpdater(sysInfo.RAM)); err != nil { - log.Error("[Diagnostics] Failed to update RAM info", "err", err) - } - if err := d.db.Update(d.ctx, CPUInfoUpdater(sysInfo.CPU)); err != nil { - log.Error("[Diagnostics] Failed to update CPU info", "err", err) - } + var funcs []func(tx kv.RwTx) error + funcs = append(funcs, RAMInfoUpdater(sysInfo.RAM), CPUInfoUpdater(sysInfo.CPU), DiskInfoUpdater(sysInfo.Disk)) - if err := d.db.Update(d.ctx, DiskInfoUpdater(sysInfo.Disk)); err != nil { - log.Error("[Diagnostics] Failed to update Disk info", "err", err) + err := d.db.Update(d.ctx, func(tx kv.RwTx) error { + for _, updater := range funcs { + updErr := updater(tx) + if updErr != nil { + return updErr + } + } + + return nil + }) + + if err != nil { + log.Warn("[Diagnostics] Failed to update system info", "err", err) } d.mu.Lock()