Skip to content

Commit

Permalink
Merge pull request #89 from blnkfinance/skip-queue
Browse files Browse the repository at this point in the history
feat: add skip_queue option for transaction processing
  • Loading branch information
jerry-enebeli authored Feb 8, 2025
2 parents d8a0fa9 + 82ae6dc commit ebbabe9
Show file tree
Hide file tree
Showing 10 changed files with 225 additions and 56 deletions.
2 changes: 1 addition & 1 deletion api/model/model.go
Original file line number Diff line number Diff line change
Expand Up @@ -182,5 +182,5 @@ func (t *RecordTransaction) ToTransaction() *model.Transaction {

}

return &model.Transaction{Currency: t.Currency, Source: t.Source, Description: t.Description, Reference: t.Reference, ScheduledFor: scheduledFor, Destination: t.Destination, Amount: t.Amount, AllowOverdraft: t.AllowOverDraft, MetaData: t.MetaData, Sources: t.Sources, Destinations: t.Destinations, Inflight: t.Inflight, Precision: t.Precision, InflightExpiryDate: inflightExpiryDate, Rate: t.Rate}
return &model.Transaction{Currency: t.Currency, Source: t.Source, Description: t.Description, Reference: t.Reference, ScheduledFor: scheduledFor, Destination: t.Destination, Amount: t.Amount, AllowOverdraft: t.AllowOverDraft, MetaData: t.MetaData, Sources: t.Sources, Destinations: t.Destinations, Inflight: t.Inflight, Precision: t.Precision, InflightExpiryDate: inflightExpiryDate, Rate: t.Rate, SkipQueue: t.SkipQueue}
}
4 changes: 4 additions & 0 deletions api/model/model_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -208,6 +208,7 @@ func TestValidateRecordTransaction(t *testing.T) {
Description: "Test transaction",
Source: "source1",
Destination: "dest1",
SkipQueue: true,
},
wantErr: false,
},
Expand All @@ -228,6 +229,7 @@ func TestValidateRecordTransaction(t *testing.T) {
Source: "source1",
Destination: "dest1",
ScheduledFor: "invalid-date",
SkipQueue: true,
},
wantErr: true,
},
Expand Down Expand Up @@ -318,6 +320,7 @@ func TestToTransaction(t *testing.T) {
Precision: 2,
InflightExpiryDate: inflightExpiryDate.Format(time.RFC3339),
Rate: 1.5,
SkipQueue: true,
}

transaction := recordTransaction.ToTransaction()
Expand All @@ -335,4 +338,5 @@ func TestToTransaction(t *testing.T) {
assert.Equal(t, recordTransaction.Inflight, transaction.Inflight)
assert.Equal(t, recordTransaction.Precision, transaction.Precision)
assert.Equal(t, recordTransaction.Rate, transaction.Rate)
assert.Equal(t, recordTransaction.SkipQueue, transaction.SkipQueue)
}
1 change: 1 addition & 0 deletions api/model/transaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ type RecordTransaction struct {
Precision float64 `json:"precision"`
AllowOverDraft bool `json:"allow_overdraft"`
Inflight bool `json:"inflight"`
SkipQueue bool `json:"skip_queue"`
Source string `json:"source"`
Reference string `json:"reference"`
Destination string `json:"destination"`
Expand Down
1 change: 1 addition & 0 deletions blnk-infrastructure
Submodule blnk-infrastructure added at 84e41f
31 changes: 15 additions & 16 deletions model/model.go
Original file line number Diff line number Diff line change
Expand Up @@ -189,13 +189,14 @@ func ApplyPrecision(transaction *Transaction) int64 {
return int64(transaction.Amount * transaction.Precision)
}

// ApplyRate applies the exchange rate to the transaction amount.
// If no rate is provided, it defaults to 1.
func ApplyRate(transaction *Transaction) float64 {
if transaction.Rate == 0 {
transaction.Rate = 1
// ApplyRate applies the exchange rate to the precise amount and returns an int64.
// The rate is applied after precision to maintain accuracy.
func ApplyRate(preciseAmount int64, rate float64) int64 {
if rate == 0 {
rate = 1
}
return transaction.Amount * transaction.Rate
// Convert to float64 for rate calculation, then back to int64
return int64(float64(preciseAmount) * rate)
}

// validate checks if the transaction is valid (e.g., ensuring positive amount).
Expand All @@ -209,14 +210,14 @@ func (transaction *Transaction) validate() error {
// UpdateBalances updates the balances for both the source and destination based on the transaction details.
// It ensures precision is applied and checks for overdraft before updating.
func UpdateBalances(transaction *Transaction, source, destination *Balance) error {
// Apply precision to get precise amount
transaction.PreciseAmount = ApplyPrecision(transaction)
originalAmount := transaction.Amount
err := transaction.validate()
if err != nil {
return err
}

// Ensure the source balance has sufficient funds.
// Check if source has sufficient funds
err = canProcessTransaction(transaction, source)
if err != nil {
return err
Expand All @@ -225,19 +226,17 @@ func UpdateBalances(transaction *Transaction, source, destination *Balance) erro
source.InitializeBalanceFields()
destination.InitializeBalanceFields()

// Compute the source balance after debiting.
// Update source balance with original precise amount
source.addDebit(transaction.PreciseAmount, transaction.Inflight)
source.computeBalance(transaction.Inflight)

// Apply exchange rate to the destination if needed.
transaction.Amount = ApplyRate(transaction)
transaction.PreciseAmount = ApplyPrecision(transaction)
destination.addCredit(transaction.PreciseAmount, transaction.Inflight)
// Calculate destination amount with rate
destinationAmount := ApplyRate(transaction.PreciseAmount, transaction.Rate)

// Update destination balance with rate-adjusted amount
destination.addCredit(destinationAmount, transaction.Inflight)
destination.computeBalance(transaction.Inflight)

// Revert the transaction amount to its original state.
transaction.Amount = originalAmount
transaction.PreciseAmount = ApplyPrecision(transaction)
return nil
}

Expand Down
61 changes: 55 additions & 6 deletions model/model_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -166,13 +166,38 @@ func TestApplyPrecision(t *testing.T) {
}

func TestApplyRate(t *testing.T) {
txn := &Transaction{
Amount: 100.0,
Rate: 1.2,
tests := []struct {
name string
preciseAmount int64
rate float64
expected int64
}{
{
name: "regular rate",
preciseAmount: 1000,
rate: 1.5,
expected: 1500,
},
{
name: "zero rate defaults to 1",
preciseAmount: 1000,
rate: 0,
expected: 1000,
},
{
name: "rate less than 1",
preciseAmount: 1000,
rate: 0.5,
expected: 500,
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
result := ApplyRate(tt.preciseAmount, tt.rate)
assert.Equal(t, tt.expected, result)
})
}
adjustedAmount := ApplyRate(txn)
expected := 120.0
assert.Equal(t, expected, adjustedAmount)
}

func TestTransaction_Validate(t *testing.T) {
Expand Down Expand Up @@ -208,6 +233,30 @@ func TestUpdateBalances(t *testing.T) {
assert.Equal(t, big.NewInt(200), destinationBalance.Balance)
}

func TestUpdateBalances_WithRate(t *testing.T) {
sourceBalance := &Balance{
Balance: big.NewInt(0),
}
destinationBalance := &Balance{
Balance: big.NewInt(0),
}

txn := &Transaction{
Amount: 100.0,
AllowOverdraft: true,
Precision: 100, // Will make precise amount 10000
Rate: 1.5, // Should make destination receive 15000
}

err := UpdateBalances(txn, sourceBalance, destinationBalance)
assert.NoError(t, err)

// Source balance should decrease by precise amount
assert.Equal(t, big.NewInt(-10000), sourceBalance.Balance)
// Destination balance should increase by rate-adjusted amount
assert.Equal(t, big.NewInt(15000), destinationBalance.Balance)
}

func TestBalanceMonitor_CheckCondition(t *testing.T) {
balance := &Balance{
Balance: big.NewInt(1000),
Expand Down
1 change: 1 addition & 0 deletions model/transaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ type Transaction struct {
AllowOverdraft bool `json:"allow_overdraft"`
Inflight bool `json:"inflight"`
SkipBalanceUpdate bool `json:"-"`
SkipQueue bool `json:"skip_queue"`
GroupIds []string `json:"-"`
Sources []Distribution `json:"sources,omitempty"`
Destinations []Distribution `json:"destinations,omitempty"`
Expand Down
3 changes: 3 additions & 0 deletions reconciliation_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,9 @@ func TestOneToOneReconciliation(t *testing.T) {
BatchSize: 100000,
MaxWorkers: 1,
},
Queue: config.QueueConfig{
TransactionQueue: "transaction_queue",
},
}
config.ConfigStore.Store(cnf)
mockDS := new(mocks.MockDataSource)
Expand Down
11 changes: 10 additions & 1 deletion transaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -792,8 +792,8 @@ func (l *Blnk) executeWithLock(ctx context.Context, transaction *model.Transacti
span.RecordError(err)
return nil, fmt.Errorf("failed to acquire lock: %w", err)
}
defer l.releaseLock(ctx, locker)

defer l.releaseLock(ctx, locker)
// Execute the provided function with the lock
return fn(ctx)
}
Expand Down Expand Up @@ -1294,6 +1294,12 @@ func (l *Blnk) QueueTransaction(ctx context.Context, transaction *model.Transact
setTransactionStatus(transaction)
originalTxnID := transaction.TransactionID

// Check if we should skip queueing
if transaction.SkipQueue {
span.AddEvent("Skipping queue, directly recording transaction")
return l.RecordTransaction(ctx, transaction)
}

// Handle split transactions if needed
transactions, err := l.handleSplitTransactions(ctx, transaction)
if err != nil {
Expand Down Expand Up @@ -1453,6 +1459,9 @@ func setTransactionMetadata(transaction *model.Transaction) {
if transaction.TransactionID == "" {
transaction.TransactionID = model.GenerateUUIDWithSuffix("txn")
}
if transaction.Rate == 0 {
transaction.Rate = 1
}
}

// createQueueCopy creates a new copy of a transaction specifically for queueing.
Expand Down
Loading

0 comments on commit ebbabe9

Please sign in to comment.