From aa6c7e6da15e7522093900487ee0cfdb556e89e1 Mon Sep 17 00:00:00 2001 From: Giulio rebuffo Date: Mon, 12 Aug 2024 22:18:47 +0200 Subject: [PATCH] Cherry-picked essential caplin stuff (#11569) Co-authored-by: Kewei --- cl/antiquary/antiquary.go | 6 +++++- cl/phase1/stages/stage_history_download.go | 20 ++++++++++++-------- cmd/utils/flags.go | 2 +- 3 files changed, 18 insertions(+), 10 deletions(-) diff --git a/cl/antiquary/antiquary.go b/cl/antiquary/antiquary.go index 61a03ccd130..91f363c55c1 100644 --- a/cl/antiquary/antiquary.go +++ b/cl/antiquary/antiquary.go @@ -20,7 +20,7 @@ import ( "github.com/ledgerwatch/log/v3" ) -const safetyMargin = 2_000 // We retire snapshots 2k blocks after the finalized head +const safetyMargin = 10_000 // We retire snapshots 10k blocks after the finalized head // Antiquary is where the snapshots go, aka old history, it is what keep track of the oldest records. type Antiquary struct { @@ -304,6 +304,10 @@ func (a *Antiquary) antiquateBlobs() error { defer roTx.Rollback() // perform blob antiquation if it is time to. currentBlobsProgress := a.sn.FrozenBlobs() + // We should NEVER get ahead of the block snapshots. + if currentBlobsProgress >= a.sn.BlocksAvailable() { + return nil + } minimunBlobsProgress := ((a.cfg.DenebForkEpoch * a.cfg.SlotsPerEpoch) / snaptype.Erigon2MergeLimit) * snaptype.Erigon2MergeLimit currentBlobsProgress = utils.Max64(currentBlobsProgress, minimunBlobsProgress) // read the finalized head diff --git a/cl/phase1/stages/stage_history_download.go b/cl/phase1/stages/stage_history_download.go index 44985789950..73013fdf3f9 100644 --- a/cl/phase1/stages/stage_history_download.go +++ b/cl/phase1/stages/stage_history_download.go @@ -204,18 +204,18 @@ func SpawnStageHistoryDownload(cfg StageHistoryReconstructionCfg, ctx context.Co close(finishCh) if cfg.blobsBackfilling { go func() { - if err := downloadBlobHistoryWorker(cfg, ctx, logger); err != nil { + if err := downloadBlobHistoryWorker(cfg, ctx, true, logger); err != nil { logger.Error("Error downloading blobs", "err", err) } - // set a timer every 1 hour as a failsafe - ticker := time.NewTicker(time.Hour) + // set a timer every 15 minutes as a failsafe + ticker := time.NewTicker(15 * time.Minute) defer ticker.Stop() for { select { case <-ctx.Done(): return case <-ticker.C: - if err := downloadBlobHistoryWorker(cfg, ctx, logger); err != nil { + if err := downloadBlobHistoryWorker(cfg, ctx, false, logger); err != nil { logger.Error("Error downloading blobs", "err", err) } } @@ -249,7 +249,7 @@ func SpawnStageHistoryDownload(cfg StageHistoryReconstructionCfg, ctx context.Co } // downloadBlobHistoryWorker is a worker that downloads the blob history by using the already downloaded beacon blocks -func downloadBlobHistoryWorker(cfg StageHistoryReconstructionCfg, ctx context.Context, logger log.Logger) error { +func downloadBlobHistoryWorker(cfg StageHistoryReconstructionCfg, ctx context.Context, shouldLog bool, logger log.Logger) error { currentSlot := cfg.startingSlot + 1 blocksBatchSize := uint64(8) // requests 8 blocks worth of blobs at a time tx, err := cfg.indiciesDB.BeginRo(ctx) @@ -263,7 +263,7 @@ func downloadBlobHistoryWorker(cfg StageHistoryReconstructionCfg, ctx context.Co prevLogSlot := currentSlot prevTime := time.Now() targetSlot := cfg.beaconCfg.DenebForkEpoch * cfg.beaconCfg.SlotsPerEpoch - cfg.logger.Info("Downloading blobs backwards", "from", currentSlot, "to", targetSlot) + for currentSlot >= targetSlot { if currentSlot <= cfg.sn.FrozenBlobs() { break @@ -312,7 +312,9 @@ func downloadBlobHistoryWorker(cfg StageHistoryReconstructionCfg, ctx context.Co case <-ctx.Done(): return ctx.Err() case <-logInterval.C: - + if !shouldLog { + continue + } blkSec := float64(prevLogSlot-currentSlot) / time.Since(prevTime).Seconds() blkSecStr := fmt.Sprintf("%.1f", blkSec) // round to 1 decimal place and convert to string @@ -353,7 +355,9 @@ func downloadBlobHistoryWorker(cfg StageHistoryReconstructionCfg, ctx context.Co continue } } - log.Info("Blob history download finished successfully") + if shouldLog { + logger.Info("Blob history download finished successfully") + } cfg.antiquary.NotifyBlobBackfilled() return nil } diff --git a/cmd/utils/flags.go b/cmd/utils/flags.go index 7cf4b19a16f..68f4e36227a 100644 --- a/cmd/utils/flags.go +++ b/cmd/utils/flags.go @@ -926,7 +926,7 @@ var ( BeaconAPIFlag = cli.StringSliceFlag{ Name: "beacon.api", - Usage: "Enable beacon API (avaiable endpoints: beacon, builder, config, debug, events, node, validator, rewards, lighthouse)", + Usage: "Enable beacon API (avaiable endpoints: beacon, builder, config, debug, events, node, validator, lighthouse)", } BeaconApiProtocolFlag = cli.StringFlag{ Name: "beacon.api.protocol",