Skip to content

Commit

Permalink
feat(opentelemetry): instrumentalize http calls and database accesses (
Browse files Browse the repository at this point in the history
  • Loading branch information
gfyrag authored Nov 25, 2022
1 parent a7d0c9b commit c8d90b3
Show file tree
Hide file tree
Showing 13 changed files with 100 additions and 46 deletions.
7 changes: 5 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,12 @@ require (
github.com/stripe/stripe-go/v72 v72.122.0
github.com/uptrace/opentelemetry-go-extra/otellogrus v0.1.15
github.com/xdg-go/scram v1.1.1
go.mongodb.org/mongo-driver v1.10.1
go.mongodb.org/mongo-driver v1.10.3
go.opentelemetry.io/contrib/instrumentation/github.com/gorilla/mux/otelmux v0.34.0
go.opentelemetry.io/contrib/instrumentation/go.mongodb.org/mongo-driver/mongo/otelmongo v0.36.4
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.36.4
go.opentelemetry.io/otel v1.11.1
go.opentelemetry.io/otel/trace v1.11.1
go.uber.org/dig v1.15.0
go.uber.org/fx v1.18.1
golang.org/x/sync v0.0.0-20220819030929-7fc1605a5dde
Expand Down Expand Up @@ -105,8 +108,8 @@ require (
go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.11.1 // indirect
go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.11.1 // indirect
go.opentelemetry.io/otel/exporters/stdout/stdouttrace v1.11.1 // indirect
go.opentelemetry.io/otel/metric v0.33.0 // indirect
go.opentelemetry.io/otel/sdk v1.11.1 // indirect
go.opentelemetry.io/otel/trace v1.11.1 // indirect
go.opentelemetry.io/proto/otlp v0.19.0 // indirect
go.uber.org/atomic v1.10.0 // indirect
go.uber.org/multierr v1.8.0 // indirect
Expand Down
10 changes: 8 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -493,8 +493,8 @@ github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9de
github.com/yuin/goldmark v1.1.32/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
go.etcd.io/bbolt v1.3.2/go.mod h1:IbVyRI1SCnLcuJnV2u8VeU0CEYM7e686BmAb1XKL+uU=
go.mongodb.org/mongo-driver v1.10.1 h1:NujsPveKwHaWuKUer/ceo9DzEe7HIj1SlJ6uvXZG0S4=
go.mongodb.org/mongo-driver v1.10.1/go.mod h1:z4XpeoU6w+9Vht+jAFyLgVrD+jGSQQe0+CBWFHNiHt8=
go.mongodb.org/mongo-driver v1.10.3 h1:XDQEvmh6z1EUsXuIkXE9TaVeqHw6SwS1uf93jFs0HBA=
go.mongodb.org/mongo-driver v1.10.3/go.mod h1:z4XpeoU6w+9Vht+jAFyLgVrD+jGSQQe0+CBWFHNiHt8=
go.opencensus.io v0.21.0/go.mod h1:mSImk1erAIZhrmZN+AvHh14ztQfjbGwt4TtuofqLduU=
go.opencensus.io v0.22.0/go.mod h1:+kGneAE2xo2IficOXnaByMWTGM9T73dGwxeWcUqIpI8=
go.opencensus.io v0.22.2/go.mod h1:yxeiOL68Rb0Xd1ddK5vPZ/oVn4vY4Ynel7k9FzqtOIw=
Expand All @@ -505,6 +505,10 @@ go.opentelemetry.io/contrib/instrumentation/github.com/Shopify/sarama/otelsarama
go.opentelemetry.io/contrib/instrumentation/github.com/Shopify/sarama/otelsarama v0.34.0/go.mod h1:8cfNbNK5aJIRQnqOFGEALF17iZPDym2mmYP+ECIxi8M=
go.opentelemetry.io/contrib/instrumentation/github.com/gorilla/mux/otelmux v0.34.0 h1:OkXMRbgldT4yZR7RwB4SFYTjYJGTXwPQVX69pYtTnc4=
go.opentelemetry.io/contrib/instrumentation/github.com/gorilla/mux/otelmux v0.34.0/go.mod h1:zMu+r6aEorSQi8Ad0Y1fNrznm+VM8F10D2WlZp3HeFw=
go.opentelemetry.io/contrib/instrumentation/go.mongodb.org/mongo-driver/mongo/otelmongo v0.36.4 h1:IKvVGMy0s5MH0cKfwmwiHVtnrVOFuHU/wznLa8eN+Cs=
go.opentelemetry.io/contrib/instrumentation/go.mongodb.org/mongo-driver/mongo/otelmongo v0.36.4/go.mod h1:mHrZBcL5tUSxYX1emmDCNDDf9an1PedCEGum4p9+Ep8=
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.36.4 h1:aUEBEdCa6iamGzg6fuYxDA8ThxvOG240mAvWDU+XLio=
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.36.4/go.mod h1:l2MdsbKTocpPS5nQZscqTR9jd8u96VYZdcpF8Sye7mA=
go.opentelemetry.io/contrib/propagators/b3 v1.11.1 h1:icQ6ttRV+r/2fnU46BIo/g/mPu6Rs5Ug8Rtohe3KqzI=
go.opentelemetry.io/contrib/propagators/b3 v1.11.1/go.mod h1:ECIveyMXgnl4gorxFcA7RYjJY/Ql9n20ubhbfDc3QfA=
go.opentelemetry.io/otel v1.11.1 h1:4WLLAmcfkmDk2ukNXJyq3/kiz/3UzCaYq6PskJsaou4=
Expand All @@ -521,6 +525,8 @@ go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.11.1 h1:tFl63
go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.11.1/go.mod h1:X620Jww3RajCJXw/unA+8IRTgxkdS7pi+ZwK9b7KUJk=
go.opentelemetry.io/otel/exporters/stdout/stdouttrace v1.11.1 h1:3Yvzs7lgOw8MmbxmLRsQGwYdCubFmUHSooKaEhQunFQ=
go.opentelemetry.io/otel/exporters/stdout/stdouttrace v1.11.1/go.mod h1:pyHDt0YlyuENkD2VwHsiRDf+5DfI3EH7pfhUYW6sQUE=
go.opentelemetry.io/otel/metric v0.33.0 h1:xQAyl7uGEYvrLAiV/09iTJlp1pZnQ9Wl793qbVvED1E=
go.opentelemetry.io/otel/metric v0.33.0/go.mod h1:QlTYc+EnYNq/M2mNk1qDDMRLpqCOj2f/r5c7Fd5FYaI=
go.opentelemetry.io/otel/sdk v1.11.1 h1:F7KmQgoHljhUuJyA+9BiU+EkJfyX5nVVF4wyzWZpKxs=
go.opentelemetry.io/otel/sdk v1.11.1/go.mod h1:/l3FE4SupHJ12TduVjUkZtlfFqDCQJlOlithYrdktys=
go.opentelemetry.io/otel/trace v1.11.1 h1:ofxdnzsNrGBYXbP7t7zpUK281+go5rF7dvdIZXF8gdQ=
Expand Down
2 changes: 2 additions & 0 deletions internal/app/database/module.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"go.mongodb.org/mongo-driver/mongo"
"go.mongodb.org/mongo-driver/mongo/options"
"go.mongodb.org/mongo-driver/mongo/readpref"
"go.opentelemetry.io/contrib/instrumentation/go.mongodb.org/mongo-driver/mongo/otelmongo"
"go.uber.org/fx"
)

Expand All @@ -22,6 +23,7 @@ func MongoModule(uri string, dbName string) fx.Option {
reg := bson.NewRegistryBuilder().RegisterTypeMapEntry(bsontype.EmbeddedDocument, tM).Build()

return options.Client().
SetMonitor(otelmongo.NewMonitor()).
SetRegistry(reg).
ApplyURI(uri)
}),
Expand Down
32 changes: 21 additions & 11 deletions internal/pkg/connectors/bankingcircle/client.go
Original file line number Diff line number Diff line change
@@ -1,13 +1,15 @@
package bankingcircle

import (
"context"
"encoding/json"
"fmt"
"io"
"net/http"
"time"

"github.com/formancehq/go-libs/sharedlogging"
"go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp"
)

type client struct {
Expand All @@ -25,9 +27,16 @@ type client struct {
accessTokenExpiresAt time.Time
}

func newHTTPClient() *http.Client {
return &http.Client{
Timeout: 10 * time.Second,
Transport: otelhttp.NewTransport(http.DefaultTransport),
}
}

func newClient(username, password, endpoint, authorizationEndpoint string, logger sharedlogging.Logger) (*client, error) {
c := &client{
httpClient: &http.Client{Timeout: 10 * time.Second},
httpClient: newHTTPClient(),

username: username,
password: password,
Expand All @@ -37,15 +46,16 @@ func newClient(username, password, endpoint, authorizationEndpoint string, logge
logger: logger,
}

if err := c.login(); err != nil {
if err := c.login(context.TODO()); err != nil {
return nil, err
}

return c, nil
}

func (c *client) login() error {
req, err := http.NewRequest(http.MethodGet, c.authorizationEndpoint+"/api/v1/authorizations/authorize", http.NoBody)
func (c *client) login(ctx context.Context) error {
req, err := http.NewRequestWithContext(ctx, http.MethodGet,
c.authorizationEndpoint+"/api/v1/authorizations/authorize", http.NoBody)
if err != nil {
return fmt.Errorf("failed to create login request: %w", err)
}
Expand Down Expand Up @@ -87,12 +97,12 @@ func (c *client) login() error {
return nil
}

func (c *client) ensureAccessTokenIsValid() error {
func (c *client) ensureAccessTokenIsValid(ctx context.Context) error {
if c.accessTokenExpiresAt.After(time.Now()) {
return nil
}

return c.login()
return c.login(ctx)
}

//nolint:tagliatelle // allow for client-side structures
Expand Down Expand Up @@ -164,11 +174,11 @@ type payment struct {
} `json:"creditorInformation"`
}

func (c *client) getAllPayments() ([]*payment, error) {
func (c *client) getAllPayments(ctx context.Context) ([]*payment, error) {
var payments []*payment

for page := 0; ; page++ {
pagedPayments, err := c.getPayments(page)
pagedPayments, err := c.getPayments(ctx, page)
if err != nil {
return nil, err
}
Expand All @@ -183,12 +193,12 @@ func (c *client) getAllPayments() ([]*payment, error) {
return payments, nil
}

func (c *client) getPayments(page int) ([]*payment, error) {
if err := c.ensureAccessTokenIsValid(); err != nil {
func (c *client) getPayments(ctx context.Context, page int) ([]*payment, error) {
if err := c.ensureAccessTokenIsValid(ctx); err != nil {
return nil, err
}

req, err := http.NewRequest(http.MethodGet, c.endpoint+"/api/v1/payments/singles", http.NoBody)
req, err := http.NewRequestWithContext(ctx, http.MethodGet, c.endpoint+"/api/v1/payments/singles", http.NoBody)
if err != nil {
return nil, fmt.Errorf("failed to create login request: %w", err)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ func taskFetchPayments(logger sharedlogging.Logger, client *client) task.Task {
scheduler task.Scheduler[TaskDescriptor],
ingester ingestion.Ingester,
) error {
paymentsList, err := client.getAllPayments()
paymentsList, err := client.getAllPayments(ctx)
if err != nil {
return err
}
Expand Down
26 changes: 22 additions & 4 deletions internal/pkg/connectors/currencycloud/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,16 +4,19 @@ import (
"context"
"fmt"
"net/http"

"go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp"
)

type apiTransport struct {
authToken string
authToken string
underlying *otelhttp.Transport
}

func (t *apiTransport) RoundTrip(req *http.Request) (*http.Response, error) {
req.Header.Add("X-Auth-Token", t.authToken)

return http.DefaultTransport.RoundTrip(req)
return t.underlying.RoundTrip(req)
}

type Client struct {
Expand All @@ -29,14 +32,29 @@ func (c *Client) buildEndpoint(path string, args ...interface{}) string {

const devAPIEndpoint = "https://devapi.currencycloud.com"

func newAuthenticatedHTTPClient(authToken string) *http.Client {
return &http.Client{
Transport: &apiTransport{
authToken: authToken,
underlying: otelhttp.NewTransport(http.DefaultTransport),
},
}
}

func newHTTPClient() *http.Client {
return &http.Client{
Transport: otelhttp.NewTransport(http.DefaultTransport),
}
}

// NewClient creates a new client for the CurrencyCloud API.
func NewClient(ctx context.Context, loginID, apiKey, endpoint string) (*Client, error) {
if endpoint == "" {
endpoint = devAPIEndpoint
}

c := &Client{
httpClient: &http.Client{},
httpClient: newHTTPClient(),
endpoint: endpoint,
loginID: loginID,
apiKey: apiKey,
Expand All @@ -47,7 +65,7 @@ func NewClient(ctx context.Context, loginID, apiKey, endpoint string) (*Client,
return nil, err
}

c.httpClient.Transport = &apiTransport{authToken: authToken}
c.httpClient = newAuthenticatedHTTPClient(authToken)

return c, nil
}
13 changes: 8 additions & 5 deletions internal/pkg/connectors/modulr/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,17 +5,19 @@ import (
"net/http"

"github.com/formancehq/payments/internal/pkg/connectors/modulr/hmac"
"go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp"
)

type apiTransport struct {
apiKey string
headers map[string]string
apiKey string
headers map[string]string
underlying http.RoundTripper
}

func (t *apiTransport) RoundTrip(req *http.Request) (*http.Response, error) {
req.Header.Add("Authorization", t.apiKey)

return http.DefaultTransport.RoundTrip(req)
return t.underlying.RoundTrip(req)
}

type responseWrapper[t any] struct {
Expand Down Expand Up @@ -50,8 +52,9 @@ func NewClient(apiKey, apiSecret, endpoint string) (*Client, error) {
return &Client{
httpClient: &http.Client{
Transport: &apiTransport{
headers: headers,
apiKey: apiKey,
headers: headers,
apiKey: apiKey,
underlying: otelhttp.NewTransport(http.DefaultTransport),
},
},
endpoint: endpoint,
Expand Down
15 changes: 10 additions & 5 deletions internal/pkg/connectors/stripe/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"net/http"

"github.com/formancehq/payments/internal/pkg/writeonly"
"go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp"

"github.com/pkg/errors"
"github.com/stripe/stripe-go/v72"
Expand Down Expand Up @@ -51,13 +52,11 @@ func (d *DefaultClient) ForAccount(account string) Client {
func (d *DefaultClient) BalanceTransactions(ctx context.Context,
options ...ClientOption,
) ([]*stripe.BalanceTransaction, bool, error) {
req, err := http.NewRequest(http.MethodGet, balanceTransactionsEndpoint, nil)
req, err := http.NewRequestWithContext(ctx, http.MethodGet, balanceTransactionsEndpoint, nil)
if err != nil {
return nil, false, errors.Wrap(err, "creating http request")
}

req = req.WithContext(ctx)

for _, opt := range options {
opt.apply(req)
}
Expand Down Expand Up @@ -127,9 +126,15 @@ func (d *DefaultClient) BalanceTransactions(ctx context.Context,
return asBalanceTransactions, rsp.HasMore, nil
}

func NewDefaultClient(httpClient *http.Client, apiKey string, storage writeonly.Storage) *DefaultClient {
func newHTTPClient() *http.Client {
return &http.Client{
Transport: otelhttp.NewTransport(http.DefaultTransport),
}
}

func NewDefaultClient(apiKey string, storage writeonly.Storage) *DefaultClient {
return &DefaultClient{
httpClient: httpClient,
httpClient: newHTTPClient(),
apiKey: apiKey,
storage: storage,
}
Expand Down
3 changes: 1 addition & 2 deletions internal/pkg/connectors/stripe/task_connected_account.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package stripe

import (
"context"
"net/http"

"github.com/formancehq/payments/internal/pkg/ingestion"
"github.com/formancehq/payments/internal/pkg/task"
Expand Down Expand Up @@ -57,7 +56,7 @@ func ConnectedAccountTask(config Config, account string) func(ctx context.Contex
IngesterFn(func(ctx context.Context, bts []*stripe.BalanceTransaction, commitState TimelineState, tail bool) error {
return ingestBatch(ctx, logger, ingester, bts, commitState, tail)
}),
NewTimeline(NewDefaultClient(http.DefaultClient, config.APIKey, storage).
NewTimeline(NewDefaultClient(config.APIKey, storage).
ForAccount(account), config.TimelineConfig, task.MustResolveTo(ctx, resolver, TimelineState{})),
)

Expand Down
6 changes: 2 additions & 4 deletions internal/pkg/connectors/stripe/task_main.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,11 @@ package stripe

import (
"context"
"net/http"

"github.com/formancehq/go-libs/sharedlogging"
"github.com/formancehq/payments/internal/pkg/ingestion"
"github.com/formancehq/payments/internal/pkg/task"
"github.com/formancehq/payments/internal/pkg/writeonly"

"github.com/formancehq/go-libs/sharedlogging"
"github.com/pkg/errors"
"github.com/stripe/stripe-go/v72"
)
Expand Down Expand Up @@ -62,7 +60,7 @@ func MainTask(config Config) func(ctx context.Context, logger sharedlogging.Logg
) error {
return ingest(ctx, logger, scheduler, ingester, batch, commitState, tail)
}),
NewTimeline(NewDefaultClient(http.DefaultClient, config.APIKey, storage),
NewTimeline(NewDefaultClient(config.APIKey, storage),
config.TimelineConfig, task.MustResolveTo(ctx, resolver, TimelineState{})),
),
config.PollingPeriod.Duration,
Expand Down
Loading

0 comments on commit c8d90b3

Please sign in to comment.