Skip to content

Commit

Permalink
fix: fix payments balance pagination (#465)
Browse files Browse the repository at this point in the history
  • Loading branch information
paul-nicolas authored Aug 2, 2023
1 parent 508e566 commit 5063539
Show file tree
Hide file tree
Showing 18 changed files with 177 additions and 8 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
apiVersion: stack.formance.com/v1beta3
kind: Migration
metadata:
annotations:
reloader.stakater.com/auto: "true"
generation: 1
labels:
stack: "true"
name: payments-v0.9.4-pre-upgrade
namespace: monopod-disabled-one-service
spec:
configuration: monopod-disabled-one-service
module: payments
postUpgrade: false
targetedVersion: v0.9.4
version: monopod-disabled-one-service
status:
terminated: true
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
apiVersion: stack.formance.com/v1beta3
kind: Migration
metadata:
annotations:
reloader.stakater.com/auto: "true"
generation: 1
labels:
stack: "true"
name: payments-v0.9.4-pre-upgrade
namespace: monopod-latest
spec:
configuration: monopod-latest
module: payments
postUpgrade: false
targetedVersion: v0.9.4
version: monopod-latest
status:
terminated: true
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
apiVersion: stack.formance.com/v1beta3
kind: Migration
metadata:
annotations:
reloader.stakater.com/auto: "true"
generation: 1
labels:
stack: "true"
name: payments-v0.9.4-pre-upgrade
namespace: monopod-ledgerv1
spec:
configuration: monopod-ledgerv1
module: payments
postUpgrade: false
targetedVersion: v0.9.4
version: monopod-ledgerv1
status:
terminated: true
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
apiVersion: stack.formance.com/v1beta3
kind: Migration
metadata:
annotations:
reloader.stakater.com/auto: "true"
generation: 1
labels:
stack: "true"
name: payments-v0.9.4-pre-upgrade
namespace: monopod-search-before-v0-7-0
spec:
configuration: monopod-search-before-v0-7-0
module: payments
postUpgrade: false
targetedVersion: v0.9.4
version: monopod-search-before-v0-7-0
status:
terminated: true
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
apiVersion: stack.formance.com/v1beta3
kind: Migration
metadata:
annotations:
reloader.stakater.com/auto: "true"
generation: 1
labels:
stack: "true"
name: payments-v0.9.4-pre-upgrade
namespace: multipod-debug
spec:
configuration: multipod-debug
module: payments
postUpgrade: false
targetedVersion: v0.9.4
version: multipod-debug
status:
terminated: true
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
apiVersion: stack.formance.com/v1beta3
kind: Migration
metadata:
annotations:
reloader.stakater.com/auto: "true"
generation: 1
labels:
stack: "true"
name: payments-v0.9.4-pre-upgrade
namespace: multipod-disabled-one-service
spec:
configuration: multipod-disabled-one-service
module: payments
postUpgrade: false
targetedVersion: v0.9.4
version: multipod-disabled-one-service
status:
terminated: true
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
apiVersion: stack.formance.com/v1beta3
kind: Migration
metadata:
annotations:
reloader.stakater.com/auto: "true"
generation: 1
labels:
stack: "true"
name: payments-v0.9.4-pre-upgrade
namespace: multipod-latest
spec:
configuration: multipod-latest
module: payments
postUpgrade: false
targetedVersion: v0.9.4
version: multipod-latest
status:
terminated: true
9 changes: 9 additions & 0 deletions components/operator/internal/handlers/handler_payments.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,15 @@ func init() {
return paymentsServices(ctx, env)
},
},
"v0.9.4": {
PreUpgrade: func(ctx modules.Context) error {
// Add payment accounts
return paymentsPreUpgradeMigration(ctx)
},
Services: func(ctx modules.ModuleContext) modules.Services {
return paymentsServices(ctx, env)
},
},
},
})
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,6 @@ func ingestAccountsBatch(
accountsBatch := ingestion.AccountBatch{}
balanceBatch := ingestion.BalanceBatch{}

now := time.Now()
for _, account := range accounts {
raw, err := json.Marshal(account)
if err != nil {
Expand Down Expand Up @@ -121,6 +120,7 @@ func ingestAccountsBatch(
var amountInt big.Int
amount.Mul(&amount, big.NewFloat(100)).Int(&amountInt)

now := time.Now()
balanceBatch = append(balanceBatch, &models.Balance{
AccountID: models.AccountID{
Reference: account.AccountID,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,6 @@ func ingestBalancesBatch(
balances []*client.Balance,
) error {
batch := ingestion.BalanceBatch{}
now := time.Now()
for _, balance := range balances {
var amount big.Float
_, ok := amount.SetString(balance.Amount)
Expand All @@ -77,6 +76,7 @@ func ingestBalancesBatch(
var amountInt big.Int
amount.Mul(&amount, big.NewFloat(100)).Int(&amountInt)

now := time.Now()
batch = append(batch, &models.Balance{
AccountID: models.AccountID{
Reference: balance.AccountID,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,6 @@ func taskFetchWallets(logger logging.Logger, client *client.Client, userID strin
var accountBatch ingestion.AccountBatch
var balanceBatch ingestion.BalanceBatch
var transactionTasks []models.TaskDescriptor
now := time.Now()
for _, wallet := range pagedWallets {
transactionTask, err := models.EncodeTaskDescriptor(TaskDescriptor{
Name: "Fetch transactions from client by user and wallets",
Expand Down Expand Up @@ -91,6 +90,7 @@ func taskFetchWallets(logger logging.Logger, client *client.Client, userID strin
return fmt.Errorf("failed to parse amount: %s", wallet.Balance.Amount.String())
}

now := time.Now()
balanceBatch = append(balanceBatch, &models.Balance{
AccountID: models.AccountID{
Reference: wallet.ID,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,6 @@ func ingestAccountsBatch(
accountsBatch := ingestion.AccountBatch{}
balancesBatch := ingestion.BalanceBatch{}

now := time.Now()
for _, account := range accounts {
raw, err := json.Marshal(account)
if err != nil {
Expand Down Expand Up @@ -118,6 +117,7 @@ func ingestAccountsBatch(
var balance big.Int
amount.Mul(&amount, big.NewFloat(100)).Int(&balance)

now := time.Now()
balancesBatch = append(balancesBatch, &models.Balance{
AccountID: models.AccountID{
Reference: account.ID,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,6 @@ func ingestBalancesBatch(
balances []*client.Balance,
) error {
batch := ingestion.BalanceBatch{}
now := time.Now()
for _, balance := range balances {
var amount big.Float
_, ok := amount.SetString(balance.Attributes.AvailableBalance.String())
Expand All @@ -68,6 +67,7 @@ func ingestBalancesBatch(
var amountInt big.Int
amount.Mul(&amount, big.NewFloat(math.Pow(10, float64(currency.GetPrecision(balance.Attributes.CurrencyCode))))).Int(&amountInt)

now := time.Now()
batch = append(batch, &models.Balance{
AccountID: models.AccountID{
Reference: accountID,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,8 @@ func BalancesTask(config Config, account string, client *DefaultClient) func(ctx
}

batch := ingestion.BalanceBatch{}
timestamp := time.Now()
for _, balance := range balances.Available {
timestamp := time.Now()
batch = append(batch, &models.Balance{
AccountID: models.AccountID{
Reference: account,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,6 @@ func ingestAccountsBatch(

accountsBatch := ingestion.AccountBatch{}
balancesBatch := ingestion.BalanceBatch{}
now := time.Now()
for _, balance := range balances {
raw, err := json.Marshal(balance)
if err != nil {
Expand Down Expand Up @@ -124,6 +123,7 @@ func ingestAccountsBatch(
var amountInt big.Int
amount.Mul(&amount, big.NewFloat(100)).Int(&amountInt)

now := time.Now()
balancesBatch = append(balancesBatch, &models.Balance{
AccountID: models.AccountID{
Reference: fmt.Sprintf("%d", balance.ID),
Expand Down
2 changes: 2 additions & 0 deletions components/payments/internal/app/models/account.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,15 @@ import (
"time"

"github.com/gibson042/canonicaljson-go"
"github.com/google/uuid"
"github.com/uptrace/bun"
)

type Account struct {
bun.BaseModel `bun:"accounts.account"`

ID AccountID `bun:",pk,nullzero"`
ConnectorID uuid.UUID `bun:",nullzero"`
CreatedAt time.Time `bun:",nullzero"`
Reference string
Provider ConnectorProvider
Expand Down
10 changes: 9 additions & 1 deletion components/payments/internal/app/storage/accounts.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package storage

import (
"context"
"fmt"
"sort"
"time"

Expand All @@ -18,14 +19,21 @@ func (s *Storage) UpsertAccounts(ctx context.Context, provider models.ConnectorP
accountsMap[account.Reference] = account
}

connector, err := s.GetConnector(ctx, provider)
if err != nil {
return fmt.Errorf("failed to get connector: %w", err)
}

accounts = make([]*models.Account, 0, len(accountsMap))
for _, account := range accountsMap {
account.ConnectorID = connector.ID
accounts = append(accounts, account)
}

_, err := s.db.NewInsert().
_, err = s.db.NewInsert().
Model(&accounts).
On("CONFLICT (id) DO UPDATE").
Set("connector_id = EXCLUDED.connector_id").
Set("provider = EXCLUDED.provider").
Set("raw_data = EXCLUDED.raw_data").
Set("default_currency = EXCLUDED.default_currency").
Expand Down
24 changes: 24 additions & 0 deletions components/payments/internal/app/storage/migrations.go
Original file line number Diff line number Diff line change
Expand Up @@ -452,6 +452,30 @@ func registerMigrations(migrator *migrations.Migrator) {
return err
}

return nil
},
},
migrations.Migration{
Up: func(tx bun.Tx) error {
// In this migration, we have to delete the accounts table since
// we wanna reset the connector, but the connector_id was not
// added, hence the table will not be cleaned up when resetting.
_, err := tx.Exec(`
DELETE FROM accounts.account CASCADE;
ALTER TABLE accounts.account ADD COLUMN IF NOT EXISTS "connector_id" uuid;
ALTER TABLE accounts.account ADD CONSTRAINT accounts_connector
FOREIGN KEY (connector_id)
REFERENCES connectors.connector (id)
ON DELETE CASCADE
NOT DEFERRABLE
INITIALLY IMMEDIATE
;
`)
if err != nil {
return err
}

return nil
},
},
Expand Down

1 comment on commit 5063539

@vercel
Copy link

@vercel vercel bot commented on 5063539 Aug 2, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please sign in to comment.