Skip to content

Commit

Permalink
Get placement policies from contract (#1008)
Browse files Browse the repository at this point in the history
  • Loading branch information
roman-khimov committed Sep 26, 2024
2 parents 0eb69b0 + dd767a7 commit 1455cdf
Show file tree
Hide file tree
Showing 9 changed files with 271 additions and 30 deletions.
19 changes: 14 additions & 5 deletions api/handler/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"errors"
"time"

"github.com/nspcc-dev/neo-go/pkg/util"
"github.com/nspcc-dev/neofs-s3-gw/api"
"github.com/nspcc-dev/neofs-s3-gw/api/layer"
"github.com/nspcc-dev/neofs-sdk-go/netmap"
Expand All @@ -25,17 +26,25 @@ type (

// Config contains data which handler needs to keep.
Config struct {
Policy PlacementPolicy
DefaultMaxAge int
NotificatorEnabled bool
CopiesNumber uint32
MaxDeletePerRequest int
Policy PlacementPolicy
PlacementPolicyProvider PlacementPolicyProvider
DefaultMaxAge int
NotificatorEnabled bool
CopiesNumber uint32
MaxDeletePerRequest int
}

PlacementPolicy interface {
Default() netmap.PlacementPolicy
Get(string) (netmap.PlacementPolicy, bool)
}

// PlacementPolicyProvider takes placement policy from contract.
PlacementPolicyProvider interface {
// GetPlacementPolicy get policy by name.
// Returns [models.ErrNotFound] if policy not found.
GetPlacementPolicy(userAddr util.Uint160, policyName string) (*netmap.PlacementPolicy, error)
}
)

const (
Expand Down
26 changes: 22 additions & 4 deletions api/handler/put.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import (
"encoding/base64"
"encoding/json"
"encoding/xml"
errorsStd "errors"
"errors"
"fmt"
"io"
"net"
Expand All @@ -16,13 +16,15 @@ import (
"strings"
"time"

"github.com/nspcc-dev/neo-go/pkg/util"
"github.com/nspcc-dev/neofs-s3-gw/api"
"github.com/nspcc-dev/neofs-s3-gw/api/auth"
"github.com/nspcc-dev/neofs-s3-gw/api/data"
"github.com/nspcc-dev/neofs-s3-gw/api/layer"
"github.com/nspcc-dev/neofs-s3-gw/api/layer/encryption"
"github.com/nspcc-dev/neofs-s3-gw/api/s3errors"
"github.com/nspcc-dev/neofs-s3-gw/creds/accessbox"
"github.com/nspcc-dev/neofs-s3-gw/internal/models"
"github.com/nspcc-dev/neofs-sdk-go/eacl"
"github.com/nspcc-dev/neofs-sdk-go/session"
"go.uber.org/zap"
Expand Down Expand Up @@ -336,7 +338,7 @@ func formEncryptionParams(r *http.Request) (enc encryption.Params, err error) {
}

if r.TLS == nil {
return enc, errorsStd.New("encryption available only when TLS is enabled")
return enc, errors.New("encryption available only when TLS is enabled")
}

if sseCustomerAlgorithm != layer.AESEncryptionAlgorithm {
Expand Down Expand Up @@ -708,11 +710,13 @@ func (h *handler) CreateBucketHandler(w http.ResponseWriter, r *http.Request) {
}

var policies []*accessbox.ContainerPolicy
var userAddr util.Uint160
boxData, err := layer.GetBoxData(r.Context())
if err == nil {
policies = boxData.Policies
p.SessionContainerCreation = boxData.Gate.SessionTokenForPut()
p.SessionEACL = boxData.Gate.SessionTokenForSetEACL()
userAddr = boxData.Gate.BearerToken.Issuer().ScriptHash()
}

if p.SessionContainerCreation == nil {
Expand All @@ -725,7 +729,7 @@ func (h *handler) CreateBucketHandler(w http.ResponseWriter, r *http.Request) {
return
}

h.setPolicy(p, createParams.LocationConstraint, policies)
h.setPolicy(p, userAddr, createParams.LocationConstraint, policies)

p.ObjectLockEnabled = isLockEnabled(r.Header)

Expand Down Expand Up @@ -753,7 +757,7 @@ func (h *handler) CreateBucketHandler(w http.ResponseWriter, r *http.Request) {
api.WriteSuccessResponseHeadersOnly(w)
}

func (h handler) setPolicy(prm *layer.CreateBucketParams, locationConstraint string, userPolicies []*accessbox.ContainerPolicy) {
func (h handler) setPolicy(prm *layer.CreateBucketParams, userAddr util.Uint160, locationConstraint string, userPolicies []*accessbox.ContainerPolicy) {
prm.Policy = h.cfg.Policy.Default()

if locationConstraint == "" {
Expand All @@ -772,6 +776,20 @@ func (h handler) setPolicy(prm *layer.CreateBucketParams, locationConstraint str
return
}
}

policy, err := h.cfg.PlacementPolicyProvider.GetPlacementPolicy(userAddr, locationConstraint)
if err != nil {
// nothing to do.
if errors.Is(err, models.ErrNotFound) {
return
}

h.log.Error("get policy from provider", zap.Error(err))
return
}

prm.Policy = *policy
prm.LocationConstraint = locationConstraint
}

func isLockEnabled(header http.Header) bool {
Expand Down
46 changes: 34 additions & 12 deletions cmd/s3-gw/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,8 +60,9 @@ type (
}

appSettings struct {
logLevel zap.AtomicLevel
policies *placementPolicy
logLevel zap.AtomicLevel
policies *placementPolicy
policyService *storagePolicyService
}

Logger struct {
Expand Down Expand Up @@ -142,7 +143,7 @@ func newApp(ctx context.Context, log *Logger, v *viper.Viper) *App {
wrkDone: make(chan struct{}, 1),

maxClients: newMaxClients(v),
settings: newAppSettings(log, v),
settings: newAppSettings(ctx, log, v),
}

app.init(ctx, anonSigner, neoFS)
Expand Down Expand Up @@ -208,7 +209,7 @@ func loadLocations(v *viper.Viper) (map[string]string, error) {
return locations, nil
}

func newAppSettings(log *Logger, v *viper.Viper) *appSettings {
func newAppSettings(ctx context.Context, log *Logger, v *viper.Viper) *appSettings {
locations, err := loadLocations(v)
if err != nil {
log.logger.Fatal("load locations failed", zap.Error(err))
Expand All @@ -219,9 +220,19 @@ func newAppSettings(log *Logger, v *viper.Viper) *appSettings {
log.logger.Fatal("failed to create new policy mapping", zap.Error(err))
}

var policyProvider servicePolicyProvider = &noOpStoragePolicyProvider{}

if contractName := v.GetString(cfgPolicyLocationsContractName); contractName != "" {
policyProvider, err = newStoragePolicyProvider(ctx, contractName, v.GetStringSlice(cfgRPCEndpoints))
if err != nil {
log.logger.Fatal("failed to init provider", zap.Error(err))
}
}

return &appSettings{
logLevel: log.lvl,
policies: policies,
logLevel: log.lvl,
policies: policies,
policyService: newStoragePolicyService(policyProvider),
}
}

Expand Down Expand Up @@ -546,15 +557,15 @@ func (a *App) configReload(ctx context.Context) {
a.stopServices(ctx)
a.startServices()

a.updateSettings()
a.updateSettings(ctx)

a.metrics.SetEnabled(a.cfg.GetBool(cfgPrometheusEnabled))
a.setHealthStatus()

a.log.Info("SIGHUP config reload completed")
}

func (a *App) updateSettings() {
func (a *App) updateSettings(ctx context.Context) {
if lvl, err := getLogLevel(a.cfg); err != nil {
a.log.Warn("log level won't be updated", zap.Error(err))
} else {
Expand All @@ -569,6 +580,16 @@ func (a *App) updateSettings() {
if err := a.settings.policies.update(getDefaultPolicyValue(a.cfg), a.cfg.GetString(cfgPolicyRegionMapFile), regions); err != nil {
a.log.Warn("policies won't be updated", zap.Error(err))
}

var policyProvider servicePolicyProvider = &noOpStoragePolicyProvider{}
if contractName := a.cfg.GetString(cfgPolicyLocationsContractName); contractName != "" {
policyProvider, err = newStoragePolicyProvider(ctx, contractName, a.cfg.GetStringSlice(cfgRPCEndpoints))
if err != nil {
a.log.Warn("failed to init policy provider", zap.Error(err))
}
}

a.settings.policyService.UpdateProvider(policyProvider)
}

func (a *App) startServices() {
Expand Down Expand Up @@ -706,10 +727,11 @@ func getAccessBoxCacheConfig(v *viper.Viper, l *zap.Logger) *cache.Config {

func (a *App) initHandler() {
cfg := &handler.Config{
Policy: a.settings.policies,
DefaultMaxAge: handler.DefaultMaxAge,
NotificatorEnabled: a.cfg.GetBool(cfgEnableNATS),
CopiesNumber: handler.DefaultCopiesNumber,
Policy: a.settings.policies,
PlacementPolicyProvider: a.settings.policyService,
DefaultMaxAge: handler.DefaultMaxAge,
NotificatorEnabled: a.cfg.GetBool(cfgEnableNATS),
CopiesNumber: handler.DefaultCopiesNumber,
}

if a.cfg.IsSet(cfgDefaultMaxAge) {
Expand Down
7 changes: 4 additions & 3 deletions cmd/s3-gw/app_settings.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,9 +82,10 @@ const ( // Settings.
cfgNATSRootCAFiles = "nats.root_ca"

// Policy.
cfgPolicyDefault = "placement_policy.default"
cfgPolicyRegionMapFile = "placement_policy.region_mapping"
cfgPolicyLocations = "placement_policy.locations"
cfgPolicyDefault = "placement_policy.default"
cfgPolicyRegionMapFile = "placement_policy.region_mapping"
cfgPolicyLocations = "placement_policy.locations"
cfgPolicyLocationsContractName = "placement_policy.contract_name"

// CORS.
cfgDefaultMaxAge = "cors.default_max_age"
Expand Down
39 changes: 39 additions & 0 deletions cmd/s3-gw/storage_policy.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
package main

import (
"sync"

"github.com/nspcc-dev/neo-go/pkg/util"
"github.com/nspcc-dev/neofs-sdk-go/netmap"
)

type (
storagePolicyService struct {
mu *sync.RWMutex
provider servicePolicyProvider
}

servicePolicyProvider interface {
GetPlacementPolicy(userAddr util.Uint160, policyName string) (*netmap.PlacementPolicy, error)
}
)

func newStoragePolicyService(provider servicePolicyProvider) *storagePolicyService {
return &storagePolicyService{
provider: provider,
mu: &sync.RWMutex{},
}
}

func (s *storagePolicyService) GetPlacementPolicy(userAddr util.Uint160, policyName string) (*netmap.PlacementPolicy, error) {
s.mu.RLock()
defer s.mu.RUnlock()

return s.provider.GetPlacementPolicy(userAddr, policyName)
}

func (s *storagePolicyService) UpdateProvider(p servicePolicyProvider) {
s.mu.Lock()
s.provider = p
s.mu.Unlock()
}
Loading

0 comments on commit 1455cdf

Please sign in to comment.