Skip to content

Commit

Permalink
Merge 96ceebd into 5a3a27b
Browse files Browse the repository at this point in the history
  • Loading branch information
carpawell authored May 27, 2021
2 parents 5a3a27b + 96ceebd commit 6555984
Show file tree
Hide file tree
Showing 25 changed files with 339 additions and 88 deletions.
13 changes: 12 additions & 1 deletion cmd/neofs-cli/modules/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package cmd
import (
"crypto/ecdsa"
"crypto/rand"
"crypto/tls"
"encoding/hex"
"errors"
"fmt"
Expand Down Expand Up @@ -240,7 +241,17 @@ func getSDKClient(key *ecdsa.PrivateKey) (client.Client, error) {
return nil, errInvalidEndpoint
}

c, err := client.New(client.WithAddress(hostAddr), client.WithDefaultPrivateKey(key))
options := []client.Option{
client.WithAddress(hostAddr),
client.WithDefaultPrivateKey(key),
}

if netAddr.TLSEnabled() {
options = append(options, client.WithTLSConfig(&tls.Config{}))
}

c, err := client.New(options...)

return c, err
}

Expand Down
29 changes: 29 additions & 0 deletions cmd/neofs-node/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,9 @@ const (

// config keys for cfgGRPC
cfgListenAddress = "grpc.endpoint"
cfgTLSEnabled = "grpc.tls.enabled"
cfgTLSCertFile = "grpc.tls.certificate"
cfgTLSKeyFile = "grpc.tls.key"

// config keys for API client cache
cfgAPIClientDialTimeout = "apiclient.dial_timeout"
Expand Down Expand Up @@ -206,6 +209,10 @@ type cfgGRPC struct {
maxChunkSize uint64

maxAddrAmount uint64

tlsEnabled bool
tlsCertFile string
tlsKeyFile string
}

type cfgMorph struct {
Expand Down Expand Up @@ -335,6 +342,22 @@ func initCfg(path string) *cfg {
maxChunkSize := uint64(maxMsgSize) * 3 / 4 // 25% to meta, 75% to payload
maxAddrAmount := uint64(maxChunkSize) / addressSize // each address is about 72 bytes

var (
tlsEnabled bool
tlsCertFile string
tlsKeyFile string
)

if viperCfg.GetBool(cfgTLSEnabled) {
tlsEnabled = true
tlsCertFile = viperCfg.GetString(cfgTLSCertFile)
tlsKeyFile = viperCfg.GetString(cfgTLSKeyFile)
}

if tlsEnabled {
netAddr.AddTLS()
}

state := newNetworkState()

containerWorkerPool, err := ants.NewPool(notificationHandlerPoolSize)
Expand Down Expand Up @@ -377,6 +400,9 @@ func initCfg(path string) *cfg {
cfgGRPC: cfgGRPC{
maxChunkSize: maxChunkSize,
maxAddrAmount: maxAddrAmount,
tlsEnabled: tlsEnabled,
tlsCertFile: tlsCertFile,
tlsKeyFile: tlsKeyFile,
},
localAddr: netAddr,
respSvc: response.NewService(
Expand Down Expand Up @@ -430,6 +456,9 @@ func defaultConfiguration(v *viper.Viper) {
v.SetDefault(cfgMorphNotifyDialTimeout, 5*time.Second)

v.SetDefault(cfgListenAddress, "127.0.0.1:50501") // listen address
v.SetDefault(cfgTLSEnabled, false)
v.SetDefault(cfgTLSCertFile, "")
v.SetDefault(cfgTLSKeyFile, "")

v.SetDefault(cfgAPIClientDialTimeout, 5*time.Second)

Expand Down
6 changes: 3 additions & 3 deletions cmd/neofs-node/container.go
Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,7 @@ type remoteLoadAnnounceProvider struct {
loadAddrSrc network.LocalAddressSource

clientCache interface {
Get(string) (apiClient.Client, error)
Get(*network.Address) (apiClient.Client, error)
}

deadEndProvider loadcontroller.WriterProvider
Expand All @@ -219,12 +219,12 @@ func (r *remoteLoadAnnounceProvider) InitRemote(srv loadroute.ServerInfo) (loadc
return loadcontroller.SimpleWriterProvider(new(nopLoadWriter)), nil
}

hostAddr, err := network.HostAddrFromMultiaddr(addr)
netAddr, err := network.AddressFromString(addr)
if err != nil {
return nil, fmt.Errorf("could not convert address to IP format: %w", err)
}

c, err := r.clientCache.Get(hostAddr)
c, err := r.clientCache.Get(netAddr)
if err != nil {
return nil, fmt.Errorf("could not initialize API client: %w", err)
}
Expand Down
16 changes: 14 additions & 2 deletions cmd/neofs-node/grpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"github.com/nspcc-dev/neofs-node/pkg/util/logger"
"go.uber.org/zap"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
)

func initGRPC(c *cfg) {
Expand All @@ -15,9 +16,20 @@ func initGRPC(c *cfg) {
c.cfgGRPC.listener, err = net.Listen("tcp", c.viper.GetString(cfgListenAddress))
fatalOnErr(err)

c.cfgGRPC.server = grpc.NewServer(
serverOpts := []grpc.ServerOption{
grpc.MaxSendMsgSize(maxMsgSize),
)
}

if c.cfgGRPC.tlsEnabled {
creds, err := credentials.NewServerTLSFromFile(c.cfgGRPC.tlsCertFile, c.cfgGRPC.tlsKeyFile)
if err != nil {
fatalOnErr(fmt.Errorf("could not read credentionals from file: %w", err))
}

serverOpts = append(serverOpts, grpc.Creds(creds))
}

c.cfgGRPC.server = grpc.NewServer(serverOpts...)

c.onShutdown(func() {
stopGRPC("NeoFS Public API", c.cfgGRPC.server, c.log)
Expand Down
8 changes: 4 additions & 4 deletions cmd/neofs-node/object.go
Original file line number Diff line number Diff line change
Expand Up @@ -401,7 +401,7 @@ type reputationClientConstructor struct {
trustStorage *truststorage.Storage

basicConstructor interface {
Get(string) (client.Client, error)
Get(*network.Address) (client.Client, error)
}
}

Expand Down Expand Up @@ -485,7 +485,7 @@ func (c *reputationClient) SearchObject(ctx context.Context, prm *client.SearchO
return ids, err
}

func (c *reputationClientConstructor) Get(addr string) (client.Client, error) {
func (c *reputationClientConstructor) Get(addr *network.Address) (client.Client, error) {
cl, err := c.basicConstructor.Get(addr)
if err != nil {
return nil, err
Expand All @@ -494,9 +494,9 @@ func (c *reputationClientConstructor) Get(addr string) (client.Client, error) {
nm, err := netmap.GetLatestNetworkMap(c.nmSrc)
if err == nil {
for i := range nm.Nodes {
hostAddr, err := network.HostAddrFromMultiaddr(nm.Nodes[i].Address())
netAddr, err := network.AddressFromString(nm.Nodes[i].Address())
if err == nil {
if hostAddr == addr {
if netAddr.Equal(addr) {
prm := truststorage.UpdatePrm{}
prm.SetPeer(reputation.PeerIDFromBytes(nm.Nodes[i].PublicKey()))

Expand Down
6 changes: 3 additions & 3 deletions cmd/neofs-node/reputation/common/remote.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import (
)

type clientCache interface {
Get(string) (apiClient.Client, error)
Get(*network.Address) (apiClient.Client, error)
}

// clientKeyRemoteProvider must provide remote writer and take into account
Expand Down Expand Up @@ -77,12 +77,12 @@ func (rtp *remoteTrustProvider) InitRemote(srv reputationcommon.ServerInfo) (rep
return trustcontroller.SimpleWriterProvider(new(NopReputationWriter)), nil
}

hostAddr, err := network.HostAddrFromMultiaddr(addr)
netAddr, err := network.AddressFromString(addr)
if err != nil {
return nil, fmt.Errorf("could not convert address to IP format: %w", err)
}

c, err := rtp.clientCache.Get(hostAddr)
c, err := rtp.clientCache.Get(netAddr)
if err != nil {
return nil, fmt.Errorf("could not initialize API client: %w", err)
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/innerring/processors/audit/process.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,14 +118,14 @@ func (ap *Processor) findStorageGroups(cid *container.ID, shuffled netmap.Nodes)
zap.Int("total_tries", ln),
)

addr, err := network.HostAddrFromMultiaddr(shuffled[i].Address())
netAddr, err := network.AddressFromString(shuffled[i].Address())
if err != nil {
log.Warn("can't parse remote address", zap.String("error", err.Error()))

continue
}

cli, err := ap.clientCache.Get(addr)
cli, err := ap.clientCache.Get(netAddr)
if err != nil {
log.Warn("can't setup remote connection", zap.String("error", err.Error()))

Expand Down
3 changes: 2 additions & 1 deletion pkg/innerring/processors/audit/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
wrapContainer "github.com/nspcc-dev/neofs-node/pkg/morph/client/container/wrapper"
wrapNetmap "github.com/nspcc-dev/neofs-node/pkg/morph/client/netmap/wrapper"
"github.com/nspcc-dev/neofs-node/pkg/morph/event"
"github.com/nspcc-dev/neofs-node/pkg/network"
"github.com/nspcc-dev/neofs-node/pkg/services/audit"
"github.com/panjf2000/ants/v2"
"go.uber.org/zap"
Expand All @@ -28,7 +29,7 @@ type (

// NeoFSClientCache is an interface for cache of neofs RPC clients
NeoFSClientCache interface {
Get(address string) (SDKClient.Client, error)
Get(address *network.Address) (SDKClient.Client, error)
}

TaskManager interface {
Expand Down
22 changes: 11 additions & 11 deletions pkg/innerring/rpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ type (
ClientCache struct {
log *zap.Logger
cache interface {
Get(string) (client.Client, error)
Get(address *network.Address) (client.Client, error)
}
key *ecdsa.PrivateKey

Expand All @@ -48,7 +48,7 @@ func newClientCache(p *clientCacheParams) *ClientCache {
}
}

func (c *ClientCache) Get(address string) (client.Client, error) {
func (c *ClientCache) Get(address *network.Address) (client.Client, error) {
// Because cache is used by `ClientCache` exclusively,
// client will always have valid key.
return c.cache.Get(address)
Expand All @@ -74,7 +74,7 @@ func (c *ClientCache) getSG(ctx context.Context, addr *object.Address, nm *netma
getParams.WithAddress(addr)

for _, node := range placement.FlattenNodes(nodes) {
addr, err := network.HostAddrFromMultiaddr(node.Address())
netAddr, err := network.AddressFromString(node.Address())
if err != nil {
c.log.Warn("can't parse remote address",
zap.String("address", node.Address()),
Expand All @@ -83,10 +83,10 @@ func (c *ClientCache) getSG(ctx context.Context, addr *object.Address, nm *netma
continue
}

cli, err := c.Get(addr)
cli, err := c.Get(netAddr)
if err != nil {
c.log.Warn("can't setup remote connection",
zap.String("address", addr),
zap.String("address", netAddr.String()),
zap.String("error", err.Error()))

continue
Expand Down Expand Up @@ -136,14 +136,14 @@ func (c *ClientCache) GetHeader(task *audit.Task, node *netmap.Node, id *object.
headParams.WithMainFields()
headParams.WithAddress(objAddress)

addr, err := network.HostAddrFromMultiaddr(node.Address())
netAddr, err := network.AddressFromString(node.Address())
if err != nil {
return nil, fmt.Errorf("can't parse remote address %s: %w", node.Address(), err)
}

cli, err := c.Get(addr)
cli, err := c.Get(netAddr)
if err != nil {
return nil, fmt.Errorf("can't setup remote connection with %s: %w", addr, err)
return nil, fmt.Errorf("can't setup remote connection with %s: %w", netAddr, err)
}

cctx, cancel := context.WithTimeout(task.AuditContext(), c.headTimeout)
Expand Down Expand Up @@ -172,14 +172,14 @@ func (c *ClientCache) GetRangeHash(task *audit.Task, node *netmap.Node, id *obje
rangeParams.WithRangeList(rng)
rangeParams.WithSalt(nil) // it MUST be nil for correct hash concatenation in PDP game

addr, err := network.HostAddrFromMultiaddr(node.Address())
netAddr, err := network.AddressFromString(node.Address())
if err != nil {
return nil, fmt.Errorf("can't parse remote address %s: %w", node.Address(), err)
}

cli, err := c.Get(addr)
cli, err := c.Get(netAddr)
if err != nil {
return nil, fmt.Errorf("can't setup remote connection with %s: %w", addr, err)
return nil, fmt.Errorf("can't setup remote connection with %s: %w", netAddr, err)
}

cctx, cancel := context.WithTimeout(task.AuditContext(), c.rangeTimeout)
Expand Down
21 changes: 21 additions & 0 deletions pkg/network/address.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,11 +31,32 @@ type LocalAddressSource interface {
LocalAddress() *Address
}

// Encapsulate wraps this Address around another. For example:
//
// /ip4/1.2.3.4 encapsulate /tcp/80 = /ip4/1.2.3.4/tcp/80
//
func (a *Address) Encapsulate(addr *Address) {
a.ma = a.ma.Encapsulate(addr.ma)
}

// Decapsulate removes an Address wrapping. For example:
//
// /ip4/1.2.3.4/tcp/80 decapsulate /ip4/1.2.3.4 = /tcp/80
//
func (a *Address) Decapsulate(addr *Address) {
a.ma = a.ma.Decapsulate(addr.ma)
}

// String returns multiaddr string
func (a Address) String() string {
return a.ma.String()
}

// Equal compares Address's.
func (a Address) Equal(addr *Address) bool {
return a.ma.Equal(addr.ma)
}

// IPAddrString returns network endpoint address in string format.
func (a Address) IPAddrString() (string, error) {
ip, err := manet.ToNetAddr(a.ma)
Expand Down
Loading

0 comments on commit 6555984

Please sign in to comment.