Skip to content
This repository has been archived by the owner on Mar 19, 2024. It is now read-only.

Commit

Permalink
Merge pull request #261 from nspcc-dev/260-upgrade-sdk-to-rc10
Browse files Browse the repository at this point in the history
Upgrade sdk to rc10
  • Loading branch information
roman-khimov committed Aug 14, 2023
2 parents adafa3f + 465257d commit 29548c7
Show file tree
Hide file tree
Showing 17 changed files with 510 additions and 709 deletions.
118 changes: 56 additions & 62 deletions app.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,28 +23,35 @@ import (
"github.com/nspcc-dev/neofs-http-gw/response"
"github.com/nspcc-dev/neofs-http-gw/uploader"
"github.com/nspcc-dev/neofs-http-gw/utils"
neofsecdsa "github.com/nspcc-dev/neofs-sdk-go/crypto/ecdsa"
"github.com/nspcc-dev/neofs-sdk-go/client"
"github.com/nspcc-dev/neofs-sdk-go/pool"
"github.com/nspcc-dev/neofs-sdk-go/stat"
"github.com/nspcc-dev/neofs-sdk-go/user"
"github.com/spf13/viper"
"github.com/valyala/fasthttp"
"go.uber.org/zap"
)

const (
defaultObjectSize = int64(1 << 21) // 2MB
)

type (
app struct {
log *zap.Logger
logLevel zap.AtomicLevel
pool *pool.Pool
owner *user.ID
cfg *viper.Viper
webServer *fasthttp.Server
webDone chan struct{}
resolver *resolver.ContainerResolver
metrics *gateMetrics
services []*metrics.Service
settings *appSettings
servers []Server
log *zap.Logger
logLevel zap.AtomicLevel
pool *pool.Pool
poolStat *stat.PoolStat
owner *user.ID
cfg *viper.Viper
webServer *fasthttp.Server
webDone chan struct{}
resolverContainer *resolver.Container
metrics *gateMetrics
services []*metrics.Service
settings *appSettings
servers []Server
signer user.Signer
}

appSettings struct {
Expand Down Expand Up @@ -129,12 +136,9 @@ func newApp(ctx context.Context, opt ...Option) App {
a.log.Fatal("failed to get neofs credentials", zap.Error(err))
}

signer := neofsecdsa.SignerRFC6979(*key)

var owner user.ID
if err = user.IDFromSigner(&owner, signer); err != nil {
a.log.Fatal("failed to get user id", zap.Error(err))
}
signer := user.NewAutoIDSignerRFC6979(*key)
a.signer = signer
owner := signer.UserID()
a.owner = &owner

var prm pool.InitParameters
Expand Down Expand Up @@ -163,6 +167,9 @@ func newApp(ctx context.Context, opt ...Option) App {
zap.Float64("weight", weight), zap.Int("priority", priority))
}

a.poolStat = stat.NewPoolStatistic()
prm.SetStatisticCallback(a.poolStat.OperationCallback)

a.pool, err = pool.NewPool(prm)
if err != nil {
a.log.Fatal("failed to create connection pool", zap.Error(err))
Expand All @@ -173,51 +180,37 @@ func newApp(ctx context.Context, opt ...Option) App {
a.log.Fatal("failed to dial pool", zap.Error(err))
}

a.initAppSettings()
a.initResolver()
a.initAppSettings(ctx)
a.initResolver(ctx)
a.initMetrics()

return a
}

func (a *app) initAppSettings() {
func (a *app) initAppSettings(ctx context.Context) {
a.settings = &appSettings{
Uploader: &uploader.Settings{},
Downloader: &downloader.Settings{},
}

a.updateSettings()
}

func (a *app) initResolver() {
var err error
a.resolver, err = resolver.NewContainerResolver(a.getResolverConfig())
if err != nil {
a.log.Fatal("failed to create resolver", zap.Error(err))
}
a.updateSettings(ctx)
}

func (a *app) getResolverConfig() ([]string, *resolver.Config) {
resolveCfg := &resolver.Config{
NeoFS: resolver.NewNeoFSResolver(a.pool),
RPCAddress: a.cfg.GetString(cfgRPCEndpoint),
}
func (a *app) initResolver(ctx context.Context) {
endpoint := a.cfg.GetString(cfgRPCEndpoint)

order := a.cfg.GetStringSlice(cfgResolveOrder)
if resolveCfg.RPCAddress == "" {
order = remove(order, resolver.NNSResolver)
a.log.Warn(fmt.Sprintf("resolver '%s' won't be used since '%s' isn't provided", resolver.NNSResolver, cfgRPCEndpoint))
}
a.log.Info("rpc endpoint", zap.String("address", endpoint))

if len(order) == 0 {
a.log.Info("container resolver will be disabled because of resolvers 'resolver_order' is empty")
res, err := resolver.NewContainer(ctx, endpoint)
if err != nil {
a.log.Fatal("failed to create resolver", zap.Error(err))
}

return order, resolveCfg
a.resolverContainer = res
}

func (a *app) initMetrics() {
gateMetricsProvider := metrics.NewGateMetrics(a.pool)
gateMetricsProvider := metrics.NewGateMetrics(a.pool, a.poolStat)
gateMetricsProvider.SetGWVersion(Version)
a.metrics = newGateMetrics(a.log, gateMetricsProvider, a.cfg.GetBool(cfgPrometheusEnabled))
}
Expand Down Expand Up @@ -263,15 +256,6 @@ func (m *gateMetrics) Shutdown() {
m.mu.Unlock()
}

func remove(list []string, element string) []string {
for i, item := range list {
if item == element {
return append(list[:i], list[i+1:]...)
}
}
return list
}

func getNeoFSKey(a *app) (*ecdsa.PrivateKey, error) {
walletPath := a.cfg.GetString(cfgWalletPath)

Expand Down Expand Up @@ -345,8 +329,8 @@ func (a *app) setHealthStatus() {
}

func (a *app) Serve(ctx context.Context) {
uploadRoutes := uploader.New(ctx, a.AppParams(), a.settings.Uploader)
downloadRoutes := downloader.New(ctx, a.AppParams(), a.settings.Downloader)
uploadRoutes := uploader.New(ctx, a.AppParams(), a.settings.Uploader, a.signer)
downloadRoutes := downloader.New(ctx, a.AppParams(), a.settings.Downloader, a.signer)

// Configure router.
a.configureRouter(uploadRoutes, downloadRoutes)
Expand All @@ -372,7 +356,7 @@ LOOP:
case <-ctx.Done():
break LOOP
case <-sigs:
a.configReload()
a.configReload(ctx)
}
}

Expand All @@ -384,7 +368,7 @@ LOOP:
close(a.webDone)
}

func (a *app) configReload() {
func (a *app) configReload(ctx context.Context) {
a.log.Info("SIGHUP config reload started")
if !a.cfg.IsSet(cmdConfig) {
a.log.Warn("failed to reload config because it's missed")
Expand All @@ -400,7 +384,7 @@ func (a *app) configReload() {
a.logLevel.SetLevel(lvl)
}

if err := a.resolver.UpdateResolvers(a.getResolverConfig()); err != nil {
if err := a.resolverContainer.UpdateResolvers(ctx, a.cfg.GetString(cfgRPCEndpoint)); err != nil {
a.log.Warn("failed to update resolvers", zap.Error(err))
}

Expand All @@ -411,17 +395,27 @@ func (a *app) configReload() {
a.stopServices()
a.startServices()

a.updateSettings()
a.updateSettings(ctx)

a.metrics.SetEnabled(a.cfg.GetBool(cfgPrometheusEnabled))
a.setHealthStatus()

a.log.Info("SIGHUP config reload completed")
}

func (a *app) updateSettings() {
func (a *app) updateSettings(ctx context.Context) {
a.settings.Uploader.SetDefaultTimestamp(a.cfg.GetBool(cfgUploaderHeaderEnableDefaultTimestamp))
a.settings.Downloader.SetZipCompression(a.cfg.GetBool(cfgZipCompression))
maxObjectSize := defaultObjectSize

ni, err := a.pool.NetworkInfo(ctx, client.PrmNetworkInfo{})
if err != nil {
a.log.Error("get network info", zap.Error(err))
} else {
maxObjectSize = int64(ni.MaxObjectSize())
}

a.settings.Uploader.SetMaxObjectSize(maxObjectSize)
}

func (a *app) startServices() {
Expand Down Expand Up @@ -484,7 +478,7 @@ func (a *app) AppParams() *utils.AppParams {
Logger: a.log,
Pool: a.pool,
Owner: a.owner,
Resolver: a.resolver,
Resolver: a.resolverContainer,
}
}

Expand Down
2 changes: 0 additions & 2 deletions config/config.env
Original file line number Diff line number Diff line change
Expand Up @@ -73,8 +73,6 @@ HTTP_GW_MAX_REQUEST_BODY_SIZE=4194304

# RPC endpoint to be able to use nns container resolving.
HTTP_GW_RPC_ENDPOINT=http://morph-chain.neofs.devenv:30333
# The order in which resolvers are used to find an container id by name.
HTTP_GW_RESOLVE_ORDER="nns dns"

# Create timestamp for object if it isn't provided by header.
HTTP_GW_UPLOAD_HEADER_USE_DEFAULT_TIMESTAMP=false
Expand Down
4 changes: 0 additions & 4 deletions config/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -81,10 +81,6 @@ web:

# RPC endpoint to be able to use nns container resolving.
rpc_endpoint: http://morph-chain.neofs.devenv:30333
# The order in which resolvers are used to find an container id by name.
resolve_order:
- nns
- dns

upload_header:
use_default_timestamp: false # Create timestamp for object if it isn't provided by header.
Expand Down
Loading

0 comments on commit 29548c7

Please sign in to comment.