Skip to content

Commit

Permalink
dvovk/snapidx (#9049)
Browse files Browse the repository at this point in the history
  • Loading branch information
dvovk authored Dec 22, 2023
1 parent a3a6170 commit a36071e
Show file tree
Hide file tree
Showing 4 changed files with 110 additions and 16 deletions.
63 changes: 62 additions & 1 deletion diagnostics/diagnostic.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ func (d *DiagnosticClient) Setup() {
d.runSnapshotListener()
d.runSegmentDownloadingListener()
d.runSegmentIndexingListener()
d.runSegmentIndexingFinishedListener()
}

func (d *DiagnosticClient) runSnapshotListener() {
Expand Down Expand Up @@ -107,8 +108,68 @@ func (d *DiagnosticClient) runSegmentIndexingListener() {
cancel()
return
case info := <-ch:
d.snapshotDownload.SegmentIndexing = info
d.addOrUpdateSegmentIndexingState(info)
}
}
}()
}

func (d *DiagnosticClient) runSegmentIndexingFinishedListener() {
go func() {
ctx, ch, cancel := diaglib.Context[diaglib.SnapshotSegmentIndexingFinishedUpdate](context.Background(), 1)
defer cancel()

rootCtx, _ := common.RootContext()

diaglib.StartProviders(ctx, diaglib.TypeOf(diaglib.SnapshotSegmentIndexingFinishedUpdate{}), log.Root())
for {
select {
case <-rootCtx.Done():
cancel()
return
case info := <-ch:
found := false
for i := range d.snapshotDownload.SegmentIndexing.Segments {
if d.snapshotDownload.SegmentIndexing.Segments[i].SegmentName == info.SegmentName {
found = true
d.snapshotDownload.SegmentIndexing.Segments[i].Percent = 100
}
}

if !found {
d.snapshotDownload.SegmentIndexing.Segments = append(d.snapshotDownload.SegmentIndexing.Segments, diaglib.SnapshotSegmentIndexingStatistics{
SegmentName: info.SegmentName,
Percent: 100,
Alloc: 0,
Sys: 0,
})
}
}
}
}()
}

func (d *DiagnosticClient) addOrUpdateSegmentIndexingState(upd diaglib.SnapshotIndexingStatistics) {
if d.snapshotDownload.SegmentIndexing.Segments == nil {
d.snapshotDownload.SegmentIndexing.Segments = []diaglib.SnapshotSegmentIndexingStatistics{}
}

for i := range upd.Segments {
found := false
for j := range d.snapshotDownload.SegmentIndexing.Segments {
if d.snapshotDownload.SegmentIndexing.Segments[j].SegmentName == upd.Segments[i].SegmentName {
d.snapshotDownload.SegmentIndexing.Segments[j].Percent = upd.Segments[i].Percent
d.snapshotDownload.SegmentIndexing.Segments[j].Alloc = upd.Segments[i].Alloc
d.snapshotDownload.SegmentIndexing.Segments[j].Sys = upd.Segments[i].Sys
found = true
break
}
}

if !found {
d.snapshotDownload.SegmentIndexing.Segments = append(d.snapshotDownload.SegmentIndexing.Segments, upd.Segments[i])
}
}

d.snapshotDownload.SegmentIndexing.TimeElapsed = upd.TimeElapsed
}
19 changes: 17 additions & 2 deletions erigon-lib/diagnostics/entities.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,8 +57,19 @@ type SegmentDownloadStatistics struct {
}

type SnapshotIndexingStatistics struct {
Segments map[string]int `json:"segments"`
TimeElapsed float64 `json:"timeElapsed"`
Segments []SnapshotSegmentIndexingStatistics `json:"segments"`
TimeElapsed float64 `json:"timeElapsed"`
}

type SnapshotSegmentIndexingStatistics struct {
SegmentName string `json:"segmentName"`
Percent int `json:"percent"`
Alloc uint64 `json:"alloc"`
Sys uint64 `json:"sys"`
}

type SnapshotSegmentIndexingFinishedUpdate struct {
SegmentName string `json:"segmentName"`
}

func (ti SnapshotDownloadStatistics) Type() Type {
Expand All @@ -72,3 +83,7 @@ func (ti SegmentDownloadStatistics) Type() Type {
func (ti SnapshotIndexingStatistics) Type() Type {
return TypeOf(ti)
}

func (ti SnapshotSegmentIndexingFinishedUpdate) Type() Type {
return TypeOf(ti)
}
1 change: 1 addition & 0 deletions erigon-lib/downloader/snaptype/files.go
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,7 @@ type FileInfo struct {
func (f FileInfo) TorrentFileExists() bool { return dir.FileExist(f.Path + ".torrent") }
func (f FileInfo) Seedable() bool { return f.To-f.From == Erigon2MergeLimit }
func (f FileInfo) NeedTorrentFile() bool { return f.Seedable() && !f.TorrentFileExists() }
func (f FileInfo) Name() string { return filepath.Base(f.Path) }

func IdxFiles(dir string) (res []FileInfo, err error) { return FilesWithExt(dir, ".idx") }
func Segments(dir string) (res []FileInfo, err error) { return FilesWithExt(dir, ".seg") }
Expand Down
43 changes: 30 additions & 13 deletions turbo/snapshotsync/freezeblocks/block_snapshots.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import (
"encoding/hex"
"errors"
"fmt"
"github.com/ledgerwatch/erigon/consensus/bor"
"os"
"path"
"path/filepath"
Expand All @@ -18,6 +17,8 @@ import (
"sync/atomic"
"time"

"github.com/ledgerwatch/erigon/consensus/bor"

"github.com/holiman/uint256"
"github.com/ledgerwatch/erigon-lib/chain"
"github.com/ledgerwatch/erigon-lib/chain/snapcfg"
Expand Down Expand Up @@ -929,12 +930,7 @@ func BuildMissedIndices(logPrefix string, ctx context.Context, dirs datadir.Dirs
case <-logEvery.C:
var m runtime.MemStats
dbg.ReadMemStats(&m)

diagnostics.Send(diagnostics.SnapshotIndexingStatistics{
Segments: ps.DiagnossticsData(),
TimeElapsed: time.Since(startIndexingTime).Round(time.Second).Seconds(),
})

sendDiagnostics(startIndexingTime, ps.DiagnossticsData(), m.Alloc, m.Sys)
logger.Info(fmt.Sprintf("[%s] Indexing", logPrefix), "progress", ps.String(), "total-indexing-time", time.Since(startIndexingTime).Round(time.Second).String(), "alloc", common2.ByteCount(m.Alloc), "sys", common2.ByteCount(m.Sys))
case <-finish:
return
Expand All @@ -957,6 +953,7 @@ func BuildMissedIndices(logPrefix string, ctx context.Context, dirs datadir.Dirs
g.Go(func() error {
p := &background.Progress{}
ps.Add(p)
defer notifySegmentIndexingFinished(sn.Name())
defer ps.Delete(p)
return buildIdx(gCtx, sn, chainConfig, tmpDir, p, log.LvlInfo, logger)
})
Expand All @@ -974,7 +971,6 @@ func BuildMissedIndices(logPrefix string, ctx context.Context, dirs datadir.Dirs
case <-ctx.Done():
return ctx.Err()
}

}

func BuildBorMissedIndices(logPrefix string, ctx context.Context, dirs datadir.Dirs, chainConfig *chain.Config, workers int, logger log.Logger) error {
Expand All @@ -1001,6 +997,7 @@ func BuildBorMissedIndices(logPrefix string, ctx context.Context, dirs datadir.D
g.Go(func() error {
p := &background.Progress{}
ps.Add(p)
defer notifySegmentIndexingFinished(sn.Name())
defer ps.Delete(p)
return buildIdx(gCtx, sn, chainConfig, tmpDir, p, log.LvlInfo, logger)
})
Expand All @@ -1023,16 +1020,36 @@ func BuildBorMissedIndices(logPrefix string, ctx context.Context, dirs datadir.D
case <-logEvery.C:
var m runtime.MemStats
dbg.ReadMemStats(&m)
dd := ps.DiagnossticsData()
diagnostics.Send(diagnostics.SnapshotIndexingStatistics{
Segments: dd,
TimeElapsed: time.Since(startIndexingTime).Round(time.Second).Seconds(),
})
sendDiagnostics(startIndexingTime, ps.DiagnossticsData(), m.Alloc, m.Sys)
logger.Info(fmt.Sprintf("[%s] Indexing", logPrefix), "progress", ps.String(), "total-indexing-time", time.Since(startIndexingTime).Round(time.Second).String(), "alloc", common2.ByteCount(m.Alloc), "sys", common2.ByteCount(m.Sys))
}
}
}

func notifySegmentIndexingFinished(name string) {
diagnostics.Send(
diagnostics.SnapshotSegmentIndexingFinishedUpdate{
SegmentName: name,
},
)
}

func sendDiagnostics(startIndexingTime time.Time, indexPercent map[string]int, alloc uint64, sys uint64) {
segmentsStats := make([]diagnostics.SnapshotSegmentIndexingStatistics, 0, len(indexPercent))
for k, v := range indexPercent {
segmentsStats = append(segmentsStats, diagnostics.SnapshotSegmentIndexingStatistics{
SegmentName: k,
Percent: v,
Alloc: alloc,
Sys: sys,
})
}
diagnostics.Send(diagnostics.SnapshotIndexingStatistics{
Segments: segmentsStats,
TimeElapsed: time.Since(startIndexingTime).Round(time.Second).Seconds(),
})
}

func noGaps(in []snaptype.FileInfo) (out []snaptype.FileInfo, missingSnapshots []Range) {
var prevTo uint64
for _, f := range in {
Expand Down

0 comments on commit a36071e

Please sign in to comment.