Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

storage incentives: batch service and backing store #1069

Closed
wants to merge 19 commits into from
14 changes: 11 additions & 3 deletions pkg/api/bytes.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import (
"fmt"
"net/http"

"github.com/ethersphere/bee/pkg/file/pipeline/builder"
"github.com/ethersphere/bee/pkg/jsonhttp"
"github.com/ethersphere/bee/pkg/sctx"
"github.com/ethersphere/bee/pkg/swarm"
Expand Down Expand Up @@ -43,14 +42,23 @@ func (s *server) bytesUploadHandler(w http.ResponseWriter, r *http.Request) {
return
}

pipe := builder.NewPipelineBuilder(ctx, s.Storer, requestModePut(r), requestEncrypt(r), batch)
address, err := builder.FeedPipeline(ctx, pipe, r.Body, r.ContentLength)
putter, err := newStamperPutter(s.Storer, s.post, s.signer, batch)
if err != nil {
logger.Debugf("bytes upload: get putter:%v", err)
logger.Error("bytes upload: putter")
jsonhttp.BadRequest(w, nil)
return
}

p := requestPipelineFn(putter, r)
address, err := p(ctx, r.Body, r.ContentLength)
if err != nil {
logger.Debugf("bytes upload: split write all: %v", err)
logger.Error("bytes upload: split write all")
jsonhttp.InternalServerError(w, nil)
return
}

if created {
_, err = tag.DoneSplit(address)
if err != nil {
Expand Down
42 changes: 42 additions & 0 deletions pkg/postage/batch.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
// Copyright 2020 The Swarm Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.

package postage

import (
"encoding/binary"
"math/big"
)

// Batch represents a postage batch, a payment on the blockchain.
type Batch struct {
ID []byte // batch ID
Value *big.Int // overall balance of the batch
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

incorrect or misleading comment

Start uint64 // block number the batch was created
Owner []byte // owner's ethereum address
Depth uint8 // batch depth, i.e., size = 2^{depth}
}

// MarshalBinary serialises a postage batch to a byte slice len 117.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

not sure 117 is correct

func (b *Batch) MarshalBinary() ([]byte, error) {
out := make([]byte, 93)
copy(out, b.ID)
value := b.Value.Bytes()
copy(out[64-len(value):], value)
binary.BigEndian.PutUint64(out[64:72], b.Start)
copy(out[72:], b.Owner)
out[92] = b.Depth
return out, nil
}

// UnmarshalBinary deserialises the batch.
// Unsafe on slice index (len(buf) = 117) as only internally used in db.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

117?

func (b *Batch) UnmarshalBinary(buf []byte) error {
b.ID = buf[:32]
b.Value = big.NewInt(0).SetBytes(buf[32:64])
b.Start = binary.BigEndian.Uint64(buf[64:72])
b.Owner = buf[72:92]
b.Depth = buf[92]
return nil
}
44 changes: 44 additions & 0 deletions pkg/postage/batch_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
package postage_test

import (
"bytes"
"testing"

"github.com/ethersphere/bee/pkg/postage"
postagetesting "github.com/ethersphere/bee/pkg/postage/testing"
)

// TestBatchMarshalling tests the idempotence of binary marshal/unmarshal for a Batch.
func TestBatchMarshalling(t *testing.T) {
a, err := postagetesting.NewBatch()
if err != nil {
t.Fatal(err)
}

buf, err := a.MarshalBinary()
if err != nil {
t.Fatal(err)
}
if len(buf) != 93 {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

117 -> 93 see above

t.Fatalf("invalid length for serialised batch. expected 93, got %d", len(buf))
}
b := &postage.Batch{}
if err := b.UnmarshalBinary(buf); err != nil {
t.Fatalf("unexpected error unmarshalling batch: %v", err)
}
if !bytes.Equal(b.ID, a.ID) {
t.Fatalf("id mismatch, expected %x, got %x", a.ID, b.ID)
}
if !bytes.Equal(b.Owner, a.Owner) {
t.Fatalf("owner mismatch, expected %x, got %x", a.Owner, b.Owner)
}
if a.Value.Uint64() != b.Value.Uint64() {
t.Fatalf("value mismatch, expected %d, got %d", a.Value.Uint64(), b.Value.Uint64())
}
if a.Start != b.Start {
t.Fatalf("start mismatch, expected %d, got %d", a.Start, b.Start)
}
if a.Depth != b.Depth {
t.Fatalf("depth mismatch, expected %d, got %d", a.Depth, b.Depth)
}
}
103 changes: 103 additions & 0 deletions pkg/postage/batchservice/batchservice.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
package batchservice

import (
"encoding/hex"
"fmt"
"math/big"

"github.com/ethersphere/bee/pkg/logging"
"github.com/ethersphere/bee/pkg/postage"
)

// BatchService implements EventUpdater
type BatchService struct {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

please do not call this service. This is a db/store/representation of onchain data
call it batches

cs postage.ChainState
storer postage.BatchStorer
logger logging.Logger
}

// NewBatchService will create a new BatchService
func NewBatchService(storer postage.BatchStorer, logger logging.Logger) (postage.EventUpdater, error) {
b := BatchService{
storer: storer,
logger: logger,
}

cs, err := storer.GetChainState()
if err != nil {
return nil, fmt.Errorf("new batch service: %v", err)
}
b.cs = *cs

return &b, nil
}

// Create will create a new batch and store it in the BatchStore.
func (svc *BatchService) Create(id, owner []byte, value *big.Int, depth uint8) error {
b := postage.Batch{
ID: id,
Owner: owner,
Value: value,
Start: svc.cs.Block,
Depth: depth,
}

err := svc.storer.Put(&b)
if err != nil {
return fmt.Errorf("CreateBatch: %w", err)
}

svc.logger.Debugf("created batch id %x", hex.EncodeToString(b.ID))
return nil
}

// TopUp implements the EventUpdater interface. It tops ups a batch with the
// given ID with the given amount of BZZ.
func (svc *BatchService) TopUp(id []byte, amount *big.Int) error {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

use value instead of amount here

b, err := svc.storer.Get(id)
if err != nil {
return fmt.Errorf("TopUp: %w", err)
}

b.Value.Add(b.Value, amount)

err = svc.storer.Put(b)
if err != nil {
return fmt.Errorf("TopUp: %w", err)
}

svc.logger.Debugf("topped up batch id %x with %v", hex.EncodeToString(b.ID), b.Value)
return nil
}

// UpdateDepth implements the EventUpdater inteface. It sets the new depth of a
// batch with the given ID.
func (svc *BatchService) UpdateDepth(id []byte, depth uint8) error {
b, err := svc.storer.Get(id)
if err != nil {
return err
}

b.Depth = depth

err = svc.storer.Put(b)
if err != nil {
return fmt.Errorf("update depth: %w", err)
}

svc.logger.Debugf("updated depth of batch id %x to %d", hex.EncodeToString(b.ID), b.Depth)
return nil
}

// UpdatePrice implements the EventUpdater interface. It sets the current
// price from the chain in the service chain state.
func (svc *BatchService) UpdatePrice(price *big.Int) error {
svc.cs.Price = price

if err := svc.storer.PutChainState(&svc.cs); err != nil {
return fmt.Errorf("update price: %w", err)
}

svc.logger.Debugf("updated chain price to %s", svc.cs.Price)
return nil
}
Loading