From 04f583fc68f611bca974562e5ce4c8ba67de8267 Mon Sep 17 00:00:00 2001 From: Paul Nicolas Date: Sun, 27 Aug 2023 01:14:14 +0200 Subject: [PATCH] link payment inititation and connectors --- .../payments/internal/app/api/connector.go | 195 ++++++++---------- .../internal/app/api/connectormodule.go | 10 +- .../payments/internal/app/api/router.go | 15 +- .../internal/app/api/transfer_intiiation.go | 63 +++++- .../mangopay/task_fetch_bank_accounts.go | 5 +- .../connectors/mangopay/task_fetch_wallets.go | 5 +- .../internal/app/integration/manager.go | 13 +- .../payments/internal/app/models/account.go | 1 + .../payments/internal/app/models/connector.go | 24 +++ .../app/models/transfer_initiation.go | 1 - .../payments/internal/app/storage/accounts.go | 1 + .../internal/app/storage/migrations.go | 2 + 12 files changed, 203 insertions(+), 132 deletions(-) diff --git a/components/payments/internal/app/api/connector.go b/components/payments/internal/app/api/connector.go index 395046d63a..167aac5449 100644 --- a/components/payments/internal/app/api/connector.go +++ b/components/payments/internal/app/api/connector.go @@ -2,12 +2,9 @@ package api import ( "encoding/json" - "math/big" "net/http" "time" - "github.com/pkg/errors" - "github.com/formancehq/payments/internal/app/storage" "github.com/google/uuid" @@ -255,107 +252,97 @@ func reset[Config models.ConnectorConfigObject]( } } -type transferRequest struct { - Amount *big.Int `json:"amount"` - Source string `json:"source"` - Destination string `json:"destination"` - Asset string `json:"asset"` - - currency string -} - -func (req *transferRequest) validate() error { - if req.Amount.Cmp(big.NewInt(0)) <= 0 { - return errors.New("amount must be greater than 0") - } - - if req.Asset == "" { - return errors.New("asset is required") - } - - if len(req.Asset) < 3 { //nolint:gomnd // allow length 3 for POC - return errors.New("asset is invalid") - } - - req.currency = req.Asset[:3] - - if req.Destination == "" { - return errors.New("destination is required") - } - - return nil -} - -type initiateTransferResponse struct { - ID string `json:"id"` -} - -func initiatePayment[Config models.ConnectorConfigObject](connectorManager *integration.ConnectorManager[Config], -) http.HandlerFunc { - return func(w http.ResponseWriter, r *http.Request) { - // var req transferRequest - - // err := json.NewDecoder(r.Body).Decode(&req) - // if err != nil { - // handleError(w, r, err) - - // return - // } - - // err = req.validate() - // if err != nil { - // handleErrorBadRequest(w, r, err) - - // return - // } - - // installed, err := connectorManager.IsInstalled(r.Context()) - // if err != nil { - // handleError(w, r, err) - - // return - // } - - // if !installed { - // handleError(w, r, errors.New("connector not installed")) - - // return - // } - - // transfer := integration.Transfer{ - // Source: req.Source, - // Destination: req.Destination, - // Currency: req.currency, - // Amount: req.Amount, - // } - - // transferID, err := connectorManager.InitiateTransfer(r.Context(), transfer) - // if err != nil { - // handleError(w, r, err) - - // return - // } - - // err = json.NewEncoder(w).Encode(api.BaseResponse[initiateTransferResponse]{ - // Data: &initiateTransferResponse{ - // ID: transferID.String(), - // }, - // }) - // if err != nil { - // panic(err) - // } - } -} - -type listTransfersResponseElement struct { - ID string `json:"id"` - Source string `json:"source"` - Destination string `json:"destination"` - Amount *big.Int `json:"amount"` - Currency string `json:"asset"` - Status string `json:"status"` - Error *string `json:"error"` -} +// type transferRequest struct { +// Amount *big.Int `json:"amount"` +// Source string `json:"source"` +// Destination string `json:"destination"` +// Asset string `json:"asset"` + +// currency string +// } + +// func (req *transferRequest) validate() error { +// if req.Amount.Cmp(big.NewInt(0)) <= 0 { +// return errors.New("amount must be greater than 0") +// } + +// if req.Asset == "" { +// return errors.New("asset is required") +// } + +// if len(req.Asset) < 3 { //nolint:gomnd // allow length 3 for POC +// return errors.New("asset is invalid") +// } + +// req.currency = req.Asset[:3] + +// if req.Destination == "" { +// return errors.New("destination is required") +// } + +// return nil +// } + +// type initiateTransferResponse struct { +// ID string `json:"id"` +// } + +// func initiatePayment[Config models.ConnectorConfigObject](connectorManager *integration.ConnectorManager[Config], +// ) http.HandlerFunc { +// return func(w http.ResponseWriter, r *http.Request) { +// var req transferRequest + +// err := json.NewDecoder(r.Body).Decode(&req) +// if err != nil { +// handleError(w, r, err) + +// return +// } + +// err = req.validate() +// if err != nil { +// handleErrorBadRequest(w, r, err) + +// return +// } + +// installed, err := connectorManager.IsInstalled(r.Context()) +// if err != nil { +// handleError(w, r, err) + +// return +// } + +// if !installed { +// handleError(w, r, errors.New("connector not installed")) + +// return +// } + +// transfer := integration.Transfer{ +// Source: req.Source, +// Destination: req.Destination, +// Currency: req.currency, +// Amount: req.Amount, +// } + +// transferID, err := connectorManager.InitiateTransfer(r.Context(), transfer) +// if err != nil { +// handleError(w, r, err) + +// return +// } + +// err = json.NewEncoder(w).Encode(api.BaseResponse[initiateTransferResponse]{ +// Data: &initiateTransferResponse{ +// ID: transferID.String(), +// }, +// }) +// if err != nil { +// panic(err) +// } +// } +// } func connectorNotInstalled[Config models.ConnectorConfigObject](connectorManager *integration.ConnectorManager[Config], w http.ResponseWriter, r *http.Request, diff --git a/components/payments/internal/app/api/connectormodule.go b/components/payments/internal/app/api/connectormodule.go index 9bce2151b0..07d2b30f26 100644 --- a/components/payments/internal/app/api/connectormodule.go +++ b/components/payments/internal/app/api/connectormodule.go @@ -23,6 +23,9 @@ import ( type connectorHandler struct { Handler http.Handler Provider models.ConnectorProvider + + // TODO(polo): refactor to remove this ugly hack to access the connector manager + initiatePayment func(ctx context.Context, transfer models.TransferInitiation) error } func addConnector[ConnectorConfig models.ConnectorConfigObject](loader integration.Loader[ConnectorConfig], @@ -51,14 +54,15 @@ func addConnector[ConnectorConfig models.ConnectorConfigObject](loader integrati }, resolver, metricsRegistry, maxTasks) }) - return integration.NewConnectorManager[ConnectorConfig]( + return integration.NewConnectorManager( store, loader, schedulerFactory, publisher) }), fx.Provide(fx.Annotate(func(cm *integration.ConnectorManager[ConnectorConfig], ) connectorHandler { return connectorHandler{ - Handler: connectorRouter(loader.Name(), cm), - Provider: loader.Name(), + Handler: connectorRouter(loader.Name(), cm), + Provider: loader.Name(), + initiatePayment: cm.InitiatePayment, } }, fx.ResultTags(`group:"connectorHandlers"`))), fx.Invoke(func(lc fx.Lifecycle, cm *integration.ConnectorManager[ConnectorConfig]) { diff --git a/components/payments/internal/app/api/router.go b/components/payments/internal/app/api/router.go index 096a1521f6..c5cbad8e9a 100644 --- a/components/payments/internal/app/api/router.go +++ b/components/payments/internal/app/api/router.go @@ -54,9 +54,13 @@ func httpRouter( authGroup.Path("/accounts/{accountID}").Methods(http.MethodGet).Handler(readAccountHandler(store)) authGroup.Path("/accounts/{accountID}/balances").Methods(http.MethodGet).Handler(listBalancesForAccount(store)) - authGroup.Path("/transfer-initiation").Methods(http.MethodPost).Handler(createTransferInitiationHandler(store)) + paymentsHandlers := make(map[models.ConnectorProvider]paymentHandler) + for _, h := range connectorHandlers { + paymentsHandlers[h.Provider] = h.initiatePayment + } + authGroup.Path("/transfer-initiation").Methods(http.MethodPost).Handler(createTransferInitiationHandler(store, paymentsHandlers)) authGroup.Path("/transfer-initiation").Methods(http.MethodGet).Handler(listTransferInitiationsHandler(store)) - authGroup.Path("/transfer-initiation/{transferID}/status").Methods(http.MethodPost).Handler(udateTransferInitiationStatusHandler(store)) + authGroup.Path("/transfer-initiation/{transferID}/status").Methods(http.MethodPost).Handler(udateTransferInitiationStatusHandler(store, paymentsHandlers)) authGroup.Path("/transfer-initiation/{transferID}").Methods(http.MethodGet).Handler(readTransferInitiationHandler(store)) authGroup.Path("/transfer-initiation/{transferID}").Methods(http.MethodDelete).Handler(deleteTransferInitiationHandler(store)) @@ -66,12 +70,6 @@ func httpRouter( connectorGroup.Path("/configs").Handler(connectorConfigsHandler()) - // Deprecated - // TODO: Remove this endpoint - // Use /connectors/stripe/transfers instead - connectorGroup.Path("/stripe/transfers").Methods(http.MethodPost). - Handler(handleStripeTransfers(store)) - for _, h := range connectorHandlers { connectorGroup.PathPrefix("/" + h.Provider.String()).Handler( http.StripPrefix("/connectors", h.Handler)) @@ -95,7 +93,6 @@ func connectorRouter[Config models.ConnectorConfigObject]( addRoute(r, provider, "/reset", http.MethodPost, reset(manager)) addRoute(r, provider, "/tasks", http.MethodGet, listTasks(manager)) addRoute(r, provider, "/tasks/{taskID}", http.MethodGet, readTask(manager)) - addRoute(r, provider, "/payments", http.MethodPost, initiatePayment(manager)) return r } diff --git a/components/payments/internal/app/api/transfer_intiiation.go b/components/payments/internal/app/api/transfer_intiiation.go index 1b250ead8a..0426980315 100644 --- a/components/payments/internal/app/api/transfer_intiiation.go +++ b/components/payments/internal/app/api/transfer_intiiation.go @@ -16,6 +16,8 @@ import ( "github.com/pkg/errors" ) +type paymentHandler func(ctx context.Context, transfer models.TransferInitiation) error + type transferInitiationResponse struct { ID uuid.UUID `json:"id"` CreatedAt time.Time `json:"createdAt"` @@ -46,7 +48,10 @@ type createTransferInitiationRepository interface { CreateTransferInitiation(ctx context.Context, transferInitiation *models.TransferInitiation) error } -func createTransferInitiationHandler(repo createTransferInitiationRepository) http.HandlerFunc { +func createTransferInitiationHandler( + repo createTransferInitiationRepository, + paymentHandlers map[models.ConnectorProvider]paymentHandler, +) http.HandlerFunc { return func(w http.ResponseWriter, r *http.Request) { w.Header().Set("Content-Type", "application/json") @@ -84,6 +89,13 @@ func createTransferInitiationHandler(repo createTransferInitiationRepository) ht return } + provider, err := models.ConnectorProviderFromString(payload.Provider) + if err != nil { + handleValidationError(w, r, err) + + return + } + if payload.Amount == nil { handleValidationError(w, r, errors.New("amount is required")) @@ -111,6 +123,7 @@ func createTransferInitiationHandler(repo createTransferInitiationRepository) ht Description: payload.Description, SourceAccountID: *sourceAccountID, DestinationAccountID: *destinationAccountID, + Provider: provider, Type: transferInitiationType, Amount: payload.Amount, Asset: models.Asset(payload.Asset), @@ -124,7 +137,19 @@ func createTransferInitiationHandler(repo createTransferInitiationRepository) ht } if status == models.TransferInitiationStatusProcessing { - // TODO(polo): link with payouts endpoint + f, ok := paymentHandlers[provider] + if !ok { + handleServerError(w, r, errors.New("no payment handler for provider "+provider.String())) + + return + } + + err = f(r.Context(), *tf) + if err != nil { + handleServerError(w, r, err) + + return + } } data := &transferInitiationResponse{ @@ -153,6 +178,7 @@ func createTransferInitiationHandler(repo createTransferInitiationRepository) ht } type udateTransferInitiationStatusRepository interface { + ReadTransferInitiation(ctx context.Context, id uuid.UUID) (*models.TransferInitiation, error) UpdateTransferInitiationStatus(ctx context.Context, id uuid.UUID, status models.TransferInitiationStatus) error } @@ -160,7 +186,10 @@ type updateTransferInitiationStatusRequest struct { Status string `json:"status"` } -func udateTransferInitiationStatusHandler(repo udateTransferInitiationStatusRepository) http.HandlerFunc { +func udateTransferInitiationStatusHandler( + repo udateTransferInitiationStatusRepository, + paymentHandlers map[models.ConnectorProvider]paymentHandler, +) http.HandlerFunc { return func(w http.ResponseWriter, r *http.Request) { payload := &updateTransferInitiationStatusRequest{} if err := json.NewDecoder(r.Body).Decode(&payload); err != nil { @@ -189,11 +218,25 @@ func udateTransferInitiationStatusHandler(repo udateTransferInitiationStatusRepo return } + previousTransferInitiation, err := repo.ReadTransferInitiation(r.Context(), transferID) + if err != nil { + handleStorageErrors(w, r, err) + + return + } + + if previousTransferInitiation.Status != models.TransferInitiationStatusWaitingForValidation { + handleValidationError(w, r, errors.New("only waiting for validation transfer initiation can be updated")) + + return + } + isValidated := false if status == models.TransferInitiationStatusValidated { isValidated = true status = models.TransferInitiationStatusProcessing } + previousTransferInitiation.Status = status err = repo.UpdateTransferInitiationStatus(r.Context(), transferID, status) if err != nil { @@ -203,7 +246,19 @@ func udateTransferInitiationStatusHandler(repo udateTransferInitiationStatusRepo } if isValidated { - // TODO: link with payouts endpoint + f, ok := paymentHandlers[previousTransferInitiation.Provider] + if !ok { + handleServerError(w, r, errors.New("no payment handler for provider "+previousTransferInitiation.Provider.String())) + + return + } + + err = f(r.Context(), *previousTransferInitiation) + if err != nil { + handleServerError(w, r, err) + + return + } } w.WriteHeader(http.StatusNoContent) diff --git a/components/payments/internal/app/connectors/mangopay/task_fetch_bank_accounts.go b/components/payments/internal/app/connectors/mangopay/task_fetch_bank_accounts.go index a2a05bb4b6..a96b0ae5de 100644 --- a/components/payments/internal/app/connectors/mangopay/task_fetch_bank_accounts.go +++ b/components/payments/internal/app/connectors/mangopay/task_fetch_bank_accounts.go @@ -61,7 +61,10 @@ func taskFetchBankAccounts(logger logging.Logger, client *client.Client, userID Provider: models.ConnectorProviderMangopay, AccountName: bankAccount.OwnerName, Type: models.AccountTypeExternal, - RawData: buf, + Metadata: map[string]string{ + "user_id": userID, + }, + RawData: buf, }) } diff --git a/components/payments/internal/app/connectors/mangopay/task_fetch_wallets.go b/components/payments/internal/app/connectors/mangopay/task_fetch_wallets.go index fdffb93aea..d62b341826 100644 --- a/components/payments/internal/app/connectors/mangopay/task_fetch_wallets.go +++ b/components/payments/internal/app/connectors/mangopay/task_fetch_wallets.go @@ -82,7 +82,10 @@ func taskFetchWallets(logger logging.Logger, client *client.Client, userID strin AccountName: wallet.Description, // Wallets are internal accounts on our side, since we // can have their balances. - Type: models.AccountTypeInternal, + Type: models.AccountTypeInternal, + Metadata: map[string]string{ + "user_id": userID, + }, RawData: buf, }) diff --git a/components/payments/internal/app/integration/manager.go b/components/payments/internal/app/integration/manager.go index bc0723c77f..8af4fb8d33 100644 --- a/components/payments/internal/app/integration/manager.go +++ b/components/payments/internal/app/integration/manager.go @@ -6,18 +6,13 @@ import ( "github.com/ThreeDotsLabs/watermill/message" "github.com/formancehq/payments/internal/app/messages" - "github.com/formancehq/payments/pkg/events" - - "github.com/formancehq/stack/libs/go-libs/publish" - - "github.com/formancehq/payments/internal/app/storage" - - "github.com/google/uuid" - "github.com/formancehq/payments/internal/app/models" - + "github.com/formancehq/payments/internal/app/storage" "github.com/formancehq/payments/internal/app/task" + "github.com/formancehq/payments/pkg/events" "github.com/formancehq/stack/libs/go-libs/logging" + "github.com/formancehq/stack/libs/go-libs/publish" + "github.com/google/uuid" "github.com/pkg/errors" ) diff --git a/components/payments/internal/app/models/account.go b/components/payments/internal/app/models/account.go index 6a5290998d..92cd1296b8 100644 --- a/components/payments/internal/app/models/account.go +++ b/components/payments/internal/app/models/account.go @@ -24,6 +24,7 @@ type Account struct { DefaultAsset Asset `bun:"default_currency"` // Is optional and default to '' AccountName string // Is optional and default to '' Type AccountType + Metadata map[string]string RawData json.RawMessage } diff --git a/components/payments/internal/app/models/connector.go b/components/payments/internal/app/models/connector.go index 686678c3b8..ac93395cd1 100644 --- a/components/payments/internal/app/models/connector.go +++ b/components/payments/internal/app/models/connector.go @@ -2,6 +2,7 @@ package models import ( "encoding/json" + "errors" "fmt" "strings" "time" @@ -58,6 +59,29 @@ func (p ConnectorProvider) StringLower() string { return strings.ToLower(string(p)) } +func ConnectorProviderFromString(s string) (ConnectorProvider, error) { + switch s { + case "BANKING-CIRCLE": + return ConnectorProviderBankingCircle, nil + case "CURRENCY-CLOUD": + return ConnectorProviderCurrencyCloud, nil + case "DUMMY-PAY": + return ConnectorProviderDummyPay, nil + case "MODULR": + return ConnectorProviderModulr, nil + case "STRIPE": + return ConnectorProviderStripe, nil + case "WISE": + return ConnectorProviderWise, nil + case "MANGOPAY": + return ConnectorProviderMangopay, nil + case "MONEYCORP": + return ConnectorProviderMoneycorp, nil + default: + return "", errors.New("unknown connector provider") + } +} + func (c Connector) ParseConfig(to interface{}) error { if c.Config == nil { return nil diff --git a/components/payments/internal/app/models/transfer_initiation.go b/components/payments/internal/app/models/transfer_initiation.go index 7789d3ea30..1a49021511 100644 --- a/components/payments/internal/app/models/transfer_initiation.go +++ b/components/payments/internal/app/models/transfer_initiation.go @@ -93,7 +93,6 @@ type TransferInitiation struct { SourceAccountID AccountID DestinationAccountID AccountID Provider ConnectorProvider - ProviderSpecificData map[string]interface{} Amount *big.Int `bun:"type:numeric"` Asset Asset diff --git a/components/payments/internal/app/storage/accounts.go b/components/payments/internal/app/storage/accounts.go index 2ab04254fe..44ac9bf911 100644 --- a/components/payments/internal/app/storage/accounts.go +++ b/components/payments/internal/app/storage/accounts.go @@ -38,6 +38,7 @@ func (s *Storage) UpsertAccounts(ctx context.Context, provider models.ConnectorP Set("raw_data = EXCLUDED.raw_data"). Set("default_currency = EXCLUDED.default_currency"). Set("account_name = EXCLUDED.account_name"). + Set("metadata = EXCLUDED.metadata"). Exec(ctx) if err != nil { return e("failed to create accounts", err) diff --git a/components/payments/internal/app/storage/migrations.go b/components/payments/internal/app/storage/migrations.go index ea9251cafc..3a31f9379e 100644 --- a/components/payments/internal/app/storage/migrations.go +++ b/components/payments/internal/app/storage/migrations.go @@ -485,6 +485,8 @@ func registerMigrations(migrator *migrations.Migrator) { // we wanna reset the connector, but the connector_id was not // added, hence the table will not be cleaned up when resetting. _, err := tx.Exec(` + ALTER TABLE accounts.account ADD COLUMN IF NOT EXISTS metadata jsonb; + CREATE SCHEMA IF NOT EXISTS transfers; CREATE TABLE IF NOT EXISTS transfers.transfer_initiation (