Skip to content

Commit

Permalink
Merge pull request #86 from blnkfinance/jerry-config-extend
Browse files Browse the repository at this point in the history
extend config to include transaction and queue configs
  • Loading branch information
jerry-enebeli authored Jan 17, 2025
2 parents ffedabc + 95d4967 commit f3ec0ad
Show file tree
Hide file tree
Showing 18 changed files with 494 additions and 138 deletions.
37 changes: 35 additions & 2 deletions account_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ func TestCreateAccount(t *testing.T) {
}

func TestCreateAccountWithExternalGenerator(t *testing.T) {

// Initialize the mock HTTP responder
httpmock.Activate()
defer httpmock.DeactivateAndReset()
Expand All @@ -92,8 +93,6 @@ func TestCreateAccountWithExternalGenerator(t *testing.T) {
httpmock.RegisterResponder("GET", "http://example.com/generateAccount",
httpmock.NewStringResponder(200, `{"account_number": "123456789", "bank_name": "Blnk Bank"}`))

config.MockConfig(&config.Configuration{Server: config.ServerConfig{SecretKey: "some-secret"}, AccountNumberGeneration: config.AccountNumberGenerationConfig{HttpService: config.AccountGenerationHttpService{Url: "http://example.com/generateAccount"}}})

account := model.Account{
Name: "Test Account",
BankName: "Blnk Bank",
Expand Down Expand Up @@ -129,6 +128,23 @@ func TestCreateAccountWithExternalGenerator(t *testing.T) {
}

func TestGetAccountByID(t *testing.T) {
cnf := &config.Configuration{
Redis: config.RedisConfig{
Dns: "localhost:6379",
},
Queue: config.QueueConfig{
WebhookQueue: "webhook_queue",
NumberOfQueues: 1,
},
Server: config.ServerConfig{SecretKey: "some-secret"},
AccountNumberGeneration: config.AccountNumberGenerationConfig{
HttpService: config.AccountGenerationHttpService{
Url: "http://example.com/generateAccount",
},
},
}

config.ConfigStore.Store(cnf)
datasource, mock, err := newTestDataSource()
assert.NoError(t, err)

Expand Down Expand Up @@ -172,6 +188,23 @@ func TestGetAccountByID(t *testing.T) {
}

func TestGetAllAccounts(t *testing.T) {
cnf := &config.Configuration{
Redis: config.RedisConfig{
Dns: "localhost:6379",
},
Queue: config.QueueConfig{
WebhookQueue: "webhook_queue",
NumberOfQueues: 1,
},
Server: config.ServerConfig{SecretKey: "some-secret"},
AccountNumberGeneration: config.AccountNumberGenerationConfig{
HttpService: config.AccountGenerationHttpService{
Url: "http://example.com/generateAccount",
},
},
}

config.ConfigStore.Store(cnf)
datasource, mock, err := newTestDataSource()
assert.NoError(t, err)

Expand Down
1 change: 1 addition & 0 deletions balance_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -221,6 +221,7 @@ func TestGetBalanceByID(t *testing.T) {
}

func TestGetAllBalances(t *testing.T) {

datasource, mock, err := newTestDataSource()
if err != nil {
t.Fatalf("Error creating test data source: %s", err)
Expand Down
1 change: 1 addition & 0 deletions cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@ func setupBlnk(cfg *config.Configuration) (*blnk.Blnk, error) {
// Create a new Blnk instance using the initialized data source.
newBlnk, err := blnk.NewBlnk(db)
if err != nil {
logrus.Error(err) // Log the error using Logrus
return &blnk.Blnk{}, fmt.Errorf("error creating blnk: %v", err)
}
return newBlnk, nil
Expand Down
2 changes: 1 addition & 1 deletion cmd/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,7 @@ func serverCommands(b *blnkInstance) *cobra.Command {
// Load configuration
cfg, err := config.Fetch()
if err != nil {
log.Fatal(err)
log.Println(err)
}

// Initialize observability (tracing and PostHog)
Expand Down
32 changes: 22 additions & 10 deletions cmd/workers.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,13 +137,19 @@ func (b *blnkInstance) processInflightExpiry(cxt context.Context, t *asynq.Task)
}

func initializeQueues() map[string]int {
cfg, err := config.Fetch()
if err != nil {
log.Printf("Error fetching config, using defaults: %v", err)
return nil
}

queues := make(map[string]int)
queues[blnk.WEBHOOK_QUEUE] = 3
queues[blnk.INDEX_QUEUE] = 1
queues[blnk.EXPIREDINFLIGHT_QUEUE] = 3
queues[cfg.Queue.WebhookQueue] = 3
queues[cfg.Queue.IndexQueue] = 1
queues[cfg.Queue.InflightExpiryQueue] = 3

for i := 1; i <= blnk.NumberOfQueues; i++ {
queueName := fmt.Sprintf("%s_%d", blnk.TRANSACTION_QUEUE, i)
for i := 1; i <= cfg.Queue.NumberOfQueues; i++ {
queueName := fmt.Sprintf("%s_%d", cfg.Queue.TransactionQueue, i)
queues[queueName] = 1
}
return queues
Expand All @@ -169,16 +175,22 @@ func initializeWorkerServer(conf *config.Configuration, queues map[string]int) (
}

func initializeTaskHandlers(b *blnkInstance, mux *asynq.ServeMux) {
cfg, err := config.Fetch()
if err != nil {
log.Printf("Error fetching config, using defaults: %v", err)
return
}

// Register handlers for transaction queues
for i := 1; i <= blnk.NumberOfQueues; i++ {
queueName := fmt.Sprintf("%s_%d", blnk.TRANSACTION_QUEUE, i)
for i := 1; i <= cfg.Queue.NumberOfQueues; i++ {
queueName := fmt.Sprintf("%s_%d", cfg.Queue.TransactionQueue, i)
mux.HandleFunc(queueName, b.processTransaction)
}

// Register handlers for other task types
mux.HandleFunc(blnk.INDEX_QUEUE, b.indexData)
mux.HandleFunc(blnk.WEBHOOK_QUEUE, blnk.ProcessWebhook)
mux.HandleFunc(blnk.EXPIREDINFLIGHT_QUEUE, b.processInflightExpiry)
mux.HandleFunc(cfg.Queue.IndexQueue, b.indexData)
mux.HandleFunc(cfg.Queue.WebhookQueue, blnk.ProcessWebhook)
mux.HandleFunc(cfg.Queue.InflightExpiryQueue, b.processInflightExpiry)
}

// workerCommands defines the "workers" command to start worker processes.
Expand Down
Loading

0 comments on commit f3ec0ad

Please sign in to comment.