Skip to content

Commit

Permalink
pullsync, pushsync: add postage stamps (#1117)
Browse files Browse the repository at this point in the history
  • Loading branch information
acud committed Feb 9, 2021
1 parent 215c510 commit dc809bb
Show file tree
Hide file tree
Showing 9 changed files with 167 additions and 50 deletions.
94 changes: 74 additions & 20 deletions pkg/pullsync/pb/pullsync.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions pkg/pullsync/pb/pullsync.proto
Original file line number Diff line number Diff line change
Expand Up @@ -40,5 +40,6 @@ message Want {
message Delivery {
bytes Address = 1;
bytes Data = 2;
bytes Stamp = 3;
}

12 changes: 7 additions & 5 deletions pkg/pullsync/pullstorage/mock/pullstorage.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,8 @@ func WithIntervalsResp(addrs []swarm.Address, top uint64, err error) Option {
func WithChunks(chs ...swarm.Chunk) Option {
return optionFunc(func(p *PullStorage) {
for _, c := range chs {
p.chunks[c.Address().String()] = c.Data()
c := c
p.chunks[c.Address().String()] = c
}
})
}
Expand Down Expand Up @@ -67,7 +68,7 @@ type PullStorage struct {
putCalls int
setCalls int

chunks map[string][]byte
chunks map[string]swarm.Chunk
evilAddr swarm.Address
evilChunk swarm.Chunk

Expand All @@ -80,7 +81,7 @@ type PullStorage struct {
// NewPullStorage returns a new PullStorage mock.
func NewPullStorage(opts ...Option) *PullStorage {
s := &PullStorage{
chunks: make(map[string][]byte),
chunks: make(map[string]swarm.Chunk),
}
for _, v := range opts {
v.apply(s)
Expand Down Expand Up @@ -128,7 +129,7 @@ func (s *PullStorage) Get(_ context.Context, _ storage.ModeGet, addrs ...swarm.A
}

if v, ok := s.chunks[a.String()]; ok {
chs = append(chs, swarm.NewChunk(a, v))
chs = append(chs, v)
} else if !ok {
return nil, storage.ErrNotFound
}
Expand All @@ -141,7 +142,8 @@ func (s *PullStorage) Put(_ context.Context, _ storage.ModePut, chs ...swarm.Chu
s.mtx.Lock()
defer s.mtx.Unlock()
for _, c := range chs {
s.chunks[c.Address().String()] = c.Data()
c := c
s.chunks[c.Address().String()] = c
}
s.putCalls++
return nil
Expand Down
5 changes: 2 additions & 3 deletions pkg/pullsync/pullsync.go
Original file line number Diff line number Diff line change
Expand Up @@ -215,8 +215,7 @@ func (s *Syncer) SyncInterval(ctx context.Context, peer swarm.Address, bin uint8
delete(wantChunks, addr.String())
s.metrics.DbOpsCounter.Inc()
s.metrics.DeliveryCounter.Inc()

chunk := swarm.NewChunk(addr, delivery.Data)
chunk := swarm.NewChunk(addr, delivery.Data).WithStamp(delivery.Stamp)
if content.Valid(chunk) {
go s.unwrap(chunk)
} else if !soc.Valid(chunk) {
Expand Down Expand Up @@ -304,7 +303,7 @@ func (s *Syncer) handler(ctx context.Context, p p2p.Peer, stream p2p.Stream) (er
}

for _, v := range chs {
deliver := pb.Delivery{Address: v.Address().Bytes(), Data: v.Data()}
deliver := pb.Delivery{Address: v.Address().Bytes(), Data: v.Data(), Stamp: v.Stamp()}
if err := w.WriteMsgWithContext(ctx, &deliver); err != nil {
return fmt.Errorf("write delivery: %w", err)
}
Expand Down
9 changes: 7 additions & 2 deletions pkg/pullsync/pullsync_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"github.com/ethersphere/bee/pkg/logging"
"github.com/ethersphere/bee/pkg/p2p"
"github.com/ethersphere/bee/pkg/p2p/streamtest"
postagetesting "github.com/ethersphere/bee/pkg/postage/testing"
"github.com/ethersphere/bee/pkg/pullsync"
"github.com/ethersphere/bee/pkg/pullsync/pullstorage/mock"
testingc "github.com/ethersphere/bee/pkg/storage/testing"
Expand Down Expand Up @@ -141,7 +142,11 @@ func TestIncoming_WantAll(t *testing.T) {
func TestIncoming_UnsolicitedChunk(t *testing.T) {
evilAddr := swarm.MustParseHexAddress("0000000000000000000000000000000000000000000000000000000000000666")
evilData := []byte{0x66, 0x66, 0x66}
evil := swarm.NewChunk(evilAddr, evilData)
stamp, err := postagetesting.MustNewStamp().MarshalBinary()
if err != nil {
t.Fatal(err)
}
evil := swarm.NewChunk(evilAddr, evilData).WithStamp(stamp)

var (
mockTopmost = uint64(5)
Expand All @@ -150,7 +155,7 @@ func TestIncoming_UnsolicitedChunk(t *testing.T) {
psClient, _ = newPullSync(recorder)
)

_, _, err := psClient.SyncInterval(context.Background(), swarm.ZeroAddress, 0, 0, 5)
_, _, err = psClient.SyncInterval(context.Background(), swarm.ZeroAddress, 0, 0, 5)
if !errors.Is(err, pullsync.ErrUnsolicitedChunk) {
t.Fatalf("expected ErrUnsolicitedChunk but got %v", err)
}
Expand Down
68 changes: 61 additions & 7 deletions pkg/pushsync/pb/pushsync.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions pkg/pushsync/pb/pushsync.proto
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ option go_package = "pb";
message Delivery {
bytes Address = 1;
bytes Data = 2;
bytes Stamp = 3;
}

message Receipt {
Expand Down
11 changes: 2 additions & 9 deletions pkg/pushsync/pushsync.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ import (
"github.com/ethersphere/bee/pkg/logging"
"github.com/ethersphere/bee/pkg/p2p"
"github.com/ethersphere/bee/pkg/p2p/protobuf"
"github.com/ethersphere/bee/pkg/postage"
"github.com/ethersphere/bee/pkg/pushsync/pb"
"github.com/ethersphere/bee/pkg/soc"
"github.com/ethersphere/bee/pkg/storage"
Expand Down Expand Up @@ -107,14 +106,7 @@ func (ps *PushSync) handler(ctx context.Context, p p2p.Peer, stream p2p.Stream)
ps.metrics.TotalReceived.Inc()

// these are needed until we wire up the protocol the pass the stamps
fallbackBatchID := make([]byte, 32)
fallbackSig := make([]byte, 65)
stamp := postage.NewStamp(fallbackBatchID, fallbackSig)
b, err := stamp.MarshalBinary()
if err != nil {
return err
}
chunk := swarm.NewChunk(swarm.NewAddress(ch.Address), ch.Data).WithStamp(b)
chunk := swarm.NewChunk(swarm.NewAddress(ch.Address), ch.Data).WithStamp(ch.Stamp)

if content.Valid(chunk) {
if ps.unwrap != nil {
Expand Down Expand Up @@ -236,6 +228,7 @@ func (ps *PushSync) pushToClosest(ctx context.Context, ch swarm.Chunk) (rr *pb.R
if err := w.WriteMsgWithContext(ctx, &pb.Delivery{
Address: ch.Address().Bytes(),
Data: ch.Data(),
Stamp: ch.Stamp(),
}); err != nil {
_ = streamer.Reset()
lastErr = fmt.Errorf("chunk %s deliver to peer %s: %w", ch.Address().String(), peer.String(), err)
Expand Down
Loading

0 comments on commit dc809bb

Please sign in to comment.