api: accept postage id header and propagate it to pipeline (#886)
postage: new pkg for postage stamps, uploader stamping (#890)

* postage: new pkg for postage stamps, uploader stamping

* postage: amount->value, blockNumber big.Int-> uint64, stamp only has batch ID, not Batch

* postage: fix godoc and copyright

* postage, swarm:
 - swarm.Stamp as an interface
 - add postage/testing for mock Stamps
 - fix Stamp MarshalBinary to allow nil batch id and signature
 - add StampSize const

* postage: heed review feedback

Co-authored-by: acud <[email protected]>
acud authored and zelig committed Nov 30, 2020
1 parent 770f91a commit 5af8c3a
Showing 24 changed files with 950 additions and 35 deletions.
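
The new pkg/postage sources themselves are not among the diffs shown below. Going only by the commit message bullets above, a rough sketch of the stamp type they describe might look like the following; field names, sizes, and the constructor are assumptions for illustration, not the actual package code:

package postage

import "errors"

// Assumed layout: a 32-byte batch ID followed by a 65-byte signature.
const StampSize = 32 + 65

// Stamp carries only the batch ID, not the full Batch, per the commit message.
type Stamp struct {
	batchID []byte
	sig     []byte
}

func NewStamp(batchID, sig []byte) *Stamp {
	return &Stamp{batchID: batchID, sig: sig}
}

func (s *Stamp) BatchID() []byte { return s.batchID }
func (s *Stamp) Sig() []byte     { return s.sig }

// MarshalBinary tolerates a nil batch ID and signature (as the commit message
// notes) by zero-padding the fixed-size buffer.
func (s *Stamp) MarshalBinary() ([]byte, error) {
	buf := make([]byte, StampSize)
	copy(buf, s.batchID)
	copy(buf[32:], s.sig)
	return buf, nil
}

func (s *Stamp) UnmarshalBinary(data []byte) error {
	if len(data) != StampSize {
		return errors.New("invalid stamp size")
	}
	s.batchID = data[:32]
	s.sig = data[32:]
	return nil
}

The swarm.Stamp interface mentioned in the bullets would then be a small interface (BatchID() []byte, Sig() []byte, plus encoding.BinaryMarshaler and encoding.BinaryUnmarshaler) that such a *Stamp satisfies.
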
29 changes: 24 additions & 5 deletions pkg/api/api.go
@@ -5,6 +5,7 @@
package api

import (
"encoding/hex"
"errors"
"fmt"
"io"
@@ -27,11 +28,12 @@ import (
)

const (
SwarmPinHeader = "Swarm-Pin"
SwarmTagUidHeader = "Swarm-Tag-Uid"
SwarmEncryptHeader = "Swarm-Encrypt"
SwarmIndexDocumentHeader = "Swarm-Index-Document"
SwarmErrorDocumentHeader = "Swarm-Error-Document"
SwarmPinHeader = "Swarm-Pin"
SwarmTagUidHeader = "Swarm-Tag-Uid"
SwarmEncryptHeader = "Swarm-Encrypt"
SwarmIndexDocumentHeader = "Swarm-Index-Document"
SwarmErrorDocumentHeader = "Swarm-Error-Document"
SwarmPostageBatchIdHeader = "Swarm-Postage-Batch-Id"
)

// The size of buffer used for prefetching content with Langos.
@@ -49,6 +51,7 @@ const (
var (
errInvalidNameOrAddress = errors.New("invalid name or bzz address")
errNoResolver = errors.New("no resolver connected")
errInvalidPostageBatch = errors.New("invalid postage batch id")
)

// Service is the API service interface.
@@ -184,6 +187,22 @@ func requestEncrypt(r *http.Request) bool {
return strings.ToLower(r.Header.Get(SwarmEncryptHeader)) == "true"
}

func requestPostageBatchId(r *http.Request) ([]byte, error) {
if h := strings.ToLower(r.Header.Get(SwarmPostageBatchIdHeader)); h != "" {
if len(h) != 64 {
return nil, errInvalidPostageBatch
}
b, err := hex.DecodeString(h)
if err != nil {
return nil, errInvalidPostageBatch
}
return b, nil
}

// fallback to a slice of 32 zeros
return make([]byte, 32), nil
}

func (s *server) newTracingHandler(spanName string) func(h http.Handler) http.Handler {
return func(h http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
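
requestPostageBatchId above expects the Swarm-Postage-Batch-Id header to hold a 64-character hex string (a 32-byte batch ID) and falls back to 32 zero bytes when the header is absent. A minimal client-side sketch exercising the header against a local node; the node address, port, and payload are placeholders, not part of this commit:

package main

import (
	"bytes"
	"fmt"
	"net/http"
	"strings"
)

func main() {
	// A 32-byte batch ID hex-encoded to 64 characters, as the handler requires.
	batchID := strings.Repeat("ab", 32)

	req, err := http.NewRequest(http.MethodPost, "http://localhost:1633/bytes", bytes.NewBufferString("hello swarm"))
	if err != nil {
		panic(err)
	}
	req.Header.Set("Swarm-Postage-Batch-Id", batchID)

	res, err := http.DefaultClient.Do(req)
	if err != nil {
		panic(err)
	}
	defer res.Body.Close()

	// A malformed ID (wrong length or non-hex characters) yields 400 Bad Request;
	// a well-formed one proceeds to the upload pipeline.
	fmt.Println(res.Status)
}
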
67 changes: 67 additions & 0 deletions pkg/api/api_test.go
@@ -5,6 +5,7 @@
package api_test

import (
"encoding/hex"
"errors"
"io/ioutil"
"net/http"
@@ -14,11 +15,14 @@ import (
"time"

"github.com/ethersphere/bee/pkg/api"
"github.com/ethersphere/bee/pkg/jsonhttp/jsonhttptest"
"github.com/ethersphere/bee/pkg/logging"
"github.com/ethersphere/bee/pkg/pss"
"github.com/ethersphere/bee/pkg/resolver"
resolverMock "github.com/ethersphere/bee/pkg/resolver/mock"
statestore "github.com/ethersphere/bee/pkg/statestore/mock"
"github.com/ethersphere/bee/pkg/storage"
"github.com/ethersphere/bee/pkg/storage/mock"
"github.com/ethersphere/bee/pkg/swarm"
"github.com/ethersphere/bee/pkg/tags"
"github.com/ethersphere/bee/pkg/traversal"
@@ -163,3 +167,66 @@ func TestParseName(t *testing.T) {
})
}
}

// TestPostageHeaderError tests that incorrect postage batch ids
// provided to the API result in the appropriate error code.
func TestPostageHeaderError(t *testing.T) {
var (
mockStorer = mock.NewStorer()
mockStatestore = statestore.NewStateStore()
logger = logging.New(ioutil.Discard, 0)
client, _, _ = newTestServer(t, testServerOptions{
Storer: mockStorer,
Tags: tags.NewTags(mockStatestore, logger),
Logger: logger,
})
)

for _, tc := range []struct {
name string
endpoint string
batchId []byte
expErr bool
}{
{
name: "bytes",
endpoint: "/bytes",
batchId: []byte{0},
expErr: true,
},
{
name: "bytes",
endpoint: "/bytes",
batchId: []byte{0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}, //32 bytes - ok
},
{
name: "bytes",
endpoint: "/bytes",
batchId: []byte{}, // empty batch id falls back to the default zero batch id, so expect 200 OK. This will change once we do not fall back to a default value
},
{
name: "dirs",
endpoint: "/dirs",
batchId: []byte{0},
expErr: true,
},
{
name: "files",
endpoint: "/files",
batchId: []byte{0},
expErr: true,
},
} {

t.Run(tc.name, func(t *testing.T) {
hexbatch := hex.EncodeToString(tc.batchId)
expCode := http.StatusOK
if tc.expErr {
expCode = http.StatusBadRequest
}
jsonhttptest.Request(t, client, http.MethodPost, tc.endpoint, expCode,
jsonhttptest.WithRequestHeader(api.SwarmPostageBatchIdHeader, hexbatch),
)
})
}
}
10 changes: 9 additions & 1 deletion pkg/api/bytes.go
@@ -35,7 +35,15 @@ func (s *server) bytesUploadHandler(w http.ResponseWriter, r *http.Request) {
// Add the tag to the context
ctx := sctx.SetTag(r.Context(), tag)

pipe := builder.NewPipelineBuilder(ctx, s.Storer, requestModePut(r), requestEncrypt(r))
batch, err := requestPostageBatchId(r)
if err != nil {
logger.Debugf("bytes upload: postage batch id:%v", err)
logger.Error("bytes upload: postage batch id")
jsonhttp.BadRequest(w, nil)
return
}

pipe := builder.NewPipelineBuilder(ctx, s.Storer, requestModePut(r), requestEncrypt(r), batch)
address, err := builder.FeedPipeline(ctx, pipe, r.Body, r.ContentLength)
if err != nil {
logger.Debugf("bytes upload: split write all: %v", err)
4 changes: 2 additions & 2 deletions pkg/api/bzz_test.go
@@ -42,7 +42,7 @@ func TestBzz(t *testing.T) {
Logger: logging.New(ioutil.Discard, 5),
})
pipeWriteAll = func(r io.Reader, l int64) (swarm.Address, error) {
pipe := builder.NewPipelineBuilder(ctx, storer, storage.ModePutUpload, false)
pipe := builder.NewPipelineBuilder(ctx, storer, storage.ModePutUpload, false, nil)
return builder.FeedPipeline(ctx, pipe, r, l)
}
)
@@ -98,7 +98,7 @@ func TestBzz(t *testing.T) {
}

// save manifest
m, err := manifest.NewDefaultManifest(false, storer)
m, err := manifest.NewDefaultManifest(false, storer, nil)
if err != nil {
t.Fatal(err)
}
28 changes: 18 additions & 10 deletions pkg/api/dirs.go
@@ -61,7 +61,15 @@ func (s *server) dirUploadHandler(w http.ResponseWriter, r *http.Request) {
// Add the tag to the context
ctx := sctx.SetTag(r.Context(), tag)

reference, err := storeDir(ctx, r.Body, s.Storer, requestModePut(r), s.Logger, requestEncrypt(r), r.Header.Get(SwarmIndexDocumentHeader), r.Header.Get(SwarmErrorDocumentHeader))
batch, err := requestPostageBatchId(r)
if err != nil {
logger.Debugf("dir upload: postage batch id:%v", err)
logger.Error("dir upload: postage batch id")
jsonhttp.InternalServerError(w, nil)
return
}

reference, err := storeDir(ctx, r.Body, s.Storer, requestModePut(r), s.Logger, requestEncrypt(r), r.Header.Get(SwarmIndexDocumentHeader), r.Header.Get(SwarmErrorDocumentHeader), batch)
if err != nil {
logger.Debugf("dir upload: store dir err: %v", err)
logger.Errorf("dir upload: store dir")
@@ -101,10 +109,10 @@ func validateRequest(r *http.Request) error {

// storeDir stores all files recursively contained in the directory given as a tar
// it returns the hash for the uploaded manifest corresponding to the uploaded dir
func storeDir(ctx context.Context, reader io.ReadCloser, s storage.Storer, mode storage.ModePut, log logging.Logger, encrypt bool, indexFilename string, errorFilename string) (swarm.Address, error) {
func storeDir(ctx context.Context, reader io.ReadCloser, s storage.Storer, mode storage.ModePut, log logging.Logger, encrypt bool, indexFilename string, errorFilename string, batch []byte) (swarm.Address, error) {
logger := tracing.NewLoggerWithTraceID(ctx, log)

dirManifest, err := manifest.NewDefaultManifest(encrypt, s)
dirManifest, err := manifest.NewDefaultManifest(encrypt, s, batch)
if err != nil {
return swarm.ZeroAddress, err
}
@@ -146,7 +154,7 @@ func storeDir(ctx context.Context, reader io.ReadCloser, s storage.Storer, mode
contentType: contentType,
reader: tarReader,
}
fileReference, err := storeFile(ctx, fileInfo, s, mode, encrypt)
fileReference, err := storeFile(ctx, fileInfo, s, mode, encrypt, batch)
if err != nil {
return swarm.ZeroAddress, fmt.Errorf("store dir file: %w", err)
}
@@ -196,7 +204,7 @@ func storeDir(ctx context.Context, reader io.ReadCloser, s storage.Storer, mode
return swarm.ZeroAddress, fmt.Errorf("metadata marshal: %w", err)
}

pipe := builder.NewPipelineBuilder(ctx, s, mode, encrypt)
pipe := builder.NewPipelineBuilder(ctx, s, mode, encrypt, batch)
mr, err := builder.FeedPipeline(ctx, pipe, bytes.NewReader(metadataBytes), int64(len(metadataBytes)))
if err != nil {
return swarm.ZeroAddress, fmt.Errorf("split metadata: %w", err)
@@ -209,7 +217,7 @@ func storeDir(ctx context.Context, reader io.ReadCloser, s storage.Storer, mode
return swarm.ZeroAddress, fmt.Errorf("entry marshal: %w", err)
}

pipe = builder.NewPipelineBuilder(ctx, s, mode, encrypt)
pipe = builder.NewPipelineBuilder(ctx, s, mode, encrypt, batch)
manifestFileReference, err := builder.FeedPipeline(ctx, pipe, bytes.NewReader(fileEntryBytes), int64(len(fileEntryBytes)))
if err != nil {
return swarm.ZeroAddress, fmt.Errorf("split entry: %w", err)
@@ -220,9 +228,9 @@ func storeDir(ctx context.Context, reader io.ReadCloser, s storage.Storer, mode

// storeFile uploads the given file and returns its reference
// this function was extracted from `fileUploadHandler` and should eventually replace its current code
func storeFile(ctx context.Context, fileInfo *fileUploadInfo, s storage.Storer, mode storage.ModePut, encrypt bool) (swarm.Address, error) {
func storeFile(ctx context.Context, fileInfo *fileUploadInfo, s storage.Storer, mode storage.ModePut, encrypt bool, batch []byte) (swarm.Address, error) {
// first store the file and get its reference
pipe := builder.NewPipelineBuilder(ctx, s, mode, encrypt)
pipe := builder.NewPipelineBuilder(ctx, s, mode, encrypt, batch)
fr, err := builder.FeedPipeline(ctx, pipe, fileInfo.reader, fileInfo.size)
if err != nil {
return swarm.ZeroAddress, fmt.Errorf("split file: %w", err)
Expand All @@ -241,7 +249,7 @@ func storeFile(ctx context.Context, fileInfo *fileUploadInfo, s storage.Storer,
return swarm.ZeroAddress, fmt.Errorf("metadata marshal: %w", err)
}

pipe = builder.NewPipelineBuilder(ctx, s, mode, encrypt)
pipe = builder.NewPipelineBuilder(ctx, s, mode, encrypt, batch)
mr, err := builder.FeedPipeline(ctx, pipe, bytes.NewReader(metadataBytes), int64(len(metadataBytes)))
if err != nil {
return swarm.ZeroAddress, fmt.Errorf("split metadata: %w", err)
Expand All @@ -253,7 +261,7 @@ func storeFile(ctx context.Context, fileInfo *fileUploadInfo, s storage.Storer,
if err != nil {
return swarm.ZeroAddress, fmt.Errorf("entry marshal: %w", err)
}
pipe = builder.NewPipelineBuilder(ctx, s, mode, encrypt)
pipe = builder.NewPipelineBuilder(ctx, s, mode, encrypt, batch)
reference, err := builder.FeedPipeline(ctx, pipe, bytes.NewReader(fileEntryBytes), int64(len(fileEntryBytes)))
if err != nil {
return swarm.ZeroAddress, fmt.Errorf("split entry: %w", err)
15 changes: 12 additions & 3 deletions pkg/api/file.go
@@ -151,9 +151,16 @@ func (s *server) fileUploadHandler(w http.ResponseWriter, r *http.Request) {
fileSize = uint64(n)
reader = tmp
}
batch, err := requestPostageBatchId(r)
if err != nil {
logger.Debugf("file upload: postage batch id:%v", err)
logger.Error("file upload: postage batch id")
jsonhttp.BadRequest(w, nil)
return
}

// first store the file and get its reference
pipe := builder.NewPipelineBuilder(ctx, s.Storer, mode, requestEncrypt(r))
pipe := builder.NewPipelineBuilder(ctx, s.Storer, mode, requestEncrypt(r), batch)
fr, err := builder.FeedPipeline(ctx, pipe, reader, int64(fileSize))
if err != nil {
logger.Debugf("file upload: file store, file %q: %v", fileName, err)
@@ -177,7 +184,8 @@ func (s *server) fileUploadHandler(w http.ResponseWriter, r *http.Request) {
jsonhttp.InternalServerError(w, "metadata marshal error")
return
}
pipe = builder.NewPipelineBuilder(ctx, s.Storer, mode, requestEncrypt(r))

pipe = builder.NewPipelineBuilder(ctx, s.Storer, mode, requestEncrypt(r), batch)
mr, err := builder.FeedPipeline(ctx, pipe, bytes.NewReader(metadataBytes), int64(len(metadataBytes)))
if err != nil {
logger.Debugf("file upload: metadata store, file %q: %v", fileName, err)
@@ -195,7 +203,8 @@ func (s *server) fileUploadHandler(w http.ResponseWriter, r *http.Request) {
jsonhttp.InternalServerError(w, "entry marshal error")
return
}
pipe = builder.NewPipelineBuilder(ctx, s.Storer, mode, requestEncrypt(r))

pipe = builder.NewPipelineBuilder(ctx, s.Storer, mode, requestEncrypt(r), batch)
reference, err := builder.FeedPipeline(ctx, pipe, bytes.NewReader(fileEntryBytes), int64(len(fileEntryBytes)))
if err != nil {
logger.Debugf("file upload: entry store, file %q: %v", fileName, err)
2 changes: 1 addition & 1 deletion pkg/file/file_test.go
@@ -44,7 +44,7 @@ func testSplitThenJoin(t *testing.T) {
paramstring = strings.Split(t.Name(), "/")
dataIdx, _ = strconv.ParseInt(paramstring[1], 10, 0)
store = mock.NewStorer()
p = builder.NewPipelineBuilder(context.Background(), store, storage.ModePutUpload, false)
p = builder.NewPipelineBuilder(context.Background(), store, storage.ModePutUpload, false, nil)
data, _ = test.GetVector(t, int(dataIdx))
)

2 changes: 1 addition & 1 deletion pkg/file/joiner/joiner_test.go
@@ -184,7 +184,7 @@ func TestEncryptDecrypt(t *testing.T) {
t.Fatal(err)
}
ctx := context.Background()
pipe := builder.NewPipelineBuilder(ctx, store, storage.ModePutUpload, true)
pipe := builder.NewPipelineBuilder(ctx, store, storage.ModePutUpload, true, nil)
testDataReader := bytes.NewReader(testData)
resultAddress, err := builder.FeedPipeline(ctx, pipe, testDataReader, int64(len(testData)))
if err != nil {
2 changes: 1 addition & 1 deletion pkg/file/pipeline/builder/builder.go
@@ -21,7 +21,7 @@ import (
)

// NewPipelineBuilder returns the appropriate pipeline according to the specified parameters
func NewPipelineBuilder(ctx context.Context, s storage.Putter, mode storage.ModePut, encrypt bool) pipeline.Interface {
func NewPipelineBuilder(ctx context.Context, s storage.Putter, mode storage.ModePut, encrypt bool, _ []byte) pipeline.Interface {
if encrypt {
return newEncryptionPipeline(ctx, s, mode)
}
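
NewPipelineBuilder now takes a trailing batch argument (currently unused inside the builder, hence the blank identifier) so that handlers can thread the postage batch ID into the upload pipeline. A small sketch of a call site with the new signature, using the same mock storer the tests use; the data and batch values are placeholders:

package main

import (
	"bytes"
	"context"
	"fmt"

	"github.com/ethersphere/bee/pkg/file/pipeline/builder"
	"github.com/ethersphere/bee/pkg/storage"
	"github.com/ethersphere/bee/pkg/storage/mock"
)

func main() {
	ctx := context.Background()
	store := mock.NewStorer()
	batch := make([]byte, 32) // zero batch ID, the same fallback the API uses

	data := []byte("hello world")
	pipe := builder.NewPipelineBuilder(ctx, store, storage.ModePutUpload, false, batch)
	addr, err := builder.FeedPipeline(ctx, pipe, bytes.NewReader(data), int64(len(data)))
	if err != nil {
		panic(err)
	}
	fmt.Println(addr.String())
}
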
8 changes: 4 additions & 4 deletions pkg/file/pipeline/builder/builder_test.go
@@ -22,7 +22,7 @@ import (

func TestPartialWrites(t *testing.T) {
m := mock.NewStorer()
p := builder.NewPipelineBuilder(context.Background(), m, storage.ModePutUpload, false)
p := builder.NewPipelineBuilder(context.Background(), m, storage.ModePutUpload, false, nil)
_, _ = p.Write([]byte("hello "))
_, _ = p.Write([]byte("world"))

@@ -38,7 +38,7 @@

func TestHelloWorld(t *testing.T) {
m := mock.NewStorer()
p := builder.NewPipelineBuilder(context.Background(), m, storage.ModePutUpload, false)
p := builder.NewPipelineBuilder(context.Background(), m, storage.ModePutUpload, false, nil)

data := []byte("hello world")
_, err := p.Write(data)
@@ -61,7 +61,7 @@ func TestAllVectors(t *testing.T) {
data, expect := test.GetVector(t, i)
t.Run(fmt.Sprintf("data length %d, vector %d", len(data), i), func(t *testing.T) {
m := mock.NewStorer()
p := builder.NewPipelineBuilder(context.Background(), m, storage.ModePutUpload, false)
p := builder.NewPipelineBuilder(context.Background(), m, storage.ModePutUpload, false, nil)

_, err := p.Write(data)
if err != nil {
@@ -122,7 +122,7 @@ func benchmarkPipeline(b *testing.B, count int) {
b.StopTimer()

m := mock.NewStorer()
p := builder.NewPipelineBuilder(context.Background(), m, storage.ModePutUpload, false)
p := builder.NewPipelineBuilder(context.Background(), m, storage.ModePutUpload, false, nil)
data := make([]byte, count)
_, err := rand.Read(data)
if err != nil {
8 changes: 5 additions & 3 deletions pkg/manifest/manifest.go
@@ -63,21 +63,23 @@ type Entry interface {
func NewDefaultManifest(
encrypted bool,
storer storage.Storer,
batch []byte,
) (Interface, error) {
return NewManifest(DefaultManifestType, encrypted, storer)
return NewManifest(DefaultManifestType, encrypted, storer, batch)
}

// NewManifest creates a new manifest.
func NewManifest(
manifestType string,
encrypted bool,
storer storage.Storer,
batch []byte,
) (Interface, error) {
switch manifestType {
case ManifestSimpleContentType:
return NewSimpleManifest(encrypted, storer)
return NewSimpleManifest(encrypted, storer, batch)
case ManifestMantarayContentType:
return NewMantarayManifest(encrypted, storer)
return NewMantarayManifest(encrypted, storer, batch)
default:
return nil, ErrInvalidManifestType
}
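
NewDefaultManifest and NewManifest likewise gain a batch parameter and forward it to the concrete manifest constructors. A minimal call-site sketch mirroring the updated bzz_test.go above; the mock storer and nil batch are placeholders:

package main

import (
	"fmt"

	"github.com/ethersphere/bee/pkg/manifest"
	"github.com/ethersphere/bee/pkg/storage/mock"
)

func main() {
	storer := mock.NewStorer()

	// Handlers pass the batch parsed from the Swarm-Postage-Batch-Id header;
	// tests (and this sketch) pass nil.
	m, err := manifest.NewDefaultManifest(false, storer, nil)
	if err != nil {
		panic(err)
	}
	fmt.Printf("created manifest: %T\n", m)
}
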
