diff --git a/message/infrastructure/amqp/config.go b/message/infrastructure/amqp/config.go index 30c22c7a3..cc131b5c2 100644 --- a/message/infrastructure/amqp/config.go +++ b/message/infrastructure/amqp/config.go @@ -57,6 +57,7 @@ func NewDurablePubSubConfig(amqpURI string, generateQueueName QueueNameGenerator PrefetchCount: 1, }, }, + TopologyBuilder:&DefaultTopologyBuilder{}, } } @@ -102,6 +103,7 @@ func NewNonDurablePubSubConfig(amqpURI string, generateQueueName QueueNameGenera PrefetchCount: 1, }, }, + TopologyBuilder:&DefaultTopologyBuilder{}, } } @@ -147,6 +149,7 @@ func NewDurableQueueConfig(amqpURI string) Config { PrefetchCount: 1, }, }, + TopologyBuilder:&DefaultTopologyBuilder{}, } } @@ -190,6 +193,7 @@ func NewNonDurableQueueConfig(amqpURI string) Config { PrefetchCount: 1, }, }, + TopologyBuilder:&DefaultTopologyBuilder{}, } } @@ -204,6 +208,8 @@ type Config struct { Publish PublishConfig Consume ConsumeConfig + + TopologyBuilder TopologyBuilder } func (c Config) validate() error { diff --git a/message/infrastructure/amqp/connection.go b/message/infrastructure/amqp/connection.go index 04f54982e..be872699d 100644 --- a/message/infrastructure/amqp/connection.go +++ b/message/infrastructure/amqp/connection.go @@ -47,18 +47,6 @@ func newConnection( return pubSub, nil } -func (c *connectionWrapper) exchangeDeclare(channel *amqp.Channel, exchangeName string) error { - return channel.ExchangeDeclare( - exchangeName, - c.config.Exchange.Type, - c.config.Exchange.Durable, - c.config.Exchange.AutoDeleted, - c.config.Exchange.Internal, - c.config.Exchange.NoWait, - c.config.Exchange.Arguments, - ) -} - func (c *connectionWrapper) Close() error { if c.closed { return nil diff --git a/message/infrastructure/amqp/publisher.go b/message/infrastructure/amqp/publisher.go index c5f0448a7..fc3453abd 100644 --- a/message/infrastructure/amqp/publisher.go +++ b/message/infrastructure/amqp/publisher.go @@ -149,7 +149,7 @@ func (p *Publisher) preparePublishBindings(topic string, channel *amqp.Channel) defer p.publishBindingsLock.Unlock() if p.config.Exchange.GenerateName(topic) != "" { - if err := p.exchangeDeclare(channel, p.config.Exchange.GenerateName(topic)); err != nil { + if err := p.config.TopologyBuilder.ExchangeDeclare(channel, p.config.Exchange.GenerateName(topic), p.config); err != nil { return err } } diff --git a/message/infrastructure/amqp/subscriber.go b/message/infrastructure/amqp/subscriber.go index 962baa3ff..7c2ecec74 100644 --- a/message/infrastructure/amqp/subscriber.go +++ b/message/infrastructure/amqp/subscriber.go @@ -15,7 +15,7 @@ import ( type Subscriber struct { *connectionWrapper - config Config + config Config } func NewSubscriber(config Config, logger watermill.LoggerAdapter) (*Subscriber, error) { @@ -124,37 +124,10 @@ func (s *Subscriber) prepareConsume(queueName string, exchangeName string, logFi } }() - if _, err := channel.QueueDeclare( - queueName, - s.config.Queue.Durable, - s.config.Queue.AutoDelete, - s.config.Queue.Exclusive, - s.config.Queue.NoWait, - s.config.Queue.Arguments, - ); err != nil { - return errors.Wrap(err, "cannot declare queue") - } - s.logger.Debug("Queue declared", logFields) - - if exchangeName == "" { - s.logger.Debug("No exchange to declare", logFields) - return nil - } - - if err := s.exchangeDeclare(channel, exchangeName); err != nil { - return errors.Wrap(err, "cannot declare exchange") + if err = s.config.TopologyBuilder.BuildTopology(channel, queueName, exchangeName, s.config, s.logger); err != nil { + return err } - s.logger.Debug("Exchange declared", logFields) - if err := channel.QueueBind( - queueName, - s.config.QueueBind.GenerateRoutingKey(queueName), - exchangeName, - s.config.QueueBind.NoWait, - s.config.QueueBind.Arguments, - ); err != nil { - return errors.Wrap(err, "cannot bind queue") - } s.logger.Debug("Queue bound to exchange", logFields) return nil diff --git a/message/infrastructure/amqp/topology_builder.go b/message/infrastructure/amqp/topology_builder.go new file mode 100644 index 000000000..999bf30cf --- /dev/null +++ b/message/infrastructure/amqp/topology_builder.go @@ -0,0 +1,63 @@ +package amqp + +import ( + "github.com/ThreeDotsLabs/watermill" + "github.com/pkg/errors" + "github.com/streadway/amqp" +) + +type TopologyBuilder interface { + BuildTopology(channel *amqp.Channel, queueName string, exchangeName string, config Config, logger watermill.LoggerAdapter) error + ExchangeDeclare(channel *amqp.Channel, exchangeName string, config Config) error +} + +type DefaultTopologyBuilder struct { +} + +func (builder DefaultTopologyBuilder) ExchangeDeclare(channel *amqp.Channel, exchangeName string, config Config) error { + return channel.ExchangeDeclare( + exchangeName, + config.Exchange.Type, + config.Exchange.Durable, + config.Exchange.AutoDeleted, + config.Exchange.Internal, + config.Exchange.NoWait, + config.Exchange.Arguments, + ) +} + +func (builder *DefaultTopologyBuilder) BuildTopology(channel *amqp.Channel, queueName string, exchangeName string, config Config, logger watermill.LoggerAdapter) error { + if _, err := channel.QueueDeclare( + queueName, + config.Queue.Durable, + config.Queue.AutoDelete, + config.Queue.Exclusive, + config.Queue.NoWait, + config.Queue.Arguments, + ); err != nil { + return errors.Wrap(err, "cannot declare queue") + } + + logger.Debug("Queue declared", nil) + + if exchangeName == "" { + logger.Debug("No exchange to declare", nil) + return nil + } + if err := builder.ExchangeDeclare(channel, exchangeName, config); err != nil { + return errors.Wrap(err, "cannot declare exchange") + } + + logger.Debug("Exchange declared", nil) + + if err := channel.QueueBind( + queueName, + config.QueueBind.GenerateRoutingKey(queueName), + exchangeName, + config.QueueBind.NoWait, + config.QueueBind.Arguments, + ); err != nil { + return errors.Wrap(err, "cannot bind queue") + } + return nil +}