forked from erigontech/erigon
-
Notifications
You must be signed in to change notification settings - Fork 0
/
support.go
188 lines (170 loc) · 5.33 KB
/
support.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
package app
import (
"bufio"
"bytes"
"context"
"crypto/tls"
"crypto/x509"
"encoding/binary"
"fmt"
"io"
"net/http"
"os"
"os/signal"
"syscall"
"time"
"github.com/ledgerwatch/log/v3"
"github.com/urfave/cli/v2"
"golang.org/x/net/http2"
)
// diagnosticsURLFlag carries the address of the diagnostics system that the
// support command connects to.
var diagnosticsURLFlag = cli.StringFlag{
	Usage: "URL of the diagnostics system provided by the support team, include unique session PIN",
	Name:  "diagnostics.url",
}

// metricsURLsFlag lists the metrics endpoints of the nodes being diagnosed.
// Only the first entry is currently used by connectDiagnostics.
var metricsURLsFlag = cli.StringSliceFlag{
	Usage: "Comma separated list of URLs to the metrics endpoints thats are being diagnosed",
	Name:  "metrics.urls",
}

// insecureFlag is copied into tls.Config.InsecureSkipVerify for the connection
// to the diagnostics system.
var insecureFlag = cli.BoolFlag{
	Usage: "Allows communication with diagnostics system using self-signed TLS certificates",
	Name:  "insecure",
}
// supportCommand is the "support" CLI command. Its action, connectDiagnostics,
// tunnels requests from a diagnostics system to a metrics endpoint.
var supportCommand = cli.Command{
	Action: MigrateFlags(connectDiagnostics),
	Name:   "support",
	Usage:  "Connect Erigon instance to a diagnostics system for support",
	// The flag names shown here must match the Name fields of the flags
	// registered below, otherwise the help text advertises an unknown flag.
	ArgsUsage: "--diagnostics.url <URL for the diagnostics system> --metrics.urls <http://erigon_host:metrics_port>",
	Flags: []cli.Flag{
		&metricsURLsFlag,
		&diagnosticsURLFlag,
		&insecureFlag,
	},
	Category: "SUPPORT COMMANDS",
	Description: `
The support command connects a running Erigon instances to a diagnostics system specified
by the URL.`,
}
// Version is the version number that tunnel sends to the diagnostics system,
// encoded as an 8-byte big-endian integer, right after the "SUCCESS" line has
// been received.
const Version = 1
// connectDiagnostics is the action of the support command. It reads the
// diagnostics URL, the metrics URLs and the insecure switch from cliCtx and
// then runs tunnel in a loop, pausing one second between attempts, until
// either tunnel fails or SIGINT/SIGTERM is received.
//
// It returns an error when no metrics URL was supplied or when tunnel returns
// an error, and nil when the loop ends because of a termination signal.
func connectDiagnostics(cliCtx *cli.Context) error {
	sigs := make(chan os.Signal, 1)
	signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM)
	// Undo the registration on exit so the channel is not kept alive by the
	// signal package after this command has finished.
	defer signal.Stop(sigs)

	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	metricsURLs := cliCtx.StringSlice(metricsURLsFlag.Name)
	if len(metricsURLs) == 0 {
		// Indexing an empty slice below would panic; report a usable error instead.
		return fmt.Errorf("no metrics URL given, use --%s", metricsURLsFlag.Name)
	}
	metricsURL := metricsURLs[0] // TODO: Generalise
	diagnosticsUrl := cliCtx.String(diagnosticsURLFlag.Name)

	// NOTE(review): no certificate is ever added to this pool, so with an empty
	// RootCAs verification can only succeed when --insecure is set. Confirm
	// whether the server certificate (or the system roots) should be loaded here.
	certPool := x509.NewCertPool()

	// Create TLS configuration for the connection to the diagnostics system
	insecure := cliCtx.Bool(insecureFlag.Name)
	tlsConfig := &tls.Config{
		RootCAs:            certPool,
		InsecureSkipVerify: insecure, //nolint:gosec
	}

	// Perform the requests in a loop (reconnect)
	for {
		if err := tunnel(ctx, cancel, sigs, tlsConfig, diagnosticsUrl, metricsURL); err != nil {
			return err
		}
		select {
		case <-ctx.Done():
			// Quit immediately if the context was cancelled (by Ctrl-C or TERM signal)
			return nil
		default:
		}
		log.Info("Reconnecting in 1 second...")
		timer := time.NewTimer(1 * time.Second)
		// Nothing else drains sigs while no tunnel is running, so watch it here:
		// a signal during the pause ends the command cleanly instead of being
		// picked up by the next tunnel and reported as a connection error.
		select {
		case <-timer.C:
		case <-sigs:
			timer.Stop()
			return nil
		case <-ctx.Done():
			timer.Stop()
			return nil
		}
	}
}
// successLine is the exact first line tunnel expects in the response from the
// diagnostics system; any other first line makes tunnel return an error.
var successLine = []byte("SUCCESS")
// tunnel operates the tunnel from the diagnostics system to the metrics URL for
// one HTTP/2 request. It needs to be called repeatedly to implement re-connect
// logic.
//
// The exchange implemented here is:
//  1. POST to diagnosticsUrl; the request body is a pipe that stays open.
//  2. The first response line must equal successLine.
//  3. Version is written to the request body as 8 big-endian bytes.
//  4. Every further response line is appended to metricsURL and fetched with
//     GET; the result (or an "ERROR: ..." text) is written back to the request
//     body, preceded by its length as 4 big-endian bytes.
//
// A signal on sigs calls cancel and closes both ends of the pipe. The metrics
// requests share the tunnel's context, so they are aborted as well.
//
// It returns an error when the request cannot be created or sent, when the
// first line cannot be read or is not successLine, or when the version cannot
// be written. Once the relay loop has started, failures are only logged and
// nil is returned, leaving it to the caller to decide whether to reconnect.
func tunnel(ctx context.Context, cancel context.CancelFunc, sigs chan os.Signal, tlsConfig *tls.Config, diagnosticsUrl string, metricsURL string) error {
	diagnosticsClient := &http.Client{Transport: &http2.Transport{TLSClientConfig: tlsConfig}}
	defer diagnosticsClient.CloseIdleConnections()
	metricsClient := &http.Client{}
	defer metricsClient.CloseIdleConnections()

	// The read end of the pipe is the request body: whatever is written to
	// writer is streamed to the diagnostics system while the request is alive.
	reader, writer := io.Pipe()
	ctx1, cancel1 := context.WithCancel(ctx)
	defer cancel1()
	go func() {
		// Ends either on a termination signal (which also cancels the parent
		// context) or when tunnel returns and cancel1 fires.
		select {
		case <-sigs:
			cancel()
		case <-ctx1.Done():
		}
		reader.Close()
		writer.Close()
	}()

	req, err := http.NewRequestWithContext(ctx1, http.MethodPost, diagnosticsUrl, reader)
	if err != nil {
		return err
	}
	resp, err := diagnosticsClient.Do(req)
	if err != nil {
		return err
	}
	defer resp.Body.Close()
	defer writer.Close()

	var metricsBuf bytes.Buffer
	r := bufio.NewReaderSize(resp.Body, 4096)
	line, isPrefix, err := r.ReadLine()
	if err != nil {
		return fmt.Errorf("reading first line: %w", err)
	}
	if isPrefix {
		// The line did not fit into the 4096-byte read buffer.
		return fmt.Errorf("request too long")
	}
	if !bytes.Equal(line, successLine) {
		return fmt.Errorf("connecting to diagnostics system, first line [%s]", line)
	}

	var versionBytes [8]byte
	binary.BigEndian.PutUint64(versionBytes[:], Version)
	if _, err = writer.Write(versionBytes[:]); err != nil {
		return fmt.Errorf("sending version: %w", err)
	}
	log.Info("Connected")

	// err below always refers to the outcome of the latest ReadLine; errors from
	// inside the loop body use their own names so they cannot be confused with it.
	for line, isPrefix, err = r.ReadLine(); err == nil && !isPrefix; line, isPrefix, err = r.ReadLine() {
		metricsBuf.Reset()

		// Bind the metrics request to the tunnel context so that a shutdown
		// is not held up by a slow or unresponsive metrics endpoint.
		var metricsResponse *http.Response
		metricsReq, reqErr := http.NewRequestWithContext(ctx1, http.MethodGet, metricsURL+string(line), nil)
		if reqErr == nil {
			metricsResponse, reqErr = metricsClient.Do(metricsReq)
		}
		if reqErr != nil {
			fmt.Fprintf(&metricsBuf, "ERROR: Requesting metrics url [%s], query [%s], err: %v", metricsURL, line, reqErr)
		} else {
			// Buffer the metrics response, and relay it back to the diagnostics system, prepending with the size
			if _, copyErr := io.Copy(&metricsBuf, metricsResponse.Body); copyErr != nil {
				metricsBuf.Reset()
				fmt.Fprintf(&metricsBuf, "ERROR: Extracting metrics url [%s], query [%s], err: %v", metricsURL, line, copyErr)
			}
			metricsResponse.Body.Close()
		}

		var sizeBuf [4]byte
		binary.BigEndian.PutUint32(sizeBuf[:], uint32(metricsBuf.Len()))
		if _, writeErr := writer.Write(sizeBuf[:]); writeErr != nil {
			log.Error("Problem relaying metrics prefix len", "url", metricsURL, "query", line, "err", writeErr)
			break
		}
		if _, writeErr := writer.Write(metricsBuf.Bytes()); writeErr != nil {
			log.Error("Problem relaying", "url", metricsURL, "query", line, "err", writeErr)
			break
		}
	}
	if err != nil {
		select {
		case <-ctx.Done():
			// Shutdown was requested; the read error is just its consequence.
		default:
			log.Error("Breaking connection", "err", err)
		}
	}
	if isPrefix {
		log.Error("Request too long, circuit breaker")
	}
	return nil
}