Skip to content

Commit

Permalink
Move fallback topic option to the configuration and allow the topic t…
Browse files Browse the repository at this point in the history
…o be configured
  • Loading branch information
annismckenzie committed Apr 10, 2024
1 parent 0eae6f7 commit c2ff17b
Show file tree
Hide file tree
Showing 2 changed files with 44 additions and 11 deletions.
24 changes: 17 additions & 7 deletions pubsub/gochannel/pubsub.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,10 @@ import (
"github.com/ThreeDotsLabs/watermill/message"
)

// NoSubscribersFallbackTopic is the fallback topic messages without any subscribers will be sent to.
// This is used if the `EnableFallback` configuration option is enabled.
const NoSubscribersFallbackTopic = "*"
// NoSubscribersFallbackDefaultTopic is the default fallback topic messages without any subscribers
// will be sent to – it is used if the `EnableNoSubscribersFallback` option is enabled and no
// fallback topic is configured via the `NoSubscribersFallbackTopic` option.
const NoSubscribersFallbackDefaultTopic = "*"

// Config holds the GoChannel Pub/Sub's configuration options.
type Config struct {
Expand All @@ -32,8 +33,13 @@ type Config struct {
BlockPublishUntilSubscriberAck bool

// When true, messages sent to a topic without any subscribers will be sent to the
// subscribers of the `*` topic.
EnableFallback bool
// subscribers of the fallback topic (configured via `NoSubscribersFallbackTopic` option).
EnableNoSubscribersFallback bool

// NoSubscribersFallbackTopic is the fallback topic messages without any subscribers will be sent to.
// This is used if the `EnableNoSubscribersFallback` configuration option is enabled.
// If it's not set then `*` is used by default.
NoSubscribersFallbackTopic string
}

// GoChannel is the simplest Pub/Sub implementation.
Expand Down Expand Up @@ -69,6 +75,10 @@ func NewGoChannel(config Config, logger watermill.LoggerAdapter) *GoChannel {
logger = watermill.NopLogger{}
}

if config.EnableNoSubscribersFallback && config.NoSubscribersFallbackTopic == "" {
config.NoSubscribersFallbackTopic = NoSubscribersFallbackDefaultTopic
}

return &GoChannel{
config: config,

Expand Down Expand Up @@ -149,12 +159,12 @@ func (g *GoChannel) sendMessage(topic string, message *message.Message) (<-chan
logFields := watermill.LogFields{"message_uuid": message.UUID, "topic": topic}

if len(subscribers) == 0 {
if !g.config.EnableFallback {
if !g.config.EnableNoSubscribersFallback {
return g.handleNoSubscribers(ackedBySubscribers, logFields)
}

g.logger.Debug("No subscribers to send the message to, trying the fallback subscribers", logFields)
if subscribers = g.topicSubscribers(NoSubscribersFallbackTopic); len(subscribers) == 0 {
if subscribers = g.topicSubscribers(g.config.NoSubscribersFallbackTopic); len(subscribers) == 0 {
return g.handleNoSubscribers(ackedBySubscribers, logFields)
}
}
Expand Down
31 changes: 27 additions & 4 deletions pubsub/gochannel/pubsub_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,18 +63,41 @@ func TestPublishSubscribe_not_persistent(t *testing.T) {
assert.NoError(t, pubSub.Close())
}

func TestPublishSubscribe_enable_fallback(t *testing.T) {
func TestPublishSubscribe_enable_no_subscribers_fallback(t *testing.T) {
messagesCount := 100
pubSub := gochannel.NewGoChannel(
gochannel.Config{
OutputChannelBuffer: int64(messagesCount),
EnableFallback: true,
OutputChannelBuffer: int64(messagesCount),
EnableNoSubscribersFallback: true,
},
watermill.NewStdLogger(true, true),
)
topicName := "test_topic_" + watermill.NewUUID()

msgs, err := pubSub.Subscribe(context.Background(), gochannel.NoSubscribersFallbackDefaultTopic)
require.NoError(t, err)

sendMessages := tests.PublishSimpleMessages(t, messagesCount, pubSub, topicName)
receivedMsgs, _ := subscriber.BulkRead(msgs, messagesCount, time.Second)

tests.AssertAllMessagesReceived(t, sendMessages, receivedMsgs)

assert.NoError(t, pubSub.Close())
}

func TestPublishSubscribe_enable_no_subscribers_fallback_with_custom_topic(t *testing.T) {
messagesCount := 100
pubSub := gochannel.NewGoChannel(
gochannel.Config{
OutputChannelBuffer: int64(messagesCount),
EnableNoSubscribersFallback: true,
NoSubscribersFallbackTopic: "custom_fallback_topic",
},
watermill.NewStdLogger(true, true),
)
topicName := "test_topic_" + watermill.NewUUID()

msgs, err := pubSub.Subscribe(context.Background(), gochannel.NoSubscribersFallbackTopic)
msgs, err := pubSub.Subscribe(context.Background(), "custom_fallback_topic")
require.NoError(t, err)

sendMessages := tests.PublishSimpleMessages(t, messagesCount, pubSub, topicName)
Expand Down

0 comments on commit c2ff17b

Please sign in to comment.