diff --git a/cmd/ipfs/daemon.go b/cmd/ipfs/daemon.go index 1e03a8264a8e..f43f1da242aa 100644 --- a/cmd/ipfs/daemon.go +++ b/cmd/ipfs/daemon.go @@ -425,6 +425,9 @@ func daemonFunc(req *cmds.Request, re cmds.ResponseEmitter, env cmds.Environment case routingOptionNoneKwd: ncfg.Routing = libp2p.NilRouterOption case routingOptionCustomKwd: + if cfg.Routing.AcceleratedDHTClient { + return fmt.Errorf("Routing.AcceleratedDHTClient option is set even tho Routing.Type is custom, using custom .AcceleratedDHTClient needs to be set on DHT routers individually") + } ncfg.Routing = libp2p.ConstructDelegatedRouting( cfg.Routing.Routers, cfg.Routing.Methods, diff --git a/config/experiments.go b/config/experiments.go index 072dcd0ddd62..f5ecf4be6292 100644 --- a/config/experiments.go +++ b/config/experiments.go @@ -8,7 +8,7 @@ type Experiments struct { Libp2pStreamMounting bool P2pHttpProxy bool //nolint StrategicProviding bool - AcceleratedDHTClient bool + AcceleratedDHTClient experimentalAcceleratedDHTClient `json:",omitempty"` OptimisticProvide bool OptimisticProvideJobsPoolSize int } diff --git a/config/routing.go b/config/routing.go index 1210bb3cecc3..7f5e48aa262b 100644 --- a/config/routing.go +++ b/config/routing.go @@ -15,6 +15,8 @@ type Routing struct { // When "custom" is set, user-provided Routing.Routers is used. 
// experimentalAcceleratedDHTClient is a data-less placeholder type for the
// Experimental.AcceleratedDHTClient config key. It exists only so that
// decoding a config which still sets that key fails with a message pointing
// at its replacement, Routing.AcceleratedDHTClient.
type experimentalAcceleratedDHTClient struct{}

var _ json.Unmarshaler = experimentalAcceleratedDHTClient{}

// UnmarshalJSON accepts values that carry no setting (an empty object, JSON
// null, or empty input) and returns the migration error for any other token.
// It returns the decoder's own error if b cannot be tokenized.
func (experimentalAcceleratedDHTClient) UnmarshalJSON(b []byte) error {
	d := json.NewDecoder(bytes.NewReader(b))
	for {
		switch tok, err := d.Token(); err {
		case io.EOF:
			return nil
		case nil:
			switch tok {
			case nil, json.Delim('{'), json.Delim('}'):
				// Accept empty objects. Decoder.Token yields nil for JSON
				// null, which by json.Unmarshaler convention is a no-op.
				continue
			}
			//nolint
			return fmt.Errorf("The Experimental.AcceleratedDHTClient key has been moved to Routing.AcceleratedDHTClient in Kubo 0.21, please use this new key and remove the old one.")
		default:
			return err
		}
	}
}
// humanDuration renders val for display after dropping everything below
// microsecond precision (truncation is toward zero).
func humanDuration(val time.Duration) string {
	truncated := val.Truncate(time.Microsecond)
	return truncated.String()
}
OfflineProviders( - cfg.Experimental.StrategicProviding, - cfg.Experimental.AcceleratedDHTClient, - cfg.Reprovider.Strategy.WithDefault(config.DefaultReproviderStrategy), - cfg.Reprovider.Interval.WithDefault(config.DefaultReproviderInterval), - ), + OfflineProviders(), ) } diff --git a/core/node/libp2p/routing.go b/core/node/libp2p/routing.go index 2e356e447f5d..0b642ed8ce18 100644 --- a/core/node/libp2p/routing.go +++ b/core/node/libp2p/routing.go @@ -90,7 +90,7 @@ func BaseRouting(cfg *config.Config) interface{} { } } - if dualDHT != nil && cfg.Experimental.AcceleratedDHTClient { + if dualDHT != nil && cfg.Routing.AcceleratedDHTClient { cfg, err := in.Repo.Config() if err != nil { return out, err diff --git a/core/node/provider.go b/core/node/provider.go index 58e74bdb3f89..215e0adac790 100644 --- a/core/node/provider.go +++ b/core/node/provider.go @@ -5,145 +5,158 @@ import ( "fmt" "time" + "github.com/ipfs/boxo/blockstore" "github.com/ipfs/boxo/fetcher" pin "github.com/ipfs/boxo/pinning/pinner" provider "github.com/ipfs/boxo/provider" - "github.com/ipfs/boxo/provider/batched" - q "github.com/ipfs/boxo/provider/queue" - "github.com/ipfs/boxo/provider/simple" - "go.uber.org/fx" - - "github.com/ipfs/kubo/core/node/helpers" "github.com/ipfs/kubo/repo" irouting "github.com/ipfs/kubo/routing" + "go.uber.org/fx" ) -// SIMPLE - -// ProviderQueue creates new datastore backed provider queue -func ProviderQueue(mctx helpers.MetricsCtx, lc fx.Lifecycle, repo repo.Repo) (*q.Queue, error) { - return q.NewQueue(helpers.LifecycleCtx(mctx, lc), "provider-v1", repo.Datastore()) -} - -// SimpleProvider creates new record provider -func SimpleProvider(mctx helpers.MetricsCtx, lc fx.Lifecycle, queue *q.Queue, rt irouting.ProvideManyRouter) provider.Provider { - return simple.NewProvider(helpers.LifecycleCtx(mctx, lc), queue, rt) -} - -// SimpleReprovider creates new reprovider -func SimpleReprovider(reproviderInterval time.Duration) interface{} { - return func(mctx 
helpers.MetricsCtx, lc fx.Lifecycle, rt irouting.ProvideManyRouter, keyProvider simple.KeyChanFunc) (provider.Reprovider, error) { - return simple.NewReprovider(helpers.LifecycleCtx(mctx, lc), reproviderInterval, rt, keyProvider), nil - } -} - -// SimpleProviderSys creates new provider system -func SimpleProviderSys(isOnline bool) interface{} { - return func(lc fx.Lifecycle, p provider.Provider, r provider.Reprovider) provider.System { - sys := provider.NewSystem(p, r) - - if isOnline { - lc.Append(fx.Hook{ - OnStart: func(ctx context.Context) error { - sys.Run() - return nil - }, - OnStop: func(ctx context.Context) error { - return sys.Close() - }, - }) +func ProviderSys(reprovideInterval time.Duration, acceleratedDHTClient bool) fx.Option { + const magicThroughputReportCount = 128 + return fx.Provide(func(lc fx.Lifecycle, cr irouting.ProvideManyRouter, keyProvider provider.KeyChanFunc, repo repo.Repo, bs blockstore.Blockstore) (provider.System, error) { + opts := []provider.Option{ + provider.Online(cr), + provider.ReproviderInterval(reprovideInterval), + provider.KeyProvider(keyProvider), } - - return sys - } -} - -// BatchedProviderSys creates new provider system -func BatchedProviderSys(isOnline bool, reprovideInterval time.Duration) interface{} { - return func(lc fx.Lifecycle, cr irouting.ProvideManyRouter, q *q.Queue, keyProvider simple.KeyChanFunc, repo repo.Repo) (provider.System, error) { - sys, err := batched.New(cr, q, - batched.ReproviderInterval(reprovideInterval), - batched.Datastore(repo.Datastore()), - batched.KeyProvider(keyProvider)) + if !acceleratedDHTClient { + // The estimation kinda suck if you are running with accelerated DHT client, + // given this message is just trying to push people to use the acceleratedDHTClient + // let's not report on through if it's in use + opts = append(opts, + provider.ThroughputReport(func(reprovide bool, complete bool, keysProvided uint, duration time.Duration) bool { + avgProvideSpeed := duration / 
time.Duration(keysProvided) + count := uint64(keysProvided) + + if !reprovide || !complete { + // We don't know how many CIDs we have to provide, try to fetch it from the blockstore. + // But don't try for too long as this might be very expensive if you have a huge datastore. + ctx, cancel := context.WithTimeout(context.Background(), time.Minute*5) + defer cancel() + + // FIXME: I want a running counter of blocks so size of blockstore can be an O(1) lookup. + ch, err := bs.AllKeysChan(ctx) + if err != nil { + logger.Errorf("fetching AllKeysChain in provider ThroughputReport: %v", err) + return false + } + count = 0 + countLoop: + for { + select { + case _, ok := <-ch: + if !ok { + break countLoop + } + count++ + case <-ctx.Done(): + // really big blockstore mode + + // how many blocks would be in a 10TiB blockstore with 128KiB blocks. + const probableBigBlockstore = (10 * 1024 * 1024 * 1024 * 1024) / (128 * 1024) + // How long per block that lasts us. + expectedProvideSpeed := reprovideInterval / probableBigBlockstore + if avgProvideSpeed > expectedProvideSpeed { + logger.Errorf(` +🔔🔔🔔 YOU MAY BE FALLING BEHIND DHT REPROVIDES! 🔔🔔🔔 + +⚠️ Your system might be struggling to keep up with DHT reprovides! +This means your content could partially or completely inaccessible on the network. +We observed that you recently provided %d keys at an average rate of %v per key. + +🕑 An attempt to estimate your blockstore size timed out after 5 minutes, +implying your blockstore might be exceedingly large. Assuming a considerable +size of 10TiB, it would take %v to provide the complete set. + +⏰ The total provide time needs to stay under your reprovide interval (%v) to prevent falling behind! + +💡 Consider enabling the Accelerated DHT to enhance your system performance. 
See: +https://github.com/ipfs/kubo/blob/master/docs/config.md#routingaccelerateddhtclient`, + keysProvided, avgProvideSpeed, avgProvideSpeed*probableBigBlockstore, reprovideInterval) + return false + } + } + } + } + + // How long per block that lasts us. + expectedProvideSpeed := reprovideInterval / time.Duration(count) + if avgProvideSpeed > expectedProvideSpeed { + // FIXME(@Jorropo): add link to the accelerated DHT client docs once this isn't experimental anymore. + logger.Errorf(` +🔔🔔🔔 YOU ARE FALLING BEHIND DHT REPROVIDES! 🔔🔔🔔 + +⚠️ Your system is struggling to keep up with DHT reprovides! +This means your content could partially or completely inaccessible on the network. +We observed that you recently provided %d keys at an average rate of %v per key. + +💾 Your total CID count is ~%d which would total at %v reprovide process. + +⏰ The total provide time needs to stay under your reprovide interval (%v) to prevent falling behind! + +💡 Consider enabling the Accelerated DHT to enhance your reprovide throughput. See: +https://github.com/ipfs/kubo/blob/master/docs/config.md#routingaccelerateddhtclient`, + keysProvided, avgProvideSpeed, count, avgProvideSpeed*time.Duration(count), reprovideInterval) + } + return false + }, magicThroughputReportCount)) + } + sys, err := provider.New(repo.Datastore(), opts...) 
if err != nil { return nil, err } - if isOnline { - lc.Append(fx.Hook{ - OnStart: func(ctx context.Context) error { - sys.Run() - return nil - }, - OnStop: func(ctx context.Context) error { - return sys.Close() - }, - }) - } + lc.Append(fx.Hook{ + OnStop: func(ctx context.Context) error { + return sys.Close() + }, + }) return sys, nil - } + }) } // ONLINE/OFFLINE // OnlineProviders groups units managing provider routing records online -func OnlineProviders(useStrategicProviding bool, useBatchedProviding bool, reprovideStrategy string, reprovideInterval time.Duration) fx.Option { - if useStrategicProviding { - return fx.Provide(provider.NewOfflineProvider) - } - - return fx.Options( - SimpleProviders(reprovideStrategy, reprovideInterval), - maybeProvide(SimpleProviderSys(true), !useBatchedProviding), - maybeProvide(BatchedProviderSys(true, reprovideInterval), useBatchedProviding), - ) -} - -// OfflineProviders groups units managing provider routing records offline -func OfflineProviders(useStrategicProviding bool, useBatchedProviding bool, reprovideStrategy string, reprovideInterval time.Duration) fx.Option { +func OnlineProviders(useStrategicProviding bool, reprovideStrategy string, reprovideInterval time.Duration, acceleratedDHTClient bool) fx.Option { if useStrategicProviding { - return fx.Provide(provider.NewOfflineProvider) + return OfflineProviders() } - return fx.Options( - SimpleProviders(reprovideStrategy, reprovideInterval), - maybeProvide(SimpleProviderSys(false), true), - //maybeProvide(BatchedProviderSys(false, reprovideInterval), useBatchedProviding), - ) -} - -// SimpleProviders creates the simple provider/reprovider dependencies -func SimpleProviders(reprovideStrategy string, reproviderInterval time.Duration) fx.Option { var keyProvider fx.Option switch reprovideStrategy { - case "all": - fallthrough - case "": - keyProvider = fx.Provide(simple.NewBlockstoreProvider) + case "all", "": + keyProvider = fx.Provide(provider.NewBlockstoreProvider) case 
"roots": keyProvider = fx.Provide(pinnedProviderStrategy(true)) case "pinned": keyProvider = fx.Provide(pinnedProviderStrategy(false)) default: - return fx.Error(fmt.Errorf("unknown reprovider strategy '%s'", reprovideStrategy)) + return fx.Error(fmt.Errorf("unknown reprovider strategy %q", reprovideStrategy)) } return fx.Options( - fx.Provide(ProviderQueue), - fx.Provide(SimpleProvider), keyProvider, - fx.Provide(SimpleReprovider(reproviderInterval)), + ProviderSys(reprovideInterval, acceleratedDHTClient), ) } +// OfflineProviders groups units managing provider routing records offline +func OfflineProviders() fx.Option { + return fx.Provide(provider.NewNoopProvider) +} + func pinnedProviderStrategy(onlyRoots bool) interface{} { type input struct { fx.In Pinner pin.Pinner IPLDFetcher fetcher.Factory `name:"ipldFetcher"` } - return func(in input) simple.KeyChanFunc { - return simple.NewPinnedProvider(onlyRoots, in.Pinner, in.IPLDFetcher) + return func(in input) provider.KeyChanFunc { + return provider.NewPinnedProvider(onlyRoots, in.Pinner, in.IPLDFetcher) } } diff --git a/docs/examples/kubo-as-a-library/go.mod b/docs/examples/kubo-as-a-library/go.mod index 8a80a42c9602..2d1692740eaf 100644 --- a/docs/examples/kubo-as-a-library/go.mod +++ b/docs/examples/kubo-as-a-library/go.mod @@ -21,7 +21,6 @@ require ( github.com/benbjohnson/clock v1.3.0 // indirect github.com/beorn7/perks v1.0.1 // indirect github.com/blang/semver/v4 v4.0.0 // indirect - github.com/cenkalti/backoff v2.2.1+incompatible // indirect github.com/cenkalti/backoff/v4 v4.2.0 // indirect github.com/ceramicnetwork/go-dag-jose v0.1.0 // indirect github.com/cespare/xxhash v1.1.0 // indirect diff --git a/docs/examples/kubo-as-a-library/go.sum b/docs/examples/kubo-as-a-library/go.sum index 5c13ec7c5a72..7a72afaf1a75 100644 --- a/docs/examples/kubo-as-a-library/go.sum +++ b/docs/examples/kubo-as-a-library/go.sum @@ -74,8 +74,6 @@ github.com/btcsuite/snappy-go v0.0.0-20151229074030-0bdef8d06723/go.mod h1:8woku 
github.com/btcsuite/websocket v0.0.0-20150119174127-31079b680792/go.mod h1:ghJtEyQwv5/p4Mg4C0fgbePVuGr935/5ddU9Z3TmDRY= github.com/btcsuite/winsvc v1.0.0/go.mod h1:jsenWakMcC0zFBFurPLEAyrnc/teJEM1O46fmI40EZs= github.com/buger/jsonparser v0.0.0-20181115193947-bf1c66bbce23/go.mod h1:bbYlZJ7hK1yFx9hf58LP0zeX7UjIGs20ufpu3evjr+s= -github.com/cenkalti/backoff v2.2.1+incompatible h1:tNowT99t7UNflLxfYYSlKYsBpXdEet03Pg2g16Swow4= -github.com/cenkalti/backoff v2.2.1+incompatible/go.mod h1:90ReRw6GdpyfrHakVjL/QHaoyV4aDUVVkXQJJJ3NXXM= github.com/cenkalti/backoff/v4 v4.2.0 h1:HN5dHm3WBOgndBH6E8V0q2jIYIR3s9yglV8k/+MN3u4= github.com/cenkalti/backoff/v4 v4.2.0/go.mod h1:Y3VNntkOUPxTVeUxJ/G5vcM//AlwfmyYozVcomhLiZE= github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU= diff --git a/go.mod b/go.mod index 39443c36d1e7..742a0badf413 100644 --- a/go.mod +++ b/go.mod @@ -82,6 +82,7 @@ require ( go.uber.org/multierr v1.11.0 go.uber.org/zap v1.24.0 golang.org/x/crypto v0.9.0 + golang.org/x/exp v0.0.0-20230321023759-10a507213a29 golang.org/x/mod v0.10.0 golang.org/x/sync v0.1.0 golang.org/x/sys v0.8.0 @@ -93,7 +94,6 @@ require ( github.com/alecthomas/units v0.0.0-20211218093645-b94a6e3cc137 // indirect github.com/alexbrainman/goissue34681 v0.0.0-20191006012335-3fc7a47baff5 // indirect github.com/beorn7/perks v1.0.1 // indirect - github.com/cenkalti/backoff v2.2.1+incompatible // indirect github.com/cespare/xxhash v1.1.0 // indirect github.com/cespare/xxhash/v2 v2.2.0 // indirect github.com/containerd/cgroups v1.1.0 // indirect @@ -215,7 +215,6 @@ require ( go.opentelemetry.io/proto/otlp v0.19.0 // indirect go.uber.org/atomic v1.10.0 // indirect go4.org v0.0.0-20230225012048-214862532bf5 // indirect - golang.org/x/exp v0.0.0-20230321023759-10a507213a29 // indirect golang.org/x/net v0.10.0 // indirect golang.org/x/oauth2 v0.5.0 // indirect golang.org/x/term v0.8.0 // indirect diff --git a/go.sum b/go.sum index 
aec474d1cecd..ca0a6621cb02 100644 --- a/go.sum +++ b/go.sum @@ -84,8 +84,6 @@ github.com/btcsuite/snappy-go v0.0.0-20151229074030-0bdef8d06723/go.mod h1:8woku github.com/btcsuite/websocket v0.0.0-20150119174127-31079b680792/go.mod h1:ghJtEyQwv5/p4Mg4C0fgbePVuGr935/5ddU9Z3TmDRY= github.com/btcsuite/winsvc v1.0.0/go.mod h1:jsenWakMcC0zFBFurPLEAyrnc/teJEM1O46fmI40EZs= github.com/buger/jsonparser v0.0.0-20181115193947-bf1c66bbce23/go.mod h1:bbYlZJ7hK1yFx9hf58LP0zeX7UjIGs20ufpu3evjr+s= -github.com/cenkalti/backoff v2.2.1+incompatible h1:tNowT99t7UNflLxfYYSlKYsBpXdEet03Pg2g16Swow4= -github.com/cenkalti/backoff v2.2.1+incompatible/go.mod h1:90ReRw6GdpyfrHakVjL/QHaoyV4aDUVVkXQJJJ3NXXM= github.com/cenkalti/backoff/v4 v4.2.0 h1:HN5dHm3WBOgndBH6E8V0q2jIYIR3s9yglV8k/+MN3u4= github.com/cenkalti/backoff/v4 v4.2.0/go.mod h1:Y3VNntkOUPxTVeUxJ/G5vcM//AlwfmyYozVcomhLiZE= github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU= diff --git a/repo/fsrepo/fsrepo.go b/repo/fsrepo/fsrepo.go index cb8de08c1deb..3c4a709c1955 100644 --- a/repo/fsrepo/fsrepo.go +++ b/repo/fsrepo/fsrepo.go @@ -37,7 +37,7 @@ const LockFile = "repo.lock" var log = logging.Logger("fsrepo") // RepoVersion is the version number that we are currently expecting to see -var RepoVersion = 13 +var RepoVersion = 14 var migrationInstructions = `See https://github.com/ipfs/fs-repo-migrations/blob/master/run.md Sorry for the inconvenience. 
In the future, these will run automatically.` diff --git a/repo/fsrepo/migrations/fetcher.go b/repo/fsrepo/migrations/fetcher.go index c174b5e77751..1dc4d0345c78 100644 --- a/repo/fsrepo/migrations/fetcher.go +++ b/repo/fsrepo/migrations/fetcher.go @@ -11,7 +11,7 @@ import ( const ( // Current distribution to fetch migrations from - CurrentIpfsDist = "/ipfs/Qmf4yftD4LuMo8JMNPqqw3BtUwYd2VkXMiAThuPE6usrbQ" // fs-repo-12-to-13 v1.0.0 + CurrentIpfsDist = "/ipfs/QmYerugGRCZWA8yQMKDsd9daEVXUR3C5nuw3VXuX1mggHa" // fs-repo-13-to-14 v1.0.0 // Latest distribution path. Default for fetchers. LatestIpfsDist = "/ipns/dist.ipfs.tech"