-
Notifications
You must be signed in to change notification settings - Fork 0
/
main.go
267 lines (239 loc) · 6.88 KB
/
main.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
/*
* Copyright (C) 2023 Nuts community
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*
*/
package main
import (
"context"
"embed"
"encoding/json"
"errors"
"fmt"
"github.com/nats-io/nats.go"
"io/fs"
"log"
"net/http"
"nuts-foundation/nuts-monitor/api"
"nuts-foundation/nuts-monitor/client"
"nuts-foundation/nuts-monitor/config"
"nuts-foundation/nuts-monitor/data"
"os"
"path"
"time"
"github.com/labstack/echo/v4"
"github.com/labstack/echo/v4/middleware"
)
const assetPath = "web"
//go:embed web/*
var embeddedFiles embed.FS
func main() {
// first load the config
config := config.LoadConfig()
config.Print(log.Writer())
// create the Node API Client
client := client.HTTPClient{
Config: config,
}
// then initialize the data storage and fill it with the initial transactions
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
store := data.NewStore(client)
// connect to the NATS stream of the nuts node
startConsumer(ctx, store, config)
// load history async
loadHistory(ctx, store, config)
// start shifting windows
store.Start(ctx)
// start the web server
e := newEchoServer(config, store)
// Start server
e.Logger.Fatal(e.Start(fmt.Sprintf(":%d", 1313)))
}
// loadHistory uses a Go routine to load the transactions in the background
// On error it will retry every 10 seconds
func loadHistory(context context.Context, store *data.Store, c config.Config) {
// initialize the client
client := client.HTTPClient{
Config: c,
}
go func() {
// As long there's no error, keep retrying
for {
err := loadHistoryOnce(store, client)
select {
case <-context.Done():
return
default:
if err == nil {
return
}
log.Printf("failed to load historic transactions: %s", err)
log.Printf("retrying in 10 seconds")
// sleep for 10 seconds
<-time.After(10 * time.Second)
}
}
}()
}
// loadHistoryOnce loads the transactions from the Nuts node and stores them in the data store
func loadHistoryOnce(store *data.Store, client client.HTTPClient) error {
// load the initial transactions
// ListTransactions per batch of 100, stop if the list is empty
// currentOffset is used to determine the offset for the next batch
currentOffset := 0
for {
transactions, err := client.ListTransactions(context.Background(), currentOffset, currentOffset+100)
if err != nil {
return err
}
if len(transactions) == 0 {
break
}
// the transactions need to be converted from string to Transaction
for _, stringTransaction := range transactions {
transaction, err := data.FromJWS(stringTransaction)
if err != nil {
log.Printf("failed to parse transaction: %s", err)
}
store.Add(*transaction)
}
// increase offset for next batch
currentOffset += 100
}
return nil
}
// startConsumer will try to subscribe to NATS every 10 seconds
// it will retry until it succeeds
func startConsumer(ctx context.Context, store *data.Store, c config.Config) {
go func() {
for {
err := startConsumerOnce(ctx, store, c)
select {
case <-ctx.Done():
return
default:
if err == nil {
return
}
log.Printf("failed to start NATS consumer: %s", err)
log.Printf("retrying in 10 seconds")
// sleep for 10 seconds
<-time.After(10 * time.Second)
}
}
}()
}
// startConsumerOnce starts the NATS consumer
// it creates a non-durable subscription to the nuts node for the "nuts-disposable" stream
// and stores the transactions in the data store
func startConsumerOnce(ctx context.Context, store *data.Store, c config.Config) error {
// create a NATS connection
conn, err := nats.Connect(c.NutsNodeStreamAddr)
if err != nil {
return fmt.Errorf("failed to connect to NATS stream: %w", err)
}
// setup NATS JetStream
js, err := conn.JetStream()
if err != nil {
return fmt.Errorf("failed to connect to JetStream: %w", err)
}
// stream creation
_, err = js.StreamInfo("nuts-monitor")
if errors.Is(err, nats.ErrStreamNotFound) {
_, err = js.AddStream(&nats.StreamConfig{
Name: "nuts-monitor",
Subjects: []string{"TRANSACTIONS.*"},
MaxMsgs: 1000, // max buffer
Retention: nats.LimitsPolicy,
Storage: nats.MemoryStorage,
Discard: nats.DiscardOld,
})
if err != nil {
return fmt.Errorf("failed to create stream: %w", err)
}
} else if err != nil {
return err
}
// Subscriber options
opts := []nats.SubOpt{
nats.BindStream("nuts-monitor"),
nats.DeliverNew(),
nats.Context(ctx),
}
// subscribe through JetStream
_, err = js.Subscribe("TRANSACTIONS.*", func(msg *nats.Msg) {
// parse the transaction, it's in JSON format
event := transactionEvent{}
err := json.Unmarshal(msg.Data, &event)
if err != nil {
log.Printf("failed to parse transaction event: %s", err)
}
transaction, err := data.FromJWS(event.Transaction)
if err != nil {
log.Printf("failed to parse transaction: %s", err)
}
// add transaction to store
store.Add(*transaction)
}, opts...)
if err != nil {
return fmt.Errorf("failed to subscribe to stream: %w", err)
}
return nil
}
type transactionEvent struct {
// Transaction is in compacted JWS format
Transaction string `json:"transaction"`
// Payload is base64
Payload string `json:"payload"`
}
func newEchoServer(config config.Config, store *data.Store) *echo.Echo {
// http server
e := echo.New()
e.HideBanner = true
loggerConfig := middleware.DefaultLoggerConfig
e.Use(middleware.LoggerWithConfig(loggerConfig))
// add status endpoint
e.GET("/status", func(context echo.Context) error {
return context.String(http.StatusOK, "OK")
})
// API endpoints from OAS spec
apiWrapper := api.Wrapper{
Config: config,
Client: client.HTTPClient{
Config: config,
},
DataStore: store,
}
api.RegisterHandlers(e, api.NewStrictHandler(apiWrapper, []api.StrictMiddlewareFunc{}))
// Setup asset serving:
// Check if we use live mode from the file system or using embedded files
useFS := len(os.Args) > 1 && os.Args[1] == "live"
assetHandler := http.FileServer(getFileSystem(useFS))
e.GET("/*", echo.WrapHandler(assetHandler))
return e
}
func getFileSystem(useFS bool) http.FileSystem {
if useFS {
log.Print("using live mode")
return http.FS(os.DirFS(path.Join(assetPath, "dist")))
}
log.Print("using embed mode")
fsys, err := fs.Sub(embeddedFiles, path.Join(assetPath, "dist"))
if err != nil {
panic(err)
}
return http.FS(fsys)
}