From 9b02048a9e65a065d57643f67de6af55d6c2dd7c Mon Sep 17 00:00:00 2001 From: Paul Nicolas Date: Wed, 6 Dec 2023 17:05:46 +0100 Subject: [PATCH] feat(payments): do not publish payments if no update (#958) --- .../connectors/internal/ingestion/accounts.go | 2 +- .../connectors/internal/ingestion/balances.go | 2 +- .../internal/ingestion/bank_account.go | 2 +- .../connectors/internal/ingestion/ingester.go | 10 +- .../internal/ingestion/ingester_test.go | 86 +++++++++ .../connectors/internal/ingestion/payments.go | 15 +- .../internal/ingestion/payments_test.go | 169 ++++++++++++++++++ .../internal/ingestion/transfer_initiation.go | 4 +- .../connectors/internal/storage/payments.go | 88 +++++++-- .../internal/storage/payments_test.go | 22 ++- .../payments/internal/models/payment.go | 12 +- 11 files changed, 372 insertions(+), 40 deletions(-) create mode 100644 components/payments/cmd/connectors/internal/ingestion/ingester_test.go create mode 100644 components/payments/cmd/connectors/internal/ingestion/payments_test.go diff --git a/components/payments/cmd/connectors/internal/ingestion/accounts.go b/components/payments/cmd/connectors/internal/ingestion/accounts.go index 6f0d5fd0eb..b16856488e 100644 --- a/components/payments/cmd/connectors/internal/ingestion/accounts.go +++ b/components/payments/cmd/connectors/internal/ingestion/accounts.go @@ -28,7 +28,7 @@ func (i *DefaultIngester) IngestAccounts(ctx context.Context, batch AccountBatch "startingAt": startingAt, }).Debugf("Ingest accounts batch") - if err := i.repo.UpsertAccounts(ctx, batch); err != nil { + if err := i.store.UpsertAccounts(ctx, batch); err != nil { return fmt.Errorf("error upserting accounts: %w", err) } diff --git a/components/payments/cmd/connectors/internal/ingestion/balances.go b/components/payments/cmd/connectors/internal/ingestion/balances.go index a937337eda..14a98bcb1b 100644 --- a/components/payments/cmd/connectors/internal/ingestion/balances.go +++ b/components/payments/cmd/connectors/internal/ingestion/balances.go @@ -28,7 +28,7 @@ func (i *DefaultIngester) IngestBalances(ctx context.Context, batch BalanceBatch "startingAt": startingAt, }).Debugf("Ingest balances batch") - if err := i.repo.InsertBalances(ctx, batch, checkIfAccountExists); err != nil { + if err := i.store.InsertBalances(ctx, batch, checkIfAccountExists); err != nil { return fmt.Errorf("error inserting balances: %w", err) } diff --git a/components/payments/cmd/connectors/internal/ingestion/bank_account.go b/components/payments/cmd/connectors/internal/ingestion/bank_account.go index 65415b4bad..bc195a995d 100644 --- a/components/payments/cmd/connectors/internal/ingestion/bank_account.go +++ b/components/payments/cmd/connectors/internal/ingestion/bank_account.go @@ -10,7 +10,7 @@ import ( ) func (i *DefaultIngester) LinkBankAccountWithAccount(ctx context.Context, bankAccount *models.BankAccount, accountID *models.AccountID) error { - if err := i.repo.LinkBankAccountWithAccount(ctx, bankAccount.ID, accountID); err != nil { + if err := i.store.LinkBankAccountWithAccount(ctx, bankAccount.ID, accountID); err != nil { return err } diff --git a/components/payments/cmd/connectors/internal/ingestion/ingester.go b/components/payments/cmd/connectors/internal/ingestion/ingester.go index 8c9ac379d2..0aab7f9ad3 100644 --- a/components/payments/cmd/connectors/internal/ingestion/ingester.go +++ b/components/payments/cmd/connectors/internal/ingestion/ingester.go @@ -21,14 +21,14 @@ type Ingester interface { type DefaultIngester struct { provider models.ConnectorProvider - repo Repository + store Store descriptor models.TaskDescriptor publisher message.Publisher } -type Repository interface { +type Store interface { UpsertAccounts(ctx context.Context, accounts []*models.Account) error - UpsertPayments(ctx context.Context, payments []*models.Payment) error + UpsertPayments(ctx context.Context, payments []*models.Payment) ([]*models.PaymentID, error) InsertBalances(ctx context.Context, balances []*models.Balance, checkIfAccountExists bool) error UpdateTaskState(ctx context.Context, connectorID models.ConnectorID, descriptor models.TaskDescriptor, state json.RawMessage) error UpdateTransferInitiationPaymentsStatus(ctx context.Context, id models.TransferInitiationID, paymentID *models.PaymentID, status models.TransferInitiationStatus, errorMessage string, attempts int, updatedAt time.Time) error @@ -39,13 +39,13 @@ type Repository interface { func NewDefaultIngester( provider models.ConnectorProvider, descriptor models.TaskDescriptor, - repo Repository, + repo Store, publisher message.Publisher, ) *DefaultIngester { return &DefaultIngester{ provider: provider, descriptor: descriptor, - repo: repo, + store: repo, publisher: publisher, } } diff --git a/components/payments/cmd/connectors/internal/ingestion/ingester_test.go b/components/payments/cmd/connectors/internal/ingestion/ingester_test.go new file mode 100644 index 0000000000..564a337a75 --- /dev/null +++ b/components/payments/cmd/connectors/internal/ingestion/ingester_test.go @@ -0,0 +1,86 @@ +package ingestion + +import ( + "context" + "encoding/json" + "time" + + "github.com/ThreeDotsLabs/watermill/message" + "github.com/formancehq/payments/internal/models" + "github.com/google/uuid" +) + +type MockStore struct { + paymentIDsNotModified map[string]struct{} +} + +func NewMockStore() *MockStore { + return &MockStore{ + paymentIDsNotModified: make(map[string]struct{}), + } +} + +func (m *MockStore) WithPaymentIDsNotModified(paymentsIDs []models.PaymentID) *MockStore { + for _, id := range paymentsIDs { + m.paymentIDsNotModified[id.String()] = struct{}{} + } + return m +} + +func (m *MockStore) UpsertAccounts(ctx context.Context, accounts []*models.Account) error { + return nil +} + +func (m *MockStore) UpsertPayments(ctx context.Context, payments []*models.Payment) ([]*models.PaymentID, error) { + ids := make([]*models.PaymentID, 0, len(payments)) + for _, payment := range payments { + if _, ok := m.paymentIDsNotModified[payment.ID.String()]; !ok { + ids = append(ids, &payment.ID) + } + } + + return ids, nil +} + +func (m *MockStore) InsertBalances(ctx context.Context, balances []*models.Balance, checkIfAccountExists bool) error { + return nil +} + +func (m *MockStore) UpdateTaskState(ctx context.Context, connectorID models.ConnectorID, descriptor models.TaskDescriptor, state json.RawMessage) error { + return nil +} + +func (m *MockStore) UpdateTransferInitiationPaymentsStatus(ctx context.Context, id models.TransferInitiationID, paymentID *models.PaymentID, status models.TransferInitiationStatus, errorMessage string, attempts int, updatedAt time.Time) error { + return nil +} + +func (m *MockStore) AddTransferInitiationPaymentID(ctx context.Context, id models.TransferInitiationID, paymentID *models.PaymentID, updatedAt time.Time) error { + return nil +} + +func (m *MockStore) LinkBankAccountWithAccount(ctx context.Context, id uuid.UUID, accountID *models.AccountID) error { + return nil +} + +type MockPublisher struct { + messages chan *message.Message +} + +func NewMockPublisher() *MockPublisher { + return &MockPublisher{ + messages: make(chan *message.Message, 100), + } +} + +func (m *MockPublisher) Publish(topic string, messages ...*message.Message) error { + for _, msg := range messages { + m.messages <- msg + } + + return nil +} + +func (m *MockPublisher) Close() error { + close(m.messages) + return nil +} diff --git a/components/payments/cmd/connectors/internal/ingestion/payments.go b/components/payments/cmd/connectors/internal/ingestion/payments.go index ff6f789abf..b64c5aba42 100644 --- a/components/payments/cmd/connectors/internal/ingestion/payments.go +++ b/components/payments/cmd/connectors/internal/ingestion/payments.go @@ -53,20 +53,31 @@ func (i *DefaultIngester) IngestPayments( allPayments = append(allPayments, payment) } - if err := i.repo.UpsertPayments(ctx, allPayments); err != nil { + idsInserted, err := i.store.UpsertPayments(ctx, allPayments) + if err != nil { return fmt.Errorf("error upserting payments: %w", err) } + idsInsertedMap := make(map[string]struct{}, len(idsInserted)) + for idx := range idsInserted { + idsInsertedMap[idsInserted[idx].String()] = struct{}{} + } + taskState, err := json.Marshal(commitState) if err != nil { return fmt.Errorf("error marshaling task state: %w", err) } - if err = i.repo.UpdateTaskState(ctx, connectorID, i.descriptor, taskState); err != nil { + if err = i.store.UpdateTaskState(ctx, connectorID, i.descriptor, taskState); err != nil { return fmt.Errorf("error updating task state: %w", err) } for paymentIdx := range allPayments { + _, ok := idsInsertedMap[allPayments[paymentIdx].ID.String()] + if !ok { + // No need to publish an event for an already existing payment + continue + } err = i.publisher.Publish(events.TopicPayments, publish.NewMessage(ctx, messages.NewEventSavedPayments(i.provider, allPayments[paymentIdx]))) if err != nil { diff --git a/components/payments/cmd/connectors/internal/ingestion/payments_test.go b/components/payments/cmd/connectors/internal/ingestion/payments_test.go new file mode 100644 index 0000000000..05d9f3d060 --- /dev/null +++ b/components/payments/cmd/connectors/internal/ingestion/payments_test.go @@ -0,0 +1,169 @@ +package ingestion + +import ( + "context" + "encoding/json" + "math/big" + "testing" + "time" + + "github.com/formancehq/payments/internal/models" + "github.com/google/uuid" + "github.com/stretchr/testify/require" +) + +var ( + connectorID = models.ConnectorID{ + Reference: uuid.New(), + Provider: models.ConnectorProviderDummyPay, + } + + p1 = &models.Payment{ + ID: models.PaymentID{ + PaymentReference: models.PaymentReference{ + Reference: "p1", + Type: models.PaymentTypePayIn, + }, + ConnectorID: connectorID, + }, + ConnectorID: connectorID, + CreatedAt: time.Date(2023, 11, 14, 4, 55, 0, 0, time.UTC), + Reference: "p1", + Amount: big.NewInt(100), + Type: models.PaymentTypePayIn, + Status: models.PaymentStatusCancelled, + Scheme: models.PaymentSchemeA2A, + Asset: models.Asset("USD/2"), + } + + p2 = &models.Payment{ + ID: models.PaymentID{ + PaymentReference: models.PaymentReference{ + Reference: "p2", + Type: models.PaymentTypeTransfer, + }, + ConnectorID: connectorID, + }, + ConnectorID: connectorID, + CreatedAt: time.Date(2023, 11, 14, 4, 54, 0, 0, time.UTC), + Reference: "p2", + Amount: big.NewInt(150), + Type: models.PaymentTypeTransfer, + Status: models.PaymentStatusSucceeded, + Scheme: models.PaymentSchemeApplePay, + Asset: models.Asset("EUR/2"), + } + + p3 = &models.Payment{ + ID: models.PaymentID{ + PaymentReference: models.PaymentReference{ + Reference: "p3", + Type: models.PaymentTypePayOut, + }, + ConnectorID: connectorID, + }, + ConnectorID: connectorID, + CreatedAt: time.Date(2023, 11, 14, 4, 53, 0, 0, time.UTC), + Reference: "p3", + Amount: big.NewInt(200), + Type: models.PaymentTypePayOut, + Status: models.PaymentStatusPending, + Scheme: models.PaymentSchemeCardMasterCard, + Asset: models.Asset("USD/2"), + } +) + +type paymentMessagePayload struct { + Paylaod struct { + ID string `json:"id"` + } `json:"payload"` +} + +func TestIngestPayments(t *testing.T) { + t.Parallel() + + type testCase struct { + name string + batch PaymentBatch + paymentIDsNotModified []models.PaymentID + requiredPublishedPaymentIDs []models.PaymentID + } + + testCases := []testCase{ + { + name: "nominal", + batch: PaymentBatch{ + { + Payment: p1, + }, + { + Payment: p2, + }, + { + Payment: p3, + }, + }, + paymentIDsNotModified: []models.PaymentID{}, + requiredPublishedPaymentIDs: []models.PaymentID{p1.ID, p2.ID, p3.ID}, + }, + { + name: "only one payment upserted, should publish only one message", + batch: PaymentBatch{ + { + Payment: p1, + }, + { + Payment: p2, + }, + { + Payment: p3, + }, + }, + paymentIDsNotModified: []models.PaymentID{p1.ID, p2.ID}, + requiredPublishedPaymentIDs: []models.PaymentID{p3.ID}, + }, + { + name: "all payments are not modified, should not publish any message", + batch: PaymentBatch{ + { + Payment: p1, + }, + { + Payment: p2, + }, + { + Payment: p3, + }, + }, + paymentIDsNotModified: []models.PaymentID{p1.ID, p2.ID, p3.ID}, + requiredPublishedPaymentIDs: []models.PaymentID{}, + }, + } + + for _, tc := range testCases { + tc := tc + t.Run(tc.name, func(t *testing.T) { + publisher := NewMockPublisher() + + ingester := NewDefaultIngester( + models.ConnectorProviderDummyPay, + nil, + NewMockStore().WithPaymentIDsNotModified(tc.paymentIDsNotModified), + publisher, + ) + + err := ingester.IngestPayments(context.Background(), connectorID, tc.batch, nil) + publisher.Close() + require.NoError(t, err) + + require.Len(t, publisher.messages, len(tc.requiredPublishedPaymentIDs)) + i := 0 + for msg := range publisher.messages { + var payload paymentMessagePayload + require.NoError(t, json.Unmarshal(msg.Payload, &payload)) + require.Equal(t, tc.requiredPublishedPaymentIDs[i].String(), payload.Paylaod.ID) + i++ + } + }) + } +} diff --git a/components/payments/cmd/connectors/internal/ingestion/transfer_initiation.go b/components/payments/cmd/connectors/internal/ingestion/transfer_initiation.go index b447388b6e..d4cc61c349 100644 --- a/components/payments/cmd/connectors/internal/ingestion/transfer_initiation.go +++ b/components/payments/cmd/connectors/internal/ingestion/transfer_initiation.go @@ -17,7 +17,7 @@ func (i *DefaultIngester) UpdateTransferInitiationPaymentsStatus(ctx context.Con tf.Attempts = attempts tf.UpdatedAt = updatedAt - if err := i.repo.UpdateTransferInitiationPaymentsStatus(ctx, tf.ID, paymentID, tf.Status, tf.Error, tf.Attempts, tf.UpdatedAt); err != nil { + if err := i.store.UpdateTransferInitiationPaymentsStatus(ctx, tf.ID, paymentID, tf.Status, tf.Error, tf.Attempts, tf.UpdatedAt); err != nil { return err } @@ -46,7 +46,7 @@ func (i *DefaultIngester) AddTransferInitiationPaymentID(ctx context.Context, tf Status: models.TransferInitiationStatusProcessing, }) - if err := i.repo.AddTransferInitiationPaymentID(ctx, tf.ID, paymentID, updatedAt); err != nil { + if err := i.store.AddTransferInitiationPaymentID(ctx, tf.ID, paymentID, updatedAt); err != nil { return err } diff --git a/components/payments/cmd/connectors/internal/storage/payments.go b/components/payments/cmd/connectors/internal/storage/payments.go index 50ad08e70d..d2144f058c 100644 --- a/components/payments/cmd/connectors/internal/storage/payments.go +++ b/components/payments/cmd/connectors/internal/storage/payments.go @@ -80,25 +80,77 @@ func (s *Storage) GetPayment(ctx context.Context, id string) (*models.Payment, e return &payment, nil } -func (s *Storage) UpsertPayments(ctx context.Context, payments []*models.Payment) error { +func (s *Storage) UpsertPayments(ctx context.Context, payments []*models.Payment) ([]*models.PaymentID, error) { if len(payments) == 0 { - return nil + return nil, nil } - _, err := s.db.NewInsert(). - Model(&payments). - On("CONFLICT (reference) DO UPDATE"). - Set("amount = EXCLUDED.amount"). - Set("type = EXCLUDED.type"). - Set("status = EXCLUDED.status"). - Set("raw_data = EXCLUDED.raw_data"). - Set("scheme = EXCLUDED.scheme"). - Set("asset = EXCLUDED.asset"). - Set("source_account_id = EXCLUDED.source_account_id"). - Set("destination_account_id = EXCLUDED.destination_account_id"). - Exec(ctx) + var idsUpdated []string + err := s.db.NewUpdate(). + With("_data", + s.db.NewValues(&payments). + Column( + "id", + "amount", + "type", + "scheme", + "asset", + "source_account_id", + "destination_account_id", + "status", + "raw_data", + ), + ). + Model((*models.Payment)(nil)). + TableExpr("_data"). + Set("amount = _data.amount"). + Set("type = _data.type"). + Set("scheme = _data.scheme"). + Set("asset = _data.asset"). + Set("source_account_id = _data.source_account_id"). + Set("destination_account_id = _data.destination_account_id"). + Set("status = _data.status"). + Set("raw_data = _data.raw_data"). + Where(`(payment.id = _data.id) AND + (payment.amount != _data.amount OR payment.type != _data.type OR payment.scheme != _data.scheme OR + payment.asset != _data.asset OR payment.source_account_id != _data.source_account_id OR + payment.destination_account_id != _data.destination_account_id OR payment.status != _data.status)`). + Returning("payment.id"). + Scan(ctx, &idsUpdated) if err != nil { - return e("failed to create payments", err) + return nil, e("failed to update payments", err) + } + + idsUpdatedMap := make(map[string]struct{}) + for _, id := range idsUpdated { + idsUpdatedMap[id] = struct{}{} + } + + paymentsToInsert := make([]*models.Payment, 0, len(payments)) + for _, payment := range payments { + if _, ok := idsUpdatedMap[payment.ID.String()]; !ok { + paymentsToInsert = append(paymentsToInsert, payment) + } + } + + var idsInserted []string + if len(paymentsToInsert) > 0 { + err = s.db.NewInsert(). + Model(&paymentsToInsert). + On("CONFLICT (id) DO NOTHING"). + Returning("payment.id"). + Scan(ctx, &idsInserted) + if err != nil { + return nil, e("failed to create payments", err) + } + } + + res := make([]*models.PaymentID, 0, len(idsUpdated)+len(idsInserted)) + for _, id := range idsUpdated { + res = append(res, models.MustPaymentIDFromString(id)) + } + for _, id := range idsInserted { + res = append(res, models.MustPaymentIDFromString(id)) } var adjustments []*models.Adjustment @@ -133,7 +185,7 @@ func (s *Storage) UpsertPayments(ctx context.Context, payments []*models.Payment On("CONFLICT (reference) DO NOTHING"). Exec(ctx) if err != nil { - return e("failed to create adjustments", err) + return nil, e("failed to create adjustments", err) } } @@ -145,9 +197,9 @@ func (s *Storage) UpsertPayments(ctx context.Context, payments []*models.Payment Set("changelog = metadata.changelog || EXCLUDED.changelog"). Exec(ctx) if err != nil { - return e("failed to create metadata", err) + return nil, e("failed to create metadata", err) } } - return nil + return res, nil } diff --git a/components/payments/cmd/connectors/internal/storage/payments_test.go b/components/payments/cmd/connectors/internal/storage/payments_test.go index cb1baf752d..c1d3aec784 100644 --- a/components/payments/cmd/connectors/internal/storage/payments_test.go +++ b/components/payments/cmd/connectors/internal/storage/payments_test.go @@ -87,14 +87,27 @@ func testCreatePayments(t *testing.T, store *storage.Storage) { }, } - err := store.UpsertPayments(context.Background(), []*models.Payment{pFail}) + ids, err := store.UpsertPayments(context.Background(), []*models.Payment{pFail}) require.Error(t, err) + require.Len(t, ids, 0) - err = store.UpsertPayments(context.Background(), []*models.Payment{p1}) + ids, err = store.UpsertPayments(context.Background(), []*models.Payment{p1}) require.NoError(t, err) + require.Len(t, ids, 1) - err = store.UpsertPayments(context.Background(), []*models.Payment{p2}) + ids, err = store.UpsertPayments(context.Background(), []*models.Payment{p2}) require.NoError(t, err) + require.Len(t, ids, 1) + + p1.Status = models.PaymentStatusPending + p2.Status = models.PaymentStatusSucceeded + ids, err = store.UpsertPayments(context.Background(), []*models.Payment{p1, p2}) + require.NoError(t, err) + require.Len(t, ids, 2) + + ids, err = store.UpsertPayments(context.Background(), []*models.Payment{p1, p2}) + require.NoError(t, err) + require.Len(t, ids, 0) testGetPayment(t, store, *p1ID, p1, nil) testGetPayment(t, store, *p2ID, p2, nil) @@ -142,8 +155,9 @@ func testUpdatePayment(t *testing.T, store *storage.Storage) { p1.Scheme = models.PaymentSchemeCardVisa p1.Asset = models.Asset("USD/2") - err := store.UpsertPayments(context.Background(), []*models.Payment{p1}) + ids, err := store.UpsertPayments(context.Background(), []*models.Payment{p1}) require.NoError(t, err) + require.Len(t, ids, 1) payment, err := store.GetPayment(context.Background(), p1ID.String()) require.NoError(t, err) diff --git a/components/payments/internal/models/payment.go b/components/payments/internal/models/payment.go index 1b612894f4..613b6443cd 100644 --- a/components/payments/internal/models/payment.go +++ b/components/payments/internal/models/payment.go @@ -91,20 +91,20 @@ func (pid *PaymentID) Scan(value interface{}) error { type Payment struct { bun.BaseModel `bun:"payments.payment"` - ID PaymentID `bun:",pk,nullzero"` + ID PaymentID `bun:",pk,type:character varying,nullzero"` ConnectorID ConnectorID `bun:",nullzero"` CreatedAt time.Time `bun:",nullzero"` Reference string - Amount *big.Int `bun:"type:numeric"` - Type PaymentType - Status PaymentStatus + Amount *big.Int `bun:"type:numeric"` + Type PaymentType `bun:",type:payment_type"` + Status PaymentStatus `bun:",type:payment_status"` Scheme PaymentScheme Asset Asset RawData json.RawMessage - SourceAccountID *AccountID `bun:",nullzero"` - DestinationAccountID *AccountID `bun:",nullzero"` + SourceAccountID *AccountID `bun:",type:character varying,nullzero"` + DestinationAccountID *AccountID `bun:",type:character varying,nullzero"` Adjustments []*Adjustment `bun:"rel:has-many,join:id=payment_id"` Metadata []*Metadata `bun:"rel:has-many,join:id=payment_id"`