Skip to content

Commit

Permalink
Added blob sidecars snapshots for sepolia (#9766)
Browse files Browse the repository at this point in the history
# Downloader lock design

The snapshot lock design has been changed to be more flexible. Previously
the lock was an empty file whose presence determined whether to skip
snapshot downloads or not; this file has now been extended a little.

We now treat the lock file as a JSON file which has the following
format:

```json
["headers", "bodies", etc...]
```

The strings are the stringified snapshot types:

```go
var Enums = struct {
	Unknown,
	Headers,
	Bodies,
	Transactions,
	BorEvents,
	BorSpans,
	BeaconBlocks,
	BlobSidecars ,
}
```

After each download finishes, we append the enums of the downloaded
snapshots to the list (for a normal sync that is only `Headers`, `Bodies`
and `Transactions`, plus the Bor types).

When the node starts we prohibit all the snapshot types in the lock file
and keep download open for the ones not in it.

---------

Co-authored-by: alex.sharov <[email protected]>
  • Loading branch information
Giulio2002 and AskAlexSharov authored Mar 22, 2024
1 parent 28c0c57 commit e08867b
Show file tree
Hide file tree
Showing 14 changed files with 184 additions and 62 deletions.
27 changes: 14 additions & 13 deletions cl/phase1/stages/stage_history_download.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,6 @@ func SpawnStageHistoryDownload(cfg StageHistoryReconstructionCfg, ctx context.Co

var currEth1Progress atomic.Int64

bytesReadInTotal := atomic.Uint64{}
destinationSlotForEL := uint64(math.MaxUint64)
if cfg.engine != nil && cfg.engine.SupportInsertion() && cfg.beaconCfg.DenebForkEpoch != math.MaxUint64 {
destinationSlotForEL = cfg.beaconCfg.BellatrixForkEpoch * cfg.beaconCfg.SlotsPerEpoch
Expand All @@ -109,8 +108,6 @@ func SpawnStageHistoryDownload(cfg StageHistoryReconstructionCfg, ctx context.Co

destinationSlotForCL := cfg.sn.SegmentsMax()

bytesReadInTotal.Add(uint64(blk.EncodingSizeSSZ()))

slot := blk.Block.Slot
if destinationSlotForCL <= blk.Block.Slot {
if err := beacon_indicies.WriteBeaconBlockAndIndicies(ctx, tx, blk, true); err != nil {
Expand Down Expand Up @@ -154,6 +151,8 @@ func SpawnStageHistoryDownload(cfg StageHistoryReconstructionCfg, ctx context.Co
finishCh := make(chan struct{})
// Start logging thread

isBackfilling := atomic.Bool{}

go func() {
logInterval := time.NewTicker(logIntervalTime)
defer logInterval.Stop()
Expand All @@ -177,24 +176,21 @@ func SpawnStageHistoryDownload(cfg StageHistoryReconstructionCfg, ctx context.Co
ratio := float64(logTime / time.Second)
speed := blockProgress / ratio
prevProgress = currProgress
peerCount, err := cfg.downloader.Peers()
if err != nil {
log.Debug("could not get peer count", "err", err)
continue
}

if speed == 0 {
continue
}
logArgs = append(logArgs,
"slot", currProgress,
"blockNumber", currEth1Progress.Load(),
"blk/sec", fmt.Sprintf("%.1f", speed),
"mbps/sec", fmt.Sprintf("%.1f", float64(bytesReadInTotal.Load())/(1000*1000*ratio)),
"peers", peerCount,
"snapshots", cfg.sn.SegmentsMax(),
)
bytesReadInTotal.Store(0)
logger.Info("Backfilling History", logArgs...)
logMsg := "Node is still syncing... downloading past blocks"
if isBackfilling.Load() {
logMsg = "Node has finished syncing... full history is being downloaded for archiving purposes"
}
logger.Info(logMsg, logArgs...)
case <-finishCh:
return
case <-ctx.Done():
Expand All @@ -211,7 +207,11 @@ func SpawnStageHistoryDownload(cfg StageHistoryReconstructionCfg, ctx context.Co
}
}
cfg.antiquary.NotifyBackfilled()
log.Info("Backfilling finished")
if cfg.backfilling {
cfg.logger.Info("full backfilling finished")
} else {
cfg.logger.Info("Missing blocks download finished (note: this does not mean that the history is complete, only that the missing blocks need for sync have been downloaded)")
}

close(finishCh)
if cfg.blobsBackfilling {
Expand Down Expand Up @@ -240,6 +240,7 @@ func SpawnStageHistoryDownload(cfg StageHistoryReconstructionCfg, ctx context.Co
return err
}
defer tx2.Rollback()
isBackfilling.Store(true)

cfg.logger.Info("Ready to insert history, waiting for sync cycle to finish")

Expand Down
34 changes: 31 additions & 3 deletions erigon-lib/downloader/downloader.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,6 @@ func New(ctx context.Context, cfg *downloadercfg.Cfg, dirs datadir.Dirs, logger
var stats AggStats

lock, err := getSnapshotLock(ctx, cfg, db, &stats, mutex, logger)

if err != nil {
return nil, fmt.Errorf("can't initialize snapshot lock: %w", err)
}
Expand Down Expand Up @@ -658,7 +657,7 @@ func (d *Downloader) mainLoop(silent bool) error {
go func() {
defer d.wg.Done()
d.webseeds.Discover(d.ctx, d.cfg.WebSeedUrls, d.cfg.WebSeedFiles, d.cfg.Dirs.Snap)
// webseeds.Discover may create new .torrent files on disk
// apply webseeds to existing torrents
if err := d.addTorrentFilesFromDisk(true); err != nil && !errors.Is(err, context.Canceled) {
d.logger.Warn("[snapshots] addTorrentFilesFromDisk", "err", err)
}
Expand Down Expand Up @@ -2140,8 +2139,12 @@ func (d *Downloader) AddMagnetLink(ctx context.Context, infoHash metainfo.Hash,
if d.alreadyHaveThisName(name) || !IsSnapNameAllowed(name) {
return nil
}
isProhibited, err := d.torrentFiles.newDownloadsAreProhibited(name)
if err != nil {
return err
}

if d.torrentFiles.newDownloadsAreProhibited() && !d.torrentFiles.Exists(name) {
if isProhibited && !d.torrentFiles.Exists(name) {
return nil
}

Expand All @@ -2168,8 +2171,33 @@ func (d *Downloader) AddMagnetLink(ctx context.Context, infoHash metainfo.Hash,
case <-ctx.Done():
return
case <-t.GotInfo():
case <-time.After(30 * time.Second): //fallback to r2
// TOOD: handle errors
// TOOD: add `d.webseeds.Complete` chan - to prevent race - Discover is also async
// TOOD: maybe run it in goroutine and return channel - to select with p2p

ok, err := d.webseeds.DownloadAndSaveTorrentFile(ctx, name)
if ok && err == nil {
ts, err := d.torrentFiles.LoadByPath(filepath.Join(d.SnapDir(), name+".torrent"))
if err != nil {
return
}
_, _, err = addTorrentFile(ctx, ts, d.torrentClient, d.db, d.webseeds)
if err != nil {
return
}
return
}

// wait for p2p
select {
case <-ctx.Done():
return
case <-t.GotInfo():
}
}

//TODO: remove whitelist check - Erigon may send us new seedable files
if !d.snapshotLock.Downloads.Contains(name) {
mi := t.Metainfo()
if err := CreateTorrentFileIfNotExists(d.SnapDir(), t.Info(), &mi, d.torrentFiles); err != nil {
Expand Down
7 changes: 2 additions & 5 deletions erigon-lib/downloader/downloader_grpc_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,11 +45,8 @@ type GrpcServer struct {
d *Downloader
}

func (s *GrpcServer) ProhibitNewDownloads(context.Context, *proto_downloader.ProhibitNewDownloadsRequest) (*emptypb.Empty, error) {
if err := s.d.torrentFiles.prohibitNewDownloads(); err != nil {
return nil, err
}
return nil, nil
func (s *GrpcServer) ProhibitNewDownloads(ctx context.Context, req *proto_downloader.ProhibitNewDownloadsRequest) (*emptypb.Empty, error) {
return &emptypb.Empty{}, s.d.torrentFiles.prohibitNewDownloads(req.Type)
}

// Erigon "download once" - means restart/upgrade/downgrade will not download files (and will be fast)
Expand Down
4 changes: 2 additions & 2 deletions erigon-lib/downloader/snaptype/type.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,6 @@ func (s snapType) IdxFileNames(version Version, from uint64, to uint64) []string
}

func (s snapType) IdxFileName(version Version, from uint64, to uint64, index ...Index) string {

if len(index) == 0 {
if len(s.indexes) == 0 {
return ""
Expand Down Expand Up @@ -344,7 +343,7 @@ var (

BorSnapshotTypes = []Type{BorEvents, BorSpans}

CaplinSnapshotTypes = []Type{BeaconBlocks}
CaplinSnapshotTypes = []Type{BeaconBlocks, BlobSidecars}

AllTypes = []Type{
Headers,
Expand All @@ -353,5 +352,6 @@ var (
BorEvents,
BorSpans,
BeaconBlocks,
BlobSidecars,
}
)
76 changes: 60 additions & 16 deletions erigon-lib/downloader/torrent_files.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
package downloader

import (
"encoding/json"
"fmt"
"io"
"os"
"path/filepath"
"strings"
Expand All @@ -10,6 +12,7 @@ import (
"github.com/anacrolix/torrent"
"github.com/anacrolix/torrent/metainfo"
"github.com/ledgerwatch/erigon-lib/common/dir"
"golang.org/x/exp/slices"
)

// TorrentFiles - does provide thread-safe CRUD operations on .torrent files
Expand Down Expand Up @@ -125,29 +128,70 @@ const ProhibitNewDownloadsFileName = "prohibit_new_downloads.lock"
// Erigon "download once" - means restart/upgrade/downgrade will not download files (and will be fast)
// After "download once" - Erigon will produce and seed new files
// Downloader will able: seed new files (already existing on FS), download uncomplete parts of existing files (if Verify found some bad parts)
func (tf *TorrentFiles) prohibitNewDownloads() error {
func (tf *TorrentFiles) prohibitNewDownloads(t string) error {
tf.lock.Lock()
defer tf.lock.Unlock()
return CreateProhibitNewDownloadsFile(tf.dir)
}
func (tf *TorrentFiles) newDownloadsAreProhibited() bool {
tf.lock.Lock()
defer tf.lock.Unlock()
return dir.FileExist(filepath.Join(tf.dir, ProhibitNewDownloadsFileName))
// open or create file ProhibitNewDownloadsFileName
f, err := os.OpenFile(filepath.Join(tf.dir, ProhibitNewDownloadsFileName), os.O_CREATE|os.O_RDONLY, 0644)
if err != nil {
return fmt.Errorf("open file: %w", err)
}
defer f.Close()
var prohibitedList []string
torrentListJsonBytes, err := io.ReadAll(f)
if err != nil {
return fmt.Errorf("read file: %w", err)
}
if len(torrentListJsonBytes) > 0 {
if err := json.Unmarshal(torrentListJsonBytes, &prohibitedList); err != nil {
return fmt.Errorf("unmarshal: %w", err)
}
}
if slices.Contains(prohibitedList, t) {
return nil
}
prohibitedList = append(prohibitedList, t)
f.Close()

//return dir.FileExist(filepath.Join(tf.dir, ProhibitNewDownloadsFileName)) ||
// dir.FileExist(filepath.Join(tf.dir, SnapshotsLockFileName))
// write new prohibited list by opening the file in truncate mode
f, err = os.OpenFile(filepath.Join(tf.dir, ProhibitNewDownloadsFileName), os.O_TRUNC|os.O_WRONLY, 0644)
if err != nil {
return fmt.Errorf("open file for writing: %w", err)
}
defer f.Close()
prohibitedListJsonBytes, err := json.Marshal(prohibitedList)
if err != nil {
return fmt.Errorf("marshal: %w", err)
}
if _, err := f.Write(prohibitedListJsonBytes); err != nil {
return fmt.Errorf("write: %w", err)
}

return f.Sync()
}

func CreateProhibitNewDownloadsFile(dir string) error {
fPath := filepath.Join(dir, ProhibitNewDownloadsFileName)
f, err := os.Create(fPath)
func (tf *TorrentFiles) newDownloadsAreProhibited(name string) (bool, error) {
tf.lock.Lock()
defer tf.lock.Unlock()
f, err := os.OpenFile(filepath.Join(tf.dir, ProhibitNewDownloadsFileName), os.O_CREATE|os.O_APPEND|os.O_RDONLY, 0644)
if err != nil {
return err
return false, err
}
defer f.Close()
if err := f.Sync(); err != nil {
return err
var prohibitedList []string
torrentListJsonBytes, err := io.ReadAll(f)
if err != nil {
return false, fmt.Errorf("newDownloadsAreProhibited: read file: %w", err)
}
return nil
if len(torrentListJsonBytes) > 0 {
if err := json.Unmarshal(torrentListJsonBytes, &prohibitedList); err != nil {
return false, fmt.Errorf("newDownloadsAreProhibited: unmarshal: %w", err)
}
}
for _, p := range prohibitedList {
if strings.Contains(name, p) {
return true, nil
}
}
return false, nil
}
1 change: 1 addition & 0 deletions erigon-lib/downloader/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -340,6 +340,7 @@ func _addTorrentFile(ctx context.Context, ts *torrent.TorrentSpec, torrentClient
return nil, false, fmt.Errorf("addTorrentFile %s: update failed: %w", ts.DisplayName, err)
}
} else {
t.AddWebSeeds(ts.Webseeds)
if err := db.Update(ctx, torrentInfoReset(ts.DisplayName, ts.InfoHash.Bytes(), 0)); err != nil {
return nil, false, fmt.Errorf("addTorrentFile %s: reset failed: %w", ts.DisplayName, err)
}
Expand Down
55 changes: 45 additions & 10 deletions erigon-lib/downloader/webseed.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,6 @@ type WebSeeds struct {
}

func (d *WebSeeds) Discover(ctx context.Context, urls []*url.URL, files []string, rootDir string) {
if d.torrentFiles.newDownloadsAreProhibited() {
return
}
listsOfFiles := d.constructListsOfFiles(ctx, urls, files)
torrentMap := d.makeTorrentUrls(listsOfFiles)
webSeedMap := d.downloadTorrentFilesFromProviders(ctx, rootDir, torrentMap)
Expand All @@ -64,6 +61,14 @@ func (d *WebSeeds) constructListsOfFiles(ctx context.Context, httpProviders []*u
d.logger.Debug("[snapshots.webseed] get from HTTP provider", "err", err, "url", webSeedProviderURL.EscapedPath())
continue
}
// check if we need to prohibit new downloads for some files
for name := range manifestResponse {
prohibited, err := d.torrentFiles.newDownloadsAreProhibited(name)
if prohibited || err != nil {
delete(manifestResponse, name)
}
}

listsOfFiles = append(listsOfFiles, manifestResponse)
}

Expand All @@ -74,6 +79,13 @@ func (d *WebSeeds) constructListsOfFiles(ctx context.Context, httpProviders []*u
d.logger.Debug("[snapshots.webseed] get from File provider", "err", err)
continue
}
// check if we need to prohibit new downloads for some files
for name := range response {
prohibited, err := d.torrentFiles.newDownloadsAreProhibited(name)
if prohibited || err != nil {
delete(response, name)
}
}
listsOfFiles = append(listsOfFiles, response)
}
return listsOfFiles
Expand Down Expand Up @@ -219,17 +231,13 @@ func (d *WebSeeds) downloadTorrentFilesFromProviders(ctx context.Context, rootDi
tUrls := tUrls
e.Go(func() error {
for _, url := range tUrls {
res, err := d.callTorrentHttpProvider(ctx, url, name)
//validation happens inside
_, err := d.callTorrentHttpProvider(ctx, url, name)
if err != nil {
d.logger.Log(d.verbosity, "[snapshots] got from webseed", "name", name, "err", err, "url", url)
continue
}
if !d.torrentFiles.Exists(name) {
if err := d.torrentFiles.Create(name, res); err != nil {
d.logger.Log(d.verbosity, "[snapshots] .torrent from webseed rejected", "name", name, "err", err, "url", url)
continue
}
}
//don't save .torrent here - do it inside downloader.Add
webSeeMapLock.Lock()
webSeedMap[torrentMap[*url]] = struct{}{}
webSeeMapLock.Unlock()
Expand All @@ -244,6 +252,33 @@ func (d *WebSeeds) downloadTorrentFilesFromProviders(ctx context.Context, rootDi
return webSeedMap
}

// DownloadAndSaveTorrentFile fetches the .torrent payload for name from the
// webseed URLs returned by d.ByFileName and stores it via d.torrentFiles.Create.
//
// Providers are tried in order. A URL that fails to parse, a provider whose
// request fails, or a payload rejected by Create is logged and skipped, so a
// single bad provider does not stop later ones from being tried (this mirrors
// the log-and-continue handling in downloadTorrentFilesFromProviders).
//
// It returns (true, nil) only when this call created the .torrent file.
// It returns (false, nil) when no webseed URL is known for name, or when
// d.torrentFiles.Exists already reports the file. When nothing was saved and
// at least one provider request failed, the last request error is returned
// alongside false. If ctx is cancelled between providers, ctx.Err() is returned.
func (d *WebSeeds) DownloadAndSaveTorrentFile(ctx context.Context, name string) (bool, error) {
	urls, ok := d.ByFileName(name)
	if !ok {
		return false, nil
	}
	// Skip the network round-trips entirely when there is nothing to create.
	if d.torrentFiles.Exists(name) {
		return false, nil
	}

	var lastErr error
	for _, urlStr := range urls {
		// Stop early instead of issuing requests that are bound to fail.
		if err := ctx.Err(); err != nil {
			return false, err
		}
		parsedUrl, err := url.Parse(urlStr)
		if err != nil {
			d.logger.Log(d.verbosity, "[snapshots] webseed url rejected", "name", name, "err", err, "url", urlStr)
			continue
		}
		res, err := d.callTorrentHttpProvider(ctx, parsedUrl, name)
		if err != nil {
			// Remember the failure but give the remaining providers a chance.
			d.logger.Log(d.verbosity, "[snapshots] got from webseed", "name", name, "err", err, "url", urlStr)
			lastErr = err
			continue
		}
		// Re-check after the download: the file may have been created
		// concurrently, in which case other providers cannot help either.
		if d.torrentFiles.Exists(name) {
			return false, nil
		}
		if err := d.torrentFiles.Create(name, res); err != nil {
			d.logger.Log(d.verbosity, "[snapshots] .torrent from webseed rejected", "name", name, "err", err)
			continue
		}
		return true, nil
	}

	return false, lastErr
}

func (d *WebSeeds) callTorrentHttpProvider(ctx context.Context, url *url.URL, fileName string) ([]byte, error) {
request, err := http.NewRequest(http.MethodGet, url.String(), nil)
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion erigon-lib/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ go 1.20

require (
github.com/erigontech/mdbx-go v0.27.24
github.com/ledgerwatch/erigon-snapshot v1.3.1-0.20240222083139-3cef6c872d07
github.com/ledgerwatch/erigon-snapshot v1.3.1-0.20240321134048-58ba2110522a
github.com/ledgerwatch/interfaces v0.0.0-20240320062914-b57f05746087
github.com/ledgerwatch/log/v3 v3.9.0
github.com/ledgerwatch/secp256k1 v1.0.0
Expand Down
Loading

0 comments on commit e08867b

Please sign in to comment.