Skip to content

Commit

Permalink
support postage stamps in localstore and protocols
Browse files Browse the repository at this point in the history
  • Loading branch information
zelig committed Mar 14, 2021
1 parent 88f49a2 commit f850603
Show file tree
Hide file tree
Showing 18 changed files with 259 additions and 134 deletions.
6 changes: 1 addition & 5 deletions pkg/api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -317,11 +317,7 @@ func (p *stamperPutter) Put(ctx context.Context, mode storage.ModePut, chs ...sw
if err != nil {
return nil, err
}
b, err := stamp.MarshalBinary()
if err != nil {
return nil, err
}
chs[i] = c.WithStamp(b)
chs[i] = c.WithStamp(stamp)
}

return p.Storer.Put(ctx, mode, chs...)
Expand Down
14 changes: 12 additions & 2 deletions pkg/localstore/export.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,10 @@ func (db *DB) Export(w io.Writer) (count int64, err error) {
if err := tw.WriteHeader(hdr); err != nil {
return false, err
}
if _, err := tw.Write(item.Stamp); err != nil {
if _, err := tw.Write(item.BatchID); err != nil {
return false, err
}
if _, err := tw.Write(item.Sig); err != nil {
return false, err
}
if _, err := tw.Write(item.Data); err != nil {
Expand Down Expand Up @@ -146,7 +149,14 @@ func (db *DB) Import(r io.Reader, legacy bool) (count int64, err error) {
case <-ctx.Done():
}
}
stamp := rawdata[:postage.StampSize]
stamp := postage.NewStamp(nil, nil)
err = stamp.UnmarshalBinary(rawdata[:postage.StampSize])
if err != nil {
select {
case errC <- err:
case <-ctx.Done():
}
}
data := rawdata[postage.StampSize:]
key := swarm.NewAddress(keybytes)

Expand Down
10 changes: 8 additions & 2 deletions pkg/localstore/export_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,10 @@ func TestExportImport(t *testing.T) {
if err != nil {
t.Fatal(err)
}
stamp := ch.Stamp()
stamp, err := ch.Stamp().MarshalBinary()
if err != nil {
t.Fatal(err)
}
chunks[ch.Address().String()] = append(stamp, ch.Data()...)
}

Expand Down Expand Up @@ -72,7 +75,10 @@ func TestExportImport(t *testing.T) {
if err != nil {
t.Fatal(err)
}
stamp := ch.Stamp()
stamp, err := ch.Stamp().MarshalBinary()
if err != nil {
t.Fatal(err)
}
got := append(stamp, ch.Data()...)
if !bytes.Equal(got, want) {
t.Fatalf("chunk %s: got stamp+data %x, want %x", addr, got, want)
Expand Down
16 changes: 13 additions & 3 deletions pkg/localstore/localstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -228,14 +228,23 @@ func New(path string, baseKey []byte, o *Options, logger logging.Logger) (db *DB
b := make([]byte, headerSize)
binary.BigEndian.PutUint64(b[:8], fields.BinID)
binary.BigEndian.PutUint64(b[8:16], uint64(fields.StoreTimestamp))
copy(b[16:], fields.Stamp)
stamp, err := postage.NewStamp(fields.BatchID, fields.Sig).MarshalBinary()
if err != nil {
return nil, err
}
copy(b[16:], stamp)
value = append(b, fields.Data...)
return value, nil
},
DecodeValue: func(keyItem shed.Item, value []byte) (e shed.Item, err error) {
e.StoreTimestamp = int64(binary.BigEndian.Uint64(value[8:16]))
e.BinID = binary.BigEndian.Uint64(value[:8])
e.Stamp = value[16:headerSize]
stamp := postage.NewStamp(nil, nil)
if err = stamp.UnmarshalBinary(value[16:headerSize]); err != nil {
return e, err
}
e.BatchID = stamp.BatchID()
e.Sig = stamp.Sig()
e.Data = value[headerSize:]
return e, nil
},
Expand Down Expand Up @@ -479,7 +488,8 @@ func chunkToItem(ch swarm.Chunk) shed.Item {
Address: ch.Address().Bytes(),
Data: ch.Data(),
Tag: ch.TagID(),
Stamp: ch.Stamp(),
BatchID: ch.Stamp().BatchID(),
Sig: ch.Stamp().Sig(),
}
}

Expand Down
41 changes: 18 additions & 23 deletions pkg/localstore/localstore_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,7 @@ func newTestDB(t testing.TB, o *Options) *DB {
if _, err := rand.Read(baseKey); err != nil {
t.Fatal(err)
}

logger := logging.New(ioutil.Discard, 0)
db, err := New("", baseKey, o, logger)
if err != nil {
Expand Down Expand Up @@ -222,15 +223,15 @@ func TestGenerateTestRandomChunk(t *testing.T) {
t.Errorf("first chunk address length %v, want %v", addrLen, 32)
}
dataLen := len(c1.Data())
if dataLen != swarm.ChunkSize+swarm.SpanSize {
if dataLen != swarm.ChunkWithSpanSize {
t.Errorf("first chunk data length %v, want %v", dataLen, swarm.ChunkSize)
}
addrLen = len(c2.Address().Bytes())
if addrLen != 32 {
t.Errorf("second chunk address length %v, want %v", addrLen, 32)
}
dataLen = len(c2.Data())
if dataLen != swarm.ChunkSize+swarm.SpanSize {
if dataLen != swarm.ChunkWithSpanSize {
t.Errorf("second chunk data length %v, want %v", dataLen, swarm.ChunkSize)
}
if c1.Address().Equal(c2.Address()) {
Expand All @@ -251,7 +252,7 @@ func newRetrieveIndexesTest(db *DB, chunk swarm.Chunk, storeTimestamp, accessTim
if err != nil {
t.Fatal(err)
}
validateItem(t, item, chunk.Address().Bytes(), chunk.Data(), storeTimestamp, 0, chunk.Stamp())
validateItem(t, item, chunk.Address().Bytes(), chunk.Data(), storeTimestamp, 0)

// access index should not be set
wantErr := leveldb.ErrNotFound
Expand All @@ -272,14 +273,14 @@ func newRetrieveIndexesTestWithAccess(db *DB, ch swarm.Chunk, storeTimestamp, ac
if err != nil {
t.Fatal(err)
}
validateItem(t, item, ch.Address().Bytes(), ch.Data(), storeTimestamp, 0, ch.Stamp())
validateItem(t, item, ch.Address().Bytes(), ch.Data(), storeTimestamp, 0)

if accessTimestamp > 0 {
item, err = db.retrievalAccessIndex.Get(addressToItem(ch.Address()))
if err != nil {
t.Fatal(err)
}
validateItem(t, item, ch.Address().Bytes(), nil, 0, accessTimestamp, ch.Stamp())
validateItem(t, item, ch.Address().Bytes(), nil, 0, accessTimestamp)
}
}
}
Expand All @@ -298,7 +299,7 @@ func newPullIndexTest(db *DB, ch swarm.Chunk, binID uint64, wantError error) fun
t.Errorf("got error %v, want %v", err, wantError)
}
if err == nil {
validateItem(t, item, ch.Address().Bytes(), nil, 0, 0, ch.Stamp())
validateItem(t, item, ch.Address().Bytes(), nil, 0, 0)
}
}
}
Expand All @@ -317,7 +318,7 @@ func newPushIndexTest(db *DB, ch swarm.Chunk, storeTimestamp int64, wantError er
t.Errorf("got error %v, want %v", err, wantError)
}
if err == nil {
validateItem(t, item, ch.Address().Bytes(), nil, storeTimestamp, 0, ch.Stamp())
validateItem(t, item, ch.Address().Bytes(), nil, storeTimestamp, 0)
}
}
}
Expand All @@ -337,7 +338,7 @@ func newGCIndexTest(db *DB, chunk swarm.Chunk, storeTimestamp, accessTimestamp i
t.Errorf("got error %v, want %v", err, wantError)
}
if err == nil {
validateItem(t, item, chunk.Address().Bytes(), nil, 0, accessTimestamp, chunk.Stamp())
validateItem(t, item, chunk.Address().Bytes(), nil, 0, accessTimestamp)
}
}
}
Expand All @@ -355,7 +356,7 @@ func newPinIndexTest(db *DB, chunk swarm.Chunk, wantError error) func(t *testing
t.Errorf("got error %v, want %v", err, wantError)
}
if err == nil {
validateItem(t, item, chunk.Address().Bytes(), nil, 0, 0, chunk.Stamp())
validateItem(t, item, chunk.Address().Bytes(), nil, 0, 0)
}
}
}
Expand Down Expand Up @@ -438,7 +439,7 @@ func testItemsOrder(t *testing.T, i shed.Index, chunks []testIndexChunk, sortFun
}

// validateItem is a helper function that checks Item values.
func validateItem(t *testing.T, item shed.Item, address, data []byte, storeTimestamp, accessTimestamp int64, stamp []byte) {
func validateItem(t *testing.T, item shed.Item, address, data []byte, storeTimestamp, accessTimestamp int64) {
t.Helper()

if !bytes.Equal(item.Address, address) {
Expand All @@ -453,9 +454,6 @@ func validateItem(t *testing.T, item shed.Item, address, data []byte, storeTimes
if item.AccessTimestamp != accessTimestamp {
t.Errorf("got item access timestamp %v, want %v", item.AccessTimestamp, accessTimestamp)
}
if stamp != nil && item.Stamp != nil && !bytes.Equal(item.Stamp, stamp) {
t.Errorf("got stamp %v want %v", item.Stamp, stamp)
}
}

// setNow replaces now function and
Expand Down Expand Up @@ -517,7 +515,7 @@ func TestSetNow(t *testing.T) {
}
}

func testIndexCounts(t *testing.T, pushIndex, pullIndex, gcIndex, gcExcludeIndex, pinIndex, retrievalDataIndex, retrievalAccessIndex int, indexInfo map[string]int) {
func testIndexCounts(t *testing.T, pushIndex, pullIndex, gcIndex, pinIndex, retrievalDataIndex, retrievalAccessIndex int, indexInfo map[string]int) {
t.Helper()
if indexInfo["pushIndex"] != pushIndex {
t.Fatalf("pushIndex count mismatch. got %d want %d", indexInfo["pushIndex"], pushIndex)
Expand All @@ -531,10 +529,6 @@ func testIndexCounts(t *testing.T, pushIndex, pullIndex, gcIndex, gcExcludeIndex
t.Fatalf("gcIndex count mismatch. got %d want %d", indexInfo["gcIndex"], gcIndex)
}

if indexInfo["gcExcludeIndex"] != gcExcludeIndex {
t.Fatalf("gcExcludeIndex count mismatch. got %d want %d", indexInfo["gcExcludeIndex"], gcExcludeIndex)
}

if indexInfo["pinIndex"] != pinIndex {
t.Fatalf("pinIndex count mismatch. got %d want %d", indexInfo["pinIndex"], pinIndex)
}
Expand All @@ -552,7 +546,6 @@ func testIndexCounts(t *testing.T, pushIndex, pullIndex, gcIndex, gcExcludeIndex
// index debug function
func TestDBDebugIndexes(t *testing.T) {
db := newTestDB(t, nil)
ctx := context.Background()

uploadTimestamp := time.Now().UTC().UnixNano()
defer setNow(func() (t int64) {
Expand All @@ -561,7 +554,7 @@ func TestDBDebugIndexes(t *testing.T) {

ch := generateTestRandomChunk()

_, err := db.Put(ctx, storage.ModePutUpload, ch)
_, err := db.Put(context.Background(), storage.ModePutUpload, ch)
if err != nil {
t.Fatal(err)
}
Expand All @@ -571,10 +564,11 @@ func TestDBDebugIndexes(t *testing.T) {
t.Fatal(err)
}

testIndexCounts(t, 1, 1, 0, 0, 0, 1, 0, indexCounts)
// for reference: testIndexCounts(t *testing.T, pushIndex, pullIndex, gcIndex, gcExcludeIndex, pinIndex, retrievalDataIndex, retrievalAccessIndex int, indexInfo map[string]int)
testIndexCounts(t, 1, 1, 0, 0, 1, 0, indexCounts)

// set the chunk for pinning and expect the index count to grow
err = db.Set(ctx, storage.ModeSetPin, ch.Address())
err = db.Set(context.Background(), storage.ModeSetPin, ch.Address())
if err != nil {
t.Fatal(err)
}
Expand All @@ -585,5 +579,6 @@ func TestDBDebugIndexes(t *testing.T) {
}

// assert that there's a pin and gc exclude entry now
testIndexCounts(t, 1, 1, 0, 1, 1, 1, 0, indexCounts)
testIndexCounts(t, 1, 1, 0, 1, 1, 0, indexCounts)

}
9 changes: 6 additions & 3 deletions pkg/localstore/mode_get.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,11 @@ import (
"errors"
"time"

"github.com/syndtr/goleveldb/leveldb"

"github.com/ethersphere/bee/pkg/postage"
"github.com/ethersphere/bee/pkg/shed"
"github.com/ethersphere/bee/pkg/storage"
"github.com/ethersphere/bee/pkg/swarm"
"github.com/syndtr/goleveldb/leveldb"
)

// Get returns a chunk from the database. If the chunk is
Expand All @@ -50,7 +50,10 @@ func (db *DB) Get(ctx context.Context, mode storage.ModeGet, addr swarm.Address)
}
return nil, err
}
return swarm.NewChunk(swarm.NewAddress(out.Address), out.Data).WithPinCounter(out.PinCounter).WithStamp(out.Stamp), nil
return swarm.NewChunk(swarm.NewAddress(out.Address), out.Data).
WithPinCounter(out.PinCounter).
// WithTag(out.Tag).
WithStamp(postage.NewStamp(out.BatchID, out.Sig)), nil
}

// get returns Item from the retrieval index
Expand Down
5 changes: 4 additions & 1 deletion pkg/localstore/mode_get_multi.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"errors"
"time"

"github.com/ethersphere/bee/pkg/postage"
"github.com/ethersphere/bee/pkg/shed"
"github.com/ethersphere/bee/pkg/storage"
"github.com/ethersphere/bee/pkg/swarm"
Expand Down Expand Up @@ -50,7 +51,9 @@ func (db *DB) GetMulti(ctx context.Context, mode storage.ModeGet, addrs ...swarm
}
chunks = make([]swarm.Chunk, len(out))
for i, ch := range out {
chunks[i] = swarm.NewChunk(swarm.NewAddress(ch.Address), ch.Data).WithPinCounter(ch.PinCounter).WithStamp(ch.Stamp)
chunks[i] = swarm.NewChunk(swarm.NewAddress(ch.Address), ch.Data).
WithPinCounter(ch.PinCounter).
WithStamp(postage.NewStamp(ch.BatchID, ch.Sig))
}
return chunks, nil
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/localstore/subscription_push.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ func (db *DB) SubscribePush(ctx context.Context) (c <-chan swarm.Chunk, stop fun
}

select {
case chunks <- swarm.NewChunk(swarm.NewAddress(dataItem.Address), dataItem.Data).WithTagID(item.Tag).WithStamp(dataItem.Stamp):
case chunks <- swarm.NewChunk(swarm.NewAddress(dataItem.Address), dataItem.Data).WithTagID(item.Tag):
count++
// set next iteration start item
// when its chunk is successfully sent to channel
Expand Down
5 changes: 3 additions & 2 deletions pkg/node/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -261,6 +261,7 @@ func NewBee(addr string, swarmAddress swarm.Address, publicKey ecdsa.PublicKey,
// register localstore unreserve function on the batchstore before batch service starts listening to blockchain events

batchStore, err := batchstore.New(stateStore, nil)
// batchStore, err := batchstore.New(stateStore, storer.UnreserveBatch)
if err != nil {
return nil, fmt.Errorf("batchstore: %w", err)
}
Expand Down Expand Up @@ -450,7 +451,7 @@ func NewBee(addr string, swarmAddress swarm.Address, publicKey ecdsa.PublicKey,

traversalService := traversal.NewService(ns)

pushSyncProtocol := pushsync.New(p2ps, storer, kad, tagService, pssService.TryUnwrap, logger, acc, accounting.NewFixedPricer(swarmAddress, 1000000000), tracer)
pushSyncProtocol := pushsync.New(p2ps, storer, kad, tagService, pssService.TryUnwrap, postage.ValidStamp(batchStore), logger, acc, accounting.NewFixedPricer(swarmAddress, 1000000000), tracer)

// set the pushSyncer in the PSS
pssService.SetPushSyncer(pushSyncProtocol)
Expand All @@ -470,7 +471,7 @@ func NewBee(addr string, swarmAddress swarm.Address, publicKey ecdsa.PublicKey,

pullStorage := pullstorage.New(storer)

pullSync := pullsync.New(p2ps, pullStorage, pssService.TryUnwrap, logger)
pullSync := pullsync.New(p2ps, pullStorage, pssService.TryUnwrap, postage.ValidStamp(batchStore), logger)
b.pullSyncCloser = pullSync

if err = p2ps.AddProtocol(pullSync.Protocol()); err != nil {
Expand Down
24 changes: 24 additions & 0 deletions pkg/postage/stamp.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,9 @@ import (
"errors"

"github.com/ethersphere/bee/pkg/crypto"
"github.com/ethersphere/bee/pkg/storage"
"github.com/ethersphere/bee/pkg/swarm"
"github.com/syndtr/goleveldb/leveldb"
)

// StampSize is the number of bytes in the serialisation of a stamp
Expand Down Expand Up @@ -102,3 +104,25 @@ func toSignDigest(addr swarm.Address, id []byte) ([]byte, error) {
}
return h.Sum(nil), nil
}

func ValidStamp(batchStore Storer) func(chunk swarm.Chunk, stampBytes []byte) error {
return func(chunk swarm.Chunk, stampBytes []byte) error {
stamp := new(Stamp)
err := stamp.UnmarshalBinary(stampBytes)
if err != nil {
return err
}
b, err := batchStore.Get(stamp.BatchID())
if err != nil {
if errors.Is(err, leveldb.ErrNotFound) {
return storage.ErrNotFound
}
return err
}
if err = stamp.Valid(chunk.Address(), b.Owner); err != nil {
return err
}
chunk.WithStamp(stamp).WithBatch(b.Radius, b.Depth)
return nil
}
}
Loading

0 comments on commit f850603

Please sign in to comment.