From 6ba73762b2ae321bcf8c7bbff4e297fb347373d0 Mon Sep 17 00:00:00 2001 From: jerry-enebeli Date: Fri, 17 Jan 2025 00:19:20 +0100 Subject: [PATCH 1/4] extend config to include transaction and queue configs --- cmd/workers.go | 32 ++++--- config/config.go | 193 ++++++++++++++++++++++++++++++++++------- model/events.go | 32 ------- queue.go | 72 ++++++++++----- queue_test.go | 36 +++++--- reconciliation.go | 40 +++++++-- reconciliation_test.go | 41 +++++++++ transaction.go | 29 +++++-- webhooks.go | 4 +- webhooks_test.go | 21 ++--- 10 files changed, 368 insertions(+), 132 deletions(-) delete mode 100644 model/events.go diff --git a/cmd/workers.go b/cmd/workers.go index 08220c8..67de327 100644 --- a/cmd/workers.go +++ b/cmd/workers.go @@ -137,13 +137,19 @@ func (b *blnkInstance) processInflightExpiry(cxt context.Context, t *asynq.Task) } func initializeQueues() map[string]int { + cfg, err := config.Fetch() + if err != nil { + log.Printf("Error fetching config, using defaults: %v", err) + return nil + } + queues := make(map[string]int) - queues[blnk.WEBHOOK_QUEUE] = 3 - queues[blnk.INDEX_QUEUE] = 1 - queues[blnk.EXPIREDINFLIGHT_QUEUE] = 3 + queues[cfg.Queue.WebhookQueue] = 3 + queues[cfg.Queue.IndexQueue] = 1 + queues[cfg.Queue.InflightExpiryQueue] = 3 - for i := 1; i <= blnk.NumberOfQueues; i++ { - queueName := fmt.Sprintf("%s_%d", blnk.TRANSACTION_QUEUE, i) + for i := 1; i <= cfg.Queue.NumberOfQueues; i++ { + queueName := fmt.Sprintf("%s_%d", cfg.Queue.TransactionQueue, i) queues[queueName] = 1 } return queues @@ -169,16 +175,22 @@ func initializeWorkerServer(conf *config.Configuration, queues map[string]int) ( } func initializeTaskHandlers(b *blnkInstance, mux *asynq.ServeMux) { + cfg, err := config.Fetch() + if err != nil { + log.Printf("Error fetching config, using defaults: %v", err) + return + } + // Register handlers for transaction queues - for i := 1; i <= blnk.NumberOfQueues; i++ { - queueName := fmt.Sprintf("%s_%d", blnk.TRANSACTION_QUEUE, i) + for i := 1; i <= cfg.Queue.NumberOfQueues; i++ { + queueName := fmt.Sprintf("%s_%d", cfg.Queue.TransactionQueue, i) mux.HandleFunc(queueName, b.processTransaction) } // Register handlers for other task types - mux.HandleFunc(blnk.INDEX_QUEUE, b.indexData) - mux.HandleFunc(blnk.WEBHOOK_QUEUE, blnk.ProcessWebhook) - mux.HandleFunc(blnk.EXPIREDINFLIGHT_QUEUE, b.processInflightExpiry) + mux.HandleFunc(cfg.Queue.IndexQueue, b.indexData) + mux.HandleFunc(cfg.Queue.WebhookQueue, blnk.ProcessWebhook) + mux.HandleFunc(cfg.Queue.InflightExpiryQueue, b.processInflightExpiry) } // workerCommands defines the "workers" command to start worker processes. diff --git a/config/config.go b/config/config.go index e5b4732..5fe295d 100644 --- a/config/config.go +++ b/config/config.go @@ -23,14 +23,44 @@ import ( "os" "strings" "sync/atomic" + "time" "github.com/kelseyhightower/envconfig" "github.com/sirupsen/logrus" ) +// Default constants const ( - DEFAULT_PORT = "5001" + DEFAULT_PORT = "5001" + DEFAULT_TYPESENSE_URL = "http://typesense:8108" + DEFAULT_CLEANUP_SEC = 10800 // 3 hours in seconds +) + +// Default values for different configurations +var ( + defaultTransaction = TransactionConfig{ + BatchSize: 100000, + MaxQueueSize: 1000, + MaxWorkers: 10, + LockDuration: 30 * time.Minute, + IndexQueuePrefix: "transactions", + } + + defaultReconciliation = ReconciliationConfig{ + DefaultStrategy: "one_to_one", + ProgressInterval: 100, + MaxRetries: 3, + RetryDelay: 5 * time.Second, + } + + defaultQueue = QueueConfig{ + TransactionQueue: "new:transaction", + WebhookQueue: "new:webhook", + IndexQueue: "new:index", + InflightExpiryQueue: "new:inflight-expiry", + NumberOfQueues: 20, + } ) var ConfigStore atomic.Value @@ -78,12 +108,37 @@ type SlackWebhook struct { WebhookUrl string `json:"webhook_url"` } +type WebhookConfig struct { + Url string `json:"url"` + Headers map[string]string `json:"headers"` +} + type Notification struct { - Slack SlackWebhook `json:"slack"` - Webhook struct { - Url string `json:"url"` - Headers map[string]string `json:"headers"` - } `json:"webhook"` + Slack SlackWebhook `json:"slack"` + Webhook WebhookConfig `json:"webhook"` +} + +type TransactionConfig struct { + BatchSize int `json:"batch_size" envconfig:"BLNK_TRANSACTION_BATCH_SIZE"` + MaxQueueSize int `json:"max_queue_size" envconfig:"BLNK_TRANSACTION_MAX_QUEUE_SIZE"` + MaxWorkers int `json:"max_workers" envconfig:"BLNK_TRANSACTION_MAX_WORKERS"` + LockDuration time.Duration `json:"lock_duration" envconfig:"BLNK_TRANSACTION_LOCK_DURATION"` + IndexQueuePrefix string `json:"index_queue_prefix" envconfig:"BLNK_TRANSACTION_INDEX_QUEUE_PREFIX"` +} + +type ReconciliationConfig struct { + DefaultStrategy string `json:"default_strategy" envconfig:"BLNK_RECONCILIATION_DEFAULT_STRATEGY"` + ProgressInterval int `json:"progress_interval" envconfig:"BLNK_RECONCILIATION_PROGRESS_INTERVAL"` + MaxRetries int `json:"max_retries" envconfig:"BLNK_RECONCILIATION_MAX_RETRIES"` + RetryDelay time.Duration `json:"retry_delay" envconfig:"BLNK_RECONCILIATION_RETRY_DELAY"` +} + +type QueueConfig struct { + TransactionQueue string `json:"transaction_queue" envconfig:"BLNK_QUEUE_TRANSACTION"` + WebhookQueue string `json:"webhook_queue" envconfig:"BLNK_QUEUE_WEBHOOK"` + IndexQueue string `json:"index_queue" envconfig:"BLNK_QUEUE_INDEX"` + InflightExpiryQueue string `json:"inflight_expiry_queue" envconfig:"BLNK_QUEUE_INFLIGHT_EXPIRY"` + NumberOfQueues int `json:"number_of_queues" envconfig:"BLNK_QUEUE_NUMBER_OF_QUEUES"` } type Configuration struct { @@ -103,6 +158,9 @@ type Configuration struct { Notification Notification `json:"notification"` RateLimit RateLimitConfig `json:"rate_limit"` EnableTelemetry bool `json:"enable_telemetry" envconfig:"BLNK_ENABLE_TELEMETRY"` + Transaction TransactionConfig `json:"transaction"` + Reconciliation ReconciliationConfig `json:"reconciliation"` + Queue QueueConfig `json:"queue"` } func loadConfigFromFile(file string) error { @@ -152,38 +210,118 @@ func Fetch() (*Configuration, error) { } func (cnf *Configuration) validateAndAddDefaults() error { - if cnf.ProjectName == "" { - log.Println("Warning: Project name is empty. Setting a default name.") - cnf.ProjectName = "Blnk Server" + if err := cnf.validateRequiredFields(); err != nil { + return err } - if cnf.TypeSense.Dns == "" { - cnf.TypeSense.Dns = "http://typesense:8108" - } + cnf.setDefaultValues() + cnf.trimWhitespace() + cnf.setupRateLimiting() + return nil +} + +func (cnf *Configuration) validateRequiredFields() error { if cnf.DataSource.Dns == "" { - log.Println("Error: Data source DNS is empty. It's a required field.") return errors.New("data source DNS is required") } if cnf.Redis.Dns == "" { - log.Println("Error: Redis DNS is empty. It's a required field.") return errors.New("redis DNS is required") } - // Trim white spaces from fields - cnf.ProjectName = strings.TrimSpace(cnf.ProjectName) - cnf.Server.Port = strings.TrimSpace(cnf.Server.Port) - cnf.DataSource.Dns = strings.TrimSpace(cnf.DataSource.Dns) - cnf.Redis.Dns = strings.TrimSpace(cnf.Redis.Dns) + return nil +} - // Set default value for Port if it's empty +func (cnf *Configuration) setDefaultValues() { + // Project defaults + if cnf.ProjectName == "" { + cnf.ProjectName = "Blnk Server" + log.Println("Warning: Project name is empty. Setting a default name.") + } + + // Server defaults if cnf.Server.Port == "" { cnf.Server.Port = DEFAULT_PORT log.Printf("Warning: Port not specified in config. Setting default port: %s", DEFAULT_PORT) } - // Rate limiting is disabled by default (when both RPS and Burst are nil) + // TypeSense defaults + if cnf.TypeSense.Dns == "" { + cnf.TypeSense.Dns = DEFAULT_TYPESENSE_URL + } + + // Set module defaults + cnf.setTransactionDefaults() + cnf.setReconciliationDefaults() + cnf.setQueueDefaults() + + // Enable telemetry by default + if !cnf.EnableTelemetry { + cnf.EnableTelemetry = true + log.Println("Warning: Telemetry setting not specified. Enabling by default.") + } +} + +func (cnf *Configuration) setTransactionDefaults() { + if cnf.Transaction.BatchSize == 0 { + cnf.Transaction.BatchSize = defaultTransaction.BatchSize + } + if cnf.Transaction.MaxQueueSize == 0 { + cnf.Transaction.MaxQueueSize = defaultTransaction.MaxQueueSize + } + if cnf.Transaction.MaxWorkers == 0 { + cnf.Transaction.MaxWorkers = defaultTransaction.MaxWorkers + } + if cnf.Transaction.LockDuration == 0 { + cnf.Transaction.LockDuration = defaultTransaction.LockDuration + } + if cnf.Transaction.IndexQueuePrefix == "" { + cnf.Transaction.IndexQueuePrefix = defaultTransaction.IndexQueuePrefix + } +} + +func (cnf *Configuration) setReconciliationDefaults() { + if cnf.Reconciliation.DefaultStrategy == "" { + cnf.Reconciliation.DefaultStrategy = defaultReconciliation.DefaultStrategy + } + if cnf.Reconciliation.ProgressInterval == 0 { + cnf.Reconciliation.ProgressInterval = defaultReconciliation.ProgressInterval + } + if cnf.Reconciliation.MaxRetries == 0 { + cnf.Reconciliation.MaxRetries = defaultReconciliation.MaxRetries + } + if cnf.Reconciliation.RetryDelay == 0 { + cnf.Reconciliation.RetryDelay = defaultReconciliation.RetryDelay + } +} + +func (cnf *Configuration) setQueueDefaults() { + if cnf.Queue.TransactionQueue == "" { + cnf.Queue.TransactionQueue = defaultQueue.TransactionQueue + } + if cnf.Queue.WebhookQueue == "" { + cnf.Queue.WebhookQueue = defaultQueue.WebhookQueue + } + if cnf.Queue.IndexQueue == "" { + cnf.Queue.IndexQueue = defaultQueue.IndexQueue + } + if cnf.Queue.InflightExpiryQueue == "" { + cnf.Queue.InflightExpiryQueue = defaultQueue.InflightExpiryQueue + } + if cnf.Queue.NumberOfQueues == 0 { + cnf.Queue.NumberOfQueues = defaultQueue.NumberOfQueues + } +} + +func (cnf *Configuration) trimWhitespace() { + cnf.ProjectName = strings.TrimSpace(cnf.ProjectName) + cnf.Server.Port = strings.TrimSpace(cnf.Server.Port) + cnf.DataSource.Dns = strings.TrimSpace(cnf.DataSource.Dns) + cnf.Redis.Dns = strings.TrimSpace(cnf.Redis.Dns) +} + +func (cnf *Configuration) setupRateLimiting() { if cnf.RateLimit.RequestsPerSecond != nil && cnf.RateLimit.Burst == nil { defaultBurst := 2 * int(*cnf.RateLimit.RequestsPerSecond) cnf.RateLimit.Burst = &defaultBurst @@ -194,25 +332,16 @@ func (cnf *Configuration) validateAndAddDefaults() error { cnf.RateLimit.RequestsPerSecond = &defaultRPS log.Printf("Warning: Rate limit RPS not specified. Setting default value: %.2f", defaultRPS) } - - // Set default cleanup interval if not specified if cnf.RateLimit.CleanupIntervalSec == nil { - defaultCleanup := 10800 // 3 hours in seconds + defaultCleanup := DEFAULT_CLEANUP_SEC cnf.RateLimit.CleanupIntervalSec = &defaultCleanup log.Printf("Warning: Rate limit cleanup interval not specified. Setting default value: %d seconds", defaultCleanup) } - - // Set default value for EnableTelemetry - if !cnf.EnableTelemetry { - cnf.EnableTelemetry = true - log.Println("Warning: Telemetry setting not specified. Enabling by default.") - } - - return nil } // MockConfig sets a mock configuration for testing purposes. func MockConfig(mockConfig *Configuration) { + mockConfig.validateAndAddDefaults() ConfigStore.Store(mockConfig) } diff --git a/model/events.go b/model/events.go deleted file mode 100644 index 3481a19..0000000 --- a/model/events.go +++ /dev/null @@ -1,32 +0,0 @@ -/* -Copyright 2024 Blnk Finance Authors. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ -package model - -import "time" - -type EventMapper struct { - MapperID string `json:"mapper_id"` - Name string `json:"name"` - CreatedAt time.Time `json:"created_at"` - MappingInstruction map[string]string `json:"mapping_instruction"` -} - -type Event struct { - MapperID string `json:"mapper_id"` - Drcr string `json:"drcr"` - BalanceID string `json:"balance_id"` - Data map[string]interface{} `json:"data"` -} diff --git a/queue.go b/queue.go index f10ef09..6dcb997 100644 --- a/queue.go +++ b/queue.go @@ -31,14 +31,6 @@ import ( "github.com/jerry-enebeli/blnk/model" ) -const ( - TRANSACTION_QUEUE = "new:transaction" - WEBHOOK_QUEUE = "new:webhoook" - INDEX_QUEUE = "new:index" - EXPIREDINFLIGHT_QUEUE = "new:inflight-expiry" - NumberOfQueues = 20 -) - // Queue represents a queue for handling various tasks. type Queue struct { Client *asynq.Client @@ -80,12 +72,21 @@ func NewQueue(conf *config.Configuration) *Queue { // Returns: // - error: An error if the task could not be enqueued. func (q *Queue) queueInflightExpiry(transactionID string, expiresAt time.Time) error { + cfg, err := config.Fetch() + if err != nil { + return err + } + IPayload, err := json.Marshal(transactionID) if err != nil { return err } - taskOptions := []asynq.Option{asynq.TaskID(transactionID), asynq.Queue(EXPIREDINFLIGHT_QUEUE), asynq.ProcessIn(time.Until(expiresAt))} - task := asynq.NewTask(EXPIREDINFLIGHT_QUEUE, IPayload, taskOptions...) + taskOptions := []asynq.Option{ + asynq.TaskID(transactionID), + asynq.Queue(cfg.Queue.InflightExpiryQueue), + asynq.ProcessIn(time.Until(expiresAt)), + } + task := asynq.NewTask(cfg.Queue.InflightExpiryQueue, IPayload, taskOptions...) info, err := q.Client.Enqueue(task) if err != nil { log.Println(err, info) @@ -105,6 +106,11 @@ func (q *Queue) queueInflightExpiry(transactionID string, expiresAt time.Time) e // Returns: // - error: An error if the task could not be enqueued. func (q *Queue) queueIndexData(id string, collection string, data interface{}) error { + cfg, err := config.Fetch() + if err != nil { + return err + } + payload := map[string]interface{}{ "collection": collection, "payload": data, @@ -115,8 +121,8 @@ func (q *Queue) queueIndexData(id string, collection string, data interface{}) e return err } - taskOptions := []asynq.Option{asynq.Queue(INDEX_QUEUE)} - task := asynq.NewTask(INDEX_QUEUE, IPayload, taskOptions...) + taskOptions := []asynq.Option{asynq.Queue(cfg.Queue.IndexQueue)} + task := asynq.NewTask(cfg.Queue.IndexQueue, IPayload, taskOptions...) info, err := q.Client.Enqueue(task) if err != nil { log.Println(err, info) @@ -168,20 +174,39 @@ func (q *Queue) Enqueue(ctx context.Context, transaction *model.Transaction) err // Returns: // - *asynq.Task: The generated task ready to be enqueued. func (q *Queue) geTask(transaction *model.Transaction, payload []byte) *asynq.Task { - // Hash the balance ID and use modulo to select a queue - queueIndex := hashBalanceID(transaction.Source) % NumberOfQueues - // Queue names are 1-based, so we add 1 to the index - queueName := fmt.Sprintf("%s_%d", TRANSACTION_QUEUE, queueIndex+1) + cnf, err := config.Fetch() + if err != nil { + log.Printf("Error fetching config: %v", err) + // Use default values if config fetch fails + return q.geTaskWithDefaults(transaction, payload) + } + log.Println("here", cnf.Queue) + queueIndex := hashBalanceID(transaction.Source) % cnf.Queue.NumberOfQueues + queueName := fmt.Sprintf("%s_%d", cnf.Queue.TransactionQueue, queueIndex+1) - // Initialize task options with the task ID and the selected queue name taskOptions := []asynq.Option{asynq.TaskID(transaction.TransactionID), asynq.Queue(queueName)} + if !transaction.ScheduledFor.IsZero() { + taskOptions = append(taskOptions, asynq.ProcessIn(time.Until(transaction.ScheduledFor))) + } - // If the transaction is scheduled for a future time, add a processing delay + return asynq.NewTask(queueName, payload, taskOptions...) +} + +// Fallback function for when config fetch fails +func (q *Queue) geTaskWithDefaults(transaction *model.Transaction, payload []byte) *asynq.Task { + conf, err := config.Fetch() + if err != nil { + log.Printf("Error fetching config: %v", err) + return nil + } + queueIndex := hashBalanceID(transaction.Source) % conf.Queue.NumberOfQueues + queueName := fmt.Sprintf("new:transaction_%d", queueIndex+1) // Default prefix + + taskOptions := []asynq.Option{asynq.TaskID(transaction.TransactionID), asynq.Queue(queueName)} if !transaction.ScheduledFor.IsZero() { taskOptions = append(taskOptions, asynq.ProcessIn(time.Until(transaction.ScheduledFor))) } - // Create and return the new task with the specified options return asynq.NewTask(queueName, payload, taskOptions...) } @@ -207,9 +232,14 @@ func hashBalanceID(balanceID string) int { // - *model.Transaction: A pointer to the Transaction model if found. // - error: An error if the transaction could not be retrieved. func (q *Queue) GetTransactionFromQueue(transactionID string) (*model.Transaction, error) { + cfg, err := config.Fetch() + if err != nil { + return nil, err + } + // Iterate over all specific transaction queues - for i := 1; i <= NumberOfQueues; i++ { - queueName := fmt.Sprintf("%s_%d", TRANSACTION_QUEUE, i) + for i := 1; i <= cfg.Queue.NumberOfQueues; i++ { + queueName := fmt.Sprintf("%s_%d", cfg.Queue.TransactionQueue, i) task, err := q.Inspector.GetTaskInfo(queueName, transactionID) if err == nil && task != nil { var txn model.Transaction diff --git a/queue_test.go b/queue_test.go index 16021b6..8016ebf 100644 --- a/queue_test.go +++ b/queue_test.go @@ -29,6 +29,17 @@ import ( ) func TestEnqueueImmediateTransactionSuccess(t *testing.T) { + cnf := &config.Configuration{ + Redis: config.RedisConfig{ + Dns: "localhost:6379", + }, + Queue: config.QueueConfig{ + WebhookQueue: "webhook_queue", + NumberOfQueues: 1, + }, + } + config.MockConfig(cnf) + redisOption, err := redis_db.ParseRedisURL("localhost:6379") if err != nil { log.Fatalf("Error parsing Redis URL: %v", err) @@ -37,11 +48,7 @@ func TestEnqueueImmediateTransactionSuccess(t *testing.T) { client := asynq.NewClient(queueOptions) inspector := asynq.NewInspector(queueOptions) - q := NewQueue(&config.Configuration{ - Redis: config.RedisConfig{ - Dns: "localhost:6379", - }, - }) + q := NewQueue(cnf) q.Client = client q.Inspector = inspector @@ -52,7 +59,7 @@ func TestEnqueueImmediateTransactionSuccess(t *testing.T) { err = q.Enqueue(context.Background(), &transaction) assert.NoError(t, err) - task, err := inspector.GetTaskInfo(WEBHOOK_QUEUE, transaction.TransactionID) + task, err := inspector.GetTaskInfo(cnf.Queue.WebhookQueue, transaction.TransactionID) if err != nil { return } @@ -61,6 +68,10 @@ func TestEnqueueImmediateTransactionSuccess(t *testing.T) { } func TestEnqueueScheduledTransaction(t *testing.T) { + conf, err := config.Fetch() + if err != nil { + assert.NoError(t, err) + } client := asynq.NewClient(asynq.RedisClientOpt{Addr: "localhost:6379"}) inspector := asynq.NewInspector(asynq.RedisClientOpt{Addr: "localhost:6379"}) @@ -75,13 +86,13 @@ func TestEnqueueScheduledTransaction(t *testing.T) { transaction := getTransactionMock(100, false) - _, err := json.Marshal(transaction) + _, err = json.Marshal(transaction) assert.NoError(t, err) err = q.Enqueue(context.Background(), &transaction) assert.NoError(t, err) - task, err := inspector.GetTaskInfo(WEBHOOK_QUEUE, transaction.TransactionID) + task, err := inspector.GetTaskInfo(conf.Queue.WebhookQueue, transaction.TransactionID) if err != nil { return } @@ -89,6 +100,11 @@ func TestEnqueueScheduledTransaction(t *testing.T) { } func TestEnqueueWithAsynqClientEnqueueError(t *testing.T) { + conf, err := config.Fetch() + if err != nil { + assert.NoError(t, err) + } + client := asynq.NewClient(asynq.RedisClientOpt{Addr: "localhost:6379"}) inspector := asynq.NewInspector(asynq.RedisClientOpt{Addr: "localhost:6379"}) @@ -102,13 +118,13 @@ func TestEnqueueWithAsynqClientEnqueueError(t *testing.T) { transaction := getTransactionMock(100, false) - _, err := json.Marshal(transaction) + _, err = json.Marshal(transaction) assert.NoError(t, err) err = q.Enqueue(context.Background(), &transaction) assert.NoError(t, err) - task, err := inspector.GetTaskInfo(WEBHOOK_QUEUE, "tx_1235") + task, err := inspector.GetTaskInfo(conf.Queue.WebhookQueue, "tx_1235") if err != nil { return } diff --git a/reconciliation.go b/reconciliation.go index 4fec977..d794222 100644 --- a/reconciliation.go +++ b/reconciliation.go @@ -36,6 +36,7 @@ import ( "sync" "time" + "github.com/jerry-enebeli/blnk/config" "github.com/jerry-enebeli/blnk/database" "github.com/jerry-enebeli/blnk/internal/notification" "github.com/jerry-enebeli/blnk/model" @@ -826,12 +827,16 @@ func (s *Blnk) createReconciler(strategy string, groupCriteria string, matchingR // Returns: // - *transactionProcessor: The created transaction processor. func (s *Blnk) createTransactionProcessor(reconciliation model.Reconciliation, progress model.ReconciliationProgress, reconciler func(ctx context.Context, txns []*model.Transaction) ([]model.Match, []string)) *transactionProcessor { + conf, err := config.Fetch() + if err != nil { + log.Printf("Error fetching configuration: %v", err) + } return &transactionProcessor{ reconciliation: reconciliation, progress: progress, reconciler: reconciler, datasource: s.datasource, - progressSaveCount: 100, // Save progress every 100 transactions. + progressSaveCount: conf.Reconciliation.ProgressInterval, } } @@ -897,6 +902,10 @@ func (tp *transactionProcessor) getResults() (int, int) { // Returns: // - error: If any error occurs during processing. func (s *Blnk) processTransactions(ctx context.Context, uploadID string, processor *transactionProcessor, strategy string) error { + conf, err := config.Fetch() + if err != nil { + return err + } processedCount := 0 var transactionProcessor getTxns // Use different transaction retrieval methods depending on the strategy. @@ -907,11 +916,11 @@ func (s *Blnk) processTransactions(ctx context.Context, uploadID string, process } // Process the transactions in batches. - _, err := s.ProcessTransactionInBatches( + _, err = s.ProcessTransactionInBatches( ctx, uploadID, - 0, // Offset for pagination. - 10, // Batch size. + 0, + conf.Transaction.MaxWorkers, false, // Stream mode is disabled. transactionProcessor, func(ctx context.Context, txns <-chan *model.Transaction, results chan<- BatchJobResult, wg *sync.WaitGroup, _ float64) { @@ -1029,6 +1038,11 @@ func (s *Blnk) oneToOneReconciliation(ctx context.Context, externalTxns []*model // - []model.Match: A list of matched transactions. // - []string: A list of unmatched transaction IDs. func (s *Blnk) oneToManyReconciliation(ctx context.Context, externalTxns []*model.Transaction, groupCriteria string, matchingRules []model.MatchingRule, isExternalGrouped bool) ([]model.Match, []string) { + conf, err := config.Fetch() + if err != nil { + log.Printf("Error fetching configuration: %v", err) + } + var matches []model.Match var unmatched []string @@ -1037,7 +1051,7 @@ func (s *Blnk) oneToManyReconciliation(ctx context.Context, externalTxns []*mode var wg sync.WaitGroup // Initiate the one-to-many reconciliation process. - err := s.oneToMany(ctx, externalTxns, matchingRules, isExternalGrouped, &wg, groupCriteria, 100000, matchChan, unmatchedChan) + err = s.oneToMany(ctx, externalTxns, matchingRules, isExternalGrouped, &wg, groupCriteria, conf.Transaction.BatchSize, matchChan, unmatchedChan) if err != nil { log.Printf("Error in one-to-many reconciliation: %v", err) } @@ -1066,6 +1080,11 @@ func (s *Blnk) oneToManyReconciliation(ctx context.Context, externalTxns []*mode // - []model.Match: A list of matched transactions. // - []string: A list of unmatched transaction IDs. func (s *Blnk) manyToOneReconciliation(ctx context.Context, internalTxns []*model.Transaction, groupCriteria string, matchingRules []model.MatchingRule, isExternalGrouped bool) ([]model.Match, []string) { + conf, err := config.Fetch() + if err != nil { + log.Printf("Error fetching configuration: %v", err) + } + var matches []model.Match var unmatched []string @@ -1074,7 +1093,7 @@ func (s *Blnk) manyToOneReconciliation(ctx context.Context, internalTxns []*mode var wg sync.WaitGroup // Initiate the many-to-one reconciliation process. - err := s.manyToOne(ctx, internalTxns, matchingRules, isExternalGrouped, &wg, groupCriteria, 100000, matchChan, unmatchedChan) + err = s.manyToOne(ctx, internalTxns, matchingRules, isExternalGrouped, &wg, groupCriteria, conf.Transaction.BatchSize, matchChan, unmatchedChan) if err != nil { log.Printf("Error in many-to-one reconciliation: %v", err) } @@ -1284,14 +1303,19 @@ func (s *Blnk) groupInternalTransactions(ctx context.Context, groupingCriteria s // Returns: // - error: If any error occurs during processing. func (s *Blnk) findMatchingInternalTransaction(ctx context.Context, externalTxn *model.Transaction, matchingRules []model.MatchingRule, matchChan chan model.Match, unMatchChan chan string) error { + conf, err := config.Fetch() + if err != nil { + return err + } + matchFound := false // Process transactions in batches, applying the matching rules. - _, err := s.ProcessTransactionInBatches( + _, err = s.ProcessTransactionInBatches( ctx, externalTxn.TransactionID, externalTxn.Amount, - 10, // Batch size + conf.Transaction.MaxWorkers, false, // Stream mode s.getInternalTransactionsPaginated, func(ctx context.Context, jobs <-chan *model.Transaction, results chan<- BatchJobResult, wg *sync.WaitGroup, amount float64) { diff --git a/reconciliation_test.go b/reconciliation_test.go index e9309db..c34c21f 100644 --- a/reconciliation_test.go +++ b/reconciliation_test.go @@ -23,11 +23,19 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/mock" + "github.com/jerry-enebeli/blnk/config" "github.com/jerry-enebeli/blnk/database/mocks" "github.com/jerry-enebeli/blnk/model" ) func TestOneToOneReconciliation(t *testing.T) { + cnf := &config.Configuration{ + Transaction: config.TransactionConfig{ + BatchSize: 100000, + MaxWorkers: 1, + }, + } + config.MockConfig(cnf) mockDS := new(mocks.MockDataSource) blnk := &Blnk{datasource: mockDS} @@ -72,6 +80,12 @@ func TestOneToOneReconciliation(t *testing.T) { // TestOneToManyReconciliation tests the one-to-many reconciliation strategy func TestOneToManyReconciliation(t *testing.T) { + cnf := &config.Configuration{ + Transaction: config.TransactionConfig{ + BatchSize: 100000, + }, + } + config.MockConfig(cnf) mockDS := new(mocks.MockDataSource) blnk := &Blnk{datasource: mockDS} @@ -121,6 +135,19 @@ func TestOneToManyReconciliation(t *testing.T) { } func TestOneToManyReconciliationNoMatches(t *testing.T) { + cnf := &config.Configuration{ + Redis: config.RedisConfig{ + Dns: "localhost:6379", + }, + Transaction: config.TransactionConfig{ + BatchSize: 100000, + }, + Queue: config.QueueConfig{ + WebhookQueue: "webhook_queue", + NumberOfQueues: 1, + }, + } + config.MockConfig(cnf) mockDS := new(mocks.MockDataSource) blnk := &Blnk{datasource: mockDS} @@ -343,6 +370,20 @@ func TestMatchingRules(t *testing.T) { // TestReconciliationEdgeCases tests edge cases in the reconciliation process func TestReconciliationEdgeCases(t *testing.T) { + cnf := &config.Configuration{ + Redis: config.RedisConfig{ + Dns: "localhost:6379", + }, + Transaction: config.TransactionConfig{ + BatchSize: 100000, + }, + Queue: config.QueueConfig{ + WebhookQueue: "webhook_queue", + NumberOfQueues: 1, + }, + } + config.MockConfig(cnf) + mockDS := new(mocks.MockDataSource) blnk := &Blnk{datasource: mockDS} diff --git a/transaction.go b/transaction.go index a073dea..feeb908 100644 --- a/transaction.go +++ b/transaction.go @@ -26,6 +26,7 @@ import ( "sync" "time" + "github.com/jerry-enebeli/blnk/config" redlock "github.com/jerry-enebeli/blnk/internal/lock" "github.com/jerry-enebeli/blnk/internal/notification" "go.opentelemetry.io/otel/attribute" @@ -159,8 +160,13 @@ func (l *Blnk) acquireLock(ctx context.Context, transaction *model.Transaction) ctx, span := tracer.Start(ctx, "Acquiring Lock") defer span.End() + config, err := config.Fetch() + if err != nil { + return nil, err + } + locker := redlock.NewLocker(l.redis, transaction.Source, model.GenerateUUIDWithSuffix("loc")) - err := locker.Lock(ctx, time.Minute*30) + err = locker.Lock(ctx, config.Transaction.LockDuration) if err != nil { span.RecordError(err) return nil, err @@ -240,8 +246,14 @@ func (l *Blnk) postTransactionActions(ctx context.Context, transaction *model.Tr _, span := tracer.Start(ctx, "Post Transaction Actions") defer span.End() + config, err := config.Fetch() + if err != nil { + span.RecordError(err) + return + } + go func() { - err := l.queue.queueIndexData(transaction.TransactionID, "transactions", transaction) + err := l.queue.queueIndexData(transaction.TransactionID, config.Transaction.IndexQueuePrefix, transaction) if err != nil { span.RecordError(err) notification.NotifyError(err) @@ -460,11 +472,14 @@ func (l *Blnk) ProcessTransactionInBatches(ctx context.Context, parentTransactio ctx, span := tracer.Start(ctx, "ProcessTransactionInBatches") defer span.End() - // Constants for batch and queue sizes - const ( - batchSize = 100000 - maxQueueSize = 1000 - ) + config, err := config.Fetch() + if err != nil { + return nil, err + } + + // Use configuration values + batchSize := config.Transaction.BatchSize + maxQueueSize := config.Transaction.MaxQueueSize // Slice to collect all processed transactions and errors var allTxns []*model.Transaction diff --git a/webhooks.go b/webhooks.go index a012cdf..1107403 100644 --- a/webhooks.go +++ b/webhooks.go @@ -159,8 +159,8 @@ func SendWebhook(newWebhook NewWebhook) error { if err != nil { return err } - taskOptions := []asynq.Option{asynq.Queue(WEBHOOK_QUEUE)} - task := asynq.NewTask(WEBHOOK_QUEUE, payload, taskOptions...) + taskOptions := []asynq.Option{asynq.Queue(conf.Queue.WebhookQueue)} + task := asynq.NewTask(conf.Queue.WebhookQueue, payload, taskOptions...) info, err := client.Enqueue(task) if err != nil { log.Println(err, info) diff --git a/webhooks_test.go b/webhooks_test.go index dcc4a69..d575e30 100644 --- a/webhooks_test.go +++ b/webhooks_test.go @@ -42,20 +42,22 @@ func TestSendWebhook(t *testing.T) { } defer mr.Close() - mockConfig := &config.Configuration{ + cnf := &config.Configuration{ Redis: config.RedisConfig{ Dns: mr.Addr(), }, - Notification: config.Notification{Webhook: struct { - Url string `json:"url"` - Headers map[string]string `json:"headers"` - }(struct { - Url string - Headers map[string]string - }{Url: "https:localhost:5001/webhook", Headers: nil})}, + Queue: config.QueueConfig{ + WebhookQueue: "webhook_queue", + NumberOfQueues: 1, + }, + Notification: config.Notification{ + Webhook: config.WebhookConfig{ + Url: "http://localhost:8080", + }, + }, } + config.MockConfig(cnf) - config.ConfigStore.Store(mockConfig) testData := NewWebhook{ Event: "transaction.queued", Payload: getTransactionMock(10000, false), @@ -67,7 +69,6 @@ func TestSendWebhook(t *testing.T) { // Verify that the task was enqueued assert.NoError(t, err) tasks := mr.Keys() - t.Log(tasks) assert.NoError(t, err) assert.NotEmpty(t, tasks) From 90614795eab5ccbc6121d6bd6aca1276ed20ebdc Mon Sep 17 00:00:00 2001 From: jerry-enebeli Date: Fri, 17 Jan 2025 00:21:04 +0100 Subject: [PATCH 2/4] remove debug log statement from geTask function in queue.go --- queue.go | 1 - 1 file changed, 1 deletion(-) diff --git a/queue.go b/queue.go index 6dcb997..32f2a13 100644 --- a/queue.go +++ b/queue.go @@ -180,7 +180,6 @@ func (q *Queue) geTask(transaction *model.Transaction, payload []byte) *asynq.Ta // Use default values if config fetch fails return q.geTaskWithDefaults(transaction, payload) } - log.Println("here", cnf.Queue) queueIndex := hashBalanceID(transaction.Source) % cnf.Queue.NumberOfQueues queueName := fmt.Sprintf("%s_%d", cnf.Queue.TransactionQueue, queueIndex+1) From 90db126e5325484f9eb19db8b74cdd556c72953b Mon Sep 17 00:00:00 2001 From: jerry-enebeli Date: Fri, 17 Jan 2025 00:23:38 +0100 Subject: [PATCH 3/4] add error logging for mock configuration validation in config.go --- config/config.go | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/config/config.go b/config/config.go index 5fe295d..7e9d507 100644 --- a/config/config.go +++ b/config/config.go @@ -341,7 +341,11 @@ func (cnf *Configuration) setupRateLimiting() { // MockConfig sets a mock configuration for testing purposes. func MockConfig(mockConfig *Configuration) { - mockConfig.validateAndAddDefaults() + err := mockConfig.validateAndAddDefaults() + if err != nil { + log.Printf("Error setting mock config: %v", err) + return + } ConfigStore.Store(mockConfig) } From 95d49673d1273a74b3d67ca0a4a672fb9590b028 Mon Sep 17 00:00:00 2001 From: jerry-enebeli Date: Fri, 17 Jan 2025 13:40:23 +0100 Subject: [PATCH 4/4] refactor configuration handling in tests to use ConfigStore instead of MockConfig --- account_test.go | 37 ++++++++++++++++++++++++++++++++++-- balance_test.go | 1 + cmd/main.go | 1 + cmd/server.go | 2 +- database/db_test.go | 2 +- internal/cache/cache_test.go | 18 +++++++++++++++++- ledger_test.go | 18 +++++++++++++++++- queue_test.go | 29 +++++++++++++++++++--------- reconciliation_test.go | 24 +++++++++++++---------- transaction_test.go | 35 ++++++++++++++++++++++++++++++++++ webhooks_test.go | 2 +- 11 files changed, 143 insertions(+), 26 deletions(-) diff --git a/account_test.go b/account_test.go index 007240d..00a9501 100644 --- a/account_test.go +++ b/account_test.go @@ -78,6 +78,7 @@ func TestCreateAccount(t *testing.T) { } func TestCreateAccountWithExternalGenerator(t *testing.T) { + // Initialize the mock HTTP responder httpmock.Activate() defer httpmock.DeactivateAndReset() @@ -92,8 +93,6 @@ func TestCreateAccountWithExternalGenerator(t *testing.T) { httpmock.RegisterResponder("GET", "http://example.com/generateAccount", httpmock.NewStringResponder(200, `{"account_number": "123456789", "bank_name": "Blnk Bank"}`)) - config.MockConfig(&config.Configuration{Server: config.ServerConfig{SecretKey: "some-secret"}, AccountNumberGeneration: config.AccountNumberGenerationConfig{HttpService: config.AccountGenerationHttpService{Url: "http://example.com/generateAccount"}}}) - account := model.Account{ Name: "Test Account", BankName: "Blnk Bank", @@ -129,6 +128,23 @@ func TestCreateAccountWithExternalGenerator(t *testing.T) { } func TestGetAccountByID(t *testing.T) { + cnf := &config.Configuration{ + Redis: config.RedisConfig{ + Dns: "localhost:6379", + }, + Queue: config.QueueConfig{ + WebhookQueue: "webhook_queue", + NumberOfQueues: 1, + }, + Server: config.ServerConfig{SecretKey: "some-secret"}, + AccountNumberGeneration: config.AccountNumberGenerationConfig{ + HttpService: config.AccountGenerationHttpService{ + Url: "http://example.com/generateAccount", + }, + }, + } + + config.ConfigStore.Store(cnf) datasource, mock, err := newTestDataSource() assert.NoError(t, err) @@ -172,6 +188,23 @@ func TestGetAccountByID(t *testing.T) { } func TestGetAllAccounts(t *testing.T) { + cnf := &config.Configuration{ + Redis: config.RedisConfig{ + Dns: "localhost:6379", + }, + Queue: config.QueueConfig{ + WebhookQueue: "webhook_queue", + NumberOfQueues: 1, + }, + Server: config.ServerConfig{SecretKey: "some-secret"}, + AccountNumberGeneration: config.AccountNumberGenerationConfig{ + HttpService: config.AccountGenerationHttpService{ + Url: "http://example.com/generateAccount", + }, + }, + } + + config.ConfigStore.Store(cnf) datasource, mock, err := newTestDataSource() assert.NoError(t, err) diff --git a/balance_test.go b/balance_test.go index 1ad6188..f0d0726 100644 --- a/balance_test.go +++ b/balance_test.go @@ -221,6 +221,7 @@ func TestGetBalanceByID(t *testing.T) { } func TestGetAllBalances(t *testing.T) { + datasource, mock, err := newTestDataSource() if err != nil { t.Fatalf("Error creating test data source: %s", err) diff --git a/cmd/main.go b/cmd/main.go index 36536d7..5a6333c 100644 --- a/cmd/main.go +++ b/cmd/main.go @@ -92,6 +92,7 @@ func setupBlnk(cfg *config.Configuration) (*blnk.Blnk, error) { // Create a new Blnk instance using the initialized data source. newBlnk, err := blnk.NewBlnk(db) if err != nil { + logrus.Error(err) // Log the error using Logrus return &blnk.Blnk{}, fmt.Errorf("error creating blnk: %v", err) } return newBlnk, nil diff --git a/cmd/server.go b/cmd/server.go index f50a369..4d74dd1 100644 --- a/cmd/server.go +++ b/cmd/server.go @@ -184,7 +184,7 @@ func serverCommands(b *blnkInstance) *cobra.Command { // Load configuration cfg, err := config.Fetch() if err != nil { - log.Fatal(err) + log.Println(err) } // Initialize observability (tracing and PostHog) diff --git a/database/db_test.go b/database/db_test.go index b919939..fcfb07b 100644 --- a/database/db_test.go +++ b/database/db_test.go @@ -20,7 +20,7 @@ func TestGetDBConnection_Singleton(t *testing.T) { }, } - config.MockConfig(mockConfig) + config.ConfigStore.Store(mockConfig) // First call to GetDBConnection should initialize the instance ds1, err := GetDBConnection(mockConfig) diff --git a/internal/cache/cache_test.go b/internal/cache/cache_test.go index 6267a9f..6f0241d 100644 --- a/internal/cache/cache_test.go +++ b/internal/cache/cache_test.go @@ -26,7 +26,23 @@ import ( ) func TestSet(t *testing.T) { - config.MockConfig(&config.Configuration{}) + cnf := &config.Configuration{ + Redis: config.RedisConfig{ + Dns: "localhost:6379", + }, + Queue: config.QueueConfig{ + WebhookQueue: "webhook_queue", + NumberOfQueues: 1, + }, + Server: config.ServerConfig{SecretKey: "some-secret"}, + AccountNumberGeneration: config.AccountNumberGenerationConfig{ + HttpService: config.AccountGenerationHttpService{ + Url: "http://example.com/generateAccount", + }, + }, + } + + config.ConfigStore.Store(cnf) ctx := context.Background() mockCache, err := NewCache() assert.NoError(t, err) diff --git a/ledger_test.go b/ledger_test.go index 55a6271..384ae98 100644 --- a/ledger_test.go +++ b/ledger_test.go @@ -36,7 +36,23 @@ import ( ) func newTestDataSource() (database.IDataSource, sqlmock.Sqlmock, error) { - config.MockConfig(&config.Configuration{}) + cnf := &config.Configuration{ + Redis: config.RedisConfig{ + Dns: "localhost:6379", + }, + Queue: config.QueueConfig{ + WebhookQueue: "webhook_queue", + NumberOfQueues: 1, + }, + Server: config.ServerConfig{SecretKey: "some-secret"}, + AccountNumberGeneration: config.AccountNumberGenerationConfig{ + HttpService: config.AccountGenerationHttpService{ + Url: "http://example.com/generateAccount", + }, + }, + } + + config.ConfigStore.Store(cnf) db, mock, err := sqlmock.New() if err != nil { log.Printf("an error '%s' was not expected when opening a stub database Connection", err) diff --git a/queue_test.go b/queue_test.go index 8016ebf..55ec114 100644 --- a/queue_test.go +++ b/queue_test.go @@ -38,7 +38,7 @@ func TestEnqueueImmediateTransactionSuccess(t *testing.T) { NumberOfQueues: 1, }, } - config.MockConfig(cnf) + config.ConfigStore.Store(cnf) redisOption, err := redis_db.ParseRedisURL("localhost:6379") if err != nil { @@ -68,10 +68,16 @@ func TestEnqueueImmediateTransactionSuccess(t *testing.T) { } func TestEnqueueScheduledTransaction(t *testing.T) { - conf, err := config.Fetch() - if err != nil { - assert.NoError(t, err) + conf := &config.Configuration{ + Redis: config.RedisConfig{ + Dns: "localhost:6379", + }, + Queue: config.QueueConfig{ + WebhookQueue: "webhook_queue", + NumberOfQueues: 1, + }, } + config.ConfigStore.Store(conf) client := asynq.NewClient(asynq.RedisClientOpt{Addr: "localhost:6379"}) inspector := asynq.NewInspector(asynq.RedisClientOpt{Addr: "localhost:6379"}) @@ -86,7 +92,7 @@ func TestEnqueueScheduledTransaction(t *testing.T) { transaction := getTransactionMock(100, false) - _, err = json.Marshal(transaction) + _, err := json.Marshal(transaction) assert.NoError(t, err) err = q.Enqueue(context.Background(), &transaction) @@ -100,9 +106,14 @@ func TestEnqueueScheduledTransaction(t *testing.T) { } func TestEnqueueWithAsynqClientEnqueueError(t *testing.T) { - conf, err := config.Fetch() - if err != nil { - assert.NoError(t, err) + conf := &config.Configuration{ + Redis: config.RedisConfig{ + Dns: "localhost:6379", + }, + Queue: config.QueueConfig{ + WebhookQueue: "webhook_queue", + NumberOfQueues: 1, + }, } client := asynq.NewClient(asynq.RedisClientOpt{Addr: "localhost:6379"}) @@ -118,7 +129,7 @@ func TestEnqueueWithAsynqClientEnqueueError(t *testing.T) { transaction := getTransactionMock(100, false) - _, err = json.Marshal(transaction) + _, err := json.Marshal(transaction) assert.NoError(t, err) err = q.Enqueue(context.Background(), &transaction) diff --git a/reconciliation_test.go b/reconciliation_test.go index c34c21f..6a49937 100644 --- a/reconciliation_test.go +++ b/reconciliation_test.go @@ -35,7 +35,7 @@ func TestOneToOneReconciliation(t *testing.T) { MaxWorkers: 1, }, } - config.MockConfig(cnf) + config.ConfigStore.Store(cnf) mockDS := new(mocks.MockDataSource) blnk := &Blnk{datasource: mockDS} @@ -82,10 +82,11 @@ func TestOneToOneReconciliation(t *testing.T) { func TestOneToManyReconciliation(t *testing.T) { cnf := &config.Configuration{ Transaction: config.TransactionConfig{ - BatchSize: 100000, + BatchSize: 100000, + MaxWorkers: 1, }, } - config.MockConfig(cnf) + config.ConfigStore.Store(cnf) mockDS := new(mocks.MockDataSource) blnk := &Blnk{datasource: mockDS} @@ -140,14 +141,15 @@ func TestOneToManyReconciliationNoMatches(t *testing.T) { Dns: "localhost:6379", }, Transaction: config.TransactionConfig{ - BatchSize: 100000, + BatchSize: 100000, + MaxWorkers: 1, }, Queue: config.QueueConfig{ WebhookQueue: "webhook_queue", NumberOfQueues: 1, }, } - config.MockConfig(cnf) + config.ConfigStore.Store(cnf) mockDS := new(mocks.MockDataSource) blnk := &Blnk{datasource: mockDS} @@ -375,15 +377,18 @@ func TestReconciliationEdgeCases(t *testing.T) { Dns: "localhost:6379", }, Transaction: config.TransactionConfig{ - BatchSize: 100000, + BatchSize: 100000, + MaxWorkers: 1, }, Queue: config.QueueConfig{ WebhookQueue: "webhook_queue", NumberOfQueues: 1, }, + Reconciliation: config.ReconciliationConfig{ + ProgressInterval: 100, + }, } - config.MockConfig(cnf) - + config.ConfigStore.Store(cnf) mockDS := new(mocks.MockDataSource) blnk := &Blnk{datasource: mockDS} @@ -411,9 +416,8 @@ func TestReconciliationEdgeCases(t *testing.T) { externalTxns := []*model.Transaction{ {TransactionID: "ext1", Amount: 100, CreatedAt: time.Now()}, } - internalTxns := []*model.Transaction{} - mockDS.On("GetTransactionsPaginated", mock.Anything, "", 100000, int64(0)).Return(internalTxns, nil) + mockDS.On("GetTransactionsPaginated", mock.Anything, "", 100000, int64(0)).Return([]*model.Transaction{}, nil).Once() matchingRules := []model.MatchingRule{ { diff --git a/transaction_test.go b/transaction_test.go index 8d562f1..8a3f950 100644 --- a/transaction_test.go +++ b/transaction_test.go @@ -24,6 +24,7 @@ import ( "testing" "time" + "github.com/jerry-enebeli/blnk/config" "github.com/jerry-enebeli/blnk/model" "github.com/brianvoe/gofakeit/v6" @@ -34,6 +35,23 @@ import ( ) func TestRecordTransaction(t *testing.T) { + cnf := &config.Configuration{ + Redis: config.RedisConfig{ + Dns: "localhost:6379", + }, + Queue: config.QueueConfig{ + WebhookQueue: "webhook_queue", + NumberOfQueues: 1, + }, + Server: config.ServerConfig{SecretKey: "some-secret"}, + AccountNumberGeneration: config.AccountNumberGenerationConfig{ + HttpService: config.AccountGenerationHttpService{ + Url: "http://example.com/generateAccount", + }, + }, + } + + config.ConfigStore.Store(cnf) datasource, mock, err := newTestDataSource() assert.NoError(t, err) @@ -143,6 +161,23 @@ func TestRecordTransaction(t *testing.T) { } func TestRecordTransactionWithRate(t *testing.T) { + cnf := &config.Configuration{ + Redis: config.RedisConfig{ + Dns: "localhost:6379", + }, + Queue: config.QueueConfig{ + WebhookQueue: "webhook_queue", + NumberOfQueues: 1, + }, + Server: config.ServerConfig{SecretKey: "some-secret"}, + AccountNumberGeneration: config.AccountNumberGenerationConfig{ + HttpService: config.AccountGenerationHttpService{ + Url: "http://example.com/generateAccount", + }, + }, + } + + config.ConfigStore.Store(cnf) datasource, mock, err := newTestDataSource() assert.NoError(t, err) diff --git a/webhooks_test.go b/webhooks_test.go index d575e30..22d158b 100644 --- a/webhooks_test.go +++ b/webhooks_test.go @@ -56,7 +56,7 @@ func TestSendWebhook(t *testing.T) { }, }, } - config.MockConfig(cnf) + config.ConfigStore.Store(cnf) testData := NewWebhook{ Event: "transaction.queued",