Skip to content

Commit

Permalink
feat(payments): Add transfer initiation status updates (#1016)
Browse files Browse the repository at this point in the history
  • Loading branch information
lwagner-getmomo authored Dec 15, 2023
1 parent 3fc6712 commit fe18b6e
Show file tree
Hide file tree
Showing 5 changed files with 141 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ func FetchAccountsTask(config Config, client *atlar_client.Rest) task.Task {
// Fetch payments after inserting all accounts in order to link them correctly
taskPayments, err := models.EncodeTaskDescriptor(TaskDescriptor{
Name: "Fetch payments from Atlar",
Key: taskNameFetchPayments,
Key: taskNameFetchTransactions,
})
if err != nil {
return err
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ var (
paymentsAttrs = metric.WithAttributes(append(connectorAttrs, attribute.String(metrics.ObjectAttributeKey, "payments"))...)
)

func FetchPaymentsTask(config Config, client *atlar_client.Rest, account string) task.Task {
func FetchTransactionsTask(config Config, client *atlar_client.Rest) task.Task {
return func(
ctx context.Context,
logger logging.Logger,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,9 +86,6 @@ func InitiatePaymentTask(config Config, client *atlar_client.Rest, transferID st
return err
}

ctx, cancel := context.WithTimeout(ctx, 10*time.Second)
defer cancel()

paymentSchemeType := "SCT" // SEPA Credit Transfer
remittanceInformationType := "UNSTRUCTURED"
remittanceInformationValue := transfer.Description
Expand Down Expand Up @@ -117,8 +114,10 @@ func InitiatePaymentTask(config Config, client *atlar_client.Rest, transferID st
},
}

requestCtx, cancel := contextutil.DetachedWithTimeout(ctx, 30*time.Second)
defer cancel()
postCreditTransfersParams := credit_transfers.PostV1CreditTransfersParams{
Context: ctx,
Context: requestCtx,
CreditTransfer: &createPaymentRequest,
}
postCreditTransferResponse, err := client.CreditTransfers.PostV1CreditTransfers(&postCreditTransfersParams)
Expand All @@ -132,12 +131,30 @@ func InitiatePaymentTask(config Config, client *atlar_client.Rest, transferID st
},
ConnectorID: connectorID,
}
err = ingester.AddTransferInitiationPaymentID(ctx, transfer, paymentID, time.Now())
if err != nil {
return err
}

err = ingester.UpdateTransferInitiationPaymentsStatus(ctx, transfer, paymentID, models.TransferInitiationStatusProcessed, "", transfer.Attempts, time.Now())
taskDescriptor, err := models.EncodeTaskDescriptor(TaskDescriptor{
Name: fmt.Sprintf("Update transfer initiation status of transfer %s", transfer.ID.String()),
Key: taskNameUpdatePaymentStatus,
TransferID: transfer.ID.String(),
PaymentID: paymentID.String(),
Attempt: 1,
})
if err != nil {
return err
}

err = scheduler.Schedule(ctx, taskDescriptor, models.TaskSchedulerOptions{
ScheduleOption: models.OPTIONS_RUN_NOW,
RestartOption: models.OPTIONS_RESTART_IF_NOT_ACTIVE,
})
if err != nil && !errors.Is(err, task.ErrAlreadyScheduled) {
return err
}

return nil
}
}
Expand All @@ -149,12 +166,119 @@ func ValidateTransferInitiation(transfer *models.TransferInitiation) error {
if transfer.Description == "" {
return errors.New("description of transfer initiation can not be empty")
}
if transfer.Type.String() != "TRANSFER" {
return errors.New("this connector only supports type TRANSFER")
if transfer.Type.String() != "PAYOUT" {
return errors.New("this connector only supports type PAYOUT")
}
return nil
}

var (
updatePayoutAttrs = metric.WithAttributes(append(connectorAttrs, attribute.String(metrics.ObjectAttributeKey, "update_payout"))...)
)

func UpdatePaymentStatusTask(
config Config,
client *atlar_client.Rest,
transferID string,
stringPaymentID string,
attempt int,
) task.Task {
return func(
ctx context.Context,
logger logging.Logger,
connectorID models.ConnectorID,
resolver task.StateResolver,
scheduler task.Scheduler,
storageReader storage.Reader,
ingester ingestion.Ingester,
metricsRegistry metrics.MetricsRegistry,
) error {
paymentID := models.MustPaymentIDFromString(stringPaymentID)
transferInitiationID := models.MustTransferInitiationIDFromString(transferID)
transfer, err := getTransfer(ctx, storageReader, transferInitiationID, true)
if err != nil {
return err
}
logger.Info("attempt: ", attempt, " fetching status of ", paymentID)

attrs := updatePayoutAttrs
now := time.Now()
defer func() {
metricsRegistry.ConnectorObjectsLatency().Record(ctx, time.Since(now).Milliseconds(), attrs)
}()

defer func() {
if err != nil {
metricsRegistry.ConnectorObjectsErrors().Add(ctx, 1, attrs)
}
}()

requestCtx, cancel := contextutil.DetachedWithTimeout(ctx, 30*time.Second)
defer cancel()
getCreditTransferParams := credit_transfers.GetV1CreditTransfersGetByExternalIDExternalIDParams{
Context: requestCtx,
ExternalID: serializeAtlarPaymentExternalID(transfer.ID.Reference, transfer.Attempts),
}
getCreditTransferResponse, err := client.CreditTransfers.GetV1CreditTransfersGetByExternalIDExternalID(&getCreditTransferParams)
if err != nil {
return err
}

status := getCreditTransferResponse.Payload.Status
// Status docs: https://docs.atlar.com/docs/payment-details#payment-states--events
switch status {
case "CREATED", "APPROVED", "PENDING_SUBMISSION", "SENT", "PENDING_AT_BANK", "ACCEPTED", "EXECUTED":
taskDescriptor, err := models.EncodeTaskDescriptor(TaskDescriptor{
Name: fmt.Sprintf("Update transfer initiation status of transfer %s", transfer.ID.String()),
Key: taskNameUpdatePaymentStatus,
TransferID: transfer.ID.String(),
PaymentID: paymentID.String(),
Attempt: attempt + 1,
})
if err != nil {
return err
}

err = scheduler.Schedule(ctx, taskDescriptor, models.TaskSchedulerOptions{
ScheduleOption: models.OPTIONS_RUN_IN_DURATION,
Duration: 2 * time.Minute,
RestartOption: models.OPTIONS_RESTART_IF_NOT_ACTIVE,
})
if err != nil && !errors.Is(err, task.ErrAlreadyScheduled) {
return err
}
return nil

case "RECONCILED":
// this is done
err = ingester.UpdateTransferInitiationPaymentsStatus(ctx, transfer, paymentID, models.TransferInitiationStatusProcessed, "", transfer.Attempts, time.Now())
if err != nil {
return err
}

return nil

case "REJECTED", "FAILED", "RETURNED":
// this has failed
err = ingester.UpdateTransferInitiationPaymentsStatus(
ctx, transfer, paymentID, models.TransferInitiationStatusFailed,
fmt.Sprintf("paymant initiation status is \"%s\"", status), transfer.Attempts, time.Now(),
)
if err != nil {
return err
}

return nil

default:
return fmt.Errorf(
"unknown status \"%s\" encountered while fetching payment initiation status of payment \"%s\"",
status, getCreditTransferResponse.Payload.ID,
)
}
}
}

func amountToString(amount big.Int, precision int) string {
raw := amount.String()
if precision < 0 {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,19 +8,21 @@ import (

const (
taskNameFetchAccounts = "fetch_accounts"
taskNameFetchPayments = "fetch_payments"
taskNameFetchTransactions = "fetch_transactions"
taskNameCreateExternalBankAccount = "create_external_bank_account"
taskNameInitiatePayment = "initiate_payment"
taskNameUpdatePaymentStatus = "update_payment_status"
)

// TaskDescriptor is the definition of a task.
type TaskDescriptor struct {
Name string `json:"name" yaml:"name" bson:"name"`
Key string `json:"key" yaml:"key" bson:"key"`
Main bool `json:"main,omitempty" yaml:"main" bson:"main"`
Account string `json:"account,omitempty" yaml:"account" bson:"account"`
BankAccount *models.BankAccount `json:"bankAccount,omitempty" yaml:"bankAccount" bson:"bankAccount"`
TransferID string `json:"transferId,omitempty" yaml:"transferId" bson:"transferId"`
PaymentID string `json:"paymentId,omitempty" yaml:"paymentId" bson:"paymentId"`
Attempt int `json:"attempt,omitempty" yaml:"attempt" bson:"attempt"`
}

func resolveTasks(logger logging.Logger, config Config) func(taskDefinition TaskDescriptor) task.Task {
Expand All @@ -34,12 +36,14 @@ func resolveTasks(logger logging.Logger, config Config) func(taskDefinition Task
switch taskDescriptor.Key {
case taskNameFetchAccounts:
return FetchAccountsTask(config, client)
case taskNameFetchPayments:
return FetchPaymentsTask(config, client, taskDescriptor.Account)
case taskNameFetchTransactions:
return FetchTransactionsTask(config, client)
case taskNameCreateExternalBankAccount:
return CreateExternalBankAccountTask(config, client, taskDescriptor.BankAccount)
case taskNameInitiatePayment:
return InitiatePaymentTask(config, client, taskDescriptor.TransferID)
case taskNameUpdatePaymentStatus:
return UpdatePaymentStatusTask(config, client, taskDescriptor.TransferID, taskDescriptor.PaymentID, taskDescriptor.Attempt)
default:
return nil
}
Expand Down

1 comment on commit fe18b6e

@vercel
Copy link

@vercel vercel bot commented on fe18b6e Dec 15, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please sign in to comment.