diff --git a/cmd/devnet/README.md b/cmd/devnet/README.md index cd281a53a99..a364d567ea0 100644 --- a/cmd/devnet/README.md +++ b/cmd/devnet/README.md @@ -23,8 +23,8 @@ The devnet runs as a single `go` process which can be started with the following | metrics | N | false | Enable metrics collection and reporting from devnet nodes | | metrics.node | N | 0 | At the moment only one node on the network can produce metrics. This value specifies index of the node in the cluster to attach to | | metrics.port | N | 6060 | The network port of the node to connect to for gather ing metrics | -| diagnostics.url | N | | URL of the diagnostics system provided by the support team, include unique session PIN, if this is specified the devnet will start a `support` tunnel and connect to the diagnostics platform to provide metrics from the specified node on the devnet | -| insecure | N | false | Used if `diagnostics.url` is set to allow communication with diagnostics system using self-signed TLS certificates | +| diagnostics.addr | N | | Address of the diagnostics system provided by the support team, include unique session PIN, if this is specified the devnet will start a `support` tunnel and connect to the diagnostics platform to provide metrics from the specified node on the devnet | +| insecure | N | false | Used if `diagnostics.addr` is set to allow communication with diagnostics system ## Network Configuration diff --git a/cmd/devnet/main.go b/cmd/devnet/main.go index d241040c09a..69f66e7a795 100644 --- a/cmd/devnet/main.go +++ b/cmd/devnet/main.go @@ -105,8 +105,8 @@ var ( } DiagnosticsURLFlag = cli.StringFlag{ - Name: "diagnostics.url", - Usage: "URL of the diagnostics system provided by the support team, include unique session PIN", + Name: "diagnostics.addr", + Usage: "Address of the diagnostics system provided by the support team, include unique session PIN", } insecureFlag = cli.BoolFlag{ diff --git a/cmd/utils/flags.go b/cmd/utils/flags.go index 20a5dc83244..189b5d61566 100644 --- a/cmd/utils/flags.go +++ b/cmd/utils/flags.go @@ -832,8 +832,8 @@ var ( } DiagnosticsURLFlag = cli.StringFlag{ - Name: "diagnostics.url", - Usage: "URL of the diagnostics system provided by the support team", + Name: "diagnostics.addr", + Usage: "Address of the diagnostics system provided by the support team", } DiagnosticsInsecureFlag = cli.BoolFlag{ diff --git a/rpc/client.go b/rpc/client.go index 3e61b8a393a..c12e56de2fb 100644 --- a/rpc/client.go +++ b/rpc/client.go @@ -496,7 +496,7 @@ func (c *Client) write(ctx context.Context, msg interface{}, retry bool) error { return err } } - err := c.writeConn.writeJSON(ctx, msg) + err := c.writeConn.WriteJSON(ctx, msg) if err != nil { c.writeConn = nil if !retry { @@ -629,7 +629,7 @@ func (c *Client) read(codec ServerCodec) { for { msgs, batch, err := codec.ReadBatch() if _, ok := err.(*json.SyntaxError); ok { - codec.writeJSON(context.Background(), errorMessage(&parseError{err.Error()})) + codec.WriteJSON(context.Background(), errorMessage(&parseError{err.Error()})) } if err != nil { c.readErr <- err diff --git a/rpc/handler.go b/rpc/handler.go index 66334087aaf..389cdf6e10b 100644 --- a/rpc/handler.go +++ b/rpc/handler.go @@ -142,7 +142,7 @@ func (h *handler) handleBatch(msgs []*jsonrpcMessage) { // Emit error response for empty batches: if len(msgs) == 0 { h.startCallProc(func(cp *callProc) { - h.conn.writeJSON(cp.ctx, errorMessage(&invalidRequestError{"empty batch"})) + h.conn.WriteJSON(cp.ctx, errorMessage(&invalidRequestError{"empty batch"})) }) return } @@ -200,7 +200,7 @@ func (h *handler) handleBatch(msgs []*jsonrpcMessage) { } h.addSubscriptions(cp.notifiers) if len(answers) > 0 { - h.conn.writeJSON(cp.ctx, answers) + h.conn.WriteJSON(cp.ctx, answers) } for _, n := range cp.notifiers { n.activate() @@ -226,7 +226,7 @@ func (h *handler) handleMsg(msg *jsonrpcMessage, stream *jsoniter.Stream) { stream.Write(buffer) } if needWriteStream { - h.conn.writeJSON(cp.ctx, json.RawMessage(stream.Buffer())) + h.conn.WriteJSON(cp.ctx, json.RawMessage(stream.Buffer())) } else { stream.Write([]byte("\n")) } diff --git a/rpc/http.go b/rpc/http.go index a1938b585b2..bcfd0948703 100644 --- a/rpc/http.go +++ b/rpc/http.go @@ -55,7 +55,7 @@ type httpConn struct { } // httpConn is treated specially by Client. -func (hc *httpConn) writeJSON(context.Context, interface{}) error { +func (hc *httpConn) WriteJSON(context.Context, interface{}) error { panic("writeJSON called on httpConn") } diff --git a/rpc/json.go b/rpc/json.go index abe2a02facb..330f704ca7a 100644 --- a/rpc/json.go +++ b/rpc/json.go @@ -220,7 +220,7 @@ func (c *jsonCodec) ReadBatch() (messages []*jsonrpcMessage, batch bool, err err return messages, batch, nil } -func (c *jsonCodec) writeJSON(ctx context.Context, v interface{}) error { +func (c *jsonCodec) WriteJSON(ctx context.Context, v interface{}) error { c.encMu.Lock() defer c.encMu.Unlock() diff --git a/rpc/server.go b/rpc/server.go index 34fba30c225..e3d67c4ea7b 100644 --- a/rpc/server.go +++ b/rpc/server.go @@ -124,13 +124,13 @@ func (s *Server) serveSingleRequest(ctx context.Context, codec ServerCodec, stre reqs, batch, err := codec.ReadBatch() if err != nil { if err != io.EOF { - codec.writeJSON(ctx, errorMessage(&invalidMessageError{"parse error"})) + codec.WriteJSON(ctx, errorMessage(&invalidMessageError{"parse error"})) } return } if batch { if s.batchLimit > 0 && len(reqs) > s.batchLimit { - codec.writeJSON(ctx, errorMessage(fmt.Errorf("batch limit %d exceeded (can increase by --rpc.batch.limit). Requested batch of size: %d", s.batchLimit, len(reqs)))) + codec.WriteJSON(ctx, errorMessage(fmt.Errorf("batch limit %d exceeded (can increase by --rpc.batch.limit). Requested batch of size: %d", s.batchLimit, len(reqs)))) } else { h.handleBatch(reqs) } diff --git a/rpc/subscription.go b/rpc/subscription.go index 38789f57b63..03dd8d5d912 100644 --- a/rpc/subscription.go +++ b/rpc/subscription.go @@ -174,7 +174,7 @@ func (n *Notifier) activate() error { func (n *Notifier) send(sub *Subscription, data json.RawMessage) error { params, _ := json.Marshal(&subscriptionResult{ID: string(sub.ID), Result: data}) ctx := context.Background() - return n.h.conn.writeJSON(ctx, &jsonrpcMessage{ + return n.h.conn.WriteJSON(ctx, &jsonrpcMessage{ Version: vsn, Method: n.namespace + notificationMethodSuffix, Params: params, diff --git a/rpc/types.go b/rpc/types.go index d3e872620e3..f51311c3e7c 100644 --- a/rpc/types.go +++ b/rpc/types.go @@ -20,12 +20,13 @@ import ( "context" "encoding/json" "fmt" - "github.com/ledgerwatch/erigon-lib/common/hexutil" "math" "math/big" "strconv" "strings" + "github.com/ledgerwatch/erigon-lib/common/hexutil" + libcommon "github.com/ledgerwatch/erigon-lib/common" ) @@ -61,7 +62,7 @@ type ServerCodec interface { // jsonWriter can write JSON messages to its underlying connection. // Implementations must be safe for concurrent use. type jsonWriter interface { - writeJSON(context.Context, interface{}) error + WriteJSON(context.Context, interface{}) error // Closed returns a channel which is closed when the connection is closed. closed() <-chan interface{} // RemoteAddr returns the peer address of the connection. diff --git a/rpc/websocket.go b/rpc/websocket.go index ce945e656de..6f79dae834b 100644 --- a/rpc/websocket.go +++ b/rpc/websocket.go @@ -63,7 +63,7 @@ func (s *Server) WebsocketHandler(allowedOrigins []string, jwtSecret []byte, com logger.Warn("WebSocket upgrade failed", "err", err) return } - codec := newWebsocketCodec(conn) + codec := NewWebsocketCodec(conn) s.ServeCodec(codec, 0) }) } @@ -205,7 +205,7 @@ func DialWebsocketWithDialer(ctx context.Context, endpoint, origin string, diale } return nil, hErr } - return newWebsocketCodec(conn), nil + return NewWebsocketCodec(conn), nil }, logger) } @@ -248,7 +248,7 @@ type websocketCodec struct { pingReset chan struct{} } -func newWebsocketCodec(conn *websocket.Conn) ServerCodec { +func NewWebsocketCodec(conn *websocket.Conn) ServerCodec { conn.SetReadLimit(wsMessageSizeLimit) wc := &websocketCodec{ jsonCodec: NewFuncCodec(conn, conn.WriteJSON, conn.ReadJSON).(*jsonCodec), @@ -265,8 +265,8 @@ func (wc *websocketCodec) Close() { wc.wg.Wait() } -func (wc *websocketCodec) writeJSON(ctx context.Context, v interface{}) error { - err := wc.jsonCodec.writeJSON(ctx, v) +func (wc *websocketCodec) WriteJSON(ctx context.Context, v interface{}) error { + err := wc.jsonCodec.WriteJSON(ctx, v) if err == nil { // Notify pingLoop to delay the next idle ping. select { diff --git a/turbo/app/support_cmd.go b/turbo/app/support_cmd.go index 4a3d54fd17e..5bbb44f5581 100644 --- a/turbo/app/support_cmd.go +++ b/turbo/app/support_cmd.go @@ -12,25 +12,36 @@ import ( "os" "os/signal" "strconv" + "sync" "syscall" "time" + "github.com/gorilla/websocket" "github.com/ledgerwatch/erigon-lib/gointerfaces/remote" "github.com/ledgerwatch/erigon-lib/gointerfaces/types" "github.com/ledgerwatch/erigon/rpc" "github.com/ledgerwatch/log/v3" "github.com/urfave/cli/v2" - "golang.org/x/net/http2" ) +const ( + wsReadBuffer = 1024 + wsWriteBuffer = 1024 + wsPingInterval = 60 * time.Second + wsPingWriteTimeout = 5 * time.Second + wsMessageSizeLimit = 32 * 1024 * 1024 +) + +var wsBufferPool = new(sync.Pool) + var ( diagnosticsURLFlag = cli.StringFlag{ - Name: "diagnostics.url", - Usage: "URL of the diagnostics system provided by the support team, include unique session PIN", + Name: "diagnostics.addr", + Usage: "Address of the diagnostics system provided by the support team, include unique session PIN", } debugURLsFlag = cli.StringSliceFlag{ - Name: "debug.urls", + Name: "debug.addrs", Usage: "Comma separated list of URLs to the debug endpoints thats are being diagnosed", } @@ -49,7 +60,7 @@ var supportCommand = cli.Command{ Action: MigrateFlags(connectDiagnostics), Name: "support", Usage: "Connect Erigon instance to a diagnostics system for support", - ArgsUsage: "--diagnostics.url --ids --metrics.urls ", + ArgsUsage: "--diagnostics.addr --ids --metrics.urls ", Flags: []cli.Flag{ &debugURLsFlag, &diagnosticsURLFlag, @@ -73,7 +84,11 @@ func ConnectDiagnostics(cliCtx *cli.Context, logger log.Logger) error { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - debugURLs := cliCtx.StringSlice(debugURLsFlag.Name) + debugURLs := []string{} + + for _, debugURL := range cliCtx.StringSlice(debugURLsFlag.Name) { + debugURLs = append(debugURLs, "http://"+debugURL) + } diagnosticsUrl := cliCtx.String(diagnosticsURLFlag.Name) + "/bridge" @@ -117,28 +132,23 @@ func (c *conn) SetWriteDeadline(time time.Time) error { return nil } +// tunnel operates the tunnel from diagnostics system to the metrics URL for one http/2 request +// needs to be called repeatedly to implement re-connect logic // tunnel operates the tunnel from diagnostics system to the metrics URL for one http/2 request // needs to be called repeatedly to implement re-connect logic func tunnel(ctx context.Context, cancel context.CancelFunc, sigs chan os.Signal, tlsConfig *tls.Config, diagnosticsUrl string, sessionIds []string, debugURLs []string, logger log.Logger) error { - diagnosticsClient := &http.Client{Transport: &http2.Transport{TLSClientConfig: tlsConfig}} - defer diagnosticsClient.CloseIdleConnections() metricsClient := &http.Client{} defer metricsClient.CloseIdleConnections() ctx1, cancel1 := context.WithCancel(ctx) defer cancel1() - // Create a request object to send to the server - reader, writer := io.Pipe() - go func() { select { case <-sigs: cancel() case <-ctx1.Done(): } - reader.Close() - writer.Close() }() type enode struct { @@ -206,21 +216,23 @@ func tunnel(ctx context.Context, cancel context.CancelFunc, sigs chan os.Signal, } } - req, err := http.NewRequestWithContext(ctx1, http.MethodPost, diagnosticsUrl, reader) - - if err != nil { - return err + dialer := websocket.Dialer{ + ReadBufferSize: wsReadBuffer, + WriteBufferSize: wsWriteBuffer, + WriteBufferPool: wsBufferPool, } - // Create a connection - // Apply given context to the sent request - resp, err := diagnosticsClient.Do(req) + conn, resp, err := dialer.DialContext(ctx1, "wss://"+diagnosticsUrl, nil) if err != nil { - return err + conn, resp, err = dialer.DialContext(ctx1, "ws://"+diagnosticsUrl, nil) + + if err != nil { + return err + } } - if resp.StatusCode != http.StatusOK { + if resp.StatusCode != http.StatusSwitchingProtocols { return fmt.Errorf("support request to %s failed: %s", diagnosticsUrl, resp.Status) } @@ -230,7 +242,10 @@ func tunnel(ctx context.Context, cancel context.CancelFunc, sigs chan os.Signal, Nodes []*info `json:"nodes"` } - err = json.NewEncoder(writer).Encode(&connectionInfo{ + codec := rpc.NewWebsocketCodec(conn) + defer codec.Close() + + err = codec.WriteJSON(ctx1, &connectionInfo{ Version: Version, Sessions: sessionIds, Nodes: func() (replies []*info) { @@ -248,12 +263,6 @@ func tunnel(ctx context.Context, cancel context.CancelFunc, sigs chan os.Signal, logger.Info("Connected") - codec := rpc.NewCodec(&conn{ - ReadCloser: resp.Body, - PipeWriter: writer, - }) - defer codec.Close() - for { requests, _, err := codec.ReadBatch() @@ -306,11 +315,10 @@ func tunnel(ctx context.Context, cancel context.CancelFunc, sigs chan os.Signal, } debugURL := node.debugURL + "/debug/" + requests[0].Method + queryString - debugResponse, err := metricsClient.Get(debugURL) if err != nil { - return json.NewEncoder(writer).Encode(&nodeResponse{ + return codec.WriteJSON(ctx1, &nodeResponse{ Id: requestId, Error: &responseError{ Code: http.StatusFailedDependency, @@ -322,9 +330,10 @@ func tunnel(ctx context.Context, cancel context.CancelFunc, sigs chan os.Signal, defer debugResponse.Body.Close() - if resp.StatusCode != http.StatusOK { + //Websocket ok message + if resp.StatusCode != http.StatusSwitchingProtocols { body, _ := io.ReadAll(debugResponse.Body) - return json.NewEncoder(writer).Encode(&nodeResponse{ + return codec.WriteJSON(ctx1, &nodeResponse{ Id: requestId, Error: &responseError{ Code: int64(resp.StatusCode), @@ -339,7 +348,7 @@ func tunnel(ctx context.Context, cancel context.CancelFunc, sigs chan os.Signal, switch debugResponse.Header.Get("Content-Type") { case "application/json": if _, err := io.Copy(buffer, debugResponse.Body); err != nil { - return json.NewEncoder(writer).Encode(&nodeResponse{ + return codec.WriteJSON(ctx1, &nodeResponse{ Id: requestId, Error: &responseError{ Code: http.StatusInternalServerError, @@ -347,10 +356,11 @@ func tunnel(ctx context.Context, cancel context.CancelFunc, sigs chan os.Signal, }, Last: true, }) + } case "application/octet-stream": if _, err := io.Copy(buffer, debugResponse.Body); err != nil { - return json.NewEncoder(writer).Encode(&nodeResponse{ + return codec.WriteJSON(ctx1, &nodeResponse{ Id: requestId, Error: &responseError{ Code: int64(http.StatusInternalServerError), @@ -376,7 +386,7 @@ func tunnel(ctx context.Context, cancel context.CancelFunc, sigs chan os.Signal, buffer = bytes.NewBuffer(data) if err != nil { - return json.NewEncoder(writer).Encode(&nodeResponse{ + return codec.WriteJSON(ctx1, &nodeResponse{ Id: requestId, Error: &responseError{ Code: int64(http.StatusInternalServerError), @@ -387,7 +397,7 @@ func tunnel(ctx context.Context, cancel context.CancelFunc, sigs chan os.Signal, } default: - return json.NewEncoder(writer).Encode(&nodeResponse{ + return codec.WriteJSON(ctx1, &nodeResponse{ Id: requestId, Error: &responseError{ Code: int64(http.StatusInternalServerError), @@ -397,7 +407,7 @@ func tunnel(ctx context.Context, cancel context.CancelFunc, sigs chan os.Signal, }) } - return json.NewEncoder(writer).Encode(&nodeResponse{ + return codec.WriteJSON(ctx1, &nodeResponse{ Id: requestId, Result: json.RawMessage(buffer.Bytes()), Last: true,