From 4594422aab8d4b57fbd75d224df830a40dd877d3 Mon Sep 17 00:00:00 2001
From: Lucas Molas
Date: Tue, 15 Feb 2022 20:32:14 -0300
Subject: [PATCH] feat(libp2p v0.18.0): add resource manager stats/limit commands

---
 core/commands/commands_test.go |   2 +
 core/commands/swarm.go         | 379 ++++++++++++++++++++++++++++++++-
 core/core.go                   |  24 +-
 core/node/groups.go            |   2 +
 core/node/libp2p/rcmgr.go      |  39 ++--
 5 files changed, 409 insertions(+), 37 deletions(-)

diff --git a/core/commands/commands_test.go b/core/commands/commands_test.go
index 964baad9299..b0980f13126 100644
--- a/core/commands/commands_test.go
+++ b/core/commands/commands_test.go
@@ -237,11 +237,13 @@ func TestCommands(t *testing.T) {
 		"/swarm/filters",
 		"/swarm/filters/add",
 		"/swarm/filters/rm",
+		"/swarm/limit",
 		"/swarm/peers",
 		"/swarm/peering",
 		"/swarm/peering/add",
 		"/swarm/peering/ls",
 		"/swarm/peering/rm",
+		"/swarm/stats",
 		"/tar",
 		"/tar/add",
 		"/tar/cat",
diff --git a/core/commands/swarm.go b/core/commands/swarm.go
index 7c7ee3e814f..c2291e02ce2 100644
--- a/core/commands/swarm.go
+++ b/core/commands/swarm.go
@@ -1,24 +1,31 @@
 package commands
 
 import (
+	"bytes"
 	"context"
+	"encoding/json"
 	"errors"
 	"fmt"
 	"io"
+	"os"
 	"path"
 	"sort"
+	"strings"
 	"sync"
 	"time"
 
-	commands "github.com/ipfs/go-ipfs/commands"
-	cmdenv "github.com/ipfs/go-ipfs/core/commands/cmdenv"
-	repo "github.com/ipfs/go-ipfs/repo"
-	fsrepo "github.com/ipfs/go-ipfs/repo/fsrepo"
+	"github.com/ipfs/go-ipfs/commands"
+	"github.com/ipfs/go-ipfs/core/commands/cmdenv"
+	"github.com/ipfs/go-ipfs/repo"
+	"github.com/ipfs/go-ipfs/repo/fsrepo"
 
 	cmds "github.com/ipfs/go-ipfs-cmds"
 	config "github.com/ipfs/go-ipfs-config"
+	"github.com/libp2p/go-libp2p-core/network"
 	inet "github.com/libp2p/go-libp2p-core/network"
-	peer "github.com/libp2p/go-libp2p-core/peer"
+	"github.com/libp2p/go-libp2p-core/peer"
+	"github.com/libp2p/go-libp2p-core/protocol"
+	rcmgr "github.com/libp2p/go-libp2p-resource-manager"
 	ma "github.com/multiformats/go-multiaddr"
 	madns "github.com/multiformats/go-multiaddr-dns"
 	mamask "github.com/whyrusleeping/multiaddr-filter"
@@ -52,6 +59,8 @@ ipfs peers in the internet.
 		"filters":    swarmFiltersCmd,
 		"peers":      swarmPeersCmd,
 		"peering":    swarmPeeringCmd,
+		"stats":      swarmStatsCmd,
+		"limit":      swarmLimitCmd,
 	},
 }
 
@@ -304,6 +313,366 @@ var swarmPeersCmd = &cmds.Command{
 	Type: connInfos{},
 }
 
+var swarmStatsCmd = &cmds.Command{
+	Helptext: cmds.HelpText{
+		Tagline: "Report resource usage for a scope.",
+		LongDescription: `Report resource usage for a scope.
+ The scope can be one of the following:
+ - system        -- reports the system aggregate resource usage.
+ - transient     -- reports the transient resource usage.
+ - svc:<service> -- reports the resource usage of a specific service.
+ - proto:<proto> -- reports the resource usage of a specific protocol.
+ - peer:<peer>   -- reports the resource usage of a specific peer.
+ - all           -- reports the resource usage for all currently active scopes.
+`},
+	Arguments: []cmds.Argument{
+		cmds.StringArg("scope", true, false, "scope of the stat report"),
+	},
+	Run: func(req *cmds.Request, res cmds.ResponseEmitter, env cmds.Environment) error {
+		node, err := cmdenv.GetNode(env)
+		if err != nil {
+			return err
+		}
+
+		if node.ResourceManager == nil {
+			return fmt.Errorf("no resource manager available, make sure the daemon is running")
+		}
+
+		if len(req.Arguments) != 1 {
+			return fmt.Errorf("must specify exactly one scope")
+		}
+		scope := req.Arguments[0]
+		result, err := NetStat(node.ResourceManager, req.Context, scope)
+		if err != nil {
+			return err
+		}
+
+		b := new(bytes.Buffer)
+		enc := json.NewEncoder(b)
+		err = enc.Encode(result)
+		if err != nil {
+			return err
+		}
+		return cmds.EmitOnce(res, b)
+	},
+}
+
+var swarmLimitCmd = &cmds.Command{
+	Helptext: cmds.HelpText{
+		Tagline: "Get or set resource limits for a scope.",
+		LongDescription: `Get or set resource limits for a scope.
+ The scope can be one of the following:
+ - system        -- limits of the system aggregate resource usage.
+ - transient     -- limits of the transient resource usage.
+ - svc:<service> -- limits of the resource usage of a specific service.
+ - proto:<proto> -- limits of the resource usage of a specific protocol.
+ - peer:<peer>   -- limits of the resource usage of a specific peer.
+ With --set, the new limit is read from the JSON file given as second argument.
+`},
+	Arguments: []cmds.Argument{
+		cmds.StringArg("scope", true, false, "scope of the limit"),
+		cmds.StringArg("limit", false, false, "path of the limit configuration file"),
+	},
+	Options: []cmds.Option{
+		cmds.BoolOption("set", "s", "Set the limit for a scope (instead of viewing it).").WithDefault(false),
+	},
+	Run: func(req *cmds.Request, res cmds.ResponseEmitter, env cmds.Environment) error {
+		node, err := cmdenv.GetNode(env)
+		if err != nil {
+			return err
+		}
+
+		if node.ResourceManager == nil {
+			return fmt.Errorf("no resource manager available, make sure the daemon is running")
+		}
+
+		setLimit, _ := req.Options["set"].(bool)
+		if setLimit {
+			if len(req.Arguments) != 2 {
+				return fmt.Errorf("must specify exactly a scope and a limit")
+			}
+
+			scope := req.Arguments[0]
+			limitPath := req.Arguments[1]
+			limitData, err := os.ReadFile(limitPath)
+			if err != nil {
+				return fmt.Errorf("error opening limit JSON file: %w", err)
+			}
+
+			var limit NetLimitConfig
+			err = json.Unmarshal(limitData, &limit)
+			if err != nil {
+				return fmt.Errorf("error decoding limit: %w", err)
+			}
+
+			return NetSetLimit(node.ResourceManager, req.Context, scope, limit)
+		}
+
+		if len(req.Arguments) != 1 {
+			return fmt.Errorf("must specify exactly one scope")
+		}
+		scope := req.Arguments[0]
+		result, err := NetLimit(node.ResourceManager, req.Context, scope)
+		if err != nil {
+			return err
+		}
+
+		b := new(bytes.Buffer)
+		enc := json.NewEncoder(b)
+		err = enc.Encode(result)
+		if err != nil {
+			return err
+		}
+		return cmds.EmitOnce(res, b)
+	},
+}
+
+// FIXME(BLOCKING): Decide where to move the net stat/limit logic and types.
+
+// NetStatOut is the JSON-serializable result of NetStat. Only the scopes that
+// were queried are populated; everything else is omitted from the output.
+type NetStatOut struct {
+	System    *network.ScopeStat           `json:",omitempty"`
+	Transient *network.ScopeStat           `json:",omitempty"`
+	Services  map[string]network.ScopeStat `json:",omitempty"`
+	Protocols map[string]network.ScopeStat `json:",omitempty"`
+	Peers     map[string]network.ScopeStat `json:",omitempty"`
+}
+
+// NetLimitConfig is the JSON representation of the limit of a single scope, as
+// returned by NetLimit and accepted by NetSetLimit.
+type NetLimitConfig struct {
+	Dynamic bool `json:",omitempty"`
+	// set if Dynamic is false
+	Memory int64 `json:",omitempty"`
+	// set if Dynamic is true
+	MemoryFraction float64 `json:",omitempty"`
+	MinMemory      int64   `json:",omitempty"`
+	MaxMemory      int64   `json:",omitempty"`
+
+	Streams, StreamsInbound, StreamsOutbound int
+	Conns, ConnsInbound, ConnsOutbound       int
+	FD                                       int
+}
+
+// baseLimit returns the stream, connection and file descriptor limits of c in
+// their rcmgr form.
+func (c *NetLimitConfig) baseLimit() rcmgr.BaseLimit {
+	return rcmgr.BaseLimit{
+		Streams:         c.Streams,
+		StreamsInbound:  c.StreamsInbound,
+		StreamsOutbound: c.StreamsOutbound,
+		Conns:           c.Conns,
+		ConnsInbound:    c.ConnsInbound,
+		ConnsOutbound:   c.ConnsOutbound,
+		FD:              c.FD,
+	}
+}
+
+// setBaseLimit copies the stream, connection and file descriptor limits of b
+// into c.
+func (c *NetLimitConfig) setBaseLimit(b rcmgr.BaseLimit) {
+	c.Streams = b.Streams
+	c.StreamsInbound = b.StreamsInbound
+	c.StreamsOutbound = b.StreamsOutbound
+	c.Conns = b.Conns
+	c.ConnsInbound = b.ConnsInbound
+	c.ConnsOutbound = b.ConnsOutbound
+	c.FD = b.FD
+}
+
+// viewScope runs fn on the resource scope named by scope, which is one of
+// "system", "transient", "svc:<service>", "proto:<proto>" or "peer:<peer>".
+func viewScope(mgr network.ResourceManager, scope string, fn func(network.ResourceScope) error) error {
+	switch {
+	case scope == "system":
+		return mgr.ViewSystem(fn)
+
+	case scope == "transient":
+		return mgr.ViewTransient(fn)
+
+	case strings.HasPrefix(scope, "svc:"):
+		svc := strings.TrimPrefix(scope, "svc:")
+		return mgr.ViewService(svc, func(s network.ServiceScope) error {
+			return fn(s)
+		})
+
+	case strings.HasPrefix(scope, "proto:"):
+		proto := strings.TrimPrefix(scope, "proto:")
+		return mgr.ViewProtocol(protocol.ID(proto), func(s network.ProtocolScope) error {
+			return fn(s)
+		})
+
+	case strings.HasPrefix(scope, "peer:"):
+		p := strings.TrimPrefix(scope, "peer:")
+		pid, err := peer.Decode(p)
+		if err != nil {
+			return fmt.Errorf("invalid peer ID: %s: %w", p, err)
+		}
+		return mgr.ViewPeer(pid, func(s network.PeerScope) error {
+			return fn(s)
+		})
+
+	default:
+		return fmt.Errorf("invalid scope %s", scope)
+	}
+}
+
+// NetStat reports the resource usage of the given scope. Besides the scopes
+// accepted by the limit functions, "all" reports every currently active scope;
+// this requires mgr to implement rcmgr.ResourceManagerState. ctx is unused.
+func NetStat(mgr network.ResourceManager, ctx context.Context, scope string) (NetStatOut, error) {
+	var err error
+	var result NetStatOut
+	switch {
+	case scope == "all":
+		rapi, ok := mgr.(rcmgr.ResourceManagerState)
+		if !ok {
+			return result, fmt.Errorf("resource manager does not support ResourceManagerState API")
+		}
+
+		stat := rapi.Stat()
+		result.System = &stat.System
+		result.Transient = &stat.Transient
+		if len(stat.Services) > 0 {
+			result.Services = stat.Services
+		}
+		if len(stat.Protocols) > 0 {
+			result.Protocols = make(map[string]network.ScopeStat, len(stat.Protocols))
+			for proto, stat := range stat.Protocols {
+				result.Protocols[string(proto)] = stat
+			}
+		}
+		if len(stat.Peers) > 0 {
+			result.Peers = make(map[string]network.ScopeStat, len(stat.Peers))
+			for p, stat := range stat.Peers {
+				result.Peers[p.Pretty()] = stat
+			}
+		}
+
+		return result, nil
+
+	case scope == "system":
+		err = mgr.ViewSystem(func(s network.ResourceScope) error {
+			stat := s.Stat()
+			result.System = &stat
+			return nil
+		})
+		return result, err
+
+	case scope == "transient":
+		err = mgr.ViewTransient(func(s network.ResourceScope) error {
+			stat := s.Stat()
+			result.Transient = &stat
+			return nil
+		})
+		return result, err
+
+	case strings.HasPrefix(scope, "svc:"):
+		svc := strings.TrimPrefix(scope, "svc:")
+		err = mgr.ViewService(svc, func(s network.ServiceScope) error {
+			stat := s.Stat()
+			result.Services = map[string]network.ScopeStat{
+				svc: stat,
+			}
+			return nil
+		})
+		return result, err
+
+	case strings.HasPrefix(scope, "proto:"):
+		proto := strings.TrimPrefix(scope, "proto:")
+		err = mgr.ViewProtocol(protocol.ID(proto), func(s network.ProtocolScope) error {
+			stat := s.Stat()
+			result.Protocols = map[string]network.ScopeStat{
+				proto: stat,
+			}
+			return nil
+		})
+		return result, err
+
+	case strings.HasPrefix(scope, "peer:"):
+		p := strings.TrimPrefix(scope, "peer:")
+		pid, err := peer.Decode(p)
+		if err != nil {
+			return result, fmt.Errorf("invalid peer ID: %s: %w", p, err)
+		}
+		err = mgr.ViewPeer(pid, func(s network.PeerScope) error {
+			stat := s.Stat()
+			result.Peers = map[string]network.ScopeStat{
+				p: stat,
+			}
+			return nil
+		})
+		return result, err
+
+	default:
+		return result, fmt.Errorf("invalid scope %s", scope)
+	}
+}
+
+// NetLimit returns the limit currently set on the given scope. It fails if
+// the scope does not implement rcmgr.ResourceScopeLimiter or holds a limit
+// that is neither static nor dynamic. ctx is unused.
+func NetLimit(mgr network.ResourceManager, ctx context.Context, scope string) (NetLimitConfig, error) {
+	var result NetLimitConfig
+	err := viewScope(mgr, scope, func(s network.ResourceScope) error {
+		limiter, ok := s.(rcmgr.ResourceScopeLimiter)
+		if !ok {
+			return fmt.Errorf("resource scope doesn't implement ResourceScopeLimiter interface")
+		}
+
+		limit := limiter.Limit()
+		switch l := limit.(type) {
+		case *rcmgr.StaticLimit:
+			result.Memory = l.Memory
+			result.setBaseLimit(l.BaseLimit)
+
+		case *rcmgr.DynamicLimit:
+			result.Dynamic = true
+			result.MemoryFraction = l.MemoryLimit.MemoryFraction
+			result.MinMemory = l.MemoryLimit.MinMemory
+			result.MaxMemory = l.MemoryLimit.MaxMemory
+			result.setBaseLimit(l.BaseLimit)
+
+		default:
+			return fmt.Errorf("unknown limit type %T", limit)
+		}
+
+		return nil
+	})
+	return result, err
+}
+
+// NetSetLimit replaces the limit of the given scope with limit. It fails if
+// the scope does not implement rcmgr.ResourceScopeLimiter. ctx is unused.
+func NetSetLimit(mgr network.ResourceManager, ctx context.Context, scope string, limit NetLimitConfig) error {
+	var newLimit rcmgr.Limit
+	if limit.Dynamic {
+		newLimit = &rcmgr.DynamicLimit{
+			MemoryLimit: rcmgr.MemoryLimit{
+				MemoryFraction: limit.MemoryFraction,
+				MinMemory:      limit.MinMemory,
+				MaxMemory:      limit.MaxMemory,
+			},
+			BaseLimit: limit.baseLimit(),
+		}
+	} else {
+		newLimit = &rcmgr.StaticLimit{
+			Memory:    limit.Memory,
+			BaseLimit: limit.baseLimit(),
+		}
+	}
+
+	return viewScope(mgr, scope, func(s network.ResourceScope) error {
+		limiter, ok := s.(rcmgr.ResourceScopeLimiter)
+		if !ok {
+			return fmt.Errorf("resource scope doesn't implement ResourceScopeLimiter interface")
+		}
+		limiter.SetLimit(newLimit)
+		return nil
+	})
+}
+
 type streamInfo struct {
 	Protocol string
 }
diff --git a/core/core.go b/core/core.go
index 888d3d78013..1a09b85e83f 100644
--- a/core/core.go
+++ b/core/core.go
@@ -30,6 +30,7 @@ import (
 	ic "github.com/libp2p/go-libp2p-core/crypto"
 	p2phost "github.com/libp2p/go-libp2p-core/host"
 	metrics "github.com/libp2p/go-libp2p-core/metrics"
+	"github.com/libp2p/go-libp2p-core/network"
 	peer "github.com/libp2p/go-libp2p-core/peer"
 	pstore "github.com/libp2p/go-libp2p-core/peerstore"
 	routing "github.com/libp2p/go-libp2p-core/routing"
@@ -85,17 +86,18 @@ type IpfsNode struct {
 	RecordValidator record.Validator
 
 	// Online
-	PeerHost      p2phost.Host            `optional:"true"` // the network host (server+client)
-	Peering       *peering.PeeringService `optional:"true"`
-	Filters       *ma.Filters             `optional:"true"`
-	Bootstrapper  io.Closer               `optional:"true"` // the periodic bootstrapper
-	Routing       routing.Routing         `optional:"true"` // the routing system. recommend ipfs-dht
-	DNSResolver   *madns.Resolver         // the DNS resolver
-	Exchange      exchange.Interface      // the block exchange + strategy (bitswap)
-	Namesys       namesys.NameSystem      // the name system, resolves paths to hashes
-	Provider      provider.System         // the value provider system
-	IpnsRepub     *ipnsrp.Republisher     `optional:"true"`
-	GraphExchange graphsync.GraphExchange `optional:"true"`
+	PeerHost        p2phost.Host            `optional:"true"` // the network host (server+client)
+	Peering         *peering.PeeringService `optional:"true"`
+	Filters         *ma.Filters             `optional:"true"`
+	Bootstrapper    io.Closer               `optional:"true"` // the periodic bootstrapper
+	Routing         routing.Routing         `optional:"true"` // the routing system. recommend ipfs-dht
+	DNSResolver     *madns.Resolver         // the DNS resolver
+	Exchange        exchange.Interface      // the block exchange + strategy (bitswap)
+	Namesys         namesys.NameSystem      // the name system, resolves paths to hashes
+	Provider        provider.System         // the value provider system
+	IpnsRepub       *ipnsrp.Republisher     `optional:"true"`
+	GraphExchange   graphsync.GraphExchange `optional:"true"`
+	ResourceManager network.ResourceManager `optional:"true"` // the libp2p resource manager
 
 	PubSub    *pubsub.PubSub             `optional:"true"`
 	PSRouter  *psrouter.PubsubValueStore `optional:"true"`
diff --git a/core/node/groups.go b/core/node/groups.go
index 3baea1aa8e0..c3bb966e247 100644
--- a/core/node/groups.go
+++ b/core/node/groups.go
@@ -144,7 +144,9 @@ func LibP2P(bcfg *BuildCfg, cfg *config.Config) fx.Option {
 
 	opts := fx.Options(
 		BaseLibP2P,
+		// Services (resource management)
 		fx.Provide(libp2p.ResourceManager()),
+
 		fx.Provide(libp2p.AddrFilters(cfg.Swarm.AddrFilters)),
 		fx.Provide(libp2p.AddrsFactory(cfg.Addresses.Announce, cfg.Addresses.AppendAnnounce, cfg.Addresses.NoAnnounce)),
 		fx.Provide(libp2p.SmuxTransport(cfg.Swarm.Transports)),
diff --git a/core/node/libp2p/rcmgr.go b/core/node/libp2p/rcmgr.go
index dcd80dc4699..233bb9aeab0 100644
--- a/core/node/libp2p/rcmgr.go
+++ b/core/node/libp2p/rcmgr.go
@@ -1,40 +1,37 @@
 package libp2p
 
 import (
-	"errors"
+	"context"
 	"fmt"
-	"os"
-
+	"github.com/ipfs/go-ipfs/repo"
 	"github.com/libp2p/go-libp2p"
+	"github.com/libp2p/go-libp2p-core/network"
 	rcmgr "github.com/libp2p/go-libp2p-resource-manager"
+	"go.uber.org/fx"
 )
 
-func ResourceManager() func() (Libp2pOpts, error) {
-	return func() (opts Libp2pOpts, err error) {
+// ResourceManager returns the fx constructor of the libp2p resource manager: built from the default limits, closed by an OnStop hook.
+func ResourceManager() func(fx.Lifecycle, repo.Repo) (network.ResourceManager, Libp2pOpts, error) {
+	return func(lc fx.Lifecycle, repo repo.Repo) (network.ResourceManager, Libp2pOpts, error) {
 		var limiter *rcmgr.BasicLimiter
+		var opts Libp2pOpts
 
-		limitsIn, err := os.Open("./limits.json")
-		switch {
-		case err == nil:
-			defer limitsIn.Close()
-			limiter, err = rcmgr.NewDefaultLimiterFromJSON(limitsIn)
-			if err != nil {
-				return opts, fmt.Errorf("error parsing limit file: %w", err)
-			}
-		case errors.Is(err, os.ErrNotExist):
-			limiter = rcmgr.NewDefaultLimiter()
-		default:
-			return opts, err
-		}
+		// FIXME(BLOCKING): Decide how the `limits.json` file path is going to be
+		//  consumed: by default from the repo root or through `go-ipfs-config`.
+		limiter = rcmgr.NewDefaultLimiter()
 
 		libp2p.SetDefaultServiceLimits(limiter)
 
-		// TODO: close the resource manager when the node is shut down
 		rcmgr, err := rcmgr.NewResourceManager(limiter)
 		if err != nil {
-			return opts, fmt.Errorf("error creating resource manager: %w", err)
+			return nil, opts, fmt.Errorf("error creating resource manager: %w", err)
 		}
 		opts.Opts = append(opts.Opts, libp2p.ResourceManager(rcmgr))
-		return opts, nil
+		lc.Append(fx.Hook{
+			OnStop: func(_ context.Context) error {
+				return rcmgr.Close()
+			}})
+
+		return rcmgr, opts, nil
 	}
 }