diff --git a/pubsub/gochannel/pubsub.go b/pubsub/gochannel/pubsub.go index c607b19e2..b243d31e0 100644 --- a/pubsub/gochannel/pubsub.go +++ b/pubsub/gochannel/pubsub.go @@ -148,10 +148,15 @@ func (g *GoChannel) sendMessage(topic string, message *message.Message) (<-chan logFields := watermill.LogFields{"message_uuid": message.UUID, "topic": topic} - if len(subscribers) == 0 { - close(ackedBySubscribers) - g.logger.Info("No subscribers to send message", logFields) - return ackedBySubscribers, nil + switch { + case len(subscribers) == 0 && g.config.EnableFallback: + g.logger.Debug("No subscribers to send the message to, trying the fallback subscribers", logFields) + subscribers = g.topicSubscribers(NoSubscribersFallbackTopic) + if len(subscribers) == 0 { + return g.handleNoSubscribers(ackedBySubscribers, logFields) + } + case len(subscribers) == 0: + return g.handleNoSubscribers(ackedBySubscribers, logFields) } go func(subscribers []*subscriber) { @@ -174,6 +179,12 @@ func (g *GoChannel) sendMessage(topic string, message *message.Message) (<-chan return ackedBySubscribers, nil } +func (g *GoChannel) handleNoSubscribers(ackedBySubscribers chan struct{}, logFields watermill.LogFields) (<-chan struct{}, error) { + close(ackedBySubscribers) + g.logger.Info("No subscribers to send the message to", logFields) + return ackedBySubscribers, nil +} + // Subscribe returns channel to which all published messages are sent. // Messages are not persisted. If there are no subscribers and message is produced it will be gone. // diff --git a/pubsub/gochannel/pubsub_test.go b/pubsub/gochannel/pubsub_test.go index 012064091..758fad56f 100644 --- a/pubsub/gochannel/pubsub_test.go +++ b/pubsub/gochannel/pubsub_test.go @@ -63,6 +63,28 @@ func TestPublishSubscribe_not_persistent(t *testing.T) { assert.NoError(t, pubSub.Close()) } +func TestPublishSubscribe_enable_fallback(t *testing.T) { + messagesCount := 100 + pubSub := gochannel.NewGoChannel( + gochannel.Config{ + OutputChannelBuffer: int64(messagesCount), + EnableFallback: true, + }, + watermill.NewStdLogger(true, true), + ) + topicName := "test_topic_" + watermill.NewUUID() + + msgs, err := pubSub.Subscribe(context.Background(), gochannel.NoSubscribersFallbackTopic) + require.NoError(t, err) + + sendMessages := tests.PublishSimpleMessages(t, messagesCount, pubSub, topicName) + receivedMsgs, _ := subscriber.BulkRead(msgs, messagesCount, time.Second) + + tests.AssertAllMessagesReceived(t, sendMessages, receivedMsgs) + + assert.NoError(t, pubSub.Close()) +} + func TestPublishSubscribe_block_until_ack(t *testing.T) { pubSub := gochannel.NewGoChannel( gochannel.Config{BlockPublishUntilSubscriberAck: true},