From e5b71618cd009736ef269202d364e6886f1b9c26 Mon Sep 17 00:00:00 2001 From: Evgenii Baidakov Date: Wed, 26 Jun 2024 11:08:50 +0400 Subject: [PATCH 1/2] resolver: Use multiple endpoints Closes #968. Signed-off-by: Evgenii Baidakov --- api/resolver/init.go | 40 ++++++++++++++++++++--------------- api/resolver/resolver.go | 30 ++++++++++++++++++++++---- api/resolver/resolver_test.go | 17 +++++++++++++++ cmd/s3-gw/app.go | 4 ++-- 4 files changed, 68 insertions(+), 23 deletions(-) create mode 100644 api/resolver/resolver_test.go diff --git a/api/resolver/init.go b/api/resolver/init.go index 3524a6e9..937c2e4b 100644 --- a/api/resolver/init.go +++ b/api/resolver/init.go @@ -30,8 +30,8 @@ func (r *Container) ResolveCID(ctx context.Context, name string) (cid.ID, error) } // UpdateResolvers allows to update resolver in runtime. Resolvers will be created from scratch. -func (r *Container) UpdateResolvers(ctx context.Context, endpoint string) error { - newResolver, err := NewResolver(ctx, endpoint) +func (r *Container) UpdateResolvers(ctx context.Context, endpoints []string) error { + newResolver, err := NewResolver(ctx, endpoints) if err != nil { return fmt.Errorf("resolver reinit: %w", err) } @@ -44,8 +44,8 @@ func (r *Container) UpdateResolvers(ctx context.Context, endpoint string) error } // NewContainer is a constructor for the [Container]. -func NewContainer(ctx context.Context, endpoint string) (*Container, error) { - newResolver, err := NewResolver(ctx, endpoint) +func NewContainer(ctx context.Context, endpoints []string) (*Container, error) { + newResolver, err := NewResolver(ctx, endpoints) if err != nil { return nil, fmt.Errorf("resolver reinit: %w", err) } @@ -55,26 +55,32 @@ func NewContainer(ctx context.Context, endpoint string) (*Container, error) { }, nil } -// NewResolver returns resolver depending on corresponding endpoint. +// NewResolver returns resolver depending on corresponding endpoints. // // If endpoint is empty, error will be returned. -func NewResolver(ctx context.Context, endpoint string) (*NNSResolver, error) { - if endpoint == "" { - return nil, errors.New("endpoint must be set") +func NewResolver(ctx context.Context, endpoints []string) (*NNSResolver, error) { + if len(endpoints) == 0 { + return nil, errors.New("endpoints must be set") } - cl, err := rpcClient(ctx, endpoint) - if err != nil { - return nil, fmt.Errorf("rpcclient: %w", err) - } + var readers = make([]*nns.ContractReader, 0, len(endpoints)) - inv := invoker.New(cl, nil) - nnsReader, err := nns.NewInferredReader(cl, inv) - if err != nil { - return nil, fmt.Errorf("nns reader instantiation: %w", err) + for _, endpoint := range endpoints { + cl, err := rpcClient(ctx, endpoint) + if err != nil { + return nil, fmt.Errorf("rpcclient: %w", err) + } + + inv := invoker.New(cl, nil) + nnsReader, err := nns.NewInferredReader(cl, inv) + if err != nil { + return nil, fmt.Errorf("nns readers instantiation: %w", err) + } + + readers = append(readers, nnsReader) } - return NewNNSResolver(nnsReader), nil + return NewNNSResolver(readers), nil } func rpcClient(ctx context.Context, endpoint string) (*rpcclient.Client, error) { diff --git a/api/resolver/resolver.go b/api/resolver/resolver.go index f92ce92f..eb8284e7 100644 --- a/api/resolver/resolver.go +++ b/api/resolver/resolver.go @@ -4,6 +4,7 @@ import ( "context" "errors" "fmt" + "sync" rpcNNS "github.com/nspcc-dev/neofs-contract/rpc/nns" cid "github.com/nspcc-dev/neofs-sdk-go/container/id" @@ -18,19 +19,21 @@ var ( // NNSResolver allows to resolve container id by its name. type NNSResolver struct { - reader *rpcNNS.ContractReader + mu *sync.Mutex + next uint32 + readers []*rpcNNS.ContractReader } // NewNNSResolver is a constructor for the NNSResolver. -func NewNNSResolver(reader *rpcNNS.ContractReader) *NNSResolver { - return &NNSResolver{reader: reader} +func NewNNSResolver(readers []*rpcNNS.ContractReader) *NNSResolver { + return &NNSResolver{readers: readers, mu: &sync.Mutex{}} } // ResolveCID looks up the container id by its name via NNS contract. func (r *NNSResolver) ResolveCID(_ context.Context, name string) (cid.ID, error) { var result cid.ID - items, err := r.reader.GetRecords(nnsContainerDomain(name), rpcNNS.TXT) + items, err := r.reader().GetRecords(nnsContainerDomain(name), rpcNNS.TXT) if err != nil { return result, fmt.Errorf("nns get: %w", err) } @@ -46,6 +49,25 @@ func (r *NNSResolver) ResolveCID(_ context.Context, name string) (cid.ID, error) return result, nil } +func (r *NNSResolver) index() int { + r.mu.Lock() + + r.next++ + index := (int(r.next) - 1) % len(r.readers) + + if int(r.next) >= len(r.readers) { + r.next = 0 + } + + r.mu.Unlock() + + return index +} + +func (r *NNSResolver) reader() *rpcNNS.ContractReader { + return r.readers[r.index()] +} + func nnsContainerDomain(name string) string { return fmt.Sprintf("%s.%s", name, defaultZone) } diff --git a/api/resolver/resolver_test.go b/api/resolver/resolver_test.go new file mode 100644 index 00000000..236d231e --- /dev/null +++ b/api/resolver/resolver_test.go @@ -0,0 +1,17 @@ +package resolver + +import ( + "testing" + + rpcNNS "github.com/nspcc-dev/neofs-contract/rpc/nns" + "github.com/stretchr/testify/require" +) + +func Test_nnsResolver_index(t *testing.T) { + r := NewNNSResolver([]*rpcNNS.ContractReader{nil, nil, nil}) + indexes := []int{0, 1, 2, 0, 1, 2} + + for _, index := range indexes { + require.Equal(t, index, r.index()) + } +} diff --git a/cmd/s3-gw/app.go b/cmd/s3-gw/app.go index c6be35db..1783d9fb 100644 --- a/cmd/s3-gw/app.go +++ b/cmd/s3-gw/app.go @@ -227,7 +227,7 @@ func (a *App) initResolver(ctx context.Context) { a.log.Info("rpc endpoint", zap.String("address", endpoint)) - res, err := resolver.NewContainer(ctx, endpoint) + res, err := resolver.NewContainer(ctx, []string{endpoint}) if err != nil { a.log.Fatal("resolver", zap.Error(err)) } @@ -496,7 +496,7 @@ func (a *App) configReload(ctx context.Context) { return } - if err := a.resolverContainer.UpdateResolvers(ctx, a.cfg.GetString(cfgRPCEndpoint)); err != nil { + if err := a.resolverContainer.UpdateResolvers(ctx, []string{a.cfg.GetString(cfgRPCEndpoint)}); err != nil { a.log.Warn("failed to update resolvers", zap.Error(err)) } From 6e6e79f57efaa74e46a2d16a8787d9f4df3aa3e1 Mon Sep 17 00:00:00 2001 From: Evgenii Baidakov Date: Wed, 26 Jun 2024 11:11:31 +0400 Subject: [PATCH 2/2] resolver: Read endpoint slice from config Signed-off-by: Evgenii Baidakov --- CHANGELOG.md | 17 +++++++++++++++++ cmd/s3-gw/app.go | 8 ++++---- cmd/s3-gw/app_settings.go | 6 +++--- config/config.yaml | 4 +++- 4 files changed, 27 insertions(+), 8 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 20f5bb50..96ff5ef6 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,6 +4,23 @@ This document outlines major changes between releases. ## [Unreleased] +### Changed +- Single `rpc_endpoint` config option replaced with multiple endpoints `fschain.endpoints` option (#968) + +### Updating from 0.30.1 + +Notice that the configuration parameter `fschain.endpoints` contains multiple endpoints. +For migration, you should update config from old notation +```yaml +rpc_endpoint: http://some-host:30333 +``` +to the new one +```yaml +fschain: + endpoints: + - http://some-host:30333 +``` + ## [0.30.1] - 2024-06-19 ### Added diff --git a/cmd/s3-gw/app.go b/cmd/s3-gw/app.go index 1783d9fb..8072deac 100644 --- a/cmd/s3-gw/app.go +++ b/cmd/s3-gw/app.go @@ -223,11 +223,11 @@ func (a *App) initMetrics() { } func (a *App) initResolver(ctx context.Context) { - endpoint := a.cfg.GetString(cfgRPCEndpoint) + endpoints := a.cfg.GetStringSlice(cfgRPCEndpoints) - a.log.Info("rpc endpoint", zap.String("address", endpoint)) + a.log.Info("rpc endpoints", zap.Strings("address", endpoints)) - res, err := resolver.NewContainer(ctx, []string{endpoint}) + res, err := resolver.NewContainer(ctx, endpoints) if err != nil { a.log.Fatal("resolver", zap.Error(err)) } @@ -496,7 +496,7 @@ func (a *App) configReload(ctx context.Context) { return } - if err := a.resolverContainer.UpdateResolvers(ctx, []string{a.cfg.GetString(cfgRPCEndpoint)}); err != nil { + if err := a.resolverContainer.UpdateResolvers(ctx, a.cfg.GetStringSlice(cfgRPCEndpoints)); err != nil { a.log.Warn("failed to update resolvers", zap.Error(err)) } diff --git a/cmd/s3-gw/app_settings.go b/cmd/s3-gw/app_settings.go index e77ea302..00951061 100644 --- a/cmd/s3-gw/app_settings.go +++ b/cmd/s3-gw/app_settings.go @@ -106,7 +106,7 @@ const ( // Settings. cfgTreeServiceEndpoint = "tree.service" // NeoGo. - cfgRPCEndpoint = "rpc_endpoint" + cfgRPCEndpoints = "fschain.endpoints" // Application. cfgApplicationBuildTime = "app.build_time" @@ -237,7 +237,7 @@ func newSettings() *viper.Viper { peers := flags.StringArrayP(cfgPeers, "p", nil, "set NeoFS nodes") - flags.StringP(cfgRPCEndpoint, "r", "", "set RPC endpoint") + flags.StringP(cfgRPCEndpoints, "r", "", "set RPC endpoints") domains := flags.StringSliceP(cfgListenDomains, "d", nil, "set domains to be listened") @@ -356,7 +356,7 @@ func bindFlags(v *viper.Viper, flags *pflag.FlagSet) error { if err := v.BindPFlag(cfgMaxClientsDeadline, flags.Lookup(cfgMaxClientsDeadline)); err != nil { return err } - if err := v.BindPFlag(cfgRPCEndpoint, flags.Lookup(cfgRPCEndpoint)); err != nil { + if err := v.BindPFlag(cfgRPCEndpoints, flags.Lookup(cfgRPCEndpoints)); err != nil { return err } diff --git a/config/config.yaml b/config/config.yaml index cb6fddbf..9dc12d41 100644 --- a/config/config.yaml +++ b/config/config.yaml @@ -49,7 +49,9 @@ tree: service: node1.neofs:8080 # RPC endpoint and order of resolving of bucket names -rpc_endpoint: http://morph-chain.neofs.devenv:30333 +fschain: + endpoints: + - http://morph-chain.neofs.devenv:30333 # Metrics pprof: