diff --git a/go.mod b/go.mod index 55e0cf4e..4f66d6ed 100644 --- a/go.mod +++ b/go.mod @@ -25,9 +25,12 @@ require ( github.com/stripe/stripe-go/v72 v72.122.0 github.com/uptrace/opentelemetry-go-extra/otellogrus v0.1.15 github.com/xdg-go/scram v1.1.1 - go.mongodb.org/mongo-driver v1.10.1 + go.mongodb.org/mongo-driver v1.10.3 go.opentelemetry.io/contrib/instrumentation/github.com/gorilla/mux/otelmux v0.34.0 + go.opentelemetry.io/contrib/instrumentation/go.mongodb.org/mongo-driver/mongo/otelmongo v0.36.4 + go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.36.4 go.opentelemetry.io/otel v1.11.1 + go.opentelemetry.io/otel/trace v1.11.1 go.uber.org/dig v1.15.0 go.uber.org/fx v1.18.1 golang.org/x/sync v0.0.0-20220819030929-7fc1605a5dde @@ -105,8 +108,8 @@ require ( go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.11.1 // indirect go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.11.1 // indirect go.opentelemetry.io/otel/exporters/stdout/stdouttrace v1.11.1 // indirect + go.opentelemetry.io/otel/metric v0.33.0 // indirect go.opentelemetry.io/otel/sdk v1.11.1 // indirect - go.opentelemetry.io/otel/trace v1.11.1 // indirect go.opentelemetry.io/proto/otlp v0.19.0 // indirect go.uber.org/atomic v1.10.0 // indirect go.uber.org/multierr v1.8.0 // indirect diff --git a/go.sum b/go.sum index 40fe14ea..d87a2677 100644 --- a/go.sum +++ b/go.sum @@ -493,8 +493,8 @@ github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9de github.com/yuin/goldmark v1.1.32/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= go.etcd.io/bbolt v1.3.2/go.mod h1:IbVyRI1SCnLcuJnV2u8VeU0CEYM7e686BmAb1XKL+uU= -go.mongodb.org/mongo-driver v1.10.1 h1:NujsPveKwHaWuKUer/ceo9DzEe7HIj1SlJ6uvXZG0S4= -go.mongodb.org/mongo-driver v1.10.1/go.mod h1:z4XpeoU6w+9Vht+jAFyLgVrD+jGSQQe0+CBWFHNiHt8= +go.mongodb.org/mongo-driver v1.10.3 h1:XDQEvmh6z1EUsXuIkXE9TaVeqHw6SwS1uf93jFs0HBA= +go.mongodb.org/mongo-driver v1.10.3/go.mod h1:z4XpeoU6w+9Vht+jAFyLgVrD+jGSQQe0+CBWFHNiHt8= go.opencensus.io v0.21.0/go.mod h1:mSImk1erAIZhrmZN+AvHh14ztQfjbGwt4TtuofqLduU= go.opencensus.io v0.22.0/go.mod h1:+kGneAE2xo2IficOXnaByMWTGM9T73dGwxeWcUqIpI8= go.opencensus.io v0.22.2/go.mod h1:yxeiOL68Rb0Xd1ddK5vPZ/oVn4vY4Ynel7k9FzqtOIw= @@ -505,6 +505,10 @@ go.opentelemetry.io/contrib/instrumentation/github.com/Shopify/sarama/otelsarama go.opentelemetry.io/contrib/instrumentation/github.com/Shopify/sarama/otelsarama v0.34.0/go.mod h1:8cfNbNK5aJIRQnqOFGEALF17iZPDym2mmYP+ECIxi8M= go.opentelemetry.io/contrib/instrumentation/github.com/gorilla/mux/otelmux v0.34.0 h1:OkXMRbgldT4yZR7RwB4SFYTjYJGTXwPQVX69pYtTnc4= go.opentelemetry.io/contrib/instrumentation/github.com/gorilla/mux/otelmux v0.34.0/go.mod h1:zMu+r6aEorSQi8Ad0Y1fNrznm+VM8F10D2WlZp3HeFw= +go.opentelemetry.io/contrib/instrumentation/go.mongodb.org/mongo-driver/mongo/otelmongo v0.36.4 h1:IKvVGMy0s5MH0cKfwmwiHVtnrVOFuHU/wznLa8eN+Cs= +go.opentelemetry.io/contrib/instrumentation/go.mongodb.org/mongo-driver/mongo/otelmongo v0.36.4/go.mod h1:mHrZBcL5tUSxYX1emmDCNDDf9an1PedCEGum4p9+Ep8= +go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.36.4 h1:aUEBEdCa6iamGzg6fuYxDA8ThxvOG240mAvWDU+XLio= +go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.36.4/go.mod h1:l2MdsbKTocpPS5nQZscqTR9jd8u96VYZdcpF8Sye7mA= go.opentelemetry.io/contrib/propagators/b3 v1.11.1 h1:icQ6ttRV+r/2fnU46BIo/g/mPu6Rs5Ug8Rtohe3KqzI= go.opentelemetry.io/contrib/propagators/b3 v1.11.1/go.mod h1:ECIveyMXgnl4gorxFcA7RYjJY/Ql9n20ubhbfDc3QfA= go.opentelemetry.io/otel v1.11.1 h1:4WLLAmcfkmDk2ukNXJyq3/kiz/3UzCaYq6PskJsaou4= @@ -521,6 +525,8 @@ go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.11.1 h1:tFl63 go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.11.1/go.mod h1:X620Jww3RajCJXw/unA+8IRTgxkdS7pi+ZwK9b7KUJk= go.opentelemetry.io/otel/exporters/stdout/stdouttrace v1.11.1 h1:3Yvzs7lgOw8MmbxmLRsQGwYdCubFmUHSooKaEhQunFQ= go.opentelemetry.io/otel/exporters/stdout/stdouttrace v1.11.1/go.mod h1:pyHDt0YlyuENkD2VwHsiRDf+5DfI3EH7pfhUYW6sQUE= +go.opentelemetry.io/otel/metric v0.33.0 h1:xQAyl7uGEYvrLAiV/09iTJlp1pZnQ9Wl793qbVvED1E= +go.opentelemetry.io/otel/metric v0.33.0/go.mod h1:QlTYc+EnYNq/M2mNk1qDDMRLpqCOj2f/r5c7Fd5FYaI= go.opentelemetry.io/otel/sdk v1.11.1 h1:F7KmQgoHljhUuJyA+9BiU+EkJfyX5nVVF4wyzWZpKxs= go.opentelemetry.io/otel/sdk v1.11.1/go.mod h1:/l3FE4SupHJ12TduVjUkZtlfFqDCQJlOlithYrdktys= go.opentelemetry.io/otel/trace v1.11.1 h1:ofxdnzsNrGBYXbP7t7zpUK281+go5rF7dvdIZXF8gdQ= diff --git a/internal/app/database/module.go b/internal/app/database/module.go index a3b098f0..0b294807 100644 --- a/internal/app/database/module.go +++ b/internal/app/database/module.go @@ -12,6 +12,7 @@ import ( "go.mongodb.org/mongo-driver/mongo" "go.mongodb.org/mongo-driver/mongo/options" "go.mongodb.org/mongo-driver/mongo/readpref" + "go.opentelemetry.io/contrib/instrumentation/go.mongodb.org/mongo-driver/mongo/otelmongo" "go.uber.org/fx" ) @@ -22,6 +23,7 @@ func MongoModule(uri string, dbName string) fx.Option { reg := bson.NewRegistryBuilder().RegisterTypeMapEntry(bsontype.EmbeddedDocument, tM).Build() return options.Client(). + SetMonitor(otelmongo.NewMonitor()). SetRegistry(reg). ApplyURI(uri) }), diff --git a/internal/pkg/connectors/bankingcircle/client.go b/internal/pkg/connectors/bankingcircle/client.go index 2ff81b1f..a91fb3e0 100644 --- a/internal/pkg/connectors/bankingcircle/client.go +++ b/internal/pkg/connectors/bankingcircle/client.go @@ -1,6 +1,7 @@ package bankingcircle import ( + "context" "encoding/json" "fmt" "io" @@ -8,6 +9,7 @@ import ( "time" "github.com/formancehq/go-libs/sharedlogging" + "go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp" ) type client struct { @@ -25,9 +27,16 @@ type client struct { accessTokenExpiresAt time.Time } +func newHTTPClient() *http.Client { + return &http.Client{ + Timeout: 10 * time.Second, + Transport: otelhttp.NewTransport(http.DefaultTransport), + } +} + func newClient(username, password, endpoint, authorizationEndpoint string, logger sharedlogging.Logger) (*client, error) { c := &client{ - httpClient: &http.Client{Timeout: 10 * time.Second}, + httpClient: newHTTPClient(), username: username, password: password, @@ -37,15 +46,16 @@ func newClient(username, password, endpoint, authorizationEndpoint string, logge logger: logger, } - if err := c.login(); err != nil { + if err := c.login(context.TODO()); err != nil { return nil, err } return c, nil } -func (c *client) login() error { - req, err := http.NewRequest(http.MethodGet, c.authorizationEndpoint+"/api/v1/authorizations/authorize", http.NoBody) +func (c *client) login(ctx context.Context) error { + req, err := http.NewRequestWithContext(ctx, http.MethodGet, + c.authorizationEndpoint+"/api/v1/authorizations/authorize", http.NoBody) if err != nil { return fmt.Errorf("failed to create login request: %w", err) } @@ -87,12 +97,12 @@ func (c *client) login() error { return nil } -func (c *client) ensureAccessTokenIsValid() error { +func (c *client) ensureAccessTokenIsValid(ctx context.Context) error { if c.accessTokenExpiresAt.After(time.Now()) { return nil } - return c.login() + return c.login(ctx) } //nolint:tagliatelle // allow for client-side structures @@ -164,11 +174,11 @@ type payment struct { } `json:"creditorInformation"` } -func (c *client) getAllPayments() ([]*payment, error) { +func (c *client) getAllPayments(ctx context.Context) ([]*payment, error) { var payments []*payment for page := 0; ; page++ { - pagedPayments, err := c.getPayments(page) + pagedPayments, err := c.getPayments(ctx, page) if err != nil { return nil, err } @@ -183,12 +193,12 @@ func (c *client) getAllPayments() ([]*payment, error) { return payments, nil } -func (c *client) getPayments(page int) ([]*payment, error) { - if err := c.ensureAccessTokenIsValid(); err != nil { +func (c *client) getPayments(ctx context.Context, page int) ([]*payment, error) { + if err := c.ensureAccessTokenIsValid(ctx); err != nil { return nil, err } - req, err := http.NewRequest(http.MethodGet, c.endpoint+"/api/v1/payments/singles", http.NoBody) + req, err := http.NewRequestWithContext(ctx, http.MethodGet, c.endpoint+"/api/v1/payments/singles", http.NoBody) if err != nil { return nil, fmt.Errorf("failed to create login request: %w", err) } diff --git a/internal/pkg/connectors/bankingcircle/task_fetch_payments.go b/internal/pkg/connectors/bankingcircle/task_fetch_payments.go index bde33cd1..e83d3f23 100644 --- a/internal/pkg/connectors/bankingcircle/task_fetch_payments.go +++ b/internal/pkg/connectors/bankingcircle/task_fetch_payments.go @@ -16,7 +16,7 @@ func taskFetchPayments(logger sharedlogging.Logger, client *client) task.Task { scheduler task.Scheduler[TaskDescriptor], ingester ingestion.Ingester, ) error { - paymentsList, err := client.getAllPayments() + paymentsList, err := client.getAllPayments(ctx) if err != nil { return err } diff --git a/internal/pkg/connectors/currencycloud/client/client.go b/internal/pkg/connectors/currencycloud/client/client.go index 14833b57..d402d85e 100644 --- a/internal/pkg/connectors/currencycloud/client/client.go +++ b/internal/pkg/connectors/currencycloud/client/client.go @@ -4,16 +4,19 @@ import ( "context" "fmt" "net/http" + + "go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp" ) type apiTransport struct { - authToken string + authToken string + underlying *otelhttp.Transport } func (t *apiTransport) RoundTrip(req *http.Request) (*http.Response, error) { req.Header.Add("X-Auth-Token", t.authToken) - return http.DefaultTransport.RoundTrip(req) + return t.underlying.RoundTrip(req) } type Client struct { @@ -29,6 +32,21 @@ func (c *Client) buildEndpoint(path string, args ...interface{}) string { const devAPIEndpoint = "https://devapi.currencycloud.com" +func newAuthenticatedHTTPClient(authToken string) *http.Client { + return &http.Client{ + Transport: &apiTransport{ + authToken: authToken, + underlying: otelhttp.NewTransport(http.DefaultTransport), + }, + } +} + +func newHTTPClient() *http.Client { + return &http.Client{ + Transport: otelhttp.NewTransport(http.DefaultTransport), + } +} + // NewClient creates a new client for the CurrencyCloud API. func NewClient(ctx context.Context, loginID, apiKey, endpoint string) (*Client, error) { if endpoint == "" { @@ -36,7 +54,7 @@ func NewClient(ctx context.Context, loginID, apiKey, endpoint string) (*Client, } c := &Client{ - httpClient: &http.Client{}, + httpClient: newHTTPClient(), endpoint: endpoint, loginID: loginID, apiKey: apiKey, @@ -47,7 +65,7 @@ func NewClient(ctx context.Context, loginID, apiKey, endpoint string) (*Client, return nil, err } - c.httpClient.Transport = &apiTransport{authToken: authToken} + c.httpClient = newAuthenticatedHTTPClient(authToken) return c, nil } diff --git a/internal/pkg/connectors/modulr/client/client.go b/internal/pkg/connectors/modulr/client/client.go index b0e9b3b7..cf5734ee 100644 --- a/internal/pkg/connectors/modulr/client/client.go +++ b/internal/pkg/connectors/modulr/client/client.go @@ -5,17 +5,19 @@ import ( "net/http" "github.com/formancehq/payments/internal/pkg/connectors/modulr/hmac" + "go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp" ) type apiTransport struct { - apiKey string - headers map[string]string + apiKey string + headers map[string]string + underlying http.RoundTripper } func (t *apiTransport) RoundTrip(req *http.Request) (*http.Response, error) { req.Header.Add("Authorization", t.apiKey) - return http.DefaultTransport.RoundTrip(req) + return t.underlying.RoundTrip(req) } type responseWrapper[t any] struct { @@ -50,8 +52,9 @@ func NewClient(apiKey, apiSecret, endpoint string) (*Client, error) { return &Client{ httpClient: &http.Client{ Transport: &apiTransport{ - headers: headers, - apiKey: apiKey, + headers: headers, + apiKey: apiKey, + underlying: otelhttp.NewTransport(http.DefaultTransport), }, }, endpoint: endpoint, diff --git a/internal/pkg/connectors/stripe/client.go b/internal/pkg/connectors/stripe/client.go index 7e48d88b..f6df9a06 100644 --- a/internal/pkg/connectors/stripe/client.go +++ b/internal/pkg/connectors/stripe/client.go @@ -7,6 +7,7 @@ import ( "net/http" "github.com/formancehq/payments/internal/pkg/writeonly" + "go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp" "github.com/pkg/errors" "github.com/stripe/stripe-go/v72" @@ -51,13 +52,11 @@ func (d *DefaultClient) ForAccount(account string) Client { func (d *DefaultClient) BalanceTransactions(ctx context.Context, options ...ClientOption, ) ([]*stripe.BalanceTransaction, bool, error) { - req, err := http.NewRequest(http.MethodGet, balanceTransactionsEndpoint, nil) + req, err := http.NewRequestWithContext(ctx, http.MethodGet, balanceTransactionsEndpoint, nil) if err != nil { return nil, false, errors.Wrap(err, "creating http request") } - req = req.WithContext(ctx) - for _, opt := range options { opt.apply(req) } @@ -127,9 +126,15 @@ func (d *DefaultClient) BalanceTransactions(ctx context.Context, return asBalanceTransactions, rsp.HasMore, nil } -func NewDefaultClient(httpClient *http.Client, apiKey string, storage writeonly.Storage) *DefaultClient { +func newHTTPClient() *http.Client { + return &http.Client{ + Transport: otelhttp.NewTransport(http.DefaultTransport), + } +} + +func NewDefaultClient(apiKey string, storage writeonly.Storage) *DefaultClient { return &DefaultClient{ - httpClient: httpClient, + httpClient: newHTTPClient(), apiKey: apiKey, storage: storage, } diff --git a/internal/pkg/connectors/stripe/task_connected_account.go b/internal/pkg/connectors/stripe/task_connected_account.go index d6197c00..088dc621 100644 --- a/internal/pkg/connectors/stripe/task_connected_account.go +++ b/internal/pkg/connectors/stripe/task_connected_account.go @@ -2,7 +2,6 @@ package stripe import ( "context" - "net/http" "github.com/formancehq/payments/internal/pkg/ingestion" "github.com/formancehq/payments/internal/pkg/task" @@ -57,7 +56,7 @@ func ConnectedAccountTask(config Config, account string) func(ctx context.Contex IngesterFn(func(ctx context.Context, bts []*stripe.BalanceTransaction, commitState TimelineState, tail bool) error { return ingestBatch(ctx, logger, ingester, bts, commitState, tail) }), - NewTimeline(NewDefaultClient(http.DefaultClient, config.APIKey, storage). + NewTimeline(NewDefaultClient(config.APIKey, storage). ForAccount(account), config.TimelineConfig, task.MustResolveTo(ctx, resolver, TimelineState{})), ) diff --git a/internal/pkg/connectors/stripe/task_main.go b/internal/pkg/connectors/stripe/task_main.go index 5d8057e9..acc2473a 100644 --- a/internal/pkg/connectors/stripe/task_main.go +++ b/internal/pkg/connectors/stripe/task_main.go @@ -2,13 +2,11 @@ package stripe import ( "context" - "net/http" + "github.com/formancehq/go-libs/sharedlogging" "github.com/formancehq/payments/internal/pkg/ingestion" "github.com/formancehq/payments/internal/pkg/task" "github.com/formancehq/payments/internal/pkg/writeonly" - - "github.com/formancehq/go-libs/sharedlogging" "github.com/pkg/errors" "github.com/stripe/stripe-go/v72" ) @@ -62,7 +60,7 @@ func MainTask(config Config) func(ctx context.Context, logger sharedlogging.Logg ) error { return ingest(ctx, logger, scheduler, ingester, batch, commitState, tail) }), - NewTimeline(NewDefaultClient(http.DefaultClient, config.APIKey, storage), + NewTimeline(NewDefaultClient(config.APIKey, storage), config.TimelineConfig, task.MustResolveTo(ctx, resolver, TimelineState{})), ), config.PollingPeriod.Duration, diff --git a/internal/pkg/connectors/wise/client.go b/internal/pkg/connectors/wise/client.go index c8e89bdf..fb722599 100644 --- a/internal/pkg/connectors/wise/client.go +++ b/internal/pkg/connectors/wise/client.go @@ -6,18 +6,21 @@ import ( "fmt" "io" "net/http" + + "go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp" ) const apiEndpoint = "https://api.wise.com" type apiTransport struct { - APIKey string + APIKey string + underlying http.RoundTripper } func (t *apiTransport) RoundTrip(req *http.Request) (*http.Response, error) { req.Header.Add("Authorization", fmt.Sprintf("Bearer %s", t.APIKey)) - return http.DefaultTransport.RoundTrip(req) + return t.underlying.RoundTrip(req) } type client struct { @@ -77,14 +80,14 @@ func (w *client) getProfiles() ([]profile, error) { return profiles, nil } -func (w *client) getTransfers(profile *profile) ([]transfer, error) { +func (w *client) getTransfers(ctx context.Context, profile *profile) ([]transfer, error) { var transfers []transfer limit := 10 offset := 0 for { - req, err := http.NewRequestWithContext(context.TODO(), + req, err := http.NewRequestWithContext(ctx, http.MethodGet, w.endpoint("v1/transfers"), http.NoBody) if err != nil { return transfers, err @@ -134,7 +137,8 @@ func (w *client) getTransfers(profile *profile) ([]transfer, error) { func newClient(apiKey string) *client { httpClient := &http.Client{ Transport: &apiTransport{ - APIKey: apiKey, + APIKey: apiKey, + underlying: otelhttp.NewTransport(http.DefaultTransport), }, } diff --git a/internal/pkg/connectors/wise/task_fetch_transfers.go b/internal/pkg/connectors/wise/task_fetch_transfers.go index 2fddf40e..53b3c475 100644 --- a/internal/pkg/connectors/wise/task_fetch_transfers.go +++ b/internal/pkg/connectors/wise/task_fetch_transfers.go @@ -16,7 +16,7 @@ func taskFetchTransfers(logger sharedlogging.Logger, client *client, profileID u scheduler task.Scheduler[TaskDescriptor], ingester ingestion.Ingester, ) error { - transfers, err := client.getTransfers(&profile{ + transfers, err := client.getTransfers(ctx, &profile{ ID: profileID, }) if err != nil { diff --git a/internal/pkg/task/scheduler.go b/internal/pkg/task/scheduler.go index 022b367e..521552bd 100644 --- a/internal/pkg/task/scheduler.go +++ b/internal/pkg/task/scheduler.go @@ -7,11 +7,13 @@ import ( "sync" "time" - "github.com/formancehq/payments/internal/pkg/payments" - "github.com/formancehq/go-libs/sharedlogging" + "github.com/formancehq/payments/internal/pkg/payments" "github.com/pkg/errors" "go.mongodb.org/mongo-driver/bson" + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/trace" ) var ( @@ -202,9 +204,12 @@ func (s *DefaultTaskScheduler[TaskDescriptor]) startTask(descriptor TaskDescript return ErrUnableToResolve } - // TODO: Check task using reflection - ctx, cancel := context.WithCancel(context.Background()) + ctx, span := otel.Tracer("com.formance.payments").Start(ctx, "Task", trace.WithAttributes( + attribute.String("id", taskID), + attribute.String("connector", s.provider), + )) + holder := &taskHolder[TaskDescriptor]{ cancel: cancel, logger: logger, @@ -269,6 +274,7 @@ func (s *DefaultTaskScheduler[TaskDescriptor]) startTask(descriptor TaskDescript logger.Infof("Starting task...") defer func() { + defer span.End() defer s.deleteTask(holder) if e := recover(); e != nil {