Skip to content

Commit

Permalink
Merge pull request ethereum#71 from ethersphere/network-testing-frame…
Browse files Browse the repository at this point in the history
…work-docker-adapter

p2p/adapters: Add "docker" node adapter
  • Loading branch information
zelig authored Apr 25, 2017
2 parents 48299ce + 92551cf commit fb458ff
Show file tree
Hide file tree
Showing 6 changed files with 230 additions and 63 deletions.
11 changes: 11 additions & 0 deletions node/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -567,6 +567,17 @@ func (n *Node) Attach() (*rpc.Client, error) {
return rpc.DialInProc(n.inprocHandler), nil
}

// RPCHandler returns the in-process RPC request handler.
func (n *Node) RPCHandler() (*rpc.Server, error) {
n.lock.RLock()
defer n.lock.RUnlock()

if n.inprocHandler == nil {
return nil, ErrNodeStopped
}
return n.inprocHandler, nil
}

// Server retrieves the currently running P2P network layer. This method is meant
// only to inspect fields of the currently running server, life cycle management
// should be left to this Node entity.
Expand Down
147 changes: 111 additions & 36 deletions p2p/adapters/docker.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,52 +2,127 @@ package adapters

import (
"fmt"
"net"
// "net/http"
// "time"
"io"
"io/ioutil"
"os"
"os/exec"
"path/filepath"
"runtime"
"sync"

"github.com/docker/docker/pkg/reexec"
"github.com/ethereum/go-ethereum/node"
)

func NewRemoteNode(id *NodeId, n Network) *RemoteNode {
return &RemoteNode{
ID: id,
Network: n,
}
// DockerNode is a NodeAdapter which wraps an ExecNode but exec's the current
// binary in a docker container rather than locally
type DockerNode struct {
ExecNode
}

// RemoteNode is the network adapter that
type RemoteNode struct {
ID *NodeId
addr net.Addr
Network
}
// NewDockerNode creates a new DockerNode, building the docker image if
// necessary
func NewDockerNode(id *NodeId, service string) (*DockerNode, error) {
if runtime.GOOS != "linux" {
return nil, fmt.Errorf("NewDockerNode can only be used on Linux as it uses the current binary (which must be a Linux binary)")
}

func Name(id []byte) string {
return fmt.Sprintf("test-%08x", id)
}
if _, exists := serviceFuncs[service]; !exists {
return nil, fmt.Errorf("unknown node service %q", service)
}

// inject(s) sends an RPC command remotely via ssh to the particular dockernode
func (self *RemoteNode) inject(string) error {
return nil
}
// build the docker image
var err error
dockerOnce.Do(func() {
err = buildDockerImage()
})
if err != nil {
return nil, err
}

func (self *RemoteNode) LocalAddr() []byte {
return []byte(self.addr.String())
}
// generate the config
conf := node.DefaultConfig
conf.DataDir = "/data"
conf.P2P.NoDiscovery = true
conf.P2P.NAT = nil

func (self *RemoteNode) ParseAddr(p []byte, s string) ([]byte, error) {
return p, nil
node := &DockerNode{
ExecNode: ExecNode{
ID: id,
Service: service,
Config: &conf,
},
}
node.newCmd = node.dockerCommand
return node, nil
}

func (self *RemoteNode) Disconnect(rid []byte) error {
// ssh+ipc -> drop
// assumes the remote node is running the p2p module as part of the protocol
cmd := fmt.Sprintf(`p2p.Drop("%v")`, string(rid))
return self.inject(cmd)
// dockerCommand returns a command which exec's the binary in a docker
// container.
//
// It uses a shell so that we can pass the _P2P_NODE_CONFIG environment
// variable to the container using the --env flag.
func (n *DockerNode) dockerCommand() *exec.Cmd {
return exec.Command(
"sh", "-c",
fmt.Sprintf(
`exec docker run --interactive --env _P2P_NODE_CONFIG="${_P2P_NODE_CONFIG}" %s p2p-node %s %s`,
dockerImage, n.Service, n.ID.String(),
),
)
}

func (self *RemoteNode) Connect(rid []byte) error {
// ssh+ipc -> connect
//
cmd := fmt.Sprintf(`admin.addPeer("%v")`, string(rid))
return self.inject(cmd)
// dockerImage is the name of the docker image
const dockerImage = "p2p-node"

// dockerOnce is used to build the docker image only once
var dockerOnce sync.Once

// buildDockerImage builds the docker image which is used to run devp2p nodes
// using docker.
//
// It adds the current binary as "p2p-node" so that it runs execP2PNode
// when executed.
func buildDockerImage() error {
// create a directory to use as the docker build context
dir, err := ioutil.TempDir("", "p2p-docker")
if err != nil {
return err
}
defer os.RemoveAll(dir)

// copy the current binary into the build context
bin, err := os.Open(reexec.Self())
if err != nil {
return err
}
defer bin.Close()
dst, err := os.OpenFile(filepath.Join(dir, "self.bin"), os.O_WRONLY|os.O_CREATE, 0755)
if err != nil {
return err
}
defer dst.Close()
if _, err := io.Copy(dst, bin); err != nil {
return err
}

// create the Dockerfile
dockerfile := []byte(`
FROM ubuntu:16.04
RUN mkdir /data
ADD self.bin /bin/p2p-node
`)
if err := ioutil.WriteFile(filepath.Join(dir, "Dockerfile"), dockerfile, 0644); err != nil {
return err
}

// run 'docker build'
cmd := exec.Command("docker", "build", "-t", dockerImage, dir)
cmd.Stdout = os.Stdout
cmd.Stderr = os.Stderr
if err := cmd.Run(); err != nil {
return fmt.Errorf("error building docker image: %s", err)
}

return nil
}
97 changes: 73 additions & 24 deletions p2p/adapters/exec.go
Original file line number Diff line number Diff line change
@@ -1,13 +1,17 @@
package adapters

import (
"context"
"encoding/json"
"errors"
"fmt"
"io"
"net"
"os"
"os/exec"
"os/signal"
"path/filepath"
"strings"
"syscall"
"time"

Expand Down Expand Up @@ -36,7 +40,12 @@ func RegisterService(name string, f serviceFunc) {
}

// ExecNode is a NodeAdapter which starts the node by exec'ing the current
// binary and running a registered serviceFunc
// binary and running a registered serviceFunc.
//
// Communication with the node is performed using RPC over stdin / stdout
// so that we don't need access to either the node's filesystem or TCP stack
// (so for example we can run the node in a remote Docker container and
// still communicate with it).
type ExecNode struct {
ID *NodeId
Service string
Expand All @@ -45,6 +54,8 @@ type ExecNode struct {
Cmd *exec.Cmd
Client *rpc.Client
Info *p2p.NodeInfo

newCmd func() *exec.Cmd
}

// NewExecNode creates a new ExecNode which will run the given service using a
Expand All @@ -63,17 +74,18 @@ func NewExecNode(id *NodeId, service, baseDir string) (*ExecNode, error) {
// generate the config
conf := node.DefaultConfig
conf.DataDir = filepath.Join(dir, "data")
conf.IPCPath = filepath.Join(dir, "ipc.sock")
conf.P2P.ListenAddr = "127.0.0.1:0"
conf.P2P.NoDiscovery = true
conf.P2P.NAT = nil

return &ExecNode{
node := &ExecNode{
ID: id,
Service: service,
Dir: dir,
Config: &conf,
}, nil
}
node.newCmd = node.execCommand
return node, nil
}

// Addr returns the node's enode URL
Expand Down Expand Up @@ -104,40 +116,43 @@ func (n *ExecNode) Start() (err error) {
return fmt.Errorf("error generating node config: %s", err)
}

// create a net.Pipe for RPC communication over stdin / stdout
pipe1, pipe2 := net.Pipe()

// start the node
cmd := &exec.Cmd{
Path: reexec.Self(),
Args: []string{"p2p-node", n.Service, n.ID.String()},
Stdout: os.Stdout,
Stderr: os.Stderr,
Env: append(os.Environ(), fmt.Sprintf("_P2P_NODE_CONFIG=%s", conf)),
}
cmd := n.newCmd()
cmd.Stdin = pipe1
cmd.Stdout = pipe1
cmd.Stderr = os.Stderr
cmd.Env = append(os.Environ(), fmt.Sprintf("_P2P_NODE_CONFIG=%s", conf))
if err := cmd.Start(); err != nil {
return fmt.Errorf("error starting node: %s", err)
}
n.Cmd = cmd

// create the RPC client
for start := time.Now(); time.Since(start) < 10*time.Second; time.Sleep(50 * time.Millisecond) {
n.Client, err = rpc.Dial(n.Config.IPCPath)
if err == nil {
break
}
}
if n.Client == nil {
return fmt.Errorf("error creating IPC client: %s", err)
}

// load info
// create the RPC client and load the node info
n.Client = rpc.NewClientWithConn(pipe2)
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
var info p2p.NodeInfo
if err := n.Client.Call(&info, "admin_nodeInfo"); err != nil {
if err := n.Client.CallContext(ctx, &info, "admin_nodeInfo"); err != nil {
return fmt.Errorf("error getting node info: %s", err)
}
n.Info = &info

return nil
}

// execCommand returns a command which runs the node locally by exec'ing
// the current binary but setting argv[0] to "p2p-node" so that the child
// runs execP2PNode
func (n *ExecNode) execCommand() *exec.Cmd {
return &exec.Cmd{
Path: reexec.Self(),
Args: []string{"p2p-node", n.Service, n.ID.String()},
}
}

// Stop stops the node by first sending SIGTERM and then SIGKILL if the node
// doesn't stop within 5s
func (n *ExecNode) Stop() error {
Expand Down Expand Up @@ -221,6 +236,20 @@ func execP2PNode() {
log.Crit(fmt.Sprintf("unknown node service %q", serviceName))
}

// use explicit IP address in ListenAddr so that Enode URL is usable
if strings.HasPrefix(conf.P2P.ListenAddr, ":") {
addrs, err := net.InterfaceAddrs()
if err != nil {
log.Crit("error getting IP address", "err", err)
}
for _, addr := range addrs {
if ip, ok := addr.(*net.IPNet); ok && !ip.IP.IsLoopback() {
conf.P2P.ListenAddr = ip.IP.String() + conf.P2P.ListenAddr
break
}
}
}

// start the devp2p stack
stack, err := node.New(&conf)
if err != nil {
Expand All @@ -233,6 +262,15 @@ func execP2PNode() {
log.Crit("error starting node", "err", err)
}

// use stdin / stdout for RPC to avoid the parent needing to access
// either the local filesystem or TCP stack (useful when running in
// Docker)
handler, err := stack.RPCHandler()
if err != nil {
log.Crit("error getting RPC server", "err", err)
}
go handler.ServeCodec(rpc.NewJSONCodec(&stdioConn{os.Stdin, os.Stdout}), rpc.OptionMethodInvocation|rpc.OptionSubscriptions)

go func() {
sigc := make(chan os.Signal, 1)
signal.Notify(sigc, syscall.SIGTERM)
Expand All @@ -244,3 +282,14 @@ func execP2PNode() {

stack.Wait()
}

// stdioConn wraps os.Stdin / os.Stdout with a nop Close method so we can
// use them to handle RPC messages
type stdioConn struct {
io.Reader
io.Writer
}

func (r *stdioConn) Close() error {
return nil
}
2 changes: 1 addition & 1 deletion p2p/adapters/inproc.go
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,7 @@ func (self *SimNode) RunProtocol(id *NodeId, rw, rrw p2p.MsgReadWriter, peer *Pe
return nil
}
log.Trace(fmt.Sprintf("protocol starting on peer %v (connection with %v)", self.Id, id))
p := p2p.NewPeer(id.NodeID, Name(id.Bytes()), []p2p.Cap{})
p := p2p.NewPeer(id.NodeID, id.Label(), []p2p.Cap{})
go func() {
self.network.DidConnect(self.Id, id)
err := self.Run(p, rw)
Expand Down
8 changes: 8 additions & 0 deletions rpc/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,14 @@ func DialContext(ctx context.Context, rawurl string) (*Client, error) {
}
}

// NewClientWithConn creates an RPC client which uses the provided net.Conn.
func NewClientWithConn(conn net.Conn) *Client {
client, _ := newClient(context.Background(), func(context.Context) (net.Conn, error) {
return conn, nil
})
return client
}

func newClient(initctx context.Context, connectFunc func(context.Context) (net.Conn, error)) (*Client, error) {
conn, err := connectFunc(initctx)
if err != nil {
Expand Down
Loading

0 comments on commit fb458ff

Please sign in to comment.