Skip to content

Commit

Permalink
supporting unit tests in consumer and mock class for IBus
Browse files Browse the repository at this point in the history
  • Loading branch information
Hoomanfr committed Jun 25, 2022
1 parent 744472d commit a2252ee
Show file tree
Hide file tree
Showing 2 changed files with 54 additions and 13 deletions.
37 changes: 37 additions & 0 deletions mock/bus_moq.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
package mock

import (
"context"

"github.com/thumperq/thumperq/internal/connection"
"github.com/thumperq/thumperq/internal/publishers"
thumperq "github.com/thumperq/thumperq/pkg"
"github.com/thumperq/thumperq/pkg/config"
"github.com/thumperq/thumperq/pkg/handler"
)

type busMoq struct {
connection connection.IConnection
cfg config.IConfig
publisher publishers.IBusPublisher
}

func NewBusMoq(cfg config.IConfig) thumperq.IBus {
return &busMoq{
connection: nil,
cfg: cfg,
publisher: nil,
}
}

func (b *busMoq) Publish(ctx context.Context, message handler.IMessage) error {
return nil
}

func (b *busMoq) Connection() connection.IConnection {
return b.connection
}

func (b *busMoq) Config() config.IConfig {
return b.cfg
}
30 changes: 17 additions & 13 deletions pkg/consumer_factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package thumperq

import (
"fmt"
"reflect"

"github.com/thumperq/thumperq/internal/consumer"
"github.com/thumperq/thumperq/internal/formatter"
Expand All @@ -10,45 +11,48 @@ import (
"github.com/thumperq/thumperq/pkg/handler"
)

func CreateConsumer[T handler.IMessage](bus IBus, handler handler.IHandler[T]) {
func CreateConsumer[T handler.IMessage](ibus IBus, handler handler.IHandler[T]) {
if reflect.TypeOf(ibus) != reflect.TypeOf(&bus{}) {
return
}
methodPath := reflection.MethodPath(CreateConsumer[T])
handlerPath := reflection.ObjectTypePath(handler)
handlerErrorName := fmt.Sprintf("%s_error", handlerPath)
errQueue := queue.NewQueue(bus.Connection(), handlerErrorName, handlerErrorName)
errQueue := queue.NewQueue(ibus.Connection(), handlerErrorName, handlerErrorName)
_, err := errQueue.Bind()
if err != nil {
panic(formatter.FormatErr(methodPath, err))
}
msgPath := reflection.TypePath[T]()
if bus.Config().BusConfig().RetryCount <= 0 {
firstQueue := queue.NewQueue(bus.Connection(), msgPath, handlerPath)
firstConsumer := consumer.NewConsumer(bus.Connection(), handler, firstQueue, nil, errQueue, 0)
if ibus.Config().BusConfig().RetryCount <= 0 {
firstQueue := queue.NewQueue(ibus.Connection(), msgPath, handlerPath)
firstConsumer := consumer.NewConsumer(ibus.Connection(), handler, firstQueue, nil, errQueue, 0)
err = firstConsumer.Consume()
if err != nil {
panic(formatter.FormatErr(methodPath, err))
}
return
}
lastRetryName := fmt.Sprintf("%s_retry%d", handlerPath, bus.Config().BusConfig().RetryCount)
lastRetryQueue := queue.NewQueue(bus.Connection(), lastRetryName, lastRetryName)
lastRetryConsumer := consumer.NewConsumer(bus.Connection(), handler, lastRetryQueue, nil, errQueue, bus.Config().BusConfig().RetryIntervalMs)
lastRetryName := fmt.Sprintf("%s_retry%d", handlerPath, ibus.Config().BusConfig().RetryCount)
lastRetryQueue := queue.NewQueue(ibus.Connection(), lastRetryName, lastRetryName)
lastRetryConsumer := consumer.NewConsumer(ibus.Connection(), handler, lastRetryQueue, nil, errQueue, ibus.Config().BusConfig().RetryIntervalMs)
err = lastRetryConsumer.Consume()
if err != nil {
panic(formatter.FormatErr(methodPath, err))
}
nextRetryQueue := lastRetryQueue
for i := bus.Config().BusConfig().RetryCount - 1; i >= 1; i-- {
for i := ibus.Config().BusConfig().RetryCount - 1; i >= 1; i-- {
retryName := fmt.Sprintf("%s_retry%d", handlerPath, i)
retryQueue := queue.NewQueue(bus.Connection(), retryName, retryName)
retryConsumer := consumer.NewConsumer(bus.Connection(), handler, retryQueue, nextRetryQueue, errQueue, bus.Config().BusConfig().RetryIntervalMs)
retryQueue := queue.NewQueue(ibus.Connection(), retryName, retryName)
retryConsumer := consumer.NewConsumer(ibus.Connection(), handler, retryQueue, nextRetryQueue, errQueue, ibus.Config().BusConfig().RetryIntervalMs)
err = retryConsumer.Consume()
if err != nil {
panic(formatter.FormatErr(methodPath, err))
}
nextRetryQueue = retryQueue
}
firstQueue := queue.NewQueue(bus.Connection(), msgPath, handlerPath)
firstConsumer := consumer.NewConsumer(bus.Connection(), handler, firstQueue, nextRetryQueue, errQueue, 0)
firstQueue := queue.NewQueue(ibus.Connection(), msgPath, handlerPath)
firstConsumer := consumer.NewConsumer(ibus.Connection(), handler, firstQueue, nextRetryQueue, errQueue, 0)
err = firstConsumer.Consume()
if err != nil {
panic(formatter.FormatErr(methodPath, err))
Expand Down

0 comments on commit a2252ee

Please sign in to comment.