diff --git a/pkg/bridge/connectors/wise/client.go b/pkg/bridge/connectors/wise/client.go index c5f32d42..2776129d 100644 --- a/pkg/bridge/connectors/wise/client.go +++ b/pkg/bridge/connectors/wise/client.go @@ -7,9 +7,7 @@ import ( "net/http" ) -const ( - apiEndpoint = "https://api.wise.com" -) +const apiEndpoint = "https://api.wise.com" type apiTransport struct { ApiKey string @@ -20,16 +18,16 @@ func (t *apiTransport) RoundTrip(req *http.Request) (*http.Response, error) { return http.DefaultTransport.RoundTrip(req) } -type Client struct { +type client struct { httpClient *http.Client } -type Profile struct { - Id uint64 `json:"id"` +type profile struct { + ID uint64 `json:"id"` Type string `json:"type"` } -type Transfer struct { +type transfer struct { ID uint64 `json:"id"` Reference string `json:"reference"` Status string `json:"status"` @@ -39,7 +37,7 @@ type Transfer struct { TargetAccount uint64 `json:"targetAccount"` TargetCurrency string `json:"targetCurrency"` TargetValue float64 `json:"targetValue"` - Business string `json:"business"` + Business uint64 `json:"business"` Created string `json:"created"` CustomerTransactionId string `json:"customerTransactionId"` Details struct { @@ -49,26 +47,14 @@ type Transfer struct { User uint64 `json:"user"` } -type BalanceAccount struct { - ID uint64 `json:"id"` - Type string `json:"type"` - Currency string `json:"currency"` - CreationTime string `json:"creationTime"` - Name string `json:"name"` - Amount struct { - Value float64 `json:"value"` - Currency string `json:"currency"` - } `json:"amount"` -} - -func (w *Client) Endpoint(path string) string { +func (w *client) endpoint(path string) string { return fmt.Sprintf("%s/%s", apiEndpoint, path) } -func (w *Client) GetProfiles() ([]Profile, error) { - var profiles []Profile +func (w *client) getProfiles() ([]profile, error) { + var profiles []profile - res, err := w.httpClient.Get(w.Endpoint("v1/profiles")) + res, err := w.httpClient.Get(w.endpoint("v1/profiles")) if err != nil { return profiles, err } @@ -83,23 +69,23 @@ func (w *Client) GetProfiles() ([]Profile, error) { return profiles, nil } -func (w *Client) GetTransfers(profile *Profile) ([]Transfer, error) { - var transfers []Transfer +func (w *client) getTransfers(profile *profile) ([]transfer, error) { + var transfers []transfer limit := 10 offset := 0 for { - var ts []Transfer + var ts []transfer - req, err := http.NewRequest(http.MethodGet, w.Endpoint("v1/transfers"), nil) + req, err := http.NewRequest(http.MethodGet, w.endpoint("v1/transfers"), nil) if err != nil { return transfers, err } q := req.URL.Query() q.Add("limit", fmt.Sprintf("%d", limit)) - q.Add("profile", fmt.Sprintf("%d", profile.Id)) + q.Add("profile", fmt.Sprintf("%d", profile.ID)) q.Add("offset", fmt.Sprintf("%d", offset)) req.URL.RawQuery = q.Encode() @@ -126,14 +112,14 @@ func (w *Client) GetTransfers(profile *Profile) ([]Transfer, error) { return transfers, nil } -func NewClient(apiKey string) *Client { +func newClient(apiKey string) *client { httpClient := &http.Client{ Transport: &apiTransport{ ApiKey: apiKey, }, } - return &Client{ + return &client{ httpClient: httpClient, } } diff --git a/pkg/bridge/connectors/wise/config.go b/pkg/bridge/connectors/wise/config.go new file mode 100644 index 00000000..731fa583 --- /dev/null +++ b/pkg/bridge/connectors/wise/config.go @@ -0,0 +1,13 @@ +package wise + +type Config struct { + APIKey string `json:"apiKey" yaml:"apiKey" bson:"apiKey"` +} + +func (c Config) Validate() error { + if c.APIKey == "" { + return ErrMissingAPIKey + } + + return nil +} diff --git a/pkg/bridge/connectors/wise/errors.go b/pkg/bridge/connectors/wise/errors.go new file mode 100644 index 00000000..a4007d12 --- /dev/null +++ b/pkg/bridge/connectors/wise/errors.go @@ -0,0 +1,11 @@ +package wise + +import "github.com/pkg/errors" + +var ( + // ErrMissingTask is returned when the task is missing. + ErrMissingTask = errors.New("task is not implemented") + + // ErrMissingAPIKey is returned when the api key is missing from config. + ErrMissingAPIKey = errors.New("missing apiKey from config") +) diff --git a/pkg/bridge/connectors/wise/loader.go b/pkg/bridge/connectors/wise/loader.go new file mode 100644 index 00000000..1d44f011 --- /dev/null +++ b/pkg/bridge/connectors/wise/loader.go @@ -0,0 +1,25 @@ +package wise + +import ( + "github.com/numary/go-libs/sharedlogging" + "github.com/numary/payments/pkg/bridge/integration" + "github.com/numary/payments/pkg/bridge/task" +) + +// NewLoader creates a new loader. +func NewLoader() integration.Loader[Config, TaskDefinition] { + loader := integration.NewLoaderBuilder[Config, TaskDefinition]("wise"). + WithLoad(func(logger sharedlogging.Logger, config Config) integration.Connector[TaskDefinition] { + return integration.NewConnectorBuilder[TaskDefinition](). + WithInstall(func(ctx task.ConnectorContext[TaskDefinition]) error { + return ctx.Scheduler(). + Schedule( + TaskDefinition{Name: taskNameFetchProfiles}, + false) + }). + WithResolve(resolveTasks(logger, config)). + Build() + }).Build() + + return loader +} diff --git a/pkg/bridge/connectors/wise/task_fetch_profiles.go b/pkg/bridge/connectors/wise/task_fetch_profiles.go new file mode 100644 index 00000000..c83502cb --- /dev/null +++ b/pkg/bridge/connectors/wise/task_fetch_profiles.go @@ -0,0 +1,39 @@ +package wise + +import ( + "context" + "fmt" + + "github.com/numary/go-libs/sharedlogging" + "github.com/numary/payments/pkg/bridge/task" +) + +func taskFetchProfiles(logger sharedlogging.Logger, config Config) task.Task { + return func( + ctx context.Context, + scheduler task.Scheduler[TaskDefinition], + ) error { + client := newClient(config.APIKey) + + profiles, err := client.getProfiles() + if err != nil { + return err + } + + for _, profile := range profiles { + logger.Infof(fmt.Sprintf("scheduling fetch-transfers: %d", profile.ID)) + + def := TaskDefinition{ + Name: taskNameFetchTransfers, + ProfileID: profile.ID, + } + + err = scheduler.Schedule(def, false) + if err != nil { + return err + } + } + + return nil + } +} diff --git a/pkg/bridge/connectors/wise/task_fetch_transfers.go b/pkg/bridge/connectors/wise/task_fetch_transfers.go new file mode 100644 index 00000000..7f3b3e77 --- /dev/null +++ b/pkg/bridge/connectors/wise/task_fetch_transfers.go @@ -0,0 +1,50 @@ +package wise + +import ( + "context" + "fmt" + + "github.com/numary/go-libs/sharedlogging" + payments "github.com/numary/payments/pkg" + "github.com/numary/payments/pkg/bridge/ingestion" + "github.com/numary/payments/pkg/bridge/task" +) + +func taskFetchTransfers(logger sharedlogging.Logger, config Config, profileID uint64) task.Task { + return func( + ctx context.Context, + scheduler task.Scheduler[TaskDefinition], + ingester ingestion.Ingester, + ) error { + client := newClient(config.APIKey) + + transfers, err := client.getTransfers(&profile{ + ID: profileID, + }) + + if err != nil { + return err + } + + batch := ingestion.Batch{} + + for _, transfer := range transfers { + logger.Info(transfer) + batch = append(batch, ingestion.BatchElement{ + Referenced: payments.Referenced{ + Reference: fmt.Sprintf("%d", transfer.ID), + Type: "transfer", + }, + Payment: &payments.Data{ + Status: payments.StatusSucceeded, + Scheme: payments.SchemeOther, + InitialAmount: int64(transfer.TargetValue * 100), + Asset: fmt.Sprintf("%s/2", transfer.TargetCurrency), + Raw: transfer, + }, + }) + } + + return ingester.Ingest(ctx, batch, struct{}{}) + } +} diff --git a/pkg/bridge/connectors/wise/task_resolve.go b/pkg/bridge/connectors/wise/task_resolve.go new file mode 100644 index 00000000..a7d8f450 --- /dev/null +++ b/pkg/bridge/connectors/wise/task_resolve.go @@ -0,0 +1,35 @@ +package wise + +import ( + "fmt" + + "github.com/numary/go-libs/sharedlogging" + "github.com/numary/payments/pkg/bridge/task" +) + +const ( + taskNameFetchTransfers = "fetch-transfers" + taskNameFetchProfiles = "fetch-profiles" +) + +// TaskDefinition is the definition of a task. +type TaskDefinition struct { + Name string `json:"name" yaml:"name" bson:"name"` + ProfileID uint64 `json:"profileID" yaml:"profileID" bson:"profileID"` +} + +func resolveTasks(logger sharedlogging.Logger, config Config) func(taskDefinition TaskDefinition) task.Task { + return func(taskDefinition TaskDefinition) task.Task { + switch taskDefinition.Name { + case taskNameFetchProfiles: + return taskFetchProfiles(logger, config) + case taskNameFetchTransfers: + return taskFetchTransfers(logger, config, taskDefinition.ProfileID) + } + + // This should never happen. + return func() error { + return fmt.Errorf("key '%s': %w", taskDefinition.Name, ErrMissingTask) + } + } +} diff --git a/pkg/bridge/connectors/wise/tasks.go b/pkg/bridge/connectors/wise/tasks.go deleted file mode 100644 index 7b44024a..00000000 --- a/pkg/bridge/connectors/wise/tasks.go +++ /dev/null @@ -1 +0,0 @@ -package wise diff --git a/pkg/bridge/connectors/wise/wise.go b/pkg/bridge/connectors/wise/wise.go deleted file mode 100644 index 8e920a3f..00000000 --- a/pkg/bridge/connectors/wise/wise.go +++ /dev/null @@ -1,109 +0,0 @@ -package wise - -import ( - "context" - "fmt" - - "github.com/numary/go-libs/sharedlogging" - payments "github.com/numary/payments/pkg" - "github.com/numary/payments/pkg/bridge/ingestion" - "github.com/numary/payments/pkg/bridge/integration" - "github.com/numary/payments/pkg/bridge/task" -) - -const ( - taskNameFetchTransfers = "fetch-transfers" - taskNameFetchProfiles = "fetch-profiles" -) - -type Config struct { - ApiKey string `json:"apiKey" yaml:"apiKey" bson:"apiKey"` -} - -func (c Config) Validate() error { - return nil -} - -type TaskDefinition struct { - Name string `json:"name" yaml:"name" bson:"name"` - ProfileId uint64 `json:"profileId" yaml:"profileId" bson:"profileId"` -} - -func NewLoader() integration.Loader[Config, TaskDefinition] { - loader := integration.NewLoaderBuilder[Config, TaskDefinition]("wise"). - WithLoad(func(logger sharedlogging.Logger, config Config) integration.Connector[TaskDefinition] { - return integration.NewConnectorBuilder[TaskDefinition](). - WithInstall(func(ctx task.ConnectorContext[TaskDefinition]) error { - return ctx.Scheduler().Schedule(TaskDefinition{ - Name: "fetch-profiles", - }, false) - }). - WithResolve(func(def TaskDefinition) task.Task { - if def.Name == taskNameFetchProfiles { - return func( - ctx context.Context, - scheduler task.Scheduler[TaskDefinition], - ) error { - client := NewClient(config.ApiKey) - - profiles, err := client.GetProfiles() - if err != nil { - return err - } - - for _, profile := range profiles { - logger.Infof(fmt.Sprintf("scheduling fetch-transfers: %d", profile.Id)) - err = scheduler.Schedule(TaskDefinition{ - Name: taskNameFetchTransfers, - ProfileId: profile.Id, - }, false) - if err != nil { - return err - } - } - - return nil - } - } - return func( - ctx context.Context, - scheduler task.Scheduler[TaskDefinition], - ingester ingestion.Ingester, - ) error { - client := NewClient(config.ApiKey) - - transfers, err := client.GetTransfers(&Profile{ - Id: def.ProfileId, - }) - - if err != nil { - return err - } - - batch := ingestion.Batch{} - - for _, transfer := range transfers { - logger.Info(transfer) - batch = append(batch, ingestion.BatchElement{ - Referenced: payments.Referenced{ - Reference: fmt.Sprintf("%d", transfer.ID), - Type: "transfer", - }, - Payment: &payments.Data{ - Status: payments.StatusSucceeded, - Scheme: payments.SchemeOther, - InitialAmount: int64(transfer.TargetValue * 100), - Asset: fmt.Sprintf("%s/2", transfer.TargetCurrency), - Raw: transfer, - }, - }) - } - - return ingester.Ingest(ctx, batch, struct{}{}) - } - }). - Build() - }).Build() - - return loader -} diff --git a/swagger.yml b/swagger.yml index e89deaa8..9a66edb5 100644 --- a/swagger.yml +++ b/swagger.yml @@ -72,6 +72,7 @@ paths: enum: - stripe - dummypay + - wise required: true requestBody: required: true @@ -193,6 +194,7 @@ components: oneOf: - $ref: '#/components/schemas/StripeConfig' - $ref: '#/components/schemas/DummyPayConfig' + - $ref: '#/components/schemas/WiseConfig' ConnectorTask: oneOf: - $ref: '#/components/schemas/StripeTask' @@ -252,6 +254,14 @@ components: directory: type: string example: '/tmp/dummypay' + WiseConfig: + type: object + required: + - apiKey + properties: + apiKey: + type: string + example: 'XXX' ListPaymentsResponse: type: object required: