diff --git a/message/infrastructure/amqp/config.go b/message/infrastructure/amqp/config.go index 1461e6cd2..30c22c7a3 100644 --- a/message/infrastructure/amqp/config.go +++ b/message/infrastructure/amqp/config.go @@ -42,7 +42,11 @@ func NewDurablePubSubConfig(amqpURI string, generateQueueName QueueNameGenerator GenerateName: generateQueueName, Durable: true, }, - QueueBind: QueueBindConfig{}, + QueueBind: QueueBindConfig{ + GenerateRoutingKey: func(topic string) string { + return "" + }, + }, Publish: PublishConfig{ GenerateRoutingKey: func(topic string) string { return "" @@ -83,7 +87,11 @@ func NewNonDurablePubSubConfig(amqpURI string, generateQueueName QueueNameGenera Queue: QueueConfig{ GenerateName: generateQueueName, }, - QueueBind: QueueBindConfig{}, + QueueBind: QueueBindConfig{ + GenerateRoutingKey: func(topic string) string { + return "" + }, + }, Publish: PublishConfig{ GenerateRoutingKey: func(topic string) string { return "" @@ -124,7 +132,11 @@ func NewDurableQueueConfig(amqpURI string) Config { GenerateName: GenerateQueueNameTopicName, Durable: true, }, - QueueBind: QueueBindConfig{}, + QueueBind: QueueBindConfig{ + GenerateRoutingKey: func(topic string) string { + return "" + }, + }, Publish: PublishConfig{ GenerateRoutingKey: func(topic string) string { return topic @@ -163,7 +175,11 @@ func NewNonDurableQueueConfig(amqpURI string) Config { Queue: QueueConfig{ GenerateName: GenerateQueueNameTopicName, }, - QueueBind: QueueBindConfig{}, + QueueBind: QueueBindConfig{ + GenerateRoutingKey: func(topic string) string { + return "" + }, + }, Publish: PublishConfig{ GenerateRoutingKey: func(topic string) string { return topic @@ -350,7 +366,7 @@ type QueueConfig struct { // be routed to the queue when the publishing routing key matches the binding // routing key. type QueueBindConfig struct { - RoutingKey string + GenerateRoutingKey func(topic string) string // When noWait is false and the queue could not be bound, the channel will be // closed with an error. diff --git a/message/infrastructure/amqp/subscriber.go b/message/infrastructure/amqp/subscriber.go index 673563662..962baa3ff 100644 --- a/message/infrastructure/amqp/subscriber.go +++ b/message/infrastructure/amqp/subscriber.go @@ -148,7 +148,7 @@ func (s *Subscriber) prepareConsume(queueName string, exchangeName string, logFi if err := channel.QueueBind( queueName, - s.config.QueueBind.RoutingKey, + s.config.QueueBind.GenerateRoutingKey(queueName), exchangeName, s.config.QueueBind.NoWait, s.config.QueueBind.Arguments,