Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Cherry-picked essential caplin stuff #11569

Merged
merged 2 commits into from
Aug 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion cl/antiquary/antiquary.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ import (
"github.com/ledgerwatch/log/v3"
)

const safetyMargin = 2_000 // We retire snapshots 2k blocks after the finalized head
const safetyMargin = 10_000 // We retire snapshots 10k blocks after the finalized head

// Antiquary is where the snapshots go, aka old history, it is what keep track of the oldest records.
type Antiquary struct {
Expand Down Expand Up @@ -304,6 +304,10 @@ func (a *Antiquary) antiquateBlobs() error {
defer roTx.Rollback()
// perform blob antiquation if it is time to.
currentBlobsProgress := a.sn.FrozenBlobs()
// We should NEVER get ahead of the block snapshots.
if currentBlobsProgress >= a.sn.BlocksAvailable() {
return nil
}
minimunBlobsProgress := ((a.cfg.DenebForkEpoch * a.cfg.SlotsPerEpoch) / snaptype.Erigon2MergeLimit) * snaptype.Erigon2MergeLimit
currentBlobsProgress = utils.Max64(currentBlobsProgress, minimunBlobsProgress)
// read the finalized head
Expand Down
20 changes: 12 additions & 8 deletions cl/phase1/stages/stage_history_download.go
Original file line number Diff line number Diff line change
Expand Up @@ -204,18 +204,18 @@ func SpawnStageHistoryDownload(cfg StageHistoryReconstructionCfg, ctx context.Co
close(finishCh)
if cfg.blobsBackfilling {
go func() {
if err := downloadBlobHistoryWorker(cfg, ctx, logger); err != nil {
if err := downloadBlobHistoryWorker(cfg, ctx, true, logger); err != nil {
logger.Error("Error downloading blobs", "err", err)
}
// set a timer every 1 hour as a failsafe
ticker := time.NewTicker(time.Hour)
// set a timer every 15 minutes as a failsafe
ticker := time.NewTicker(15 * time.Minute)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
return
case <-ticker.C:
if err := downloadBlobHistoryWorker(cfg, ctx, logger); err != nil {
if err := downloadBlobHistoryWorker(cfg, ctx, false, logger); err != nil {
logger.Error("Error downloading blobs", "err", err)
}
}
Expand Down Expand Up @@ -249,7 +249,7 @@ func SpawnStageHistoryDownload(cfg StageHistoryReconstructionCfg, ctx context.Co
}

// downloadBlobHistoryWorker is a worker that downloads the blob history by using the already downloaded beacon blocks
func downloadBlobHistoryWorker(cfg StageHistoryReconstructionCfg, ctx context.Context, logger log.Logger) error {
func downloadBlobHistoryWorker(cfg StageHistoryReconstructionCfg, ctx context.Context, shouldLog bool, logger log.Logger) error {
currentSlot := cfg.startingSlot + 1
blocksBatchSize := uint64(8) // requests 8 blocks worth of blobs at a time
tx, err := cfg.indiciesDB.BeginRo(ctx)
Expand All @@ -263,7 +263,7 @@ func downloadBlobHistoryWorker(cfg StageHistoryReconstructionCfg, ctx context.Co
prevLogSlot := currentSlot
prevTime := time.Now()
targetSlot := cfg.beaconCfg.DenebForkEpoch * cfg.beaconCfg.SlotsPerEpoch
cfg.logger.Info("Downloading blobs backwards", "from", currentSlot, "to", targetSlot)

for currentSlot >= targetSlot {
if currentSlot <= cfg.sn.FrozenBlobs() {
break
Expand Down Expand Up @@ -312,7 +312,9 @@ func downloadBlobHistoryWorker(cfg StageHistoryReconstructionCfg, ctx context.Co
case <-ctx.Done():
return ctx.Err()
case <-logInterval.C:

if !shouldLog {
continue
}
blkSec := float64(prevLogSlot-currentSlot) / time.Since(prevTime).Seconds()
blkSecStr := fmt.Sprintf("%.1f", blkSec)
// round to 1 decimal place and convert to string
Expand Down Expand Up @@ -353,7 +355,9 @@ func downloadBlobHistoryWorker(cfg StageHistoryReconstructionCfg, ctx context.Co
continue
}
}
log.Info("Blob history download finished successfully")
if shouldLog {
logger.Info("Blob history download finished successfully")
}
cfg.antiquary.NotifyBlobBackfilled()
return nil
}
2 changes: 1 addition & 1 deletion cmd/utils/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -926,7 +926,7 @@ var (

BeaconAPIFlag = cli.StringSliceFlag{
Name: "beacon.api",
Usage: "Enable beacon API (avaiable endpoints: beacon, builder, config, debug, events, node, validator, rewards, lighthouse)",
Usage: "Enable beacon API (avaiable endpoints: beacon, builder, config, debug, events, node, validator, lighthouse)",
}
BeaconApiProtocolFlag = cli.StringFlag{
Name: "beacon.api.protocol",
Expand Down
Loading