Skip to content

Commit

Permalink
Implement EnableFallback configuration option
Browse files Browse the repository at this point in the history
If the option is enabled, messages sent to topic without any subscribers
will be forwarded to the `*` topic subscribers.
  • Loading branch information
annismckenzie committed Jan 7, 2024
1 parent 29f58c4 commit ddda866
Show file tree
Hide file tree
Showing 2 changed files with 37 additions and 4 deletions.
19 changes: 15 additions & 4 deletions pubsub/gochannel/pubsub.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,10 +148,15 @@ func (g *GoChannel) sendMessage(topic string, message *message.Message) (<-chan

logFields := watermill.LogFields{"message_uuid": message.UUID, "topic": topic}

if len(subscribers) == 0 {
close(ackedBySubscribers)
g.logger.Info("No subscribers to send message", logFields)
return ackedBySubscribers, nil
switch {
case len(subscribers) == 0 && g.config.EnableFallback:
g.logger.Debug("No subscribers to send the message to, trying the fallback subscribers", logFields)
subscribers = g.topicSubscribers(NoSubscribersFallbackTopic)
if len(subscribers) == 0 {
return g.handleNoSubscribers(ackedBySubscribers, logFields)
}
case len(subscribers) == 0:
return g.handleNoSubscribers(ackedBySubscribers, logFields)
}

go func(subscribers []*subscriber) {
Expand All @@ -174,6 +179,12 @@ func (g *GoChannel) sendMessage(topic string, message *message.Message) (<-chan
return ackedBySubscribers, nil
}

func (g *GoChannel) handleNoSubscribers(ackedBySubscribers chan struct{}, logFields watermill.LogFields) (<-chan struct{}, error) {
close(ackedBySubscribers)
g.logger.Info("No subscribers to send the message to", logFields)
return ackedBySubscribers, nil
}

// Subscribe returns channel to which all published messages are sent.
// Messages are not persisted. If there are no subscribers and message is produced it will be gone.
//
Expand Down
22 changes: 22 additions & 0 deletions pubsub/gochannel/pubsub_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,28 @@ func TestPublishSubscribe_not_persistent(t *testing.T) {
assert.NoError(t, pubSub.Close())
}

func TestPublishSubscribe_enable_fallback(t *testing.T) {
messagesCount := 100
pubSub := gochannel.NewGoChannel(
gochannel.Config{
OutputChannelBuffer: int64(messagesCount),
EnableFallback: true,
},
watermill.NewStdLogger(true, true),
)
topicName := "test_topic_" + watermill.NewUUID()

msgs, err := pubSub.Subscribe(context.Background(), gochannel.NoSubscribersFallbackTopic)
require.NoError(t, err)

sendMessages := tests.PublishSimpleMessages(t, messagesCount, pubSub, topicName)
receivedMsgs, _ := subscriber.BulkRead(msgs, messagesCount, time.Second)

tests.AssertAllMessagesReceived(t, sendMessages, receivedMsgs)

assert.NoError(t, pubSub.Close())
}

func TestPublishSubscribe_block_until_ack(t *testing.T) {
pubSub := gochannel.NewGoChannel(
gochannel.Config{BlockPublishUntilSubscriberAck: true},
Expand Down

0 comments on commit ddda866

Please sign in to comment.