Skip to content

Commit

Permalink
api: accept postage id header and propagate it to pipeline (#886)
Browse files Browse the repository at this point in the history
postage: new pkg for postage stamps, uploader stamping (#890)

* postage: new pkg for postage stamps, uploader stamping

* postage: amount->value, blockNumber big.Int-> uint64, stamp only has batch ID, not Batch

* postage: fix godoc and copyright

* postage, swarm:
 - swarm.Stamp as an interface
 - add postage/testing for mock Stamps
 - fix Stamp MarshalBinary to allow nil batch id and signature
 - add StampSize const

* postage: heed review feedback

Co-authored-by: acud <[email protected]>

Storage incentives: add stamper putter to api

Postage BatchStore and BatchService (#1070)

Co-authored-by: zelig <[email protected]>

add normalisedBalance to updater interface (#1108)

make value the normalised balance (#1111)

postage: add event listener (#1099)

Wire up postage stamp syncing (#1114)

localstore, shed: persist stamps (#1116)

add --postage to beeinfra.sh setup

pullsync, pushsync: add postage stamps (#1117)

postage: add create endpoint (#1142)

retrieve erc20 address from postage contract (#1169)

postage: check balance before attempting stamp creation (#1177)

postage: fix bucket depth (#1178)

api: use hex encoding in postage api (#1179)

increase page size (#1182)

postage: handle bucket depth error in api (#1183)

localstore: attach stamp to outgoing chunk (#1192)

update postage stamp contract addresses for new token (#1208)

batchstore: reserve (#1262)

* postage/batchstore: reserve logic

Co-authored-by: acud <[email protected]>

stamp support in storage and protocols (#1321)

api: endpoints for stamp issuers (#1535)

retrieval: add stamps (#1552)

localstore reserve logic (#1322)

Co-authored-by: acud <[email protected]>
  • Loading branch information
acud committed Apr 27, 2021
1 parent 473d025 commit cdf5f14
Show file tree
Hide file tree
Showing 93 changed files with 7,708 additions and 583 deletions.
6 changes: 5 additions & 1 deletion .github/workflows/beekeeper.yml
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,11 @@ jobs:
run: |
echo -e "127.0.0.10\tregistry.localhost" | sudo tee -a /etc/hosts
for ((i=0; i<REPLICA; i++)); do echo -e "127.0.1.$((i+1))\tbee-${i}.localhost bee-${i}-debug.localhost"; done | sudo tee -a /etc/hosts
<<<<<<< HEAD
timeout 30m ./beeinfra.sh install --local -r "${REPLICA}" --bootnode /dnsaddr/localhost --geth --k3s --pay-threshold 1000000000000
=======
timeout 30m ./beeinfra.sh install --local -r "${REPLICA}" --bootnode /dnsaddr/localhost --geth --k3s --postage
>>>>>>> a6c986c4 (api: accept postage id header and propagate it to pipeline (#886))
- name: Test pingpong
id: pingpong-1
run: until ./beekeeper check pingpong --api-scheme http --debug-api-scheme http --disable-namespace --debug-api-domain localhost --api-domain localhost --node-count "${REPLICA}"; do echo "waiting for pingpong..."; sleep .3; done
Expand Down Expand Up @@ -103,7 +107,7 @@ jobs:
cp /etc/rancher/k3s/k3s.yaml ~/.kube/config
- name: Set testing cluster (Node connection and clef enabled)
run: |
timeout 30m ./beeinfra.sh install --local -r "${REPLICA}" --geth --clef --k3s --pay-threshold 1000000000000
timeout 30m ./beeinfra.sh install --local -r "${REPLICA}" --geth --clef --k3s --pay-threshold 1000000000000 --postage
- name: Test pingpong
id: pingpong-2
run: until ./beekeeper check pingpong --api-scheme http --debug-api-scheme http --disable-namespace --debug-api-domain localhost --api-domain localhost --node-count "${REPLICA}"; do echo "waiting for pingpong..."; sleep .3; done
Expand Down
4 changes: 4 additions & 0 deletions cmd/bee/cmd/cmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,8 @@ const (
optionNameSwapFactoryAddress = "swap-factory-address"
optionNameSwapInitialDeposit = "swap-initial-deposit"
optionNameSwapEnable = "swap-enable"
optionNamePostageContractAddress = "postage-stamp-address"
optionNamePriceOracleAddress = "price-oracle-address"
)

func init() {
Expand Down Expand Up @@ -224,6 +226,8 @@ func (c *command) setAllFlags(cmd *cobra.Command) {
cmd.Flags().String(optionNameSwapFactoryAddress, "", "swap factory address")
cmd.Flags().String(optionNameSwapInitialDeposit, "100000000000000000", "initial deposit if deploying a new chequebook")
cmd.Flags().Bool(optionNameSwapEnable, true, "enable swap")
cmd.Flags().String(optionNamePostageContractAddress, "", "postage stamp contract address")
cmd.Flags().String(optionNamePriceOracleAddress, "", "price oracle address")
}

func newLogger(cmd *cobra.Command, verbosity string) (logging.Logger, error) {
Expand Down
2 changes: 2 additions & 0 deletions cmd/bee/cmd/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,8 @@ Welcome to the Swarm.... Bzzz Bzzzz Bzzzz
SwapFactoryAddress: c.config.GetString(optionNameSwapFactoryAddress),
SwapInitialDeposit: c.config.GetString(optionNameSwapInitialDeposit),
SwapEnable: c.config.GetBool(optionNameSwapEnable),
PostageContractAddress: c.config.GetString(optionNamePostageContractAddress),
PriceOracleAddress: c.config.GetString(optionNamePriceOracleAddress),
})
if err != nil {
return err
Expand Down
1 change: 1 addition & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -1320,6 +1320,7 @@ honnef.co/go/tools v0.0.1-2020.1.4/go.mod h1:X/FiERA/W4tHapMX5mGpAtMSVEeEUOyHaw9
resenje.org/daemon v0.1.2/go.mod h1:mF5JRpH3EbrxI9WoeKY78e6PqSsbBtX9jAQL5vj/GBA=
resenje.org/email v0.1.3/go.mod h1:OhAVLRG3vqd9NSgayN3pAgzxTmc2B6mAefgShZvEgf0=
resenje.org/jsonhttp v0.2.0/go.mod h1:EDyeguyTWj2fU3D3SCE0qNTgthzyEkHYLM1uu0uikHU=
resenje.org/logging v0.1.5 h1:dw2TEg2kw7lhDqCCH5SqC1pFVuIFcqnTkI5PzgOhopM=
resenje.org/logging v0.1.5/go.mod h1:1IdoCm3+UwYfsplxDGV2pHCkUrLlQzlWwp4r28XfPx4=
resenje.org/marshal v0.1.1/go.mod h1:P7Cla6Ju5CFvW4Y8JbRgWX1Hcy4L1w4qcCsyadO7G94=
resenje.org/recovery v0.1.1/go.mod h1:3S6aCVKMJEWsSAb61oZTteaiqkIfQPTr1RdiWnRbhME=
Expand Down
115 changes: 85 additions & 30 deletions pkg/api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ package api

import (
"context"
"encoding/hex"
"errors"
"fmt"
"io"
Expand All @@ -19,11 +20,14 @@ import (
"time"
"unicode/utf8"

"github.com/ethersphere/bee/pkg/crypto"
"github.com/ethersphere/bee/pkg/feeds"
"github.com/ethersphere/bee/pkg/file/pipeline/builder"
"github.com/ethersphere/bee/pkg/logging"
m "github.com/ethersphere/bee/pkg/metrics"
"github.com/ethersphere/bee/pkg/pinning"
"github.com/ethersphere/bee/pkg/postage"
"github.com/ethersphere/bee/pkg/postage/postagecontract"
"github.com/ethersphere/bee/pkg/pss"
"github.com/ethersphere/bee/pkg/resolver"
"github.com/ethersphere/bee/pkg/storage"
Expand All @@ -34,14 +38,15 @@ import (
)

const (
SwarmPinHeader = "Swarm-Pin"
SwarmTagHeader = "Swarm-Tag"
SwarmEncryptHeader = "Swarm-Encrypt"
SwarmIndexDocumentHeader = "Swarm-Index-Document"
SwarmErrorDocumentHeader = "Swarm-Error-Document"
SwarmFeedIndexHeader = "Swarm-Feed-Index"
SwarmFeedIndexNextHeader = "Swarm-Feed-Index-Next"
SwarmCollectionHeader = "Swarm-Collection"
SwarmPinHeader = "Swarm-Pin"
SwarmTagHeader = "Swarm-Tag"
SwarmEncryptHeader = "Swarm-Encrypt"
SwarmIndexDocumentHeader = "Swarm-Index-Document"
SwarmErrorDocumentHeader = "Swarm-Error-Document"
SwarmFeedIndexHeader = "Swarm-Feed-Index"
SwarmFeedIndexNextHeader = "Swarm-Feed-Index-Next"
SwarmCollectionHeader = "Swarm-Collection"
SwarmPostageBatchIdHeader = "Swarm-Postage-Batch-Id"
)

// The size of buffer used for prefetching content with Langos.
Expand Down Expand Up @@ -70,6 +75,7 @@ var (
invalidContentLength = errors.New("invalid content-length")
directoryStoreError = errors.New("could not store directory")
fileStoreError = errors.New("could not store file")
errInvalidPostageBatch = errors.New("invalid postage batch id")
)

// Service is the API service interface.
Expand All @@ -80,15 +86,18 @@ type Service interface {
}

type server struct {
tags *tags.Tags
storer storage.Storer
resolver resolver.Interface
pss pss.Interface
traversal traversal.Traverser
pinning pinning.Interface
logger logging.Logger
tracer *tracing.Tracer
feedFactory feeds.Factory
tags *tags.Tags
storer storage.Storer
resolver resolver.Interface
pss pss.Interface
traversal traversal.Traverser
pinning pinning.Interface
logger logging.Logger
tracer *tracing.Tracer
feedFactory feeds.Factory
signer crypto.Signer
post postage.Service
postageContract postagecontract.Interface
Options
http.Handler
metrics metrics
Expand All @@ -109,20 +118,23 @@ const (
)

// New will create a and initialize a new API service.
func New(tags *tags.Tags, storer storage.Storer, resolver resolver.Interface, pss pss.Interface, traversalService traversal.Traverser, pinning pinning.Interface, feedFactory feeds.Factory, logger logging.Logger, tracer *tracing.Tracer, o Options) Service {
func New(tags *tags.Tags, storer storage.Storer, resolver resolver.Interface, pss pss.Interface, traversalService traversal.Traverser, pinning pinning.Interface, feedFactory feeds.Factory, post postage.Service, postageContract postagecontract.Interface, signer crypto.Signer, logger logging.Logger, tracer *tracing.Tracer, o Options) Service {
s := &server{
tags: tags,
storer: storer,
resolver: resolver,
pss: pss,
traversal: traversalService,
pinning: pinning,
feedFactory: feedFactory,
Options: o,
logger: logger,
tracer: tracer,
metrics: newMetrics(),
quit: make(chan struct{}),
tags: tags,
storer: storer,
resolver: resolver,
pss: pss,
traversal: traversalService,
pinning: pinning,
feedFactory: feedFactory,
post: post,
postageContract: postageContract,
signer: signer,
Options: o,
logger: logger,
tracer: tracer,
metrics: newMetrics(),
quit: make(chan struct{}),
}

s.setupRouting()
Expand Down Expand Up @@ -211,6 +223,22 @@ func requestEncrypt(r *http.Request) bool {
return strings.ToLower(r.Header.Get(SwarmEncryptHeader)) == "true"
}

func requestPostageBatchId(r *http.Request) ([]byte, error) {
if h := strings.ToLower(r.Header.Get(SwarmPostageBatchIdHeader)); h != "" {
if len(h) != 64 {
return nil, errInvalidPostageBatch
}
b, err := hex.DecodeString(h)
if err != nil {
return nil, errInvalidPostageBatch
}
return b, nil
}

// fallback to a slice of 32 zeros
return make([]byte, 32), nil
}

func (s *server) newTracingHandler(spanName string) func(h http.Handler) http.Handler {
return func(h http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
Expand Down Expand Up @@ -285,6 +313,33 @@ func equalASCIIFold(s, t string) bool {
return s == t
}

type stamperPutter struct {
storage.Storer
stamper postage.Stamper
}

func newStamperPutter(s storage.Storer, post postage.Service, signer crypto.Signer, batch []byte) (storage.Storer, error) {
i, err := post.GetStampIssuer(batch)
if err != nil {
return nil, fmt.Errorf("stamp issuer: %w", err)
}

stamper := postage.NewStamper(i, signer)
return &stamperPutter{Storer: s, stamper: stamper}, nil
}

func (p *stamperPutter) Put(ctx context.Context, mode storage.ModePut, chs ...swarm.Chunk) (exist []bool, err error) {
for i, c := range chs {
stamp, err := p.stamper.Stamp(c.Address())
if err != nil {
return nil, err
}
chs[i] = c.WithStamp(stamp)
}

return p.Storer.Put(ctx, mode, chs...)
}

type pipelineFunc func(context.Context, io.Reader, int64) (swarm.Address, error)

func requestPipelineFn(s storage.Storer, r *http.Request) pipelineFunc {
Expand Down
81 changes: 75 additions & 6 deletions pkg/api/api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
package api_test

import (
"encoding/hex"
"errors"
"io"
"io/ioutil"
Expand All @@ -15,13 +16,19 @@ import (
"time"

"github.com/ethersphere/bee/pkg/api"
"github.com/ethersphere/bee/pkg/crypto"
"github.com/ethersphere/bee/pkg/feeds"
"github.com/ethersphere/bee/pkg/jsonhttp/jsonhttptest"
"github.com/ethersphere/bee/pkg/logging"
"github.com/ethersphere/bee/pkg/pinning"
mockpost "github.com/ethersphere/bee/pkg/postage/mock"
"github.com/ethersphere/bee/pkg/postage/postagecontract"
"github.com/ethersphere/bee/pkg/pss"
"github.com/ethersphere/bee/pkg/resolver"
resolverMock "github.com/ethersphere/bee/pkg/resolver/mock"
statestore "github.com/ethersphere/bee/pkg/statestore/mock"
"github.com/ethersphere/bee/pkg/storage"
"github.com/ethersphere/bee/pkg/storage/mock"
"github.com/ethersphere/bee/pkg/swarm"
"github.com/ethersphere/bee/pkg/tags"
"github.com/ethersphere/bee/pkg/traversal"
Expand All @@ -43,10 +50,14 @@ type testServerOptions struct {
PreventRedirect bool
Feeds feeds.Factory
CORSAllowedOrigins []string
PostageContract postagecontract.Interface
}

func newTestServer(t *testing.T, o testServerOptions) (*http.Client, *websocket.Conn, string) {
t.Helper()
pk, _ := crypto.GenerateSecp256k1Key()
signer := crypto.NewDefaultSigner(pk)
mockPostage := mockpost.New()

if o.Logger == nil {
o.Logger = logging.New(ioutil.Discard, 0)
Expand All @@ -57,7 +68,7 @@ func newTestServer(t *testing.T, o testServerOptions) (*http.Client, *websocket.
if o.WsPingPeriod == 0 {
o.WsPingPeriod = 60 * time.Second
}
s := api.New(o.Tags, o.Storer, o.Resolver, o.Pss, o.Traversal, o.Pinning, o.Feeds, o.Logger, nil, api.Options{
s := api.New(o.Tags, o.Storer, o.Resolver, o.Pss, o.Traversal, o.Pinning, o.Feeds, mockPostage, o.PostageContract, signer, o.Logger, nil, api.Options{
CORSAllowedOrigins: o.CORSAllowedOrigins,
GatewayMode: o.GatewayMode,
WsPingPeriod: o.WsPingPeriod,
Expand Down Expand Up @@ -116,11 +127,11 @@ func request(t *testing.T, client *http.Client, method, resource string, body io

func TestParseName(t *testing.T) {
const bzzHash = "89c17d0d8018a19057314aa035e61c9d23c47581a61dd3a79a7839692c617e4d"
log := logging.New(ioutil.Discard, 0)

testCases := []struct {
desc string
name string
log logging.Logger
res resolver.Interface
noResolver bool
wantAdr swarm.Address
Expand Down Expand Up @@ -165,17 +176,18 @@ func TestParseName(t *testing.T) {
},
}
for _, tC := range testCases {
if tC.log == nil {
tC.log = logging.New(ioutil.Discard, 0)
}
if tC.res == nil && !tC.noResolver {
tC.res = resolverMock.NewResolver(
resolverMock.WithResolveFunc(func(string) (swarm.Address, error) {
return tC.wantAdr, nil
}))
}

s := api.New(nil, nil, tC.res, nil, nil, nil, nil, tC.log, nil, api.Options{}).(*api.Server)
pk, _ := crypto.GenerateSecp256k1Key()
signer := crypto.NewDefaultSigner(pk)
mockPostage := mockpost.New()

s := api.New(nil, nil, tC.res, nil, nil, nil, nil, mockPostage, nil, signer, log, nil, api.Options{}).(*api.Server)

t.Run(tC.desc, func(t *testing.T) {
got, err := s.ResolveNameOrAddress(tC.name)
Expand Down Expand Up @@ -226,3 +238,60 @@ func TestCalculateNumberOfChunksEncrypted(t *testing.T) {
}
}
}

// TestPostageHeaderError tests that incorrect postage batch ids
// provided to the api correct the appropriate error code.
func TestPostageHeaderError(t *testing.T) {
var (
mockStorer = mock.NewStorer()
mockStatestore = statestore.NewStateStore()
logger = logging.New(ioutil.Discard, 0)
client, _, _ = newTestServer(t, testServerOptions{
Storer: mockStorer,
Tags: tags.NewTags(mockStatestore, logger),
Logger: logger,
})
)

for _, tc := range []struct {
name string
endpoint string
batchId []byte
expErr bool
}{
{
name: "bytes",
endpoint: "/bytes",
batchId: []byte{0},
expErr: true,
},
{
name: "bytes",
endpoint: "/bytes",
batchId: []byte{0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}, //32 bytes - ok
},
{
name: "bytes",
endpoint: "/bytes",
batchId: []byte{}, // empty batch id falls back to the default zero batch id, so expect 200 OK. This will change once we do not fall back to a default value
},
{
name: "bzz",
endpoint: "/bzz",
batchId: []byte{0},
expErr: true,
},
} {

t.Run(tc.name, func(t *testing.T) {
hexbatch := hex.EncodeToString(tc.batchId)
expCode := http.StatusOK
if tc.expErr {
expCode = http.StatusBadRequest
}
jsonhttptest.Request(t, client, http.MethodPost, tc.endpoint, expCode,
jsonhttptest.WithRequestHeader(api.SwarmPostageBatchIdHeader, hexbatch),
)
})
}
}
Loading

0 comments on commit cdf5f14

Please sign in to comment.