Diagnostics: Optimize db write #11016

Merged · 10 commits · Jul 4, 2024
Changes from 9 commits
57 changes: 57 additions & 0 deletions erigon-lib/diagnostics/client.go
@@ -3,8 +3,12 @@ package diagnostics
import (
"context"
"net/http"
"os"
"os/signal"
"path/filepath"
"sync"
"syscall"
"time"

"github.com/c2h5oh/datasize"
"golang.org/x/sync/semaphore"
@@ -101,8 +105,61 @@ func (d *DiagnosticClient) Setup() {
d.setupBodiesDiagnostics(rootCtx)
d.setupResourcesUsageDiagnostics(rootCtx)
d.setupSpeedtestDiagnostics(rootCtx)
d.runSaveProcess(rootCtx)
d.runStopNodeListener(rootCtx)

//d.logDiagMsgs()

}

func (d *DiagnosticClient) runStopNodeListener(rootCtx context.Context) {
go func() {
ch := make(chan os.Signal, 1)
signal.Notify(ch, os.Interrupt, syscall.SIGTERM)
select {
case <-ch:
d.SaveData()
case <-rootCtx.Done():
}
}()
}

// Save diagnostic data at a fixed interval to reduce the number of save events
func (d *DiagnosticClient) runSaveProcess(rootCtx context.Context) {
ticker := time.NewTicker(5 * time.Minute)
go func() {
for {
select {
case <-ticker.C:
d.SaveData()
case <-rootCtx.Done():
ticker.Stop()
return
}
}
}()
}

func (d *DiagnosticClient) SaveData() {
var funcs []func(tx kv.RwTx) error
funcs = append(funcs, SnapshotDownloadUpdater(d.syncStats.SnapshotDownload))
funcs = append(funcs, StagesListUpdater(d.syncStages))
funcs = append(funcs, SnapshotIndexingUpdater(d.syncStats.SnapshotIndexing))

err := d.db.Update(d.ctx, func(tx kv.RwTx) error {
for _, updater := range funcs {
updErr := updater(tx)
if updErr != nil {
return updErr
}
}

return nil
})

if err != nil {
log.Warn("Failed to save diagnostics data", "err", err)
}
}

/*func (d *DiagnosticClient) logDiagMsgs() {
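Note: SaveData batches the three updaters into a single db.Update transaction instead of issuing one write per event, which is the core of this PR. The bodies of SnapshotDownloadUpdater, StagesListUpdater, and SnapshotIndexingUpdater are not part of this diff; the sketch below only illustrates the shape such an updater could take so the batching reads more concretely. The package name, table name, key, and import path are assumptions, not necessarily what the diagnostics package uses.

// Illustrative sketch only: one possible shape for an updater that can be
// batched by SaveData. Table and key names are placeholders.
package diagsketch

import (
	"encoding/json"

	"github.com/ledgerwatch/erigon-lib/kv"
)

// snapshotDownloadUpdaterSketch returns a closure that serializes the stats
// and writes them inside the caller's transaction, so several such closures
// can share one kv.RwTx.
func snapshotDownloadUpdaterSketch(stats any) func(tx kv.RwTx) error {
	return func(tx kv.RwTx) error {
		payload, err := json.Marshal(stats)
		if err != nil {
			return err
		}
		// Placeholder table/key; the real updaters may use different names.
		return tx.Put("DiagnosticsData", []byte("snapshotDownload"), payload)
	}
}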
5 changes: 3 additions & 2 deletions erigon-lib/diagnostics/entities.go
@@ -115,8 +115,9 @@ type SegmentPeer struct {
}

type SnapshotIndexingStatistics struct {
Segments []SnapshotSegmentIndexingStatistics `json:"segments"`
TimeElapsed float64 `json:"timeElapsed"`
Segments []SnapshotSegmentIndexingStatistics `json:"segments"`
TimeElapsed float64 `json:"timeElapsed"`
IndexingFinished bool `json:"indexingFinished"`
}

type SnapshotSegmentIndexingStatistics struct {
76 changes: 33 additions & 43 deletions erigon-lib/diagnostics/snapshots.go
@@ -49,6 +49,7 @@ func (d *DiagnosticClient) runSnapshotListener(rootCtx context.Context) {
d.syncStats.SnapshotDownload.Sys = info.Sys
d.syncStats.SnapshotDownload.DownloadFinished = info.DownloadFinished
d.syncStats.SnapshotDownload.TorrentMetadataReady = info.TorrentMetadataReady
d.mu.Unlock()

downloadedPercent := getPercentDownloaded(info.Downloaded, info.Total)
remainingBytes := info.Total - info.Downloaded
@@ -61,15 +62,8 @@ func (d *DiagnosticClient) runSnapshotListener(rootCtx context.Context) {
Progress: downloadedPercent,
}, "Downloading snapshots")

if err := d.db.Update(d.ctx, SnapshotDownloadUpdater(d.syncStats.SnapshotDownload)); err != nil {
log.Error("[Diagnostics] Failed to update snapshot download info", "err", err)
}

d.saveSyncStagesToDB()

d.mu.Unlock()

if d.snapshotStageFinished() {
if info.DownloadFinished {
d.SaveData()
return
}
}
@@ -88,24 +82,17 @@ func getPercentDownloaded(downloaded, total uint64) string {
}

func (d *DiagnosticClient) updateSnapshotStageStats(stats SyncStageStats, subStageInfo string) {
d.mu.Lock()
defer d.mu.Unlock()
idxs := d.getCurrentSyncIdxs()
if idxs.Stage == -1 || idxs.SubStage == -1 {
log.Warn("[Diagnostics] Can't find running stage or substage while updating Snapshots stage stats.", "stages:", d.syncStages, "stats:", stats, "subStageInfo:", subStageInfo)
log.Debug("[Diagnostics] Can't find running stage or substage while updating Snapshots stage stats.", "stages:", d.syncStages, "stats:", stats, "subStageInfo:", subStageInfo)
return
}

d.syncStages[idxs.Stage].SubStages[idxs.SubStage].Stats = stats
}

func (d *DiagnosticClient) snapshotStageFinished() bool {
idx := d.getCurrentSyncIdxs()
if idx.Stage > 0 {
return true
} else {
return false
}
}

func (d *DiagnosticClient) runSegmentDownloadingListener(rootCtx context.Context) {
go func() {
ctx, ch, closeChannel := Context[SegmentDownloadStatistics](rootCtx, 1)
@@ -132,11 +119,6 @@ func (d *DiagnosticClient) runSegmentDownloadingListener(rootCtx context.Context) {
} else {
d.syncStats.SnapshotDownload.SegmentsDownloading[info.Name] = info
}

if err := d.db.Update(d.ctx, SnapshotDownloadUpdater(d.syncStats.SnapshotDownload)); err != nil {
log.Error("[Diagnostics] Failed to update snapshot download info", "err", err)
}

d.mu.Unlock()
}
}
@@ -155,8 +137,11 @@ func (d *DiagnosticClient) runSegmentIndexingListener(rootCtx context.Context) {
return
case info := <-ch:
d.addOrUpdateSegmentIndexingState(info)
if err := d.db.Update(d.ctx, SnapshotIndexingUpdater(d.syncStats.SnapshotIndexing)); err != nil {
log.Error("[Diagnostics] Failed to update snapshot indexing info", "err", err)
d.updateIndexingStatus()

if d.syncStats.SnapshotIndexing.IndexingFinished {
d.SaveData()
return
}
}
}
@@ -192,16 +177,34 @@ func (d *DiagnosticClient) runSegmentIndexingFinishedListener(rootCtx context.Context) {
})
}

if err := d.db.Update(d.ctx, SnapshotIndexingUpdater(d.syncStats.SnapshotIndexing)); err != nil {
log.Error("[Diagnostics] Failed to update snapshot indexing info", "err", err)
}

d.mu.Unlock()

d.updateIndexingStatus()
}
}
}()
}

func (d *DiagnosticClient) updateIndexingStatus() {
totalProgress := 0
totalProgressPercent := 0
for _, seg := range d.syncStats.SnapshotIndexing.Segments {
totalProgressPercent += seg.Percent
}

totalProgress = totalProgressPercent / len(d.syncStats.SnapshotIndexing.Segments)

d.updateSnapshotStageStats(SyncStageStats{
TimeElapsed: SecondsToHHMMString(uint64(d.syncStats.SnapshotIndexing.TimeElapsed)),
TimeLeft: "unknown",
Progress: fmt.Sprintf("%d%%", totalProgress),
}, "Indexing snapshots")

if totalProgress >= 100 {
d.syncStats.SnapshotIndexing.IndexingFinished = true
}
}
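
A worked example of the calculation in updateIndexingStatus above: with three segments reporting 100%, 80%, and 60%, totalProgressPercent is 240 and totalProgress is 240 / 3 = 80, so the stage stats report "80%" and IndexingFinished stays false; only once every segment reaches 100% does the average hit 100 and the finished flag flip.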

func (d *DiagnosticClient) addOrUpdateSegmentIndexingState(upd SnapshotIndexingStatistics) {
d.mu.Lock()
defer d.mu.Unlock()
@@ -227,19 +230,6 @@ func (d *DiagnosticClient) addOrUpdateSegmentIndexingState(upd SnapshotIndexingStatistics) {
}

d.syncStats.SnapshotIndexing.TimeElapsed = upd.TimeElapsed

totalProgress := 0
for _, seg := range d.syncStats.SnapshotIndexing.Segments {
totalProgress += seg.Percent
}

d.updateSnapshotStageStats(SyncStageStats{
TimeElapsed: SecondsToHHMMString(uint64(upd.TimeElapsed)),
TimeLeft: "unknown",
Progress: fmt.Sprintf("%d%%", totalProgress/len(d.syncStats.SnapshotIndexing.Segments)),
}, "Indexing snapshots")

d.saveSyncStagesToDB()
}

func (d *DiagnosticClient) runSnapshotFilesListListener(rootCtx context.Context) {
32 changes: 10 additions & 22 deletions erigon-lib/diagnostics/stages.go
@@ -100,11 +100,7 @@ func (d *DiagnosticClient) runSyncStagesListListener(rootCtx context.Context) {
case <-rootCtx.Done():
return
case info := <-ch:
d.mu.Lock()
d.SetStagesList(info.StagesList)
d.mu.Unlock()

d.saveSyncStagesToDB()
}
}
}()
@@ -121,11 +117,7 @@ func (d *DiagnosticClient) runCurrentSyncStageListener(rootCtx context.Context) {
case <-rootCtx.Done():
return
case info := <-ch:
d.mu.Lock()
d.SetCurrentSyncStage(info)
d.mu.Unlock()

d.saveSyncStagesToDB()
}
}
}()
@@ -142,11 +134,7 @@ func (d *DiagnosticClient) runCurrentSyncSubStageListener(rootCtx context.Context) {
case <-rootCtx.Done():
return
case info := <-ch:
d.mu.Lock()
d.SetCurrentSyncSubStage(info)
d.mu.Unlock()

d.saveSyncStagesToDB()
}
}
}()
@@ -163,22 +151,12 @@ func (d *DiagnosticClient) runSubStageListener(rootCtx context.Context) {
case <-rootCtx.Done():
return
case info := <-ch:
d.mu.Lock()
d.SetSubStagesList(info.Stage, info.List)
d.mu.Unlock()

d.saveSyncStagesToDB()
}
}
}()
}

func (d *DiagnosticClient) saveSyncStagesToDB() {
Member (review comment):
When flushing to the database on a timer, it is easy to forget to flush at the moment it is actually needed (e.g. when a stage is over).

Is flushing once every 5 minutes really what you wanted?

Member Author:
done

if err := d.db.Update(d.ctx, StagesListUpdater(d.syncStages)); err != nil {
log.Error("[Diagnostics] Failed to update stages list", "err", err)
}
}
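
The concern above (a timer-only flush can miss the moment a stage actually finishes) is addressed in this PR by keeping the 5-minute ticker and additionally calling SaveData directly on terminal events: when info.DownloadFinished is set in the snapshot listener, when IndexingFinished flips in the indexing listener, and on SIGTERM via runStopNodeListener. The sketch below restates that pattern in isolation; the saver interface and function names are illustrative, not part of the erigon codebase.

// Sketch of the flush strategy settled on above: periodic saves plus an
// immediate save on terminal events. All names here are illustrative.
package flushsketch

import (
	"context"
	"time"
)

type saver interface {
	SaveData()
}

// runPeriodicFlush persists diagnostics on an interval so steady-state
// updates do not each turn into a database write.
func runPeriodicFlush(ctx context.Context, s saver, every time.Duration) {
	ticker := time.NewTicker(every)
	go func() {
		defer ticker.Stop()
		for {
			select {
			case <-ticker.C:
				s.SaveData()
			case <-ctx.Done():
				return
			}
		}
	}()
}

// flushIfFinished saves immediately when a stage reports completion, so the
// final state is never lost waiting for the next tick.
func flushIfFinished(s saver, finished bool) {
	if finished {
		s.SaveData()
	}
}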

func (d *DiagnosticClient) getCurrentSyncIdxs() CurrentSyncStagesIdxs {
currentIdxs := CurrentSyncStagesIdxs{
Stage: -1,
@@ -202,12 +180,17 @@ func (d *DiagnosticClient) getCurrentSyncIdxs() CurrentSyncStagesIdxs {
}

func (d *DiagnosticClient) SetStagesList(stages []SyncStage) {
d.mu.Lock()
defer d.mu.Unlock()

if len(d.syncStages) != len(stages) {
d.syncStages = stages
}
}

func (d *DiagnosticClient) SetSubStagesList(stageId string, subStages []SyncSubStage) {
d.mu.Lock()
defer d.mu.Unlock()
for idx, stage := range d.syncStages {
if stage.ID == stageId {
if len(d.syncStages[idx].SubStages) != len(subStages) {
Expand All @@ -219,6 +202,8 @@ func (d *DiagnosticClient) SetSubStagesList(stageId string, subStages []SyncSubS
}

func (d *DiagnosticClient) SetCurrentSyncStage(css CurrentSyncStage) {
d.mu.Lock()
defer d.mu.Unlock()
isSet := false
for idx, stage := range d.syncStages {
if !isSet {
@@ -246,6 +231,9 @@ func (d *DiagnosticClient) setSubStagesState(stadeIdx int, state StageState) {
}

func (d *DiagnosticClient) SetCurrentSyncSubStage(css CurrentSyncSubStage) {
d.mu.Lock()
defer d.mu.Unlock()

for idx, stage := range d.syncStages {
if stage.State == Running {
for subIdx, subStage := range stage.SubStages {
25 changes: 17 additions & 8 deletions erigon-lib/diagnostics/sys_info.go
@@ -20,16 +20,25 @@ var (

func (d *DiagnosticClient) setupSysInfoDiagnostics() {
sysInfo := GetSysInfo(d.dataDirPath)
if err := d.db.Update(d.ctx, RAMInfoUpdater(sysInfo.RAM)); err != nil {
log.Error("[Diagnostics] Failed to update RAM info", "err", err)
}

if err := d.db.Update(d.ctx, CPUInfoUpdater(sysInfo.CPU)); err != nil {
log.Error("[Diagnostics] Failed to update CPU info", "err", err)
}
var funcs []func(tx kv.RwTx) error
funcs = append(funcs, RAMInfoUpdater(sysInfo.RAM))
funcs = append(funcs, CPUInfoUpdater(sysInfo.CPU))
funcs = append(funcs, DiskInfoUpdater(sysInfo.Disk))

if err := d.db.Update(d.ctx, DiskInfoUpdater(sysInfo.Disk)); err != nil {
log.Error("[Diagnostics] Failed to update Disk info", "err", err)
err := d.db.Update(d.ctx, func(tx kv.RwTx) error {
for _, updater := range funcs {
updErr := updater(tx)
if updErr != nil {
return updErr
}
}

return nil
})

if err != nil {
log.Warn("[Diagnostics] Failed to update system info", "err", err)
}

d.mu.Lock()