Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make Slicer interface more generic #466

Merged
merged 11 commits into from
Jul 29, 2023
5 changes: 3 additions & 2 deletions client/container_statistic_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -620,9 +620,10 @@ func TestClientStatistic_ObjectPut(t *testing.T) {
writer, err := c.ObjectPutInit(ctx, hdr, signer, prm)
require.NoError(t, err)

require.True(t, writer.WritePayloadChunk(randBytes(10)))
_, err = writer.Write(randBytes(10))
require.NoError(t, err)

_, err = writer.Close()
err = writer.Close()
require.NoError(t, err)

require.Equal(t, 2, collector.methods[stat.MethodObjectPut].requests)
Expand Down
152 changes: 36 additions & 116 deletions client/object_put.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,9 @@ import (
rpcapi "github.com/nspcc-dev/neofs-api-go/v2/rpc"
"github.com/nspcc-dev/neofs-api-go/v2/rpc/client"
"github.com/nspcc-dev/neofs-sdk-go/bearer"
cid "github.com/nspcc-dev/neofs-sdk-go/container/id"
neofscrypto "github.com/nspcc-dev/neofs-sdk-go/crypto"
"github.com/nspcc-dev/neofs-sdk-go/object"
oid "github.com/nspcc-dev/neofs-sdk-go/object/id"
"github.com/nspcc-dev/neofs-sdk-go/object/slicer"
"github.com/nspcc-dev/neofs-sdk-go/stat"
"github.com/nspcc-dev/neofs-sdk-go/user"
)
Expand Down Expand Up @@ -58,10 +56,15 @@ func (x ResObjectPut) StoredObjectID() oid.ID {
}

// ObjectWriter is designed to write one object to NeoFS system.
type ObjectWriter interface {
io.WriteCloser
GetResult() ResObjectPut
}

// DefaultObjectWriter implements [ObjectWriter].
//
// Must be initialized using Client.ObjectPutInit, any other
// usage is unsafe.
type ObjectWriter struct {
// Must be initialized using [Client.ObjectPutInit], any other usage is unsafe.
type DefaultObjectWriter struct {
cancelCtxStream context.CancelFunc

client *Client
Expand Down Expand Up @@ -106,8 +109,8 @@ func (x *PrmObjectPutInit) WithXHeaders(hs ...string) {
}

// writeHeader writes header of the object. Result means success.
// Failure reason can be received via Close.
func (x *ObjectWriter) writeHeader(hdr object.Object) error {
// Failure reason can be received via [DefaultObjectWriter.Close].
func (x *DefaultObjectWriter) writeHeader(hdr object.Object) error {
if x.statisticCallback != nil {
defer func() {
x.statisticCallback(x.err)
Expand All @@ -134,8 +137,8 @@ func (x *ObjectWriter) writeHeader(hdr object.Object) error {
}

// WritePayloadChunk writes chunk of the object payload. Result means success.
// Failure reason can be received via Close.
func (x *ObjectWriter) WritePayloadChunk(chunk []byte) bool {
// Failure reason can be received via [DefaultObjectWriter.Close].
func (x *DefaultObjectWriter) Write(chunk []byte) (n int, err error) {
if x.statisticCallback != nil {
defer func() {
x.statisticCallback(x.err)
Expand All @@ -147,6 +150,8 @@ func (x *ObjectWriter) WritePayloadChunk(chunk []byte) bool {
x.req.GetBody().SetObjectPart(&x.partChunk)
}

var writtenBytes int

for ln := len(chunk); ln > 0; ln = len(chunk) {
// maxChunkLen restricts maximum byte length of the chunk
// transmitted in a single stream message. It depends on
Expand Down Expand Up @@ -174,22 +179,23 @@ func (x *ObjectWriter) WritePayloadChunk(chunk []byte) bool {
x.err = signServiceMessage(x.signer, &x.req)
if x.err != nil {
x.err = fmt.Errorf("sign message: %w", x.err)
return false
return writtenBytes, x.err
roman-khimov marked this conversation as resolved.
Show resolved Hide resolved
}

x.err = x.stream.Write(&x.req)
if x.err != nil {
return false
return writtenBytes, x.err
roman-khimov marked this conversation as resolved.
Show resolved Hide resolved
}

writtenBytes += len(chunk[:ln])
chunk = chunk[ln:]
}

return true
return writtenBytes, nil
}

// Close ends writing the object and returns the result of the operation
// along with the final results. Must be called after using the ObjectWriter.
// along with the final results. Must be called after using the [DefaultObjectWriter].
//
// Exactly one return value is non-nil. By default, server status is returned in res structure.
// Any client's internal or transport errors are returned as Go built-in error.
Expand All @@ -204,7 +210,7 @@ func (x *ObjectWriter) WritePayloadChunk(chunk []byte) bool {
// - [apistatus.ErrLockNonRegularObject]
// - [apistatus.ErrSessionTokenNotFound]
// - [apistatus.ErrSessionTokenExpired]
func (x *ObjectWriter) Close() (*ResObjectPut, error) {
func (x *DefaultObjectWriter) Close() error {
var err error
if x.statisticCallback != nil {
defer func() {
Expand All @@ -219,25 +225,25 @@ func (x *ObjectWriter) Close() (*ResObjectPut, error) {
// message. Server returns an error in response message (in status).
if x.err != nil && !errors.Is(x.err, io.EOF) {
err = x.err
return nil, err
return err
}

if x.err = x.stream.Close(); x.err != nil {
err = x.err
return nil, err
return err
}

if x.err = x.client.processResponse(&x.respV2); x.err != nil {
err = x.err
return nil, err
return err
}

const fieldID = "ID"

idV2 := x.respV2.GetBody().GetObjectID()
if idV2 == nil {
err = newErrMissingResponseField(fieldID)
return nil, err
return err
}

x.err = x.res.obj.ReadFromV2(*idV2)
Expand All @@ -246,27 +252,33 @@ func (x *ObjectWriter) Close() (*ResObjectPut, error) {
err = x.err
}

return &x.res, nil
return nil
}

// GetResult returns the put operation result.
func (x *DefaultObjectWriter) GetResult() ResObjectPut {
return x.res
}

// ObjectPutInit initiates writing an object through a remote server using NeoFS API protocol.
//
// The call only opens the transmission channel, explicit recording is done using the ObjectWriter.
// The call only opens the transmission channel, explicit recording is done using the [ObjectWriter].
// Exactly one return value is non-nil. Resulting writer must be finally closed.
//
// Context is required and must not be nil. It is used for network communication.
// Context is required and must not be nil. It will be used for network communication for the whole object transmission,
// including put init (this method) and subsequent object payload writes via ObjectWriter.
//
// Signer is required and must not be nil. The operation is executed on behalf of
// the account corresponding to the specified Signer, which is taken into account, in particular, for access control.
//
// Returns errors:
// - [ErrMissingSigner]
func (c *Client) ObjectPutInit(ctx context.Context, hdr object.Object, signer user.Signer, prm PrmObjectPutInit) (*ObjectWriter, error) {
func (c *Client) ObjectPutInit(ctx context.Context, hdr object.Object, signer user.Signer, prm PrmObjectPutInit) (ObjectWriter, error) {
var err error
defer func() {
c.sendStatistic(stat.MethodObjectPut, err)()
}()
var w ObjectWriter
var w DefaultObjectWriter
w.statisticCallback = func(err error) {
c.sendStatistic(stat.MethodObjectPutStream, err)()
}
Expand All @@ -292,102 +304,10 @@ func (c *Client) ObjectPutInit(ctx context.Context, hdr object.Object, signer us
c.prepareRequest(&w.req, &prm.meta)

if err = w.writeHeader(hdr); err != nil {
_, _ = w.Close()
_ = w.Close()
err = fmt.Errorf("header write: %w", err)
return nil, err
}

return &w, nil
}

type objectWriter struct {
context context.Context
client *Client
}

func (x *objectWriter) InitDataStream(header object.Object, signer user.Signer) (io.Writer, error) {
var prm PrmObjectPutInit

stream, err := x.client.ObjectPutInit(x.context, header, signer, prm)
if err != nil {
return nil, fmt.Errorf("init object stream: %w", err)
}

return &payloadWriter{
stream: stream,
}, nil
}

type payloadWriter struct {
stream *ObjectWriter
}

func (x *payloadWriter) Write(p []byte) (int, error) {
if !x.stream.WritePayloadChunk(p) {
return 0, x.Close()
}

return len(p), nil
}

func (x *payloadWriter) Close() error {
_, err := x.stream.Close()
if err != nil {
return err
}

return nil
}

// CreateObject creates new NeoFS object with given payload data and stores it
// in specified container of the NeoFS network using provided Client connection.
// The object is created on behalf of provided neofscrypto.Signer, and owned by
// the specified user.ID.
//
// In terms of NeoFS, parameterized neofscrypto.Signer represents object owner,
// object signer and request sender. Container SHOULD be public-write or sender
// SHOULD have corresponding rights.
//
// Client connection MUST be opened in advance, see Dial method for details.
// Network communication is carried out within a given context, so it MUST NOT
// be nil.
//
// Notice: This API is EXPERIMENTAL and is planned to be replaced/changed in the
// future. Be ready to refactor your code regarding imports and call mechanics,
// in essence the operation will not change.
func CreateObject(ctx context.Context, cli *Client, signer user.Signer, cnr cid.ID, owner user.ID, data io.Reader, attributes ...string) (oid.ID, error) {
s, err := NewDataSlicer(ctx, cli, signer, cnr, owner)
if err != nil {
return oid.ID{}, err
}

return s.Slice(data, attributes...)
}

// NewDataSlicer creates slicer.Slicer that saves data in the NeoFS network
// through provided Client. The data is packaged into NeoFS objects stored in
// the specified container. Provided signer is being used to sign the resulting
// objects as a system requirement. Produced objects are owned by the
// parameterized NeoFS user.
//
// Notice: This API is EXPERIMENTAL and is planned to be replaced/changed in the
// future. Be ready to refactor your code regarding imports and call mechanics,
// in essence the operation will not change.
func NewDataSlicer(ctx context.Context, cli *Client, signer user.Signer, cnr cid.ID, owner user.ID) (*slicer.Slicer, error) {
netInfo, err := cli.NetworkInfo(ctx, PrmNetworkInfo{})
if err != nil {
return nil, fmt.Errorf("read current network info: %w", err)
}

var opts slicer.Options
opts.SetObjectPayloadLimit(netInfo.MaxObjectSize())
opts.SetCurrentNeoFSEpoch(netInfo.CurrentEpoch())
if !netInfo.HomomorphicHashingDisabled() {
opts.CalculateHomomorphicChecksum()
}

return slicer.New(signer, cnr, owner, &objectWriter{
context: ctx,
client: cli,
}, opts), nil
}
17 changes: 17 additions & 0 deletions object/object.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,14 @@ func (o *Object) SetID(v oid.ID) {
SetObjectID(&v2)
}

// ResetID removes object identifier.
//
// See also [Object.SetID].
func (o *Object) ResetID() {
(*object.Object)(o).
SetObjectID(nil)
}

// Signature returns signature of the object identifier.
//
// See also [Object.SetSignature].
Expand Down Expand Up @@ -536,6 +544,15 @@ func (o *Object) SetParentID(v oid.ID) {
})
}

// ResetParentID removes identifier of the parent object.
//
// See also [Object.SetParentID].
func (o *Object) ResetParentID() {
o.setSplitFields(func(split *object.SplitHeader) {
split.SetParent(nil)
})
}

// Parent returns parent object w/o payload.
//
// See also [Object.SetParent].
Expand Down
31 changes: 31 additions & 0 deletions object/slicer/options.go
Original file line number Diff line number Diff line change
@@ -1,12 +1,18 @@
package slicer

import (
"github.com/nspcc-dev/neofs-sdk-go/session"
)

// Options groups Slicer options.
type Options struct {
objectPayloadLimit uint64

currentNeoFSEpoch uint64

withHomoChecksum bool

sessionToken *session.Object
}

// SetObjectPayloadLimit specifies data size limit for produced physically
Expand All @@ -25,3 +31,28 @@ func (x *Options) SetCurrentNeoFSEpoch(e uint64) {
func (x *Options) CalculateHomomorphicChecksum() {
x.withHomoChecksum = true
}

// SetSession sets session object.
func (x *Options) SetSession(sess *session.Object) {
x.sessionToken = sess
}

// ObjectPayloadLimit returns required max object size.
func (x *Options) ObjectPayloadLimit() uint64 {
return x.objectPayloadLimit
}

// CurrentNeoFSEpoch returns epoch.
func (x *Options) CurrentNeoFSEpoch() uint64 {
return x.currentNeoFSEpoch
}

// IsHomomorphicChecksumEnabled indicates homomorphic checksum calculation status.
func (x *Options) IsHomomorphicChecksumEnabled() bool {
return x.withHomoChecksum
}

// Session returns session object.
func (x *Options) Session() *session.Object {
return x.sessionToken
}
Loading
Loading