Skip to content

Commit

Permalink
dvovk/tunnelwws (#8745)
Browse files Browse the repository at this point in the history
- changed communication tunnel to web socket in order to connect to
remote nodes
- changed diagnostics.url flag to diagnostics.addr as now user need to
enter only address and support command will connect to it through
websocket
- changed flag debug.urls to debug.addrs in order to have ability to
change connection type between erigon and support to websocket and don't
change user API
- added auto trying to connect to connect to ws if connection with was
failed
  • Loading branch information
dvovk authored Nov 16, 2023
1 parent 27d8865 commit a6b5297
Show file tree
Hide file tree
Showing 12 changed files with 72 additions and 61 deletions.
4 changes: 2 additions & 2 deletions cmd/devnet/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,8 @@ The devnet runs as a single `go` process which can be started with the following
| metrics | N | false | Enable metrics collection and reporting from devnet nodes |
| metrics.node | N | 0 | At the moment only one node on the network can produce metrics. This value specifies index of the node in the cluster to attach to |
| metrics.port | N | 6060 | The network port of the node to connect to for gather ing metrics |
| diagnostics.url | N | | URL of the diagnostics system provided by the support team, include unique session PIN, if this is specified the devnet will start a `support` tunnel and connect to the diagnostics platform to provide metrics from the specified node on the devnet |
| insecure | N | false | Used if `diagnostics.url` is set to allow communication with diagnostics system using self-signed TLS certificates |
| diagnostics.addr | N | | Address of the diagnostics system provided by the support team, include unique session PIN, if this is specified the devnet will start a `support` tunnel and connect to the diagnostics platform to provide metrics from the specified node on the devnet |
| insecure | N | false | Used if `diagnostics.addr` is set to allow communication with diagnostics system

## Network Configuration

Expand Down
4 changes: 2 additions & 2 deletions cmd/devnet/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,8 +105,8 @@ var (
}

DiagnosticsURLFlag = cli.StringFlag{
Name: "diagnostics.url",
Usage: "URL of the diagnostics system provided by the support team, include unique session PIN",
Name: "diagnostics.addr",
Usage: "Address of the diagnostics system provided by the support team, include unique session PIN",
}

insecureFlag = cli.BoolFlag{
Expand Down
4 changes: 2 additions & 2 deletions cmd/utils/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -832,8 +832,8 @@ var (
}

DiagnosticsURLFlag = cli.StringFlag{
Name: "diagnostics.url",
Usage: "URL of the diagnostics system provided by the support team",
Name: "diagnostics.addr",
Usage: "Address of the diagnostics system provided by the support team",
}

DiagnosticsInsecureFlag = cli.BoolFlag{
Expand Down
4 changes: 2 additions & 2 deletions rpc/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -496,7 +496,7 @@ func (c *Client) write(ctx context.Context, msg interface{}, retry bool) error {
return err
}
}
err := c.writeConn.writeJSON(ctx, msg)
err := c.writeConn.WriteJSON(ctx, msg)
if err != nil {
c.writeConn = nil
if !retry {
Expand Down Expand Up @@ -629,7 +629,7 @@ func (c *Client) read(codec ServerCodec) {
for {
msgs, batch, err := codec.ReadBatch()
if _, ok := err.(*json.SyntaxError); ok {
codec.writeJSON(context.Background(), errorMessage(&parseError{err.Error()}))
codec.WriteJSON(context.Background(), errorMessage(&parseError{err.Error()}))
}
if err != nil {
c.readErr <- err
Expand Down
6 changes: 3 additions & 3 deletions rpc/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ func (h *handler) handleBatch(msgs []*jsonrpcMessage) {
// Emit error response for empty batches:
if len(msgs) == 0 {
h.startCallProc(func(cp *callProc) {
h.conn.writeJSON(cp.ctx, errorMessage(&invalidRequestError{"empty batch"}))
h.conn.WriteJSON(cp.ctx, errorMessage(&invalidRequestError{"empty batch"}))
})
return
}
Expand Down Expand Up @@ -200,7 +200,7 @@ func (h *handler) handleBatch(msgs []*jsonrpcMessage) {
}
h.addSubscriptions(cp.notifiers)
if len(answers) > 0 {
h.conn.writeJSON(cp.ctx, answers)
h.conn.WriteJSON(cp.ctx, answers)
}
for _, n := range cp.notifiers {
n.activate()
Expand All @@ -226,7 +226,7 @@ func (h *handler) handleMsg(msg *jsonrpcMessage, stream *jsoniter.Stream) {
stream.Write(buffer)
}
if needWriteStream {
h.conn.writeJSON(cp.ctx, json.RawMessage(stream.Buffer()))
h.conn.WriteJSON(cp.ctx, json.RawMessage(stream.Buffer()))
} else {
stream.Write([]byte("\n"))
}
Expand Down
2 changes: 1 addition & 1 deletion rpc/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ type httpConn struct {
}

// httpConn is treated specially by Client.
func (hc *httpConn) writeJSON(context.Context, interface{}) error {
func (hc *httpConn) WriteJSON(context.Context, interface{}) error {
panic("writeJSON called on httpConn")
}

Expand Down
2 changes: 1 addition & 1 deletion rpc/json.go
Original file line number Diff line number Diff line change
Expand Up @@ -220,7 +220,7 @@ func (c *jsonCodec) ReadBatch() (messages []*jsonrpcMessage, batch bool, err err
return messages, batch, nil
}

func (c *jsonCodec) writeJSON(ctx context.Context, v interface{}) error {
func (c *jsonCodec) WriteJSON(ctx context.Context, v interface{}) error {
c.encMu.Lock()
defer c.encMu.Unlock()

Expand Down
4 changes: 2 additions & 2 deletions rpc/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,13 +124,13 @@ func (s *Server) serveSingleRequest(ctx context.Context, codec ServerCodec, stre
reqs, batch, err := codec.ReadBatch()
if err != nil {
if err != io.EOF {
codec.writeJSON(ctx, errorMessage(&invalidMessageError{"parse error"}))
codec.WriteJSON(ctx, errorMessage(&invalidMessageError{"parse error"}))
}
return
}
if batch {
if s.batchLimit > 0 && len(reqs) > s.batchLimit {
codec.writeJSON(ctx, errorMessage(fmt.Errorf("batch limit %d exceeded (can increase by --rpc.batch.limit). Requested batch of size: %d", s.batchLimit, len(reqs))))
codec.WriteJSON(ctx, errorMessage(fmt.Errorf("batch limit %d exceeded (can increase by --rpc.batch.limit). Requested batch of size: %d", s.batchLimit, len(reqs))))
} else {
h.handleBatch(reqs)
}
Expand Down
2 changes: 1 addition & 1 deletion rpc/subscription.go
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,7 @@ func (n *Notifier) activate() error {
func (n *Notifier) send(sub *Subscription, data json.RawMessage) error {
params, _ := json.Marshal(&subscriptionResult{ID: string(sub.ID), Result: data})
ctx := context.Background()
return n.h.conn.writeJSON(ctx, &jsonrpcMessage{
return n.h.conn.WriteJSON(ctx, &jsonrpcMessage{
Version: vsn,
Method: n.namespace + notificationMethodSuffix,
Params: params,
Expand Down
5 changes: 3 additions & 2 deletions rpc/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,13 @@ import (
"context"
"encoding/json"
"fmt"
"github.com/ledgerwatch/erigon-lib/common/hexutil"
"math"
"math/big"
"strconv"
"strings"

"github.com/ledgerwatch/erigon-lib/common/hexutil"

libcommon "github.com/ledgerwatch/erigon-lib/common"
)

Expand Down Expand Up @@ -61,7 +62,7 @@ type ServerCodec interface {
// jsonWriter can write JSON messages to its underlying connection.
// Implementations must be safe for concurrent use.
type jsonWriter interface {
writeJSON(context.Context, interface{}) error
WriteJSON(context.Context, interface{}) error
// Closed returns a channel which is closed when the connection is closed.
closed() <-chan interface{}
// RemoteAddr returns the peer address of the connection.
Expand Down
10 changes: 5 additions & 5 deletions rpc/websocket.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ func (s *Server) WebsocketHandler(allowedOrigins []string, jwtSecret []byte, com
logger.Warn("WebSocket upgrade failed", "err", err)
return
}
codec := newWebsocketCodec(conn)
codec := NewWebsocketCodec(conn)
s.ServeCodec(codec, 0)
})
}
Expand Down Expand Up @@ -205,7 +205,7 @@ func DialWebsocketWithDialer(ctx context.Context, endpoint, origin string, diale
}
return nil, hErr
}
return newWebsocketCodec(conn), nil
return NewWebsocketCodec(conn), nil
}, logger)
}

Expand Down Expand Up @@ -248,7 +248,7 @@ type websocketCodec struct {
pingReset chan struct{}
}

func newWebsocketCodec(conn *websocket.Conn) ServerCodec {
func NewWebsocketCodec(conn *websocket.Conn) ServerCodec {
conn.SetReadLimit(wsMessageSizeLimit)
wc := &websocketCodec{
jsonCodec: NewFuncCodec(conn, conn.WriteJSON, conn.ReadJSON).(*jsonCodec),
Expand All @@ -265,8 +265,8 @@ func (wc *websocketCodec) Close() {
wc.wg.Wait()
}

func (wc *websocketCodec) writeJSON(ctx context.Context, v interface{}) error {
err := wc.jsonCodec.writeJSON(ctx, v)
func (wc *websocketCodec) WriteJSON(ctx context.Context, v interface{}) error {
err := wc.jsonCodec.WriteJSON(ctx, v)
if err == nil {
// Notify pingLoop to delay the next idle ping.
select {
Expand Down
Loading

0 comments on commit a6b5297

Please sign in to comment.