Skip to content

Commit

Permalink
[amqp] added TopologyBuilder (#72)
Browse files Browse the repository at this point in the history
  • Loading branch information
Bartłomiej Klimczak authored and roblaszczak committed May 10, 2019
1 parent e4001c5 commit 2972718
Show file tree
Hide file tree
Showing 5 changed files with 73 additions and 43 deletions.
6 changes: 6 additions & 0 deletions message/infrastructure/amqp/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ func NewDurablePubSubConfig(amqpURI string, generateQueueName QueueNameGenerator
PrefetchCount: 1,
},
},
TopologyBuilder:&DefaultTopologyBuilder{},
}
}

Expand Down Expand Up @@ -102,6 +103,7 @@ func NewNonDurablePubSubConfig(amqpURI string, generateQueueName QueueNameGenera
PrefetchCount: 1,
},
},
TopologyBuilder:&DefaultTopologyBuilder{},
}
}

Expand Down Expand Up @@ -147,6 +149,7 @@ func NewDurableQueueConfig(amqpURI string) Config {
PrefetchCount: 1,
},
},
TopologyBuilder:&DefaultTopologyBuilder{},
}
}

Expand Down Expand Up @@ -190,6 +193,7 @@ func NewNonDurableQueueConfig(amqpURI string) Config {
PrefetchCount: 1,
},
},
TopologyBuilder:&DefaultTopologyBuilder{},
}
}

Expand All @@ -204,6 +208,8 @@ type Config struct {

Publish PublishConfig
Consume ConsumeConfig

TopologyBuilder TopologyBuilder
}

func (c Config) validate() error {
Expand Down
12 changes: 0 additions & 12 deletions message/infrastructure/amqp/connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,18 +47,6 @@ func newConnection(
return pubSub, nil
}

func (c *connectionWrapper) exchangeDeclare(channel *amqp.Channel, exchangeName string) error {
return channel.ExchangeDeclare(
exchangeName,
c.config.Exchange.Type,
c.config.Exchange.Durable,
c.config.Exchange.AutoDeleted,
c.config.Exchange.Internal,
c.config.Exchange.NoWait,
c.config.Exchange.Arguments,
)
}

func (c *connectionWrapper) Close() error {
if c.closed {
return nil
Expand Down
2 changes: 1 addition & 1 deletion message/infrastructure/amqp/publisher.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ func (p *Publisher) preparePublishBindings(topic string, channel *amqp.Channel)
defer p.publishBindingsLock.Unlock()

if p.config.Exchange.GenerateName(topic) != "" {
if err := p.exchangeDeclare(channel, p.config.Exchange.GenerateName(topic)); err != nil {
if err := p.config.TopologyBuilder.ExchangeDeclare(channel, p.config.Exchange.GenerateName(topic), p.config); err != nil {
return err
}
}
Expand Down
33 changes: 3 additions & 30 deletions message/infrastructure/amqp/subscriber.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ import (
type Subscriber struct {
*connectionWrapper

config Config
config Config
}

func NewSubscriber(config Config, logger watermill.LoggerAdapter) (*Subscriber, error) {
Expand Down Expand Up @@ -124,37 +124,10 @@ func (s *Subscriber) prepareConsume(queueName string, exchangeName string, logFi
}
}()

if _, err := channel.QueueDeclare(
queueName,
s.config.Queue.Durable,
s.config.Queue.AutoDelete,
s.config.Queue.Exclusive,
s.config.Queue.NoWait,
s.config.Queue.Arguments,
); err != nil {
return errors.Wrap(err, "cannot declare queue")
}
s.logger.Debug("Queue declared", logFields)

if exchangeName == "" {
s.logger.Debug("No exchange to declare", logFields)
return nil
}

if err := s.exchangeDeclare(channel, exchangeName); err != nil {
return errors.Wrap(err, "cannot declare exchange")
if err = s.config.TopologyBuilder.BuildTopology(channel, queueName, exchangeName, s.config, s.logger); err != nil {
return err
}
s.logger.Debug("Exchange declared", logFields)

if err := channel.QueueBind(
queueName,
s.config.QueueBind.GenerateRoutingKey(queueName),
exchangeName,
s.config.QueueBind.NoWait,
s.config.QueueBind.Arguments,
); err != nil {
return errors.Wrap(err, "cannot bind queue")
}
s.logger.Debug("Queue bound to exchange", logFields)

return nil
Expand Down
63 changes: 63 additions & 0 deletions message/infrastructure/amqp/topology_builder.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
package amqp

import (
"github.com/ThreeDotsLabs/watermill"
"github.com/pkg/errors"
"github.com/streadway/amqp"
)

type TopologyBuilder interface {
BuildTopology(channel *amqp.Channel, queueName string, exchangeName string, config Config, logger watermill.LoggerAdapter) error
ExchangeDeclare(channel *amqp.Channel, exchangeName string, config Config) error
}

type DefaultTopologyBuilder struct {
}

func (builder DefaultTopologyBuilder) ExchangeDeclare(channel *amqp.Channel, exchangeName string, config Config) error {
return channel.ExchangeDeclare(
exchangeName,
config.Exchange.Type,
config.Exchange.Durable,
config.Exchange.AutoDeleted,
config.Exchange.Internal,
config.Exchange.NoWait,
config.Exchange.Arguments,
)
}

func (builder *DefaultTopologyBuilder) BuildTopology(channel *amqp.Channel, queueName string, exchangeName string, config Config, logger watermill.LoggerAdapter) error {
if _, err := channel.QueueDeclare(
queueName,
config.Queue.Durable,
config.Queue.AutoDelete,
config.Queue.Exclusive,
config.Queue.NoWait,
config.Queue.Arguments,
); err != nil {
return errors.Wrap(err, "cannot declare queue")
}

logger.Debug("Queue declared", nil)

if exchangeName == "" {
logger.Debug("No exchange to declare", nil)
return nil
}
if err := builder.ExchangeDeclare(channel, exchangeName, config); err != nil {
return errors.Wrap(err, "cannot declare exchange")
}

logger.Debug("Exchange declared", nil)

if err := channel.QueueBind(
queueName,
config.QueueBind.GenerateRoutingKey(queueName),
exchangeName,
config.QueueBind.NoWait,
config.QueueBind.Arguments,
); err != nil {
return errors.Wrap(err, "cannot bind queue")
}
return nil
}

0 comments on commit 2972718

Please sign in to comment.