Skip to content

Commit

Permalink
feat(payments): do not publish payments if no update (#958)
Browse files Browse the repository at this point in the history
  • Loading branch information
paul-nicolas authored Dec 6, 2023
1 parent f516b97 commit 9b02048
Show file tree
Hide file tree
Showing 11 changed files with 372 additions and 40 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ func (i *DefaultIngester) IngestAccounts(ctx context.Context, batch AccountBatch
"startingAt": startingAt,
}).Debugf("Ingest accounts batch")

if err := i.repo.UpsertAccounts(ctx, batch); err != nil {
if err := i.store.UpsertAccounts(ctx, batch); err != nil {
return fmt.Errorf("error upserting accounts: %w", err)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ func (i *DefaultIngester) IngestBalances(ctx context.Context, batch BalanceBatch
"startingAt": startingAt,
}).Debugf("Ingest balances batch")

if err := i.repo.InsertBalances(ctx, batch, checkIfAccountExists); err != nil {
if err := i.store.InsertBalances(ctx, batch, checkIfAccountExists); err != nil {
return fmt.Errorf("error inserting balances: %w", err)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import (
)

func (i *DefaultIngester) LinkBankAccountWithAccount(ctx context.Context, bankAccount *models.BankAccount, accountID *models.AccountID) error {
if err := i.repo.LinkBankAccountWithAccount(ctx, bankAccount.ID, accountID); err != nil {
if err := i.store.LinkBankAccountWithAccount(ctx, bankAccount.ID, accountID); err != nil {
return err
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,14 @@ type Ingester interface {

type DefaultIngester struct {
provider models.ConnectorProvider
repo Repository
store Store
descriptor models.TaskDescriptor
publisher message.Publisher
}

type Repository interface {
type Store interface {
UpsertAccounts(ctx context.Context, accounts []*models.Account) error
UpsertPayments(ctx context.Context, payments []*models.Payment) error
UpsertPayments(ctx context.Context, payments []*models.Payment) ([]*models.PaymentID, error)
InsertBalances(ctx context.Context, balances []*models.Balance, checkIfAccountExists bool) error
UpdateTaskState(ctx context.Context, connectorID models.ConnectorID, descriptor models.TaskDescriptor, state json.RawMessage) error
UpdateTransferInitiationPaymentsStatus(ctx context.Context, id models.TransferInitiationID, paymentID *models.PaymentID, status models.TransferInitiationStatus, errorMessage string, attempts int, updatedAt time.Time) error
Expand All @@ -39,13 +39,13 @@ type Repository interface {
func NewDefaultIngester(
provider models.ConnectorProvider,
descriptor models.TaskDescriptor,
repo Repository,
repo Store,
publisher message.Publisher,
) *DefaultIngester {
return &DefaultIngester{
provider: provider,
descriptor: descriptor,
repo: repo,
store: repo,
publisher: publisher,
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
package ingestion

import (
"context"
"encoding/json"
"time"

"github.com/ThreeDotsLabs/watermill/message"
"github.com/formancehq/payments/internal/models"
"github.com/google/uuid"
)

type MockStore struct {
paymentIDsNotModified map[string]struct{}
}

func NewMockStore() *MockStore {
return &MockStore{
paymentIDsNotModified: make(map[string]struct{}),
}
}

func (m *MockStore) WithPaymentIDsNotModified(paymentsIDs []models.PaymentID) *MockStore {
for _, id := range paymentsIDs {
m.paymentIDsNotModified[id.String()] = struct{}{}
}
return m
}

func (m *MockStore) UpsertAccounts(ctx context.Context, accounts []*models.Account) error {
return nil
}

func (m *MockStore) UpsertPayments(ctx context.Context, payments []*models.Payment) ([]*models.PaymentID, error) {
ids := make([]*models.PaymentID, 0, len(payments))
for _, payment := range payments {
if _, ok := m.paymentIDsNotModified[payment.ID.String()]; !ok {
ids = append(ids, &payment.ID)
}
}

return ids, nil
}

func (m *MockStore) InsertBalances(ctx context.Context, balances []*models.Balance, checkIfAccountExists bool) error {
return nil
}

func (m *MockStore) UpdateTaskState(ctx context.Context, connectorID models.ConnectorID, descriptor models.TaskDescriptor, state json.RawMessage) error {
return nil
}

func (m *MockStore) UpdateTransferInitiationPaymentsStatus(ctx context.Context, id models.TransferInitiationID, paymentID *models.PaymentID, status models.TransferInitiationStatus, errorMessage string, attempts int, updatedAt time.Time) error {
return nil
}

func (m *MockStore) AddTransferInitiationPaymentID(ctx context.Context, id models.TransferInitiationID, paymentID *models.PaymentID, updatedAt time.Time) error {
return nil
}

func (m *MockStore) LinkBankAccountWithAccount(ctx context.Context, id uuid.UUID, accountID *models.AccountID) error {
return nil
}

type MockPublisher struct {
messages chan *message.Message
}

func NewMockPublisher() *MockPublisher {
return &MockPublisher{
messages: make(chan *message.Message, 100),
}
}

func (m *MockPublisher) Publish(topic string, messages ...*message.Message) error {
for _, msg := range messages {
m.messages <- msg
}

return nil
}

func (m *MockPublisher) Close() error {
close(m.messages)
return nil
}
15 changes: 13 additions & 2 deletions components/payments/cmd/connectors/internal/ingestion/payments.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,20 +53,31 @@ func (i *DefaultIngester) IngestPayments(
allPayments = append(allPayments, payment)
}

if err := i.repo.UpsertPayments(ctx, allPayments); err != nil {
idsInserted, err := i.store.UpsertPayments(ctx, allPayments)
if err != nil {
return fmt.Errorf("error upserting payments: %w", err)
}

idsInsertedMap := make(map[string]struct{}, len(idsInserted))
for idx := range idsInserted {
idsInsertedMap[idsInserted[idx].String()] = struct{}{}
}

taskState, err := json.Marshal(commitState)
if err != nil {
return fmt.Errorf("error marshaling task state: %w", err)
}

if err = i.repo.UpdateTaskState(ctx, connectorID, i.descriptor, taskState); err != nil {
if err = i.store.UpdateTaskState(ctx, connectorID, i.descriptor, taskState); err != nil {
return fmt.Errorf("error updating task state: %w", err)
}

for paymentIdx := range allPayments {
_, ok := idsInsertedMap[allPayments[paymentIdx].ID.String()]
if !ok {
// No need to publish an event for an already existing payment
continue
}
err = i.publisher.Publish(events.TopicPayments,
publish.NewMessage(ctx, messages.NewEventSavedPayments(i.provider, allPayments[paymentIdx])))
if err != nil {
Expand Down
169 changes: 169 additions & 0 deletions components/payments/cmd/connectors/internal/ingestion/payments_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,169 @@
package ingestion

import (
"context"
"encoding/json"
"math/big"
"testing"
"time"

"github.com/formancehq/payments/internal/models"
"github.com/google/uuid"
"github.com/stretchr/testify/require"
)

var (
connectorID = models.ConnectorID{
Reference: uuid.New(),
Provider: models.ConnectorProviderDummyPay,
}

p1 = &models.Payment{
ID: models.PaymentID{
PaymentReference: models.PaymentReference{
Reference: "p1",
Type: models.PaymentTypePayIn,
},
ConnectorID: connectorID,
},
ConnectorID: connectorID,
CreatedAt: time.Date(2023, 11, 14, 4, 55, 0, 0, time.UTC),
Reference: "p1",
Amount: big.NewInt(100),
Type: models.PaymentTypePayIn,
Status: models.PaymentStatusCancelled,
Scheme: models.PaymentSchemeA2A,
Asset: models.Asset("USD/2"),
}

p2 = &models.Payment{
ID: models.PaymentID{
PaymentReference: models.PaymentReference{
Reference: "p2",
Type: models.PaymentTypeTransfer,
},
ConnectorID: connectorID,
},
ConnectorID: connectorID,
CreatedAt: time.Date(2023, 11, 14, 4, 54, 0, 0, time.UTC),
Reference: "p2",
Amount: big.NewInt(150),
Type: models.PaymentTypeTransfer,
Status: models.PaymentStatusSucceeded,
Scheme: models.PaymentSchemeApplePay,
Asset: models.Asset("EUR/2"),
}

p3 = &models.Payment{
ID: models.PaymentID{
PaymentReference: models.PaymentReference{
Reference: "p3",
Type: models.PaymentTypePayOut,
},
ConnectorID: connectorID,
},
ConnectorID: connectorID,
CreatedAt: time.Date(2023, 11, 14, 4, 53, 0, 0, time.UTC),
Reference: "p3",
Amount: big.NewInt(200),
Type: models.PaymentTypePayOut,
Status: models.PaymentStatusPending,
Scheme: models.PaymentSchemeCardMasterCard,
Asset: models.Asset("USD/2"),
}
)

type paymentMessagePayload struct {
Paylaod struct {
ID string `json:"id"`
} `json:"payload"`
}

func TestIngestPayments(t *testing.T) {
t.Parallel()

type testCase struct {
name string
batch PaymentBatch
paymentIDsNotModified []models.PaymentID
requiredPublishedPaymentIDs []models.PaymentID
}

testCases := []testCase{
{
name: "nominal",
batch: PaymentBatch{
{
Payment: p1,
},
{
Payment: p2,
},
{
Payment: p3,
},
},
paymentIDsNotModified: []models.PaymentID{},
requiredPublishedPaymentIDs: []models.PaymentID{p1.ID, p2.ID, p3.ID},
},
{
name: "only one payment upserted, should publish only one message",
batch: PaymentBatch{
{
Payment: p1,
},
{
Payment: p2,
},
{
Payment: p3,
},
},
paymentIDsNotModified: []models.PaymentID{p1.ID, p2.ID},
requiredPublishedPaymentIDs: []models.PaymentID{p3.ID},
},
{
name: "all payments are not modified, should not publish any message",
batch: PaymentBatch{
{
Payment: p1,
},
{
Payment: p2,
},
{
Payment: p3,
},
},
paymentIDsNotModified: []models.PaymentID{p1.ID, p2.ID, p3.ID},
requiredPublishedPaymentIDs: []models.PaymentID{},
},
}

for _, tc := range testCases {
tc := tc
t.Run(tc.name, func(t *testing.T) {
publisher := NewMockPublisher()

ingester := NewDefaultIngester(
models.ConnectorProviderDummyPay,
nil,
NewMockStore().WithPaymentIDsNotModified(tc.paymentIDsNotModified),
publisher,
)

err := ingester.IngestPayments(context.Background(), connectorID, tc.batch, nil)
publisher.Close()
require.NoError(t, err)

require.Len(t, publisher.messages, len(tc.requiredPublishedPaymentIDs))
i := 0
for msg := range publisher.messages {
var payload paymentMessagePayload
require.NoError(t, json.Unmarshal(msg.Payload, &payload))
require.Equal(t, tc.requiredPublishedPaymentIDs[i].String(), payload.Paylaod.ID)
i++
}
})
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ func (i *DefaultIngester) UpdateTransferInitiationPaymentsStatus(ctx context.Con
tf.Attempts = attempts
tf.UpdatedAt = updatedAt

if err := i.repo.UpdateTransferInitiationPaymentsStatus(ctx, tf.ID, paymentID, tf.Status, tf.Error, tf.Attempts, tf.UpdatedAt); err != nil {
if err := i.store.UpdateTransferInitiationPaymentsStatus(ctx, tf.ID, paymentID, tf.Status, tf.Error, tf.Attempts, tf.UpdatedAt); err != nil {
return err
}

Expand Down Expand Up @@ -46,7 +46,7 @@ func (i *DefaultIngester) AddTransferInitiationPaymentID(ctx context.Context, tf
Status: models.TransferInitiationStatusProcessing,
})

if err := i.repo.AddTransferInitiationPaymentID(ctx, tf.ID, paymentID, updatedAt); err != nil {
if err := i.store.AddTransferInitiationPaymentID(ctx, tf.ID, paymentID, updatedAt); err != nil {
return err
}

Expand Down
Loading

1 comment on commit 9b02048

@vercel
Copy link

@vercel vercel bot commented on 9b02048 Dec 6, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please sign in to comment.