diff --git a/diagnostics/diagnostic.go b/diagnostics/diagnostic.go index 5d12dd81cb2..69470f7a0d9 100644 --- a/diagnostics/diagnostic.go +++ b/diagnostics/diagnostic.go @@ -27,6 +27,7 @@ func (d *DiagnosticClient) Setup() { d.runSnapshotListener() d.runSegmentDownloadingListener() d.runSegmentIndexingListener() + d.runSegmentIndexingFinishedListener() } func (d *DiagnosticClient) runSnapshotListener() { @@ -107,8 +108,68 @@ func (d *DiagnosticClient) runSegmentIndexingListener() { cancel() return case info := <-ch: - d.snapshotDownload.SegmentIndexing = info + d.addOrUpdateSegmentIndexingState(info) } } }() } + +func (d *DiagnosticClient) runSegmentIndexingFinishedListener() { + go func() { + ctx, ch, cancel := diaglib.Context[diaglib.SnapshotSegmentIndexingFinishedUpdate](context.Background(), 1) + defer cancel() + + rootCtx, _ := common.RootContext() + + diaglib.StartProviders(ctx, diaglib.TypeOf(diaglib.SnapshotSegmentIndexingFinishedUpdate{}), log.Root()) + for { + select { + case <-rootCtx.Done(): + cancel() + return + case info := <-ch: + found := false + for i := range d.snapshotDownload.SegmentIndexing.Segments { + if d.snapshotDownload.SegmentIndexing.Segments[i].SegmentName == info.SegmentName { + found = true + d.snapshotDownload.SegmentIndexing.Segments[i].Percent = 100 + } + } + + if !found { + d.snapshotDownload.SegmentIndexing.Segments = append(d.snapshotDownload.SegmentIndexing.Segments, diaglib.SnapshotSegmentIndexingStatistics{ + SegmentName: info.SegmentName, + Percent: 100, + Alloc: 0, + Sys: 0, + }) + } + } + } + }() +} + +func (d *DiagnosticClient) addOrUpdateSegmentIndexingState(upd diaglib.SnapshotIndexingStatistics) { + if d.snapshotDownload.SegmentIndexing.Segments == nil { + d.snapshotDownload.SegmentIndexing.Segments = []diaglib.SnapshotSegmentIndexingStatistics{} + } + + for i := range upd.Segments { + found := false + for j := range d.snapshotDownload.SegmentIndexing.Segments { + if d.snapshotDownload.SegmentIndexing.Segments[j].SegmentName == upd.Segments[i].SegmentName { + d.snapshotDownload.SegmentIndexing.Segments[j].Percent = upd.Segments[i].Percent + d.snapshotDownload.SegmentIndexing.Segments[j].Alloc = upd.Segments[i].Alloc + d.snapshotDownload.SegmentIndexing.Segments[j].Sys = upd.Segments[i].Sys + found = true + break + } + } + + if !found { + d.snapshotDownload.SegmentIndexing.Segments = append(d.snapshotDownload.SegmentIndexing.Segments, upd.Segments[i]) + } + } + + d.snapshotDownload.SegmentIndexing.TimeElapsed = upd.TimeElapsed +} diff --git a/erigon-lib/diagnostics/entities.go b/erigon-lib/diagnostics/entities.go index 7db98f024dc..747697a5c09 100644 --- a/erigon-lib/diagnostics/entities.go +++ b/erigon-lib/diagnostics/entities.go @@ -57,8 +57,19 @@ type SegmentDownloadStatistics struct { } type SnapshotIndexingStatistics struct { - Segments map[string]int `json:"segments"` - TimeElapsed float64 `json:"timeElapsed"` + Segments []SnapshotSegmentIndexingStatistics `json:"segments"` + TimeElapsed float64 `json:"timeElapsed"` +} + +type SnapshotSegmentIndexingStatistics struct { + SegmentName string `json:"segmentName"` + Percent int `json:"percent"` + Alloc uint64 `json:"alloc"` + Sys uint64 `json:"sys"` +} + +type SnapshotSegmentIndexingFinishedUpdate struct { + SegmentName string `json:"segmentName"` } func (ti SnapshotDownloadStatistics) Type() Type { @@ -72,3 +83,7 @@ func (ti SegmentDownloadStatistics) Type() Type { func (ti SnapshotIndexingStatistics) Type() Type { return TypeOf(ti) } + +func (ti SnapshotSegmentIndexingFinishedUpdate) Type() Type { + return TypeOf(ti) +} diff --git a/erigon-lib/downloader/snaptype/files.go b/erigon-lib/downloader/snaptype/files.go index 711e180fc79..99adb35042e 100644 --- a/erigon-lib/downloader/snaptype/files.go +++ b/erigon-lib/downloader/snaptype/files.go @@ -180,6 +180,7 @@ type FileInfo struct { func (f FileInfo) TorrentFileExists() bool { return dir.FileExist(f.Path + ".torrent") } func (f FileInfo) Seedable() bool { return f.To-f.From == Erigon2MergeLimit } func (f FileInfo) NeedTorrentFile() bool { return f.Seedable() && !f.TorrentFileExists() } +func (f FileInfo) Name() string { return filepath.Base(f.Path) } func IdxFiles(dir string) (res []FileInfo, err error) { return FilesWithExt(dir, ".idx") } func Segments(dir string) (res []FileInfo, err error) { return FilesWithExt(dir, ".seg") } diff --git a/turbo/snapshotsync/freezeblocks/block_snapshots.go b/turbo/snapshotsync/freezeblocks/block_snapshots.go index 0e1305fe7a3..98d0e9d3ae4 100644 --- a/turbo/snapshotsync/freezeblocks/block_snapshots.go +++ b/turbo/snapshotsync/freezeblocks/block_snapshots.go @@ -7,7 +7,6 @@ import ( "encoding/hex" "errors" "fmt" - "github.com/ledgerwatch/erigon/consensus/bor" "os" "path" "path/filepath" @@ -18,6 +17,8 @@ import ( "sync/atomic" "time" + "github.com/ledgerwatch/erigon/consensus/bor" + "github.com/holiman/uint256" "github.com/ledgerwatch/erigon-lib/chain" "github.com/ledgerwatch/erigon-lib/chain/snapcfg" @@ -929,12 +930,7 @@ func BuildMissedIndices(logPrefix string, ctx context.Context, dirs datadir.Dirs case <-logEvery.C: var m runtime.MemStats dbg.ReadMemStats(&m) - - diagnostics.Send(diagnostics.SnapshotIndexingStatistics{ - Segments: ps.DiagnossticsData(), - TimeElapsed: time.Since(startIndexingTime).Round(time.Second).Seconds(), - }) - + sendDiagnostics(startIndexingTime, ps.DiagnossticsData(), m.Alloc, m.Sys) logger.Info(fmt.Sprintf("[%s] Indexing", logPrefix), "progress", ps.String(), "total-indexing-time", time.Since(startIndexingTime).Round(time.Second).String(), "alloc", common2.ByteCount(m.Alloc), "sys", common2.ByteCount(m.Sys)) case <-finish: return @@ -957,6 +953,7 @@ func BuildMissedIndices(logPrefix string, ctx context.Context, dirs datadir.Dirs g.Go(func() error { p := &background.Progress{} ps.Add(p) + defer notifySegmentIndexingFinished(sn.Name()) defer ps.Delete(p) return buildIdx(gCtx, sn, chainConfig, tmpDir, p, log.LvlInfo, logger) }) @@ -974,7 +971,6 @@ func BuildMissedIndices(logPrefix string, ctx context.Context, dirs datadir.Dirs case <-ctx.Done(): return ctx.Err() } - } func BuildBorMissedIndices(logPrefix string, ctx context.Context, dirs datadir.Dirs, chainConfig *chain.Config, workers int, logger log.Logger) error { @@ -1001,6 +997,7 @@ func BuildBorMissedIndices(logPrefix string, ctx context.Context, dirs datadir.D g.Go(func() error { p := &background.Progress{} ps.Add(p) + defer notifySegmentIndexingFinished(sn.Name()) defer ps.Delete(p) return buildIdx(gCtx, sn, chainConfig, tmpDir, p, log.LvlInfo, logger) }) @@ -1023,16 +1020,36 @@ func BuildBorMissedIndices(logPrefix string, ctx context.Context, dirs datadir.D case <-logEvery.C: var m runtime.MemStats dbg.ReadMemStats(&m) - dd := ps.DiagnossticsData() - diagnostics.Send(diagnostics.SnapshotIndexingStatistics{ - Segments: dd, - TimeElapsed: time.Since(startIndexingTime).Round(time.Second).Seconds(), - }) + sendDiagnostics(startIndexingTime, ps.DiagnossticsData(), m.Alloc, m.Sys) logger.Info(fmt.Sprintf("[%s] Indexing", logPrefix), "progress", ps.String(), "total-indexing-time", time.Since(startIndexingTime).Round(time.Second).String(), "alloc", common2.ByteCount(m.Alloc), "sys", common2.ByteCount(m.Sys)) } } } +func notifySegmentIndexingFinished(name string) { + diagnostics.Send( + diagnostics.SnapshotSegmentIndexingFinishedUpdate{ + SegmentName: name, + }, + ) +} + +func sendDiagnostics(startIndexingTime time.Time, indexPercent map[string]int, alloc uint64, sys uint64) { + segmentsStats := make([]diagnostics.SnapshotSegmentIndexingStatistics, 0, len(indexPercent)) + for k, v := range indexPercent { + segmentsStats = append(segmentsStats, diagnostics.SnapshotSegmentIndexingStatistics{ + SegmentName: k, + Percent: v, + Alloc: alloc, + Sys: sys, + }) + } + diagnostics.Send(diagnostics.SnapshotIndexingStatistics{ + Segments: segmentsStats, + TimeElapsed: time.Since(startIndexingTime).Round(time.Second).Seconds(), + }) +} + func noGaps(in []snaptype.FileInfo) (out []snaptype.FileInfo, missingSnapshots []Range) { var prevTo uint64 for _, f := range in {