diff --git a/les/handler.go b/les/handler.go index 64023af0f5af..aaf4e1d7a32c 100644 --- a/les/handler.go +++ b/les/handler.go @@ -259,7 +259,7 @@ func (pm *ProtocolManager) removePeer(id string) { func (pm *ProtocolManager) Start(srvr *p2p.Server) { var topicDisc *discv5.Network if srvr != nil { - topicDisc = srvr.DiscV5 + topicDisc = srvr.DiscV5() } lesTopic := discv5.Topic("LES@" + common.Bytes2Hex(pm.blockchain.Genesis().Hash().Bytes()[0:8])) if pm.lightSync { diff --git a/les/serverpool.go b/les/serverpool.go index 64fe991c63be..f4c3101cdf8b 100644 --- a/les/serverpool.go +++ b/les/serverpool.go @@ -143,7 +143,7 @@ func newServerPool(db ethdb.Database, dbPrefix []byte, server *p2p.Server, topic pool.discSetPeriod = make(chan time.Duration, 1) pool.discNodes = make(chan *discv5.Node, 100) pool.discLookups = make(chan bool, 100) - go pool.server.DiscV5.SearchTopic(topic, pool.discSetPeriod, pool.discNodes, pool.discLookups) + go pool.server.DiscV5().SearchTopic(topic, pool.discSetPeriod, pool.discNodes, pool.discLookups) } go pool.eventLoop() diff --git a/swarm/network/hive.go b/swarm/network/hive.go index da3302052341..e402930569d5 100644 --- a/swarm/network/hive.go +++ b/swarm/network/hive.go @@ -127,23 +127,23 @@ func (self *Hive) Start(server *p2p.Server) error { log.Debug("hive delegate to overlay driver: suggest addr to connect to") // log.Trace("hive delegate to overlay driver: suggest addr to connect to") addr, order, want := self.SuggestPeer() - - if addr != nil { - log.Info(fmt.Sprintf("========> connect to bee %v", addr)) - under, err := discover.ParseNode(string(addr.(Addr).Under())) - if err == nil { - server.AddPeer(under) + if self.Discovery { + if addr != nil { + log.Info(fmt.Sprintf("========> connect to bee %v", addr)) + under, err := discover.ParseNode(string(addr.(Addr).Under())) + if err == nil { + server.AddPeer(under) + } else { + log.Error(fmt.Sprintf("===X====> connect to bee %v failed: invalid node URL: %v", addr, err)) + } } else { - log.Error(fmt.Sprintf("===X====> connect to bee %v failed: invalid node URL: %v", addr, err)) + log.Trace("cannot suggest peers") } - } else { - log.Trace("cannot suggest peers") - } - want = want && self.Discovery - if want { - log.Debug(fmt.Sprintf("========> request peers nearest %v", addr)) - RequestOrder(self.Overlay, uint8(order), self.PeersBroadcastSetSize, self.MaxPeersPerRequest) + if want { + log.Debug(fmt.Sprintf("========> request peers nearest %v", addr)) + RequestOrder(self.Overlay, uint8(order), self.PeersBroadcastSetSize, self.MaxPeersPerRequest) + } } log.Info(fmt.Sprintf("%v", self)) diff --git a/swarm/pss/client.go b/swarm/pss/client.go deleted file mode 100644 index bcf2c5695b8e..000000000000 --- a/swarm/pss/client.go +++ /dev/null @@ -1,226 +0,0 @@ -package pss - -import ( - "context" - "fmt" - "sync" - "sync/atomic" - - "github.com/ethereum/go-ethereum/common" - "github.com/ethereum/go-ethereum/event" - "github.com/ethereum/go-ethereum/log" - "github.com/ethereum/go-ethereum/node" - "github.com/ethereum/go-ethereum/p2p" - "github.com/ethereum/go-ethereum/p2p/discover" - "github.com/ethereum/go-ethereum/p2p/protocols" - "github.com/ethereum/go-ethereum/pot" - "github.com/ethereum/go-ethereum/rpc" -) - -const ( - inboxCapacity = 3000 - outboxCapacity = 100 - addrLen = common.HashLength -) - -type PssClient struct { - localuri string - remoteuri string - ctx context.Context - cancel func() - subscription *rpc.ClientSubscription - topicsC chan []byte - msgC chan PssAPIMsg - quitC chan struct{} - quitting uint32 - 
ws *rpc.Client - lock sync.Mutex - peerPool map[PssTopic]map[pot.Address]*pssRPCRW - protos []*p2p.Protocol -} - -type pssRPCRW struct { - *PssClient - topic *PssTopic - spec *protocols.Spec - msgC chan []byte - addr pot.Address -} - -func (self *PssClient) newpssRPCRW(addr pot.Address, spec *protocols.Spec, topic *PssTopic) *pssRPCRW { - return &pssRPCRW{ - PssClient: self, - topic: topic, - spec: spec, - msgC: make(chan []byte), - addr: addr, - } -} - -func (rw *pssRPCRW) ReadMsg() (p2p.Msg, error) { - msg := <-rw.msgC - log.Warn("pssrpcrw read", "msg", msg) - pmsg, err := ToP2pMsg(msg) - if err != nil { - return p2p.Msg{}, err - } - - return pmsg, nil -} - -func (rw *pssRPCRW) WriteMsg(msg p2p.Msg) error { - - ifc, found := rw.spec.NewMsg(msg.Code) - if !found { - return fmt.Errorf("could not find interface for msg #%d", msg.Code) - } - msg.Decode(ifc) - pmsg, err := newProtocolMsg(msg.Code, ifc) - if err != nil { - return fmt.Errorf("Could not render protocolmessage", "error", err) - } - - return rw.PssClient.ws.CallContext(rw.PssClient.ctx, nil, "pss_sendRaw", rw.topic, PssAPIMsg{ - Addr: rw.addr.Bytes(), - Msg: pmsg, - }) - -} - -// remotehost: hostname of node running websockets proxy to pss (default localhost) -// remoteport: port of node running websockets proxy to pss (0 = go-ethereum node default) -// secure: whether or not to use secure connection -// originhost: local if host to connect from - -func NewPssClient(ctx context.Context, cancel func(), remotehost string, remoteport int, secure bool, originhost string) *PssClient { - prefix := "ws" - - if ctx == nil { - ctx = context.Background() - cancel = func() { return } - } - pssc := &PssClient{ - msgC: make(chan PssAPIMsg), - quitC: make(chan struct{}), - peerPool: make(map[PssTopic]map[pot.Address]*pssRPCRW), - ctx: ctx, - cancel: cancel, - } - - if remotehost == "" { - remotehost = "localhost" - } - - if remoteport == 0 { - remoteport = node.DefaultWSPort - } - - if originhost == "" { - originhost = "localhost" - } - - if secure { - prefix = "wss" - } - - pssc.remoteuri = fmt.Sprintf("%s://%s:%d", prefix, remotehost, remoteport) - pssc.localuri = fmt.Sprintf("%s://%s", prefix, originhost) - - return pssc -} - -func (self *PssClient) shutdown() { - atomic.StoreUint32(&self.quitting, 1) - self.cancel() -} - -func (self *PssClient) Start() error { - log.Debug("Dialing ws", "src", self.localuri, "dst", self.remoteuri) - ws, err := rpc.DialWebsocket(self.ctx, self.remoteuri, self.localuri) - if err != nil { - return fmt.Errorf("Couldnt dial pss websocket: %v", err) - } - - self.ws = ws - - return nil -} - -func (self *PssClient) RunProtocol(proto *p2p.Protocol, spec *protocols.Spec) error { - topic := NewTopic(spec.Name, int(spec.Version)) - msgC := make(chan PssAPIMsg) - self.peerPool[topic] = make(map[pot.Address]*pssRPCRW) - sub, err := self.ws.Subscribe(self.ctx, "pss", msgC, "newMsg", topic) - if err != nil { - return fmt.Errorf("pss event subscription failed: %v", err) - } - - self.subscription = sub - - // dispatch incoming messages - go func() { - for { - select { - case msg := <-msgC: - var addr pot.Address - copy(addr[:], msg.Addr) - if self.peerPool[topic][addr] == nil { - self.peerPool[topic][addr] = self.newpssRPCRW(addr, spec, &topic) - nid, _ := discover.HexID("0x00") - p := p2p.NewPeer(nid, fmt.Sprintf("%v", addr), []p2p.Cap{}) - go proto.Run(p, self.peerPool[topic][addr]) - } - go func() { - self.peerPool[topic][addr].msgC <- msg.Msg - }() - case <-self.quitC: - self.shutdown() - return - } - } - }() - - 
self.protos = append(self.protos, proto)
-	return nil
-}
-
-func (self *PssClient) Stop() error {
-	self.cancel()
-	return nil
-}
-
-func (self *PssClient) AddPssPeer(addr pot.Address, spec *protocols.Spec) {
-	topic := NewTopic(spec.Name, int(spec.Version))
-	if self.peerPool[topic][addr] == nil {
-		self.peerPool[topic][addr] = self.newpssRPCRW(addr, spec, &topic)
-	}
-}
-
-func (self *PssClient) RemovePssPeer(addr pot.Address, spec *protocols.Spec) {
-	topic := NewTopic(spec.Name, int(spec.Version))
-	delete(self.peerPool[topic], addr)
-}
-
-func (self *PssClient) SubscribeEvents(ch chan *p2p.PeerEvent) event.Subscription {
-	log.Error("PSS client handles events internally, use the read functions instead")
-	return nil
-}
-
-func (self *PssClient) PeerCount() int {
-	return len(self.peerPool)
-}
-
-func (self *PssClient) NodeInfo() *p2p.NodeInfo {
-	return nil
-}
-
-func (self *PssClient) PeersInfo() []*p2p.PeerInfo {
-	return nil
-}
-func (self *PssClient) AddPeer(node *discover.Node) {
-	log.Error("Cannot add peer in PSS with discover.Node, need swarm overlay address")
-}
-
-func (self *PssClient) RemovePeer(node *discover.Node) {
-	log.Error("Cannot remove peer in PSS with discover.Node, need swarm overlay address")
-}
diff --git a/swarm/pss/client/client.go b/swarm/pss/client/client.go
new file mode 100644
index 000000000000..a9bef6d3db2b
--- /dev/null
+++ b/swarm/pss/client/client.go
@@ -0,0 +1,245 @@
+package client
+
+import (
+	"context"
+	"fmt"
+	"sync"
+
+	"github.com/ethereum/go-ethereum/common"
+	"github.com/ethereum/go-ethereum/event"
+	"github.com/ethereum/go-ethereum/log"
+	"github.com/ethereum/go-ethereum/p2p"
+	"github.com/ethereum/go-ethereum/p2p/discover"
+	"github.com/ethereum/go-ethereum/p2p/protocols"
+	"github.com/ethereum/go-ethereum/pot"
+	"github.com/ethereum/go-ethereum/rlp"
+	"github.com/ethereum/go-ethereum/rpc"
+	"github.com/ethereum/go-ethereum/swarm/pss"
+)
+
+const (
+	inboxCapacity  = 3000
+	outboxCapacity = 100
+	defaultWSPort  = 8546
+	addrLen        = common.HashLength
+)
+
+// RemoteHost: hostname of node running websockets proxy to pss (default localhost)
+// RemotePort: port of node running websockets proxy to pss (default 8546, the go-ethereum ws default)
+// Secure: whether or not to use secure connection
+// SelfHost: local host to connect from (default localhost)
+type ClientConfig struct {
+	SelfHost   string
+	RemoteHost string
+	RemotePort int
+	Secure     bool
+}
+
+func NewClientConfig() *ClientConfig {
+	return &ClientConfig{
+		SelfHost:   "localhost",
+		RemoteHost: "localhost",
+		RemotePort: defaultWSPort,
+	}
+}
+
+type Client struct {
+	localuri     string
+	remoteuri    string
+	ctx          context.Context
+	cancel       func()
+	subscription *rpc.ClientSubscription
+	topicsC      chan []byte
+	msgC         chan pss.APIMsg
+	quitC        chan struct{}
+	ws           *rpc.Client
+	lock         sync.Mutex
+	peerPool     map[pss.Topic]map[pot.Address]*pssRPCRW
+	protos       map[pss.Topic]*p2p.Protocol
+}
+
+type pssRPCRW struct {
+	*Client
+	topic *pss.Topic
+	msgC  chan []byte
+	addr  pot.Address
+}
+
+func (self *Client) newpssRPCRW(addr pot.Address, topic *pss.Topic) *pssRPCRW {
+	return &pssRPCRW{
+		Client: self,
+		topic:  topic,
+		msgC:   make(chan []byte),
+		addr:   addr,
+	}
+}
+
+func (rw *pssRPCRW) ReadMsg() (p2p.Msg, error) {
+	msg := <-rw.msgC
+	log.Trace("pssrpcrw read", "msg", msg)
+	pmsg, err := pss.ToP2pMsg(msg)
+	if err != nil {
+		return p2p.Msg{}, err
+	}
+
+	return pmsg, nil
+}
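+
+// A minimal sketch of how this RPC-backed MsgReadWriter is driven by a
+// devp2p protocols.Peer, mirroring what client_test.go does; "client",
+// "topic" and "potaddr" are assumed to be set up as in TestOutgoing:
+//
+//	nid, _ := discover.HexID("0x00") // placeholder id, pss peers have no real node id
+//	p := p2p.NewPeer(nid, fmt.Sprintf("%v", potaddr), []p2p.Cap{})
+//	pp := protocols.NewPeer(p, client.peerPool[topic][potaddr], pss.PingProtocol)
+//	// Send serializes the message and lands in WriteMsg below, which
+//	// re-wraps the RLP as a pss.ProtocolMsg and calls pss_send over RPC
+//	err := pp.Send(&pss.PingMsg{Created: time.Now()})
+
+func (rw *pssRPCRW) WriteMsg(msg p2p.Msg) error {
+	log.Trace("got writemsg pssclient", "msg", msg)
+	rlpdata := make([]byte, msg.Size)
+	msg.Payload.Read(rlpdata)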
+	pmsg, err := rlp.EncodeToBytes(pss.ProtocolMsg{
+		Code:    msg.Code,
+		Size:    msg.Size,
+		Payload: rlpdata,
+	})
+	if err != nil {
+		return err
+	}
+	return rw.Client.ws.CallContext(rw.Client.ctx, nil, "pss_send", rw.topic, pss.APIMsg{
+		Addr: rw.addr.Bytes(),
+		Msg:  pmsg,
+	})
+}
+
+func NewClient(ctx context.Context, cancel func(), config *ClientConfig) *Client {
+	prefix := "ws"
+
+	if ctx == nil {
+		ctx = context.Background()
+	}
+	if cancel == nil {
+		cancel = func() { return }
+	}
+
+	pssc := &Client{
+		msgC:     make(chan pss.APIMsg),
+		quitC:    make(chan struct{}),
+		peerPool: make(map[pss.Topic]map[pot.Address]*pssRPCRW),
+		protos:   make(map[pss.Topic]*p2p.Protocol),
+		ctx:      ctx,
+		cancel:   cancel,
+	}
+
+	if config.Secure {
+		prefix = "wss"
+	}
+
+	pssc.remoteuri = fmt.Sprintf("%s://%s:%d", prefix, config.RemoteHost, config.RemotePort)
+	pssc.localuri = fmt.Sprintf("%s://%s", prefix, config.SelfHost)
+
+	return pssc
+}
+
+func NewClientWithRPC(ctx context.Context, client *rpc.Client) *Client {
+	return &Client{
+		msgC:     make(chan pss.APIMsg),
+		quitC:    make(chan struct{}),
+		peerPool: make(map[pss.Topic]map[pot.Address]*pssRPCRW),
+		protos:   make(map[pss.Topic]*p2p.Protocol),
+		ws:       client,
+		ctx:      ctx,
+	}
+}
+
+func (self *Client) shutdown() {
+	self.cancel()
+}
+
+func (self *Client) Start() error {
+	if self.ws != nil {
+		return nil
+	}
+	log.Debug("Dialing ws", "src", self.localuri, "dst", self.remoteuri)
+	ws, err := rpc.DialWebsocket(self.ctx, self.remoteuri, self.localuri)
+	if err != nil {
+		return fmt.Errorf("could not dial pss websocket: %v", err)
+	}
+
+	self.ws = ws
+
+	return nil
+}
+
+func (self *Client) RunProtocol(proto *p2p.Protocol) error {
+	topic := pss.NewTopic(proto.Name, int(proto.Version))
+	msgC := make(chan pss.APIMsg)
+	self.peerPool[topic] = make(map[pot.Address]*pssRPCRW)
+	sub, err := self.ws.Subscribe(self.ctx, "pss", msgC, "receive", topic)
+	if err != nil {
+		return fmt.Errorf("pss event subscription failed: %v", err)
+	}
+
+	self.subscription = sub
+
+	// dispatch incoming messages
+	go func() {
+		for {
+			select {
+			case msg := <-msgC:
+				var addr pot.Address
+				copy(addr[:], msg.Addr)
+				if self.peerPool[topic][addr] == nil {
+					self.peerPool[topic][addr] = self.newpssRPCRW(addr, &topic)
+					nid, _ := discover.HexID("0x00")
+					p := p2p.NewPeer(nid, fmt.Sprintf("%v", addr), []p2p.Cap{})
+					go proto.Run(p, self.peerPool[topic][addr])
+				}
+				go func() {
+					self.peerPool[topic][addr].msgC <- msg.Msg
+				}()
+			case <-self.quitC:
+				self.shutdown()
+				return
+			}
+		}
+	}()
+
+	self.protos[topic] = proto
+	return nil
+}
+
+func (self *Client) Stop() error {
+	self.cancel()
+	return nil
+}
+
+func (self *Client) AddPssPeer(addr pot.Address, spec *protocols.Spec) {
+	topic := pss.NewTopic(spec.Name, int(spec.Version))
+	if self.peerPool[topic][addr] == nil {
+		self.peerPool[topic][addr] = self.newpssRPCRW(addr, &topic)
+		nid, _ := discover.HexID("0x00")
+		p := p2p.NewPeer(nid, fmt.Sprintf("%v", addr), []p2p.Cap{})
+		go self.protos[topic].Run(p, self.peerPool[topic][addr])
+	}
+}
+
+func (self *Client) RemovePssPeer(addr pot.Address, spec *protocols.Spec) {
+	topic := pss.NewTopic(spec.Name, int(spec.Version))
+	delete(self.peerPool[topic], addr)
+}
+
+func (self *Client) SubscribeEvents(ch chan *p2p.PeerEvent) event.Subscription {
+	log.Error("PSS client handles events internally, use the read functions instead")
+	return nil
+}
+
+func (self *Client) PeerCount() int {
+	return len(self.peerPool)
+}
+
+func (self *Client) NodeInfo() *p2p.NodeInfo {
+	return nil
+}
+
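+// A minimal usage sketch for the client above, assuming a pss-enabled node
+// listening on the default ws endpoint; Ping and NewPingProtocol come from
+// swarm/pss, everything else here is illustrative:
+//
+//	ping := &pss.Ping{C: make(chan struct{})}
+//	proto := pss.NewPingProtocol(ping.PingHandler)
+//	c := NewClient(context.Background(), nil, NewClientConfig())
+//	if err := c.Start(); err != nil { // dials ws://localhost:8546
+//		panic(err)
+//	}
+//	// RunProtocol subscribes to the "receive" notifications for the
+//	// protocol's topic and runs proto on a virtual peer per sender
+//	if err := c.RunProtocol(proto); err != nil {
+//		panic(err)
+//	}
+
+func (self *Client)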
PeersInfo() []*p2p.PeerInfo { + return nil +} +func (self *Client) AddPeer(node *discover.Node) { + log.Error("Cannot add peer in PSS with discover.Node, need swarm overlay address") +} + +func (self *Client) RemovePeer(node *discover.Node) { + log.Error("Cannot remove peer in PSS with discover.Node, need swarm overlay address") +} diff --git a/swarm/pss/client/client_test.go b/swarm/pss/client/client_test.go new file mode 100644 index 000000000000..115316501a4d --- /dev/null +++ b/swarm/pss/client/client_test.go @@ -0,0 +1,178 @@ +package client + +import ( + "context" + "fmt" + "net" + "net/http" + "os" + "testing" + "time" + + "github.com/ethereum/go-ethereum/log" + "github.com/ethereum/go-ethereum/p2p" + "github.com/ethereum/go-ethereum/p2p/discover" + "github.com/ethereum/go-ethereum/p2p/protocols" + "github.com/ethereum/go-ethereum/pot" + "github.com/ethereum/go-ethereum/rpc" + "github.com/ethereum/go-ethereum/swarm/pss" +) + +func init() { + h := log.CallerFileHandler(log.StreamHandler(os.Stderr, log.TerminalFormat(true))) + log.Root().SetHandler(h) +} + +func TestRunProtocol(t *testing.T) { + quitC := make(chan struct{}) + ps := pss.NewTestPss(nil) + ping := &pss.Ping{ + C: make(chan struct{}), + } + proto := newProtocol(ping) + _, err := baseTester(t, proto, ps, nil, nil, quitC) + if err != nil { + t.Fatalf(err.Error()) + } + quitC <- struct{}{} +} + +func TestIncoming(t *testing.T) { + quitC := make(chan struct{}) + ps := pss.NewTestPss(nil) + ctx, cancel := context.WithTimeout(context.Background(), time.Second*5) + var addr []byte + ping := &pss.Ping{ + C: make(chan struct{}), + } + proto := newProtocol(ping) + client, err := baseTester(t, proto, ps, ctx, cancel, quitC) + if err != nil { + t.Fatalf(err.Error()) + } + + client.ws.Call(&addr, "psstest_baseAddr") + + code, _ := pss.PingProtocol.GetCode(&pss.PingMsg{}) + rlpbundle, err := pss.NewProtocolMsg(code, &pss.PingMsg{ + Created: time.Now(), + }) + if err != nil { + t.Fatalf("couldn't make pssmsg: %v", err) + } + + pssenv := pss.NewEnvelope(addr, pss.NewTopic(proto.Name, int(proto.Version)), rlpbundle) + pssmsg := pss.PssMsg{ + To: addr, + Payload: pssenv, + } + + ps.Process(&pssmsg) + + select { + case <-client.ctx.Done(): + t.Fatalf("outgoing timed out or canceled") + case <-ping.C: + } + + quitC <- struct{}{} +} + +func TestOutgoing(t *testing.T) { + quitC := make(chan struct{}) + ps := pss.NewTestPss(nil) + ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond*250) + var addr []byte + var potaddr pot.Address + + ping := &pss.Ping{ + C: make(chan struct{}), + } + proto := newProtocol(ping) + client, err := baseTester(t, proto, ps, ctx, cancel, quitC) + if err != nil { + t.Fatalf(err.Error()) + } + + client.ws.Call(&addr, "psstest_baseAddr") + copy(potaddr[:], addr) + + msg := &pss.PingMsg{ + Created: time.Now(), + } + + topic := pss.NewTopic(pss.PingProtocol.Name, int(pss.PingProtocol.Version)) + client.AddPssPeer(potaddr, pss.PingProtocol) + nid, _ := discover.HexID("0x00") + p := p2p.NewPeer(nid, fmt.Sprintf("%v", potaddr), []p2p.Cap{}) + pp := protocols.NewPeer(p, client.peerPool[topic][potaddr], pss.PingProtocol) + pp.Send(msg) + select { + case <-client.ctx.Done(): + t.Fatalf("outgoing timed out or canceled") + case <-ping.C: + } + quitC <- struct{}{} +} + +func baseTester(t *testing.T, proto *p2p.Protocol, ps *pss.Pss, ctx context.Context, cancel func(), quitC chan struct{}) (*Client, error) { + var err error + + client := newClient(t, ctx, cancel, quitC) + + err = client.Start() + if err != nil 
{ + return nil, err + } + + err = client.RunProtocol(proto) + + if err != nil { + return nil, err + } + + return client, nil +} + +func newProtocol(ping *pss.Ping) *p2p.Protocol { + + return &p2p.Protocol{ + Name: pss.PingProtocol.Name, + Version: pss.PingProtocol.Version, + Length: 1, + Run: func(p *p2p.Peer, rw p2p.MsgReadWriter) error { + pp := protocols.NewPeer(p, rw, pss.PingProtocol) + pp.Run(ping.PingHandler) + return nil + }, + } +} + +func newClient(t *testing.T, ctx context.Context, cancel func(), quitC chan struct{}) *Client { + + conf := NewClientConfig() + + pssclient := NewClient(ctx, cancel, conf) + + ps := pss.NewTestPss(nil) + srv := rpc.NewServer() + srv.RegisterName("pss", pss.NewAPI(ps)) + srv.RegisterName("psstest", pss.NewAPITest(ps)) + ws := srv.WebsocketHandler([]string{"*"}) + uri := fmt.Sprintf("%s:%d", "localhost", 8546) + + sock, err := net.Listen("tcp", uri) + if err != nil { + t.Fatalf("Tcp (recv) on %s failed: %v", uri, err) + } + + go func() { + http.Serve(sock, ws) + }() + + go func() { + <-quitC + sock.Close() + }() + return pssclient +} diff --git a/swarm/pss/client_test.go b/swarm/pss/client_test.go deleted file mode 100644 index 05fcd2f54298..000000000000 --- a/swarm/pss/client_test.go +++ /dev/null @@ -1,173 +0,0 @@ -package pss - -import ( - "context" - "fmt" - "os" - "net" - "net/http" - "testing" - "time" - "github.com/ethereum/go-ethereum/log" - "github.com/ethereum/go-ethereum/node" - "github.com/ethereum/go-ethereum/p2p" - "github.com/ethereum/go-ethereum/p2p/discover" - "github.com/ethereum/go-ethereum/p2p/protocols" - "github.com/ethereum/go-ethereum/pot" - "github.com/ethereum/go-ethereum/rpc" -) - -func init() { - h := log.CallerFileHandler(log.StreamHandler(os.Stderr, log.TerminalFormat(true))) - log.Root().SetHandler(h) -} - -func TestRunProtocol(t *testing.T) { - quitC := make(chan struct{}) - pss := newTestPss(nil) - ping := &pssPing{ - quitC: make(chan struct{}), - } - proto := newProtocol(ping) - _, err := baseTester(t, proto, pss, nil, nil, quitC) - if err != nil { - t.Fatalf(err.Error()) - } - quitC <- struct{}{} -} - -func TestIncoming(t *testing.T) { - quitC := make(chan struct{}) - pss := newTestPss(nil) - ctx, cancel := context.WithCancel(context.Background()) - var addr []byte - ping := &pssPing{ - quitC: make(chan struct{}), - } - proto := newProtocol(ping) - client, err := baseTester(t, proto, pss, ctx, cancel, quitC) - if err != nil { - t.Fatalf(err.Error()) - } - - client.ws.Call(&addr, "pss_baseAddr") - - code, _ := pssPingProtocol.GetCode(&pssPingMsg{}) - rlpbundle, err := newProtocolMsg(code, &pssPingMsg{ - Created: time.Now(), - }) - if err != nil { - t.Fatalf("couldn't make pssmsg") - } - - pssenv := PssEnvelope{ - From: addr, - Topic: NewTopic(proto.Name, int(proto.Version)), - TTL: DefaultTTL, - Payload: rlpbundle, - } - pssmsg := PssMsg{ - To: addr, - Payload: &pssenv, - } - - pss.Process(&pssmsg) - - go func() { - <-ping.quitC - client.cancel() - }() - - <-client.ctx.Done() - quitC <- struct{}{} -} - -func TestOutgoing(t *testing.T) { - quitC := make(chan struct{}) - pss := newTestPss(nil) - ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond * 250) - var addr []byte - var potaddr pot.Address - - ping := &pssPing{ - quitC: make(chan struct{}), - } - proto := newProtocol(ping) - client, err := baseTester(t, proto, pss, ctx, cancel, quitC) - if err != nil { - t.Fatalf(err.Error()) - } - - client.ws.Call(&addr, "pss_baseAddr") - copy(potaddr[:], addr) - - msg := &pssPingMsg{ - Created: time.Now(), 
- } - - topic := NewTopic(pssPingProtocol.Name, int(pssPingProtocol.Version)) - client.AddPssPeer(potaddr, pssPingProtocol) - nid, _ := discover.HexID("0x00") - p := p2p.NewPeer(nid, fmt.Sprintf("%v", potaddr), []p2p.Cap{}) - pp := protocols.NewPeer(p, client.peerPool[topic][potaddr], pssPingProtocol) - pp.Send(msg) - <-client.ctx.Done() - quitC <- struct{}{} -} - -func baseTester(t *testing.T, proto *p2p.Protocol, pss *Pss, ctx context.Context, cancel func(), quitC chan struct{}) (*PssClient, error) { - var err error - - client := newClient(t, pss, ctx, cancel, quitC) - - err = client.Start() - if err != nil { - return nil, err - } - - err = client.RunProtocol(proto, pssPingProtocol) - - if err != nil { - return nil, err - } - - return client, nil -} - -func newProtocol(ping *pssPing) *p2p.Protocol { - - return &p2p.Protocol{ - Name: pssPingProtocol.Name, - Version: pssPingProtocol.Version, - Length: 1, - Run: func(p *p2p.Peer, rw p2p.MsgReadWriter) error { - pp := protocols.NewPeer(p, rw, pssPingProtocol) - pp.Run(ping.pssPingHandler) - return nil - }, - } -} - -func newClient(t *testing.T, pss *Pss, ctx context.Context, cancel func(), quitC chan struct{}) *PssClient { - pssclient := NewPssClient(ctx, cancel, "", 0, false, "") - - srv := rpc.NewServer() - srv.RegisterName("pss", NewPssAPI(pss)) - ws := srv.WebsocketHandler([]string{"*"}) - uri := fmt.Sprintf("%s:%d", node.DefaultWSHost, node.DefaultWSPort) - - sock, err := net.Listen("tcp", uri) - if err != nil { - t.Fatalf("Tcp (recv) on %s failed: %v", uri, err) - } - - go func() { - http.Serve(sock, ws) - }() - - go func() { - <-quitC - sock.Close() - }() - return pssclient -} diff --git a/swarm/pss/common.go b/swarm/pss/common.go index c3f03698ec54..d6274bcd630d 100644 --- a/swarm/pss/common.go +++ b/swarm/pss/common.go @@ -11,65 +11,35 @@ import ( "github.com/ethereum/go-ethereum/p2p/protocols" "github.com/ethereum/go-ethereum/swarm/network" "github.com/ethereum/go-ethereum/swarm/storage" -) +) -type pssPingMsg struct { +type PingMsg struct { Created time.Time } -type pssPing struct { - quitC chan struct{} +type Ping struct { + C chan struct{} } -func (self *pssPing) pssPingHandler(msg interface{}) error { +func (self *Ping) PingHandler(msg interface{}) error { log.Warn("got ping", "msg", msg) - self.quitC <- struct{}{} + self.C <- struct{}{} return nil } -var pssPingProtocol = &protocols.Spec{ +var PingProtocol = &protocols.Spec{ Name: "psstest", Version: 1, MaxMsgSize: 10 * 1024 * 1024, Messages: []interface{}{ - pssPingMsg{}, + PingMsg{}, }, } -var pssPingTopic = NewTopic(pssPingProtocol.Name, int(pssPingProtocol.Version)) +var PingTopic = NewTopic(PingProtocol.Name, int(PingProtocol.Version)) -func newTestPss(addr []byte) *Pss { - if addr == nil { - addr = network.RandomAddr().OAddr - } - - // set up storage - cachedir, err := ioutil.TempDir("", "pss-cache") - if err != nil { - log.Error("create pss cache tmpdir failed", "error", err) - os.Exit(1) - } - dpa, err := storage.NewLocalDPA(cachedir) - if err != nil { - log.Error("local dpa creation failed", "error", err) - os.Exit(1) - } - - // set up routing - kp := network.NewKadParams() - kp.MinProxBinSize = 3 - - // create pss - pp := NewPssParams() - - overlay := network.NewKademlia(addr, kp) - ps := NewPss(overlay, dpa, pp) - - return ps -} - -func newPssPingMsg(ps *Pss, to []byte, spec *protocols.Spec, topic PssTopic, senderaddr []byte) PssMsg { - data := pssPingMsg{ +func NewPingMsg(to []byte, spec *protocols.Spec, topic Topic, senderaddr []byte) PssMsg { + data := PingMsg{ 
Created: time.Now(), } code, found := spec.GetCode(&data) @@ -77,26 +47,26 @@ func newPssPingMsg(ps *Pss, to []byte, spec *protocols.Spec, topic PssTopic, sen return PssMsg{} } - rlpbundle, err := newProtocolMsg(code, data) + rlpbundle, err := NewProtocolMsg(code, data) if err != nil { return PssMsg{} } pssmsg := PssMsg{ - To: to, - Payload: NewPssEnvelope(senderaddr, topic, rlpbundle), + To: to, + Payload: NewEnvelope(senderaddr, topic, rlpbundle), } return pssmsg } -func newPssPingProtocol(handler func (interface{}) error) *p2p.Protocol { +func NewPingProtocol(handler func(interface{}) error) *p2p.Protocol { return &p2p.Protocol{ - Name: pssPingProtocol.Name, - Version: pssPingProtocol.Version, - Length: uint64(pssPingProtocol.MaxMsgSize), + Name: PingProtocol.Name, + Version: PingProtocol.Version, + Length: uint64(PingProtocol.MaxMsgSize), Run: func(p *p2p.Peer, rw p2p.MsgReadWriter) error { - pp := protocols.NewPeer(p, rw, pssPingProtocol) + pp := protocols.NewPeer(p, rw, PingProtocol) log.Trace(fmt.Sprintf("running pss vprotocol on peer %v", p)) err := pp.Run(handler) return err @@ -104,22 +74,32 @@ func newPssPingProtocol(handler func (interface{}) error) *p2p.Protocol { } } -type testPssPeer struct { - *protocols.Peer - addr []byte -} +func NewTestPss(addr []byte) *Pss { + if addr == nil { + addr = network.RandomAddr().OAddr + } -func (self *testPssPeer) Address() []byte { - return self.addr -} + // set up storage + cachedir, err := ioutil.TempDir("", "pss-cache") + if err != nil { + log.Error("create pss cache tmpdir failed", "error", err) + os.Exit(1) + } + dpa, err := storage.NewLocalDPA(cachedir) + if err != nil { + log.Error("local dpa creation failed", "error", err) + os.Exit(1) + } -func (self *testPssPeer) Off() network.OverlayAddr { - return self -} + // set up routing + kp := network.NewKadParams() + kp.MinProxBinSize = 3 -func (self *testPssPeer) Drop(err error) { -} + // create pss + pp := NewPssParams(true) + + overlay := network.NewKademlia(addr, kp) + ps := NewPss(overlay, dpa, pp) -func (self *testPssPeer) Update(o network.OverlayAddr) network.OverlayAddr { - return self + return ps } diff --git a/swarm/pss/pss.go b/swarm/pss/pss.go index 318ae334049d..58e6df735c43 100644 --- a/swarm/pss/pss.go +++ b/swarm/pss/pss.go @@ -2,19 +2,16 @@ package pss import ( "bytes" - "encoding/binary" "errors" "fmt" "sync" "time" "github.com/ethereum/go-ethereum/common" - "github.com/ethereum/go-ethereum/crypto/sha3" "github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/p2p" "github.com/ethereum/go-ethereum/p2p/discover" "github.com/ethereum/go-ethereum/p2p/protocols" - "github.com/ethereum/go-ethereum/p2p/simulations/adapters" "github.com/ethereum/go-ethereum/pot" "github.com/ethereum/go-ethereum/rlp" "github.com/ethereum/go-ethereum/rpc" @@ -23,14 +20,11 @@ import ( ) const ( - DefaultTTL = 6000 - TopicLength = 32 TopicResolverLength = 8 PssPeerCapacity = 256 PssPeerTopicDefaultCapacity = 8 digestLength = 32 digestCapacity = 256 - defaultDigestCacheTTL = time.Second ) var ( @@ -42,66 +36,6 @@ type senderPeer interface { Send(interface{}) error } -// Defines params for Pss -type PssParams struct { - Cachettl time.Duration -} - -// Initializes default params for Pss -func NewPssParams() *PssParams { - return &PssParams{ - Cachettl: defaultDigestCacheTTL, - } -} - -// Encapsulates the message transported over pss. 
-type PssMsg struct { - To []byte - Payload *PssEnvelope -} - -// String representation of PssMsg -func (self *PssMsg) String() string { - return fmt.Sprintf("PssMsg: Recipient: %x", common.ByteLabel(self.To)) -} - -// Topic defines the context of a message being transported over pss -// It is used by pss to determine what action is to be taken on an incoming message -// Typically, one can map protocol handlers for the message payloads by mapping topic to them; see *Pss.Register() -type PssTopic [TopicLength]byte - -func (self *PssTopic) String() string { - return fmt.Sprintf("%x", self) -} - -// Pre-Whisper placeholder, payload of PssMsg -type PssEnvelope struct { - Topic PssTopic - TTL uint16 - Payload []byte - From []byte -} - -// creates Pss envelope from sender address, topic and raw payload -func NewPssEnvelope(addr []byte, topic PssTopic, payload []byte) *PssEnvelope { - return &PssEnvelope{ - From: addr, - Topic: topic, - TTL: DefaultTTL, - Payload: payload, - } -} - -func (msg *PssMsg) serialize() []byte { - rlpdata, _ := rlp.EncodeToBytes(msg) - /*buf := bytes.NewBuffer(nil) - buf.Write(self.PssEnvelope.Topic[:]) - buf.Write(self.PssEnvelope.Payload) - buf.Write(self.PssEnvelope.From) - return buf.Bytes()*/ - return rlpdata -} - var pssSpec = &protocols.Spec{ Name: "pss", Version: 1, @@ -111,14 +45,6 @@ var pssSpec = &protocols.Spec{ }, } -// encapsulates a protocol msg as PssEnvelope data -type PssProtocolMsg struct { - Code uint64 - Size uint32 - Payload []byte - ReceivedAt time.Time -} - type pssCacheEntry struct { expiresAt time.Time receivedFrom []byte @@ -126,9 +52,8 @@ type pssCacheEntry struct { type pssDigest [digestLength]byte -// Message handler func for a topic -type pssHandler func(msg []byte, p *p2p.Peer, from []byte) error - +// implements node.Service +// // pss provides sending messages to nodes without having to be directly connected to them. // // The messages are wrapped in a PssMsg structure and routed using the swarm kademlia routing. 
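+//
+// A minimal sketch of the resulting flow, assuming the renamed API in this
+// change (Topic, Handler, Register, SendRaw); the topic name, handler body
+// and destination address "to" are illustrative only:
+//
+//	topic := NewTopic("myprotocol", 1)
+//	// a Handler is a func(msg []byte, p *p2p.Peer, from []byte) error
+//	deregister := ps.Register(&topic, func(msg []byte, p *p2p.Peer, from []byte) error {
+//		log.Debug("got pss payload", "msg", msg, "from", from)
+//		return nil
+//	})
+//	defer deregister()
+//	// SendRaw wraps the payload in an Envelope stamped with our own base
+//	// address and routes it towards "to" over kademlia
+//	err := ps.SendRaw(to, topic, []byte("arbitrary payload"))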
@@ -142,21 +67,21 @@ type pssHandler func(msg []byte, p *p2p.Peer, from []byte) error // - a dispatcher lookup, mapping protocols to topics // - a message cache to spot messages that previously have been forwarded type Pss struct { - network.Overlay // we can get the overlayaddress from this - //peerPool map[pot.Address]map[PssTopic]p2p.MsgReadWriter // keep track of all virtual p2p.Peers we are currently speaking to - peerPool map[pot.Address]map[PssTopic]p2p.MsgReadWriter // keep track of all virtual p2p.Peers we are currently speaking to - fwdPool map[pot.Address]*protocols.Peer // keep track of all peers sitting on the pssmsg routing layer - handlers map[PssTopic]map[*pssHandler]bool // topic and version based pss payload handlers - fwdcache map[pssDigest]pssCacheEntry // checksum of unique fields from pssmsg mapped to expiry, cache to determine whether to drop msg - cachettl time.Duration // how long to keep messages in fwdcache - lock sync.Mutex - dpa *storage.DPA + network.Overlay // we can get the overlayaddress from this + peerPool map[pot.Address]map[Topic]p2p.MsgReadWriter // keep track of all virtual p2p.Peers we are currently speaking to + fwdPool map[pot.Address]*protocols.Peer // keep track of all peers sitting on the pssmsg routing layer + handlers map[Topic]map[*Handler]bool // topic and version based pss payload handlers + fwdcache map[pssDigest]pssCacheEntry // checksum of unique fields from pssmsg mapped to expiry, cache to determine whether to drop msg + cachettl time.Duration // how long to keep messages in fwdcache + lock sync.Mutex + dpa *storage.DPA + debug bool } func (self *Pss) storeMsg(msg *PssMsg) (pssDigest, error) { swg := &sync.WaitGroup{} wwg := &sync.WaitGroup{} - buf := bytes.NewReader(msg.serialize()) + buf := bytes.NewReader(msg.Serialize()) key, err := self.dpa.Store(buf, int64(buf.Len()), swg, wwg) if err != nil { log.Warn("Could not store in swarm", "err", err) @@ -169,20 +94,23 @@ func (self *Pss) storeMsg(msg *PssMsg) (pssDigest, error) { } // Creates a new Pss instance. 
A node should only need one of these -// -// TODO: error check overlay integrity func NewPss(k network.Overlay, dpa *storage.DPA, params *PssParams) *Pss { return &Pss{ Overlay: k, - peerPool: make(map[pot.Address]map[PssTopic]p2p.MsgReadWriter, PssPeerCapacity), + peerPool: make(map[pot.Address]map[Topic]p2p.MsgReadWriter, PssPeerCapacity), fwdPool: make(map[pot.Address]*protocols.Peer), - handlers: make(map[PssTopic]map[*pssHandler]bool), + handlers: make(map[Topic]map[*Handler]bool), fwdcache: make(map[pssDigest]pssCacheEntry), cachettl: params.Cachettl, dpa: dpa, + debug: params.Debug, } } +func (self *Pss) BaseAddr() []byte { + return self.Overlay.BaseAddr() +} + func (self *Pss) Start(srv *p2p.Server) error { return nil } @@ -211,35 +139,44 @@ func (self *Pss) Run(p *p2p.Peer, rw p2p.MsgReadWriter) error { } func (self *Pss) APIs() []rpc.API { - return []rpc.API{ + apis := []rpc.API{ rpc.API{ Namespace: "pss", Version: "0.1", - Service: NewPssAPI(self), + Service: NewAPI(self), Public: true, }, } + if self.debug { + apis = append(apis, rpc.API{ + Namespace: "pss", + Version: "0.1", + Service: NewAPITest(self), + Public: true, + }) + } + return apis } -// Takes the generated PssTopic of a protocol/chatroom etc, and links a handler function to it +// Takes the generated Topic of a protocol/chatroom etc, and links a handler function to it // This allows the implementer to retrieve the right handler functions (invoke the right protocol) // for an incoming message by inspecting the topic on it. // a topic allows for multiple handlers // returns a deregister function which needs to be called to deregister the handler // (similar to event.Subscription.Unsubscribe()) -func (self *Pss) Register(topic *PssTopic, handler pssHandler) func() { +func (self *Pss) Register(topic *Topic, handler Handler) func() { self.lock.Lock() defer self.lock.Unlock() handlers := self.handlers[*topic] if handlers == nil { - handlers = make(map[*pssHandler]bool) + handlers = make(map[*Handler]bool) self.handlers[*topic] = handlers } handlers[&handler] = true return func() { self.deregister(topic, &handler) } } -func (self *Pss) deregister(topic *PssTopic, h *pssHandler) { +func (self *Pss) deregister(topic *Topic, h *Handler) { self.lock.Lock() defer self.lock.Unlock() handlers := self.handlers[*topic] @@ -304,13 +241,12 @@ func (self *Pss) checkFwdCache(addr []byte, digest pssDigest) bool { return false } -func (self *Pss) getHandlers(topic PssTopic) map[*pssHandler]bool { +func (self *Pss) getHandlers(topic Topic) map[*Handler]bool { self.lock.Lock() defer self.lock.Unlock() return self.handlers[topic] } -// func (self *Pss) handlePssMsg(msg interface{}) error { pssmsg := msg.(*PssMsg) @@ -341,12 +277,12 @@ func (self *Pss) Process(pssmsg *PssMsg) error { return nil } -// Sends a message using pss. 
The message could be anything at all, and will be handled by whichever handler function is mapped to PssTopic using *Pss.Register()
+// Sends a message using pss. The message could be anything at all, and will be handled by whichever handler function is mapped to Topic using *Pss.Register()
 //
 // The to address is a swarm overlay address
-func (self *Pss) Send(to []byte, topic PssTopic, msg []byte) error {
+func (self *Pss) SendRaw(to []byte, topic Topic, msg []byte) error {
 	sender := self.Overlay.BaseAddr()
-	pssenv := NewPssEnvelope(sender, topic, msg)
+	pssenv := NewEnvelope(sender, topic, msg)
 	pssmsg := &PssMsg{
 		To:      to,
 		Payload: pssenv,
@@ -415,7 +351,7 @@ func (self *Pss) Forward(msg *PssMsg) error {
 // Links a pss peer address and topic to a dedicated p2p.MsgReadWriter in the pss peerpool, and runs the specified protocol on this p2p.MsgReadWriter and the specified peer
 //
 // The effect is that now we have a "virtual" protocol running on an artificial p2p.Peer, which can be looked up and piped to through Pss using swarm overlay address and topic
-func (self *Pss) AddPeer(p *p2p.Peer, addr pot.Address, run adapters.RunProtocol, topic PssTopic, rw p2p.MsgReadWriter) error {
+func (self *Pss) AddPeer(p *p2p.Peer, addr pot.Address, run func(*p2p.Peer, p2p.MsgReadWriter) error, topic Topic, rw p2p.MsgReadWriter) error {
 	self.lock.Lock()
 	defer self.lock.Unlock()
 	self.addPeerTopic(addr, topic, rw)
@@ -427,15 +363,15 @@ func (self *Pss) AddPeer(p *p2p.Peer, addr pot.Address, run adapters.RunProtocol
 	return nil
 }
 
-func (self *Pss) addPeerTopic(id pot.Address, topic PssTopic, rw p2p.MsgReadWriter) error {
+func (self *Pss) addPeerTopic(id pot.Address, topic Topic, rw p2p.MsgReadWriter) error {
 	if self.peerPool[id] == nil {
-		self.peerPool[id] = make(map[PssTopic]p2p.MsgReadWriter, PssPeerTopicDefaultCapacity)
+		self.peerPool[id] = make(map[Topic]p2p.MsgReadWriter, PssPeerTopicDefaultCapacity)
 	}
 	self.peerPool[id][topic] = rw
 	return nil
 }
 
-func (self *Pss) removePeerTopic(rw p2p.MsgReadWriter, topic PssTopic) {
+func (self *Pss) removePeerTopic(rw p2p.MsgReadWriter, topic Topic) {
 	prw, ok := rw.(*PssReadWriter)
 	if !ok {
 		return
@@ -446,7 +382,14 @@ func (self *Pss) removePeerTopic(rw p2p.MsgReadWriter, topic PssTopic) {
 	}
 }
 
-func (self *Pss) isActive(id pot.Address, topic PssTopic) bool {
+func (self *Pss) isSelfRecipient(msg *PssMsg) bool {
+	return bytes.Equal(msg.To, self.Overlay.BaseAddr())
+}
+
+func (self *Pss) isActive(id pot.Address, topic Topic) bool {
+	if self.peerPool[id] == nil {
+		return false
+	}
 	return self.peerPool[id][topic] != nil
 }
@@ -462,7 +405,7 @@ type PssReadWriter struct {
 	LastActive time.Time
 	rw         chan p2p.Msg
 	spec       *protocols.Spec
-	topic      *PssTopic
+	topic      *Topic
 }
 
 // Implements p2p.MsgReader
@@ -474,18 +417,18 @@ func (prw PssReadWriter) ReadMsg() (p2p.Msg, error) {
 
 // Implements p2p.MsgWriter
 func (prw PssReadWriter) WriteMsg(msg p2p.Msg) error {
-	log.Trace(fmt.Sprintf("pssrw writemsg: %v", msg))
-	ifc, found := prw.spec.NewMsg(msg.Code)
-	if !found {
-		return fmt.Errorf("Writemsg couldn't find matching interface for code %d", msg.Code)
-	}
-	msg.Decode(ifc)
-
-	pmsg, err := newProtocolMsg(msg.Code, ifc)
+	log.Warn("pssrw writemsg", "msg", msg)
+	rlpdata := make([]byte, msg.Size)
+	msg.Payload.Read(rlpdata)
+	pmsg, err := rlp.EncodeToBytes(ProtocolMsg{
+		Code:    msg.Code,
+		Size:    msg.Size,
+		Payload: rlpdata,
+	})
 	if err != nil {
 		return err
 	}
-	return prw.Pss.Send(prw.To.Bytes(), *prw.topic, pmsg)
+	return prw.SendRaw(prw.To.Bytes(), *prw.topic, pmsg)
 }
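+
+// A sketch of the wrapping WriteMsg performs above, with illustrative
+// "code" and "payload" values; NewProtocolMsg produces the same wire
+// format for typed messages, and ToP2pMsg is the inverse applied on receipt:
+//
+//	func wrapForPss(code uint64, payload []byte) ([]byte, error) {
+//		return rlp.EncodeToBytes(ProtocolMsg{
+//			Code:    code,
+//			Size:    uint32(len(payload)),
+//			Payload: payload,
+//		})
+//	}
+//
+//	// receiving side: pmsg, err := ToP2pMsg(wire)
+//	// yields p2p.Msg{Code: code, Payload: bytes.NewBuffer(payload)}
+
// Injects a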
p2p.Msg into the MsgReadWriter, so that it appears on the associated p2p.MsgReader @@ -499,20 +442,19 @@ func (prw PssReadWriter) injectMsg(msg p2p.Msg) error { type PssProtocol struct { *Pss proto *p2p.Protocol - topic *PssTopic + topic *Topic spec *protocols.Spec } // Constructor -//func RegisterPssProtocol(pss *Pss, topic *PssTopic, spec *protocols.Spec, targetprotocol *p2p.Protocol) *PssProtocol { -func RegisterPssProtocol(pss *Pss, topic *PssTopic, spec *protocols.Spec, targetprotocol *p2p.Protocol) error { +func RegisterPssProtocol(ps *Pss, topic *Topic, spec *protocols.Spec, targetprotocol *p2p.Protocol) error { pp := &PssProtocol{ - Pss: pss, + Pss: ps, proto: targetprotocol, topic: topic, spec: spec, } - pss.Register(topic, pp.handle) + ps.Register(topic, pp.handle) return nil } @@ -539,53 +481,3 @@ func (self *PssProtocol) handle(msg []byte, p *p2p.Peer, senderAddr []byte) erro return nil } - -func (self *Pss) isSelfRecipient(msg *PssMsg) bool { - return bytes.Equal(msg.To, self.Overlay.BaseAddr()) -} - -func newProtocolMsg(code uint64, msg interface{}) ([]byte, error) { - - rlpdata, err := rlp.EncodeToBytes(msg) - if err != nil { - return nil, err - } - - // previous attempts corrupted nested structs in the payload iself upon deserializing - // therefore we use two separate []byte fields instead of peerAddr - // TODO verify that nested structs cannot be used in rlp - smsg := &PssProtocolMsg{ - Code: code, - Size: uint32(len(rlpdata)), - Payload: rlpdata, - } - - return rlp.EncodeToBytes(smsg) -} - -// constructs a new PssTopic from a given name and version. -// -// Analogous to the name and version members of p2p.Protocol -func NewTopic(s string, v int) (topic PssTopic) { - h := sha3.NewKeccak256() - h.Write([]byte(s)) - buf := make([]byte, TopicLength/8) - binary.PutUvarint(buf, uint64(v)) - h.Write(buf) - copy(topic[:], h.Sum(buf)[:]) - return topic -} - -func ToP2pMsg(msg []byte) (p2p.Msg, error) { - payload := &PssProtocolMsg{} - if err := rlp.DecodeBytes(msg, payload); err != nil { - return p2p.Msg{}, fmt.Errorf("pss protocol handler unable to decode payload as p2p message: %v", err) - } - - return p2p.Msg{ - Code: payload.Code, - Size: uint32(len(payload.Payload)), - ReceivedAt: time.Now(), - Payload: bytes.NewBuffer(payload.Payload), - }, nil -} diff --git a/swarm/pss/pss_test.go b/swarm/pss/pss_test.go index 3b400ade209e..875771b463a6 100644 --- a/swarm/pss/pss_test.go +++ b/swarm/pss/pss_test.go @@ -6,11 +6,12 @@ import ( "encoding/hex" "fmt" "io/ioutil" - "math/rand" "os" + "sync" "testing" "time" + "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/node" "github.com/ethereum/go-ethereum/p2p" @@ -33,37 +34,37 @@ var services = newServices() func init() { adapters.RegisterServices(services) - h := log.CallerFileHandler(log.StreamHandler(os.Stderr, log.TerminalFormat(true))) + hs := log.StreamHandler(os.Stderr, log.TerminalFormat(true)) + hf := log.LvlFilterHandler(log.LvlTrace, hs) + h := log.CallerFileHandler(hf) log.Root().SetHandler(h) } -func TestPssCache(t *testing.T) { +func TestCache(t *testing.T) { var err error to, _ := hex.DecodeString("08090a0b0c0d0e0f1011121314150001020304050607161718191a1b1c1d1e1f") oaddr, _ := hex.DecodeString("000102030405060708090a0b0c0d0e0f101112131415161718191a1b1c1d1e1f") - //uaddr, _ := hex.DecodeString("101112131415161718191a1b1c1d1e1f000102030405060708090a0b0c0d0e0f") - //proofbytes := []byte{241, 172, 117, 105, 88, 154, 82, 33, 176, 188, 91, 244, 245, 85, 86, 16, 120, 
232, 70, 45, 182, 188, 99, 103, 157, 3, 202, 121, 252, 21, 129, 22} - proofbytes, _ := hex.DecodeString("ad312dca94df401555cfdeb85a6a1f87fb8f240f08dc36af246bd9d4d41efd89") - ps := newTestPss(oaddr) - pp := NewPssParams() + proofbytes, _ := hex.DecodeString("822fff7527f7ae630c1224921e50a7ca1b27324f00f3966623bd503780c7ab33") + ps := NewTestPss(oaddr) + pp := NewPssParams(false) data := []byte("foo") datatwo := []byte("bar") fwdaddr := network.RandomAddr() msg := &PssMsg{ - Payload: &PssEnvelope{ + Payload: &Envelope{ TTL: 0, From: oaddr, - Topic: pssPingTopic, + Topic: PingTopic, Payload: data, }, To: to, } msgtwo := &PssMsg{ - Payload: &PssEnvelope{ + Payload: &Envelope{ TTL: 0, From: oaddr, - Topic: pssPingTopic, + Topic: PingTopic, Payload: datatwo, }, To: to, @@ -129,10 +130,10 @@ func TestPssCache(t *testing.T) { } } -func TestPssRegisterHandler(t *testing.T) { +func TestRegisterHandler(t *testing.T) { var err error addr := network.RandomAddr() - ps := newTestPss(addr.OAddr) + ps := NewTestPss(addr.OAddr) from := network.RandomAddr() payload := []byte("payload") topic := NewTopic(pssSpec.Name, int(pssSpec.Version)) @@ -147,13 +148,13 @@ func TestPssRegisterHandler(t *testing.T) { return nil } deregister := ps.Register(&topic, checkMsg) - pssmsg := &PssMsg{Payload: NewPssEnvelope(from.OAddr, topic, payload)} + pssmsg := &PssMsg{Payload: NewEnvelope(from.OAddr, topic, payload)} err = ps.Process(pssmsg) if err != nil { t.Fatal(err) } var i int - err = ps.Process(&PssMsg{Payload: NewPssEnvelope(from.OAddr, wrongtopic, payload)}) + err = ps.Process(&PssMsg{Payload: NewEnvelope(from.OAddr, wrongtopic, payload)}) expErr := "" if err == nil || err.Error() == expErr { t.Fatalf("unhandled topic expected '%v', got '%v'", expErr, err) @@ -168,25 +169,25 @@ func TestPssRegisterHandler(t *testing.T) { } deregister() deregister2() - err = ps.Process(&PssMsg{Payload: NewPssEnvelope(from.OAddr, topic, payload)}) + err = ps.Process(&PssMsg{Payload: NewEnvelope(from.OAddr, topic, payload)}) expErr = "" if err == nil || err.Error() == expErr { t.Fatalf("reregister handler expected %v, got %v", expErr, err) } } -func TestPssSimpleLinear(t *testing.T) { +func TestSimpleLinear(t *testing.T) { var err error nodeconfig := adapters.RandomNodeConfig() addr := network.NewAddrFromNodeID(nodeconfig.ID) _ = p2ptest.NewTestPeerPool() - ps := newTestPss(addr.Over()) + ps := NewTestPss(addr.Over()) - ping := &pssPing{ - quitC: make(chan struct{}), + ping := &Ping{ + C: make(chan struct{}), } - err = RegisterPssProtocol(ps, &pssPingTopic, pssPingProtocol, newPssPingProtocol(ping.pssPingHandler)) + err = RegisterPssProtocol(ps, &PingTopic, PingProtocol, NewPingProtocol(ping.PingHandler)) if err != nil { t.Fatalf("Failed to register virtual protocol in pss: %v", err) @@ -194,7 +195,7 @@ func TestPssSimpleLinear(t *testing.T) { run := func(p *p2p.Peer, rw p2p.MsgReadWriter) error { id := p.ID() pp := protocols.NewPeer(p, rw, pssSpec) - bp := &testPssPeer{ + bp := &testOverlayConn{ Peer: pp, addr: network.ToOverlayAddr(id[:]), } @@ -208,7 +209,7 @@ func TestPssSimpleLinear(t *testing.T) { pt := p2ptest.NewProtocolTester(t, nodeconfig.ID, 2, run) - msg := newPssPingMsg(ps, network.ToOverlayAddr(pt.IDs[0].Bytes()), pssPingProtocol, pssPingTopic, []byte{1, 2, 3}) + msg := NewPingMsg(network.ToOverlayAddr(pt.IDs[0].Bytes()), PingProtocol, PingTopic, []byte{1, 2, 3}) exchange := p2ptest.Exchange{ Expects: []p2ptest.Expect{ @@ -233,26 +234,58 @@ func TestPssSimpleLinear(t *testing.T) { } } -func TestPssFullRandom10_5_5(t 
*testing.T) { +func TestFullRandom50n(t *testing.T) { adapter := adapters.NewSimAdapter(services) - testPssFullRandom(t, adapter, 10, 5, 5) + testFullRandom(t, adapter, 50, 50, 50) } -func testPssFullRandom(t *testing.T, adapter adapters.NodeAdapter, nodecount int, fullnodecount int, msgcount int) { - var lastid discover.NodeID +func TestFullRandom25n(t *testing.T) { + adapter := adapters.NewSimAdapter(services) + testFullRandom(t, adapter, 25, 25, 25) +} + +func TestFullRandom10n(t *testing.T) { + adapter := adapters.NewSimAdapter(services) + testFullRandom(t, adapter, 10, 10, 10) +} + +func TestFullRandom5n(t *testing.T) { + adapter := adapters.NewSimAdapter(services) + testFullRandom(t, adapter, 5, 5, 5) +} + +func testFullRandom(t *testing.T, adapter adapters.NodeAdapter, nodecount int, fullnodecount int, msgcount int) { + var i int + var msgfromids []discover.NodeID + var msgreceived []discover.NodeID + var cancelmain func() + var triggerptr *chan discover.NodeID + + msgtoids := make([]discover.NodeID, msgcount) + + wg := sync.WaitGroup{} + wg.Add(msgcount) + + psslog := make(map[discover.NodeID]log.Logger) + psslogmain := log.New("psslog", "*") - nodeCount := 5 net := simulations.NewNetwork(adapter, &simulations.NetworkConfig{ ID: "0", }) defer net.Shutdown() + timeout := 15 * time.Second + ctx, cancelmain := context.WithTimeout(context.Background(), timeout) + defer cancelmain() + trigger := make(chan discover.NodeID) - ids := make([]discover.NodeID, nodeCount) + triggerptr = &trigger + + ids := make([]discover.NodeID, nodecount) fullids := ids[0:fullnodecount] - fullpeers := [][]byte{} + fullpeers := make(map[discover.NodeID][]byte) - for i := 0; i < nodeCount; i++ { + for i = 0; i < nodecount; i++ { nodeconfig := adapters.RandomNodeConfig() nodeconfig.Services = []string{"bzz", "pss"} node, err := net.NewNodeWithConfig(nodeconfig) @@ -264,72 +297,72 @@ func testPssFullRandom(t *testing.T, adapter adapters.NodeAdapter, nodecount int t.Fatalf("error starting node %s: %s", node.ID().TerminalString(), err) } - if err := triggerChecks(trigger, net, node.ID()); err != nil { + if err := triggerChecks(ctx, &wg, triggerptr, net, node.ID()); err != nil { t.Fatal("error triggering checks for node %s: %s", node.ID().TerminalString(), err) } ids[i] = node.ID() if i < fullnodecount { - fullpeers = append(fullpeers, network.ToOverlayAddr(node.ID().Bytes())) + fullpeers[ids[i]] = network.ToOverlayAddr(node.ID().Bytes()) + psslog[ids[i]] = log.New("psslog", fmt.Sprintf("%x", fullpeers[ids[i]])) } + log.Debug("psslog starting node", "id", nodeconfig.ID) + } + + for i, id := range fullids { + msgfromids = append(msgfromids, id) + msgtoids[i] = fullids[(i+(len(fullids)/2)+1)%len(fullids)] } // run a simulation which connects the 10 nodes in a ring and waits // for full peer discovery action := func(ctx context.Context) error { for i, id := range ids { - var peerID discover.NodeID - if i == 0 { - peerID = ids[len(ids)-1] - } else { - peerID = ids[i-1] + peerID := ids[(i+1)%len(ids)] + if net.GetConn(id, peerID) != nil { + continue } if err := net.Connect(id, peerID); err != nil { return err } + psslog[id].Debug("conn ok", "one", id, "other", peerID) } return nil } check := func(ctx context.Context, id discover.NodeID) (bool, error) { + var tgt []byte + var fwd struct { + Addr []byte + Count int + } select { case <-ctx.Done(): + wg.Done() + psslog[id].Error("conn failed!", "id", id) return false, ctx.Err() default: } - - node := net.GetNode(id) - if node == nil { - return false, fmt.Errorf("unknown 
node: %s", id) + for i, fid := range msgfromids { + if id == fid { + tgt = network.ToOverlayAddr(msgtoids[(i+(len(msgtoids)/2)+1)%len(msgtoids)].Bytes()) + break + } + } + p := net.GetNode(id) + if p == nil { + return false, fmt.Errorf("Unknown node: %v", id) } - client, err := node.Client() + c, err := p.Client() if err != nil { - return false, fmt.Errorf("error getting node client: %s", err) + return false, err } - - for _, fid := range fullids { - if fid == id { - fpeeridx := rand.Int() % (fullnodecount - 1) - log.Debug(fmt.Sprintf("fpeeridx %d, fpeer len %d", fpeeridx, len(fullpeers))) - if bytes.Equal(fullpeers[fpeeridx], network.ToOverlayAddr(fid.Bytes())) { - fpeeridx++ - } - msg := pssPingMsg{Created: time.Now()} - code, _ := pssPingProtocol.GetCode(&pssPingMsg{}) - pmsg, _ := newProtocolMsg(code, msg) - client.CallContext(context.Background(), nil, "pss_sendRaw", pssPingTopic, PssAPIMsg{ - Addr: fullpeers[fpeeridx], - Msg: pmsg, - }) - } + for fwd.Count < 2 { + c.CallContext(context.Background(), &fwd, "pss_getForwarder", tgt) + time.Sleep(time.Microsecond * 250) } - lastid = id - + psslog[id].Debug("fwd check ok", "topaddr", fmt.Sprintf("%x", common.ByteLabel(fwd.Addr)), "kadcount", fwd.Count) return true, nil } - timeout := 5 * time.Second - ctx, cancel := context.WithTimeout(context.Background(), timeout) - defer cancel() - result := simulations.NewSimulation(net).Run(ctx, &simulations.Step{ Action: action, Trigger: trigger, @@ -340,49 +373,81 @@ func testPssFullRandom(t *testing.T, adapter adapters.NodeAdapter, nodecount int }) if result.Error != nil { t.Fatalf("simulation failed: %s", result.Error) + cancelmain() } trigger = make(chan discover.NodeID) + triggerptr = &trigger action = func(ctx context.Context) error { + var rpcerr error + for ii, id := range msgfromids { + node := net.GetNode(id) + if node == nil { + return fmt.Errorf("unknown node: %s", id) + } + client, err := node.Client() + if err != nil { + return fmt.Errorf("error getting node client: %s", err) + } + msg := PingMsg{Created: time.Now()} + code, _ := PingProtocol.GetCode(&PingMsg{}) + pmsg, _ := NewProtocolMsg(code, msg) + client.CallContext(ctx, &rpcerr, "pss_send", PingTopic, APIMsg{ + Addr: fullpeers[msgtoids[ii]], + Msg: pmsg, + }) + if rpcerr != nil { + return fmt.Errorf("error rpc send id %x: %v", id, rpcerr) + } + } return nil } check = func(ctx context.Context, id discover.NodeID) (bool, error) { + select { case <-ctx.Done(): + wg.Done() return false, ctx.Err() default: } - + msgreceived = append(msgreceived, id) + psslog[id].Info("trigger received", "id", id, "len", len(msgreceived)) + wg.Done() return true, nil } - timeout = 5 * time.Second - ctx, cancel = context.WithTimeout(context.Background(), timeout) - defer cancel() result = simulations.NewSimulation(net).Run(ctx, &simulations.Step{ Action: action, Trigger: trigger, Expect: &simulations.Expectation{ - Nodes: fullids, + Nodes: msgtoids, Check: check, }, }) if result.Error != nil { + psslogmain.Error("msg failed!", "err", result.Error) + cancelmain() t.Fatalf("simulation failed: %s", result.Error) } - t.Log("Simulation Passed:") - t.Logf("Duration: %s", result.FinishedAt.Sub(result.StartedAt)) + if len(msgreceived) != len(msgtoids) { + t.Fatalf("Simulation Failed, got %d of %d msgs", len(msgreceived), len(msgtoids)) + } - time.Sleep(time.Second * 2) + wg.Wait() + psslogmain.Info("done!") + t.Logf("Simulation Passed, got %d of %d msgs", len(msgreceived), len(msgtoids)) + //t.Logf("Duration: %s", result.FinishedAt.Sub(result.StartedAt)) } // 
triggerChecks triggers a simulation step check whenever a peer is added or
// removed from the given node
-func triggerChecks(trigger chan discover.NodeID, net *simulations.Network, id discover.NodeID) error {
+// connections and connection targets are temporary kademlia check workarounds
+func triggerChecks(ctx context.Context, wg *sync.WaitGroup, trigger *chan discover.NodeID, net *simulations.Network, id discover.NodeID) error {
 
-	gotpeer := make(map[discover.NodeID]bool)
+	quitC := make(chan struct{})
+	got := false
 
 	node := net.GetNode(id)
 	if node == nil {
@@ -399,10 +464,10 @@ func triggerChecks(trigger chan discover.NodeID, net *simulations.Network, id di
 		return fmt.Errorf("error getting peer events for node %v: %s", id, err)
 	}
 
-	msgevents := make(chan PssAPIMsg)
-	msgsub, err := client.Subscribe(context.Background(), "pss", msgevents, "newMsg", pssPingTopic)
+	msgevents := make(chan APIMsg)
+	msgsub, err := client.Subscribe(context.Background(), "pss", msgevents, "receive", PingTopic)
 	if err != nil {
-		return fmt.Errorf("error getting peer events for node %v: %s", id, err)
+		return fmt.Errorf("error getting msg events for node %v: %s", id, err)
 	}
 
 	go func() {
@@ -411,12 +476,12 @@ func triggerChecks(trigger chan discover.NodeID, net *simulations.Network, id di
 		for {
 			select {
 			case event := <-peerevents:
-				if event.Type == "add" && !gotpeer[event.Peer] {
-					trigger <- id
-					gotpeer[event.Peer] = true
+				if event.Type == "add" && !got {
+					got = true
+					*trigger <- id
 				}
 			case <-msgevents:
-				trigger <- id
+				*trigger <- id
 			case err := <-peersub.Err():
 				if err != nil {
 					log.Error(fmt.Sprintf("error getting peer events for node %v", id), "err", err)
@@ -428,9 +493,17 @@ func triggerChecks(trigger chan discover.NodeID, net *simulations.Network, id di
 					log.Error(fmt.Sprintf("error getting msg for node %v", id), "err", err)
 				}
 				return
+			case <-quitC:
+				return
 			}
 		}
 	}()
+
+	go func() {
+		wg.Wait()
+		quitC <- struct{}{}
+	}()
+
 	return nil
 }
 
@@ -453,1009 +526,68 @@ func newServices() adapters.Services {
 		return kademlias[id]
 	}
 	return adapters.Services{
+		//"pss": func(id discover.NodeID, snapshot []byte) node.Service {
 		"pss": func(ctx *adapters.ServiceContext) (node.Service, error) {
-			id := ctx.Config.ID
 			cachedir, err := ioutil.TempDir("", "pss-cache")
 			if err != nil {
-				return nil, err
+				return nil, fmt.Errorf("create pss cache tmpdir failed: %v", err)
 			}
 			dpa, err := storage.NewLocalDPA(cachedir)
 			if err != nil {
-				return nil, err
+				return nil, fmt.Errorf("local dpa creation failed: %v", err)
 			}
 
-			pssp := NewPssParams()
-			ps := NewPss(kademlia(id), dpa, pssp)
+			pssp := NewPssParams(true)
+			ps := NewPss(kademlia(ctx.Config.ID), dpa, pssp)
 
-			ping := &pssPing{
-				quitC: make(chan struct{}),
+			ping := &Ping{
+				C: make(chan struct{}),
 			}
-			err = RegisterPssProtocol(ps, &pssPingTopic, pssPingProtocol, newPssPingProtocol(ping.pssPingHandler))
+			err = RegisterPssProtocol(ps, &PingTopic, PingProtocol, NewPingProtocol(ping.PingHandler))
 			if err != nil {
-				return nil, err
+				log.Error("could not register pss protocol", "err", err)
+				os.Exit(1)
 			}
 
 			return ps, nil
 		},
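+		// A sketch of how a ping then travels between two such nodes, as
+		// the simulation's send action above does over RPC; "client" is a
+		// node's rpc.Client and "to" the recipient's overlay address:
+		//
+		//	code, _ := PingProtocol.GetCode(&PingMsg{})
+		//	pmsg, _ := NewProtocolMsg(code, PingMsg{Created: time.Now()})
+		//	var rpcerr error
+		//	client.CallContext(ctx, &rpcerr, "pss_send", PingTopic, APIMsg{
+		//		Addr: to,
+		//		Msg:  pmsg,
+		//	})
+
+		//"bzz": func(id discover.NodeID, snapshot []byte) node.Service {
 		"bzz": func(ctx *adapters.ServiceContext) (node.Service, error) {
-			id := ctx.Config.ID
-			addr := network.NewAddrFromNodeID(id)
+			addr := network.NewAddrFromNodeID(ctx.Config.ID)
+			hp := network.NewHiveParams()
+			hp.Discovery = true
 			config := &network.BzzConfig{
 				OverlayAddr:  addr.Over(),
 				UnderlayAddr: addr.Under(),
-				HiveParams:   network.NewHiveParams(),
-			}
-			return network.NewBzz(config,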
kademlia(id), stateStore), nil - }, - } -} - -/* -// example protocol implementation peer -// message handlers are methods of this -// channels allow receipt reporting from p2p.Protocol message handler -type pssTestPeer struct { - *protocols.Peer - hasProtocol bool - successC chan bool - resultC chan int -} - -// example node simulation peer -// modeled from swarm/network/simulations/discovery/discovery_test.go - commit 08b1e42f -// contains reporting channel for expect results so we can collect all async incoming msgs before deciding results -type pssTestNode struct { - *Hive - *Pss - - id discover.NodeID - network *simulations.Network - trigger chan discover.NodeID - run adapters.RunProtocol - ct *protocols.CodeMap - expectC chan []int - ws *http.Handler - apifunc func() []rpc.API -} - -func (n *pssTestNode) Add(peer *bzzPeer) error { - err := n.Hive.Add(peer) - time.Sleep(time.Millisecond * 250) - n.triggerCheck() - return err -} - -func (n *pssTestNode) triggerCheck() { - go func() { n.trigger <- n.id }() -} - -func (n *pssTestNode) OverlayAddr() []byte { - return n.Pss.Overlay.BaseAddr() -} - -func (n *pssTestNode) UnderlayAddr() []byte { - return n.id.Bytes() -} - -// the content of the msgs we're sending in the tests -type pssTestPayload struct { - Data string -} - -func (m *pssTestPayload) String() string { - return m.Data -} - -type pssTestService struct { - node *pssTestNode // get addrs from this - msgFunc func(interface{}) error -} - -func newPssTestService(t *testing.T, handlefunc func(interface{}) error, testnode *pssTestNode) *pssTestService { - hp := NewHiveParams() - hp.KeepAliveInterval = 300 - bzz := NewBzz(testnode.OverlayAddr(), testnode.UnderlayAddr(), newTestStore()) - testnode.Hive = NewHive(hp, testnode.Pss.Overlay, bzz) - return &pssTestService{ - //nid := adapters.NewNodeID(addr.UnderlayAddr()) - msgFunc: handlefunc, - node: testnode, - } -} - -func (self *pssTestService) Start(server *p2p.Server) error { - return self.node.Hive.Start(server) -} - -func (self *pssTestService) Stop() error { - self.node.Hive.Stop() - return nil -} - -func (self *pssTestService) Protocols() []p2p.Protocol { - bzz := NewBzz(self.node.OverlayAddr(), self.node.UnderlayAddr(), newTestStore()) - return append(self.node.Hive.Protocols(), p2p.Protocol{ - Name: PssProtocolName, - Version: PssProtocolVersion, - Length: PssProtocol.Length(), - Run: bzz.RunProtocol(PssProtocol, self.Run), - }) -} - -func (self *pssTestService) APIs() []rpc.API { - return []rpc.API{ - rpc.API{ - Namespace: "eth", - Version: "0.1/pss", - Service: NewPssApi(self.node.Pss), - Public: true, - }, - } - return nil -} - -func (self *pssTestService) Run(peer *bzzPeer) error { - self.node.Add(peer) - defer self.node.Remove(peer) - return peer.Run(self.msgFunc) -} -*/ - -/* - - -func testPssFullRandom(t *testing.T, numsends int, numnodes int, numfullnodes int) { - var action func(ctx context.Context) error - var i int - var check func(ctx context.Context, id discover.NodeID) (bool, error) - var ctx context.Context - var result *simulations.StepResult - var timeout time.Duration - var cancel context.CancelFunc - - fullnodes := []discover.NodeID{} - sends := []int{} // sender/receiver ids array indices pairs - expectnodes := make(map[discover.NodeID]int) // how many messages we're expecting on each respective node - expectnodesids := []discover.NodeID{} // the nodes to expect on (needed by checker) - expectnodesresults := make(map[discover.NodeID][]int) // which messages expect actually got - - vct := 
protocols.NewCodeMap(map[uint64]interface{}{ - 0: pssTestPayload{}, - }) - topic, _ := MakeTopic(protocolName, protocolVersion) - - trigger := make(chan discover.NodeID) - testpeers := make(map[discover.NodeID]*pssTestPeer) - net, nodes := newPssSimulationTester(t, numnodes, numfullnodes, trigger, vct, protocolName, protocolVersion, testpeers) - - ids := []discover.NodeID{} - - // connect the peers - action = func(ctx context.Context) error { - for id, _ := range nodes { - ids = append(ids, id) - if _, ok := testpeers[id]; ok { - log.Trace(fmt.Sprintf("adding fullnode %x to testpeers %p", common.ByteLabel(id.Bytes()), testpeers)) - fullnodes = append(fullnodes, id) - } - } - for i, id := range ids { - var peerID discover.NodeID - if i != 0 { - peerID = ids[i-1] - if err := net.Connect(id, peerID); err != nil { - return err - } - } - } - return nil - } - check = func(ctx context.Context, id discover.NodeID) (bool, error) { - select { - case <-ctx.Done(): - return false, ctx.Err() - default: - } - - node, ok := nodes[id] - if !ok { - return false, fmt.Errorf("unknown node: %s (%v)", id, node) - } else { - log.Trace(fmt.Sprintf("sim check ok node %v", id)) - } - - return true, nil - } - - timeout = 10 * time.Second - ctx, cancel = context.WithTimeout(context.Background(), timeout) - - result = simulations.NewSimulation(net).Run(ctx, &simulations.Step{ - Action: action, - Trigger: trigger, - Expect: &simulations.Expectation{ - Nodes: ids, - Check: check, - }, - }) - if result.Error != nil { - t.Fatalf("simulation failed: %s", result.Error) - } - cancel() - - // ensure that we didn't get lost in concurrency issues - if len(fullnodes) != numfullnodes { - t.Fatalf("corrupt fullnodes array, expected %d, have %d", numfullnodes, len(fullnodes)) - } - - // ensure that the channel is clean - trigger = make(chan discover.NodeID) - - // randomly decide which nodes to send to and from - rand.Seed(time.Now().Unix()) - for i = 0; i < numsends; i++ { - s := rand.Int() % numfullnodes - r := s - for r == s { - r = rand.Int() % numfullnodes - } - log.Trace(fmt.Sprintf("rnd pss: idx %d->%d (%x -> %x)", s, r, common.ByteLabel(fullnodes[s].Bytes()), common.ByteLabel(fullnodes[r].Bytes()))) - expectnodes[fullnodes[r]]++ - sends = append(sends, s, r) - } - - // distinct array of nodes to expect on - for k, _ := range expectnodes { - expectnodesids = append(expectnodesids, k) - } - - // wait a bit for the kademlias to fill up - z, _ := time.ParseDuration(fmt.Sprintf("%dms", (numnodes * 25))) - if z.Seconds() < 1.0 { - z = time.Second - } - time.Sleep(z) - - // send and monitor receive of pss - action = func(ctx context.Context) error { - code, _ := vct.GetCode(&pssTestPayload{}) - - for i := 0; i < len(sends); i += 2 { - msgbytes, _ := makeMsg(code, &pssTestPayload{ - Data: fmt.Sprintf("%v", i+1), - }) - go func(i int, expectnodesresults map[discover.NodeID][]int) { - expectnode := fullnodes[sends[i+1]] // the receiving node - sendnode := fullnodes[sends[i]] // the sending node - oaddr := nodes[expectnode].OverlayAddr() - err := nodes[sendnode].Pss.Send(oaddr, topic, msgbytes) - if err != nil { - t.Fatalf("could not send pss: %v", err) - } - - select { - // if the pss is delivered - case <-testpeers[expectnode].successC: - log.Trace(fmt.Sprintf("got successC from node %x", common.ByteLabel(expectnode.Bytes()))) - expectnodesresults[expectnode] = append(expectnodesresults[expectnode], <-testpeers[expectnode].resultC) - // if not we time out, -1 means fail tick - case <-time.NewTimer(time.Second).C: - 
log.Trace(fmt.Sprintf("result timed out on node %x", common.ByteLabel(expectnode.Bytes()))) - expectnodesresults[expectnode] = append(expectnodesresults[expectnode], -1) - } - - // we can safely send to the check handler if we got feedback for all msgs we sent to a particular node - if len(expectnodesresults[expectnode]) == expectnodes[expectnode] { - trigger <- expectnode - nodes[expectnode].expectC <- expectnodesresults[expectnode] - } - }(i, expectnodesresults) - } - return nil - } - - // results - check = func(ctx context.Context, id discover.NodeID) (bool, error) { - select { - case <-ctx.Done(): - return false, ctx.Err() - default: - } - - receives := <-nodes[id].expectC - log.Trace(fmt.Sprintf("expect received %d msgs on from node %x: %v", len(receives), common.ByteLabel(id.Bytes()), receives)) - return true, nil - } - - timeout = 10 * time.Second - ctx, cancel = context.WithTimeout(context.Background(), timeout) - defer cancel() - - result = simulations.NewSimulation(net).Run(ctx, &simulations.Step{ - Action: action, - Trigger: trigger, - Expect: &simulations.Expectation{ - Nodes: expectnodesids, - Check: check, - }, - }) - if result.Error != nil { - t.Fatalf("simulation failed: %s", result.Error) - } - - t.Log("Simulation Passed:") - - for i := 0; i < len(sends); i += 2 { - t.Logf("Pss #%d: oaddr %x -> %x (uaddr %x -> %x)", i/2+1, - common.ByteLabel(nodes[fullnodes[sends[i]]].Pss.BaseAddr()), - common.ByteLabel(nodes[fullnodes[sends[i+1]]].Pss.BaseAddr()), - common.ByteLabel(fullnodes[sends[i]].Bytes()), - common.ByteLabel(fullnodes[sends[i+1]].Bytes())) - } - totalfails := 0 - for id, results := range expectnodesresults { - fails := 0 - for _, r := range results { - if r == -1 { - fails++ - } - } - t.Logf("Node oaddr %x (uaddr %x) was sent %d msgs, of which %d failed", common.ByteLabel(nodes[id].Pss.BaseAddr()), common.ByteLabel(id.Bytes()), len(results), fails) - totalfails += fails - } - t.Logf("Total sent: %d, total fail: %d (%.2f%%)", len(sends)/2, totalfails, (float32(totalfails)/float32(len(sends)/2+1))*100) - - for _, node := range nodes { - logstring := fmt.Sprintf("Node oaddr %x kademlia: ", common.ByteLabel(node.Pss.Overlay.BaseAddr())) - node.Pss.Overlay.EachConn(nil, 256, func(p Peer, po int, isprox bool) bool { - logstring += fmt.Sprintf("%x ", common.ByteLabel(p.Over())) - return true - }) - t.Log(logstring) - } -} - -func TestPssFullLinearEcho(t *testing.T) { - - var action func(ctx context.Context) error - var check func(ctx context.Context, id discover.NodeID) (bool, error) - var ctx context.Context - var result *simulations.StepResult - var timeout time.Duration - var cancel context.CancelFunc - - var firstpssnode discover.NodeID - var secondpssnode discover.NodeID - - vct := protocols.NewCodeMap(protocolName, protocolVersion, ProtocolMaxMsgSize) - vct.Register(0, &pssTestPayload{}) - topic, _ := MakeTopic(protocolName, protocolVersion) - - fullnodes := []discover.NodeID{} - trigger := make(chan discover.NodeID) - testpeers := make(map[discover.NodeID]*pssTestPeer) - net, nodes := newPssSimulationTester(t, 3, 2, trigger, vct, protocolName, protocolVersion, testpeers) - ids := []discover.NodeID{} // ohh risky! 
but the action for a specific id should come before the expect anyway - - action = func(ctx context.Context) error { - var thinnodeid discover.NodeID - for id, _ := range nodes { - ids = append(ids, id) - if _, ok := testpeers[id]; ok { - log.Trace(fmt.Sprintf("adding fullnode %x to testpeers %p", common.ByteLabel(id.Bytes()), testpeers)) - fullnodes = append(fullnodes, id) - } else { - thinnodeid = id - } - } - if err := net.Connect(fullnodes[0], thinnodeid); err != nil { - return err - } - if err := net.Connect(thinnodeid, fullnodes[1]); err != nil { - return err - } - - // for i, id := range ids { - // var peerID discover.NodeID - // if i != 0 { - // peerID = ids[i-1] - // if err := net.Connect(id, peerID); err != nil { - // return err - // } - // } - // } - return nil - } - check = func(ctx context.Context, id discover.NodeID) (bool, error) { - select { - case <-ctx.Done(): - return false, ctx.Err() - default: - } - - node, ok := nodes[id] - if !ok { - return false, fmt.Errorf("unknown node: %s (%v)", id, node) - } - log.Trace(fmt.Sprintf("sim check ok node %v", id)) - - return true, nil - } - - timeout = 10 * time.Second - ctx, cancel = context.WithTimeout(context.Background(), timeout) - - result = simulations.NewSimulation(net).Run(ctx, &simulations.Step{ - Action: action, - Trigger: trigger, - Expect: &simulations.Expectation{ - Nodes: ids, - Check: check, - }, - }) - if result.Error != nil { - t.Fatalf("simulation failed: %s", result.Error) - } - cancel() - - nonode := &adapters.NodeID{} - firstpssnode = nonode - secondpssnode = nonode - - // first find a node that we're connected to - for firstpssnode == nonode { - log.Debug(fmt.Sprintf("Waiting for pss relaypeer for %x close to %x ...", common.ByteLabel(nodes[fullnodes[0]].OverlayAddr()), common.ByteLabel(nodes[ids[1]].OverlayAddr()))) - nodes[fullnodes[0]].Pss.Overlay.EachConn(nodes[fullnodes[1]].OverlayAddr(), 256, func(p Peer, po int, isprox bool) bool { - for _, id := range ids { - if id.NodeID == p.ID() { - firstpssnode = id - log.Debug(fmt.Sprintf("PSS relay found; relaynode %v kademlia %v", common.ByteLabel(id.Bytes()), common.ByteLabel(firstpssnode.Bytes()))) - } - } - if firstpssnode == nonode { - return true - } - return false - }) - if firstpssnode == nonode { - time.Sleep(time.Millisecond * 100) - } - } - - // then find the node it's connected to - for secondpssnode == nonode { - log.Debug(fmt.Sprintf("PSS kademlia: Waiting for recipientpeer for %x close to %x ...", common.ByteLabel(nodes[firstpssnode].OverlayAddr()), common.ByteLabel(nodes[fullnodes[1]].OverlayAddr()))) - nodes[firstpssnode].Pss.Overlay.Eachc(nodes[fullnodes[1]].OverlayAddr(), 256, func(p Peer, po int, isprox bool) bool { - for _, id := range ids { - if id.NodeID == p.ID() && id.NodeID != fullnodes[0].NodeID { - secondpssnode = id - log.Debug(fmt.Sprintf("PSS recipient found; relaynode %v kademlia %v", common.ByteLabel(id.Bytes()), common.ByteLabel(secondpssnode.Bytes()))) - } - } - if secondpssnode == nonode { - return true - } - return false - }) - if secondpssnode == nonode { - time.Sleep(time.Millisecond * 100) - } - } - - action = func(ctx context.Context) error { - code, _ := vct.GetCode(&pssTestPayload{}) - msgbytes, _ := makeMsg(code, &pssTestPayload{ - Data: "ping", - }) - - go func() { - oaddr := nodes[secondpssnode].OverlayAddr() - err := nodes[ids[0]].Pss.Send(oaddr, topic, msgbytes) - if err != nil { - t.Fatalf("could not send pss: %v", err) - } - trigger <- ids[0] - }() - - return nil - } - check = func(ctx context.Context, id 
discover.NodeID) (bool, error) { - select { - case <-ctx.Done(): - return false, ctx.Err() - default: - } - - // also need to know if the protocolpeer is set up - time.Sleep(time.Millisecond * 100) - return <-testpeers[ids[0]].successC, nil - //return true, nil - } - - timeout = 10 * time.Second - ctx, cancel = context.WithTimeout(context.Background(), timeout) - defer cancel() - - result = simulations.NewSimulation(net).Run(ctx, &simulations.Step{ - Action: action, - Trigger: trigger, - Expect: &simulations.Expectation{ - Nodes: []discover.NodeID{ids[0]}, - Check: check, - }, - }) - if result.Error != nil { - t.Fatalf("simulation failed: %s", result.Error) - } - - t.Log("Simulation Passed:") -} - -func TestPssFullWS(t *testing.T) { - - // settings for ws servers - var srvsendep = "localhost:18546" - var srvrecvep = "localhost:18547" - var clientrecvok, clientsendok bool - var clientrecv, clientsend *rpc.Client - - var action func(ctx context.Context) error - var check func(ctx context.Context, id discover.NodeID) (bool, error) - var ctx context.Context - var result *simulations.StepResult - var timeout time.Duration - var cancel context.CancelFunc - - var firstpssnode, secondpssnode discover.NodeID - fullnodes := []discover.NodeID{} - vct := protocols.NewCodeMap(protocolName, protocolVersion, ProtocolMaxMsgSize) - vct.Register(0, &pssTestPayload{}) - topic, _ := MakeTopic(pingTopicName, pingTopicVersion) - - trigger := make(chan discover.NodeID) - testpeers := make(map[discover.NodeID]*pssTestPeer) - simnet, nodes := newPssSimulationTester(t, 3, 2, trigger, vct, protocolName, protocolVersion, testpeers) - ids := []discover.NodeID{} // ohh risky! but the action for a specific id should come before the expect anyway - - action = func(ctx context.Context) error { - var thinnodeid discover.NodeID - for id, node := range nodes { - ids = append(ids, id) - if _, ok := testpeers[id]; ok { - log.Trace(fmt.Sprintf("adding fullnode %x to testpeers %p", common.ByteLabel(id.Bytes()), testpeers)) - fullnodes = append(fullnodes, id) - node.Pss.Register(topic, node.Pss.GetPingHandler()) - srv := rpc.NewServer() - for _, rpcapi := range node.apifunc() { - srv.RegisterName(rpcapi.Namespace, rpcapi.Service) - } - ws := srv.WebsocketHandler([]string{"*"}) - node.ws = &ws - } else { - thinnodeid = id + HiveParams: hp, } - } - if err := simnet.Connect(fullnodes[0], thinnodeid); err != nil { - return err - } - if err := simnet.Connect(thinnodeid, fullnodes[1]); err != nil { - return err - } - - return nil - } - - check = func(ctx context.Context, id discover.NodeID) (bool, error) { - select { - case <-ctx.Done(): - return false, ctx.Err() - default: - } - - node, ok := nodes[id] - if !ok { - return false, fmt.Errorf("unknown node: %s (%v)", id, node) - } else { - log.Trace(fmt.Sprintf("sim check ok node %v", id)) - } - - return true, nil - } - - timeout = 10 * time.Second - ctx, cancel = context.WithTimeout(context.Background(), timeout) - - result = simulations.NewSimulation(simnet).Run(ctx, &simulations.Step{ - Action: action, - Trigger: trigger, - Expect: &simulations.Expectation{ - Nodes: ids, - Check: check, - }, - }) - if result.Error != nil { - t.Fatalf("simulation failed: %s", result.Error) - } - cancel() - - nonode := &adapters.NodeID{} - firstpssnode = nonode - secondpssnode = nonode - - // first find a node that we're connected to - for firstpssnode == nonode { - log.Debug(fmt.Sprintf("Waiting for pss relaypeer for %x close to %x ...", common.ByteLabel(nodes[fullnodes[0]].OverlayAddr()), 
common.ByteLabel(nodes[fullnodes[1]].OverlayAddr()))) - nodes[fullnodes[0]].Pss.Overlay.EachLivePeer(nodes[fullnodes[1]].OverlayAddr(), 256, func(p Peer, po int, isprox bool) bool { - for _, id := range ids { - if id.NodeID == p.ID() { - firstpssnode = id - log.Debug(fmt.Sprintf("PSS relay found; relaynode %x", common.ByteLabel(nodes[firstpssnode].OverlayAddr()))) - } - } - if firstpssnode == nonode { - return true - } - return false - }) - if firstpssnode == nonode { - time.Sleep(time.Millisecond * 100) - } - } - - // then find the node it's connected to - for secondpssnode == nonode { - log.Debug(fmt.Sprintf("PSS kademlia: Waiting for recipientpeer for %x close to %x ...", common.ByteLabel(nodes[firstpssnode].OverlayAddr()), common.ByteLabel(nodes[fullnodes[1]].OverlayAddr()))) - nodes[firstpssnode].Pss.Overlay.EachConn(nodes[fullnodes[1]].OverlayAddr(), 256, func(p Peer, po int, isprox bool) bool { - for _, id := range ids { - if id.NodeID == p.ID() && id.NodeID != fullnodes[0].NodeID { - secondpssnode = id - log.Debug(fmt.Sprintf("PSS recipient found; relaynode %x", common.ByteLabel(nodes[secondpssnode].OverlayAddr()))) - } - } - if secondpssnode == nonode { - return true - } - return false - }) - if secondpssnode == nonode { - time.Sleep(time.Millisecond * 100) - } - } - - srvrecvl, err := net.Listen("tcp", srvrecvep) - if err != nil { - t.Fatalf("Tcp (recv) on %s failed: %v", srvrecvep, err) - } - go func() { - err := http.Serve(srvrecvl, *nodes[fullnodes[1]].ws) - if err != nil { - t.Fatalf("http serve (recv) on %s failed: %v", srvrecvep, err) - } - }() - - srvsendl, err := net.Listen("tcp", srvsendep) - if err != nil { - t.Fatalf("Tcp (send) on %s failed: %v", srvsendep, err) - } - go func() { - err := http.Serve(srvsendl, *nodes[fullnodes[0]].ws) - if err != nil { - t.Fatalf("http serve (send) on %s failed: %v", srvrecvep, err) - } - }() - - for !clientrecvok { - log.Trace("attempting clientrecv connect") - clientrecv, err = rpc.DialWebsocket(context.Background(), "ws://"+srvrecvep, "ws://localhost") - if err == nil { - clientrecvok = true - } else { - log.Debug("clientrecv failed, retrying", "error", err) - time.Sleep(time.Millisecond * 250) - } - } - - for !clientsendok { - log.Trace("attempting clientsend connect") - clientsend, err = rpc.DialWebsocket(context.Background(), "ws://"+srvsendep, "ws://localhost") - if err == nil { - clientsendok = true - } else { - log.Debug("clientsend failed, retrying", "error", err) - time.Sleep(time.Millisecond * 250) - } - } - - trigger = make(chan discover.NodeID) - ch := make(chan string) - - action = func(ctx context.Context) error { - go func() { - clientrecv.EthSubscribe(ctx, ch, "newMsg", topic) - clientsend.Call(nil, "eth_sendRaw", nodes[secondpssnode].Pss.Overlay.BaseAddr(), topic, []byte("ping")) - trigger <- secondpssnode - }() - return nil - } - check = func(ctx context.Context, id discover.NodeID) (bool, error) { - select { - case <-ctx.Done(): - return false, ctx.Err() - default: - } - - select { - case msg := <-ch: - log.Trace(fmt.Sprintf("notify!: %v", msg)) - case <-time.NewTimer(time.Second).C: - log.Trace(fmt.Sprintf("no notifies :'(")) - } - // also need to know if the protocolpeer is set up - - return true, nil - } - - timeout = 10 * time.Second - ctx, cancel = context.WithTimeout(context.Background(), timeout) - defer cancel() - - result = simulations.NewSimulation(simnet).Run(ctx, &simulations.Step{ - Action: action, - Trigger: trigger, - Expect: &simulations.Expectation{ - Nodes: []discover.NodeID{secondpssnode}, - 
Check: check, + return network.NewBzz(config, kademlia(ctx.Config.ID), stateStore), nil }, - }) - if result.Error != nil { - t.Fatalf("simulation failed: %s", result.Error) } - - t.Log("Simulation Passed:") } -// test framework below - -// numnodes: how many nodes to create -// pssnodeidx: on which node indices to start the pss -// net: the simulated network -// trigger: hook needed for simulation event reporting -// vct: codemap for virtual protocol -// name: name for virtual protocol (and pss topic) -// version: name for virtual protocol (and pss topic) -// testpeers: pss-specific peers, with hook needed for simulation event reporting - -// the simulation tester constructor is currently a hack to fit previous code with later stack using node.Services to start SimNodes - -func newPssSimulationTester(t *testing.T, numnodes int, numfullnodes int, trigger chan discover.NodeID, vct *protocols.CodeMap, name string, version int, testpeers map[discover.NodeID]*pssTestPeer) (*simulations.Network, map[discover.NodeID]*pssTestNode) { - topic, _ := MakeTopic(name, version) - nodes := make(map[discover.NodeID]*pssTestNode, numnodes) - psss := make(map[discover.NodeID]*Pss) - var simnet *simulations.Network - serviceFunc := func(id discover.NodeID) node.Service { - node := &pssTestNode{ - Pss: psss[id], - Hive: nil, - id: id, - network: simnet, - trigger: trigger, - ct: vct, - apifunc: func() []rpc.API { return nil }, - expectC: make(chan []int), - } - - // set up handlers for the encapsulating PssMsg - - var handlefunc func(interface{}) error - - addr := NewPeerAddrFromNodeID(id) - - if testpeers[id] != nil { - handlefunc = makePssHandleProtocol(psss[id]) - log.Trace(fmt.Sprintf("Making full protocol id %x addr %x (testpeers %p)", common.ByteLabel(id.Bytes()), common.ByteLabel(addr.Over()), testpeers)) - } else { - handlefunc = makePssHandleForward(psss[id]) - } - - // protocols are now registered by invoking node services - // since adapters.SimNode implements p2p.Server, needed for the services to start, we use this as a convenience wrapper - - testservice := newPssTestService(t, handlefunc, node) - - // the network sim wants a adapters.NodeAdapter, so we pass back to it a SimNode - // this is the SimNode member of the testNode initialized above, but assigned through the service start - // that is so say: node == testservice.node, but we access it as a member of testservice below for clarity (to the extent that this can be clear) - - nodes[id] = testservice.node - testservice.node.apifunc = testservice.APIs - return testservice - } - adapter := adapters.NewSimAdapter(map[string]adapters.ServiceFunc{"pss": serviceFunc}) - simnet = simulations.NewNetwork(adapter, &simulations.NetworkConfig{ - ID: "0", - Backend: true, - }) - configs := make([]*adapters.NodeConfig, numnodes) - for i := 0; i < numnodes; i++ { - configs[i] = adapters.RandomNodeConfig() - configs[i].Service = "pss" - } - for i, conf := range configs { - addr := NewPeerAddrFromNodeID(conf.ID) - psss[conf.ID] = makePss(addr.Over()) - if i < numfullnodes { - tp := &pssTestPeer{ - Peer: &protocols.Peer{ - Peer: &p2p.Peer{}, - }, - successC: make(chan bool), - resultC: make(chan int), - } - testpeers[conf.ID] = tp - targetprotocol := makeCustomProtocol(name, version, vct, testpeers[conf.ID]) - pssprotocol := NewPssProtocol(psss[conf.ID], &topic, vct, targetprotocol) - psss[conf.ID].Register(topic, pssprotocol.GetHandler()) - } - - if err := simnet.NewNodeWithConfig(conf); err != nil { - t.Fatalf("error creating node %s: %s", conf.ID.Label(), 
err) - } - if err := simnet.Start(conf.ID); err != nil { - t.Fatalf("error starting node %s: %s", conf.ID.Label(), err) - } - } - - return simnet, nodes +type connmap struct { + conns map[discover.NodeID][]discover.NodeID + healthy map[discover.NodeID]bool + lock sync.Mutex } -func makePss(addr []byte) *Pss { - - // set up storage - cachedir, err := ioutil.TempDir("", "pss-cache") - if err != nil { - log.Error("create pss cache tmpdir failed", "error", err) - os.Exit(1) - } - - dpa, err := storage.NewLocalDPA(cachedir) - if err != nil { - log.Error("local dpa creation failed", "error", err) - os.Exit(1) - } - // cannot use pyramidchunker as it still lacks joinfunc TestPssRegisterHandler(t *testing.T) { - addr := RandomAddr() - ps := newTestPss(addr.UnderlayAddr()) - from := RandomAddr() - payload := []byte("payload") - topic := NewTopic(protocolName, protocolVersion) - checkMsg := func(msg []byte, p *p2p.Peer, sender []byte) error { - if !bytes.Equal(from.OverlayAddr(), sender) { - return fmt.Errorf("sender mismatch. expected %x, got %x", from.OverlayAddr(), sender) - } - if !bytes.Equal(msg, payload) { - return fmt.Errorf("sender mismatch. expected %x, got %x", msg, payload) - } - if !bytes.Equal(from.UnderlayAddr(), p.ID()) { - return fmt.Errorf("sender mismatch. expected %x, got %x", from.UnderlayAddr(), p.ID()) - } - } - deregister := ps.Register(topic, checkMsg) - pssmsg := &PssMsg{Data: NewPssEnvelope(from, topic, payload)} - err = ps.Process(pssmsg) - if err != nil { - t.Fatal(err) - } - var i int - err = ps.Process(&PssMsg{Data: NewPssEnvelope(from, []byte("topic"), payload)}) - expErr := "" - if err == nil || err.Error() != expErr { - t.Fatalf("unhandled topic expected %v, got %v", expErr, err) - } - deregister2 := ps.Register(topic, func(msg []byte, p *p2p.Peer, sender []byte) error { i++; return nil }) - ps.Process(pssmsg) - if err != nil { - t.Fatal(err) - } - if i != 1 { - t.Fatalf("second registerer handler did not run") - } - deregister() - deregister2() - err = ps.Process(&PssMsg{Data: NewPssEnvelope(from, topic, payload)}) - expErr = "" - if err == nil || err.Error() != expErr { - t.Fatalf("reregister handler expected %v, got %v", expErr, err) - } -} - // dpa.Chunker = storage.NewPyramidChunker(storage.NewChunkerParams()) - - kp := network.NewKadParams() - kp.MinProxBinSize = 3 - - pp := NewPssParams() - - overlay := network.NewKademlia(addr, kp) - ps := NewPss(overlay, dpa, pp) - //overlay.Prune(time.Tick(time.Millisecond * 250)) - return ps +type testOverlayConn struct { + *protocols.Peer + addr []byte } -func makeCustomProtocol(name string, version int, ct *protocols.CodeMap, testpeer *pssTestPeer) *p2p.Protocol { - run := func(p *protocols.Peer) error { - log.Trace(fmt.Sprintf("running pss vprotocol on peer %v", p)) - if testpeer == nil { - testpeer = &pssTestPeer{} - } - testpeer.Peer = p - p.Register(&pssTestPayload{}, testpeer.SimpleHandlePssPayload) - err := p.Run() - return err - } - - return protocols.NewProtocol(name, uint(version), run, ct, nil, nil) +func (self *testOverlayConn) Address() []byte { + return self.addr } -func makePssHandleForward(ps *Pss) func(msg interface{}) error { - // for the simple check it passes on the message if it's not for us - return func(msg interface{}) error { - pssmsg := msg.(*PssMsg) - if ps.IsSelfRecipient(pssmsg) { - log.Trace("pss for us .. 
yay!") - } else { - log.Trace("passing on pss") - return ps.Forward(pssmsg) - } - return nil - } +func (self *testOverlayConn) Off() network.OverlayAddr { + return self } -func makePssHandleProtocol(ps *Pss) func(msg interface{}) error { - return func(msg interface{}) error { - pssmsg := msg.(*PssMsg) - - if ps.IsSelfRecipient(pssmsg) { - log.Trace("pss for us ... let's process!") - env := pssmsg.Payload - umsg := env.Payload // this will be rlp encrypted - f := ps.GetHandler(env.Topic) - if f == nil { - return fmt.Errorf("No registered handler for topic '%s'", env.Topic) - } - nid := adapters.NewNodeID(env.SenderUAddr) - p := p2p.NewPeer(nid.NodeID, fmt.Sprintf("%x", common.ByteLabel(nid.Bytes())), []p2p.Cap{}) - return f(umsg, p, env.SenderOAddr) - } else { - log.Trace("pss was for someone else :'(") - return ps.Forward(pssmsg) - } - return nil - } +func (self *testOverlayConn) Drop(err error) { } -// echoes an incoming message -// it comes in through -// Any pointer receiver that has protocols.Peer -func (ptp *pssTestPeer) SimpleHandlePssPayload(msg interface{}) error { - pmsg := msg.(*pssTestPayload) - log.Trace(fmt.Sprintf("pssTestPayloadhandler got message %v", pmsg)) - if pmsg.Data == "ping" { - pmsg.Data = "pong" - log.Trace(fmt.Sprintf("pssTestPayloadhandler reply %v", pmsg)) - ptp.Send(pmsg) - } else if pmsg.Data == "pong" { - ptp.successC <- true - } else { - res, err := strconv.Atoi(pmsg.Data) - if err != nil { - log.Trace(fmt.Sprintf("pssTestPayloadhandlererr %v", err)) - ptp.successC <- false - } else { - log.Trace(fmt.Sprintf("pssTestPayloadhandler sending %d on chan", pmsg)) - ptp.successC <- true - ptp.resultC <- res - } - } - - return nil +func (self *testOverlayConn) Update(o network.OverlayAddr) network.OverlayAddr { + return self } -*/ diff --git a/swarm/pss/pssapi.go b/swarm/pss/pssapi.go index f0d4b6bf4408..e5221c74949b 100644 --- a/swarm/pss/pssapi.go +++ b/swarm/pss/pssapi.go @@ -1,33 +1,28 @@ package pss import ( + "bytes" "context" "fmt" "github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/p2p" "github.com/ethereum/go-ethereum/rpc" + "github.com/ethereum/go-ethereum/swarm/network" ) -// PssAPI is the RPC API module for Pss -type PssAPI struct { +// API is the RPC API module for Pss +type API struct { *Pss } -// NewPssAPI constructs a PssAPI instance -func NewPssAPI(ps *Pss) *PssAPI { - return &PssAPI{Pss: ps} -} - -// PssAPIMsg is the type for messages, it extends the rlp encoded protocol Msg -// with the Sender's overlay address -type PssAPIMsg struct { - Msg []byte - Addr []byte +// NewAPI constructs a PssAPI instance +func NewAPI(ps *Pss) *API { + return &API{Pss: ps} } // NewMsg API endpoint creates an RPC subscription -func (pssapi *PssAPI) NewMsg(ctx context.Context, topic PssTopic) (*rpc.Subscription, error) { +func (pssapi *API) Receive(ctx context.Context, topic Topic) (*rpc.Subscription, error) { notifier, supported := rpc.NotifierFromContext(ctx) if !supported { return nil, fmt.Errorf("Subscribe not supported") @@ -35,7 +30,7 @@ func (pssapi *PssAPI) NewMsg(ctx context.Context, topic PssTopic) (*rpc.Subscrip psssub := notifier.CreateSubscription() handler := func(msg []byte, p *p2p.Peer, from []byte) error { - apimsg := &PssAPIMsg{ + apimsg := &APIMsg{ Msg: msg, Addr: from, } @@ -44,14 +39,13 @@ func (pssapi *PssAPI) NewMsg(ctx context.Context, topic PssTopic) (*rpc.Subscrip } return nil } - deregf := pssapi.Pss.Register(&topic, handler) + deregf := pssapi.Register(&topic, handler) go func() { defer deregf() - //defer 
psssub.Unsubscribe()
 		select {
 		case err := <-psssub.Err():
-			log.Warn(fmt.Sprintf("caught subscription error in pss sub topic: %v", topic, err))
+			log.Warn(fmt.Sprintf("caught subscription error in pss sub topic %x: %v", topic, err))
 		case <-notifier.Closed():
 			log.Warn(fmt.Sprintf("rpc sub notifier closed"))
 		}
@@ -61,16 +55,45 @@ func (pssapi *PssAPI) NewMsg(ctx context.Context, topic PssTopic) (*rpc.Subscrip
 }

 // SendRaw sends the message (serialized into byte slice) to a peer with topic
-func (pssapi *PssAPI) SendRaw(topic PssTopic, msg PssAPIMsg) error {
-	err := pssapi.Pss.Send(msg.Addr, topic, msg.Msg)
-	if err != nil {
-		return fmt.Errorf("send error: %v", err)
+func (pssapi *API) Send(topic Topic, msg APIMsg) error {
+	if pssapi.debug && bytes.Equal(msg.Addr, pssapi.BaseAddr()) {
+		log.Warn("Pss debug enabled; send to self shortcircuit", "apimsg", msg, "topic", topic)
+		env := NewEnvelope(msg.Addr, topic, msg.Msg)
+		return pssapi.Process(&PssMsg{
+			To:      pssapi.BaseAddr(),
+			Payload: env,
+		})
 	}
-	return fmt.Errorf("ok sent")
+	return pssapi.SendRaw(msg.Addr, topic, msg.Msg)
+}
+
+// APITest is a set of temporary API calls for development use only
+// These symbols should not be included in production environment
+type APITest struct {
+	*Pss
+}
+
+// NewAPITest constructs an APITest instance
+func NewAPITest(ps *Pss) *APITest {
+	return &APITest{Pss: ps}
+}
+
+// temporary for access to overlay while faking kademlia healthy routines
+func (pssapitest *APITest) GetForwarder(addr []byte) (fwd struct {
+	Addr  []byte
+	Count int
+}) {
+	pssapitest.Overlay.EachConn(addr, 255, func(op network.OverlayConn, po int, isproxbin bool) bool {
+		if bytes.Equal(fwd.Addr, []byte{}) {
+			fwd.Addr = op.Address()
+		}
+		fwd.Count++
+		return true
+	})
+	return
 }

 // BaseAddr gets our own overlay address
-func (pssapi *PssAPI) BaseAddr() ([]byte, error) {
-	log.Warn("inside baseaddr")
-	return pssapi.Pss.Overlay.BaseAddr(), nil
+func (pssapitest *APITest) BaseAddr() ([]byte, error) {
+	return pssapitest.Pss.BaseAddr(), nil
 }
diff --git a/swarm/pss/simulations/service_test.go b/swarm/pss/simulations/service_test.go
new file mode 100644
index 000000000000..8fb1d1026947
--- /dev/null
+++ b/swarm/pss/simulations/service_test.go
@@ -0,0 +1,232 @@
+package pss_simulations
+
+import (
+	"context"
+	"io/ioutil"
+	"os"
+	"testing"
+	"time"
+
+	"github.com/ethereum/go-ethereum/common"
+	"github.com/ethereum/go-ethereum/log"
+	"github.com/ethereum/go-ethereum/node"
+	"github.com/ethereum/go-ethereum/p2p"
+	"github.com/ethereum/go-ethereum/p2p/discover"
+	"github.com/ethereum/go-ethereum/p2p/protocols"
+	"github.com/ethereum/go-ethereum/p2p/simulations"
+	"github.com/ethereum/go-ethereum/p2p/simulations/adapters"
+	"github.com/ethereum/go-ethereum/pot"
+	"github.com/ethereum/go-ethereum/rpc"
+	"github.com/ethereum/go-ethereum/swarm/network"
+	"github.com/ethereum/go-ethereum/swarm/pss"
+	"github.com/ethereum/go-ethereum/swarm/pss/client"
+	"github.com/ethereum/go-ethereum/swarm/storage"
+)
+
+func init() {
+	h := log.CallerFileHandler(log.StreamHandler(os.Stderr, log.TerminalFormat(true)))
+	log.Root().SetHandler(h)
+}
+
+// TestProtocol starts a pss network along with two test nodes which run
+// protocols via the pss network, connects those two test nodes and then
+// waits for them to handshake
+func TestProtocol(t *testing.T) {
+	// define the services
+	w := &testWrapper{}
+	services := adapters.Services{
+		"pss":  w.newPssService,
+		"test": w.newTestService,
+	}
+
+	// create the network
+	adapter :=
adapters.NewSimAdapter(services) + net := simulations.NewNetwork(adapter, &simulations.NetworkConfig{ + ID: "pss", + }) + defer net.Shutdown() + startNode := func(service string) *simulations.Node { + config := adapters.RandomNodeConfig() + config.Services = []string{service} + node, err := net.NewNodeWithConfig(config) + if err != nil { + t.Fatalf("error starting node: %s", err) + } + if err := net.Start(node.ID()); err != nil { + t.Fatalf("error starting node %s: %s", node.ID().TerminalString(), err) + } + return node + } + + // start 20 pss nodes + nodeCount := 20 + for i := 0; i < nodeCount; i++ { + startNode("pss") + } + + // start two test nodes (they will use the first two pss nodes) + node1 := startNode("test") + node2 := startNode("test") + + // subscribe to handshake events from both nodes + handshakes := make(chan *testHandshake, 2) + subscribe := func(client *rpc.Client) *rpc.ClientSubscription { + sub, err := client.Subscribe(context.Background(), "test", handshakes, "handshake") + if err != nil { + t.Fatal(err) + } + return sub + } + client1, err := node1.Client() + if err != nil { + t.Fatal(err) + } + sub1 := subscribe(client1) + defer sub1.Unsubscribe() + client2, err := node2.Client() + if err != nil { + t.Fatal(err) + } + sub2 := subscribe(client2) + defer sub2.Unsubscribe() + + // call AddPeer on node1 with node2's pss address + if err := client1.Call(nil, "test_addPeer", network.ToOverlayAddr(w.pssNodes[1].Bytes())); err != nil { + t.Fatal(err) + } + + // wait for both handshakes + timeout := time.After(10 * time.Second) + for i := 0; i < 2; i++ { + select { + case hs := <-handshakes: + t.Logf("got handshake: %+v", hs) + case <-timeout: + t.Fatal("timed out waiting for handshakes") + } + } +} + +// testWrapper creates pss and test nodes, assigning pss nodes to test +// nodes as they are started +type testWrapper struct { + pssNodes []discover.NodeID + index int +} + +func (t *testWrapper) newPssService(ctx *adapters.ServiceContext) (node.Service, error) { + // track the pss node's id so we can use it for the test nodes + t.pssNodes = append(t.pssNodes, ctx.Config.ID) + + dir, err := ioutil.TempDir("", "pss-test") + if err != nil { + panic(err) + } + dpa, err := storage.NewLocalDPA(dir) + if err != nil { + panic(err) + } + addr := network.NewAddrFromNodeID(ctx.Config.ID) + kad := network.NewKademlia(addr.Over(), network.NewKadParams()) + return pss.NewPss(kad, dpa, pss.NewPssParams(false)), nil +} + +func (t *testWrapper) newTestService(ctx *adapters.ServiceContext) (node.Service, error) { + // connect to the next pss node + pssNode := t.pssNodes[t.index] + t.index++ + rpcClient, err := ctx.DialRPC(pssNode) + if err != nil { + panic(err) + } + return &testService{ + id: ctx.Config.ID, + pss: client.NewClientWithRPC(context.Background(), rpcClient), + handshakes: make(chan *testHandshake), + }, nil +} + +type testHandshake struct { + ID discover.NodeID +} + +// testService runs a simple handshake protocol over pss and exposes an API +// so that clients can wait for handshakes to complete +type testService struct { + id discover.NodeID + pss *client.Client + handshakes chan *testHandshake +} + +func (t *testService) Protocols() []p2p.Protocol { + return nil +} + +func (t *testService) APIs() []rpc.API { + return []rpc.API{{ + Namespace: "test", + Version: "1.0", + Service: &TestAPI{t.pss, t.handshakes}, + }} +} + +func (t *testService) Start(*p2p.Server) error { + return t.pss.RunProtocol(&p2p.Protocol{ + Name: "test", + Version: 1, + Run: t.run, + }) +} + +func (t 
*testService) Stop() error { + return nil +} + +func (t *testService) run(_ *p2p.Peer, rw p2p.MsgReadWriter) error { + // send a handshake and wait for one back + go p2p.Send(rw, 0, &testHandshake{t.id}) + msg, err := rw.ReadMsg() + if err != nil { + return err + } + defer msg.Discard() + var hs testHandshake + if err := msg.Decode(&hs); err != nil { + return err + } + t.handshakes <- &hs + return nil +} + +type TestAPI struct { + pss *client.Client + handshakes chan *testHandshake +} + +func (t *TestAPI) AddPeer(addr []byte) { + t.pss.AddPssPeer(pot.Address(common.BytesToHash(addr)), &protocols.Spec{ + Name: "test", + Version: 1, + }) +} + +func (t *TestAPI) Handshake(ctx context.Context) (*rpc.Subscription, error) { + notifier, supported := rpc.NotifierFromContext(ctx) + if !supported { + return nil, rpc.ErrNotificationsUnsupported + } + sub := notifier.CreateSubscription() + go func() { + for { + select { + case hs := <-t.handshakes: + notifier.Notify(sub.ID, hs) + case <-sub.Err(): + return + case <-notifier.Closed(): + return + } + } + }() + return sub, nil +} diff --git a/swarm/pss/types.go b/swarm/pss/types.go new file mode 100644 index 000000000000..92b1f520df72 --- /dev/null +++ b/swarm/pss/types.go @@ -0,0 +1,140 @@ +package pss + +import ( + "bytes" + "encoding/binary" + "fmt" + "time" + + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/crypto/sha3" + "github.com/ethereum/go-ethereum/p2p" + "github.com/ethereum/go-ethereum/rlp" +) + +const ( + TopicLength = 32 + DefaultTTL = 6000 + defaultDigestCacheTTL = time.Second +) + +// Defines params for Pss +type PssParams struct { + Cachettl time.Duration + Debug bool +} + +// Initializes default params for Pss +func NewPssParams(debug bool) *PssParams { + return &PssParams{ + Cachettl: defaultDigestCacheTTL, + Debug: debug, + } +} + +// Encapsulates the message transported over pss. 
+type PssMsg struct {
+	To      []byte
+	Payload *Envelope
+}
+
+func (msg *PssMsg) Serialize() []byte {
+	rlpdata, _ := rlp.EncodeToBytes(msg)
+	return rlpdata
+}
+
+// String representation of PssMsg
+func (self *PssMsg) String() string {
+	return fmt.Sprintf("PssMsg: Recipient: %x", common.ByteLabel(self.To))
+}
+
+// Topic defines the context of a message being transported over pss
+// It is used by pss to determine what action is to be taken on an incoming message
+// Typically, one can map protocol handlers for the message payloads by mapping topic to them; see *Pss.Register()
+type Topic [TopicLength]byte
+
+func (self *Topic) String() string {
+	return fmt.Sprintf("%x", self[:])
+}
+
+// Pre-Whisper placeholder, payload of PssMsg
+type Envelope struct {
+	Topic   Topic
+	TTL     uint16
+	Payload []byte
+	From    []byte
+}
+
+// creates Pss envelope from sender address, topic and raw payload
+func NewEnvelope(addr []byte, topic Topic, payload []byte) *Envelope {
+	return &Envelope{
+		From:    addr,
+		Topic:   topic,
+		TTL:     DefaultTTL,
+		Payload: payload,
+	}
+}
+
+// encapsulates a protocol msg as PssEnvelope data
+type ProtocolMsg struct {
+	Code       uint64
+	Size       uint32
+	Payload    []byte
+	ReceivedAt time.Time
+}
+
+// APIMsg is the type for messages, it extends the rlp encoded protocol Msg
+// with the Sender's overlay address
+type APIMsg struct {
+	Msg  []byte
+	Addr []byte
+}
+
+func NewProtocolMsg(code uint64, msg interface{}) ([]byte, error) {
+
+	rlpdata, err := rlp.EncodeToBytes(msg)
+	if err != nil {
+		return nil, err
+	}
+
+	// previous attempts corrupted nested structs in the payload itself upon deserializing
+	// therefore we use two separate []byte fields instead of peerAddr
+	// TODO verify that nested structs cannot be used in rlp
+	smsg := &ProtocolMsg{
+		Code:    code,
+		Size:    uint32(len(rlpdata)),
+		Payload: rlpdata,
+	}
+
+	return rlp.EncodeToBytes(smsg)
+}
+
+// Message handler func for a topic
+type Handler func(msg []byte, p *p2p.Peer, from []byte) error
+
+// constructs a new Topic from a given name and version.
+// +// Analogous to the name and version members of p2p.Protocol +func NewTopic(s string, v int) (topic Topic) { + h := sha3.NewKeccak256() + h.Write([]byte(s)) + buf := make([]byte, TopicLength/8) + binary.PutUvarint(buf, uint64(v)) + h.Write(buf) + copy(topic[:], h.Sum(buf)[:]) + return topic +} + +func ToP2pMsg(msg []byte) (p2p.Msg, error) { + payload := &ProtocolMsg{} + if err := rlp.DecodeBytes(msg, payload); err != nil { + return p2p.Msg{}, fmt.Errorf("pss protocol handler unable to decode payload as p2p message: %v", err) + } + + return p2p.Msg{ + Code: payload.Code, + Size: uint32(len(payload.Payload)), + ReceivedAt: time.Now(), + Payload: bytes.NewBuffer(payload.Payload), + }, nil +} diff --git a/swarm/swarm.go b/swarm/swarm.go index 9d5d5a0b21b5..09260c634fe1 100644 --- a/swarm/swarm.go +++ b/swarm/swarm.go @@ -21,7 +21,6 @@ import ( "context" "crypto/ecdsa" "fmt" - "time" "github.com/ethereum/go-ethereum/accounts/abi/bind" "github.com/ethereum/go-ethereum/common" @@ -37,26 +36,27 @@ import ( httpapi "github.com/ethereum/go-ethereum/swarm/api/http" "github.com/ethereum/go-ethereum/swarm/fuse" "github.com/ethereum/go-ethereum/swarm/network" + "github.com/ethereum/go-ethereum/swarm/pss" "github.com/ethereum/go-ethereum/swarm/storage" ) // the swarm stack type Swarm struct { - config *api.Config // swarm configuration - api *api.Api // high level api layer (fs/manifest) - dns api.Resolver // DNS registrar - storage storage.ChunkStore // internal access to storage, common interface to cloud storage backends - dpa *storage.DPA // distributed preimage archive, the local API to the storage with document level storage/retrieval support - cloud storage.CloudStore // procurement, cloud storage backend (can multi-cloud) - hive *network.Hive // the logistic manager + config *api.Config // swarm configuration + api *api.Api // high level api layer (fs/manifest) + dns api.Resolver // DNS registrar + storage storage.ChunkStore // internal access to storage, common interface to cloud storage backends + dpa *storage.DPA // distributed preimage archive, the local API to the storage with document level storage/retrieval support + cloud storage.CloudStore // procurement, cloud storage backend (can multi-cloud) + //hive *network.Hive // the logistic manager + bzz *network.Bzz // hive and bzz protocol backend chequebook.Backend // simple blockchain Backend privateKey *ecdsa.PrivateKey corsString string swapEnabled bool - pssEnabled bool - pss *network.Pss lstore *storage.LocalStore // local store, needs to store for releasing resources after node stopped sfs *fuse.SwarmFS // need this to cleanup all the active mounts on node exit + pss *pss.Pss } type SwarmAPI struct { @@ -89,7 +89,6 @@ func NewSwarm(ctx *node.ServiceContext, backend chequebook.Backend, config *api. backend: backend, privateKey: config.Swap.PrivateKey(), corsString: cors, - pssEnabled: pssEnabled, } log.Debug(fmt.Sprintf("Setting up Swarm service components")) @@ -102,18 +101,26 @@ func NewSwarm(ctx *node.ServiceContext, backend chequebook.Backend, config *api. 
log.Debug("Set up local db access (iterator/counter)") kp := network.NewKadParams() - to := network.NewKademlia( common.FromHex(config.BzzKey), kp, ) - // set up the kademlia hive - self.hive = network.NewHive( - config.HiveParams, // configuration parameters - to, - ) - log.Debug(fmt.Sprintf("Set up swarm network with Kademlia hive")) + bzzconfig := &network.BzzConfig{ + UnderlayAddr: common.FromHex(config.PublicKey), + OverlayAddr: common.FromHex(config.BzzKey), + HiveParams: config.HiveParams, + } + self.bzz = network.NewBzz(bzzconfig, to, nil) + + // set up the kademlia hive + // self.hive = network.NewHive( + // config.HiveParams, // configuration parameters + // self.Kademlia, + // stateStore, + // ) + // log.Debug(fmt.Sprintf("Set up swarm network with Kademlia hive")) + // // setup cloud storage internal access layer self.storage = storage.NewNetStore(hash, self.lstore, nil, config.StoreParams) log.Debug("-> swarm net store shared access layer to Swarm Chunk Store") @@ -126,6 +133,12 @@ func NewSwarm(ctx *node.ServiceContext, backend chequebook.Backend, config *api. self.dpa = storage.NewDPA(dpaChunkStore, self.config.ChunkerParams) log.Debug(fmt.Sprintf("-> Content Store API")) + // Pss = postal service over swarm (devp2p over bzz) + if pssEnabled { + pssparams := pss.NewPssParams(false) + self.pss = pss.NewPss(to, self.dpa, pssparams) + } + // set up high level api transactOpts := bind.NewKeyedTransactor(self.privateKey) @@ -176,25 +189,16 @@ func (self *Swarm) Start(net *p2p.Server) error { log.Warn(fmt.Sprintf("Starting Swarm service")) - self.hive.Start( - net, - func() <-chan time.Time { - return time.NewTicker(time.Second).C - }, - nil, - ) - - log.Info(fmt.Sprintf("Swarm network started on bzz address: %v", self.hive.BaseAddr())) - - if self.pssEnabled { - pssparams := network.NewPssParams() - self.pss = network.NewPss(self.hive.Overlay, pssparams) - - // for testing purposes, shold be removed in production environment!! - pingtopic, _ := network.MakeTopic("pss", 1) - self.pss.Register(pingtopic, self.pss.GetPingHandler()) + // self.hive.Start(net) + err := self.bzz.Start(net) + if err != nil { + log.Error("bzz failed", "err", err) + return err + } + log.Info(fmt.Sprintf("Swarm network started on bzz address: %v", self.bzz.Hive.Overlay.BaseAddr())) - log.Debug("Pss started: %v", self.pss) + if self.pss != nil { + self.pss.Start(net) } self.dpa.Start() @@ -222,7 +226,11 @@ func (self *Swarm) Start(net *p2p.Server) error { // stops all component services. func (self *Swarm) Stop() error { self.dpa.Stop() - self.hive.Stop() + //self.hive.Stop() + self.bzz.Stop() + if self.pss != nil { + self.pss.Stop() + } if ch := self.config.Swap.Chequebook(); ch != nil { ch.Stop() ch.Save() @@ -236,53 +244,30 @@ func (self *Swarm) Stop() error { } // implements the node.Service interface -func (self *Swarm) Protocols() []p2p.Protocol { - ct := network.BzzCodeMap() - if self.pssEnabled { - ct.Register(1, &network.PssMsg{}) +func (self *Swarm) Protocols() (protos []p2p.Protocol) { + + for _, p := range self.bzz.Protocols() { + protos = append(protos, p) } - ct.Register(2, network.DiscoveryMsgs...) - // srv := func(p network.Peer) error { - // if self.pssEnabled { - // p.Register(&network.PssMsg{}, func(msg interface{}) error { - // pssmsg := msg.(*network.PssMsg) + if self.pss != nil { + for _, p := range self.pss.Protocols() { + protos = append(protos, p) + } + } + // ct := network.BzzCodeMap() + // ct.Register(2, network.DiscoveryMsgs...) 
+ + // proto := network.Bzz( + // self.hive.Overlay.GetAddr().Over(), + // self.hive.Overlay.GetAddr().Under(), + // ct, + // nil, + // nil, + // nil, + // ) // - // if self.pss.IsSelfRecipient(pssmsg) { - // log.Trace("pss for us, yay! ... let's process!") - // env := pssmsg.Payload - // umsg := env.Payload - // f := self.pss.GetHandler(env.Topic) - // if f == nil { - // return fmt.Errorf("No registered handler for topic '%s'", env.Topic) - // } - // nid := discover.MustBytesID(env.SenderUAddr) - // p := p2p.NewPeer(nid.NodeID, fmt.Sprintf("%x", common.ByteLabel(nid.Bytes())), []p2p.Cap{}) - // return f(umsg, p, env.SenderOAddr) - // } else { - // log.Trace("pss was for someone else :'( ... forwarding") - // return self.pss.Forward(pssmsg) - // } - // return nil - // }) - // } - // self.hive.Add(p) - // p.DisconnectHook(func(err error) { - // self.hive.Remove(p) - // }) - // return nil - // } - - proto := network.Bzz( - self.hive.Overlay.GetAddr().Over(), - self.hive.Overlay.GetAddr().Under(), - ct, - nil, - nil, - nil, - ) - - return []p2p.Protocol{*proto} + return } // implements node.Service @@ -301,7 +286,7 @@ func (self *Swarm) APIs() []rpc.API { { Namespace: "bzz", Version: "0.1", - Service: api.NewControl(self.api, self.hive), + Service: api.NewControl(self.api, self.bzz.Hive), Public: false, }, { @@ -333,13 +318,14 @@ func (self *Swarm) APIs() []rpc.API { // {Namespace, Version, api.NewAdmin(self), false}, } - if self.pssEnabled { - apis = append(apis, rpc.API{ - Namespace: "eth", - Version: "0.1/pss", - Service: network.NewPssApi(self.pss), - Public: true, - }) + for _, api := range self.bzz.APIs() { + apis = append(apis, api) + } + + if self.pss != nil { + for _, api := range self.pss.APIs() { + apis = append(apis, api) + } } return apis
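Taken together, the pssapi.go changes above rename the client-facing surface: the NewMsg subscription becomes Receive, SendRaw becomes Send, and APIMsg is the wire type in both directions. Below is a minimal sketch of a client exercising the renamed endpoints, assuming a pss-enabled node serving the pss namespace over websockets and built with PssParams debug mode on so the send-to-self shortcircuit in API.Send applies; the endpoint URL, topic name, and the pss_baseAddr call are illustrative assumptions, not part of this diff.

package main

import (
	"context"
	"fmt"

	"github.com/ethereum/go-ethereum/rpc"
	"github.com/ethereum/go-ethereum/swarm/pss"
)

func main() {
	// dial a pss-enabled node; the endpoint is an assumption for this sketch
	client, err := rpc.DialWebsocket(context.Background(), "ws://localhost:8546", "ws://localhost")
	if err != nil {
		panic(err)
	}
	topic := pss.NewTopic("demo", 1) // hypothetical protocol name/version

	// subscribe to incoming messages, mirroring what triggerChecks does:
	// client.Subscribe(ctx, "pss", ch, "receive", topic)
	msgs := make(chan pss.APIMsg)
	sub, err := client.Subscribe(context.Background(), "pss", msgs, "receive", topic)
	if err != nil {
		panic(err)
	}
	defer sub.Unsubscribe()

	// with debug mode on, sending to our own base address loops the message
	// back through Pss.Process, giving a full round trip on a single node
	var self []byte
	if err := client.Call(&self, "pss_baseAddr"); err != nil { // assumes BaseAddr is exposed on the pss namespace
		panic(err)
	}
	if err := client.Call(nil, "pss_send", topic, pss.APIMsg{Msg: []byte("ping"), Addr: self}); err != nil {
		panic(err)
	}
	in := <-msgs
	fmt.Printf("received %q from %x\n", in.Msg, in.Addr)
}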
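The helpers in swarm/pss/types.go are two halves of one round trip: NewProtocolMsg rlp-encodes a payload and wraps it in a ProtocolMsg carrying the message code, while ToP2pMsg recovers a p2p.Msg from those bytes on the receiving side. A sketch of the full wrap/unwrap cycle using only the types introduced above; the chat struct and the address arguments are made-up stand-ins.

package main

import (
	"github.com/ethereum/go-ethereum/swarm/pss"
)

// chat is a hypothetical payload type, used here for illustration only
type chat struct {
	Text string
}

func roundTrip(senderAddr, recipientAddr []byte) error {
	// wrap: rlp-encode the payload and tag it with message code 0
	data, err := pss.NewProtocolMsg(0, &chat{Text: "hello"})
	if err != nil {
		return err
	}

	// the wrapped bytes travel as the Envelope payload of a PssMsg
	env := pss.NewEnvelope(senderAddr, pss.NewTopic("chat", 1), data)
	pssmsg := &pss.PssMsg{To: recipientAddr, Payload: env}
	_ = pssmsg.Serialize() // rlp bytes handed to the bzz transport

	// unwrap: a receiving handler turns the envelope payload back into
	// a p2p.Msg whose Code and Payload match what NewProtocolMsg set
	p2pmsg, err := pss.ToP2pMsg(env.Payload)
	if err != nil {
		return err
	}
	defer p2pmsg.Discard()
	var got chat
	return p2pmsg.Decode(&got)
}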
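In-process consumers can skip the RPC layer entirely: the Handler type from types.go is what Pss.Register accepts, and API.Receive above is just a thin adapter that registers one such handler and forwards matches into an rpc notifier. A rough sketch, assuming a *pss.Pss instance wired up the way newServices does it; the topic name and version are placeholders.

package main

import (
	"fmt"

	"github.com/ethereum/go-ethereum/log"
	"github.com/ethereum/go-ethereum/p2p"
	"github.com/ethereum/go-ethereum/swarm/pss"
)

// logMessages attaches a plain pss.Handler for one topic and returns the
// deregistration func, mirroring the deregf that API.Receive defers
func logMessages(ps *pss.Pss) func() {
	topic := pss.NewTopic("demo", 1) // placeholder name/version
	handler := func(msg []byte, p *p2p.Peer, from []byte) error {
		log.Debug("pss msg", "from", fmt.Sprintf("%x", from), "len", len(msg))
		return nil
	}
	return ps.Register(&topic, handler)
}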