diff --git a/cl/cltypes/beacon_block.go b/cl/cltypes/beacon_block.go index d09f404f9e5..8125342241c 100644 --- a/cl/cltypes/beacon_block.go +++ b/cl/cltypes/beacon_block.go @@ -94,7 +94,9 @@ func NewBeaconBlock(beaconCfg *clparams.BeaconChainConfig) *BeaconBlock { } func NewBeaconBody(beaconCfg *clparams.BeaconChainConfig) *BeaconBody { - return &BeaconBody{beaconCfg: beaconCfg} + return &BeaconBody{ + beaconCfg: beaconCfg, + } } // Version returns beacon block version. @@ -172,6 +174,8 @@ func (b *BeaconBody) DecodeSSZ(buf []byte, version int) error { return fmt.Errorf("[BeaconBody] err: %s", ssz.ErrLowBufferSize) } + b.ExecutionPayload = NewEth1Block(b.Version, b.beaconCfg) + err := ssz2.UnmarshalSSZ(buf, version, b.getSchema(false)...) return err } diff --git a/cl/persistence/format/chunk_encoding/chunks.go b/cl/persistence/format/chunk_encoding/chunks.go index 05ba58e3c54..eeefb3962ee 100644 --- a/cl/persistence/format/chunk_encoding/chunks.go +++ b/cl/persistence/format/chunk_encoding/chunks.go @@ -35,6 +35,11 @@ func ReadChunk(r io.Reader) (buf []byte, t DataType, err error) { } t = DataType(prefix[0]) prefix[0] = 0 + + bufLen := binary.BigEndian.Uint64(prefix) + if bufLen == 0 { + return + } buf = make([]byte, binary.BigEndian.Uint64(prefix)) if _, err = r.Read(buf); err != nil { return diff --git a/cl/persistence/format/snapshot_format/blocks.go b/cl/persistence/format/snapshot_format/blocks.go index e1f7abac312..8029ef78407 100644 --- a/cl/persistence/format/snapshot_format/blocks.go +++ b/cl/persistence/format/snapshot_format/blocks.go @@ -86,25 +86,10 @@ func WriteBlockForSnapshot(block *cltypes.SignedBeaconBlock, w io.Writer) error // count in body for phase0 fields currentChunkLength += uint64(body.ProposerSlashings.EncodingSizeSSZ()) currentChunkLength += uint64(body.AttesterSlashings.EncodingSizeSSZ()) - - // Write the chunk and chunk attestations - if err := chunk_encoding.WriteChunk(w, encoded[:currentChunkLength], chunk_encoding.ChunkDataType); err != nil { - return err - } - encoded = encoded[currentChunkLength:] - snappyWriter := snappy.NewBufferedWriter(w) - if err := chunk_encoding.WriteChunk(snappyWriter, encoded[:uint64(body.Attestations.EncodingSizeSSZ())], chunk_encoding.ChunkDataType); err != nil { - return err - } - if err := snappyWriter.Close(); err != nil { - return err - } - encoded = encoded[body.Attestations.EncodingSizeSSZ():] - currentChunkLength = 0 - + currentChunkLength += uint64(body.Attestations.EncodingSizeSSZ()) currentChunkLength += uint64(body.Deposits.EncodingSizeSSZ()) currentChunkLength += uint64(body.VoluntaryExits.EncodingSizeSSZ()) - + // Write the chunk and chunk attestations if err := chunk_encoding.WriteChunk(w, encoded[:currentChunkLength], chunk_encoding.ChunkDataType); err != nil { return err } @@ -140,6 +125,60 @@ func ReadBlockFromSnapshot(r io.Reader, executionReader ExecutionBlockReaderByNu return nil, err } + // Read the first chunk + chunk1, dT1, err := chunk_encoding.ReadChunk(r) + if err != nil { + return nil, err + } + if dT1 != chunk_encoding.ChunkDataType { + return nil, fmt.Errorf("malformed beacon block, invalid chunk 1 type %d, expected: %d", dT1, chunk_encoding.ChunkDataType) + } + plainSSZ = append(plainSSZ, chunk1...) + + if v <= clparams.AltairVersion { + return block, block.DecodeSSZ(plainSSZ, int(v)) + } + // Read the block pointer and retrieve chunk4 from the execution reader + blockPointer, err := readExecutionBlockPtr(r) + if err != nil { + return nil, err + } + executionBlock, err := executionReader.BlockByNumber(blockPointer) + if err != nil { + return nil, err + } + // Read the 4th chunk + chunk2, err := executionBlock.EncodeSSZ(nil) + if err != nil { + return nil, err + } + plainSSZ = append(plainSSZ, chunk2...) + if v <= clparams.BellatrixVersion { + return block, block.DecodeSSZ(plainSSZ, int(v)) + } + + // Read the 5h chunk + chunk3, dT5, err := chunk_encoding.ReadChunk(r) + if err != nil { + return nil, err + } + if dT5 != chunk_encoding.ChunkDataType { + return nil, fmt.Errorf("malformed beacon block, invalid chunk 5 type %d, expected: %d", dT5, chunk_encoding.ChunkDataType) + } + plainSSZ = append(plainSSZ, chunk3...) + + return block, block.DecodeSSZ(plainSSZ, int(v)) +} + +func ReadRawBlockFromSnapshot(r io.Reader, executionReader ExecutionBlockReaderByNumber, cfg *clparams.BeaconChainConfig) ([]byte, error) { + plainSSZ := []byte{} + + // Metadata section is just the current hardfork of the block. TODO(give it a useful purpose) + v, err := readMetadataForBlock(r) + if err != nil { + return nil, err + } + // Read the first chunk chunk1, dT1, err := chunk_encoding.ReadChunk(r) if err != nil { @@ -168,7 +207,7 @@ func ReadBlockFromSnapshot(r io.Reader, executionReader ExecutionBlockReaderByNu } plainSSZ = append(plainSSZ, chunk3...) if v <= clparams.AltairVersion { - return block, block.DecodeSSZ(plainSSZ, int(v)) + return plainSSZ, nil } // Read the block pointer and retrieve chunk4 from the execution reader blockPointer, err := readExecutionBlockPtr(r) @@ -186,7 +225,7 @@ func ReadBlockFromSnapshot(r io.Reader, executionReader ExecutionBlockReaderByNu } plainSSZ = append(plainSSZ, chunk4...) if v <= clparams.BellatrixVersion { - return block, block.DecodeSSZ(plainSSZ, int(v)) + return plainSSZ, nil } // Read the 5h chunk @@ -199,5 +238,5 @@ func ReadBlockFromSnapshot(r io.Reader, executionReader ExecutionBlockReaderByNu } plainSSZ = append(plainSSZ, chunk5...) - return block, block.DecodeSSZ(plainSSZ, int(v)) + return plainSSZ, nil } diff --git a/cl/persistence/format/snapshot_format/blocks_test.go b/cl/persistence/format/snapshot_format/blocks_test.go index 5efc2963fe6..8021c3fcc38 100644 --- a/cl/persistence/format/snapshot_format/blocks_test.go +++ b/cl/persistence/format/snapshot_format/blocks_test.go @@ -27,8 +27,15 @@ var capellaBlockSSZSnappy []byte //go:embed test_data/deneb.ssz_snappy var denebBlockSSZSnappy []byte +var emptyBlock = cltypes.NewSignedBeaconBlock(&clparams.MainnetBeaconConfig) + // obtain the test blocks func getTestBlocks(t *testing.T) []*cltypes.SignedBeaconBlock { + var emptyBlockCapella = cltypes.NewSignedBeaconBlock(&clparams.MainnetBeaconConfig) + emptyBlockCapella.Block.Slot = clparams.MainnetBeaconConfig.CapellaForkEpoch * 32 + + emptyBlock.EncodingSizeSSZ() + emptyBlockCapella.EncodingSizeSSZ() denebBlock := cltypes.NewSignedBeaconBlock(&clparams.MainnetBeaconConfig) capellaBlock := cltypes.NewSignedBeaconBlock(&clparams.MainnetBeaconConfig) bellatrixBlock := cltypes.NewSignedBeaconBlock(&clparams.MainnetBeaconConfig) @@ -40,22 +47,14 @@ func getTestBlocks(t *testing.T) []*cltypes.SignedBeaconBlock { require.NoError(t, utils.DecodeSSZSnappy(bellatrixBlock, bellatrixBlockSSZSnappy, int(clparams.BellatrixVersion))) require.NoError(t, utils.DecodeSSZSnappy(altairBlock, altairBlockSSZSnappy, int(clparams.AltairVersion))) require.NoError(t, utils.DecodeSSZSnappy(phase0Block, phase0BlockSSZSnappy, int(clparams.Phase0Version))) - return []*cltypes.SignedBeaconBlock{phase0Block, altairBlock, bellatrixBlock, capellaBlock, denebBlock} -} - -type TestBlockReader struct { - Block *cltypes.Eth1Block -} - -func (t *TestBlockReader) BlockByNumber(number uint64) (*cltypes.Eth1Block, error) { - return t.Block, nil + return []*cltypes.SignedBeaconBlock{phase0Block, altairBlock, bellatrixBlock, capellaBlock, denebBlock, emptyBlock, emptyBlockCapella} } func TestBlockSnapshotEncoding(t *testing.T) { for _, blk := range getTestBlocks(t) { - var br TestBlockReader + var br snapshot_format.MockBlockReader if blk.Version() >= clparams.BellatrixVersion { - br = TestBlockReader{Block: blk.Block.Body.ExecutionPayload} + br = snapshot_format.MockBlockReader{Block: blk.Block.Body.ExecutionPayload} } var b bytes.Buffer require.NoError(t, snapshot_format.WriteBlockForSnapshot(blk, &b)) diff --git a/cl/persistence/format/snapshot_format/snapshots.go b/cl/persistence/format/snapshot_format/snapshots.go deleted file mode 100644 index 8cecbc8f1e0..00000000000 --- a/cl/persistence/format/snapshot_format/snapshots.go +++ /dev/null @@ -1,87 +0,0 @@ -package snapshot_format - -import ( - "bytes" - "context" - "fmt" - - "github.com/ledgerwatch/erigon-lib/common/background" - "github.com/ledgerwatch/erigon-lib/common/cmp" - "github.com/ledgerwatch/erigon-lib/compress" - "github.com/ledgerwatch/erigon-lib/downloader/snaptype" - "github.com/ledgerwatch/erigon-lib/kv" - "github.com/ledgerwatch/erigon/cl/persistence" - "github.com/ledgerwatch/erigon/turbo/snapshotsync/freezeblocks" - "github.com/ledgerwatch/log/v3" -) - -func dumpBeaconBlocksRange(ctx context.Context, db kv.RoDB, b persistence.BlockSource, fromSlot uint64, toSlot uint64, tmpDir, snapDir string, workers int, lvl log.Lvl, logger log.Logger) error { - segName := snaptype.SegmentFileName(fromSlot, toSlot, snaptype.BeaconBlocks) - f, _ := snaptype.ParseFileName(snapDir, segName) - - sn, err := compress.NewCompressor(ctx, "Snapshot BeaconBlocks", f.Path, tmpDir, compress.MinPatternScore, workers, lvl, logger) - if err != nil { - return err - } - defer sn.Close() - - tx, err := db.BeginRo(ctx) - if err != nil { - return err - } - defer tx.Rollback() - // Generate .seg file, which is just the list of beacon blocks. - var buf bytes.Buffer - for i := fromSlot; i <= toSlot; i++ { - obj, err := b.GetBlock(ctx, tx, i) - if err != nil { - return err - } - if obj == nil { - if err := sn.AddWord(nil); err != nil { - return err - } - continue - } - if err := WriteBlockForSnapshot(obj.Data, &buf); err != nil { - return err - } - if err := sn.AddWord(buf.Bytes()); err != nil { - return err - } - buf.Reset() - } - if err := sn.Compress(); err != nil { - return fmt.Errorf("compress: %w", err) - } - // Generate .idx file, which is the slot => offset mapping. - p := &background.Progress{} - - return freezeblocks.BeaconBlocksIdx(ctx, segName, fromSlot, toSlot, snapDir, tmpDir, p, lvl, logger) -} - -func chooseSegmentEnd(from, to, blocksPerFile uint64) uint64 { - next := (from/blocksPerFile + 1) * blocksPerFile - to = cmp.Min(next, to) - - if to < snaptype.Erigon2MinSegmentSize { - return to - } - - return to - (to % snaptype.Erigon2MinSegmentSize) // round down to the nearest 1k -} - -func DumpBeaconBlocks(ctx context.Context, db kv.RoDB, b persistence.BlockSource, fromSlot, toSlot, blocksPerFile uint64, tmpDir, snapDir string, workers int, lvl log.Lvl, logger log.Logger) error { - if blocksPerFile == 0 { - return nil - } - - for i := fromSlot; i < toSlot; i = chooseSegmentEnd(i, toSlot, blocksPerFile) { - to := chooseSegmentEnd(i, toSlot, blocksPerFile) - logger.Log(lvl, "Dumping beacon blocks", "from", i, "to", to) - if err := dumpBeaconBlocksRange(ctx, db, b, i, to, tmpDir, snapDir, workers, lvl, logger); err != nil { - return err - } - } - return nil -} diff --git a/cl/persistence/format/snapshot_format/test_util.go b/cl/persistence/format/snapshot_format/test_util.go new file mode 100644 index 00000000000..3993c1648b7 --- /dev/null +++ b/cl/persistence/format/snapshot_format/test_util.go @@ -0,0 +1,11 @@ +package snapshot_format + +import "github.com/ledgerwatch/erigon/cl/cltypes" + +type MockBlockReader struct { + Block *cltypes.Eth1Block +} + +func (t *MockBlockReader) BlockByNumber(number uint64) (*cltypes.Eth1Block, error) { + return t.Block, nil +} diff --git a/cl/spectest/consensus_tests/ssz_static.go b/cl/spectest/consensus_tests/ssz_static.go index f5621181344..b38e71df336 100644 --- a/cl/spectest/consensus_tests/ssz_static.go +++ b/cl/spectest/consensus_tests/ssz_static.go @@ -1,10 +1,15 @@ package consensus_tests import ( - "github.com/ledgerwatch/erigon/spectest" + "bytes" "io/fs" "testing" + "github.com/ledgerwatch/erigon/spectest" + + "github.com/ledgerwatch/erigon/cl/clparams" + "github.com/ledgerwatch/erigon/cl/cltypes" + "github.com/ledgerwatch/erigon/cl/persistence/format/snapshot_format" "github.com/ledgerwatch/erigon/cl/phase1/core/state" libcommon "github.com/ledgerwatch/erigon-lib/common" @@ -59,6 +64,24 @@ func getSSZStaticConsensusTest[T unmarshalerMarshalerHashable](ref T) spectest.H haveEncoded, err := object.EncodeSSZ(nil) require.NoError(t, err) require.EqualValues(t, haveEncoded, encoded) + // Now let it do the encoding in snapshot format + if blk, ok := object.(*cltypes.SignedBeaconBlock); ok { + var b bytes.Buffer + require.NoError(t, snapshot_format.WriteBlockForSnapshot(blk, &b)) + var br snapshot_format.MockBlockReader + if blk.Version() >= clparams.BellatrixVersion { + br = snapshot_format.MockBlockReader{Block: blk.Block.Body.ExecutionPayload} + + } + + blk2, err := snapshot_format.ReadBlockFromSnapshot(&b, &br, &clparams.MainnetBeaconConfig) + require.NoError(t, err) + + haveRoot, err := blk2.HashSSZ() + require.NoError(t, err) + require.EqualValues(t, expectedRoot, haveRoot) + } + return nil }) } diff --git a/cmd/capcli/cli.go b/cmd/capcli/cli.go index e9c70f0629e..e593e4f4de9 100644 --- a/cmd/capcli/cli.go +++ b/cmd/capcli/cli.go @@ -7,11 +7,15 @@ import ( "strings" "time" + libcommon "github.com/ledgerwatch/erigon-lib/common" + "github.com/ledgerwatch/erigon/cl/abstract" "github.com/ledgerwatch/erigon/cl/clparams" "github.com/ledgerwatch/erigon/cl/cltypes" persistence2 "github.com/ledgerwatch/erigon/cl/persistence" "github.com/ledgerwatch/erigon/cmd/caplin/caplin1" + "github.com/ledgerwatch/erigon/eth/ethconfig" + "github.com/ledgerwatch/erigon/turbo/snapshotsync/freezeblocks" "github.com/ledgerwatch/erigon-lib/common/datadir" "github.com/ledgerwatch/erigon-lib/downloader/snaptype" @@ -45,8 +49,9 @@ var CLI struct { Blocks Blocks `cmd:"" help:"download blocks from reqresp network"` Epochs Epochs `cmd:"" help:"download epochs from reqresp network"` - Chain Chain `cmd:"" help:"download the entire chain from reqresp network"` - DumpSnapshots DumpSnapshots `cmd:"" help:"generate caplin snapshots"` + Chain Chain `cmd:"" help:"download the entire chain from reqresp network"` + DumpSnapshots DumpSnapshots `cmd:"" help:"generate caplin snapshots"` + CheckSnapshots CheckSnapshots `cmd:"" help:"check snapshot folder against content of chain data"` } type chainCfg struct { @@ -425,5 +430,87 @@ func (c *DumpSnapshots) Run(ctx *Context) error { return }) - return snapshot_format.DumpBeaconBlocks(ctx, db, beaconDB, 0, to, snaptype.Erigon2SegmentSize, dirs.Tmp, dirs.Snap, 8, log.LvlInfo, log.Root()) + return freezeblocks.DumpBeaconBlocks(ctx, db, beaconDB, 0, to, snaptype.Erigon2SegmentSize, dirs.Tmp, dirs.Snap, 8, log.LvlInfo, log.Root()) +} + +type CheckSnapshots struct { + chainCfg + outputFolder + + Slot uint64 `name:"slot" help:"slot to check"` +} + +func (c *CheckSnapshots) Run(ctx *Context) error { + _, _, beaconConfig, _, err := clparams.GetConfigsByNetworkName(c.Chain) + if err != nil { + return err + } + log.Root().SetHandler(log.LvlFilterHandler(log.LvlDebug, log.StderrHandler)) + log.Info("Started the checking process", "chain", c.Chain) + + dirs := datadir.New(c.Datadir) + log.Root().SetHandler(log.LvlFilterHandler(log.LvlInfo, log.StderrHandler)) + + rawDB := persistence.AferoRawBeaconBlockChainFromOsPath(beaconConfig, dirs.CaplinHistory) + beaconDB, db, err := caplin1.OpenCaplinDatabase(ctx, db_config.DatabaseConfiguration{PruneDepth: math.MaxUint64}, beaconConfig, rawDB, dirs.CaplinIndexing, nil, false) + if err != nil { + return err + } + var to uint64 + tx, err := db.BeginRo(ctx) + if err != nil { + return err + } + defer tx.Rollback() + + to, err = beacon_indicies.ReadHighestFinalized(tx) + if err != nil { + return err + } + + to = (to / snaptype.Erigon2SegmentSize) * snaptype.Erigon2SegmentSize + + csn := freezeblocks.NewCaplinSnapshots(ethconfig.BlocksFreezing{}, dirs.Snap, log.Root()) + if err := csn.ReopenFolder(); err != nil { + return err + } + + br := &snapshot_format.MockBlockReader{} + snReader := freezeblocks.NewBeaconSnapshotReader(csn, br, beaconConfig) + for i := c.Slot; i < to; i++ { + // Read the original canonical slot + data, err := beaconDB.GetBlock(ctx, tx, i) + if err != nil { + return err + } + if data == nil { + continue + } + blk := data.Data + if blk == nil { + continue + } + // first thing if the block is bellatrix update the mock block reader + if blk.Version() >= clparams.BellatrixVersion { + br.Block = blk.Block.Body.ExecutionPayload + } + blk2, err := snReader.ReadBlock(i) + if err != nil { + log.Error("Error detected in decoding snapshots", "err", err, "slot", i) + return nil + } + if blk2 == nil { + log.Error("Block not found in snapshot", "slot", i) + return nil + } + + hash1, _ := blk.Block.HashSSZ() + hash2, _ := blk2.Block.HashSSZ() + if hash1 != hash2 { + log.Error("Mismatching blocks", "slot", i, "gotSlot", blk2.Block.Slot, "datadir", libcommon.Hash(hash1), "snapshot", libcommon.Hash(hash2)) + return nil + } + log.Info("Successfully checked", "slot", i) + } + return nil } diff --git a/turbo/snapshotsync/freezeblocks/beacon_block_reader.go b/turbo/snapshotsync/freezeblocks/beacon_block_reader.go new file mode 100644 index 00000000000..78ef3fa9cc4 --- /dev/null +++ b/turbo/snapshotsync/freezeblocks/beacon_block_reader.go @@ -0,0 +1,69 @@ +package freezeblocks + +import ( + "bytes" + + "github.com/ledgerwatch/erigon/cl/clparams" + "github.com/ledgerwatch/erigon/cl/cltypes" + "github.com/ledgerwatch/erigon/cl/persistence/format/snapshot_format" +) + +type BeaconSnapshotReader interface { + // ReadBlock reads the block at the given slot. + // If the block is not present, it returns nil. + ReadBlock(slot uint64) (*cltypes.SignedBeaconBlock, error) + RawBlockSSZ(slot uint64) ([]byte, error) + + FrozenSlots() uint64 +} + +type beaconSnapshotReader struct { + sn *CaplinSnapshots + + eth1Getter snapshot_format.ExecutionBlockReaderByNumber + cfg *clparams.BeaconChainConfig +} + +func NewBeaconSnapshotReader(snapshots *CaplinSnapshots, eth1Getter snapshot_format.ExecutionBlockReaderByNumber, cfg *clparams.BeaconChainConfig) BeaconSnapshotReader { + return &beaconSnapshotReader{sn: snapshots, eth1Getter: eth1Getter, cfg: cfg} +} + +func (r *beaconSnapshotReader) FrozenSlots() uint64 { + return r.sn.BlocksAvailable() +} + +func (r *beaconSnapshotReader) ReadBlock(slot uint64) (*cltypes.SignedBeaconBlock, error) { + buf, err := r.RawBlockSSZ(slot) + if err != nil { + return nil, err + } + if buf == nil { + return nil, nil + } + return snapshot_format.ReadBlockFromSnapshot(bytes.NewReader(buf), r.eth1Getter, r.cfg) +} + +func (r *beaconSnapshotReader) RawBlockSSZ(slot uint64) ([]byte, error) { + view := r.sn.View() + defer view.Close() + + var buf []byte + + seg, ok := view.BeaconBlocksSegment(slot) + if !ok { + return nil, nil + } + + if seg.idxSlot == nil { + return nil, nil + } + blockOffset := seg.idxSlot.OrdinalLookup(slot - seg.idxSlot.BaseDataID()) + + gg := seg.seg.MakeGetter() + gg.Reset(blockOffset) + if !gg.HasNext() { + return nil, nil + } + buf, _ = gg.Next(buf) + return buf, nil +} diff --git a/turbo/snapshotsync/freezeblocks/block_snapshots.go b/turbo/snapshotsync/freezeblocks/block_snapshots.go index cb1bf9f5af7..97e791128bb 100644 --- a/turbo/snapshotsync/freezeblocks/block_snapshots.go +++ b/turbo/snapshotsync/freezeblocks/block_snapshots.go @@ -1118,6 +1118,28 @@ func noOverlaps(in []snaptype.FileInfo) (res []snaptype.FileInfo) { return res } +func SegmentsCaplin(dir string) (res []snaptype.FileInfo, missingSnapshots []Range, err error) { + list, err := snaptype.Segments(dir) + if err != nil { + return nil, missingSnapshots, err + } + + { + var l []snaptype.FileInfo + var m []Range + for _, f := range list { + if f.T != snaptype.BeaconBlocks { + continue + } + l = append(l, f) + } + l, m = noGaps(noOverlaps(l)) + res = append(res, l...) + missingSnapshots = append(missingSnapshots, m...) + } + return res, missingSnapshots, nil +} + func Segments(dir string) (res []snaptype.FileInfo, missingSnapshots []Range, err error) { list, err := snaptype.Segments(dir) if err != nil { @@ -2014,71 +2036,6 @@ RETRY: return nil } -func BeaconBlocksIdx(ctx context.Context, segmentFilePath string, blockFrom, blockTo uint64, snapDir string, tmpDir string, p *background.Progress, lvl log.Lvl, logger log.Logger) (err error) { - defer func() { - if rec := recover(); rec != nil { - err = fmt.Errorf("BeaconBlocksIdx: at=%d-%d, %v, %s", blockFrom, blockTo, rec, dbg.Stack()) - } - }() - // Calculate how many records there will be in the index - d, err := compress.NewDecompressor(segmentFilePath) - if err != nil { - return err - } - defer d.Close() - g := d.MakeGetter() - var idxFilePath = filepath.Join(snapDir, snaptype.IdxFileName(blockFrom, blockTo, snaptype.BeaconBlocks.String())) - - var baseSpanId uint64 - if blockFrom > zerothSpanEnd { - baseSpanId = 1 + (blockFrom-zerothSpanEnd-1)/spanLength - } - - rs, err := recsplit.NewRecSplit(recsplit.RecSplitArgs{ - KeyCount: d.Count(), - Enums: d.Count() > 0, - BucketSize: 2000, - LeafSize: 8, - TmpDir: tmpDir, - IndexFile: idxFilePath, - BaseDataID: baseSpanId, - }, logger) - if err != nil { - return err - } - rs.LogLvl(log.LvlDebug) - - defer d.EnableMadvNormal().DisableReadAhead() -RETRY: - g.Reset(0) - var i, offset, nextPos uint64 - var key [8]byte - for g.HasNext() { - nextPos, _ = g.Skip() - binary.BigEndian.PutUint64(key[:], i) - i++ - if err = rs.AddKey(key[:], offset); err != nil { - return err - } - select { - case <-ctx.Done(): - return ctx.Err() - default: - } - offset = nextPos - } - if err = rs.Build(ctx); err != nil { - if errors.Is(err, recsplit.ErrCollision) { - logger.Info("Building recsplit. Collision happened. It's ok. Restarting with another salt...", "err", err) - rs.ResetNextSalt() - goto RETRY - } - return err - } - - return nil -} - // HeadersIdx - headerHash -> offset (analog of kv.HeaderNumber) func HeadersIdx(ctx context.Context, chainConfig *chain.Config, segmentFilePath string, firstBlockNumInSegment uint64, tmpDir string, p *background.Progress, lvl log.Lvl, logger log.Logger) (err error) { defer func() { diff --git a/turbo/snapshotsync/freezeblocks/caplin_snapshots.go b/turbo/snapshotsync/freezeblocks/caplin_snapshots.go new file mode 100644 index 00000000000..221cbe79301 --- /dev/null +++ b/turbo/snapshotsync/freezeblocks/caplin_snapshots.go @@ -0,0 +1,398 @@ +package freezeblocks + +import ( + "bytes" + "context" + "encoding/binary" + "errors" + "fmt" + "os" + "path" + "path/filepath" + "sync" + "sync/atomic" + + "github.com/ledgerwatch/erigon-lib/common/background" + "github.com/ledgerwatch/erigon-lib/common/cmp" + "github.com/ledgerwatch/erigon-lib/common/dbg" + "github.com/ledgerwatch/erigon-lib/compress" + "github.com/ledgerwatch/erigon-lib/downloader/snaptype" + "github.com/ledgerwatch/erigon-lib/kv" + "github.com/ledgerwatch/erigon-lib/recsplit" + "github.com/ledgerwatch/erigon/cl/persistence" + "github.com/ledgerwatch/erigon/cl/persistence/format/snapshot_format" + "github.com/ledgerwatch/erigon/eth/ethconfig" + "github.com/ledgerwatch/log/v3" +) + +type BeaconBlockSegment struct { + seg *compress.Decompressor // value: chunked(ssz(SignedBeaconBlocks)) + idxSlot *recsplit.Index // slot -> beacon_slot_segment_offset + ranges Range +} + +func (sn *BeaconBlockSegment) closeIdx() { + if sn.idxSlot != nil { + sn.idxSlot.Close() + sn.idxSlot = nil + } +} +func (sn *BeaconBlockSegment) closeSeg() { + if sn.seg != nil { + sn.seg.Close() + sn.seg = nil + } +} +func (sn *BeaconBlockSegment) close() { + sn.closeSeg() + sn.closeIdx() +} +func (sn *BeaconBlockSegment) reopenSeg(dir string) (err error) { + sn.closeSeg() + fileName := snaptype.SegmentFileName(sn.ranges.from, sn.ranges.to, snaptype.BeaconBlocks) + sn.seg, err = compress.NewDecompressor(path.Join(dir, fileName)) + if err != nil { + return fmt.Errorf("%w, fileName: %s", err, fileName) + } + return nil +} +func (sn *BeaconBlockSegment) reopenIdxIfNeed(dir string, optimistic bool) (err error) { + if sn.idxSlot != nil { + return nil + } + err = sn.reopenIdx(dir) + if err != nil { + if !errors.Is(err, os.ErrNotExist) { + if optimistic { + log.Warn("[snapshots] open index", "err", err) + } else { + return err + } + } + } + return nil +} + +func (sn *BeaconBlockSegment) reopenIdx(dir string) (err error) { + sn.closeIdx() + if sn.seg == nil { + return nil + } + fileName := snaptype.IdxFileName(sn.ranges.from, sn.ranges.to, snaptype.BeaconBlocks.String()) + sn.idxSlot, err = recsplit.OpenIndex(path.Join(dir, fileName)) + if err != nil { + return fmt.Errorf("%w, fileName: %s", err, fileName) + } + if sn.idxSlot.ModTime().Before(sn.seg.ModTime()) { + // Index has been created before the segment file, needs to be ignored (and rebuilt) as inconsistent + sn.idxSlot.Close() + sn.idxSlot = nil + } + return nil +} + +type beaconBlockSegments struct { + lock sync.RWMutex + segments []*BeaconBlockSegment +} + +func (s *beaconBlockSegments) View(f func(segments []*BeaconBlockSegment) error) error { + s.lock.RLock() + defer s.lock.RUnlock() + return f(s.segments) +} + +func BeaconBlocksIdx(ctx context.Context, sn snaptype.FileInfo, segmentFilePath string, blockFrom, blockTo uint64, snapDir string, tmpDir string, p *background.Progress, lvl log.Lvl, logger log.Logger) (err error) { + defer func() { + if rec := recover(); rec != nil { + err = fmt.Errorf("BeaconBlocksIdx: at=%d-%d, %v, %s", blockFrom, blockTo, rec, dbg.Stack()) + } + }() + + // Calculate how many records there will be in the index + d, err := compress.NewDecompressor(path.Join(snapDir, segmentFilePath)) + if err != nil { + return err + } + defer d.Close() + + _, fname := filepath.Split(segmentFilePath) + p.Name.Store(&fname) + p.Total.Store(uint64(d.Count())) + + if err := Idx(ctx, d, sn.From, tmpDir, log.LvlDebug, func(idx *recsplit.RecSplit, i, offset uint64, word []byte) error { + if i%100_000 == 0 { + logger.Log(lvl, "Compressing beacon blocks", "progress", i) + } + p.Processed.Add(1) + num := make([]byte, 8) + n := binary.PutUvarint(num, i) + if err := idx.AddKey(num[:n], offset); err != nil { + return err + } + return nil + }, logger); err != nil { + return fmt.Errorf("BodyNumberIdx: %w", err) + } + + return nil +} + +type CaplinSnapshots struct { + indicesReady atomic.Bool + segmentsReady atomic.Bool + + BeaconBlocks *beaconBlockSegments + + dir string + segmentsMax atomic.Uint64 // all types of .seg files are available - up to this number + idxMax atomic.Uint64 // all types of .idx files are available - up to this number + cfg ethconfig.BlocksFreezing + logger log.Logger +} + +// NewCaplinSnapshots - opens all snapshots. But to simplify everything: +// - it opens snapshots only on App start and immutable after +// - all snapshots of given blocks range must exist - to make this blocks range available +// - gaps are not allowed +// - segment have [from:to) semantic +func NewCaplinSnapshots(cfg ethconfig.BlocksFreezing, snapDir string, logger log.Logger) *CaplinSnapshots { + return &CaplinSnapshots{dir: snapDir, cfg: cfg, BeaconBlocks: &beaconBlockSegments{}, logger: logger} +} + +func (s *CaplinSnapshots) IndicesMax() uint64 { return s.idxMax.Load() } +func (s *CaplinSnapshots) SegmentsMax() uint64 { return s.segmentsMax.Load() } +func (s *CaplinSnapshots) BlocksAvailable() uint64 { + return cmp.Min(s.segmentsMax.Load(), s.idxMax.Load()) +} + +// ReopenList stops on optimistic=false, continue opening files on optimistic=true +func (s *CaplinSnapshots) ReopenList(fileNames []string, optimistic bool) error { + s.BeaconBlocks.lock.Lock() + defer s.BeaconBlocks.lock.Unlock() + + s.closeWhatNotInList(fileNames) + var segmentsMax uint64 + var segmentsMaxSet bool +Loop: + for _, fName := range fileNames { + f, ok := snaptype.ParseFileName(s.dir, fName) + if !ok { + continue + } + var processed bool = true + + switch f.T { + case snaptype.BeaconBlocks: + var sn *BeaconBlockSegment + var exists bool + for _, sn2 := range s.BeaconBlocks.segments { + if sn2.seg == nil { // it's ok if some segment was not able to open + continue + } + if fName == sn2.seg.FileName() { + sn = sn2 + exists = true + break + } + } + if !exists { + sn = &BeaconBlockSegment{ranges: Range{f.From, f.To}} + } + if err := sn.reopenSeg(s.dir); err != nil { + if errors.Is(err, os.ErrNotExist) { + if optimistic { + continue Loop + } else { + break Loop + } + } + if optimistic { + s.logger.Warn("[snapshots] open segment", "err", err) + continue Loop + } else { + return err + } + } + + if !exists { + // it's possible to iterate over .seg file even if you don't have index + // then make segment available even if index open may fail + s.BeaconBlocks.segments = append(s.BeaconBlocks.segments, sn) + } + if err := sn.reopenIdxIfNeed(s.dir, optimistic); err != nil { + return err + } + } + + if processed { + if f.To > 0 { + segmentsMax = f.To - 1 + } else { + segmentsMax = 0 + } + segmentsMaxSet = true + } + } + if segmentsMaxSet { + s.segmentsMax.Store(segmentsMax) + } + s.segmentsReady.Store(true) + s.idxMax.Store(s.idxAvailability()) + s.indicesReady.Store(true) + + return nil +} + +func (s *CaplinSnapshots) idxAvailability() uint64 { + var beaconBlocks uint64 + for _, seg := range s.BeaconBlocks.segments { + if seg.idxSlot == nil { + break + } + beaconBlocks = seg.ranges.to - 1 + } + return beaconBlocks +} + +func (s *CaplinSnapshots) ReopenFolder() error { + files, _, err := SegmentsCaplin(s.dir) + if err != nil { + return err + } + list := make([]string, 0, len(files)) + for _, f := range files { + _, fName := filepath.Split(f.Path) + list = append(list, fName) + } + return s.ReopenList(list, false) +} + +func (s *CaplinSnapshots) closeWhatNotInList(l []string) { +Loop1: + for i, sn := range s.BeaconBlocks.segments { + if sn.seg == nil { + continue Loop1 + } + _, name := filepath.Split(sn.seg.FilePath()) + for _, fName := range l { + if fName == name { + continue Loop1 + } + } + sn.close() + s.BeaconBlocks.segments[i] = nil + } + var i int + for i = 0; i < len(s.BeaconBlocks.segments) && s.BeaconBlocks.segments[i] != nil && s.BeaconBlocks.segments[i].seg != nil; i++ { + } + tail := s.BeaconBlocks.segments[i:] + s.BeaconBlocks.segments = s.BeaconBlocks.segments[:i] + for i = 0; i < len(tail); i++ { + if tail[i] != nil { + tail[i].close() + tail[i] = nil + } + } +} + +type CaplinView struct { + s *CaplinSnapshots + closed bool +} + +func (s *CaplinSnapshots) View() *CaplinView { + v := &CaplinView{s: s} + v.s.BeaconBlocks.lock.RLock() + return v +} + +func (v *CaplinView) Close() { + if v.closed { + return + } + v.closed = true + v.s.BeaconBlocks.lock.RUnlock() + +} + +func (v *CaplinView) BeaconBlocks() []*BeaconBlockSegment { return v.s.BeaconBlocks.segments } + +func (v *CaplinView) BeaconBlocksSegment(slot uint64) (*BeaconBlockSegment, bool) { + for _, seg := range v.BeaconBlocks() { + if !(slot >= seg.ranges.from && slot < seg.ranges.to) { + continue + } + return seg, true + } + return nil, false +} + +func dumpBeaconBlocksRange(ctx context.Context, db kv.RoDB, b persistence.BlockSource, fromSlot uint64, toSlot uint64, tmpDir, snapDir string, workers int, lvl log.Lvl, logger log.Logger) error { + segName := snaptype.SegmentFileName(fromSlot, toSlot, snaptype.BeaconBlocks) + f, _ := snaptype.ParseFileName(snapDir, segName) + + sn, err := compress.NewCompressor(ctx, "Snapshot BeaconBlocks", f.Path, tmpDir, compress.MinPatternScore, workers, lvl, logger) + if err != nil { + return err + } + defer sn.Close() + + tx, err := db.BeginRo(ctx) + if err != nil { + return err + } + defer tx.Rollback() + // Generate .seg file, which is just the list of beacon blocks. + for i := fromSlot; i < toSlot; i++ { + obj, err := b.GetBlock(ctx, tx, i) + if err != nil { + return err + } + if i%20_000 == 0 { + logger.Log(lvl, "Dumping beacon blocks", "progress", i) + } + if obj == nil { + + if err := sn.AddWord(nil); err != nil { + return err + } + continue + } + var buf bytes.Buffer + if err := snapshot_format.WriteBlockForSnapshot(obj.Data, &buf); err != nil { + return err + } + word := buf.Bytes() + + if err := sn.AddWord(word); err != nil { + return err + } + + } + if err := sn.Compress(); err != nil { + return fmt.Errorf("compress: %w", err) + } + // Generate .idx file, which is the slot => offset mapping. + p := &background.Progress{} + + return BeaconBlocksIdx(ctx, f, segName, fromSlot, toSlot, snapDir, tmpDir, p, lvl, logger) +} + +func DumpBeaconBlocks(ctx context.Context, db kv.RoDB, b persistence.BlockSource, fromSlot, toSlot, blocksPerFile uint64, tmpDir, snapDir string, workers int, lvl log.Lvl, logger log.Logger) error { + if blocksPerFile == 0 { + return nil + } + + for i := fromSlot; i < toSlot; i = chooseSegmentEnd(i, toSlot, blocksPerFile) { + if toSlot-i < blocksPerFile { + break + } + to := chooseSegmentEnd(i, toSlot, blocksPerFile) + logger.Log(lvl, "Dumping beacon blocks", "from", i, "to", to) + if err := dumpBeaconBlocksRange(ctx, db, b, i, to, tmpDir, snapDir, workers, lvl, logger); err != nil { + return err + } + } + return nil +}