From 54d8f2f11a93a126d9ea85db1f6ed19c3199008c Mon Sep 17 00:00:00 2001 From: Evgenii Baidakov Date: Tue, 24 Sep 2024 15:38:27 +0400 Subject: [PATCH 1/3] *: Read placement policies from contract Closes #999. Signed-off-by: Evgenii Baidakov --- api/handler/api.go | 19 +++- api/handler/put.go | 22 ++++- cmd/s3-gw/app.go | 46 ++++++--- cmd/s3-gw/app_settings.go | 7 +- cmd/s3-gw/storage_policy.go | 39 ++++++++ cmd/s3-gw/storage_policy_provider.go | 138 +++++++++++++++++++++++++++ config/config.yaml | 2 + docs/configuration.md | 12 ++- internal/models/errors.go | 10 ++ 9 files changed, 268 insertions(+), 27 deletions(-) create mode 100644 cmd/s3-gw/storage_policy.go create mode 100644 cmd/s3-gw/storage_policy_provider.go create mode 100644 internal/models/errors.go diff --git a/api/handler/api.go b/api/handler/api.go index 3b739cc4..519437d9 100644 --- a/api/handler/api.go +++ b/api/handler/api.go @@ -4,6 +4,7 @@ import ( "errors" "time" + "github.com/nspcc-dev/neo-go/pkg/util" "github.com/nspcc-dev/neofs-s3-gw/api" "github.com/nspcc-dev/neofs-s3-gw/api/layer" "github.com/nspcc-dev/neofs-sdk-go/netmap" @@ -25,17 +26,25 @@ type ( // Config contains data which handler needs to keep. Config struct { - Policy PlacementPolicy - DefaultMaxAge int - NotificatorEnabled bool - CopiesNumber uint32 - MaxDeletePerRequest int + Policy PlacementPolicy + PlacementPolicyProvider PlacementPolicyProvider + DefaultMaxAge int + NotificatorEnabled bool + CopiesNumber uint32 + MaxDeletePerRequest int } PlacementPolicy interface { Default() netmap.PlacementPolicy Get(string) (netmap.PlacementPolicy, bool) } + + // PlacementPolicyProvider takes placement policy from contract. + PlacementPolicyProvider interface { + // GetPlacementPolicy get policy by name. + // Returns [models.ErrNotFound] if policy not found. + GetPlacementPolicy(userAddr util.Uint160, policyName string) (*netmap.PlacementPolicy, error) + } ) const ( diff --git a/api/handler/put.go b/api/handler/put.go index 484845fd..ea5c996c 100644 --- a/api/handler/put.go +++ b/api/handler/put.go @@ -16,6 +16,7 @@ import ( "strings" "time" + "github.com/nspcc-dev/neo-go/pkg/util" "github.com/nspcc-dev/neofs-s3-gw/api" "github.com/nspcc-dev/neofs-s3-gw/api/auth" "github.com/nspcc-dev/neofs-s3-gw/api/data" @@ -23,6 +24,7 @@ import ( "github.com/nspcc-dev/neofs-s3-gw/api/layer/encryption" "github.com/nspcc-dev/neofs-s3-gw/api/s3errors" "github.com/nspcc-dev/neofs-s3-gw/creds/accessbox" + "github.com/nspcc-dev/neofs-s3-gw/internal/models" "github.com/nspcc-dev/neofs-sdk-go/eacl" "github.com/nspcc-dev/neofs-sdk-go/session" "go.uber.org/zap" @@ -708,11 +710,13 @@ func (h *handler) CreateBucketHandler(w http.ResponseWriter, r *http.Request) { } var policies []*accessbox.ContainerPolicy + var userAddr util.Uint160 boxData, err := layer.GetBoxData(r.Context()) if err == nil { policies = boxData.Policies p.SessionContainerCreation = boxData.Gate.SessionTokenForPut() p.SessionEACL = boxData.Gate.SessionTokenForSetEACL() + userAddr = boxData.Gate.BearerToken.Issuer().ScriptHash() } if p.SessionContainerCreation == nil { @@ -725,7 +729,7 @@ func (h *handler) CreateBucketHandler(w http.ResponseWriter, r *http.Request) { return } - h.setPolicy(p, createParams.LocationConstraint, policies) + h.setPolicy(p, userAddr, createParams.LocationConstraint, policies) p.ObjectLockEnabled = isLockEnabled(r.Header) @@ -753,7 +757,7 @@ func (h *handler) CreateBucketHandler(w http.ResponseWriter, r *http.Request) { api.WriteSuccessResponseHeadersOnly(w) } -func (h handler) setPolicy(prm *layer.CreateBucketParams, locationConstraint string, userPolicies []*accessbox.ContainerPolicy) { +func (h handler) setPolicy(prm *layer.CreateBucketParams, userAddr util.Uint160, locationConstraint string, userPolicies []*accessbox.ContainerPolicy) { prm.Policy = h.cfg.Policy.Default() if locationConstraint == "" { @@ -772,6 +776,20 @@ func (h handler) setPolicy(prm *layer.CreateBucketParams, locationConstraint str return } } + + policy, err := h.cfg.PlacementPolicyProvider.GetPlacementPolicy(userAddr, locationConstraint) + if err != nil { + // nothing to do. + if errorsStd.Is(err, models.ErrNotFound) { + return + } + + h.log.Error("get policy from provider", zap.Error(err)) + return + } + + prm.Policy = *policy + prm.LocationConstraint = locationConstraint } func isLockEnabled(header http.Header) bool { diff --git a/cmd/s3-gw/app.go b/cmd/s3-gw/app.go index 58fa4394..639b0ba7 100644 --- a/cmd/s3-gw/app.go +++ b/cmd/s3-gw/app.go @@ -60,8 +60,9 @@ type ( } appSettings struct { - logLevel zap.AtomicLevel - policies *placementPolicy + logLevel zap.AtomicLevel + policies *placementPolicy + policyService *storagePolicyService } Logger struct { @@ -142,7 +143,7 @@ func newApp(ctx context.Context, log *Logger, v *viper.Viper) *App { wrkDone: make(chan struct{}, 1), maxClients: newMaxClients(v), - settings: newAppSettings(log, v), + settings: newAppSettings(ctx, log, v), } app.init(ctx, anonSigner, neoFS) @@ -208,7 +209,7 @@ func loadLocations(v *viper.Viper) (map[string]string, error) { return locations, nil } -func newAppSettings(log *Logger, v *viper.Viper) *appSettings { +func newAppSettings(ctx context.Context, log *Logger, v *viper.Viper) *appSettings { locations, err := loadLocations(v) if err != nil { log.logger.Fatal("load locations failed", zap.Error(err)) @@ -219,9 +220,19 @@ func newAppSettings(log *Logger, v *viper.Viper) *appSettings { log.logger.Fatal("failed to create new policy mapping", zap.Error(err)) } + var policyProvider servicePolicyProvider = &noOpStoragePolicyProvider{} + + if contractName := v.GetString(cfgPolicyLocationsContractName); contractName != "" { + policyProvider, err = newStoragePolicyProvider(ctx, contractName, v.GetStringSlice(cfgRPCEndpoints)) + if err != nil { + log.logger.Fatal("failed to init provider", zap.Error(err)) + } + } + return &appSettings{ - logLevel: log.lvl, - policies: policies, + logLevel: log.lvl, + policies: policies, + policyService: newStoragePolicyService(policyProvider), } } @@ -546,7 +557,7 @@ func (a *App) configReload(ctx context.Context) { a.stopServices(ctx) a.startServices() - a.updateSettings() + a.updateSettings(ctx) a.metrics.SetEnabled(a.cfg.GetBool(cfgPrometheusEnabled)) a.setHealthStatus() @@ -554,7 +565,7 @@ func (a *App) configReload(ctx context.Context) { a.log.Info("SIGHUP config reload completed") } -func (a *App) updateSettings() { +func (a *App) updateSettings(ctx context.Context) { if lvl, err := getLogLevel(a.cfg); err != nil { a.log.Warn("log level won't be updated", zap.Error(err)) } else { @@ -569,6 +580,16 @@ func (a *App) updateSettings() { if err := a.settings.policies.update(getDefaultPolicyValue(a.cfg), a.cfg.GetString(cfgPolicyRegionMapFile), regions); err != nil { a.log.Warn("policies won't be updated", zap.Error(err)) } + + var policyProvider servicePolicyProvider = &noOpStoragePolicyProvider{} + if contractName := a.cfg.GetString(cfgPolicyLocationsContractName); contractName != "" { + policyProvider, err = newStoragePolicyProvider(ctx, contractName, a.cfg.GetStringSlice(cfgRPCEndpoints)) + if err != nil { + a.log.Warn("failed to init policy provider", zap.Error(err)) + } + } + + a.settings.policyService.UpdateProvider(policyProvider) } func (a *App) startServices() { @@ -706,10 +727,11 @@ func getAccessBoxCacheConfig(v *viper.Viper, l *zap.Logger) *cache.Config { func (a *App) initHandler() { cfg := &handler.Config{ - Policy: a.settings.policies, - DefaultMaxAge: handler.DefaultMaxAge, - NotificatorEnabled: a.cfg.GetBool(cfgEnableNATS), - CopiesNumber: handler.DefaultCopiesNumber, + Policy: a.settings.policies, + PlacementPolicyProvider: a.settings.policyService, + DefaultMaxAge: handler.DefaultMaxAge, + NotificatorEnabled: a.cfg.GetBool(cfgEnableNATS), + CopiesNumber: handler.DefaultCopiesNumber, } if a.cfg.IsSet(cfgDefaultMaxAge) { diff --git a/cmd/s3-gw/app_settings.go b/cmd/s3-gw/app_settings.go index 9d555d05..5c301b7e 100644 --- a/cmd/s3-gw/app_settings.go +++ b/cmd/s3-gw/app_settings.go @@ -82,9 +82,10 @@ const ( // Settings. cfgNATSRootCAFiles = "nats.root_ca" // Policy. - cfgPolicyDefault = "placement_policy.default" - cfgPolicyRegionMapFile = "placement_policy.region_mapping" - cfgPolicyLocations = "placement_policy.locations" + cfgPolicyDefault = "placement_policy.default" + cfgPolicyRegionMapFile = "placement_policy.region_mapping" + cfgPolicyLocations = "placement_policy.locations" + cfgPolicyLocationsContractName = "placement_policy.contract_name" // CORS. cfgDefaultMaxAge = "cors.default_max_age" diff --git a/cmd/s3-gw/storage_policy.go b/cmd/s3-gw/storage_policy.go new file mode 100644 index 00000000..52f846a3 --- /dev/null +++ b/cmd/s3-gw/storage_policy.go @@ -0,0 +1,39 @@ +package main + +import ( + "sync" + + "github.com/nspcc-dev/neo-go/pkg/util" + "github.com/nspcc-dev/neofs-sdk-go/netmap" +) + +type ( + storagePolicyService struct { + mu *sync.RWMutex + provider servicePolicyProvider + } + + servicePolicyProvider interface { + GetPlacementPolicy(userAddr util.Uint160, policyName string) (*netmap.PlacementPolicy, error) + } +) + +func newStoragePolicyService(provider servicePolicyProvider) *storagePolicyService { + return &storagePolicyService{ + provider: provider, + mu: &sync.RWMutex{}, + } +} + +func (s *storagePolicyService) GetPlacementPolicy(userAddr util.Uint160, policyName string) (*netmap.PlacementPolicy, error) { + s.mu.RLock() + defer s.mu.RUnlock() + + return s.provider.GetPlacementPolicy(userAddr, policyName) +} + +func (s *storagePolicyService) UpdateProvider(p servicePolicyProvider) { + s.mu.Lock() + s.provider = p + s.mu.Unlock() +} diff --git a/cmd/s3-gw/storage_policy_provider.go b/cmd/s3-gw/storage_policy_provider.go new file mode 100644 index 00000000..b99c2608 --- /dev/null +++ b/cmd/s3-gw/storage_policy_provider.go @@ -0,0 +1,138 @@ +package main + +import ( + "context" + "errors" + "fmt" + "strings" + "sync" + + "github.com/nspcc-dev/neo-go/pkg/rpcclient" + "github.com/nspcc-dev/neo-go/pkg/rpcclient/invoker" + "github.com/nspcc-dev/neo-go/pkg/rpcclient/unwrap" + "github.com/nspcc-dev/neo-go/pkg/util" + rpcNNS "github.com/nspcc-dev/neofs-contract/rpc/nns" + "github.com/nspcc-dev/neofs-s3-gw/internal/models" + "github.com/nspcc-dev/neofs-sdk-go/netmap" +) + +type ( + storagePolicyProvider struct { + invokers []*invoker.Invoker + contractHash util.Uint160 + + mu *sync.Mutex + next uint32 + } + + noOpStoragePolicyProvider struct{} +) + +func newStoragePolicyProvider(ctx context.Context, contractName string, endpoints []string) (*storagePolicyProvider, error) { + if len(endpoints) == 0 { + return nil, errors.New("endpoints must be set") + } + + var ( + invokers = make([]*invoker.Invoker, 0, len(endpoints)) + contractHash util.Uint160 + zero util.Uint160 + ) + + for _, endpoint := range endpoints { + cl, err := rpcClient(ctx, endpoint) + if err != nil { + return nil, fmt.Errorf("rpcclient: %w", err) + } + + inv := invoker.New(cl, nil) + + // contract hash is not resolved. + if contractHash.Equals(zero) { + contractHash, err = resolveContract(cl, inv, contractName) + + if err != nil { + return nil, fmt.Errorf("resolve %q contract: %w", contractName, err) + } + } + + invokers = append(invokers, inv) + } + + return &storagePolicyProvider{ + contractHash: contractHash, + invokers: invokers, + mu: &sync.Mutex{}, + }, nil +} + +func resolveContract(cl *rpcclient.Client, inv *invoker.Invoker, contractName string) (util.Uint160, error) { + nnsReader, err := rpcNNS.NewInferredReader(cl, inv) + if err != nil { + return util.Uint160{}, fmt.Errorf("InferHash: %w", err) + } + + contractHash, err := nnsReader.ResolveFSContract(contractName) + if err != nil { + return util.Uint160{}, fmt.Errorf("ResolveFSContract %q: %w", contractName, err) + } + + return contractHash, nil +} + +func (p *storagePolicyProvider) GetPlacementPolicy(userAddr util.Uint160, policyName string) (*netmap.PlacementPolicy, error) { + payload, err := unwrap.Bytes( + p.invoker().Call(p.contractHash, "resolvePolicy", userAddr, policyName), + ) + + if err != nil { + if strings.Contains(err.Error(), "policy not found") { + return nil, models.ErrNotFound + } + + return nil, fmt.Errorf("get system storage policy: %w", err) + } + + var pp netmap.PlacementPolicy + if err = pp.UnmarshalJSON(payload); err != nil { + return nil, fmt.Errorf("unmarshal placement policy: %w", err) + } + + return &pp, nil +} + +func (p *storagePolicyProvider) index() int { + p.mu.Lock() + + p.next++ + index := (int(p.next) - 1) % len(p.invokers) + + if int(p.next) >= len(p.invokers) { + p.next = 0 + } + + p.mu.Unlock() + + return index +} + +func (p *storagePolicyProvider) invoker() *invoker.Invoker { + return p.invokers[p.index()] +} + +func rpcClient(ctx context.Context, endpoint string) (*rpcclient.Client, error) { + cl, err := rpcclient.New(ctx, endpoint, rpcclient.Options{}) + if err != nil { + return nil, fmt.Errorf("new: %w", err) + } + + if err = cl.Init(); err != nil { + return nil, fmt.Errorf("init: %w", err) + } + + return cl, nil +} + +func (p *noOpStoragePolicyProvider) GetPlacementPolicy(_ util.Uint160, _ string) (*netmap.PlacementPolicy, error) { + return nil, models.ErrNotFound +} diff --git a/config/config.yaml b/config/config.yaml index 59ca4bff..ddddffe4 100644 --- a/config/config.yaml +++ b/config/config.yaml @@ -131,6 +131,8 @@ placement_policy: REP-1: "REP 1" REP-3: "REP 3" complex: "REP 1 IN X CBF 1 SELECT 1 FROM * AS X" + # Contract name for resolving policies. Leave name empty for disabling the feature. + contract_name: "" # CORS # value of Access-Control-Max-Age header if this value is not set in a rule. Has an int type. diff --git a/docs/configuration.md b/docs/configuration.md index 4952f6ca..7807e808 100644 --- a/docs/configuration.md +++ b/docs/configuration.md @@ -255,13 +255,15 @@ peers: placement_policy: default: REP 3 region_mapping: /path/to/mapping/rules.json + contract_name: "" ``` -| Parameter | Type | SIGHUP reload | Default value | Description | -|------------------|----------|---------------|---------------|----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| -| `default` | `string` | yes | `REP 3` | Default policy of placing containers in NeoFS. If a user sends a request `CreateBucket` and doesn't define policy for placing of a container in NeoFS, the S3 Gateway will put the container with default policy. | -| `region_mapping` | `string` | yes | | Path to file that maps aws `LocationConstraint` values to NeoFS placement policy. The similar to `--container-policy` flag in `neofs-s3-authmate` util, see in [docs](./authmate.md#containers-policy) | -| `locations` | `map` | yes | | Hashtable that maps aws LocationConstraint values to NeoFS placement policy. It's similar to the region_mapping config option, but allows for in-place configuration and extends/overrides values from region_mapping file if it's provided. | +| Parameter | Type | SIGHUP reload | Default value | Description | +|------------------|----------|---------------|---------------|---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| +| `default` | `string` | yes | `REP 3` | Default policy of placing containers in NeoFS. If a user sends a request `CreateBucket` and doesn't define policy for placing of a container in NeoFS, the S3 Gateway will put the container with default policy. | +| `region_mapping` | `string` | yes | | Path to file that maps aws `LocationConstraint` values to NeoFS placement policy. The similar to `--container-policy` flag in `neofs-s3-authmate` util, see in [docs](./authmate.md#containers-policy) | +| `locations` | `map` | yes | | Hashtable that maps aws LocationConstraint values to NeoFS placement policy. It's similar to the region_mapping config option, but allows for in-place configuration and extends/overrides values from region_mapping file if it's provided. | +| `contract_name` | `string` | yes | | The contract name to resolve policy name. An empty value disables feature. The contract must have method `resolvePolicy(userAddr interop.Hash160, policyName string) []byte`. The returning `[]byte` is a JSON marshaled netmap.PlacementPolicy (github.com/nspcc-dev/neofs-sdk-go/netmap). | File for `region_mapping` must contain something like this: diff --git a/internal/models/errors.go b/internal/models/errors.go new file mode 100644 index 00000000..7acd04ba --- /dev/null +++ b/internal/models/errors.go @@ -0,0 +1,10 @@ +package models + +import ( + "errors" +) + +var ( + // ErrNotFound indicates we didn't find something. It should be used to avoid unwilling package dependencies. + ErrNotFound = errors.New("not found") +) From f775895408dbbf80117c1e8ed3811ec743f6ef31 Mon Sep 17 00:00:00 2001 From: Evgenii Baidakov Date: Tue, 24 Sep 2024 16:09:26 +0400 Subject: [PATCH 2/3] handler: Remove redundant package alias Signed-off-by: Evgenii Baidakov --- api/handler/put.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/api/handler/put.go b/api/handler/put.go index ea5c996c..fb7a89cd 100644 --- a/api/handler/put.go +++ b/api/handler/put.go @@ -6,7 +6,7 @@ import ( "encoding/base64" "encoding/json" "encoding/xml" - errorsStd "errors" + "errors" "fmt" "io" "net" @@ -338,7 +338,7 @@ func formEncryptionParams(r *http.Request) (enc encryption.Params, err error) { } if r.TLS == nil { - return enc, errorsStd.New("encryption available only when TLS is enabled") + return enc, errors.New("encryption available only when TLS is enabled") } if sseCustomerAlgorithm != layer.AESEncryptionAlgorithm { @@ -780,7 +780,7 @@ func (h handler) setPolicy(prm *layer.CreateBucketParams, userAddr util.Uint160, policy, err := h.cfg.PlacementPolicyProvider.GetPlacementPolicy(userAddr, locationConstraint) if err != nil { // nothing to do. - if errorsStd.Is(err, models.ErrNotFound) { + if errors.Is(err, models.ErrNotFound) { return } From dd767a70fe093ac35f0983d38b72a8b76096da96 Mon Sep 17 00:00:00 2001 From: Evgenii Baidakov Date: Wed, 25 Sep 2024 10:48:43 +0400 Subject: [PATCH 3/3] docs: Fix docs Signed-off-by: Evgenii Baidakov --- docs/configuration.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/configuration.md b/docs/configuration.md index 7807e808..3f97529d 100644 --- a/docs/configuration.md +++ b/docs/configuration.md @@ -275,7 +275,7 @@ File for `region_mapping` must contain something like this: } ``` -The option `placement_policy.regions` may look like: +The option `placement_policy.locations` may look like: ```yaml placement_policy: locations: