Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Downloader: add ProhibitNewDownloads() #8939

Merged
merged 37 commits into from
Dec 12, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
37 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions cl/antiquary/antiquary.go
Original file line number Diff line number Diff line change
Expand Up @@ -239,14 +239,14 @@ func (a *Antiquary) antiquate(from, to uint64) error {
}

paths := a.sn.SegFilePaths(from, to)
downloadItems := make([]*proto_downloader.DownloadItem, len(paths))
downloadItems := make([]*proto_downloader.AddItem, len(paths))
for i, path := range paths {
downloadItems[i] = &proto_downloader.DownloadItem{
downloadItems[i] = &proto_downloader.AddItem{
Path: path,
}
}
// Notify bittorent to seed the new snapshots
if _, err := a.downloader.Download(a.ctx, &proto_downloader.DownloadRequest{Items: downloadItems}); err != nil {
if _, err := a.downloader.Add(a.ctx, &proto_downloader.AddRequest{Items: downloadItems}); err != nil {
return err
}

Expand Down
3 changes: 1 addition & 2 deletions cl/antiquary/state_antiquary_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,10 @@ package antiquary

import (
"context"
_ "embed"
"fmt"
"testing"

_ "embed"

"github.com/ledgerwatch/erigon-lib/common/datadir"
"github.com/ledgerwatch/erigon-lib/kv/memdb"
"github.com/ledgerwatch/erigon/cl/antiquary/tests"
Expand Down
2 changes: 1 addition & 1 deletion cmd/capcli/cli.go
Original file line number Diff line number Diff line change
Expand Up @@ -655,7 +655,7 @@ func (d *DownloadSnapshots) Run(ctx *Context) error {
if err != nil {
return fmt.Errorf("new server: %w", err)
}
return snapshotsync.WaitForDownloader("CapCliDownloader", ctx, false, snapshotsync.OnlyCaplin, s, tx, freezeblocks.NewBlockReader(freezeblocks.NewRoSnapshots(ethconfig.NewSnapCfg(false, false, false), dirs.Snap, log.Root()), freezeblocks.NewBorRoSnapshots(ethconfig.NewSnapCfg(false, false, false), dirs.Snap, log.Root())), nil, params.ChainConfigByChainName(d.Chain), direct.NewDownloaderClient(bittorrentServer))
return snapshotsync.WaitForDownloader("CapCliDownloader", ctx, false, snapshotsync.OnlyCaplin, s, tx, freezeblocks.NewBlockReader(freezeblocks.NewRoSnapshots(ethconfig.NewSnapCfg(false, false, false), dirs.Snap, log.Root()), freezeblocks.NewBorRoSnapshots(ethconfig.NewSnapCfg(false, false, false), dirs.Snap, log.Root())), params.ChainConfigByChainName(d.Chain), direct.NewDownloaderClient(bittorrentServer))
}

type RetrieveHistoricalState struct {
Expand Down
6 changes: 3 additions & 3 deletions cmd/downloader/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ func init() {
rootCmd.Flags().StringVar(&staticPeersStr, utils.TorrentStaticPeersFlag.Name, utils.TorrentStaticPeersFlag.Value, utils.TorrentStaticPeersFlag.Usage)
rootCmd.Flags().BoolVar(&disableIPV6, "downloader.disable.ipv6", utils.DisableIPV6.Value, utils.DisableIPV6.Usage)
rootCmd.Flags().BoolVar(&disableIPV4, "downloader.disable.ipv4", utils.DisableIPV4.Value, utils.DisableIPV6.Usage)
rootCmd.Flags().BoolVar(&seedbox, "seedbox", false, "seedbox determines to either download .torrent from webseed or not")
rootCmd.Flags().BoolVar(&seedbox, "seedbox", false, "Turns downloader into independent (doesn't need Erigon) software which discover/download/seed new files - useful for Erigon network, and can work on very cheap hardware. It will: 1) download .torrent from webseed 2) download new files after upgrade 3) we planing add discovery of new files soon")
rootCmd.PersistentFlags().BoolVar(&forceVerify, "verify", false, "Verify files. All by default, or passed by --verify.files")
rootCmd.PersistentFlags().StringArrayVar(&forceVerifyFiles, "verify.files", nil, "Limit list of files to verify")

Expand Down Expand Up @@ -434,8 +434,8 @@ func StartGrpc(snServer *downloader.GrpcServer, addr string, creds *credentials.

// Add pre-configured
func addPreConfiguredHashes(ctx context.Context, d *downloader.Downloader) error {
for _, it := range snapcfg.KnownCfg(chain, nil, nil).Preverified {
if err := d.AddInfoHashAsMagnetLink(ctx, snaptype.Hex2InfoHash(it.Hash), it.Name); err != nil {
for _, it := range snapcfg.KnownCfg(chain).Preverified {
if err := d.AddMagnetLink(ctx, snaptype.Hex2InfoHash(it.Hash), it.Name); err != nil {
return err
}
}
Expand Down
23 changes: 2 additions & 21 deletions erigon-lib/chain/snapcfg/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,31 +109,12 @@ var KnownCfgs = map[string]*Cfg{
}

// KnownCfg return list of preverified hashes for given network, but apply whiteList filter if it's not empty
func KnownCfg(networkName string, whiteList, whiteListHistory []string) *Cfg {
func KnownCfg(networkName string) *Cfg {
c, ok := KnownCfgs[networkName]
if !ok {
return newCfg(Preverified{})
}

var result Preverified
if len(whiteList) == 0 {
result = c.Preverified
} else {
wlMap := make(map[string]struct{}, len(whiteList))
for _, fName := range whiteList {
wlMap[fName] = struct{}{}
}

result = make(Preverified, 0, len(c.Preverified))
for _, p := range c.Preverified {
if _, ok := wlMap[p.Name]; !ok {
continue
}
result = append(result, p)
}
}

return newCfg(result)
return newCfg(c.Preverified)
}

var KnownWebseeds = map[string][]string{
Expand Down
8 changes: 6 additions & 2 deletions erigon-lib/direct/downloader_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,12 @@ func NewDownloaderClient(server proto_downloader.DownloaderServer) *DownloaderCl
return &DownloaderClient{server: server}
}

func (c *DownloaderClient) Download(ctx context.Context, in *proto_downloader.DownloadRequest, opts ...grpc.CallOption) (*emptypb.Empty, error) {
return c.server.Download(ctx, in)
func (c *DownloaderClient) Add(ctx context.Context, in *proto_downloader.AddRequest, opts ...grpc.CallOption) (*emptypb.Empty, error) {
return c.server.Add(ctx, in)
}

func (c *DownloaderClient) ProhibitNewDownloads(ctx context.Context, in *proto_downloader.ProhibitNewDownloadsRequest, opts ...grpc.CallOption) (*emptypb.Empty, error) {
return c.server.ProhibitNewDownloads(ctx, in)
}
func (c *DownloaderClient) Delete(ctx context.Context, in *proto_downloader.DeleteRequest, opts ...grpc.CallOption) (*emptypb.Empty, error) {
return c.server.Delete(ctx, in)
Expand Down
51 changes: 41 additions & 10 deletions erigon-lib/downloader/downloader.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ import (
"errors"
"fmt"
"net/url"
"os"
"path/filepath"
"runtime"
"strings"
"sync"
Expand All @@ -34,6 +36,7 @@ import (
"github.com/ledgerwatch/erigon-lib/common"
"github.com/ledgerwatch/erigon-lib/common/datadir"
"github.com/ledgerwatch/erigon-lib/common/dbg"
"github.com/ledgerwatch/erigon-lib/common/dir"
"github.com/ledgerwatch/erigon-lib/diagnostics"
"github.com/ledgerwatch/erigon-lib/downloader/downloadercfg"
"github.com/ledgerwatch/erigon-lib/downloader/snaptype"
Expand Down Expand Up @@ -137,6 +140,27 @@ func New(ctx context.Context, cfg *downloadercfg.Cfg, dirs datadir.Dirs, logger
return d, nil
}

const prohibitNewDownloadsFileName = "prohibit_new_downloads.lock"

// Erigon "download once" - means restart/upgrade/downgrade will not download files (and will be fast)
// After "download once" - Erigon will produce and seed new files
// Downloader will able: seed new files (already existing on FS), download uncomplete parts of existing files (if Verify found some bad parts)
func (d *Downloader) prohibitNewDownloads() error {
fPath := filepath.Join(d.SnapDir(), prohibitNewDownloadsFileName)
f, err := os.Create(fPath)
if err != nil {
return err
}
defer f.Close()
if err := f.Sync(); err != nil {
return err
}
return nil
}
func (d *Downloader) newDownloadsAreProhibited() bool {
return dir.FileExist(filepath.Join(d.SnapDir(), prohibitNewDownloadsFileName))
}

func (d *Downloader) MainLoopInBackground(silent bool) {
d.wg.Add(1)
go func() {
Expand Down Expand Up @@ -555,23 +579,30 @@ func (d *Downloader) AddNewSeedableFile(ctx context.Context, name string) error
return nil
}

func (d *Downloader) exists(name string) bool {
// Paranoic Mode on: if same file changed infoHash - skip it
// use-cases:
// - release of re-compressed version of same file,
// - ErigonV1.24 produced file X, then ErigonV1.25 released with new compression algorithm and produced X with anouther infoHash.
// ErigonV1.24 node must keep using existing file instead of downloading new one.
func (d *Downloader) alreadyHaveThisName(name string) bool {
for _, t := range d.torrentClient.Torrents() {
if t.Name() == name {
return true
select {
case <-t.GotInfo():
if t.Name() == name {
return true
}
default:
}
}
return false
}
func (d *Downloader) AddInfoHashAsMagnetLink(ctx context.Context, infoHash metainfo.Hash, name string) error {
if d.exists(name) {

func (d *Downloader) AddMagnetLink(ctx context.Context, infoHash metainfo.Hash, name string) error {
// Paranoic Mode on: if same file changed infoHash - skip it
// Example:
// - Erigon generated file X with hash H1. User upgraded Erigon. New version has preverified file X with hash H2. Must ignore H2 (don't send to Downloader)
if d.alreadyHaveThisName(name) {
return nil
}
if d.newDownloadsAreProhibited() {
return nil
}

mi := &metainfo.MetaInfo{AnnounceList: Trackers}
magnet := mi.Magnet(&infoHash, &metainfo.Info{Name: name})
spec, err := torrent.TorrentSpecFromMagnetUri(magnet.String())
Expand Down
16 changes: 13 additions & 3 deletions erigon-lib/downloader/downloader_grpc_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,9 +44,19 @@ type GrpcServer struct {
d *Downloader
}

// Download - create new .torrent ONLY if initialSync, everything else Erigon can generate by itself
func (s *GrpcServer) Download(ctx context.Context, request *proto_downloader.DownloadRequest) (*emptypb.Empty, error) {
func (s *GrpcServer) ProhibitNewDownloads(context.Context, *proto_downloader.ProhibitNewDownloadsRequest) (*emptypb.Empty, error) {
if err := s.d.prohibitNewDownloads(); err != nil {
return nil, err
}
return nil, nil
}

// Erigon "download once" - means restart/upgrade/downgrade will not download files (and will be fast)
// After "download once" - Erigon will produce and seed new files
// Downloader will able: seed new files (already existing on FS), download uncomplete parts of existing files (if Verify found some bad parts)
func (s *GrpcServer) Add(ctx context.Context, request *proto_downloader.AddRequest) (*emptypb.Empty, error) {
defer s.d.ReCalcStats(10 * time.Second) // immediately call ReCalc to set stat.Complete flag

logEvery := time.NewTicker(20 * time.Second)
defer logEvery.Stop()

Expand All @@ -69,7 +79,7 @@ func (s *GrpcServer) Download(ctx context.Context, request *proto_downloader.Dow
continue
}

if err := s.d.AddInfoHashAsMagnetLink(ctx, Proto2InfoHash(it.TorrentHash), it.Path); err != nil {
if err := s.d.AddMagnetLink(ctx, Proto2InfoHash(it.TorrentHash), it.Path); err != nil {
return nil, err
}
}
Expand Down
6 changes: 3 additions & 3 deletions erigon-lib/downloader/downloader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,22 +21,22 @@ func TestChangeInfoHashOfSameFile(t *testing.T) {
d, err := New(context.Background(), cfg, dirs, log.New(), log.LvlInfo, true)
require.NoError(err)
defer d.Close()
err = d.AddInfoHashAsMagnetLink(d.ctx, snaptype.Hex2InfoHash("aa"), "a.seg")
err = d.AddMagnetLink(d.ctx, snaptype.Hex2InfoHash("aa"), "a.seg")
require.NoError(err)
tt, ok := d.torrentClient.Torrent(snaptype.Hex2InfoHash("aa"))
require.True(ok)
require.Equal("a.seg", tt.Name())

// adding same file twice is ok
err = d.AddInfoHashAsMagnetLink(d.ctx, snaptype.Hex2InfoHash("aa"), "a.seg")
err = d.AddMagnetLink(d.ctx, snaptype.Hex2InfoHash("aa"), "a.seg")
require.NoError(err)

// adding same file with another infoHash - is ok, must be skipped
// use-cases:
// - release of re-compressed version of same file,
// - ErigonV1.24 produced file X, then ErigonV1.25 released with new compression algorithm and produced X with anouther infoHash.
// ErigonV1.24 node must keep using existing file instead of downloading new one.
err = d.AddInfoHashAsMagnetLink(d.ctx, snaptype.Hex2InfoHash("bb"), "a.seg")
err = d.AddMagnetLink(d.ctx, snaptype.Hex2InfoHash("bb"), "a.seg")
require.NoError(err)
tt, ok = d.torrentClient.Torrent(snaptype.Hex2InfoHash("aa"))
require.True(ok)
Expand Down
2 changes: 1 addition & 1 deletion erigon-lib/downloader/downloadercfg/downloadercfg.go
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,7 @@ func New(dirs datadir.Dirs, version string, verbosity lg.Level, downloadRate, up
webseedFileProviders = append(webseedFileProviders, localCfgFile)
}
//TODO: if don't pass "downloaded files list here" (which we store in db) - synced erigon will download new .torrent files. And erigon can't work with "unfinished" files.
snapCfg := snapcfg.KnownCfg(chainName, nil, nil)
snapCfg := snapcfg.KnownCfg(chainName)
return &Cfg{Dirs: dirs, ChainName: chainName,
ClientConfig: torrentConfig, DownloadSlots: downloadSlots,
WebSeedUrls: webseedHttpProviders, WebSeedFiles: webseedFileProviders, WebSeedS3Tokens: webseedS3Providers,
Expand Down
2 changes: 1 addition & 1 deletion erigon-lib/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ go 1.20
require (
github.com/erigontech/mdbx-go v0.27.21
github.com/ledgerwatch/erigon-snapshot v1.3.1-0.20231210032908-6ff6f4c91c60
github.com/ledgerwatch/interfaces v0.0.0-20231031050643-c86352e41520
github.com/ledgerwatch/interfaces v0.0.0-20231209102305-b17e86fbe07d
github.com/ledgerwatch/log/v3 v3.9.0
github.com/ledgerwatch/secp256k1 v1.0.0
)
Expand Down
4 changes: 2 additions & 2 deletions erigon-lib/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -293,8 +293,8 @@ github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE=
github.com/leanovate/gopter v0.2.9 h1:fQjYxZaynp97ozCzfOyOuAGOU4aU/z37zf/tOujFk7c=
github.com/ledgerwatch/erigon-snapshot v1.3.1-0.20231210032908-6ff6f4c91c60 h1:bsZ6XWPJkNp1DeVHkaX9/+/Tqg7+r5/IkRPlyc4Ztq4=
github.com/ledgerwatch/erigon-snapshot v1.3.1-0.20231210032908-6ff6f4c91c60/go.mod h1:3AuPxZc85jkehh/HA9h8gabv5MSi3kb/ddtzBsTVJFo=
github.com/ledgerwatch/interfaces v0.0.0-20231031050643-c86352e41520 h1:j/PRJWbPrbk8wpVjU77SWS8xJ/N+dcxPs1relNSolUs=
github.com/ledgerwatch/interfaces v0.0.0-20231031050643-c86352e41520/go.mod h1:ugQv1QllJzBny3cKZKxUrSnykkjkBgm27eQM6dnGAcc=
github.com/ledgerwatch/interfaces v0.0.0-20231209102305-b17e86fbe07d h1:7aB9lKmUGAaWt4TzXnGLzJSZkhyuqREMmaao+Gn5Ky0=
github.com/ledgerwatch/interfaces v0.0.0-20231209102305-b17e86fbe07d/go.mod h1:ugQv1QllJzBny3cKZKxUrSnykkjkBgm27eQM6dnGAcc=
github.com/ledgerwatch/log/v3 v3.9.0 h1:iDwrXe0PVwBC68Dd94YSsHbMgQ3ufsgjzXtFNFVZFRk=
github.com/ledgerwatch/log/v3 v3.9.0/go.mod h1:EiAY6upmI/6LkNhOVxb4eVsmsP11HZCnZ3PlJMjYiqE=
github.com/ledgerwatch/secp256k1 v1.0.0 h1:Usvz87YoTG0uePIV8woOof5cQnLXGYa162rFf3YnwaQ=
Expand Down
Loading
Loading