diff --git a/cmd/s3-authmate/main.go b/cmd/s3-authmate/main.go index de960212..ff3ce757 100644 --- a/cmd/s3-authmate/main.go +++ b/cmd/s3-authmate/main.go @@ -733,9 +733,10 @@ func createNeoFS(ctx context.Context, log *zap.Logger, cfg PoolConfig, anonSigne } neofsCfg := neofs.Config{ - MaxObjectSize: int64(ni.MaxObjectSize()), - IsSlicerEnabled: isSlicerEnabled, - IsHomomorphicEnabled: !ni.HomomorphicHashingDisabled(), + MaxObjectSize: int64(ni.MaxObjectSize()), + IsSlicerEnabled: isSlicerEnabled, + IsHomomorphicEnabled: !ni.HomomorphicHashingDisabled(), + UploadChunkPoolLength: 1, } neoFS := neofs.NewNeoFS(p, signer, anonSigner, neofsCfg, ni) diff --git a/cmd/s3-gw/app.go b/cmd/s3-gw/app.go index c86a24b3..f00fb25d 100644 --- a/cmd/s3-gw/app.go +++ b/cmd/s3-gw/app.go @@ -106,10 +106,16 @@ func newApp(ctx context.Context, log *Logger, v *viper.Viper) *App { log.logger.Fatal("newApp: networkInfo", zap.Error(err)) } + poolSize := v.GetInt(cfgUploadChunkPoolLength) + if poolSize == 0 { + poolSize = defaultUploadChunkPoolLength + } + neofsCfg := neofs.Config{ - MaxObjectSize: int64(ni.MaxObjectSize()), - IsSlicerEnabled: v.GetBool(cfgSlicerEnabled), - IsHomomorphicEnabled: !ni.HomomorphicHashingDisabled(), + MaxObjectSize: int64(ni.MaxObjectSize()), + IsSlicerEnabled: v.GetBool(cfgSlicerEnabled), + IsHomomorphicEnabled: !ni.HomomorphicHashingDisabled(), + UploadChunkPoolLength: poolSize, } // If slicer is disabled, we should use "static" getter, which doesn't make periodic requests to the NeoFS. diff --git a/cmd/s3-gw/app_settings.go b/cmd/s3-gw/app_settings.go index 53a2dd9f..77698815 100644 --- a/cmd/s3-gw/app_settings.go +++ b/cmd/s3-gw/app_settings.go @@ -28,6 +28,8 @@ const ( defaultMaxClientsCount = 100 defaultMaxClientsDeadline = time.Second * 30 + + defaultUploadChunkPoolLength = 64 ) const ( // Settings. @@ -124,6 +126,9 @@ const ( // Settings. // Timeout between retrieving actual epoch from NeoFS. Actual only if slicer.enabled = true. 
cfgEpochUpdateInterval = "neofs.epoch_update_interval" + // Pool size for upload object chunks. They are used to upload object payloads. + cfgUploadChunkPoolLength = "neofs.upload_chunk_pool_length" + // List of allowed AccessKeyID prefixes. cfgAllowedAccessKeyIDPrefixes = "allowed_access_key_id_prefixes" diff --git a/config/config.yaml b/config/config.yaml index e2972b9d..93afd300 100644 --- a/config/config.yaml +++ b/config/config.yaml @@ -138,6 +138,8 @@ neofs: set_copies_number: 0 # Timeout between retrieving actual epoch from NeoFS. Actual only if slicer.enabled = true. epoch_update_interval: 2m + # Pool size for upload object chunks. They are used to upload object payloads. + upload_chunk_pool_length: 64 # List of allowed AccessKeyID prefixes # If the parameter is omitted, S3 GW will accept all AccessKeyIDs diff --git a/internal/neofs/neofs.go b/internal/neofs/neofs.go index 106aa60e..127f4324 100644 --- a/internal/neofs/neofs.go +++ b/internal/neofs/neofs.go @@ -32,9 +32,10 @@ import ( // Config allows to configure some [NeoFS] parameters. type Config struct { - MaxObjectSize int64 - IsSlicerEnabled bool - IsHomomorphicEnabled bool + MaxObjectSize int64 + IsSlicerEnabled bool + IsHomomorphicEnabled bool + UploadChunkPoolLength int } // NeoFS represents virtual connection to the NeoFS network. @@ -46,16 +47,23 @@ type NeoFS struct { anonSigner user.Signer cfg Config epochGetter EpochGetter + buffers chan []byte } // NewNeoFS creates new NeoFS using provided pool.Pool. 
func NewNeoFS(p *pool.Pool, signer user.Signer, anonSigner user.Signer, cfg Config, epochGetter EpochGetter) *NeoFS { + buffers := make(chan []byte, cfg.UploadChunkPoolLength) + for i := 0; i < cfg.UploadChunkPoolLength; i++ { + buffers <- make([]byte, cfg.MaxObjectSize) + } + return &NeoFS{ pool: p, gateSigner: signer, anonSigner: anonSigner, cfg: cfg, epochGetter: epochGetter, + buffers: buffers, } } @@ -316,7 +324,11 @@ func (x *NeoFS) CreateObject(ctx context.Context, prm layer.PrmObjectCreate) (oi return oid.ID{}, fmt.Errorf("save object via connection pool: %w", err) } - chunk := make([]byte, x.cfg.MaxObjectSize) + chunk := <-x.buffers + defer func() { + x.buffers <- chunk + }() + _, err = io.CopyBuffer(writer, prm.Payload, chunk) if err != nil { return oid.ID{}, fmt.Errorf("read payload chunk: %w", err) diff --git a/internal/neofs/neofs_test.go b/internal/neofs/neofs_test.go index f83b1880..7a8f63e0 100644 --- a/internal/neofs/neofs_test.go +++ b/internal/neofs/neofs_test.go @@ -1,11 +1,25 @@ package neofs import ( + "bytes" + "context" + "crypto/rand" "fmt" + "strconv" "testing" + "time" + "github.com/nspcc-dev/neo-go/pkg/crypto/keys" "github.com/nspcc-dev/neofs-s3-gw/api/layer" + "github.com/nspcc-dev/neofs-sdk-go/client" apistatus "github.com/nspcc-dev/neofs-sdk-go/client/status" + "github.com/nspcc-dev/neofs-sdk-go/container" + "github.com/nspcc-dev/neofs-sdk-go/container/acl" + cid "github.com/nspcc-dev/neofs-sdk-go/container/id" + "github.com/nspcc-dev/neofs-sdk-go/netmap" + "github.com/nspcc-dev/neofs-sdk-go/pool" + "github.com/nspcc-dev/neofs-sdk-go/user" + "github.com/nspcc-dev/neofs-sdk-go/waiter" "github.com/stretchr/testify/require" ) @@ -23,3 +37,98 @@ func TestErrorChecking(t *testing.T) { require.ErrorIs(t, wrappedError, layer.ErrAccessDenied) require.Contains(t, wrappedError.Error(), reason) } + +func Benchmark(b *testing.B) { + ctx := context.Background() + + pk, err := keys.NewPrivateKey() + require.NoError(b, err) + signer := 
user.NewAutoIDSignerRFC6979(pk.PrivateKey) + + anonPk, err := keys.NewPrivateKey() + require.NoError(b, err) + anonSigner := user.NewAutoIDSignerRFC6979(anonPk.PrivateKey) + + var prm pool.InitParameters + prm.SetSigner(signer) + prm.AddNode(pool.NewNodeParam(1, "localhost:8080", 1)) + + p, err := pool.NewPool(prm) + require.NoError(b, err) + + require.NoError(b, p.Dial(ctx)) + + ni, err := p.NetworkInfo(ctx, client.PrmNetworkInfo{}) + require.NoError(b, err) + + neofsCfg := Config{ + MaxObjectSize: int64(ni.MaxObjectSize()), + IsSlicerEnabled: false, + IsHomomorphicEnabled: !ni.HomomorphicHashingDisabled(), + UploadChunkPoolLength: 64, + } + + neo := NewNeoFS(p, signer, anonSigner, neofsCfg, ni) + + var createParams layer.PrmObjectCreate + createParams.Creator = signer.UserID() + + for i := 128; i <= 512; i += 128 { + b.Run("object upload "+strconv.Itoa(i), func(b *testing.B) { + b.StopTimer() + payload := make([]byte, i*1024) + _, err = rand.Read(payload) + require.NoError(b, err) + + id, err := createContainer(ctx, signer, p) + require.NoError(b, err) + createParams.Container = id + + defer func() { + _ = deleteContainer(ctx, id, signer, p) + }() + + b.ReportAllocs() + b.ResetTimer() + b.StartTimer() + + for i := 0; i < b.N; i++ { + b.StopTimer() + createParams.Payload = bytes.NewReader(payload) + createParams.CreationTime = time.Now() + b.StartTimer() + + _, err = neo.CreateObject(ctx, createParams) + b.StopTimer() + require.NoError(b, err) + b.StartTimer() + } + }) + } +} + +func createContainer(ctx context.Context, signer user.Signer, p *pool.Pool) (cid.ID, error) { + var cnr container.Container + cnr.Init() + cnr.SetOwner(signer.UserID()) + + var rd netmap.ReplicaDescriptor + rd.SetNumberOfObjects(1) + + var pp netmap.PlacementPolicy + pp.SetContainerBackupFactor(1) + pp.AddReplicas(rd) + + cnr.SetPlacementPolicy(pp) + cnr.SetBasicACL(acl.PublicRW) + + var prm client.PrmContainerPut + + w := waiter.NewContainerPutWaiter(p, waiter.DefaultPollInterval) + 
return w.ContainerPut(ctx, cnr, signer, prm) +} + +func deleteContainer(ctx context.Context, id cid.ID, signer user.Signer, p *pool.Pool) error { + var prm client.PrmContainerDelete + return p.ContainerDelete(ctx, id, signer, prm) +}