diff --git a/p2p/transport/websocket/addrs_test.go b/p2p/transport/websocket/addrs_test.go index e2779fd462..e3182affda 100644 --- a/p2p/transport/websocket/addrs_test.go +++ b/p2p/transport/websocket/addrs_test.go @@ -69,7 +69,7 @@ func TestConvertWebsocketMultiaddrToNetAddr(t *testing.T) { } func TestListeningOnDNSAddr(t *testing.T) { - ln, err := newListener(ma.StringCast("/dns/localhost/tcp/0/ws"), nil) + ln, err := newListener(ma.StringCast("/dns/localhost/tcp/0/ws"), nil, false) require.NoError(t, err) addr := ln.Multiaddr() first, rest := ma.SplitFirst(addr) diff --git a/p2p/transport/websocket/listener.go b/p2p/transport/websocket/listener.go index d7a1b885b8..b01ebb010a 100644 --- a/p2p/transport/websocket/listener.go +++ b/p2p/transport/websocket/listener.go @@ -8,6 +8,7 @@ import ( "strings" "github.com/libp2p/go-libp2p/core/transport" + "github.com/libp2p/go-reuseport" ma "github.com/multiformats/go-multiaddr" manet "github.com/multiformats/go-multiaddr/net" @@ -40,7 +41,8 @@ func (pwma *parsedWebsocketMultiaddr) toMultiaddr() ma.Multiaddr { // newListener creates a new listener from a raw net.Listener. // tlsConf may be nil (for unencrypted websockets). -func newListener(a ma.Multiaddr, tlsConf *tls.Config) (*listener, error) { +func newListener(a ma.Multiaddr, tlsConf *tls.Config, reuseportAvailable bool) (*listener, error) { + var nl net.Listener parsed, err := parseWebsocketMultiaddr(a) if err != nil { return nil, err @@ -54,11 +56,20 @@ func newListener(a ma.Multiaddr, tlsConf *tls.Config) (*listener, error) { if err != nil { return nil, err } - nl, err := net.Listen(lnet, lnaddr) - if err != nil { - return nil, err + if reuseportAvailable { + nl, err = reuseport.Listen(lnet, lnaddr) + if err != nil { + nl, err = net.Listen(lnet, lnaddr) + if err != nil { + return nil, err + } + } + } else { + nl, err = net.Listen(lnet, lnaddr) + if err != nil { + return nil, err + } } - laddr, err := manet.FromNetAddr(nl.Addr()) if err != nil { return nil, err diff --git a/p2p/transport/websocket/websocket.go b/p2p/transport/websocket/websocket.go index e1965123d9..319137fbb5 100644 --- a/p2p/transport/websocket/websocket.go +++ b/p2p/transport/websocket/websocket.go @@ -11,12 +11,13 @@ import ( "github.com/libp2p/go-libp2p/core/network" "github.com/libp2p/go-libp2p/core/peer" "github.com/libp2p/go-libp2p/core/transport" + "github.com/libp2p/go-reuseport" + ws "github.com/gorilla/websocket" + reuseTransport "github.com/libp2p/go-libp2p/p2p/net/reuseport" ma "github.com/multiformats/go-multiaddr" mafmt "github.com/multiformats/go-multiaddr-fmt" manet "github.com/multiformats/go-multiaddr/net" - - ws "github.com/gorilla/websocket" ) // WsFmt is multiaddr formatter for WsProtocol @@ -60,6 +61,13 @@ var upgrader = ws.Upgrader{ type Option func(*WebsocketTransport) error +func EnableReuseport() Option { + return func(tr *WebsocketTransport) error { + tr.reuseport = true + return nil + } +} + // WithTLSClientConfig sets a TLS client configuration on the WebSocket Dialer. Only // relevant for non-browser usages. // @@ -85,8 +93,10 @@ type WebsocketTransport struct { upgrader transport.Upgrader rcmgr network.ResourceManager - tlsClientConf *tls.Config - tlsConf *tls.Config + tlsClientConf *tls.Config + tlsConf *tls.Config + reuseport bool //reuseport is disabled by default, can be enabled by passing it as an option. + reuseTransport reuseTransport.Transport } var _ transport.Transport = (*WebsocketTransport)(nil) @@ -95,6 +105,7 @@ func New(u transport.Upgrader, rcmgr network.ResourceManager, opts ...Option) (* if rcmgr == nil { rcmgr = &network.NullResourceManager{} } + t := &WebsocketTransport{ upgrader: u, rcmgr: rcmgr, @@ -187,7 +198,13 @@ func (t *WebsocketTransport) maDial(ctx context.Context, raddr ma.Multiaddr) (ma return nil, err } isWss := wsurl.Scheme == "wss" + dialer := ws.Dialer{HandshakeTimeout: 30 * time.Second} + if t.UseReuseport() { + dialer.NetDial = func(network, address string) (net.Conn, error) { + return t.reuseTransport.DialContext(ctx, raddr) + } + } if isWss { sni := "" sni, err = raddr.ValueForProtocol(ma.P_SNI) @@ -229,7 +246,7 @@ func (t *WebsocketTransport) maDial(ctx context.Context, raddr ma.Multiaddr) (ma } func (t *WebsocketTransport) maListen(a ma.Multiaddr) (manet.Listener, error) { - l, err := newListener(a, t.tlsConf) + l, err := newListener(a, t.tlsConf, t.UseReuseport()) if err != nil { return nil, err } @@ -244,3 +261,8 @@ func (t *WebsocketTransport) Listen(a ma.Multiaddr) (transport.Listener, error) } return &transportListener{Listener: t.upgrader.UpgradeListener(t, malist)}, nil } + +// UseReuseport returns true if reuseport is enabled and available. +func (t *WebsocketTransport) UseReuseport() bool { + return t.reuseport && reuseport.Available() +} diff --git a/p2p/transport/websocket/websocket_test.go b/p2p/transport/websocket/websocket_test.go index aa06c62825..ef836d7ab6 100644 --- a/p2p/transport/websocket/websocket_test.go +++ b/p2p/transport/websocket/websocket_test.go @@ -15,6 +15,7 @@ import ( "math/big" "net" "net/http" + "runtime" "strings" "testing" "time" @@ -32,6 +33,7 @@ import ( ttransport "github.com/libp2p/go-libp2p/p2p/transport/testsuite" ma "github.com/multiformats/go-multiaddr" + manet "github.com/multiformats/go-multiaddr/net" "github.com/stretchr/testify/require" ) @@ -548,3 +550,102 @@ func TestResolveMultiaddr(t *testing.T) { }) } } + +func startListeners(t *testing.T, tpt *WebsocketTransport) (ma.Multiaddr, []*manet.Listener, error) { + t.Helper() + laddr := ma.StringCast("/ip4/127.0.0.1/tcp/0/ws") + listeners := make([]*manet.Listener, 2) + + l, err := tpt.maListen(laddr) + require.NoError(t, err) + listeners[0] = &l + + port := l.Addr().(*net.TCPAddr).Port + t.Logf("Port allocated for listener: %d", port) + laddr = ma.StringCast(fmt.Sprintf("/ip4/127.0.0.1/tcp/%d/ws", port)) + l1, err := tpt.maListen(laddr) + require.NoError(t, err) + listeners[1] = &l1 + return laddr, listeners, nil +} + +func TestListenerReusePort(t *testing.T) { + noOfClientConns := 20 + var opts []Option + opts = append(opts, EnableReuseport()) + _, u := newUpgrader(t) + tpt, err := New(u, &network.NullResourceManager{}, opts...) + require.NoError(t, err) + c := make(chan int, noOfClientConns) + raddr, listeners, err := startListeners(t, tpt) + require.NoErrorf(t, err, "failed to start listeners") + for i := 0; i < 2; i++ { + go func(index int, ln *manet.Listener, ch chan int) { + l := *ln + defer l.Close() + /* Looping noOfClientConns times to ensure all 4 connections are handled. + With SO_REUSEPORT the distribution happens based on threads that are waiting on Accept call as mentioned below. + We cannot gaurantee when Server go-routines would block on Accept. + Sometimes the client routines get scheduled first causing unequal distribution of connections. + Ref: https://lwn.net/Articles/542629/ + By contrast, the SO_REUSEPORT implementation distributes connections evenly across all of the threads (or processes) + that are blocked in accept() on the same port. */ + for j := 0; j < noOfClientConns; j++ { + //j := 0 + conn, err := l.Accept() + require.NoErrorf(t, err, "Server Routine-", index, " Failed accepting connection ", j, " due to error ") + defer conn.Close() + buf := make([]byte, 5) + n, err := conn.Read(buf) + require.NoError(t, err) + require.Equal(t, 5, n) + n, err = conn.Write(buf) + require.NoError(t, err) + require.Equal(t, 5, n) + ch <- index + } + + }(i, listeners[i], c) + } + + for i := 0; i < noOfClientConns; i++ { + go func(index int) { + conn, err := tpt.maDial(context.Background(), raddr) + require.NoError(t, err) + defer conn.Close() + msg := "Hello" + n, err := conn.Write([]byte(msg)) + require.Equal(t, 5, n) + require.NoError(t, err) + buf := make([]byte, 5) + n, err = conn.Read(buf) + require.NoError(t, err) + require.Equal(t, 5, n) + }(i) + } + var connsHandled [2]int + //Waiting to ensure all 4 connections are handled. + for i := 0; i < noOfClientConns; i++ { + temp := <-c + connsHandled[temp]++ + } + /* + For windows and macOS load balancing is not done by kernel as per references below. + For other architectures, behaviour is not known. + For ubuntu load balancing doesn't seem to happen consistently. + In order to not have a flaky test, commenting this additiona check. + Hence, Check for load balancing only for linux based architectures. + Refer https://learn.microsoft.com/en-us/windows/win32/winsock/using-so-reuseaddr-and-so-exclusiveaddruse?redirectedfrom=MSDN for windows + References for MACOS + Ref - (https://stackoverflow.com/questions/41247790/so-reuseport-on-macos-with-libuv) + Ref -(https://github.com/uNetworking/uWebSockets/issues/1194) + */ + if runtime.GOOS == "linux" { + for i := 0; i < 2; i++ { + /*Not checking for equal distribution of connections due to above explanation.*/ + require.NotEqualf(t, 0, connsHandled[i], "No connections handled by listener %d", i) + t.Logf("Listener %d handled %d connections.", i, connsHandled[i]) + } + } + +}