Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

tests: Remove torrent simulator #9845

Merged
merged 3 commits into from
Apr 2, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 6 additions & 4 deletions cmd/snapshots/cmp/cmp.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,10 @@ import (
"time"

"github.com/c2h5oh/datasize"
"github.com/ledgerwatch/log/v3"
"github.com/urfave/cli/v2"
"golang.org/x/sync/errgroup"

"github.com/ledgerwatch/erigon-lib/chain"
"github.com/ledgerwatch/erigon-lib/common"
"github.com/ledgerwatch/erigon-lib/downloader"
Expand All @@ -24,9 +28,6 @@ import (
"github.com/ledgerwatch/erigon/params"
"github.com/ledgerwatch/erigon/turbo/logging"
"github.com/ledgerwatch/erigon/turbo/snapshotsync/freezeblocks"
"github.com/ledgerwatch/log/v3"
"github.com/urfave/cli/v2"
"golang.org/x/sync/errgroup"
)

var Command = cli.Command{
Expand Down Expand Up @@ -140,7 +141,8 @@ func cmp(cliCtx *cli.Context) error {
}

if loc1.LType == sync.TorrentFs || loc2.LType == sync.TorrentFs {
torrentCli, err = sync.NewTorrentClient(cliCtx, chain)
config := sync.NewTorrentClientConfigFromCobra(cliCtx, chain)
torrentCli, err = sync.NewTorrentClient(config)
if err != nil {
return fmt.Errorf("can't create torrent: %w", err)
}
Expand Down
6 changes: 4 additions & 2 deletions cmd/snapshots/copy/copy.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,14 @@ import (
"strconv"
"strings"

"github.com/urfave/cli/v2"

"github.com/ledgerwatch/erigon-lib/downloader"
"github.com/ledgerwatch/erigon-lib/downloader/snaptype"
"github.com/ledgerwatch/erigon/cmd/snapshots/flags"
"github.com/ledgerwatch/erigon/cmd/snapshots/sync"
"github.com/ledgerwatch/erigon/cmd/utils"
"github.com/ledgerwatch/erigon/turbo/logging"
"github.com/urfave/cli/v2"
)

var (
Expand Down Expand Up @@ -126,7 +127,8 @@ func copy(cliCtx *cli.Context) error {

switch src.LType {
case sync.TorrentFs:
torrentCli, err = sync.NewTorrentClient(cliCtx, dst.Chain)
config := sync.NewTorrentClientConfigFromCobra(cliCtx, dst.Chain)
torrentCli, err = sync.NewTorrentClient(config)
if err != nil {
return fmt.Errorf("can't create torrent: %w", err)
}
Expand Down
85 changes: 68 additions & 17 deletions cmd/snapshots/sync/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,11 @@ import (
"github.com/anacrolix/torrent/metainfo"
"github.com/anacrolix/torrent/storage"
"github.com/c2h5oh/datasize"
"github.com/ledgerwatch/log/v3"
"github.com/urfave/cli/v2"
"golang.org/x/exp/slices"
"golang.org/x/sync/errgroup"

"github.com/ledgerwatch/erigon-lib/chain/snapcfg"
"github.com/ledgerwatch/erigon-lib/common"
"github.com/ledgerwatch/erigon-lib/common/datadir"
Expand All @@ -27,9 +32,6 @@ import (
"github.com/ledgerwatch/erigon/cmd/utils"
"github.com/ledgerwatch/erigon/p2p/nat"
"github.com/ledgerwatch/erigon/params"
"github.com/urfave/cli/v2"
"golang.org/x/exp/slices"
"golang.org/x/sync/errgroup"
)

type LType int
Expand Down Expand Up @@ -128,31 +130,80 @@ type TorrentClient struct {
cfg *torrent.ClientConfig
}

func NewTorrentClient(cliCtx *cli.Context, chain string) (*TorrentClient, error) {
logger := Logger(cliCtx.Context)
tempDir := TempDir(cliCtx.Context)
type CreateNewTorrentClientConfig struct {
Chain string
WebSeeds string
DownloadRate string
UploadRate string
Verbosity int
TorrentPort int
ConnsPerFile int
DisableIPv6 bool
DisableIPv4 bool
NatFlag string
Logger log.Logger
TempDir string
}

func NewTorrentClientConfigFromCobra(cliCtx *cli.Context, chain string) CreateNewTorrentClientConfig {
return CreateNewTorrentClientConfig{
Chain: chain,
WebSeeds: cliCtx.String(utils.WebSeedsFlag.Name),
DownloadRate: cliCtx.String(utils.TorrentDownloadRateFlag.Name),
UploadRate: cliCtx.String(utils.TorrentUploadRateFlag.Name),
Verbosity: cliCtx.Int(utils.TorrentVerbosityFlag.Name),
TorrentPort: cliCtx.Int(utils.TorrentPortFlag.Name),
ConnsPerFile: cliCtx.Int(utils.TorrentConnsPerFileFlag.Name),
DisableIPv6: cliCtx.Bool(utils.DisableIPV6.Name),
DisableIPv4: cliCtx.Bool(utils.DisableIPV4.Name),
NatFlag: cliCtx.String(utils.NATFlag.Name),
Logger: Logger(cliCtx.Context),
TempDir: TempDir(cliCtx.Context),
}
}

func NewDefaultTorrentClientConfig(chain string, torrentDir string, logger log.Logger) CreateNewTorrentClientConfig {
return CreateNewTorrentClientConfig{
Chain: chain,
WebSeeds: utils.WebSeedsFlag.Value,
DownloadRate: utils.TorrentDownloadRateFlag.Value,
UploadRate: utils.TorrentUploadRateFlag.Value,
Verbosity: utils.TorrentVerbosityFlag.Value,
TorrentPort: utils.TorrentPortFlag.Value,
ConnsPerFile: utils.TorrentConnsPerFileFlag.Value,
DisableIPv6: utils.DisableIPV6.Value,
DisableIPv4: utils.DisableIPV4.Value,
NatFlag: utils.NATFlag.Value,
Logger: logger,
TempDir: torrentDir,
}
}

func NewTorrentClient(config CreateNewTorrentClientConfig) (*TorrentClient, error) {
logger := config.Logger
tempDir := config.TempDir

torrentDir := filepath.Join(tempDir, "torrents", chain)
torrentDir := filepath.Join(tempDir, "torrents", config.Chain)

dirs := datadir.New(torrentDir)

webseedsList := common.CliString2Array(cliCtx.String(utils.WebSeedsFlag.Name))
webseedsList := common.CliString2Array(config.WebSeeds)

if known, ok := snapcfg.KnownWebseeds[chain]; ok {
if known, ok := snapcfg.KnownWebseeds[config.Chain]; ok {
webseedsList = append(webseedsList, known...)
}

var downloadRate, uploadRate datasize.ByteSize

if err := downloadRate.UnmarshalText([]byte(cliCtx.String(utils.TorrentDownloadRateFlag.Name))); err != nil {
if err := downloadRate.UnmarshalText([]byte(config.DownloadRate)); err != nil {
return nil, err
}

if err := uploadRate.UnmarshalText([]byte(cliCtx.String(utils.TorrentUploadRateFlag.Name))); err != nil {
if err := uploadRate.UnmarshalText([]byte(config.UploadRate)); err != nil {
return nil, err
}

logLevel, _, err := downloadercfg.Int2LogLevel(cliCtx.Int(utils.TorrentVerbosityFlag.Name))
logLevel, _, err := downloadercfg.Int2LogLevel(config.Verbosity)

if err != nil {
return nil, err
Expand All @@ -161,8 +212,8 @@ func NewTorrentClient(cliCtx *cli.Context, chain string) (*TorrentClient, error)
version := "erigon: " + params.VersionWithCommit(params.GitCommit)

cfg, err := downloadercfg.New(dirs, version, logLevel, downloadRate, uploadRate,
cliCtx.Int(utils.TorrentPortFlag.Name),
cliCtx.Int(utils.TorrentConnsPerFileFlag.Name), 0, nil, webseedsList, chain, true)
config.TorrentPort,
config.ConnsPerFile, 0, nil, webseedsList, config.Chain, true)

if err != nil {
return nil, err
Expand All @@ -181,10 +232,10 @@ func NewTorrentClient(cliCtx *cli.Context, chain string) (*TorrentClient, error)
cfg.ClientConfig.DataDir = torrentDir

cfg.ClientConfig.PieceHashersPerTorrent = 32 * runtime.NumCPU()
cfg.ClientConfig.DisableIPv6 = cliCtx.Bool(utils.DisableIPV6.Name)
cfg.ClientConfig.DisableIPv4 = cliCtx.Bool(utils.DisableIPV4.Name)
cfg.ClientConfig.DisableIPv6 = config.DisableIPv6
cfg.ClientConfig.DisableIPv4 = config.DisableIPv4

natif, err := nat.Parse(utils.NATFlag.Value)
natif, err := nat.Parse(config.NatFlag)

if err != nil {
return nil, fmt.Errorf("invalid nat option %s: %w", utils.NATFlag.Value, err)
Expand Down
9 changes: 6 additions & 3 deletions cmd/snapshots/verify/verify.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,13 @@ import (
"path/filepath"
"strconv"

"github.com/urfave/cli/v2"

"github.com/ledgerwatch/erigon-lib/downloader"
"github.com/ledgerwatch/erigon-lib/downloader/snaptype"
"github.com/ledgerwatch/erigon/cmd/snapshots/flags"
"github.com/ledgerwatch/erigon/cmd/snapshots/sync"
"github.com/ledgerwatch/erigon/cmd/utils"
"github.com/urfave/cli/v2"
)

var (
Expand Down Expand Up @@ -99,7 +100,8 @@ func verify(cliCtx *cli.Context) error {

switch dst.LType {
case sync.TorrentFs:
torrentCli, err = sync.NewTorrentClient(cliCtx, dst.Chain)
config := sync.NewTorrentClientConfigFromCobra(cliCtx, dst.Chain)
torrentCli, err = sync.NewTorrentClient(config)
if err != nil {
return fmt.Errorf("can't create torrent: %w", err)
}
Expand All @@ -125,7 +127,8 @@ func verify(cliCtx *cli.Context) error {
switch src.LType {
case sync.TorrentFs:
if torrentCli == nil {
torrentCli, err = sync.NewTorrentClient(cliCtx, dst.Chain)
config := sync.NewTorrentClientConfigFromCobra(cliCtx, dst.Chain)
torrentCli, err = sync.NewTorrentClient(config)
if err != nil {
return fmt.Errorf("can't create torrent: %w", err)
}
Expand Down
21 changes: 14 additions & 7 deletions p2p/sentry/simulator/sentry_simulator.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"bytes"
"context"
"fmt"
"path/filepath"

"github.com/ledgerwatch/log/v3"
"google.golang.org/protobuf/types/known/emptypb"
Expand All @@ -14,6 +15,7 @@ import (
"github.com/ledgerwatch/erigon-lib/gointerfaces"
sentry_if "github.com/ledgerwatch/erigon-lib/gointerfaces/sentry"
"github.com/ledgerwatch/erigon-lib/gointerfaces/types"
"github.com/ledgerwatch/erigon/cmd/snapshots/sync"
core_types "github.com/ledgerwatch/erigon/core/types"
"github.com/ledgerwatch/erigon/crypto"
"github.com/ledgerwatch/erigon/eth/ethconfig"
Expand All @@ -35,7 +37,8 @@ type server struct {
knownSnapshots *freezeblocks.RoSnapshots
activeSnapshots *freezeblocks.RoSnapshots
blockReader *freezeblocks.BlockReader
downloader *TorrentClient
downloader *sync.TorrentClient
chain string
}

func newPeer(name string, caps []p2p.Cap) (*p2p.Peer, error) {
Expand All @@ -61,6 +64,7 @@ func NewSentry(ctx context.Context, chain string, snapshotLocation string, peerC
}

cfg := snapcfg.KnownCfg(chain)
torrentDir := filepath.Join(snapshotLocation, "torrents", chain)

knownSnapshots := freezeblocks.NewRoSnapshots(ethconfig.BlocksFreezing{
Enabled: true,
Expand All @@ -81,13 +85,14 @@ func NewSentry(ctx context.Context, chain string, snapshotLocation string, peerC
Enabled: true,
Produce: false,
NoDownloader: true,
}, snapshotLocation, 0, logger)
}, torrentDir, 0, logger)

if err := activeSnapshots.ReopenFolder(); err != nil {
return nil, err
}

downloader, err := NewTorrentClient(ctx, chain, snapshotLocation, logger)
config := sync.NewDefaultTorrentClientConfig(chain, snapshotLocation, logger)
downloader, err := sync.NewTorrentClient(config)

if err != nil {
return nil, err
Expand All @@ -102,6 +107,7 @@ func NewSentry(ctx context.Context, chain string, snapshotLocation string, peerC
blockReader: freezeblocks.NewBlockReader(activeSnapshots, nil),
logger: logger,
downloader: downloader,
chain: chain,
}

go func() {
Expand Down Expand Up @@ -434,19 +440,20 @@ func (s *server) getHeaderByHash(ctx context.Context, hash common.Hash) (*core_t

func (s *server) downloadHeaders(ctx context.Context, header *freezeblocks.Segment) error {
fileName := snaptype.SegmentFileName(0, header.From(), header.To(), snaptype.Enums.Headers)
session := sync.NewTorrentSession(s.downloader, s.chain)

s.logger.Info(fmt.Sprintf("Downloading %s", fileName))

err := s.downloader.Download(ctx, fileName)
err := session.Download(ctx, fileName)

if err != nil {
return fmt.Errorf("can't download %s: %w", fileName, err)
}

s.logger.Info(fmt.Sprintf("Indexing %s", fileName))

info, _, _ := snaptype.ParseFileName(s.downloader.LocalFsRoot(), fileName)
info, _, _ := snaptype.ParseFileName(session.LocalFsRoot(), fileName)

salt := freezeblocks.GetIndicesSalt(s.downloader.LocalFsRoot())
return freezeblocks.HeadersIdx(ctx, info, salt, s.downloader.LocalFsRoot(), nil, log.LvlDebug, s.logger)
salt := freezeblocks.GetIndicesSalt(session.LocalFsRoot())
return freezeblocks.HeadersIdx(ctx, info, salt, session.LocalFsRoot(), nil, log.LvlDebug, s.logger)
}
Loading
Loading