Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Automation tool to automatically upload caplin's snapshot files to R2 #8747

Merged
merged 13 commits into from
Nov 16, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,8 @@ COMMANDS += evm
COMMANDS += sentinel
COMMANDS += caplin
COMMANDS += caplin-regression
COMMANDS += tooling



# build each command using %.cmd rule
Expand Down
4 changes: 2 additions & 2 deletions cl/antiquary/antiquary.go
Original file line number Diff line number Diff line change
Expand Up @@ -173,11 +173,11 @@ func (a *Antiquary) Loop() error {
if from >= to {
continue
}
to = utils.Min64(to, to-safetyMargin) // We don't want to retire snapshots that are too close to the finalized head
to = (to / snaptype.Erigon2RecentMergeLimit) * snaptype.Erigon2RecentMergeLimit
if to-from < snaptype.Erigon2RecentMergeLimit {
continue
}
to = utils.Min64(to, to-safetyMargin) // We don't want to retire snapshots that are too close to the finalized head
to = (to / snaptype.Erigon2RecentMergeLimit) * snaptype.Erigon2RecentMergeLimit
if err := a.antiquate(from, to); err != nil {
return err
}
Expand Down
62 changes: 23 additions & 39 deletions cmd/capcli/cli.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ import (

"github.com/c2h5oh/datasize"
"github.com/ledgerwatch/erigon-lib/chain/snapcfg"
libcommon "github.com/ledgerwatch/erigon-lib/common"
"github.com/ledgerwatch/erigon-lib/downloader"

"github.com/ledgerwatch/erigon/cl/abstract"
Expand Down Expand Up @@ -464,8 +463,6 @@ type CheckSnapshots struct {
chainCfg
outputFolder
withPPROF

Slot uint64 `name:"slot" help:"slot to check"`
}

func (c *CheckSnapshots) Run(ctx *Context) error {
Expand All @@ -480,7 +477,7 @@ func (c *CheckSnapshots) Run(ctx *Context) error {
log.Root().SetHandler(log.LvlFilterHandler(log.LvlInfo, log.StderrHandler))

rawDB := persistence.AferoRawBeaconBlockChainFromOsPath(beaconConfig, dirs.CaplinHistory)
beaconDB, db, err := caplin1.OpenCaplinDatabase(ctx, db_config.DatabaseConfiguration{PruneDepth: math.MaxUint64}, beaconConfig, rawDB, dirs.CaplinIndexing, nil, false)
_, db, err := caplin1.OpenCaplinDatabase(ctx, db_config.DatabaseConfiguration{PruneDepth: math.MaxUint64}, beaconConfig, rawDB, dirs.CaplinIndexing, nil, false)
if err != nil {
return err
}
Expand All @@ -503,51 +500,38 @@ func (c *CheckSnapshots) Run(ctx *Context) error {
return err
}

br := &snapshot_format.MockBlockReader{}
snReader := freezeblocks.NewBeaconSnapshotReader(csn, br, beaconDB, beaconConfig)
for i := c.Slot; i < to; i++ {
// Read the original canonical slot
data, err := beaconDB.GetBlock(ctx, tx, i)
genesisHeader, _, _, err := csn.ReadHeader(0)
if err != nil {
return err
}
previousBlockRoot, err := genesisHeader.Header.HashSSZ()
if err != nil {
return err
}
previousBlockSlot := genesisHeader.Header.Slot
for i := uint64(1); i < to; i++ {
if utils.Min64(0, i-320) > previousBlockSlot {
return fmt.Errorf("snapshot %d has invalid slot", i)
}
// Checking of snapshots is a chain contiguity problem
currentHeader, _, _, err := csn.ReadHeader(i)
if err != nil {
return err
}
if data == nil {
continue
}
blk := data.Data
if blk == nil {
if currentHeader == nil {
continue
}
// first thing if the block is bellatrix update the mock block reader
if blk.Version() >= clparams.BellatrixVersion {
br.Block = blk.Block.Body.ExecutionPayload
}
blk2, err := snReader.ReadBlockBySlot(ctx, tx, i)
if err != nil {
log.Error("Error detected in decoding snapshots", "err", err, "slot", i)
return nil
}
if blk2 == nil {
log.Error("Block not found in snapshot", "slot", i)
return nil
}

hash1, _ := blk.Block.HashSSZ()
hash2, _ := blk2.Block.HashSSZ()
if hash1 != hash2 {
log.Error("Mismatching blocks", "slot", i, "gotSlot", blk2.Block.Slot, "datadir", libcommon.Hash(hash1), "snapshot", libcommon.Hash(hash2))
return nil
if currentHeader.Header.ParentRoot != previousBlockRoot {
return fmt.Errorf("snapshot %d has invalid parent root", i)
}
header, _, _, err := csn.ReadHeader(i)
previousBlockRoot, err = currentHeader.Header.HashSSZ()
if err != nil {
return err
}
hash3, _ := header.Header.HashSSZ()
if hash3 != hash2 {
log.Error("Mismatching blocks", "slot", i, "gotSlot", blk2.Block.Slot, "datadir", libcommon.Hash(hash1), "snapshot", libcommon.Hash(hash3))
return nil
previousBlockSlot = currentHeader.Header.Slot
if i%2000 == 0 {
log.Info("Successfully checked", "slot", i)
}
log.Info("Successfully checked", "slot", i)
}
return nil
}
Expand Down
3 changes: 3 additions & 0 deletions cmd/tooling/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
# Tooling

These are a bunch of tools for our scripting needs.
174 changes: 174 additions & 0 deletions cmd/tooling/cli.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,174 @@
package main

import (
"fmt"
"math"
"os/exec"
"time"

"github.com/ledgerwatch/erigon/cl/clparams"
"github.com/ledgerwatch/erigon/cmd/caplin/caplin1"
"github.com/ledgerwatch/erigon/eth/ethconfig"
"github.com/ledgerwatch/erigon/turbo/snapshotsync/freezeblocks"
"golang.org/x/net/context"

"github.com/ledgerwatch/erigon-lib/common/datadir"
"github.com/ledgerwatch/erigon-lib/downloader/snaptype"
"github.com/ledgerwatch/erigon/cl/persistence"
"github.com/ledgerwatch/erigon/cl/persistence/beacon_indicies"
"github.com/ledgerwatch/erigon/cl/persistence/db_config"
"github.com/ledgerwatch/erigon/cl/utils"

"github.com/ledgerwatch/log/v3"
)

// CLI is the root of the kong command tree for the tooling binary. Each
// field tagged with `cmd` becomes a sub-command whose Run method is invoked
// when it is selected on the command line.
var CLI struct {
	// BucketCaplinAutomation checks the caplin snapshots and uploads them
	// with rclone, first immediately and then once per upload period.
	BucketCaplinAutomation BucketCaplinAutomation `cmd:"" help:"periodically sanity-check caplin snapshots and upload them to an R2 bucket"`
}

// chainCfg is an embeddable group of command-line flags selecting the network.
type chainCfg struct {
	// Chain is the network name handed to clparams.GetConfigsByNetworkName;
	// it defaults to "mainnet".
	Chain string `help:"chain" default:"mainnet"`
}

// func (c *chainCfg) configs() (beaconConfig *clparams.BeaconChainConfig, genesisConfig *clparams.GenesisConfig, err error) {
// genesisConfig, _, beaconConfig, _, err = clparams.GetConfigsByNetworkName(c.Chain)
// return
// }

// withDatadir is an embeddable group of command-line flags selecting the
// data directory.
type withDatadir struct {
	// Datadir is the root directory passed to datadir.New to derive the
	// snapshot and caplin database locations.
	Datadir string `help:"datadir" default:"~/.local/share/erigon" type:"existingdir"`
}

// func (w *withPPROF) withProfile() {
// if w.Pprof {
// debug.StartPProf("localhost:6060", metrics.Setup("localhost:6060", log.Root()))
// }
// }

// func (w *withSentinel) connectSentinel() (sentinel.SentinelClient, error) {
// // YOLO message size
// gconn, err := grpc.Dial(w.Sentinel, grpc.WithInsecure(), grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(math.MaxInt)))
// if err != nil {
// return nil, err
// }
// return sentinel.NewSentinelClient(gconn), nil
// }

// func openFs(fsName string, path string) (afero.Fs, error) {
// return afero.NewBasePathFs(afero.NewBasePathFs(afero.NewOsFs(), fsName), path), nil
// }

// BucketCaplinAutomation is the command that sanity-checks the caplin
// snapshots and then syncs them to a bucket with rclone, once at start-up
// and again every UploadPeriod.
type BucketCaplinAutomation struct {
	withDatadir
	chainCfg

	// UploadPeriod is the interval between check-and-upload rounds
	// (default 1440h, i.e. 60 days).
	UploadPeriod time.Duration `help:"upload period" default:"1440h"`
	// Bucket is passed verbatim as the destination argument of `rclone sync`.
	// NOTE(review): rclone destinations are normally written "remote:path";
	// confirm that the URL-shaped default is actually usable.
	Bucket string `help:"r2 address" default:"http://localhost:8080"`
}

// Run checks the caplin snapshots and uploads them to the configured bucket,
// once immediately and then again every c.UploadPeriod, until ctx is done.
//
// It returns nil when ctx is cancelled while waiting for the next round, and
// a non-nil error as soon as the chain configuration cannot be resolved, the
// upload period is not positive, a snapshot check fails, or rclone fails.
func (c *BucketCaplinAutomation) Run(ctx *Context) error {
	// time.NewTicker panics on a non-positive interval; reject it up front
	// with a regular error instead.
	if c.UploadPeriod <= 0 {
		return fmt.Errorf("upload period must be positive, got %s", c.UploadPeriod)
	}
	_, _, beaconConfig, _, err := clparams.GetConfigsByNetworkName(c.Chain)
	if err != nil {
		return err
	}
	log.Root().SetHandler(log.LvlFilterHandler(log.LvlInfo, log.StderrHandler))
	log.Info("Started the automation tool for automatic snapshot sanity check and R2 uploading (caplin only)", "chain", c.Chain)
	dirs := datadir.New(c.Datadir)
	tickerTriggerer := time.NewTicker(c.UploadPeriod)
	defer tickerTriggerer.Stop()

	// Do one check-and-upload round right away instead of waiting a full period.
	if err := checkSnapshots(ctx, beaconConfig, dirs); err != nil {
		return err
	}
	if err := c.uploadSnapshots(ctx, dirs.Snap); err != nil {
		return err
	}
	for {
		select {
		case <-tickerTriggerer.C:
			log.Info("Checking snapshots")
			if err := checkSnapshots(ctx, beaconConfig, dirs); err != nil {
				return err
			}
			log.Info("Finishing snapshots")
			if err := c.uploadSnapshots(ctx, dirs.Snap); err != nil {
				return err
			}
		case <-ctx.Done():
			return nil
		}
	}
}

// uploadSnapshots syncs the beacon-block snapshot files found in snapDir to
// c.Bucket by shelling out to rclone. The process is bound to ctx so that it
// is killed if ctx is cancelled; the underlying error is wrapped so callers
// can inspect it with errors.Is / errors.As.
func (c *BucketCaplinAutomation) uploadSnapshots(ctx context.Context, snapDir string) error {
	log.Info("Uploading snapshots to R2 bucket")
	cmd := exec.CommandContext(ctx, "rclone", "sync", snapDir, c.Bucket, "--include", "*beaconblocks*")
	if err := cmd.Run(); err != nil {
		return fmt.Errorf("rclone failed, make sure rclone is installed and is properly configured: %w", err)
	}
	log.Info("Finished snapshots to R2 bucket")
	return nil
}

// checkSnapshots verifies that the beacon-block headers stored in the caplin
// snapshots under dirs.Snap form a contiguous chain.
//
// Starting from the header at slot 0 it walks every slot below the highest
// finalized slot (rounded down to a multiple of
// snaptype.Erigon2RecentMergeLimit). Slots without a header are skipped; for
// every header found, its ParentRoot must equal the hash of the previous
// header that was found. A run of more than maxSlotGap slots without any
// header is reported as an error.
//
// It returns the first database, snapshot, hashing or consistency error
// encountered, ctx.Err() if ctx is cancelled while scanning, and nil when
// the whole range checks out.
func checkSnapshots(ctx context.Context, beaconConfig *clparams.BeaconChainConfig, dirs datadir.Dirs) error {
	// maxSlotGap is the longest tolerated run of slots with no header.
	const maxSlotGap = 320

	rawDB := persistence.AferoRawBeaconBlockChainFromOsPath(beaconConfig, dirs.CaplinHistory)
	_, db, err := caplin1.OpenCaplinDatabase(ctx, db_config.DatabaseConfiguration{PruneDepth: math.MaxUint64}, beaconConfig, rawDB, dirs.CaplinIndexing, nil, false)
	if err != nil {
		return err
	}
	defer db.Close()

	tx, err := db.BeginRo(ctx)
	if err != nil {
		return err
	}
	defer tx.Rollback()

	to, err := beacon_indicies.ReadHighestFinalized(tx)
	if err != nil {
		return err
	}
	// Round down to a multiple of the merge limit.
	to = (to / snaptype.Erigon2RecentMergeLimit) * snaptype.Erigon2RecentMergeLimit

	csn := freezeblocks.NewCaplinSnapshots(ethconfig.BlocksFreezing{}, dirs.Snap, log.Root())
	if err := csn.ReopenFolder(); err != nil {
		return err
	}

	genesisHeader, _, _, err := csn.ReadHeader(0)
	if err != nil {
		return err
	}
	// ReadHeader may yield a nil header without an error (the loop below
	// relies on that for empty slots); without slot 0 there is nothing to
	// anchor the chain on, so fail instead of dereferencing nil.
	if genesisHeader == nil {
		return fmt.Errorf("snapshot 0 (genesis header) not found")
	}
	previousBlockRoot, err := genesisHeader.Header.HashSSZ()
	if err != nil {
		return err
	}
	previousBlockSlot := genesisHeader.Header.Slot
	for i := uint64(1); i < to; i++ {
		// Saturating i-maxSlotGap: subtracting min(i, maxSlotGap) cannot
		// underflow, so the value is 0 while i <= maxSlotGap. The previous
		// form, Min64(0, i-320), was always 0 and never triggered.
		if i-utils.Min64(i, maxSlotGap) > previousBlockSlot {
			return fmt.Errorf("snapshot %d has invalid slot", i)
		}
		// Checking of snapshots is a chain contiguity problem
		currentHeader, _, _, err := csn.ReadHeader(i)
		if err != nil {
			return err
		}
		if currentHeader == nil {
			// No header stored for this slot.
			continue
		}
		if currentHeader.Header.ParentRoot != previousBlockRoot {
			return fmt.Errorf("snapshot %d has invalid parent root", i)
		}
		previousBlockRoot, err = currentHeader.Header.HashSSZ()
		if err != nil {
			return err
		}
		previousBlockSlot = currentHeader.Header.Slot
		if i%20000 == 0 {
			// Piggy-back the cancellation check on the progress interval so
			// the hot loop stays cheap.
			if err := ctx.Err(); err != nil {
				return err
			}
			log.Info("Successfully checked", "slot", i)
		}
	}
	return nil
}
35 changes: 35 additions & 0 deletions cmd/tooling/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
/*
Copyright 2022 Erigon-Lightclient contributors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package main

import (
"context"

"github.com/alecthomas/kong"
)

// Context is the value handed to every command's Run method. It embeds a
// context.Context, so it can be passed wherever one is expected, and also
// carries the parsed kong context of the current invocation.
type Context struct {
	context.Context
	// kctx is the kong parse result that main stores when dispatching.
	kctx *kong.Context
}

// main parses the command line into CLI and dispatches to the Run method of
// the selected command, exiting with a fatal message if that returns an error.
func main() {
	parsed := kong.Parse(&CLI)
	runCtx := &Context{
		Context: context.TODO(),
		kctx:    parsed,
	}
	// kong locates the chosen command's Run() and injects runCtx into it.
	parsed.FatalIfErrorf(parsed.Run(runCtx))
}
Loading
Loading