Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

localstore: preliminary changes related to postage stamps/batches #933

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 26 additions & 7 deletions pkg/api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ package api

import (
"context"
"encoding/hex"
"errors"
"fmt"
"io"
Expand All @@ -29,11 +30,12 @@ import (
)

const (
SwarmPinHeader = "Swarm-Pin"
SwarmTagUidHeader = "Swarm-Tag-Uid"
SwarmEncryptHeader = "Swarm-Encrypt"
SwarmIndexDocumentHeader = "Swarm-Index-Document"
SwarmErrorDocumentHeader = "Swarm-Error-Document"
SwarmPinHeader = "Swarm-Pin"
SwarmTagUidHeader = "Swarm-Tag-Uid"
SwarmEncryptHeader = "Swarm-Encrypt"
SwarmIndexDocumentHeader = "Swarm-Index-Document"
SwarmErrorDocumentHeader = "Swarm-Error-Document"
SwarmPostageBatchIdHeader = "Swarm-Postage-Batch-Id"
)

// The size of buffer used for prefetching content with Langos.
Expand All @@ -51,6 +53,7 @@ const (
var (
errInvalidNameOrAddress = errors.New("invalid name or bzz address")
errNoResolver = errors.New("no resolver connected")
errInvalidPostageBatch = errors.New("invalid postage batch id")
)

// Service is the API service interface.
Expand Down Expand Up @@ -186,6 +189,22 @@ func requestEncrypt(r *http.Request) bool {
return strings.ToLower(r.Header.Get(SwarmEncryptHeader)) == "true"
}

func requestPostageBatchId(r *http.Request) ([]byte, error) {
if h := strings.ToLower(r.Header.Get(SwarmPostageBatchIdHeader)); h != "" {
if len(h) != 64 {
return nil, errInvalidPostageBatch
}
b, err := hex.DecodeString(h)
if err != nil {
return nil, errInvalidPostageBatch
}
return b, nil
}

// fallback to a slice of 32 zeros
return make([]byte, 32), nil
}

func (s *server) newTracingHandler(spanName string) func(h http.Handler) http.Handler {
return func(h http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
Expand Down Expand Up @@ -250,10 +269,10 @@ func equalASCIIFold(s, t string) bool {

type pipelineFunc func(context.Context, io.Reader, int64) (swarm.Address, error)

func requestPipelineFn(s storage.Storer, r *http.Request) pipelineFunc {
func requestPipelineFn(s storage.Storer, r *http.Request, batch []byte) pipelineFunc {
mode, encrypt := requestModePut(r), requestEncrypt(r)
return func(ctx context.Context, r io.Reader, l int64) (swarm.Address, error) {
pipe := builder.NewPipelineBuilder(ctx, s, mode, encrypt)
pipe := builder.NewPipelineBuilder(ctx, s, mode, encrypt, nil)
return builder.FeedPipeline(ctx, pipe, r, l)
}
}
67 changes: 67 additions & 0 deletions pkg/api/api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
package api_test

import (
"encoding/hex"
"errors"
"io/ioutil"
"net/http"
Expand All @@ -14,11 +15,14 @@ import (
"time"

"github.com/ethersphere/bee/pkg/api"
"github.com/ethersphere/bee/pkg/jsonhttp/jsonhttptest"
"github.com/ethersphere/bee/pkg/logging"
"github.com/ethersphere/bee/pkg/pss"
"github.com/ethersphere/bee/pkg/resolver"
resolverMock "github.com/ethersphere/bee/pkg/resolver/mock"
statestore "github.com/ethersphere/bee/pkg/statestore/mock"
"github.com/ethersphere/bee/pkg/storage"
"github.com/ethersphere/bee/pkg/storage/mock"
"github.com/ethersphere/bee/pkg/swarm"
"github.com/ethersphere/bee/pkg/tags"
"github.com/ethersphere/bee/pkg/traversal"
Expand Down Expand Up @@ -163,3 +167,66 @@ func TestParseName(t *testing.T) {
})
}
}

// TestPostageHeaderError tests that incorrect postage batch ids
// provided to the api correct the appropriate error code.
func TestPostageHeaderError(t *testing.T) {
var (
mockStorer = mock.NewStorer()
mockStatestore = statestore.NewStateStore()
logger = logging.New(ioutil.Discard, 0)
client, _, _ = newTestServer(t, testServerOptions{
Storer: mockStorer,
Tags: tags.NewTags(mockStatestore, logger),
Logger: logger,
})
)

for _, tc := range []struct {
name string
endpoint string
batchId []byte
expErr bool
}{
{
name: "bytes",
endpoint: "/bytes",
batchId: []byte{0},
expErr: true,
},
{
name: "bytes",
endpoint: "/bytes",
batchId: []byte{0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}, //32 bytes - ok
},
{
name: "bytes",
endpoint: "/bytes",
batchId: []byte{}, // empty batch id falls back to the default zero batch id, so expect 200 OK. This will change once we do not fall back to a default value
},
{
name: "dirs",
endpoint: "/dirs",
batchId: []byte{0},
expErr: true,
},
{
name: "files",
endpoint: "/files",
batchId: []byte{0},
expErr: true,
},
} {

t.Run(tc.name, func(t *testing.T) {
hexbatch := hex.EncodeToString(tc.batchId)
expCode := http.StatusOK
if tc.expErr {
expCode = http.StatusBadRequest
}
jsonhttptest.Request(t, client, http.MethodPost, tc.endpoint, expCode,
jsonhttptest.WithRequestHeader(api.SwarmPostageBatchIdHeader, hexbatch),
)
})
}
}
10 changes: 9 additions & 1 deletion pkg/api/bytes.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,15 @@ func (s *server) bytesUploadHandler(w http.ResponseWriter, r *http.Request) {
// Add the tag to the context
ctx := sctx.SetTag(r.Context(), tag)

pipe := builder.NewPipelineBuilder(ctx, s.Storer, requestModePut(r), requestEncrypt(r))
batch, err := requestPostageBatchId(r)
if err != nil {
logger.Debugf("bytes upload: postage batch id:%v", err)
logger.Error("bytes upload: postage batch id")
jsonhttp.BadRequest(w, nil)
return
}

pipe := builder.NewPipelineBuilder(ctx, s.Storer, requestModePut(r), requestEncrypt(r), batch)
address, err := builder.FeedPipeline(ctx, pipe, r.Body, r.ContentLength)
if err != nil {
logger.Debugf("bytes upload: split write all: %v", err)
Expand Down
3 changes: 1 addition & 2 deletions pkg/api/bytes_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,11 @@ import (
"net/http"
"testing"

statestore "github.com/ethersphere/bee/pkg/statestore/mock"

"github.com/ethersphere/bee/pkg/api"
"github.com/ethersphere/bee/pkg/jsonhttp"
"github.com/ethersphere/bee/pkg/jsonhttp/jsonhttptest"
"github.com/ethersphere/bee/pkg/logging"
statestore "github.com/ethersphere/bee/pkg/statestore/mock"
"github.com/ethersphere/bee/pkg/storage/mock"
"github.com/ethersphere/bee/pkg/swarm"
"github.com/ethersphere/bee/pkg/tags"
Expand Down
5 changes: 2 additions & 3 deletions pkg/api/bzz.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,6 @@ import (
"path"
"strings"

"github.com/gorilla/mux"

"github.com/ethersphere/bee/pkg/collection/entry"
"github.com/ethersphere/bee/pkg/file"
"github.com/ethersphere/bee/pkg/file/joiner"
Expand All @@ -26,6 +24,7 @@ import (
"github.com/ethersphere/bee/pkg/storage"
"github.com/ethersphere/bee/pkg/swarm"
"github.com/ethersphere/bee/pkg/tracing"
"github.com/gorilla/mux"
)

func (s *server) bzzDownloadHandler(w http.ResponseWriter, r *http.Request) {
Expand Down Expand Up @@ -109,7 +108,7 @@ func (s *server) bzzDownloadHandler(w http.ResponseWriter, r *http.Request) {
m, err := manifest.NewManifestReference(
manifestMetadata.MimeType,
e.Reference(),
loadsave.New(s.Storer, storage.ModePutRequest, false), // mode and encryption values are fallback
loadsave.New(s.Storer, storage.ModePutRequest, false, nil), // mode and encryption values are fallback
)
if err != nil {
logger.Debugf("bzz download: not manifest %s: %v", address, err)
Expand Down
4 changes: 2 additions & 2 deletions pkg/api/bzz_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ func TestBzz(t *testing.T) {
Logger: logging.New(ioutil.Discard, 5),
})
pipeWriteAll = func(r io.Reader, l int64) (swarm.Address, error) {
pipe := builder.NewPipelineBuilder(ctx, storer, storage.ModePutUpload, false)
pipe := builder.NewPipelineBuilder(ctx, storer, storage.ModePutUpload, false, nil)
return builder.FeedPipeline(ctx, pipe, r, l)
}
)
Expand Down Expand Up @@ -99,7 +99,7 @@ func TestBzz(t *testing.T) {
}

// save manifest
m, err := manifest.NewDefaultManifest(loadsave.New(storer, storage.ModePutRequest, false))
m, err := manifest.NewDefaultManifest(loadsave.New(storer, storage.ModePutRequest, false, nil))
if err != nil {
t.Fatal(err)
}
Expand Down
13 changes: 11 additions & 2 deletions pkg/api/dirs.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,8 +60,17 @@ func (s *server) dirUploadHandler(w http.ResponseWriter, r *http.Request) {

// Add the tag to the context
ctx := sctx.SetTag(r.Context(), tag)
p := requestPipelineFn(s.Storer, r)
l := loadsave.New(s.Storer, requestModePut(r), requestEncrypt(r))

batch, err := requestPostageBatchId(r)
if err != nil {
logger.Debugf("dir upload: postage batch id:%v", err)
logger.Error("dir upload: postage batch id")
jsonhttp.InternalServerError(w, nil)
return
}

p := requestPipelineFn(s.Storer, r, batch)
l := loadsave.New(s.Storer, requestModePut(r), requestEncrypt(r), batch)
reference, err := storeDir(ctx, r.Body, s.Logger, p, l, r.Header.Get(SwarmIndexDocumentHeader), r.Header.Get(SwarmErrorDocumentHeader))
if err != nil {
logger.Debugf("dir upload: store dir err: %v", err)
Expand Down
2 changes: 1 addition & 1 deletion pkg/api/dirs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -284,7 +284,7 @@ func TestDirs(t *testing.T) {
verifyManifest, err := manifest.NewManifestReference(
manifest.DefaultManifestType,
e.Reference(),
loadsave.New(storer, storage.ModePutRequest, false),
loadsave.New(storer, storage.ModePutRequest, false, nil),
)
if err != nil {
t.Fatal(err)
Expand Down
9 changes: 8 additions & 1 deletion pkg/api/file.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,8 +149,15 @@ func (s *server) fileUploadHandler(w http.ResponseWriter, r *http.Request) {
fileSize = uint64(n)
reader = tmp
}
batch, err := requestPostageBatchId(r)
if err != nil {
logger.Debugf("file upload: postage batch id:%v", err)
logger.Error("file upload: postage batch id")
jsonhttp.BadRequest(w, nil)
return
}

p := requestPipelineFn(s.Storer, r)
p := requestPipelineFn(s.Storer, r, batch)

// first store the file and get its reference
fr, err := p(ctx, reader, int64(fileSize))
Expand Down
2 changes: 1 addition & 1 deletion pkg/file/file_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ func testSplitThenJoin(t *testing.T) {
paramstring = strings.Split(t.Name(), "/")
dataIdx, _ = strconv.ParseInt(paramstring[1], 10, 0)
store = mock.NewStorer()
p = builder.NewPipelineBuilder(context.Background(), store, storage.ModePutUpload, false)
p = builder.NewPipelineBuilder(context.Background(), store, storage.ModePutUpload, false, nil)
data, _ = test.GetVector(t, int(dataIdx))
)

Expand Down
2 changes: 1 addition & 1 deletion pkg/file/joiner/joiner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,7 @@ func TestEncryptDecrypt(t *testing.T) {
t.Fatal(err)
}
ctx := context.Background()
pipe := builder.NewPipelineBuilder(ctx, store, storage.ModePutUpload, true)
pipe := builder.NewPipelineBuilder(ctx, store, storage.ModePutUpload, true, nil)
testDataReader := bytes.NewReader(testData)
resultAddress, err := builder.FeedPipeline(ctx, pipe, testDataReader, int64(len(testData)))
if err != nil {
Expand Down
4 changes: 2 additions & 2 deletions pkg/file/loadsave/loadsave.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ type loadSave struct {
encrypted bool
}

func New(storer storage.Storer, mode storage.ModePut, enc bool) file.LoadSaver {
func New(storer storage.Storer, mode storage.ModePut, enc bool, _ []byte) file.LoadSaver {
return &loadSave{
storer: storer,
mode: mode,
Expand All @@ -45,7 +45,7 @@ func (ls *loadSave) Load(ctx context.Context, ref []byte) ([]byte, error) {
}

func (ls *loadSave) Save(ctx context.Context, data []byte) ([]byte, error) {
pipe := builder.NewPipelineBuilder(ctx, ls.storer, ls.mode, ls.encrypted)
pipe := builder.NewPipelineBuilder(ctx, ls.storer, ls.mode, ls.encrypted, nil)
address, err := builder.FeedPipeline(ctx, pipe, bytes.NewReader(data), int64(len(data)))
if err != nil {
return swarm.ZeroAddress.Bytes(), err
Expand Down
2 changes: 1 addition & 1 deletion pkg/file/pipeline/builder/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import (
)

// NewPipelineBuilder returns the appropriate pipeline according to the specified parameters
func NewPipelineBuilder(ctx context.Context, s storage.Putter, mode storage.ModePut, encrypt bool) pipeline.Interface {
func NewPipelineBuilder(ctx context.Context, s storage.Putter, mode storage.ModePut, encrypt bool, batch []byte) pipeline.Interface {
if encrypt {
return newEncryptionPipeline(ctx, s, mode)
}
Expand Down
8 changes: 4 additions & 4 deletions pkg/file/pipeline/builder/builder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import (

func TestPartialWrites(t *testing.T) {
m := mock.NewStorer()
p := builder.NewPipelineBuilder(context.Background(), m, storage.ModePutUpload, false)
p := builder.NewPipelineBuilder(context.Background(), m, storage.ModePutUpload, false, nil)
_, _ = p.Write([]byte("hello "))
_, _ = p.Write([]byte("world"))

Expand All @@ -38,7 +38,7 @@ func TestPartialWrites(t *testing.T) {

func TestHelloWorld(t *testing.T) {
m := mock.NewStorer()
p := builder.NewPipelineBuilder(context.Background(), m, storage.ModePutUpload, false)
p := builder.NewPipelineBuilder(context.Background(), m, storage.ModePutUpload, false, nil)

data := []byte("hello world")
_, err := p.Write(data)
Expand All @@ -61,7 +61,7 @@ func TestAllVectors(t *testing.T) {
data, expect := test.GetVector(t, i)
t.Run(fmt.Sprintf("data length %d, vector %d", len(data), i), func(t *testing.T) {
m := mock.NewStorer()
p := builder.NewPipelineBuilder(context.Background(), m, storage.ModePutUpload, false)
p := builder.NewPipelineBuilder(context.Background(), m, storage.ModePutUpload, false, nil)

_, err := p.Write(data)
if err != nil {
Expand Down Expand Up @@ -122,7 +122,7 @@ func benchmarkPipeline(b *testing.B, count int) {
b.StopTimer()

m := mock.NewStorer()
p := builder.NewPipelineBuilder(context.Background(), m, storage.ModePutUpload, false)
p := builder.NewPipelineBuilder(context.Background(), m, storage.ModePutUpload, false, nil)
data := make([]byte, count)
_, err := rand.Read(data)
if err != nil {
Expand Down
Loading