Skip to content

Commit

Permalink
add timestamp to heartbeat event and include parent transaction in tr…
Browse files Browse the repository at this point in the history
…ansaction queries
  • Loading branch information
jerry-enebeli committed Jan 22, 2025
1 parent a48fd76 commit 36b1dc7
Show file tree
Hide file tree
Showing 4 changed files with 129 additions and 12 deletions.
1 change: 1 addition & 0 deletions cmd/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,7 @@ func sendHeartbeat(client posthog.Client, heartbeatID string) {
if err := client.Enqueue(posthog.Capture{
DistinctId: heartbeatID,
Event: "server_heartbeat",
Timestamp: time.Now().UTC(),
Properties: map[string]interface{}{
"timestamp": time.Now().UTC(),
},
Expand Down
4 changes: 2 additions & 2 deletions database/transaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -217,15 +217,15 @@ func (d Datasource) GetTransactionByRef(ctx context.Context, reference string) (

// Query the transaction by reference
row := d.Conn.QueryRowContext(ctx, `
SELECT transaction_id, source, reference, amount, precise_amount, currency, destination, description, status, created_at, meta_data
SELECT transaction_id, source, reference, amount, precise_amount, currency, destination, description, status, created_at, meta_data, parent_transaction
FROM blnk.transactions
WHERE reference = $1
`, reference)

// Initialize the transaction object and scan the query result into it
txn := model.Transaction{}
var metaDataJSON []byte
err := row.Scan(&txn.TransactionID, &txn.Source, &txn.Reference, &txn.Amount, &txn.PreciseAmount, &txn.Currency, &txn.Destination, &txn.Description, &txn.Status, &txn.CreatedAt, &metaDataJSON)
err := row.Scan(&txn.TransactionID, &txn.Source, &txn.Reference, &txn.Amount, &txn.PreciseAmount, &txn.Currency, &txn.Destination, &txn.Description, &txn.Status, &txn.CreatedAt, &metaDataJSON, &txn.ParentTransaction)
if err != nil {
if err == sql.ErrNoRows {
span.RecordError(err)
Expand Down
12 changes: 3 additions & 9 deletions reconciliation.go
Original file line number Diff line number Diff line change
Expand Up @@ -926,14 +926,12 @@ func (s *Blnk) processTransactions(ctx context.Context, uploadID string, process
func(ctx context.Context, txns <-chan *model.Transaction, results chan<- BatchJobResult, wg *sync.WaitGroup, _ float64) {
defer wg.Done()
for txn := range txns {
// Process each transaction.
if err := processor.process(ctx, txn); err != nil {
log.Printf("Error processing transaction %s: %v", txn.TransactionID, err)
results <- BatchJobResult{Error: err}
return
}
processedCount++
// Log progress every 10 transactions.
if processedCount%10 == 0 {
log.Printf("Processed %d transactions", processedCount)
}
Expand Down Expand Up @@ -962,11 +960,9 @@ func (s *Blnk) finalizeReconciliation(ctx context.Context, reconciliation model.

log.Printf("Finalizing reconciliation. Matches: %d, Unmatched: %d", matchCount, unmatchedCount)

// If not a dry run, perform post-reconciliation actions (e.g., indexing).
if !reconciliation.IsDryRun {
s.postReconciliationActions(ctx, reconciliation)
} else {
// Log the results of the dry run.
log.Printf("Dry run completed. Matches: %d, Unmatched: %d", matchCount, unmatchedCount)
}

Expand Down Expand Up @@ -1056,7 +1052,6 @@ func (s *Blnk) oneToManyReconciliation(ctx context.Context, externalTxns []*mode
log.Printf("Error in one-to-many reconciliation: %v", err)
}

// Close channels after processing.
go func() {
wg.Wait()
close(matchChan)
Expand Down Expand Up @@ -1310,7 +1305,6 @@ func (s *Blnk) findMatchingInternalTransaction(ctx context.Context, externalTxn

matchFound := false

// Process transactions in batches, applying the matching rules.
_, err = s.ProcessTransactionInBatches(
ctx,
externalTxn.TransactionID,
Expand Down Expand Up @@ -1441,10 +1435,10 @@ func (s *Blnk) matchesGroup(externalTxn *model.Transaction, group []*model.Trans
func (s *Blnk) dominantCurrency(currencies map[string]bool) string {
if len(currencies) == 1 {
for currency := range currencies {
return currency // Return the only currency if there's exactly one.
return currency
}
}
return "MIXED" // Return "MIXED" if there are multiple currencies.
return "MIXED"
}

// CreateMatchingRule creates a new matching rule after validating it.
Expand Down Expand Up @@ -1713,7 +1707,7 @@ func (s *Blnk) partialMatch(str1, str2 string, allowableDrift float64) bool {
// Returns true if the currencies match, otherwise false.
func (s *Blnk) matchesCurrency(externalValue, internalValue string, criteria model.MatchingCriteria) bool {
if internalValue == "MIXED" {
// Handle the special case where the internal value is "MIXED" (multiple currencies in a group).
// TODO: Handle the special case where the internal value is "MIXED" (multiple currencies in a group).
return true
}
return s.matchesString(externalValue, internalValue, criteria)
Expand Down
124 changes: 123 additions & 1 deletion transaction_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,19 +19,22 @@ package blnk
import (
"context"
"encoding/json"
"fmt"
"math/big"
"regexp"
"testing"
"time"

"github.com/jerry-enebeli/blnk/config"
"github.com/jerry-enebeli/blnk/database"
"github.com/jerry-enebeli/blnk/model"

"github.com/brianvoe/gofakeit/v6"

"github.com/DATA-DOG/go-sqlmock"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

func TestRecordTransaction(t *testing.T) {
Expand Down Expand Up @@ -208,7 +211,6 @@ func TestRecordTransactionWithRate(t *testing.T) {
destinationBalanceRows := sqlmock.NewRows([]string{"balance_id", "indicator", "currency", "currency_multiplier", "ledger_id", "balance", "credit_balance", "debit_balance", "inflight_balance", "inflight_credit_balance", "inflight_debit_balance", "created_at", "version"}).
AddRow(destination, "", "NGN", 1, "ledger-id-destination", 0, 0, 0, 0, 0, 0, time.Now(), 0)

// Updated regex to be more flexible
balanceQuery := `SELECT balance_id, indicator, currency, currency_multiplier, ledger_id, balance, credit_balance, debit_balance, inflight_balance, inflight_credit_balance, inflight_debit_balance, created_at, version FROM blnk.balances WHERE balance_id = \$1`
balanceQueryPattern := regexp.MustCompile(`\s+`).ReplaceAllString(balanceQuery, `\s*`)

Expand Down Expand Up @@ -333,3 +335,123 @@ func TestVoidInflightTransaction_Negative(t *testing.T) {
t.Errorf("there were unfulfilled expectations: %s", err)
}
}

func TestQueueTransactionFlow(t *testing.T) {
// Skip in short mode as this is a long-running test
if testing.Short() {
t.Skip("Skipping queue flow test in short mode")
}

// Initialize test context
ctx := context.Background()

// Setup real test configuration
cnf := &config.Configuration{
Redis: config.RedisConfig{
Dns: "localhost:6379",
},
DataSource: config.DataSourceConfig{
Dns: "postgres://postgres:password@localhost:5432/blnk?sslmode=disable",
},
Queue: config.QueueConfig{
WebhookQueue: "webhook_queue_test",
IndexQueue: "index_queue_test",
TransactionQueue: "transaction_queue_test",
NumberOfQueues: 1,
},
Server: config.ServerConfig{
SecretKey: "test-secret",
},
Transaction: config.TransactionConfig{
BatchSize: 100,
MaxQueueSize: 1000,
LockDuration: time.Second * 30,
IndexQueuePrefix: "test_index",
},
}
config.ConfigStore.Store(cnf)

// Create real datasource for integration test
ds, err := database.NewDataSource(cnf)
require.NoError(t, err, "Failed to create datasource")

// Create real Blnk instance
blnk, err := NewBlnk(ds)
require.NoError(t, err, "Failed to create Blnk instance")

txnRef := "txn_" + model.GenerateUUIDWithSuffix("test")

// Create source balance
sourceBalance := &model.Balance{
Currency: "USD",
LedgerID: "general_ledger_id",
}

// Create destination balance
destBalance := &model.Balance{
Currency: "USD",
LedgerID: "general_ledger_id",
}

// Create balances in database
source, err := ds.CreateBalance(*sourceBalance)
require.NoError(t, err, "Failed to create source balance")

dest, err := ds.CreateBalance(*destBalance)
require.NoError(t, err, "Failed to create destination balance")

sourceID := source.BalanceID
destID := dest.BalanceID

// Create transaction
txn := &model.Transaction{
Reference: txnRef,
Source: sourceID,
Destination: destID,
Amount: 500, // $5.00
Currency: "USD",
AllowOverdraft: true,
Precision: 100,
MetaData: map[string]interface{}{"test": true},
}

// Queue the transaction
queuedTxn, err := blnk.QueueTransaction(ctx, txn)
require.NoError(t, err, "Failed to queue transaction")

// Verify initial transaction state
require.Equal(t, StatusQueued, queuedTxn.Status, "Initial transaction should be QUEUED")

// Store the original transaction ID
originalTxnID := queuedTxn.TransactionID

queueCopy := createQueueCopy(queuedTxn, queuedTxn.Reference)
blnk.RecordTransaction(ctx, queueCopy)

Check failure on line 429 in transaction_test.go

View workflow job for this annotation

GitHub Actions / lint

Error return value of `blnk.RecordTransaction` is not checked (errcheck)

ref := fmt.Sprintf("%s_%s", txnRef, "q")
// Verify the processed transaction
appliedTxn, err := ds.GetTransactionByRef(ctx, ref)
require.NoError(t, err, "Failed to get processed transactions")
require.NotEmpty(t, appliedTxn, "Should have processed transactions")

// Verify the applied transaction
require.NotNil(t, appliedTxn, "Should have an APPLIED transaction")
require.Equal(t, originalTxnID, appliedTxn.ParentTransaction, "Applied transaction should reference original transaction")
require.Equal(t, txn.Amount, appliedTxn.Amount, "Amount should match")

// Verify final balance states
updatedSource, err := ds.GetBalanceByIDLite(sourceID)
require.NoError(t, err, "Failed to get updated source balance")

updatedDest, err := ds.GetBalanceByIDLite(destID)
require.NoError(t, err, "Failed to get updated destination balance")

fmt.Println("Updated Source Balance: ", updatedSource.Balance)
fmt.Println("Updated Destination Balance: ", updatedDest.Balance)

require.Equal(t, 0, updatedSource.Balance.Cmp(updatedSource.Balance),
"Source balance should be reduced by transaction amount")
require.Equal(t, 0, updatedDest.Balance.Cmp(updatedDest.Balance),
"Destination balance should be increased by transaction amount")

}

0 comments on commit 36b1dc7

Please sign in to comment.