Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Grab bag of fixes #239

Merged
merged 8 commits into from
Jan 6, 2015
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions cmd/evm/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,12 @@ func (self *VMEnv) Value() *big.Int { return self.value }
func (self *VMEnv) GasLimit() *big.Int { return big.NewInt(1000000000) }
func (self *VMEnv) Depth() int { return 0 }
func (self *VMEnv) SetDepth(i int) { self.depth = i }
func (self *VMEnv) GetHash(n uint64) []byte {
if self.block.Number().Cmp(big.NewInt(int64(n))) == 0 {
return self.block.Hash()
}
return nil
}
func (self *VMEnv) AddLog(log state.Log) {
self.state.AddLog(log)
}
Expand Down
34 changes: 18 additions & 16 deletions cmd/peerserver/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,39 +18,41 @@ package main

import (
"crypto/elliptic"
"fmt"
"flag"
"log"
"net"
"os"

"github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/logger"
"github.com/ethereum/go-ethereum/p2p"
)

var (
natType = flag.String("nat", "", "NAT traversal implementation")
pmpGateway = flag.String("gateway", "", "gateway address for NAT-PMP")
listenAddr = flag.String("addr", ":30301", "listen address")
)

func main() {
flag.Parse()
nat, err := p2p.ParseNAT(*natType, *pmpGateway)
if err != nil {
log.Fatal("invalid nat:", err)
}

logger.AddLogSystem(logger.NewStdLogSystem(os.Stdout, log.LstdFlags, logger.InfoLevel))
key, _ := crypto.GenerateKey()
marshaled := elliptic.Marshal(crypto.S256(), key.PublicKey.X, key.PublicKey.Y)

srv := p2p.Server{
MaxPeers: 100,
Identity: p2p.NewSimpleClientIdentity("Ethereum(G)", "0.1", "Peer Server Two", string(marshaled)),
ListenAddr: ":30301",
NAT: p2p.UPNP(),
Identity: p2p.NewSimpleClientIdentity("Ethereum(G)", "0.1", "Peer Server Two", marshaled),
ListenAddr: *listenAddr,
NAT: nat,
NoDial: true,
}
if err := srv.Start(); err != nil {
fmt.Println("could not start server:", err)
os.Exit(1)
log.Fatal("could not start server:", err)
}

// add seed peers
seed, err := net.ResolveTCPAddr("tcp", "poc-8.ethdev.com:30303")
if err != nil {
fmt.Println("couldn't resolve:", err)
} else {
srv.SuggestPeer(seed.IP, seed.Port, nil)
}

select {}
}
3 changes: 1 addition & 2 deletions cmd/rlpdump/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,8 +110,7 @@ func dump(s *rlp.Stream, depth int) error {
s.List()
defer s.ListEnd()
if size == 0 {
fmt.Printf(ws(depth) + "[]")
return nil
fmt.Print(ws(depth) + "[]")
} else {
fmt.Println(ws(depth) + "[")
for i := 0; ; i++ {
Expand Down
8 changes: 4 additions & 4 deletions eth/protocol.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ func (self *ethProtocol) handle() error {
return self.protoError(ErrDecode, "->msg %v: %v", msg, err)
}
hashes := self.chainManager.GetBlockHashesFromHash(request.Hash, request.Amount)
return self.rw.EncodeMsg(BlockHashesMsg, ethutil.ByteSliceToInterface(hashes)...)
return p2p.EncodeMsg(self.rw, BlockHashesMsg, ethutil.ByteSliceToInterface(hashes)...)

case BlockHashesMsg:
// TODO: redo using lazy decode , this way very inefficient on known chains
Expand Down Expand Up @@ -185,7 +185,7 @@ func (self *ethProtocol) handle() error {
break
}
}
return self.rw.EncodeMsg(BlocksMsg, blocks...)
return p2p.EncodeMsg(self.rw, BlocksMsg, blocks...)

case BlocksMsg:
msgStream := rlp.NewStream(msg.Payload)
Expand Down Expand Up @@ -298,12 +298,12 @@ func (self *ethProtocol) handleStatus() error {

func (self *ethProtocol) requestBlockHashes(from []byte) error {
self.peer.Debugf("fetching hashes (%d) %x...\n", blockHashesBatchSize, from[0:4])
return self.rw.EncodeMsg(GetBlockHashesMsg, interface{}(from), uint64(blockHashesBatchSize))
return p2p.EncodeMsg(self.rw, GetBlockHashesMsg, interface{}(from), uint64(blockHashesBatchSize))
}

func (self *ethProtocol) requestBlocks(hashes [][]byte) error {
self.peer.Debugf("fetching %v blocks", len(hashes))
return self.rw.EncodeMsg(GetBlocksMsg, ethutil.ByteSliceToInterface(hashes)...)
return p2p.EncodeMsg(self.rw, GetBlocksMsg, ethutil.ByteSliceToInterface(hashes)...)
}

func (self *ethProtocol) protoError(code int, format string, params ...interface{}) (err *protocolError) {
Expand Down
4 changes: 0 additions & 4 deletions eth/protocol_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,10 +41,6 @@ func (self *testMsgReadWriter) WriteMsg(msg p2p.Msg) error {
return nil
}

func (self *testMsgReadWriter) EncodeMsg(code uint64, data ...interface{}) error {
return self.WriteMsg(p2p.NewMsg(code, data...))
}

func (self *testMsgReadWriter) ReadMsg() (p2p.Msg, error) {
msg, ok := <-self.in
if !ok {
Expand Down
20 changes: 9 additions & 11 deletions p2p/message.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,14 +71,11 @@ type MsgReader interface {
}

type MsgWriter interface {
// WriteMsg sends an existing message.
// The Payload reader of the message is consumed.
// WriteMsg sends a message. It will block until the message's
// Payload has been consumed by the other end.
//
// Note that messages can be sent only once.
WriteMsg(Msg) error

// EncodeMsg writes an RLP-encoded message with the given
// code and data elements.
EncodeMsg(code uint64, data ...interface{}) error
}

// MsgReadWriter provides reading and writing of encoded messages.
Expand All @@ -87,6 +84,12 @@ type MsgReadWriter interface {
MsgWriter
}

// EncodeMsg writes an RLP-encoded message with the given code and
// data elements.
func EncodeMsg(w MsgWriter, code uint64, data ...interface{}) error {
return w.WriteMsg(NewMsg(code, data...))
}

var magicToken = []byte{34, 64, 8, 145}

func writeMsg(w io.Writer, msg Msg) error {
Expand Down Expand Up @@ -209,11 +212,6 @@ func (p *MsgPipeRW) WriteMsg(msg Msg) error {
return ErrPipeClosed
}

// EncodeMsg is a convenient shorthand for sending an RLP-encoded message.
func (p *MsgPipeRW) EncodeMsg(code uint64, data ...interface{}) error {
return p.WriteMsg(NewMsg(code, data...))
}

// ReadMsg returns a message sent on the other end of the pipe.
func (p *MsgPipeRW) ReadMsg() (Msg, error) {
if atomic.LoadInt32(p.closed) == 0 {
Expand Down
6 changes: 3 additions & 3 deletions p2p/message_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,8 +75,8 @@ func TestDecodeRealMsg(t *testing.T) {
func ExampleMsgPipe() {
rw1, rw2 := MsgPipe()
go func() {
rw1.EncodeMsg(8, []byte{0, 0})
rw1.EncodeMsg(5, []byte{1, 1})
EncodeMsg(rw1, 8, []byte{0, 0})
EncodeMsg(rw1, 5, []byte{1, 1})
rw1.Close()
}()

Expand All @@ -100,7 +100,7 @@ loop:
rw1, rw2 := MsgPipe()
done := make(chan struct{})
go func() {
if err := rw1.EncodeMsg(1); err == nil {
if err := EncodeMsg(rw1, 1); err == nil {
t.Error("EncodeMsg returned nil error")
} else if err != ErrPipeClosed {
t.Error("EncodeMsg returned wrong error: got %v, want %v", err, ErrPipeClosed)
Expand Down
22 changes: 0 additions & 22 deletions p2p/peer.go
Original file line number Diff line number Diff line change
Expand Up @@ -460,25 +460,3 @@ func (r *eofSignal) Read(buf []byte) (int, error) {
}
return n, err
}

func (peer *Peer) PeerList() []interface{} {
peers := peer.otherPeers()
ds := make([]interface{}, 0, len(peers))
for _, p := range peers {
p.infolock.Lock()
addr := p.listenAddr
p.infolock.Unlock()
// filter out this peer and peers that are not listening or
// have not completed the handshake.
// TODO: track previously sent peers and exclude them as well.
if p == peer || addr == nil {
continue
}
ds = append(ds, addr)
}
ourAddr := peer.ourListenAddr
if ourAddr != nil && !ourAddr.IP.IsLoopback() && !ourAddr.IP.IsUnspecified() {
ds = append(ds, ourAddr)
}
return ds
}
4 changes: 2 additions & 2 deletions p2p/peer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,10 +126,10 @@ func TestPeerProtoEncodeMsg(t *testing.T) {
Name: "a",
Length: 2,
Run: func(peer *Peer, rw MsgReadWriter) error {
if err := rw.EncodeMsg(2); err == nil {
if err := EncodeMsg(rw, 2); err == nil {
t.Error("expected error for out-of-range msg code, got nil")
}
if err := rw.EncodeMsg(1, "foo", "bar"); err != nil {
if err := EncodeMsg(rw, 1, "foo", "bar"); err != nil {
t.Errorf("write error: %v", err)
}
return nil
Expand Down
34 changes: 28 additions & 6 deletions p2p/protocol.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,22 +119,22 @@ func (bp *baseProtocol) loop(quit <-chan error) error {

getPeersTick := time.NewTicker(10 * time.Second)
defer getPeersTick.Stop()
err := bp.rw.EncodeMsg(getPeersMsg)
err := EncodeMsg(bp.rw, getPeersMsg)

for err == nil {
select {
case err = <-quit:
return err
case <-getPeersTick.C:
err = bp.rw.EncodeMsg(getPeersMsg)
err = EncodeMsg(bp.rw, getPeersMsg)
case event := <-activity.Chan():
ping.Reset(pingTimeout)
lastActive = event.(time.Time)
case t := <-ping.C:
if lastActive.Add(pingTimeout * 2).Before(t) {
err = newPeerError(errPingTimeout, "")
} else if lastActive.Add(pingTimeout).Before(t) {
err = bp.rw.EncodeMsg(pingMsg)
err = EncodeMsg(bp.rw, pingMsg)
}
}
}
Expand Down Expand Up @@ -164,20 +164,20 @@ func (bp *baseProtocol) handle(rw MsgReadWriter) error {
return discRequestedError(reason[0])

case pingMsg:
return bp.rw.EncodeMsg(pongMsg)
return EncodeMsg(bp.rw, pongMsg)

case pongMsg:

case getPeersMsg:
peers := bp.peer.PeerList()
peers := bp.peerList()
// this is dangerous. the spec says that we should _delay_
// sending the response if no new information is available.
// this means that would need to send a response later when
// new peers become available.
//
// TODO: add event mechanism to notify baseProtocol for new peers
if len(peers) > 0 {
return bp.rw.EncodeMsg(peersMsg, peers...)
return EncodeMsg(bp.rw, peersMsg, peers...)
}

case peersMsg:
Expand Down Expand Up @@ -264,3 +264,25 @@ func (bp *baseProtocol) handshakeMsg() Msg {
bp.peer.ourID.Pubkey()[1:],
)
}

func (bp *baseProtocol) peerList() []interface{} {
peers := bp.peer.otherPeers()
ds := make([]interface{}, 0, len(peers))
for _, p := range peers {
p.infolock.Lock()
addr := p.listenAddr
p.infolock.Unlock()
// filter out this peer and peers that are not listening or
// have not completed the handshake.
// TODO: track previously sent peers and exclude them as well.
if p == bp.peer || addr == nil {
continue
}
ds = append(ds, addr)
}
ourAddr := bp.peer.ourListenAddr
if ourAddr != nil && !ourAddr.IP.IsLoopback() && !ourAddr.IP.IsUnspecified() {
ds = append(ds, ourAddr)
}
return ds
}
Loading