From df0699a12b7b10a4fdeac542eb109609d4542413 Mon Sep 17 00:00:00 2001
From: Mark Holt <135143369+mh0lt@users.noreply.github.com>
Date: Wed, 27 Dec 2023 14:56:57 +0000
Subject: [PATCH] Added sentry simulator implementation (#9087)

This adds a simulator object which implements the SentryServer API but
serves objects from a pre-existing snapshot file. If the snapshot is not
available locally, it will download and index the .seg file for the
header range being asked for.

It is created as follows:

```go
sim, err := simulator.NewSentry(ctx, "mumbai", dataDir, 1, logger)
```

Where the arguments are:

* ctx - a cancellable context; cancelling it will close the simulator's
  torrent and file connections (the simulator also has a Close method)
* chain - the name of the chain to take the snapshots from
* datadir - a directory potentially containing snapshot .seg files. If
  no files exist in this directory they will be downloaded
* num peers - the number of peers the simulator should create
* logger - the logger to log actions to

It can be attached to a client as follows:

```go
simClient := direct.NewSentryClientDirect(66, sim)
```

At the moment only very basic functionality is implemented:

* get headers will return headers by range or hash (hash assumes a
  pre-downloaded .seg, as it needs an index)
* the header replay semantics need to be confirmed
* eth 65 and 66(+) messaging is supported
* for details see: `simulator_test.go`

More advanced peer behavior (e.g. header rewriting) can be added.
Bodies/Transactions handling can be added.
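
A condensed sketch of the request/response flow, distilled from
`simulator_test.go` (error handling elided; the packet below asks all
simulated peers for ten headers starting at block 10):

```go
// Subscribe to header responses before sending the request.
receiver, _ := simClient.Messages(ctx, &sentry.MessagesRequest{
	Ids: []sentry.MessageId{sentry.MessageId_BLOCK_HEADERS_66},
})

// Encode an eth/66 GetBlockHeaders request and broadcast it.
var data bytes.Buffer
_ = rlp.Encode(&data, &eth.GetBlockHeadersPacket66{
	RequestId: 1,
	GetBlockHeadersPacket: &eth.GetBlockHeadersPacket{
		Origin: eth.HashOrNumber{Number: 10},
		Amount: 10,
	},
})
_, _ = simClient.SendMessageToAll(ctx, &sentry.OutboundMessageData{
	Id:   sentry.MessageId_GET_BLOCK_HEADERS_66,
	Data: data.Bytes(),
})

// The response arrives on the subscribed receiver, served from the snapshot.
message, _ := receiver.Recv()
packet := &eth.BlockHeadersPacket66{}
_ = rlp.DecodeBytes(message.Data, packet)
```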
"github.com/ledgerwatch/erigon/eth/ethconfig" + "github.com/ledgerwatch/erigon/eth/protocols/eth" + "github.com/ledgerwatch/erigon/p2p" + "github.com/ledgerwatch/erigon/p2p/discover/v4wire" + "github.com/ledgerwatch/erigon/p2p/enode" + "github.com/ledgerwatch/erigon/p2p/sentry" + "github.com/ledgerwatch/erigon/rlp" + "github.com/ledgerwatch/erigon/turbo/snapshotsync/freezeblocks" + "github.com/ledgerwatch/log/v3" + "google.golang.org/protobuf/types/known/emptypb" +) + +type server struct { + sentry_if.UnimplementedSentryServer + ctx context.Context + peers map[[64]byte]*p2p.Peer + messageReceivers map[sentry_if.MessageId][]sentry_if.Sentry_MessagesServer + logger log.Logger + //snapshotVersion uint8 + knownSnapshots *freezeblocks.RoSnapshots + activeSnapshots *freezeblocks.RoSnapshots + blockReader *freezeblocks.BlockReader + downloader *TorrentClient +} + +func newPeer(name string, caps []p2p.Cap) (*p2p.Peer, error) { + key, err := crypto.GenerateKey() + + if err != nil { + return nil, err + } + + return p2p.NewPeer(enode.PubkeyToIDV4(&key.PublicKey), v4wire.EncodePubkey(&key.PublicKey), name, caps, true), nil +} + +func NewSentry(ctx context.Context, chain string, snapshotLocation string, peerCount int, logger log.Logger) (sentry_if.SentryServer, error) { + peers := map[[64]byte]*p2p.Peer{} + + for i := 0; i < peerCount; i++ { + peer, err := newPeer(fmt.Sprint("peer-", i), nil) + + if err != nil { + return nil, err + } + peers[peer.Pubkey()] = peer + } + + cfg := snapcfg.KnownCfg(chain) + + knownSnapshots := freezeblocks.NewRoSnapshots(ethconfig.BlocksFreezing{ + Enabled: true, + Produce: false, + NoDownloader: true, + }, "" /*s.snapshotVersion,*/, logger) + + files := make([]string, 0, len(cfg.Preverified)) + + for _, item := range cfg.Preverified { + files = append(files, item.Name) + } + + knownSnapshots.InitSegments(files) + + //s.knownSnapshots.ReopenList([]string{ent2.Name()}, false) + activeSnapshots := freezeblocks.NewRoSnapshots(ethconfig.BlocksFreezing{ + Enabled: true, + Produce: false, + NoDownloader: true, + }, snapshotLocation /*s.snapshotVersion,*/, logger) + + if err := activeSnapshots.ReopenFolder(); err != nil { + return nil, err + } + + downloader, err := NewTorrentClient(ctx, chain, snapshotLocation, logger) + + if err != nil { + return nil, err + } + + s := &server{ + ctx: ctx, + peers: peers, + messageReceivers: map[sentry_if.MessageId][]sentry_if.Sentry_MessagesServer{}, + knownSnapshots: knownSnapshots, + activeSnapshots: activeSnapshots, + blockReader: freezeblocks.NewBlockReader(activeSnapshots, nil), + logger: logger, + downloader: downloader, + } + + go func() { + <-ctx.Done() + s.Close() + }() + + return s, nil +} + +func (s *server) Close() { + s.downloader.Close() + if closer, ok := s.downloader.cfg.DefaultStorage.(interface{ Close() error }); ok { + closer.Close() + } + s.activeSnapshots.Close() +} + +func (s *server) NodeInfo(context.Context, *emptypb.Empty) (*types.NodeInfoReply, error) { + return nil, fmt.Errorf("TODO") +} + +func (s *server) PeerById(ctx context.Context, in *sentry_if.PeerByIdRequest) (*sentry_if.PeerByIdReply, error) { + peerId := sentry.ConvertH512ToPeerID(in.PeerId) + + peer, ok := s.peers[peerId] + + if !ok { + return nil, fmt.Errorf("unknown peer") + } + + info := peer.Info() + + return &sentry_if.PeerByIdReply{ + Peer: &types.PeerInfo{ + Id: info.ID, + Name: info.Name, + Enode: info.Enode, + Enr: info.ENR, + Caps: info.Caps, + ConnLocalAddr: info.Network.LocalAddress, + ConnRemoteAddr: info.Network.RemoteAddress, + ConnIsInbound: 
info.Network.Inbound, + ConnIsTrusted: info.Network.Trusted, + ConnIsStatic: info.Network.Static, + }, + }, nil +} + +func (s *server) PeerCount(context.Context, *sentry_if.PeerCountRequest) (*sentry_if.PeerCountReply, error) { + return &sentry_if.PeerCountReply{Count: uint64(len(s.peers))}, nil +} + +func (s *server) PeerEvents(*sentry_if.PeerEventsRequest, sentry_if.Sentry_PeerEventsServer) error { + return fmt.Errorf("TODO") +} + +func (s *server) PeerMinBlock(context.Context, *sentry_if.PeerMinBlockRequest) (*emptypb.Empty, error) { + return nil, fmt.Errorf("TODO") +} + +func (s *server) Peers(context.Context, *emptypb.Empty) (*sentry_if.PeersReply, error) { + reply := &sentry_if.PeersReply{} + + for _, peer := range s.peers { + info := peer.Info() + + reply.Peers = append(reply.Peers, + &types.PeerInfo{ + Id: info.ID, + Name: info.Name, + Enode: info.Enode, + Enr: info.ENR, + Caps: info.Caps, + ConnLocalAddr: info.Network.LocalAddress, + ConnRemoteAddr: info.Network.RemoteAddress, + ConnIsInbound: info.Network.Inbound, + ConnIsTrusted: info.Network.Trusted, + ConnIsStatic: info.Network.Static, + }) + } + + return reply, nil +} + +func (s *server) SendMessageById(ctx context.Context, in *sentry_if.SendMessageByIdRequest) (*sentry_if.SentPeers, error) { + peerId := sentry.ConvertH512ToPeerID(in.PeerId) + + if err := s.sendMessageById(ctx, peerId, in.Data); err != nil { + return nil, err + } + + return &sentry_if.SentPeers{ + Peers: []*types.H512{in.PeerId}, + }, nil +} + +func (s *server) sendMessageById(ctx context.Context, peerId [64]byte, messageData *sentry_if.OutboundMessageData) error { + peer, ok := s.peers[peerId] + + if !ok { + return fmt.Errorf("unknown peer") + } + + switch messageData.Id { + case sentry_if.MessageId_GET_BLOCK_HEADERS_65: + packet := ð.GetBlockHeadersPacket{} + if err := rlp.DecodeBytes(messageData.Data, packet); err != nil { + return fmt.Errorf("failed to decode packet: %w", err) + } + + go s.processGetBlockHeaders(ctx, peer, 0, packet) + + case sentry_if.MessageId_GET_BLOCK_HEADERS_66: + packet := ð.GetBlockHeadersPacket66{} + if err := rlp.DecodeBytes(messageData.Data, packet); err != nil { + return fmt.Errorf("failed to decode packet: %w", err) + } + + go s.processGetBlockHeaders(ctx, peer, packet.RequestId, packet.GetBlockHeadersPacket) + + default: + return fmt.Errorf("unhandled message id: %s", messageData.Id) + } + + return nil +} + +func (s *server) SendMessageByMinBlock(ctx context.Context, request *sentry_if.SendMessageByMinBlockRequest) (*sentry_if.SentPeers, error) { + return s.UnimplementedSentryServer.SendMessageByMinBlock(ctx, request) +} + +func (s *server) SendMessageToAll(ctx context.Context, data *sentry_if.OutboundMessageData) (*sentry_if.SentPeers, error) { + sentPeers := &sentry_if.SentPeers{} + + for _, peer := range s.peers { + peerKey := peer.Pubkey() + + if err := s.sendMessageById(ctx, peerKey, data); err != nil { + return sentPeers, err + } + + sentPeers.Peers = append(sentPeers.Peers, gointerfaces.ConvertBytesToH512(peerKey[:])) + } + + return sentPeers, nil +} + +func (s *server) SendMessageToRandomPeers(ctx context.Context, request *sentry_if.SendMessageToRandomPeersRequest) (*sentry_if.SentPeers, error) { + sentPeers := &sentry_if.SentPeers{} + + var i uint64 + + for _, peer := range s.peers { + peerKey := peer.Pubkey() + + if err := s.sendMessageById(ctx, peerKey, request.Data); err != nil { + return sentPeers, err + } + + sentPeers.Peers = append(sentPeers.Peers, gointerfaces.ConvertBytesToH512(peerKey[:])) + + i++ + + if i 
== request.MaxPeers { + break + } + } + + return sentPeers, nil + +} + +func (s *server) Messages(request *sentry_if.MessagesRequest, receiver sentry_if.Sentry_MessagesServer) error { + for _, messageId := range request.Ids { + receivers := s.messageReceivers[messageId] + s.messageReceivers[messageId] = append(receivers, receiver) + } + + <-s.ctx.Done() + + return nil +} + +func (s *server) processGetBlockHeaders(ctx context.Context, peer *p2p.Peer, requestId uint64, request *eth.GetBlockHeadersPacket) { + r65 := s.messageReceivers[sentry_if.MessageId_BLOCK_HEADERS_65] + r66 := s.messageReceivers[sentry_if.MessageId_BLOCK_HEADERS_66] + + if len(r65)+len(r66) > 0 { + + peerKey := peer.Pubkey() + peerId := gointerfaces.ConvertBytesToH512(peerKey[:]) + + headers, err := s.getHeaders(ctx, request.Origin, request.Amount, request.Skip, request.Reverse) + + if err != nil { + s.logger.Warn("Can't get headers", "error", err) + return + } + + if len(r65) > 0 { + var data bytes.Buffer + + err := rlp.Encode(&data, headers) + + if err != nil { + s.logger.Warn("Can't encode headers", "error", err) + return + } + + for _, receiver := range r65 { + receiver.Send(&sentry_if.InboundMessage{ + Id: sentry_if.MessageId_BLOCK_HEADERS_65, + Data: data.Bytes(), + PeerId: peerId, + }) + } + } + + if len(r66) > 0 { + var data bytes.Buffer + + err := rlp.Encode(&data, ð.BlockHeadersPacket66{ + RequestId: requestId, + BlockHeadersPacket: headers, + }) + + if err != nil { + fmt.Printf("Error (move to logger): %s", err) + return + } + + for _, receiver := range r66 { + receiver.Send(&sentry_if.InboundMessage{ + Id: sentry_if.MessageId_BLOCK_HEADERS_66, + Data: data.Bytes(), + PeerId: peerId, + }) + } + } + } +} + +func (s *server) getHeaders(ctx context.Context, origin eth.HashOrNumber, amount uint64, skip uint64, reverse bool) (eth.BlockHeadersPacket, error) { + + var headers eth.BlockHeadersPacket + + var next uint64 + + nextBlockNum := func(blockNum uint64) uint64 { + inc := uint64(1) + + if skip != 0 { + inc = skip + } + + if reverse { + return blockNum - inc + } else { + return blockNum + inc + } + } + + if origin.Hash != (common.Hash{}) { + header, err := s.getHeaderByHash(ctx, origin.Hash) + + if err != nil { + return nil, err + } + + headers = append(headers, header) + + next = nextBlockNum(header.Number.Uint64()) + } else { + header, err := s.getHeader(ctx, origin.Number) + + if err != nil { + return nil, err + } + + headers = append(headers, header) + + next = nextBlockNum(header.Number.Uint64()) + } + + for len(headers) < int(amount) { + header, err := s.getHeader(ctx, next) + + if err != nil { + return nil, err + } + + headers = append(headers, header) + + next = nextBlockNum(header.Number.Uint64()) + } + + return headers, nil +} + +func (s *server) getHeader(ctx context.Context, blockNum uint64) (*core_types.Header, error) { + header, err := s.blockReader.Header(ctx, nil, common.Hash{}, blockNum) + + if err != nil { + return nil, err + } + + if header == nil { + view := s.knownSnapshots.View() + defer view.Close() + + if seg, ok := view.HeadersSegment(blockNum); ok { + if err := s.downloadHeaders(ctx, seg); err != nil { + return nil, err + } + } + + s.activeSnapshots.ReopenSegments([]snaptype.Type{snaptype.Headers}) + + header, err = s.blockReader.Header(ctx, nil, common.Hash{}, blockNum) + + if err != nil { + return nil, err + } + } + + return header, nil +} + +func (s *server) getHeaderByHash(ctx context.Context, hash common.Hash) (*core_types.Header, error) { + return s.blockReader.HeaderByHash(ctx, 
nil, hash) +} + +func (s *server) downloadHeaders(ctx context.Context, header *freezeblocks.HeaderSegment) error { + fileName := snaptype.SegmentFileName(header.From(), header.To(), snaptype.Headers) + + s.logger.Info(fmt.Sprintf("Downloading %s", fileName)) + + err := s.downloader.Download(ctx, fileName) + + if err != nil { + return fmt.Errorf("can't download %s: %w", fileName, err) + } + + s.logger.Info(fmt.Sprintf("Indexing %s", fileName)) + + return freezeblocks.HeadersIdx(ctx, + filepath.Join(s.downloader.LocalFsRoot(), fileName), header.From(), s.downloader.LocalFsRoot(), nil, log.LvlDebug, s.logger) +} diff --git a/p2p/sentry/simulator/simulator_test.go b/p2p/sentry/simulator/simulator_test.go new file mode 100644 index 00000000000..3821bb88bf7 --- /dev/null +++ b/p2p/sentry/simulator/simulator_test.go @@ -0,0 +1,203 @@ +//go:build integration + +package simulator_test + +import ( + "bytes" + "context" + "testing" + + "github.com/ledgerwatch/erigon-lib/direct" + "github.com/ledgerwatch/erigon-lib/gointerfaces/sentry" + sentry_if "github.com/ledgerwatch/erigon-lib/gointerfaces/sentry" + "github.com/ledgerwatch/erigon/eth/protocols/eth" + "github.com/ledgerwatch/erigon/p2p/sentry/simulator" + "github.com/ledgerwatch/erigon/rlp" + "github.com/ledgerwatch/log/v3" +) + +func TestSimulatorStart(t *testing.T) { + + ctx, cancel := context.WithCancel(context.Background()) + + defer cancel() + + logger := log.New() + logger.SetHandler(log.StdoutHandler) + dataDir := t.TempDir() + + sim, err := simulator.NewSentry(ctx, "mumbai", dataDir, 1, logger) + + if err != nil { + t.Fatal(err) + } + + simClient := direct.NewSentryClientDirect(66, sim) + + peerCount, err := simClient.PeerCount(ctx, &sentry.PeerCountRequest{}) + + if err != nil { + t.Fatal(err) + } + + if peerCount.Count != 1 { + t.Fatal("Invalid response count: expected:", 1, "got:", peerCount.Count) + } + + receiver, err := simClient.Messages(ctx, &sentry.MessagesRequest{ + Ids: []sentry.MessageId{sentry.MessageId_BLOCK_HEADERS_66}, + }) + + if err != nil { + t.Fatal(err) + } + + getHeaders66 := ð.GetBlockHeadersPacket66{ + RequestId: 1, + GetBlockHeadersPacket: ð.GetBlockHeadersPacket{ + Origin: eth.HashOrNumber{Number: 10}, + Amount: 10, + }, + } + + var data bytes.Buffer + + err = rlp.Encode(&data, getHeaders66) + + if err != nil { + t.Fatal(err) + } + + peers, err := simClient.SendMessageToAll(ctx, &sentry.OutboundMessageData{ + Id: sentry_if.MessageId_GET_BLOCK_HEADERS_66, + Data: data.Bytes(), + }) + + if err != nil { + t.Fatal(err) + } + + if len(peers.Peers) != int(peerCount.Count) { + t.Fatal("Unexpected peer count expected:", peerCount.Count, len(peers.Peers)) + } + + message, err := receiver.Recv() + + if err != nil { + t.Fatal(err) + } + + if message.Id != sentry_if.MessageId_BLOCK_HEADERS_66 { + t.Fatal("unexpected message id expected:", sentry_if.MessageId_BLOCK_HEADERS_66, "got:", message.Id) + } + + var expectedPeer bool + + for _, peer := range peers.Peers { + if message.PeerId.String() == peer.String() { + expectedPeer = true + break + } + } + + if !expectedPeer { + t.Fatal("message received from unexpected peer:", message.PeerId) + } + + packet := ð.BlockHeadersPacket66{} + + if err := rlp.DecodeBytes(message.Data, packet); err != nil { + t.Fatal("failed to decode packet:", err) + } + + if len(packet.BlockHeadersPacket) != 10 { + t.Fatal("unexpected header count: expected:", 10, "got:", len(packet.BlockHeadersPacket)) + } + + blockNum := uint64(10) + + for _, header := range packet.BlockHeadersPacket { + if 
header.Number.Uint64() != blockNum { + t.Fatal("unexpected block number: expected:", blockNum, "got:", header.Number) + } + + blockNum++ + } + + simClient65 := direct.NewSentryClientDirect(65, sim) + + getHeaders65 := ð.GetBlockHeadersPacket{ + Origin: eth.HashOrNumber{Number: 100}, + Amount: 50, + } + + data.Reset() + + err = rlp.Encode(&data, getHeaders65) + + if err != nil { + t.Fatal(err) + } + + peers65, err := simClient65.SendMessageById(ctx, &sentry_if.SendMessageByIdRequest{ + Data: &sentry.OutboundMessageData{ + Id: sentry_if.MessageId_GET_BLOCK_HEADERS_65, + Data: data.Bytes(), + }, + PeerId: peers.Peers[0], + }) + + if err != nil { + t.Fatal(err) + } + + if len(peers65.Peers) != 1 { + t.Fatal("message sent to unexpected number of peers:", len(peers65.Peers)) + } + + if peers65.Peers[0].String() != peers.Peers[0].String() { + t.Fatal("message sent to unexpected number of peers", peers65.Peers[0]) + } + + receiver65, err := simClient65.Messages(ctx, &sentry.MessagesRequest{ + Ids: []sentry.MessageId{sentry.MessageId_BLOCK_HEADERS_65}, + }) + + if err != nil { + t.Fatal(err) + } + + message, err = receiver65.Recv() + + if err != nil { + t.Fatal(err) + } + + if message.Id != sentry_if.MessageId_BLOCK_HEADERS_65 { + t.Fatal("unexpected message id expected:", sentry_if.MessageId_BLOCK_HEADERS_65, "got:", message.Id) + } + + if message.PeerId.String() != peers.Peers[0].String() { + t.Fatal("message received from unexpected peer:", message.PeerId) + } + + packet65 := eth.BlockHeadersPacket{} + + if err := rlp.DecodeBytes(message.Data, &packet65); err != nil { + t.Fatal("failed to decode packet:", err) + } + + if len(packet65) != 50 { + t.Fatal("unexpected header count: expected:", 50, "got:", len(packet.BlockHeadersPacket)) + } + + blockNum = uint64(100) + + for _, header := range packet65 { + if header.Number.Uint64() != blockNum { + t.Fatal("unexpected block number: expected:", blockNum, "got:", header.Number) + } + + blockNum++ + } +} diff --git a/p2p/sentry/simulator/syncutil.go b/p2p/sentry/simulator/syncutil.go new file mode 100644 index 00000000000..eb7980ceeee --- /dev/null +++ b/p2p/sentry/simulator/syncutil.go @@ -0,0 +1,195 @@ +package simulator + +import ( + "context" + "fmt" + "io/fs" + "os" + "path/filepath" + "runtime" + + "github.com/anacrolix/torrent" + "github.com/anacrolix/torrent/metainfo" + "github.com/anacrolix/torrent/storage" + "github.com/c2h5oh/datasize" + "github.com/ledgerwatch/erigon-lib/chain/snapcfg" + "github.com/ledgerwatch/erigon-lib/common" + "github.com/ledgerwatch/erigon-lib/common/datadir" + "github.com/ledgerwatch/erigon-lib/downloader" + "github.com/ledgerwatch/erigon-lib/downloader/downloadercfg" + "github.com/ledgerwatch/erigon-lib/downloader/snaptype" + "github.com/ledgerwatch/erigon/cmd/downloader/downloadernat" + "github.com/ledgerwatch/erigon/cmd/utils" + "github.com/ledgerwatch/erigon/p2p/nat" + "github.com/ledgerwatch/erigon/params" + "github.com/ledgerwatch/log/v3" + "golang.org/x/sync/errgroup" +) + +// The code in this file is taken from cmd/snapshots - which is yet to be merged +// to devel - once tthat is done this file can be removed + +type TorrentClient struct { + *torrent.Client + cfg *torrent.ClientConfig + items map[string]snapcfg.PreverifiedItem +} + +func NewTorrentClient(ctx context.Context, chain string, torrentDir string, logger log.Logger) (*TorrentClient, error) { + + relativeDataDir := torrentDir + if torrentDir != "" { + var err error + absdatadir, err := filepath.Abs(torrentDir) + if err != nil { + panic(err) + } + 
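+		// Normalising torrentDir to an absolute path up front keeps the
+		// torrent client and the datadir layout pointing at the same
+		// location, whatever working directory the simulator starts from.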
+		torrentDir = absdatadir
+	}
+
+	dirs := datadir.Dirs{
+		RelativeDataDir: relativeDataDir,
+		DataDir:         torrentDir,
+		Snap:            torrentDir,
+	}
+
+	webseedsList := common.CliString2Array(utils.WebSeedsFlag.Value)
+
+	if known, ok := snapcfg.KnownWebseeds[chain]; ok {
+		webseedsList = append(webseedsList, known...)
+	}
+
+	var downloadRate, uploadRate datasize.ByteSize
+
+	if err := downloadRate.UnmarshalText([]byte(utils.TorrentDownloadRateFlag.Value)); err != nil {
+		return nil, err
+	}
+
+	if err := uploadRate.UnmarshalText([]byte(utils.TorrentUploadRateFlag.Value)); err != nil {
+		return nil, err
+	}
+
+	logLevel, _, err := downloadercfg.Int2LogLevel(utils.TorrentVerbosityFlag.Value)
+
+	if err != nil {
+		return nil, err
+	}
+
+	version := "erigon: " + params.VersionWithCommit(params.GitCommit)
+
+	cfg, err := downloadercfg.New(dirs, version, logLevel, downloadRate, uploadRate,
+		utils.TorrentPortFlag.Value, utils.TorrentConnsPerFileFlag.Value, 0, nil, webseedsList, chain)
+
+	if err != nil {
+		return nil, err
+	}
+
+	if err := os.MkdirAll(torrentDir, 0755); err != nil {
+		return nil, err
+	}
+
+	cfg.ClientConfig.DataDir = torrentDir
+
+	cfg.ClientConfig.PieceHashersPerTorrent = 32 * runtime.NumCPU()
+	cfg.ClientConfig.DisableIPv6 = utils.DisableIPV6.Value
+	cfg.ClientConfig.DisableIPv4 = utils.DisableIPV4.Value
+
+	natif, err := nat.Parse(utils.NATFlag.Value)
+
+	if err != nil {
+		return nil, fmt.Errorf("invalid nat option %s: %w", utils.NATFlag.Value, err)
+	}
+
+	downloadernat.DoNat(natif, cfg.ClientConfig, logger)
+
+	cfg.ClientConfig.DefaultStorage = storage.NewMMap(torrentDir)
+
+	cli, err := torrent.NewClient(cfg.ClientConfig)
+
+	if err != nil {
+		return nil, fmt.Errorf("can't create torrent client: %w", err)
+	}
+
+	items := map[string]snapcfg.PreverifiedItem{}
+	for _, it := range snapcfg.KnownCfg(chain).Preverified {
+		items[it.Name] = it
+	}
+
+	return &TorrentClient{cli, cfg.ClientConfig, items}, nil
+}
+
+func (s *TorrentClient) LocalFsRoot() string {
+	return s.cfg.DataDir
+}
+
+func (s *TorrentClient) Download(ctx context.Context, files ...string) error {
+	g, ctx := errgroup.WithContext(ctx)
+	g.SetLimit(len(files))
+
+	for _, f := range files {
+		file := f
+
+		g.Go(func() error {
+			it, ok := s.items[file]
+
+			if !ok {
+				return fs.ErrNotExist
+			}
+
+			t, err := func() (*torrent.Torrent, error) {
+				infoHash := snaptype.Hex2InfoHash(it.Hash)
+
+				for _, t := range s.Torrents() {
+					if t.Name() == file {
+						return t, nil
+					}
+				}
+
+				mi := &metainfo.MetaInfo{AnnounceList: downloader.Trackers}
+				magnet := mi.Magnet(&infoHash, &metainfo.Info{Name: file})
+				spec, err := torrent.TorrentSpecFromMagnetUri(magnet.String())
+
+				if err != nil {
+					return nil, err
+				}
+
+				spec.DisallowDataDownload = true
+
+				t, _, err := s.AddTorrentSpec(spec)
+				if err != nil {
+					return nil, err
+				}
+
+				return t, nil
+			}()
+
+			if err != nil {
+				return err
+			}
+
+			select {
+			case <-ctx.Done():
+				return ctx.Err()
+			case <-t.GotInfo():
+			}
+
+			if !t.Complete.Bool() {
+				t.AllowDataDownload()
+				t.DownloadAll()
+				select {
+				case <-ctx.Done():
+					return ctx.Err()
+				case <-t.Complete.On():
+				}
+			}
+
+			closed := t.Closed()
+			t.Drop()
+			<-closed
+
+			return nil
+		})
+	}
+
+	return g.Wait()
+}
diff --git a/turbo/snapshotsync/freezeblocks/block_reader.go b/turbo/snapshotsync/freezeblocks/block_reader.go
index cb9dfc3f0f4..6b384929ce3 100644
--- a/turbo/snapshotsync/freezeblocks/block_reader.go
+++ b/turbo/snapshotsync/freezeblocks/block_reader.go
@@ -5,10 +5,11 @@ import (
 	"context"
 	"encoding/binary"
 	"fmt"
-	
"github.com/ledgerwatch/erigon/consensus/bor" "math" "sort" + "github.com/ledgerwatch/erigon/consensus/bor" + "github.com/ledgerwatch/erigon-lib/common" "github.com/ledgerwatch/erigon-lib/common/dbg" "github.com/ledgerwatch/erigon-lib/common/length" @@ -247,7 +248,13 @@ type BlockReader struct { } func NewBlockReader(snapshots services.BlockSnapshots, borSnapshots services.BlockSnapshots) *BlockReader { - return &BlockReader{sn: snapshots.(*RoSnapshots), borSn: borSnapshots.(*BorRoSnapshots)} + var borSn *BorRoSnapshots + + if borSnapshots != nil { + borSn = borSnapshots.(*BorRoSnapshots) + } + + return &BlockReader{sn: snapshots.(*RoSnapshots), borSn: borSn} } func (r *BlockReader) CanPruneTo(currentBlockInDB uint64) uint64 { @@ -366,9 +373,11 @@ func (r *BlockReader) CanonicalHash(ctx context.Context, tx kv.Getter, blockHeig } func (r *BlockReader) Header(ctx context.Context, tx kv.Getter, hash common.Hash, blockHeight uint64) (h *types.Header, err error) { - h = rawdb.ReadHeader(tx, hash, blockHeight) - if h != nil { - return h, nil + if tx != nil { + h = rawdb.ReadHeader(tx, hash, blockHeight) + if h != nil { + return h, nil + } } view := r.sn.View() @@ -577,7 +586,7 @@ func (r *BlockReader) headerFromSnapshot(blockHeight uint64, sn *HeaderSegment, func (r *BlockReader) headerFromSnapshotByHash(hash common.Hash, sn *HeaderSegment, buf []byte) (*types.Header, error) { defer func() { if rec := recover(); rec != nil { - panic(fmt.Errorf("%+v, snapshot: %d-%d, trace: %s", rec, sn.ranges.from, sn.ranges.to, dbg.Stack())) + panic(fmt.Errorf("%+v, snapshot: %d-%d, trace: %s", rec, sn.from, sn.to, dbg.Stack())) } }() // avoid crash because Erigon's core does many things @@ -629,7 +638,7 @@ func (r *BlockReader) bodyFromSnapshot(blockHeight uint64, sn *BodySegment, buf func (r *BlockReader) bodyForStorageFromSnapshot(blockHeight uint64, sn *BodySegment, buf []byte) (*types.BodyForStorage, []byte, error) { defer func() { if rec := recover(); rec != nil { - panic(fmt.Errorf("%+v, snapshot: %d-%d, trace: %s", rec, sn.ranges.from, sn.ranges.to, dbg.Stack())) + panic(fmt.Errorf("%+v, snapshot: %d-%d, trace: %s", rec, sn.from, sn.to, dbg.Stack())) } }() // avoid crash because Erigon's core does many things @@ -659,7 +668,7 @@ func (r *BlockReader) bodyForStorageFromSnapshot(blockHeight uint64, sn *BodySeg func (r *BlockReader) txsFromSnapshot(baseTxnID uint64, txsAmount uint32, txsSeg *TxnSegment, buf []byte) (txs []types.Transaction, senders []common.Address, err error) { defer func() { if rec := recover(); rec != nil { - panic(fmt.Errorf("%+v, snapshot: %d-%d, trace: %s", rec, txsSeg.ranges.from, txsSeg.ranges.to, dbg.Stack())) + panic(fmt.Errorf("%+v, snapshot: %d-%d, trace: %s", rec, txsSeg.from, txsSeg.to, dbg.Stack())) } }() // avoid crash because Erigon's core does many things @@ -843,7 +852,7 @@ func (r *BlockReader) IterateFrozenBodies(f func(blockNum, baseTxNum, txAmount u var buf []byte g := sn.seg.MakeGetter() - blockNum := sn.ranges.from + blockNum := sn.from var b types.BodyForStorage for g.HasNext() { buf, _ = g.Next(buf[:0]) @@ -1045,10 +1054,10 @@ func (r *BlockReader) EventsByBlock(ctx context.Context, tx kv.Tx, hash common.H result := []rlp.RawValue{} for i := len(segments) - 1; i >= 0; i-- { sn := segments[i] - if sn.ranges.from > blockHeight { + if sn.from > blockHeight { continue } - if sn.ranges.to <= blockHeight { + if sn.to <= blockHeight { continue } if sn.IdxBorTxnHash == nil { @@ -1117,7 +1126,7 @@ func (r *BlockReader) LastFrozenSpanID() uint64 { return 0 } - lastSpanID 
:= bor.SpanIDAt(lastSegment.ranges.to) + lastSpanID := bor.SpanIDAt(lastSegment.to) if lastSpanID > 0 { lastSpanID-- } @@ -1150,11 +1159,11 @@ func (r *BlockReader) Span(ctx context.Context, tx kv.Getter, spanId uint64) ([] if sn.idx == nil { continue } - spanFrom := bor.SpanIDAt(sn.ranges.from) + spanFrom := bor.SpanIDAt(sn.from) if spanId < spanFrom { continue } - spanTo := bor.SpanIDAt(sn.ranges.to) + spanTo := bor.SpanIDAt(sn.to) if spanId >= spanTo { continue } @@ -1190,10 +1199,10 @@ func (r *BlockReader) Integrity(ctx context.Context) error { view := r.sn.View() defer view.Close() for _, seg := range view.Headers() { - if err := r.ensureHeaderNumber(seg.ranges.from, seg); err != nil { + if err := r.ensureHeaderNumber(seg.from, seg); err != nil { return err } - if err := r.ensureHeaderNumber(seg.ranges.to-1, seg); err != nil { + if err := r.ensureHeaderNumber(seg.to-1, seg); err != nil { return err } } diff --git a/turbo/snapshotsync/freezeblocks/block_snapshots.go b/turbo/snapshotsync/freezeblocks/block_snapshots.go index 0346f061cbe..36c3f5198df 100644 --- a/turbo/snapshotsync/freezeblocks/block_snapshots.go +++ b/turbo/snapshotsync/freezeblocks/block_snapshots.go @@ -57,20 +57,20 @@ import ( type HeaderSegment struct { seg *compress.Decompressor // value: first_byte_of_header_hash + header_rlp idxHeaderHash *recsplit.Index // header_hash -> headers_segment_offset - ranges Range + Range } type BodySegment struct { seg *compress.Decompressor // value: rlp(types.BodyForStorage) idxBodyNumber *recsplit.Index // block_num_u64 -> bodies_segment_offset - ranges Range + Range } type TxnSegment struct { Seg *compress.Decompressor // value: first_byte_of_transaction_hash + sender_address + transaction_rlp IdxTxnHash *recsplit.Index // transaction_hash -> transactions_segment_offset IdxTxnHash2BlockNum *recsplit.Index // transaction_hash -> block_number - ranges Range + Range } func (sn *HeaderSegment) closeIdx() { @@ -91,7 +91,7 @@ func (sn *HeaderSegment) close() { } func (sn *HeaderSegment) reopenSeg(dir string) (err error) { sn.closeSeg() - fileName := snaptype.SegmentFileName(sn.ranges.from, sn.ranges.to, snaptype.Headers) + fileName := snaptype.SegmentFileName(sn.from, sn.to, snaptype.Headers) sn.seg, err = compress.NewDecompressor(path.Join(dir, fileName)) if err != nil { return fmt.Errorf("%w, fileName: %s", err, fileName) @@ -119,7 +119,7 @@ func (sn *HeaderSegment) reopenIdx(dir string) (err error) { if sn.seg == nil { return nil } - fileName := snaptype.IdxFileName(sn.ranges.from, sn.ranges.to, snaptype.Headers.String()) + fileName := snaptype.IdxFileName(sn.from, sn.to, snaptype.Headers.String()) sn.idxHeaderHash, err = recsplit.OpenIndex(path.Join(dir, fileName)) if err != nil { return fmt.Errorf("%w, fileName: %s", err, fileName) @@ -147,7 +147,7 @@ func (sn *BodySegment) close() { func (sn *BodySegment) reopenSeg(dir string) (err error) { sn.closeSeg() - fileName := snaptype.SegmentFileName(sn.ranges.from, sn.ranges.to, snaptype.Bodies) + fileName := snaptype.SegmentFileName(sn.from, sn.to, snaptype.Bodies) sn.seg, err = compress.NewDecompressor(path.Join(dir, fileName)) if err != nil { return fmt.Errorf("%w, fileName: %s", err, fileName) @@ -176,7 +176,7 @@ func (sn *BodySegment) reopenIdx(dir string) (err error) { if sn.seg == nil { return nil } - fileName := snaptype.IdxFileName(sn.ranges.from, sn.ranges.to, snaptype.Bodies.String()) + fileName := snaptype.IdxFileName(sn.from, sn.to, snaptype.Bodies.String()) sn.idxBodyNumber, err = recsplit.OpenIndex(path.Join(dir, 
fileName)) if err != nil { return fmt.Errorf("%w, fileName: %s", err, fileName) @@ -206,7 +206,7 @@ func (sn *TxnSegment) close() { } func (sn *TxnSegment) reopenSeg(dir string) (err error) { sn.closeSeg() - fileName := snaptype.SegmentFileName(sn.ranges.from, sn.ranges.to, snaptype.Transactions) + fileName := snaptype.SegmentFileName(sn.from, sn.to, snaptype.Transactions) sn.Seg, err = compress.NewDecompressor(path.Join(dir, fileName)) if err != nil { return fmt.Errorf("%w, fileName: %s", err, fileName) @@ -218,7 +218,7 @@ func (sn *TxnSegment) reopenIdx(dir string) (err error) { if sn.Seg == nil { return nil } - fileName := snaptype.IdxFileName(sn.ranges.from, sn.ranges.to, snaptype.Transactions.String()) + fileName := snaptype.IdxFileName(sn.from, sn.to, snaptype.Transactions.String()) sn.IdxTxnHash, err = recsplit.OpenIndex(path.Join(dir, fileName)) if err != nil { return fmt.Errorf("%w, fileName: %s", err, fileName) @@ -239,7 +239,7 @@ func (sn *TxnSegment) reopenIdx(dir string) (err error) { } */ - fileName = snaptype.IdxFileName(sn.ranges.from, sn.ranges.to, snaptype.Transactions2Block.String()) + fileName = snaptype.IdxFileName(sn.from, sn.to, snaptype.Transactions2Block.String()) sn.IdxTxnHash2BlockNum, err = recsplit.OpenIndex(path.Join(dir, fileName)) if err != nil { return fmt.Errorf("%w, fileName: %s", err, fileName) @@ -289,7 +289,7 @@ func (s *bodySegments) ViewSegment(blockNum uint64, f func(*BodySegment) error) s.lock.RLock() defer s.lock.RUnlock() for _, seg := range s.segments { - if !(blockNum >= seg.ranges.from && blockNum < seg.ranges.to) { + if !(blockNum >= seg.from && blockNum < seg.to) { continue } return true, f(seg) @@ -311,7 +311,7 @@ func (s *txnSegments) ViewSegment(blockNum uint64, f func(*TxnSegment) error) (f s.lock.RLock() defer s.lock.RUnlock() for _, seg := range s.segments { - if !(blockNum >= seg.ranges.from && blockNum < seg.ranges.to) { + if !(blockNum >= seg.from && blockNum < seg.to) { continue } return true, f(seg) @@ -445,19 +445,19 @@ func (s *RoSnapshots) idxAvailability() uint64 { if seg.idxHeaderHash == nil { break } - headers = seg.ranges.to - 1 + headers = seg.to - 1 } for _, seg := range s.Bodies.segments { if seg.idxBodyNumber == nil { break } - bodies = seg.ranges.to - 1 + bodies = seg.to - 1 } for _, seg := range s.Txs.segments { if seg.IdxTxnHash == nil || seg.IdxTxnHash2BlockNum == nil { break } - txs = seg.ranges.to - 1 + txs = seg.to - 1 } return cmp.Min(headers, cmp.Min(bodies, txs)) } @@ -487,7 +487,7 @@ func (s *RoSnapshots) Files() (list []string) { if seg.seg == nil { continue } - if seg.ranges.from > maxBlockNumInFiles { + if seg.from > maxBlockNumInFiles { continue } _, fName := filepath.Split(seg.seg.FilePath()) @@ -497,7 +497,7 @@ func (s *RoSnapshots) Files() (list []string) { if seg.seg == nil { continue } - if seg.ranges.from > maxBlockNumInFiles { + if seg.from > maxBlockNumInFiles { continue } _, fName := filepath.Split(seg.seg.FilePath()) @@ -507,7 +507,7 @@ func (s *RoSnapshots) Files() (list []string) { if seg.Seg == nil { continue } - if seg.ranges.from > maxBlockNumInFiles { + if seg.from > maxBlockNumInFiles { continue } _, fName := filepath.Split(seg.Seg.FilePath()) @@ -519,6 +519,14 @@ func (s *RoSnapshots) Files() (list []string) { // ReopenList stops on optimistic=false, continue opening files on optimistic=true func (s *RoSnapshots) ReopenList(fileNames []string, optimistic bool) error { + return s.rebuildSegments(fileNames, true, optimistic) +} + +func (s *RoSnapshots) InitSegments(fileNames []string) 
error { + return s.rebuildSegments(fileNames, false, true) +} + +func (s *RoSnapshots) rebuildSegments(fileNames []string, open bool, optimistic bool) error { s.Headers.lock.Lock() defer s.Headers.lock.Unlock() s.Bodies.lock.Lock() @@ -552,22 +560,25 @@ Loop: } } if !exists { - sn = &HeaderSegment{ranges: Range{f.From, f.To}} + sn = &HeaderSegment{Range: Range{f.From, f.To}} } - if err := sn.reopenSeg(s.dir); err != nil { - if errors.Is(err, os.ErrNotExist) { + + if open { + if err := sn.reopenSeg(s.dir); err != nil { + if errors.Is(err, os.ErrNotExist) { + if optimistic { + continue Loop + } else { + break Loop + } + } if optimistic { + s.logger.Warn("[snapshots] open segment", "err", err) continue Loop } else { - break Loop + return err } } - if optimistic { - s.logger.Warn("[snapshots] open segment", "err", err) - continue Loop - } else { - return err - } } if !exists { @@ -575,8 +586,11 @@ Loop: // then make segment available even if index open may fail s.Headers.segments = append(s.Headers.segments, sn) } - if err := sn.reopenIdxIfNeed(s.dir, optimistic); err != nil { - return err + + if open { + if err := sn.reopenIdxIfNeed(s.dir, optimistic); err != nil { + return err + } } case snaptype.Bodies: var sn *BodySegment @@ -592,28 +606,34 @@ Loop: } } if !exists { - sn = &BodySegment{ranges: Range{f.From, f.To}} + sn = &BodySegment{Range: Range{f.From, f.To}} } - if err := sn.reopenSeg(s.dir); err != nil { - if errors.Is(err, os.ErrNotExist) { + + if open { + if err := sn.reopenSeg(s.dir); err != nil { + if errors.Is(err, os.ErrNotExist) { + if optimistic { + continue Loop + } else { + break Loop + } + } if optimistic { + s.logger.Warn("[snapshots] open segment", "err", err) continue Loop } else { - break Loop + return err } } - if optimistic { - s.logger.Warn("[snapshots] open segment", "err", err) - continue Loop - } else { - return err - } } if !exists { s.Bodies.segments = append(s.Bodies.segments, sn) } - if err := sn.reopenIdxIfNeed(s.dir, optimistic); err != nil { - return err + + if open { + if err := sn.reopenIdxIfNeed(s.dir, optimistic); err != nil { + return err + } } case snaptype.Transactions: var sn *TxnSegment @@ -629,28 +649,35 @@ Loop: } } if !exists { - sn = &TxnSegment{ranges: Range{f.From, f.To}} + sn = &TxnSegment{Range: Range{f.From, f.To}} } - if err := sn.reopenSeg(s.dir); err != nil { - if errors.Is(err, os.ErrNotExist) { + + if open { + if err := sn.reopenSeg(s.dir); err != nil { + if errors.Is(err, os.ErrNotExist) { + if optimistic { + continue Loop + } else { + break Loop + } + } if optimistic { + s.logger.Warn("[snapshots] open segment", "err", err) continue Loop } else { - break Loop + return err } } - if optimistic { - s.logger.Warn("[snapshots] open segment", "err", err) - continue Loop - } else { - return err - } } + if !exists { s.Txs.segments = append(s.Txs.segments, sn) } - if err := sn.reopenIdxIfNeed(s.dir, optimistic); err != nil { - return err + + if open { + if err := sn.reopenIdxIfNeed(s.dir, optimistic); err != nil { + return err + } } default: processed = false @@ -680,7 +707,7 @@ func (s *RoSnapshots) Ranges() (ranges []Range) { defer view.Close() for _, sn := range view.Headers() { - ranges = append(ranges, sn.ranges) + ranges = append(ranges, sn.Range) } return ranges } @@ -688,7 +715,14 @@ func (s *RoSnapshots) Ranges() (ranges []Range) { func (s *RoSnapshots) OptimisticalyReopenFolder() { _ = s.ReopenFolder() } func (s *RoSnapshots) OptimisticalyReopenWithDB(db kv.RoDB) { _ = s.ReopenWithDB(db) } func (s *RoSnapshots) ReopenFolder() error 
{ - files, _, err := Segments(s.dir) + return s.ReopenSegments(snaptype.BlockSnapshotTypes) +} + +func (s *RoSnapshots) ReopenSegments(types []snaptype.Type) error { + files, _, err := segments(s.dir, func(dir string, in []snaptype.FileInfo) (res []snaptype.FileInfo) { + return typeOfSegmentsMustExist(dir, in, types) + }) + if err != nil { return err } @@ -699,6 +733,7 @@ func (s *RoSnapshots) ReopenFolder() error { } return s.ReopenList(list, false) } + func (s *RoSnapshots) ReopenWithDB(db kv.RoDB) error { if err := db.View(context.Background(), func(tx kv.Tx) error { snList, _, err := rawdb.ReadSnapshots(tx) @@ -809,15 +844,15 @@ func (s *RoSnapshots) PrintDebug() { defer s.Txs.lock.RUnlock() fmt.Println(" == Snapshots, Header") for _, sn := range s.Headers.segments { - fmt.Printf("%d, %t\n", sn.ranges.from, sn.idxHeaderHash == nil) + fmt.Printf("%d, %t\n", sn.from, sn.idxHeaderHash == nil) } fmt.Println(" == Snapshots, Body") for _, sn := range s.Bodies.segments { - fmt.Printf("%d, %t\n", sn.ranges.from, sn.idxBodyNumber == nil) + fmt.Printf("%d, %t\n", sn.from, sn.idxBodyNumber == nil) } fmt.Println(" == Snapshots, Txs") for _, sn := range s.Txs.segments { - fmt.Printf("%d, %t, %t\n", sn.ranges.from, sn.IdxTxnHash == nil, sn.IdxTxnHash2BlockNum == nil) + fmt.Printf("%d, %t, %t\n", sn.from, sn.IdxTxnHash == nil, sn.IdxTxnHash2BlockNum == nil) } } @@ -879,7 +914,7 @@ func buildIdx(ctx context.Context, sn snaptype.FileInfo, chainConfig *chain.Conf //log.Info("[snapshots] build idx", "file", fName) switch sn.T { case snaptype.Headers: - if err := HeadersIdx(ctx, chainConfig, sn.Path, sn.From, tmpDir, p, lvl, logger); err != nil { + if err := HeadersIdx(ctx, sn.Path, sn.From, tmpDir, p, lvl, logger); err != nil { return err } case snaptype.Bodies: @@ -1066,13 +1101,13 @@ func noGaps(in []snaptype.FileInfo) (out []snaptype.FileInfo, missingSnapshots [ return out, missingSnapshots } -func allTypeOfSegmentsMustExist(dir string, in []snaptype.FileInfo) (res []snaptype.FileInfo) { +func typeOfSegmentsMustExist(dir string, in []snaptype.FileInfo, types []snaptype.Type) (res []snaptype.FileInfo) { MainLoop: for _, f := range in { if f.From == f.To { continue } - for _, t := range snaptype.BlockSnapshotTypes { + for _, t := range types { p := filepath.Join(dir, snaptype.SegmentFileName(f.From, f.To, t)) if !dir2.FileExist(p) { continue MainLoop @@ -1083,21 +1118,12 @@ MainLoop: return res } +func allTypeOfSegmentsMustExist(dir string, in []snaptype.FileInfo) (res []snaptype.FileInfo) { + return typeOfSegmentsMustExist(dir, in, snaptype.BlockSnapshotTypes) +} + func borSegmentsMustExist(dir string, in []snaptype.FileInfo) (res []snaptype.FileInfo) { -MainLoop: - for _, f := range in { - if f.From == f.To { - continue - } - for _, t := range []snaptype.Type{snaptype.BorEvents, snaptype.BorSpans} { - p := filepath.Join(dir, snaptype.SegmentFileName(f.From, f.To, t)) - if !dir2.FileExist(p) { - continue MainLoop - } - } - res = append(res, f) - } - return res + return typeOfSegmentsMustExist(dir, in, []snaptype.Type{snaptype.BorEvents, snaptype.BorSpans}) } // noOverlaps - keep largest ranges and avoid overlap @@ -1148,6 +1174,10 @@ func SegmentsCaplin(dir string) (res []snaptype.FileInfo, missingSnapshots []Ran } func Segments(dir string) (res []snaptype.FileInfo, missingSnapshots []Range, err error) { + return segments(dir, allTypeOfSegmentsMustExist) +} + +func segments(dir string, segmentsTypeCheck func(dir string, in []snaptype.FileInfo) []snaptype.FileInfo) (res []snaptype.FileInfo, 
missingSnapshots []Range, err error) { list, err := snaptype.Segments(dir) if err != nil { return nil, missingSnapshots, err @@ -1161,7 +1191,7 @@ func Segments(dir string) (res []snaptype.FileInfo, missingSnapshots []Range, er } l = append(l, f) } - l, m = noGaps(noOverlaps(allTypeOfSegmentsMustExist(dir, l))) + l, m = noGaps(noOverlaps(segmentsTypeCheck(dir, l))) res = append(res, l...) missingSnapshots = append(missingSnapshots, m...) } @@ -1173,7 +1203,7 @@ func Segments(dir string) (res []snaptype.FileInfo, missingSnapshots []Range, er } l = append(l, f) } - l, _ = noGaps(noOverlaps(allTypeOfSegmentsMustExist(dir, l))) + l, _ = noGaps(noOverlaps(segmentsTypeCheck(dir, l))) res = append(res, l...) } { @@ -1184,7 +1214,7 @@ func Segments(dir string) (res []snaptype.FileInfo, missingSnapshots []Range, er } l = append(l, f) } - l, _ = noGaps(noOverlaps(allTypeOfSegmentsMustExist(dir, l))) + l, _ = noGaps(noOverlaps(segmentsTypeCheck(dir, l))) res = append(res, l...) } @@ -2072,7 +2102,7 @@ RETRY: } // HeadersIdx - headerHash -> offset (analog of kv.HeaderNumber) -func HeadersIdx(ctx context.Context, chainConfig *chain.Config, segmentFilePath string, firstBlockNumInSegment uint64, tmpDir string, p *background.Progress, lvl log.Lvl, logger log.Logger) (err error) { +func HeadersIdx(ctx context.Context, segmentFilePath string, firstBlockNumInSegment uint64, tmpDir string, p *background.Progress, lvl log.Lvl, logger log.Logger) (err error) { defer func() { if rec := recover(); rec != nil { _, fName := filepath.Split(segmentFilePath) @@ -2086,15 +2116,19 @@ func HeadersIdx(ctx context.Context, chainConfig *chain.Config, segmentFilePath } defer d.Close() - _, fname := filepath.Split(segmentFilePath) - p.Name.Store(&fname) - p.Total.Store(uint64(d.Count())) + if p != nil { + _, fname := filepath.Split(segmentFilePath) + p.Name.Store(&fname) + p.Total.Store(uint64(d.Count())) + } hasher := crypto.NewKeccakState() defer cryptopool.ReturnToPoolKeccak256(hasher) var h common2.Hash if err := Idx(ctx, d, firstBlockNumInSegment, tmpDir, log.LvlDebug, func(idx *recsplit.RecSplit, i, offset uint64, word []byte) error { - p.Processed.Add(1) + if p != nil { + p.Processed.Add(1) + } headerRlp := word[1:] hasher.Reset() hasher.Write(headerRlp) @@ -2305,7 +2339,7 @@ func (v *View) Bodies() []*BodySegment { return v.s.Bodies.segments } func (v *View) Txs() []*TxnSegment { return v.s.Txs.segments } func (v *View) HeadersSegment(blockNum uint64) (*HeaderSegment, bool) { for _, seg := range v.Headers() { - if !(blockNum >= seg.ranges.from && blockNum < seg.ranges.to) { + if !(blockNum >= seg.from && blockNum < seg.to) { continue } return seg, true @@ -2314,7 +2348,7 @@ func (v *View) HeadersSegment(blockNum uint64) (*HeaderSegment, bool) { } func (v *View) BodiesSegment(blockNum uint64) (*BodySegment, bool) { for _, seg := range v.Bodies() { - if !(blockNum >= seg.ranges.from && blockNum < seg.ranges.to) { + if !(blockNum >= seg.from && blockNum < seg.to) { continue } return seg, true @@ -2323,7 +2357,7 @@ func (v *View) BodiesSegment(blockNum uint64) (*BodySegment, bool) { } func (v *View) TxsSegment(blockNum uint64) (*TxnSegment, bool) { for _, seg := range v.Txs() { - if !(blockNum >= seg.ranges.from && blockNum < seg.ranges.to) { + if !(blockNum >= seg.from && blockNum < seg.to) { continue } return seg, true @@ -2341,10 +2375,10 @@ func (m *Merger) filesByRange(snapshots *RoSnapshots, from, to uint64) (map[snap tSegments := view.Txs() for i, sn := range hSegments { - if sn.ranges.from < from { + if sn.from < 
from { continue } - if sn.ranges.to > to { + if sn.to > to { break } toMerge[snaptype.Headers] = append(toMerge[snaptype.Headers], hSegments[i].seg.FilePath()) diff --git a/turbo/snapshotsync/freezeblocks/block_snapshots_test.go b/turbo/snapshotsync/freezeblocks/block_snapshots_test.go index 76ddb03c7de..609083d5891 100644 --- a/turbo/snapshotsync/freezeblocks/block_snapshots_test.go +++ b/turbo/snapshotsync/freezeblocks/block_snapshots_test.go @@ -200,11 +200,11 @@ func TestOpenAllSnapshot(t *testing.T) { seg, ok := view.TxsSegment(10) require.True(ok) - require.Equal(int(seg.ranges.to), 500_000) + require.Equal(int(seg.to), 500_000) seg, ok = view.TxsSegment(500_000) require.True(ok) - require.Equal(int(seg.ranges.to), 1_000_000) + require.Equal(int(seg.to), 1_000_000) _, ok = view.TxsSegment(1_000_000) require.False(ok) diff --git a/turbo/snapshotsync/freezeblocks/bor_snapshots.go b/turbo/snapshotsync/freezeblocks/bor_snapshots.go index 9080f9ddac7..02e28b955cb 100644 --- a/turbo/snapshotsync/freezeblocks/bor_snapshots.go +++ b/turbo/snapshotsync/freezeblocks/bor_snapshots.go @@ -41,7 +41,7 @@ import ( type BorEventSegment struct { seg *compress.Decompressor // value: event_rlp IdxBorTxnHash *recsplit.Index // bor_transaction_hash -> bor_event_segment_offset - ranges Range + Range } func (sn *BorEventSegment) closeIdx() { @@ -62,7 +62,7 @@ func (sn *BorEventSegment) close() { } func (sn *BorEventSegment) reopenSeg(dir string) (err error) { sn.closeSeg() - fileName := snaptype.SegmentFileName(sn.ranges.from, sn.ranges.to, snaptype.BorEvents) + fileName := snaptype.SegmentFileName(sn.from, sn.to, snaptype.BorEvents) sn.seg, err = compress.NewDecompressor(path.Join(dir, fileName)) if err != nil { return fmt.Errorf("%w, fileName: %s", err, fileName) @@ -75,7 +75,7 @@ func (sn *BorEventSegment) reopenIdx(dir string) (err error) { return nil } - fileName := snaptype.IdxFileName(sn.ranges.from, sn.ranges.to, snaptype.BorEvents.String()) + fileName := snaptype.IdxFileName(sn.from, sn.to, snaptype.BorEvents.String()) sn.IdxBorTxnHash, err = recsplit.OpenIndex(path.Join(dir, fileName)) if err != nil { return fmt.Errorf("%w, fileName: %s", err, fileName) @@ -106,9 +106,9 @@ type borEventSegments struct { } type BorSpanSegment struct { - seg *compress.Decompressor // value: span_json - idx *recsplit.Index // span_id -> offset - ranges Range + seg *compress.Decompressor // value: span_json + idx *recsplit.Index // span_id -> offset + Range } func (sn *BorSpanSegment) closeIdx() { @@ -129,7 +129,7 @@ func (sn *BorSpanSegment) close() { } func (sn *BorSpanSegment) reopenSeg(dir string) (err error) { sn.closeSeg() - fileName := snaptype.SegmentFileName(sn.ranges.from, sn.ranges.to, snaptype.BorSpans) + fileName := snaptype.SegmentFileName(sn.from, sn.to, snaptype.BorSpans) sn.seg, err = compress.NewDecompressor(path.Join(dir, fileName)) if err != nil { return fmt.Errorf("%w, fileName: %s", err, fileName) @@ -141,7 +141,7 @@ func (sn *BorSpanSegment) reopenIdx(dir string) (err error) { if sn.seg == nil { return nil } - fileName := snaptype.IdxFileName(sn.ranges.from, sn.ranges.to, snaptype.BorSpans.String()) + fileName := snaptype.IdxFileName(sn.from, sn.to, snaptype.BorSpans.String()) sn.idx, err = recsplit.OpenIndex(path.Join(dir, fileName)) if err != nil { return fmt.Errorf("%w, fileName: %s", err, fileName) @@ -694,13 +694,13 @@ func (s *BorRoSnapshots) idxAvailability() uint64 { if seg.IdxBorTxnHash == nil { break } - events = seg.ranges.to - 1 + events = seg.to - 1 } for _, seg := range 
s.Spans.segments { if seg.idx == nil { break } - spans = seg.ranges.to - 1 + spans = seg.to - 1 } return cmp.Min(events, spans) } @@ -728,7 +728,7 @@ func (s *BorRoSnapshots) Files() (list []string) { if seg.seg == nil { continue } - if seg.ranges.from > max { + if seg.from > max { continue } _, fName := filepath.Split(seg.seg.FilePath()) @@ -738,7 +738,7 @@ func (s *BorRoSnapshots) Files() (list []string) { if seg.seg == nil { continue } - if seg.ranges.from > max { + if seg.from > max { continue } _, fName := filepath.Split(seg.seg.FilePath()) @@ -782,7 +782,7 @@ Loop: } } if !exists { - sn = &BorEventSegment{ranges: Range{f.From, f.To}} + sn = &BorEventSegment{Range: Range{f.From, f.To}} } if err := sn.reopenSeg(s.dir); err != nil { if errors.Is(err, os.ErrNotExist) { @@ -822,7 +822,7 @@ Loop: } } if !exists { - sn = &BorSpanSegment{ranges: Range{f.From, f.To}} + sn = &BorSpanSegment{Range: Range{f.From, f.To}} } if err := sn.reopenSeg(s.dir); err != nil { if errors.Is(err, os.ErrNotExist) { @@ -876,7 +876,7 @@ func (s *BorRoSnapshots) Ranges() (ranges []Range) { defer view.Close() for _, sn := range view.Events() { - ranges = append(ranges, sn.ranges) + ranges = append(ranges, sn.Range) } return ranges } @@ -975,11 +975,11 @@ func (s *BorRoSnapshots) PrintDebug() { defer s.Spans.lock.RUnlock() fmt.Println(" == BorSnapshots, Event") for _, sn := range s.Events.segments { - fmt.Printf("%d, %t\n", sn.ranges.from, sn.IdxBorTxnHash == nil) + fmt.Printf("%d, %t\n", sn.from, sn.IdxBorTxnHash == nil) } fmt.Println(" == BorSnapshots, Span") for _, sn := range s.Spans.segments { - fmt.Printf("%d, %t\n", sn.ranges.from, sn.idx == nil) + fmt.Printf("%d, %t\n", sn.from, sn.idx == nil) } } @@ -1007,7 +1007,7 @@ func (v *BorView) Events() []*BorEventSegment { return v.s.Events.segments } func (v *BorView) Spans() []*BorSpanSegment { return v.s.Spans.segments } func (v *BorView) EventsSegment(blockNum uint64) (*BorEventSegment, bool) { for _, seg := range v.Events() { - if !(blockNum >= seg.ranges.from && blockNum < seg.ranges.to) { + if !(blockNum >= seg.from && blockNum < seg.to) { continue } return seg, true @@ -1016,7 +1016,7 @@ func (v *BorView) EventsSegment(blockNum uint64) (*BorEventSegment, bool) { } func (v *BorView) SpansSegment(blockNum uint64) (*BorSpanSegment, bool) { for _, seg := range v.Spans() { - if !(blockNum >= seg.ranges.from && blockNum < seg.ranges.to) { + if !(blockNum >= seg.from && blockNum < seg.to) { continue } return seg, true @@ -1073,10 +1073,10 @@ func (m *BorMerger) filesByRange(snapshots *BorRoSnapshots, from, to uint64) (ma sSegments := view.Spans() for i, sn := range eSegments { - if sn.ranges.from < from { + if sn.from < from { continue } - if sn.ranges.to > to { + if sn.to > to { break } toMerge[snaptype.BorEvents] = append(toMerge[snaptype.BorEvents], eSegments[i].seg.FilePath())