Skip to content

Commit

Permalink
slicer: Update slicer interface
Browse files Browse the repository at this point in the history
Signed-off-by: Evgenii Baidakov <[email protected]>
  • Loading branch information
smallhive committed Jul 21, 2023
1 parent e0ef4a9 commit 35e3d01
Show file tree
Hide file tree
Showing 3 changed files with 258 additions and 116 deletions.
26 changes: 26 additions & 0 deletions object/slicer/options.go
Original file line number Diff line number Diff line change
@@ -1,12 +1,18 @@
package slicer

import (
"github.com/nspcc-dev/neofs-sdk-go/session"
)

// Options groups Slicer options.
type Options struct {
objectPayloadLimit uint64

currentNeoFSEpoch uint64

withHomoChecksum bool

sessionToken *session.Object
}

// SetObjectPayloadLimit specifies data size limit for produced physically
Expand All @@ -25,3 +31,23 @@ func (x *Options) SetCurrentNeoFSEpoch(e uint64) {
func (x *Options) CalculateHomomorphicChecksum() {
x.withHomoChecksum = true
}

func (x *Options) ObjectPayloadLimit() uint64 {
return x.objectPayloadLimit
}

func (x *Options) CurrentNeoFSEpoch() uint64 {
return x.currentNeoFSEpoch
}

func (x *Options) IsCalculateHomomorphicChecksum() bool {
return x.withHomoChecksum
}

func (x *Options) Session() *session.Object {
return x.sessionToken
}

func (x *Options) SetSession(sess *session.Object) {
x.sessionToken = sess
}
107 changes: 70 additions & 37 deletions object/slicer/slicer.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"github.com/nspcc-dev/neofs-sdk-go/client"
cid "github.com/nspcc-dev/neofs-sdk-go/container/id"
neofscrypto "github.com/nspcc-dev/neofs-sdk-go/crypto"
"github.com/nspcc-dev/neofs-sdk-go/netmap"
"github.com/nspcc-dev/neofs-sdk-go/object"
oid "github.com/nspcc-dev/neofs-sdk-go/object/id"
"github.com/nspcc-dev/neofs-sdk-go/session"
Expand All @@ -28,8 +29,8 @@ var (
ErrIncompleteHeader = errors.New("incomplete header")
)

// ObjectWriter represents a virtual object recorder.
type ObjectWriter interface {
// ObjectClient represents a virtual object recorder.
type ObjectClient interface {
// ObjectPutInit initializes and returns a stream of writable data associated
// with the object according to its header. Provided header includes at least
// container, owner and object ID fields.
Expand All @@ -39,41 +40,75 @@ type ObjectWriter interface {
ObjectPutInit(ctx context.Context, hdr object.Object, signer neofscrypto.Signer, prm client.PrmObjectPutInit) (client.ObjectWriter, error)
}

type NetworkedClient interface {
ObjectClient

NetworkInfo(ctx context.Context, prm client.PrmNetworkInfo) (netmap.NetworkInfo, error)
}

// Slicer converts input raw data streams into NeoFS objects. Working Slicer
// must be constructed via New.
type Slicer struct {
w ObjectWriter
sessionToken *session.Object
w ObjectClient
hdr object.Object
signer neofscrypto.Signer
opts Options
}

// New constructs Slicer which writes sliced ready-to-go objects owned by
// particular user into the specified container using provided ObjectWriter.
// particular user into the specified container using provided ObjectClient.
//
// If ObjectWriter returns data streams which provide io.Closer, they are closed
// If ObjectClient returns data streams which provide io.Closer, they are closed
// in Slicer.Slice after the payload of any object has been written. In this
// case, Slicer.Slice fails immediately on Close error.
//
// See also NewSession.
func New(w ObjectWriter) *Slicer {
return &Slicer{
w: w,
func New(ctx context.Context, nw NetworkedClient, signer neofscrypto.Signer, cnr cid.ID, owner user.ID, sessionToken *session.Object) (*Slicer, error) {
ni, err := nw.NetworkInfo(ctx, client.PrmNetworkInfo{})
if err != nil {
return nil, fmt.Errorf("network info: %w", err)
}
}

// NewSession creates Slicer which generates objects within provided session.
// NewSession work similar to New with the detail that the session issuer owns
// the produced objects. Specified session token is written to the metadata of
// all resulting objects. In this case, the object is considered to be created
// by a proxy on behalf of the session issuer.
func NewSession(w ObjectWriter, token session.Object) *Slicer {
slicer := &Slicer{
w: w,
sessionToken: &token,
var opts Options
opts.SetObjectPayloadLimit(ni.MaxObjectSize())
opts.SetCurrentNeoFSEpoch(ni.CurrentEpoch())
if !ni.HomomorphicHashingDisabled() {
opts.CalculateHomomorphicChecksum()
}
opts.sessionToken = sessionToken

// slicer.w.SetSession(token)
var hdr object.Object
hdr.SetContainerID(cnr)
hdr.SetType(object.TypeRegular)
hdr.SetOwnerID(&owner)
hdr.SetCreationEpoch(ni.CurrentEpoch())
hdr.SetSessionToken(sessionToken)

return &Slicer{
opts: opts,
w: nw,
signer: signer,
hdr: hdr,
}, nil
}

func (x *Slicer) Put(data io.Reader, attrs []object.Attribute) (oid.ID, error) {
x.hdr.SetAttributes(attrs...)

return slice(x.w, x.hdr, data, x.signer, x.opts)
}

func (x *Slicer) InitPut(attrs []object.Attribute) (*PayloadWriter, error) {
x.hdr.SetAttributes(attrs...)
return initPayloadStream(x.w, x.hdr, x.signer, x.opts)
}

func Put(ow ObjectClient, header object.Object, signer neofscrypto.Signer, data io.Reader, opts Options) (oid.ID, error) {
return slice(ow, header, data, signer, opts)
}

return slicer
func InitPut(ow ObjectClient, header object.Object, signer neofscrypto.Signer, opts Options) (*PayloadWriter, error) {
return initPayloadStream(ow, header, signer, opts)
}

const defaultPayloadSizeLimit = 1 << 20
Expand Down Expand Up @@ -104,7 +139,7 @@ func headerData(header object.Object) (cid.ID, user.ID, error) {

// Slice creates new NeoFS object from the input data stream, associates the
// object with the configured container and writes the object via underlying
// [ObjectWriter]. After a successful write, [Slice] returns an [oid.ID] which is a
// [ObjectClient]. After a successful write, [Slice] returns an [oid.ID] which is a
// unique reference to the object in the container. Slice sets all required
// calculated fields like payload length, checksum, etc.
//
Expand All @@ -122,23 +157,23 @@ func headerData(header object.Object) (cid.ID, user.ID, error) {
// specified limit, [Slicer] applies the slicing algorithm described within the
// same specification. The outcome will be a group of "small" objects containing
// a chunk of data, as well as an auxiliary linking object. All derived objects
// are written to the parameterized [ObjectWriter]. If the amount of data is
// are written to the parameterized [ObjectClient]. If the amount of data is
// within the limit, one object is produced. Note that Slicer can write multiple
// objects, but returns the root object ID only.
//
// If current NeoFS epoch is specified via [Options.SetCurrentNeoFSEpoch], it is
// written to the metadata of all resulting objects as a creation epoch.
//
// See New for details.
func (x *Slicer) Slice(header object.Object, data io.Reader, signer neofscrypto.Signer, opts Options) (oid.ID, error) {
func slice(ow ObjectClient, header object.Object, data io.Reader, signer neofscrypto.Signer, opts Options) (oid.ID, error) {
var rootID oid.ID

objectPayloadLimit := childPayloadSizeLimit(opts)

var n int
bChunk := make([]byte, objectPayloadLimit)

writer, err := x.InitPayloadStream(header, signer, opts)
writer, err := initPayloadStream(ow, header, signer, opts)
if err != nil {
return rootID, fmt.Errorf("init writter: %w", err)
}
Expand Down Expand Up @@ -168,18 +203,16 @@ func (x *Slicer) Slice(header object.Object, data io.Reader, signer neofscrypto.
return rootID, nil
}

// InitPayloadStream works similar to [Slicer.Slice] but provides [PayloadWriter] allowing
// the caller to write data himself.
func (x *Slicer) InitPayloadStream(header object.Object, signer neofscrypto.Signer, opts Options) (*PayloadWriter, error) {
func initPayloadStream(ow ObjectClient, header object.Object, signer neofscrypto.Signer, opts Options) (*PayloadWriter, error) {
containerID, owner, err := headerData(header)
if err != nil {
return nil, err
}

if x.sessionToken != nil {
header.SetSessionToken(x.sessionToken)
if opts.sessionToken != nil {
header.SetSessionToken(opts.sessionToken)
// session issuer is a container owner.
issuer := x.sessionToken.Issuer()
issuer := opts.sessionToken.Issuer()
owner = issuer
header.SetOwnerID(&owner)
}
Expand All @@ -191,12 +224,12 @@ func (x *Slicer) InitPayloadStream(header object.Object, signer neofscrypto.Sign
res := &PayloadWriter{
isHeaderWriteStep: true,
headerObject: header,
stream: x.w,
stream: ow,
signer: signer,
container: containerID,
owner: owner,
currentEpoch: opts.currentNeoFSEpoch,
sessionToken: x.sessionToken,
sessionToken: opts.sessionToken,
rootMeta: newDynamicObjectMetadata(opts.withHomoChecksum),
childMeta: newDynamicObjectMetadata(opts.withHomoChecksum),
}
Expand All @@ -210,7 +243,7 @@ func (x *Slicer) InitPayloadStream(header object.Object, signer neofscrypto.Sign

// PayloadWriter is a single-object payload stream provided by Slicer.
type PayloadWriter struct {
stream ObjectWriter
stream ObjectClient

rootID oid.ID
headerObject object.Object
Expand Down Expand Up @@ -302,13 +335,13 @@ func (x *PayloadWriter) ID() oid.ID {
}

// writeIntermediateChild writes intermediate split-chain element with specified
// dynamicObjectMetadata to the configured ObjectWriter.
// dynamicObjectMetadata to the configured ObjectClient.
func (x *PayloadWriter) writeIntermediateChild(meta dynamicObjectMetadata) error {
return x._writeChild(meta, false, nil)
}

// writeIntermediateChild writes last split-chain element with specified
// dynamicObjectMetadata to the configured ObjectWriter. If rootIDHandler is
// dynamicObjectMetadata to the configured ObjectClient. If rootIDHandler is
// specified, ID of the resulting root object is passed into it.
func (x *PayloadWriter) writeLastChild(meta dynamicObjectMetadata, rootIDHandler func(id oid.ID)) error {
return x._writeChild(meta, true, rootIDHandler)
Expand Down Expand Up @@ -429,7 +462,7 @@ func flushObjectMetadata(signer neofscrypto.Signer, meta dynamicObjectMetadata,
return id, nil
}

func writeInMemObject(signer neofscrypto.Signer, w ObjectWriter, header object.Object, payload []byte, meta dynamicObjectMetadata, session *session.Object) (oid.ID, error) {
func writeInMemObject(signer neofscrypto.Signer, w ObjectClient, header object.Object, payload []byte, meta dynamicObjectMetadata, session *session.Object) (oid.ID, error) {
id, err := flushObjectMetadata(signer, meta, &header)
if err != nil {
return id, err
Expand Down
Loading

0 comments on commit 35e3d01

Please sign in to comment.