Skip to content

Commit

Permalink
tests: Remove torrent simulator (#9845)
Browse files Browse the repository at this point in the history
  • Loading branch information
shohamc1 authored Apr 2, 2024
1 parent 06864e0 commit 95c8e37
Show file tree
Hide file tree
Showing 7 changed files with 113 additions and 249 deletions.
10 changes: 6 additions & 4 deletions cmd/snapshots/cmp/cmp.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,10 @@ import (
"time"

"github.com/c2h5oh/datasize"
"github.com/ledgerwatch/log/v3"
"github.com/urfave/cli/v2"
"golang.org/x/sync/errgroup"

"github.com/ledgerwatch/erigon-lib/chain"
"github.com/ledgerwatch/erigon-lib/common"
"github.com/ledgerwatch/erigon-lib/downloader"
Expand All @@ -24,9 +28,6 @@ import (
"github.com/ledgerwatch/erigon/params"
"github.com/ledgerwatch/erigon/turbo/logging"
"github.com/ledgerwatch/erigon/turbo/snapshotsync/freezeblocks"
"github.com/ledgerwatch/log/v3"
"github.com/urfave/cli/v2"
"golang.org/x/sync/errgroup"
)

var Command = cli.Command{
Expand Down Expand Up @@ -140,7 +141,8 @@ func cmp(cliCtx *cli.Context) error {
}

if loc1.LType == sync.TorrentFs || loc2.LType == sync.TorrentFs {
torrentCli, err = sync.NewTorrentClient(cliCtx, chain)
config := sync.NewTorrentClientConfigFromCobra(cliCtx, chain)
torrentCli, err = sync.NewTorrentClient(config)
if err != nil {
return fmt.Errorf("can't create torrent: %w", err)
}
Expand Down
6 changes: 4 additions & 2 deletions cmd/snapshots/copy/copy.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,14 @@ import (
"strconv"
"strings"

"github.com/urfave/cli/v2"

"github.com/ledgerwatch/erigon-lib/downloader"
"github.com/ledgerwatch/erigon-lib/downloader/snaptype"
"github.com/ledgerwatch/erigon/cmd/snapshots/flags"
"github.com/ledgerwatch/erigon/cmd/snapshots/sync"
"github.com/ledgerwatch/erigon/cmd/utils"
"github.com/ledgerwatch/erigon/turbo/logging"
"github.com/urfave/cli/v2"
)

var (
Expand Down Expand Up @@ -126,7 +127,8 @@ func copy(cliCtx *cli.Context) error {

switch src.LType {
case sync.TorrentFs:
torrentCli, err = sync.NewTorrentClient(cliCtx, dst.Chain)
config := sync.NewTorrentClientConfigFromCobra(cliCtx, dst.Chain)
torrentCli, err = sync.NewTorrentClient(config)
if err != nil {
return fmt.Errorf("can't create torrent: %w", err)
}
Expand Down
85 changes: 68 additions & 17 deletions cmd/snapshots/sync/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,11 @@ import (
"github.com/anacrolix/torrent/metainfo"
"github.com/anacrolix/torrent/storage"
"github.com/c2h5oh/datasize"
"github.com/ledgerwatch/log/v3"
"github.com/urfave/cli/v2"
"golang.org/x/exp/slices"
"golang.org/x/sync/errgroup"

"github.com/ledgerwatch/erigon-lib/chain/snapcfg"
"github.com/ledgerwatch/erigon-lib/common"
"github.com/ledgerwatch/erigon-lib/common/datadir"
Expand All @@ -27,9 +32,6 @@ import (
"github.com/ledgerwatch/erigon/cmd/utils"
"github.com/ledgerwatch/erigon/p2p/nat"
"github.com/ledgerwatch/erigon/params"
"github.com/urfave/cli/v2"
"golang.org/x/exp/slices"
"golang.org/x/sync/errgroup"
)

type LType int
Expand Down Expand Up @@ -128,31 +130,80 @@ type TorrentClient struct {
cfg *torrent.ClientConfig
}

func NewTorrentClient(cliCtx *cli.Context, chain string) (*TorrentClient, error) {
logger := Logger(cliCtx.Context)
tempDir := TempDir(cliCtx.Context)
type CreateNewTorrentClientConfig struct {
Chain string
WebSeeds string
DownloadRate string
UploadRate string
Verbosity int
TorrentPort int
ConnsPerFile int
DisableIPv6 bool
DisableIPv4 bool
NatFlag string
Logger log.Logger
TempDir string
}

func NewTorrentClientConfigFromCobra(cliCtx *cli.Context, chain string) CreateNewTorrentClientConfig {
return CreateNewTorrentClientConfig{
Chain: chain,
WebSeeds: cliCtx.String(utils.WebSeedsFlag.Name),
DownloadRate: cliCtx.String(utils.TorrentDownloadRateFlag.Name),
UploadRate: cliCtx.String(utils.TorrentUploadRateFlag.Name),
Verbosity: cliCtx.Int(utils.TorrentVerbosityFlag.Name),
TorrentPort: cliCtx.Int(utils.TorrentPortFlag.Name),
ConnsPerFile: cliCtx.Int(utils.TorrentConnsPerFileFlag.Name),
DisableIPv6: cliCtx.Bool(utils.DisableIPV6.Name),
DisableIPv4: cliCtx.Bool(utils.DisableIPV4.Name),
NatFlag: cliCtx.String(utils.NATFlag.Name),
Logger: Logger(cliCtx.Context),
TempDir: TempDir(cliCtx.Context),
}
}

func NewDefaultTorrentClientConfig(chain string, torrentDir string, logger log.Logger) CreateNewTorrentClientConfig {
return CreateNewTorrentClientConfig{
Chain: chain,
WebSeeds: utils.WebSeedsFlag.Value,
DownloadRate: utils.TorrentDownloadRateFlag.Value,
UploadRate: utils.TorrentUploadRateFlag.Value,
Verbosity: utils.TorrentVerbosityFlag.Value,
TorrentPort: utils.TorrentPortFlag.Value,
ConnsPerFile: utils.TorrentConnsPerFileFlag.Value,
DisableIPv6: utils.DisableIPV6.Value,
DisableIPv4: utils.DisableIPV4.Value,
NatFlag: utils.NATFlag.Value,
Logger: logger,
TempDir: torrentDir,
}
}

func NewTorrentClient(config CreateNewTorrentClientConfig) (*TorrentClient, error) {
logger := config.Logger
tempDir := config.TempDir

torrentDir := filepath.Join(tempDir, "torrents", chain)
torrentDir := filepath.Join(tempDir, "torrents", config.Chain)

dirs := datadir.New(torrentDir)

webseedsList := common.CliString2Array(cliCtx.String(utils.WebSeedsFlag.Name))
webseedsList := common.CliString2Array(config.WebSeeds)

if known, ok := snapcfg.KnownWebseeds[chain]; ok {
if known, ok := snapcfg.KnownWebseeds[config.Chain]; ok {
webseedsList = append(webseedsList, known...)
}

var downloadRate, uploadRate datasize.ByteSize

if err := downloadRate.UnmarshalText([]byte(cliCtx.String(utils.TorrentDownloadRateFlag.Name))); err != nil {
if err := downloadRate.UnmarshalText([]byte(config.DownloadRate)); err != nil {
return nil, err
}

if err := uploadRate.UnmarshalText([]byte(cliCtx.String(utils.TorrentUploadRateFlag.Name))); err != nil {
if err := uploadRate.UnmarshalText([]byte(config.UploadRate)); err != nil {
return nil, err
}

logLevel, _, err := downloadercfg.Int2LogLevel(cliCtx.Int(utils.TorrentVerbosityFlag.Name))
logLevel, _, err := downloadercfg.Int2LogLevel(config.Verbosity)

if err != nil {
return nil, err
Expand All @@ -161,8 +212,8 @@ func NewTorrentClient(cliCtx *cli.Context, chain string) (*TorrentClient, error)
version := "erigon: " + params.VersionWithCommit(params.GitCommit)

cfg, err := downloadercfg.New(dirs, version, logLevel, downloadRate, uploadRate,
cliCtx.Int(utils.TorrentPortFlag.Name),
cliCtx.Int(utils.TorrentConnsPerFileFlag.Name), 0, nil, webseedsList, chain, true)
config.TorrentPort,
config.ConnsPerFile, 0, nil, webseedsList, config.Chain, true)

if err != nil {
return nil, err
Expand All @@ -181,10 +232,10 @@ func NewTorrentClient(cliCtx *cli.Context, chain string) (*TorrentClient, error)
cfg.ClientConfig.DataDir = torrentDir

cfg.ClientConfig.PieceHashersPerTorrent = 32 * runtime.NumCPU()
cfg.ClientConfig.DisableIPv6 = cliCtx.Bool(utils.DisableIPV6.Name)
cfg.ClientConfig.DisableIPv4 = cliCtx.Bool(utils.DisableIPV4.Name)
cfg.ClientConfig.DisableIPv6 = config.DisableIPv6
cfg.ClientConfig.DisableIPv4 = config.DisableIPv4

natif, err := nat.Parse(utils.NATFlag.Value)
natif, err := nat.Parse(config.NatFlag)

if err != nil {
return nil, fmt.Errorf("invalid nat option %s: %w", utils.NATFlag.Value, err)
Expand Down
9 changes: 6 additions & 3 deletions cmd/snapshots/verify/verify.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,13 @@ import (
"path/filepath"
"strconv"

"github.com/urfave/cli/v2"

"github.com/ledgerwatch/erigon-lib/downloader"
"github.com/ledgerwatch/erigon-lib/downloader/snaptype"
"github.com/ledgerwatch/erigon/cmd/snapshots/flags"
"github.com/ledgerwatch/erigon/cmd/snapshots/sync"
"github.com/ledgerwatch/erigon/cmd/utils"
"github.com/urfave/cli/v2"
)

var (
Expand Down Expand Up @@ -99,7 +100,8 @@ func verify(cliCtx *cli.Context) error {

switch dst.LType {
case sync.TorrentFs:
torrentCli, err = sync.NewTorrentClient(cliCtx, dst.Chain)
config := sync.NewTorrentClientConfigFromCobra(cliCtx, dst.Chain)
torrentCli, err = sync.NewTorrentClient(config)
if err != nil {
return fmt.Errorf("can't create torrent: %w", err)
}
Expand All @@ -125,7 +127,8 @@ func verify(cliCtx *cli.Context) error {
switch src.LType {
case sync.TorrentFs:
if torrentCli == nil {
torrentCli, err = sync.NewTorrentClient(cliCtx, dst.Chain)
config := sync.NewTorrentClientConfigFromCobra(cliCtx, dst.Chain)
torrentCli, err = sync.NewTorrentClient(config)
if err != nil {
return fmt.Errorf("can't create torrent: %w", err)
}
Expand Down
21 changes: 14 additions & 7 deletions p2p/sentry/simulator/sentry_simulator.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"bytes"
"context"
"fmt"
"path/filepath"

"github.com/ledgerwatch/log/v3"
"google.golang.org/protobuf/types/known/emptypb"
Expand All @@ -14,6 +15,7 @@ import (
"github.com/ledgerwatch/erigon-lib/gointerfaces"
sentry_if "github.com/ledgerwatch/erigon-lib/gointerfaces/sentry"
"github.com/ledgerwatch/erigon-lib/gointerfaces/types"
"github.com/ledgerwatch/erigon/cmd/snapshots/sync"
core_types "github.com/ledgerwatch/erigon/core/types"
"github.com/ledgerwatch/erigon/crypto"
"github.com/ledgerwatch/erigon/eth/ethconfig"
Expand All @@ -35,7 +37,8 @@ type server struct {
knownSnapshots *freezeblocks.RoSnapshots
activeSnapshots *freezeblocks.RoSnapshots
blockReader *freezeblocks.BlockReader
downloader *TorrentClient
downloader *sync.TorrentClient
chain string
}

func newPeer(name string, caps []p2p.Cap) (*p2p.Peer, error) {
Expand All @@ -61,6 +64,7 @@ func NewSentry(ctx context.Context, chain string, snapshotLocation string, peerC
}

cfg := snapcfg.KnownCfg(chain)
torrentDir := filepath.Join(snapshotLocation, "torrents", chain)

knownSnapshots := freezeblocks.NewRoSnapshots(ethconfig.BlocksFreezing{
Enabled: true,
Expand All @@ -81,13 +85,14 @@ func NewSentry(ctx context.Context, chain string, snapshotLocation string, peerC
Enabled: true,
Produce: false,
NoDownloader: true,
}, snapshotLocation, 0, logger)
}, torrentDir, 0, logger)

if err := activeSnapshots.ReopenFolder(); err != nil {
return nil, err
}

downloader, err := NewTorrentClient(ctx, chain, snapshotLocation, logger)
config := sync.NewDefaultTorrentClientConfig(chain, snapshotLocation, logger)
downloader, err := sync.NewTorrentClient(config)

if err != nil {
return nil, err
Expand All @@ -102,6 +107,7 @@ func NewSentry(ctx context.Context, chain string, snapshotLocation string, peerC
blockReader: freezeblocks.NewBlockReader(activeSnapshots, nil),
logger: logger,
downloader: downloader,
chain: chain,
}

go func() {
Expand Down Expand Up @@ -434,19 +440,20 @@ func (s *server) getHeaderByHash(ctx context.Context, hash common.Hash) (*core_t

func (s *server) downloadHeaders(ctx context.Context, header *freezeblocks.Segment) error {
fileName := snaptype.SegmentFileName(0, header.From(), header.To(), snaptype.Enums.Headers)
session := sync.NewTorrentSession(s.downloader, s.chain)

s.logger.Info(fmt.Sprintf("Downloading %s", fileName))

err := s.downloader.Download(ctx, fileName)
err := session.Download(ctx, fileName)

if err != nil {
return fmt.Errorf("can't download %s: %w", fileName, err)
}

s.logger.Info(fmt.Sprintf("Indexing %s", fileName))

info, _, _ := snaptype.ParseFileName(s.downloader.LocalFsRoot(), fileName)
info, _, _ := snaptype.ParseFileName(session.LocalFsRoot(), fileName)

salt := freezeblocks.GetIndicesSalt(s.downloader.LocalFsRoot())
return freezeblocks.HeadersIdx(ctx, info, salt, s.downloader.LocalFsRoot(), nil, log.LvlDebug, s.logger)
salt := freezeblocks.GetIndicesSalt(session.LocalFsRoot())
return freezeblocks.HeadersIdx(ctx, info, salt, session.LocalFsRoot(), nil, log.LvlDebug, s.logger)
}
Loading

0 comments on commit 95c8e37

Please sign in to comment.