From dc809bbb8130e01fbff1fbc080a7ea904c40b8a8 Mon Sep 17 00:00:00 2001 From: acud <12988138+acud@users.noreply.github.com> Date: Fri, 22 Jan 2021 10:13:42 +0100 Subject: [PATCH] pullsync, pushsync: add postage stamps (#1117) --- pkg/pullsync/pb/pullsync.pb.go | 94 +++++++++++++++----- pkg/pullsync/pb/pullsync.proto | 1 + pkg/pullsync/pullstorage/mock/pullstorage.go | 12 +-- pkg/pullsync/pullsync.go | 5 +- pkg/pullsync/pullsync_test.go | 9 +- pkg/pushsync/pb/pushsync.pb.go | 68 ++++++++++++-- pkg/pushsync/pb/pushsync.proto | 1 + pkg/pushsync/pushsync.go | 11 +-- pkg/storage/testing/chunk.go | 16 +++- 9 files changed, 167 insertions(+), 50 deletions(-) diff --git a/pkg/pullsync/pb/pullsync.pb.go b/pkg/pullsync/pb/pullsync.pb.go index a52a00d39c0..1e1f7bd1adf 100644 --- a/pkg/pullsync/pb/pullsync.pb.go +++ b/pkg/pullsync/pb/pullsync.pb.go @@ -349,6 +349,7 @@ func (m *Want) GetBitVector() []byte { type Delivery struct { Address []byte `protobuf:"bytes,1,opt,name=Address,proto3" json:"Address,omitempty"` Data []byte `protobuf:"bytes,2,opt,name=Data,proto3" json:"Data,omitempty"` + Stamp []byte `protobuf:"bytes,3,opt,name=Stamp,proto3" json:"Stamp,omitempty"` } func (m *Delivery) Reset() { *m = Delivery{} } @@ -398,6 +399,13 @@ func (m *Delivery) GetData() []byte { return nil } +func (m *Delivery) GetStamp() []byte { + if m != nil { + return m.Stamp + } + return nil +} + func init() { proto.RegisterType((*Syn)(nil), "pullsync.Syn") proto.RegisterType((*Ack)(nil), "pullsync.Ack") @@ -412,26 +420,27 @@ func init() { func init() { proto.RegisterFile("pullsync.proto", fileDescriptor_d1dee042cf9c065c) } var fileDescriptor_d1dee042cf9c065c = []byte{ - // 295 bytes of a gzipped FileDescriptorProto - 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x6c, 0x90, 0xbf, 0x4e, 0xf3, 0x30, - 0x10, 0xc0, 0xeb, 0x24, 0xed, 0xd7, 0xef, 0x54, 0x2a, 0xe4, 0x01, 0x45, 0xa8, 0x32, 0x95, 0xc5, - 0xd0, 0x89, 0x85, 0x05, 0x36, 0x9a, 0x56, 0xc0, 0x86, 0x64, 0x22, 0x90, 0xd8, 0xdc, 0xc4, 0x85, - 0x88, 0xd4, 0x8e, 0x6c, 0x07, 0x29, 0x6f, 0xc1, 0x63, 0x31, 0x76, 0x64, 0x44, 0xc9, 0x8b, 0xa0, - 0x98, 0x44, 0x2c, 0x4c, 0xfe, 0xfd, 0xee, 0x7c, 0x7f, 0x74, 0x30, 0x2d, 0xca, 0x3c, 0x37, 0x95, - 0x4c, 0xce, 0x0a, 0xad, 0xac, 0xc2, 0xe3, 0xde, 0xe9, 0x10, 0xfc, 0xfb, 0x4a, 0xd2, 0x13, 0xf0, - 0x97, 0xc9, 0x2b, 0x0e, 0xe1, 0xdf, 0xaa, 0xd4, 0x46, 0x69, 0x13, 0xa2, 0xb9, 0xbf, 0x08, 0x58, - 0xaf, 0xf4, 0x18, 0x02, 0x56, 0x66, 0x29, 0xc6, 0x3f, 0x6f, 0x88, 0xe6, 0x68, 0x71, 0xc0, 0x1c, - 0xd3, 0x19, 0x8c, 0x56, 0x5c, 0x26, 0x22, 0xff, 0x33, 0x7b, 0x05, 0xe3, 0x1b, 0x61, 0x19, 0x97, - 0xcf, 0x02, 0x1f, 0x82, 0x1f, 0x65, 0xd2, 0xa5, 0x87, 0xac, 0xc5, 0xb6, 0xe2, 0x5a, 0xab, 0x5d, - 0xe8, 0xcd, 0xd1, 0x22, 0x60, 0x8e, 0xf1, 0x14, 0xbc, 0x58, 0x85, 0xbe, 0x8b, 0x78, 0xb1, 0xa2, - 0x97, 0x30, 0xbc, 0xdb, 0x6e, 0x85, 0x6e, 0xd7, 0x8b, 0x55, 0xb1, 0x53, 0xc6, 0xba, 0x16, 0x01, - 0xeb, 0x15, 0x1f, 0xc1, 0xe8, 0x96, 0x9b, 0x17, 0x61, 0x5c, 0xa3, 0x09, 0xeb, 0x8c, 0x9e, 0x42, - 0xf0, 0xc8, 0xa5, 0xc5, 0x33, 0xf8, 0x1f, 0x65, 0xf6, 0x41, 0x24, 0x56, 0x69, 0x57, 0x3b, 0x61, - 0xbf, 0x01, 0x7a, 0x01, 0xe3, 0xb5, 0xc8, 0xb3, 0x37, 0xa1, 0xab, 0x76, 0xc6, 0x32, 0x4d, 0xb5, - 0x30, 0xa6, 0xfb, 0xd7, 0x6b, 0xbb, 0xea, 0x9a, 0x5b, 0xde, 0x4d, 0x70, 0x1c, 0xcd, 0x3e, 0x6a, - 0x82, 0xf6, 0x35, 0x41, 0x5f, 0x35, 0x41, 0xef, 0x0d, 0x19, 0xec, 0x1b, 0x32, 0xf8, 0x6c, 0xc8, - 0xe0, 0xc9, 0x2b, 0x36, 0x9b, 0x91, 0xbb, 0xf6, 0xf9, 0x77, 0x00, 0x00, 0x00, 0xff, 0xff, 0xe5, - 0x7c, 0x69, 0x94, 0x7f, 0x01, 0x00, 0x00, + // 307 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x6c, 0x91, 0xcf, 0x4a, 0x03, 0x31, + 0x10, 0xc6, 0x9b, 0xfd, 0x53, 0xeb, 0x50, 0x8b, 0x04, 0x91, 0x45, 0x4a, 0x2c, 0xc1, 0x43, 0x4f, + 0x5e, 0x3c, 0x79, 0xb3, 0x7f, 0x50, 0x4f, 0x0a, 0x69, 0x51, 0xf0, 0x96, 0x6e, 0x53, 0x5d, 0xdc, + 0x26, 0x4b, 0x92, 0x15, 0xf6, 0x2d, 0x7c, 0x2c, 0x8f, 0x3d, 0x7a, 0x94, 0xdd, 0x17, 0x91, 0x4d, + 0x77, 0xf1, 0xe2, 0x29, 0xdf, 0x6f, 0x26, 0x33, 0xdf, 0x07, 0x03, 0x83, 0x2c, 0x4f, 0x53, 0x53, + 0xc8, 0xf8, 0x32, 0xd3, 0xca, 0x2a, 0xdc, 0x6b, 0x99, 0x86, 0xe0, 0x2f, 0x0a, 0x49, 0xcf, 0xc1, + 0x9f, 0xc4, 0xef, 0x38, 0x82, 0x83, 0x59, 0xae, 0x8d, 0xd2, 0x26, 0x42, 0x23, 0x7f, 0x1c, 0xb0, + 0x16, 0xe9, 0x19, 0x04, 0x2c, 0x4f, 0xd6, 0x18, 0xef, 0xdf, 0x08, 0x8d, 0xd0, 0xf8, 0x88, 0x39, + 0x4d, 0x87, 0xd0, 0x9d, 0x71, 0x19, 0x8b, 0xf4, 0xdf, 0xee, 0x0d, 0xf4, 0xee, 0x84, 0x65, 0x5c, + 0xbe, 0x0a, 0x7c, 0x0c, 0xfe, 0x34, 0x91, 0xae, 0x1d, 0xb2, 0x5a, 0xd6, 0x13, 0xb7, 0x5a, 0x6d, + 0x23, 0x6f, 0x84, 0xc6, 0x01, 0x73, 0x1a, 0x0f, 0xc0, 0x5b, 0xaa, 0xc8, 0x77, 0x15, 0x6f, 0xa9, + 0xe8, 0x35, 0x84, 0x8f, 0x9b, 0x8d, 0xd0, 0x75, 0xbc, 0xa5, 0xca, 0xb6, 0xca, 0x58, 0xb7, 0x22, + 0x60, 0x2d, 0xe2, 0x53, 0xe8, 0xde, 0x73, 0xf3, 0x26, 0x8c, 0x5b, 0xd4, 0x67, 0x0d, 0xd1, 0x0b, + 0x08, 0x9e, 0xb9, 0xb4, 0x78, 0x08, 0x87, 0xd3, 0xc4, 0x3e, 0x89, 0xd8, 0x2a, 0xed, 0x66, 0xfb, + 0xec, 0xaf, 0x40, 0x1f, 0xa0, 0x37, 0x17, 0x69, 0xf2, 0x21, 0x74, 0x51, 0x7b, 0x4c, 0xd6, 0x6b, + 0x2d, 0x8c, 0x69, 0xfe, 0xb5, 0x58, 0x47, 0x9d, 0x73, 0xcb, 0x1b, 0x07, 0xa7, 0xf1, 0x09, 0x84, + 0x0b, 0xcb, 0xb7, 0x99, 0x4b, 0xdb, 0x67, 0x7b, 0x98, 0x0e, 0xbf, 0x4a, 0x82, 0x76, 0x25, 0x41, + 0x3f, 0x25, 0x41, 0x9f, 0x15, 0xe9, 0xec, 0x2a, 0xd2, 0xf9, 0xae, 0x48, 0xe7, 0xc5, 0xcb, 0x56, + 0xab, 0xae, 0xbb, 0xc1, 0xd5, 0x6f, 0x00, 0x00, 0x00, 0xff, 0xff, 0xf7, 0xd1, 0x30, 0xa1, 0x95, + 0x01, 0x00, 0x00, } func (m *Syn) Marshal() (dAtA []byte, err error) { @@ -677,6 +686,13 @@ func (m *Delivery) MarshalToSizedBuffer(dAtA []byte) (int, error) { _ = i var l int _ = l + if len(m.Stamp) > 0 { + i -= len(m.Stamp) + copy(dAtA[i:], m.Stamp) + i = encodeVarintPullsync(dAtA, i, uint64(len(m.Stamp))) + i-- + dAtA[i] = 0x1a + } if len(m.Data) > 0 { i -= len(m.Data) copy(dAtA[i:], m.Data) @@ -815,6 +831,10 @@ func (m *Delivery) Size() (n int) { if l > 0 { n += 1 + l + sovPullsync(uint64(l)) } + l = len(m.Stamp) + if l > 0 { + n += 1 + l + sovPullsync(uint64(l)) + } return n } @@ -1550,6 +1570,40 @@ func (m *Delivery) Unmarshal(dAtA []byte) error { m.Data = []byte{} } iNdEx = postIndex + case 3: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Stamp", wireType) + } + var byteLen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowPullsync + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + byteLen |= int(b&0x7F) << shift + if b < 0x80 { + break + } + } + if byteLen < 0 { + return ErrInvalidLengthPullsync + } + postIndex := iNdEx + byteLen + if postIndex < 0 { + return ErrInvalidLengthPullsync + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.Stamp = append(m.Stamp[:0], dAtA[iNdEx:postIndex]...) + if m.Stamp == nil { + m.Stamp = []byte{} + } + iNdEx = postIndex default: iNdEx = preIndex skippy, err := skipPullsync(dAtA[iNdEx:]) diff --git a/pkg/pullsync/pb/pullsync.proto b/pkg/pullsync/pb/pullsync.proto index 56a9753af49..0e82c5a8be7 100644 --- a/pkg/pullsync/pb/pullsync.proto +++ b/pkg/pullsync/pb/pullsync.proto @@ -40,5 +40,6 @@ message Want { message Delivery { bytes Address = 1; bytes Data = 2; + bytes Stamp = 3; } diff --git a/pkg/pullsync/pullstorage/mock/pullstorage.go b/pkg/pullsync/pullstorage/mock/pullstorage.go index 740f52e40cd..57077614823 100644 --- a/pkg/pullsync/pullstorage/mock/pullstorage.go +++ b/pkg/pullsync/pullstorage/mock/pullstorage.go @@ -35,7 +35,8 @@ func WithIntervalsResp(addrs []swarm.Address, top uint64, err error) Option { func WithChunks(chs ...swarm.Chunk) Option { return optionFunc(func(p *PullStorage) { for _, c := range chs { - p.chunks[c.Address().String()] = c.Data() + c := c + p.chunks[c.Address().String()] = c } }) } @@ -67,7 +68,7 @@ type PullStorage struct { putCalls int setCalls int - chunks map[string][]byte + chunks map[string]swarm.Chunk evilAddr swarm.Address evilChunk swarm.Chunk @@ -80,7 +81,7 @@ type PullStorage struct { // NewPullStorage returns a new PullStorage mock. func NewPullStorage(opts ...Option) *PullStorage { s := &PullStorage{ - chunks: make(map[string][]byte), + chunks: make(map[string]swarm.Chunk), } for _, v := range opts { v.apply(s) @@ -128,7 +129,7 @@ func (s *PullStorage) Get(_ context.Context, _ storage.ModeGet, addrs ...swarm.A } if v, ok := s.chunks[a.String()]; ok { - chs = append(chs, swarm.NewChunk(a, v)) + chs = append(chs, v) } else if !ok { return nil, storage.ErrNotFound } @@ -141,7 +142,8 @@ func (s *PullStorage) Put(_ context.Context, _ storage.ModePut, chs ...swarm.Chu s.mtx.Lock() defer s.mtx.Unlock() for _, c := range chs { - s.chunks[c.Address().String()] = c.Data() + c := c + s.chunks[c.Address().String()] = c } s.putCalls++ return nil diff --git a/pkg/pullsync/pullsync.go b/pkg/pullsync/pullsync.go index 8ac86bb4aef..8c41d7f6fbc 100644 --- a/pkg/pullsync/pullsync.go +++ b/pkg/pullsync/pullsync.go @@ -215,8 +215,7 @@ func (s *Syncer) SyncInterval(ctx context.Context, peer swarm.Address, bin uint8 delete(wantChunks, addr.String()) s.metrics.DbOpsCounter.Inc() s.metrics.DeliveryCounter.Inc() - - chunk := swarm.NewChunk(addr, delivery.Data) + chunk := swarm.NewChunk(addr, delivery.Data).WithStamp(delivery.Stamp) if content.Valid(chunk) { go s.unwrap(chunk) } else if !soc.Valid(chunk) { @@ -304,7 +303,7 @@ func (s *Syncer) handler(ctx context.Context, p p2p.Peer, stream p2p.Stream) (er } for _, v := range chs { - deliver := pb.Delivery{Address: v.Address().Bytes(), Data: v.Data()} + deliver := pb.Delivery{Address: v.Address().Bytes(), Data: v.Data(), Stamp: v.Stamp()} if err := w.WriteMsgWithContext(ctx, &deliver); err != nil { return fmt.Errorf("write delivery: %w", err) } diff --git a/pkg/pullsync/pullsync_test.go b/pkg/pullsync/pullsync_test.go index 02338af872d..9028730364a 100644 --- a/pkg/pullsync/pullsync_test.go +++ b/pkg/pullsync/pullsync_test.go @@ -14,6 +14,7 @@ import ( "github.com/ethersphere/bee/pkg/logging" "github.com/ethersphere/bee/pkg/p2p" "github.com/ethersphere/bee/pkg/p2p/streamtest" + postagetesting "github.com/ethersphere/bee/pkg/postage/testing" "github.com/ethersphere/bee/pkg/pullsync" "github.com/ethersphere/bee/pkg/pullsync/pullstorage/mock" testingc "github.com/ethersphere/bee/pkg/storage/testing" @@ -141,7 +142,11 @@ func TestIncoming_WantAll(t *testing.T) { func TestIncoming_UnsolicitedChunk(t *testing.T) { evilAddr := swarm.MustParseHexAddress("0000000000000000000000000000000000000000000000000000000000000666") evilData := []byte{0x66, 0x66, 0x66} - evil := swarm.NewChunk(evilAddr, evilData) + stamp, err := postagetesting.MustNewStamp().MarshalBinary() + if err != nil { + t.Fatal(err) + } + evil := swarm.NewChunk(evilAddr, evilData).WithStamp(stamp) var ( mockTopmost = uint64(5) @@ -150,7 +155,7 @@ func TestIncoming_UnsolicitedChunk(t *testing.T) { psClient, _ = newPullSync(recorder) ) - _, _, err := psClient.SyncInterval(context.Background(), swarm.ZeroAddress, 0, 0, 5) + _, _, err = psClient.SyncInterval(context.Background(), swarm.ZeroAddress, 0, 0, 5) if !errors.Is(err, pullsync.ErrUnsolicitedChunk) { t.Fatalf("expected ErrUnsolicitedChunk but got %v", err) } diff --git a/pkg/pushsync/pb/pushsync.pb.go b/pkg/pushsync/pb/pushsync.pb.go index abc6159c745..401f96614fa 100644 --- a/pkg/pushsync/pb/pushsync.pb.go +++ b/pkg/pushsync/pb/pushsync.pb.go @@ -25,6 +25,7 @@ const _ = proto.GoGoProtoPackageIsVersion3 // please upgrade the proto package type Delivery struct { Address []byte `protobuf:"bytes,1,opt,name=Address,proto3" json:"Address,omitempty"` Data []byte `protobuf:"bytes,2,opt,name=Data,proto3" json:"Data,omitempty"` + Stamp []byte `protobuf:"bytes,3,opt,name=Stamp,proto3" json:"Stamp,omitempty"` } func (m *Delivery) Reset() { *m = Delivery{} } @@ -74,6 +75,13 @@ func (m *Delivery) GetData() []byte { return nil } +func (m *Delivery) GetStamp() []byte { + if m != nil { + return m.Stamp + } + return nil +} + type Receipt struct { Address []byte `protobuf:"bytes,1,opt,name=Address,proto3" json:"Address,omitempty"` } @@ -126,16 +134,17 @@ func init() { func init() { proto.RegisterFile("pushsync.proto", fileDescriptor_723cf31bfc02bfd6) } var fileDescriptor_723cf31bfc02bfd6 = []byte{ - // 139 bytes of a gzipped FileDescriptorProto + // 154 bytes of a gzipped FileDescriptorProto 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xe2, 0xe2, 0x2b, 0x28, 0x2d, 0xce, - 0x28, 0xae, 0xcc, 0x4b, 0xd6, 0x2b, 0x28, 0xca, 0x2f, 0xc9, 0x17, 0xe2, 0x80, 0xf1, 0x95, 0x2c, + 0x28, 0xae, 0xcc, 0x4b, 0xd6, 0x2b, 0x28, 0xca, 0x2f, 0xc9, 0x17, 0xe2, 0x80, 0xf1, 0x95, 0xfc, 0xb8, 0x38, 0x5c, 0x52, 0x73, 0x32, 0xcb, 0x52, 0x8b, 0x2a, 0x85, 0x24, 0xb8, 0xd8, 0x1d, 0x53, 0x52, 0x8a, 0x52, 0x8b, 0x8b, 0x25, 0x18, 0x15, 0x18, 0x35, 0x78, 0x82, 0x60, 0x5c, 0x21, 0x21, - 0x2e, 0x16, 0x97, 0xc4, 0x92, 0x44, 0x09, 0x26, 0xb0, 0x30, 0x98, 0xad, 0xa4, 0xcc, 0xc5, 0x1e, - 0x94, 0x9a, 0x9c, 0x9a, 0x59, 0x50, 0x82, 0x5b, 0xa3, 0x93, 0xcc, 0x89, 0x47, 0x72, 0x8c, 0x17, - 0x1e, 0xc9, 0x31, 0x3e, 0x78, 0x24, 0xc7, 0x38, 0xe1, 0xb1, 0x1c, 0xc3, 0x85, 0xc7, 0x72, 0x0c, - 0x37, 0x1e, 0xcb, 0x31, 0x44, 0x31, 0x15, 0x24, 0x25, 0xb1, 0x81, 0x5d, 0x63, 0x0c, 0x08, 0x00, - 0x00, 0xff, 0xff, 0x1b, 0x63, 0x50, 0x4a, 0x9f, 0x00, 0x00, 0x00, + 0x2e, 0x16, 0x97, 0xc4, 0x92, 0x44, 0x09, 0x26, 0xb0, 0x30, 0x98, 0x2d, 0x24, 0xc2, 0xc5, 0x1a, + 0x5c, 0x92, 0x98, 0x5b, 0x20, 0xc1, 0x0c, 0x16, 0x84, 0x70, 0x94, 0x94, 0xb9, 0xd8, 0x83, 0x52, + 0x93, 0x53, 0x33, 0x0b, 0x4a, 0x70, 0x1b, 0xe7, 0x24, 0x73, 0xe2, 0x91, 0x1c, 0xe3, 0x85, 0x47, + 0x72, 0x8c, 0x0f, 0x1e, 0xc9, 0x31, 0x4e, 0x78, 0x2c, 0xc7, 0x70, 0xe1, 0xb1, 0x1c, 0xc3, 0x8d, + 0xc7, 0x72, 0x0c, 0x51, 0x4c, 0x05, 0x49, 0x49, 0x6c, 0x60, 0x37, 0x1a, 0x03, 0x02, 0x00, 0x00, + 0xff, 0xff, 0x4b, 0x34, 0x1c, 0x27, 0xb5, 0x00, 0x00, 0x00, } func (m *Delivery) Marshal() (dAtA []byte, err error) { @@ -158,6 +167,13 @@ func (m *Delivery) MarshalToSizedBuffer(dAtA []byte) (int, error) { _ = i var l int _ = l + if len(m.Stamp) > 0 { + i -= len(m.Stamp) + copy(dAtA[i:], m.Stamp) + i = encodeVarintPushsync(dAtA, i, uint64(len(m.Stamp))) + i-- + dAtA[i] = 0x1a + } if len(m.Data) > 0 { i -= len(m.Data) copy(dAtA[i:], m.Data) @@ -230,6 +246,10 @@ func (m *Delivery) Size() (n int) { if l > 0 { n += 1 + l + sovPushsync(uint64(l)) } + l = len(m.Stamp) + if l > 0 { + n += 1 + l + sovPushsync(uint64(l)) + } return n } @@ -349,6 +369,40 @@ func (m *Delivery) Unmarshal(dAtA []byte) error { m.Data = []byte{} } iNdEx = postIndex + case 3: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Stamp", wireType) + } + var byteLen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowPushsync + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + byteLen |= int(b&0x7F) << shift + if b < 0x80 { + break + } + } + if byteLen < 0 { + return ErrInvalidLengthPushsync + } + postIndex := iNdEx + byteLen + if postIndex < 0 { + return ErrInvalidLengthPushsync + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.Stamp = append(m.Stamp[:0], dAtA[iNdEx:postIndex]...) + if m.Stamp == nil { + m.Stamp = []byte{} + } + iNdEx = postIndex default: iNdEx = preIndex skippy, err := skipPushsync(dAtA[iNdEx:]) diff --git a/pkg/pushsync/pb/pushsync.proto b/pkg/pushsync/pb/pushsync.proto index e6a271ddf66..9cb88a6940e 100644 --- a/pkg/pushsync/pb/pushsync.proto +++ b/pkg/pushsync/pb/pushsync.proto @@ -11,6 +11,7 @@ option go_package = "pb"; message Delivery { bytes Address = 1; bytes Data = 2; + bytes Stamp = 3; } message Receipt { diff --git a/pkg/pushsync/pushsync.go b/pkg/pushsync/pushsync.go index 64f0dd3c777..a1ed095d86e 100644 --- a/pkg/pushsync/pushsync.go +++ b/pkg/pushsync/pushsync.go @@ -15,7 +15,6 @@ import ( "github.com/ethersphere/bee/pkg/logging" "github.com/ethersphere/bee/pkg/p2p" "github.com/ethersphere/bee/pkg/p2p/protobuf" - "github.com/ethersphere/bee/pkg/postage" "github.com/ethersphere/bee/pkg/pushsync/pb" "github.com/ethersphere/bee/pkg/soc" "github.com/ethersphere/bee/pkg/storage" @@ -107,14 +106,7 @@ func (ps *PushSync) handler(ctx context.Context, p p2p.Peer, stream p2p.Stream) ps.metrics.TotalReceived.Inc() // these are needed until we wire up the protocol the pass the stamps - fallbackBatchID := make([]byte, 32) - fallbackSig := make([]byte, 65) - stamp := postage.NewStamp(fallbackBatchID, fallbackSig) - b, err := stamp.MarshalBinary() - if err != nil { - return err - } - chunk := swarm.NewChunk(swarm.NewAddress(ch.Address), ch.Data).WithStamp(b) + chunk := swarm.NewChunk(swarm.NewAddress(ch.Address), ch.Data).WithStamp(ch.Stamp) if content.Valid(chunk) { if ps.unwrap != nil { @@ -236,6 +228,7 @@ func (ps *PushSync) pushToClosest(ctx context.Context, ch swarm.Chunk) (rr *pb.R if err := w.WriteMsgWithContext(ctx, &pb.Delivery{ Address: ch.Address().Bytes(), Data: ch.Data(), + Stamp: ch.Stamp(), }); err != nil { _ = streamer.Reset() lastErr = fmt.Errorf("chunk %s deliver to peer %s: %w", ch.Address().String(), peer.String(), err) diff --git a/pkg/storage/testing/chunk.go b/pkg/storage/testing/chunk.go index 4198ce30514..eb7825b390c 100644 --- a/pkg/storage/testing/chunk.go +++ b/pkg/storage/testing/chunk.go @@ -26,30 +26,38 @@ import ( "github.com/ethersphere/bee/pkg/swarm" ) +var mockStamp []byte + // fixtreuChunks are pregenerated content-addressed chunks necessary for explicit // test scenarios where random generated chunks are not good enough. var fixtureChunks = map[string]swarm.Chunk{ "0025": swarm.NewChunk( swarm.MustParseHexAddress("0025737be11979e91654dffd2be817ac1e52a2dadb08c97a7cef12f937e707bc"), []byte{72, 0, 0, 0, 0, 0, 0, 0, 8, 0, 0, 0, 0, 0, 0, 0, 149, 179, 31, 244, 146, 247, 129, 123, 132, 248, 215, 77, 44, 47, 91, 248, 229, 215, 89, 156, 210, 243, 3, 110, 204, 74, 101, 119, 53, 53, 145, 188, 193, 153, 130, 197, 83, 152, 36, 140, 150, 209, 191, 214, 193, 4, 144, 121, 32, 45, 205, 220, 59, 227, 28, 43, 161, 51, 108, 14, 106, 180, 135, 2}, - ), + ).WithStamp(mockStamp), "0033": swarm.NewChunk( swarm.MustParseHexAddress("0033153ac8cfb0c343db1795f578c15ed8ef827f3e68ed3c58329900bf0d7276"), []byte{72, 0, 0, 0, 0, 0, 0, 0, 170, 117, 0, 0, 0, 0, 0, 0, 21, 157, 63, 86, 45, 17, 166, 184, 47, 126, 58, 172, 242, 77, 153, 249, 97, 5, 107, 244, 23, 153, 220, 255, 254, 47, 209, 24, 63, 58, 126, 142, 41, 79, 201, 182, 178, 227, 235, 223, 63, 11, 220, 155, 40, 181, 56, 204, 91, 44, 51, 185, 95, 155, 245, 235, 187, 250, 103, 49, 139, 184, 46, 199}, - ), + ).WithStamp(mockStamp), "02c2": swarm.NewChunk( swarm.MustParseHexAddress("02c2bd0db71efb7d245eafcc1c126189c1f598feb80e8f14e7ecef913c6a2ef5"), []byte{72, 0, 0, 0, 0, 0, 0, 0, 226, 0, 0, 0, 0, 0, 0, 0, 67, 234, 252, 231, 229, 11, 121, 163, 131, 171, 41, 107, 57, 191, 221, 32, 62, 204, 159, 124, 116, 87, 30, 244, 99, 137, 121, 248, 119, 56, 74, 102, 140, 73, 178, 7, 151, 22, 47, 126, 173, 30, 43, 7, 61, 187, 13, 236, 59, 194, 245, 18, 25, 237, 106, 125, 78, 241, 35, 34, 116, 154, 105, 205}, - ), + ).WithStamp(mockStamp), "7000": swarm.NewChunk( swarm.MustParseHexAddress("70002115a015d40a1f5ef68c29d072f06fae58854934c1cb399fcb63cf336127"), []byte{72, 0, 0, 0, 0, 0, 0, 0, 124, 59, 0, 0, 0, 0, 0, 0, 44, 67, 19, 101, 42, 213, 4, 209, 212, 189, 107, 244, 111, 22, 230, 24, 245, 103, 227, 165, 88, 74, 50, 11, 143, 197, 220, 118, 175, 24, 169, 193, 15, 40, 225, 196, 246, 151, 1, 45, 86, 7, 36, 99, 156, 86, 83, 29, 46, 207, 115, 112, 126, 88, 101, 128, 153, 113, 30, 27, 50, 232, 77, 215}, - ), + ).WithStamp(mockStamp), } func init() { // needed for GenerateTestRandomChunk rand.Seed(time.Now().UnixNano()) + + var err error + mockStamp, err = postagetesting.MustNewStamp().MarshalBinary() + if err != nil { + panic(err) + } } // GenerateTestRandomChunk generates a valid content addressed chunk.